ruanhang1993 commented on code in PR #3510:
URL: https://github.com/apache/flink-cdc/pull/3510#discussion_r1883350098


##########
flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/state/PendingSplitsStateSerializer.java:
##########
@@ -285,6 +303,20 @@ private SnapshotPendingSplitsState deserializeSnapshotPendingSplitsState(
         if (version >= 7) {
             splitFinishedCheckpointIds = readSplitFinishedCheckpointIds(in);
         }
+
+        // The modification of 8th version: add ChunkSplitterState to SnapshotPendingSplitsState,
+        // which contains the asynchronously splitting chunk info.
+        TableId splittingTableId = null;
+        Object nextChunkStart = null;
+        Integer nextChunkId = null;
+        if (version >= 8) {
+            boolean hasTableIsSplitting = in.readBoolean();
+            if (hasTableIsSplitting) {
+                splittingTableId = TableId.parse(in.readUTF());
+                nextChunkStart = SerializerUtils.serializedStringToRow(in.readUTF())[0];
+                nextChunkId = in.readInt();
+            }
+        }

Review Comment:
   ```suggestion
           ChunkSplitterState chunkSplitterState = ChunkSplitterState.NO_SPLITTING_TABLE_STATE;
           if (version >= 8) {
               boolean hasTableIsSplitting = in.readBoolean();
               if (hasTableIsSplitting) {
                   TableId splittingTableId = ...;
                   Object nextChunkStart = ...;
                   Integer nextChunkId = ...;
                   chunkSplitterState = new ChunkSplitterState(
                                    splittingTableId,
                                    ChunkSplitterState.ChunkBound.middleOf(nextChunkStart),
                                   nextChunkId);
               }
           }
           return new SnapshotPendingSplitsState(..., chunkSplitterState);
   ```
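
   For symmetry it may be worth double-checking the version-8 write path against this read order (boolean flag, table id as UTF, serialized chunk-start row as UTF, then chunk id). A minimal sketch of what that could look like -- the method name and the use of `SerializerUtils.rowToSerializedString` are assumptions here, not necessarily the PR's actual code:

   ```java
   // Hypothetical sketch: the serializer must write the fields in exactly the
   // order the version-8 reader consumes them.
   private void writeChunkSplitterState(
           @Nullable TableId splittingTableId,
           @Nullable Object nextChunkStart,
           @Nullable Integer nextChunkId,
           DataOutputSerializer out)
           throws IOException {
       boolean hasSplittingTable = splittingTableId != null;
       out.writeBoolean(hasSplittingTable);
       if (hasSplittingTable) {
           out.writeUTF(splittingTableId.toString());
           // assumes rowToSerializedString is the counterpart of serializedStringToRow
           out.writeUTF(SerializerUtils.rowToSerializedString(new Object[] {nextChunkStart}));
           out.writeInt(nextChunkId);
       }
   }
   ```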



##########
flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/fetch/PostgresScanFetchTaskTest.java:
##########
@@ -314,15 +317,29 @@ private List<String> formatResult(List<SourceRecord> records, DataType dataType)
     }
 
     private List<SnapshotSplit> getSnapshotSplits(
-            PostgresSourceConfig sourceConfig, JdbcDataSourceDialect sourceDialect) {
+            PostgresSourceConfig sourceConfig, JdbcDataSourceDialect sourceDialect)
+            throws Exception {
         List<TableId> discoverTables = sourceDialect.discoverDataCollections(sourceConfig);
-        final ChunkSplitter chunkSplitter = sourceDialect.createChunkSplitter(sourceConfig);
-
+        OffsetFactory offsetFactory = new PostgresOffsetFactory();
+        final SplitAssigner snapshotSplitAssigner =
+                new SnapshotSplitAssigner<JdbcSourceConfig>(
+                        sourceConfig,
+                        DEFAULT_PARALLELISM,
+                        discoverTables,
+                        sourceDialect.isDataCollectionIdCaseSensitive(sourceConfig),
+                        sourceDialect,
+                        offsetFactory);
+        snapshotSplitAssigner.open();
         List<SnapshotSplit> snapshotSplitList = new ArrayList<>();
-        for (TableId table : discoverTables) {
-            Collection<SnapshotSplit> snapshotSplits = chunkSplitter.generateSplits(table);
-            snapshotSplitList.addAll(snapshotSplits);
+        while (true) {
+            Optional<SourceSplitBase> split = snapshotSplitAssigner.getNext();
+            if (split.isPresent()) {
+                snapshotSplitList.add(split.get().asSnapshotSplit());
+            } else {
+                break;
+            }

Review Comment:
   ```suggestion
       Optional<SourceSplitBase> split = snapshotSplitAssigner.getNext();
       while (split.isPresent()) {
           snapshotSplitList.add(split.get().asSnapshotSplit());
           split = snapshotSplitAssigner.getNext();
       }
   ```
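
   Since the assigner now owns a splitting executor and, through the chunk splitter, a JDBC connection, the test helper should presumably also release it once all splits are drained; a small addition under that assumption (`close()` is part of the assigner changes in this PR):

   ```java
   // release the assigner's executor/connection after collecting the splits
   snapshotSplitAssigner.close();
   ```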



##########
flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/SnapshotSplitAssigner.java:
##########
@@ -510,7 +574,28 @@ && allSnapshotSplitsFinished()) {
 
     @Override
     public void close() throws IOException {
-        dialect.close();
+        closeExecutorService();
+        if (chunkSplitter != null) {
+            try {
+                chunkSplitter.close();
+            } catch (Exception e) {
+                LOG.warn("Fail to close the chunk splitter.");
+            }
+        }
+        // dialect.close();

Review Comment:
   Why not close dialect here?
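
   If the dialect should still be owned and closed by the assigner, a minimal sketch keeping the existing warn-and-continue style (and also passing the exception to the log call) could be:

   ```java
   @Override
   public void close() throws IOException {
       closeExecutorService();
       if (chunkSplitter != null) {
           try {
               chunkSplitter.close();
           } catch (Exception e) {
               LOG.warn("Fail to close the chunk splitter.", e);
           }
       }
       dialect.close();
   }
   ```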



##########
flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/splitter/JdbcSourceChunkSplitter.java:
##########
@@ -60,58 +61,102 @@ public abstract class JdbcSourceChunkSplitter implements ChunkSplitter {
     protected final JdbcSourceConfig sourceConfig;
     protected final JdbcDataSourceDialect dialect;
 
-    public JdbcSourceChunkSplitter(JdbcSourceConfig sourceConfig, JdbcDataSourceDialect dialect) {
+    private final Object lock = new Object();
+    @Nullable private TableId currentSplittingTableId;
+    @Nullable private ChunkSplitterState.ChunkBound nextChunkStart;
+    @Nullable private Integer nextChunkId;
+
+    private JdbcConnection jdbcConnection;
+    private Table currentSplittingTable;
+    private TableChanges.TableChange currentSchema;
+    private Column splitColumn;
+    private RowType splitType;
+    private Object[] minMaxOfSplitColumn;
+    private long approximateRowCnt;
+
+    public JdbcSourceChunkSplitter(
+            JdbcSourceConfig sourceConfig,
+            JdbcDataSourceDialect dialect,
+            ChunkSplitterState chunkSplitterState) {
+        this(
+                sourceConfig,
+                dialect,
+                chunkSplitterState.getCurrentSplittingTableId(),
+                chunkSplitterState.getNextChunkStart(),
+                chunkSplitterState.getNextChunkId());
+    }
+
+    public JdbcSourceChunkSplitter(
+            JdbcSourceConfig sourceConfig,
+            JdbcDataSourceDialect dialect,
+            @Nullable TableId currentSplittingTableId,
+            @Nullable ChunkSplitterState.ChunkBound nextChunkStart,
+            @Nullable Integer nextChunkId) {
         this.sourceConfig = sourceConfig;
         this.dialect = dialect;
+        this.currentSplittingTableId = currentSplittingTableId;
+        this.nextChunkStart = nextChunkStart;
+        this.nextChunkId = nextChunkId;
     }
 
-    /** Generates all snapshot splits (chunks) for the give table path. */
     @Override
-    public Collection<SnapshotSplit> generateSplits(TableId tableId) {
-        try (JdbcConnection jdbc = dialect.openJdbcConnection(sourceConfig)) {
-
-            LOG.info("Start splitting table {} into chunks...", tableId);
-            long start = System.currentTimeMillis();
+    public void open() {
+        this.jdbcConnection = dialect.openJdbcConnection(sourceConfig);
+    }
 
-            Table table =
-                    Objects.requireNonNull(dialect.queryTableSchema(jdbc, tableId)).getTable();
-            Column splitColumn = getSplitColumn(table, sourceConfig.getChunkKeyColumn());
-            final List<ChunkRange> chunks;
-            try {
-                chunks = splitTableIntoChunks(jdbc, tableId, splitColumn);
-            } catch (SQLException e) {
-                throw new FlinkRuntimeException("Failed to split chunks for 
table " + tableId, e);
+    /** Generates all snapshot splits (chunks) for the give table path. */
+    @Override
+    public Collection<SnapshotSplit> generateSplits(TableId tableId) throws Exception {
+        if (!hasNextChunk()) {
+            // split a new table.
+            analyzeTable(tableId);
+            Optional<List<SnapshotSplit>> evenlySplitChunks = trySplitAllEvenlySizedChunks(tableId);
+            if (evenlySplitChunks.isPresent()) {
+                return evenlySplitChunks.get();
+            } else {
+                synchronized (lock) {
+                    this.currentSplittingTableId = tableId;
+                    this.nextChunkStart = ChunkSplitterState.ChunkBound.START_BOUND;
+                    this.nextChunkId = 0;
+                    return Collections.singletonList(splitOneUnevenlySizedChunk(tableId));
+                }
             }
-
-            // convert chunks into splits
-            List<SnapshotSplit> splits = new ArrayList<>();
-            RowType splitType = getSplitType(splitColumn);
-            for (int i = 0; i < chunks.size(); i++) {
-                ChunkRange chunk = chunks.get(i);
-                SnapshotSplit split =
-                        createSnapshotSplit(
-                                jdbc,
-                                tableId,
-                                i,
-                                splitType,
-                                chunk.getChunkStart(),
-                                chunk.getChunkEnd());
-                splits.add(split);
+        } else {
+            Preconditions.checkState(
+                    currentSplittingTableId.equals(tableId),
+                    "Can not split a new table before the previous table 
splitting finish.");
+            if (currentSplittingTable == null) {
+                analyzeTable(currentSplittingTableId);
             }
+            synchronized (lock) {
+                return Collections.singletonList(splitOneUnevenlySizedChunk(tableId));
+            }
+        }
+    }
 
-            long end = System.currentTimeMillis();
-            LOG.info(
-                    "Split table {} into {} chunks, time cost: {}ms.",
-                    tableId,
-                    splits.size(),
-                    end - start);
-            return splits;
-        } catch (Exception e) {
-            throw new FlinkRuntimeException(
-                    String.format("Generate Splits for table %s error", 
tableId), e);
+    @Override
+    public boolean hasNextChunk() {
+        return currentSplittingTableId != null;
+    }
+
+    @Override
+    public ChunkSplitterState snapshotState(long checkpointId) {
+        // don't chunk new spilt when snapshot state.
+        synchronized (lock) {
+            return new ChunkSplitterState(currentSplittingTableId, nextChunkStart, nextChunkId);
         }
     }
 
+    @Override
+    public TableId getCurrentSplittingTableId() {
+        return currentSplittingTableId;
+    }
+
+    @Override
+    public void close() throws Exception {
+        jdbcConnection.close();

Review Comment:
   ```suggestion
           if (jdbcConnection != null) {
               jdbcConnection.close();
           }
   ```



##########
flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/SnapshotSplitAssigner.java:
##########
@@ -319,45 +335,91 @@ private int computeTablesPendingSnapshot() {
         return numTablesPendingSnapshot;
     }
 
-    @Override
-    public Optional<SourceSplitBase> getNext() {
-        if (!remainingSplits.isEmpty()) {
-            // return remaining splits firstly
-            Iterator<SchemalessSnapshotSplit> iterator = remainingSplits.iterator();
-            SchemalessSnapshotSplit split = iterator.next();
-            iterator.remove();
-            assignedSplits.put(split.splitId(), split);
-            enumeratorMetrics
-                    .getTableMetrics(split.getTableId())
-                    .finishProcessSplit(split.splitId());
-            return Optional.of(split.toSnapshotSplit(tableSchemas.get(split.getTableId())));
-        } else {
-            // it's turn for new table
-            TableId nextTable = remainingTables.pollFirst();
-            if (nextTable != null) {
-                // split the given table into chunks (snapshot splits)
-                Collection<SnapshotSplit> splits = chunkSplitter.generateSplits(nextTable);
-                final Map<TableId, TableChanges.TableChange> tableSchema = new HashMap<>();
-                if (!splits.isEmpty()) {
+    private void startAsynchronouslySplit() {
+        if (chunkSplitter.hasNextChunk() || !remainingTables.isEmpty()) {
+            if (executor == null) {
+                ThreadFactory threadFactory =
+                        new ThreadFactoryBuilder().setNameFormat("snapshot-splitting").build();
+                this.executor = Executors.newSingleThreadExecutor(threadFactory);
+            }
+            executor.submit(this::splitChunksForRemainingTables);

Review Comment:
   ```suggestion
               try {
                   LOG.info("submit async snapshot-splitting task");
                   
this.splittingExecutorService.submit(this::splitChunksForRemainingTables);
               } catch (RejectedExecutionException e) {
                   LOG.error("submit async snapshot-splitting task error", e);
                   throw new FlinkRuntimeException("submit async 
snapshot-splitting task error", e);
               }
   ```
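
   A `RejectedExecutionException` here would typically mean the single-thread executor has already been shut down, so failing fast with a `FlinkRuntimeException` seems reasonable. For reference, `closeExecutorService()` (not shown in this hunk) might look roughly like the sketch below; the timeout value and method body are assumptions:

   ```java
   private void closeExecutorService() {
       if (executor != null) {
           executor.shutdown();
           try {
               // give the in-flight splitting task a chance to finish, then force shutdown
               if (!executor.awaitTermination(30, TimeUnit.SECONDS)) {
                   executor.shutdownNow();
               }
           } catch (InterruptedException e) {
               executor.shutdownNow();
               Thread.currentThread().interrupt();
           }
       }
   }
   ```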



##########
flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/SnapshotSplitAssigner.java:
##########
@@ -319,45 +335,91 @@ private int computeTablesPendingSnapshot() {
         return numTablesPendingSnapshot;
     }
 
-    @Override
-    public Optional<SourceSplitBase> getNext() {
-        if (!remainingSplits.isEmpty()) {
-            // return remaining splits firstly
-            Iterator<SchemalessSnapshotSplit> iterator = remainingSplits.iterator();
-            SchemalessSnapshotSplit split = iterator.next();
-            iterator.remove();
-            assignedSplits.put(split.splitId(), split);
-            enumeratorMetrics
-                    .getTableMetrics(split.getTableId())
-                    .finishProcessSplit(split.splitId());
-            return Optional.of(split.toSnapshotSplit(tableSchemas.get(split.getTableId())));
-        } else {
-            // it's turn for new table
-            TableId nextTable = remainingTables.pollFirst();
-            if (nextTable != null) {
-                // split the given table into chunks (snapshot splits)
-                Collection<SnapshotSplit> splits = chunkSplitter.generateSplits(nextTable);
-                final Map<TableId, TableChanges.TableChange> tableSchema = new HashMap<>();
-                if (!splits.isEmpty()) {
+    private void startAsynchronouslySplit() {
+        if (chunkSplitter.hasNextChunk() || !remainingTables.isEmpty()) {
+            if (executor == null) {
+                ThreadFactory threadFactory =
+                        new ThreadFactoryBuilder().setNameFormat("snapshot-splitting").build();
+                this.executor = Executors.newSingleThreadExecutor(threadFactory);
+            }
+            executor.submit(this::splitChunksForRemainingTables);
+        }
+    }
+
+    private void splitTable(TableId nextTable) {
+        LOG.info("Start splitting table {} into chunks...", nextTable);
+        long start = System.currentTimeMillis();
+        int chunkNum = 0;
+        boolean hasRecordSchema = false;
+        // split the given table into chunks (snapshot splits)
+        do {
+            synchronized (lock) {
+                Collection<SnapshotSplit> splits;
+                try {
+                    splits = chunkSplitter.generateSplits(nextTable);
+                } catch (Exception e) {
+                    throw new IllegalStateException(
+                            "Error when splitting chunks for " + nextTable, e);
+                }
+
+                if (!hasRecordSchema && !splits.isEmpty()) {
+                    hasRecordSchema = true;
+                    final Map<TableId, TableChanges.TableChange> tableSchema = new HashMap<>();
                     tableSchema.putAll(splits.iterator().next().getTableSchemas());
+                    tableSchemas.putAll(tableSchema);
                 }
                 final List<SchemalessSnapshotSplit> schemalessSnapshotSplits =
                         splits.stream()
                                 .map(SnapshotSplit::toSchemalessSnapshotSplit)
                                 .collect(Collectors.toList());
+                chunkNum += splits.size();
                 remainingSplits.addAll(schemalessSnapshotSplits);
-                tableSchemas.putAll(tableSchema);
-                if (!alreadyProcessedTables.contains(nextTable)) {
-                    enumeratorMetrics.startSnapshotTables(1);
-                }
-                alreadyProcessedTables.add(nextTable);
                 List<String> splitIds =
                         schemalessSnapshotSplits.stream()
                                 .map(SchemalessSnapshotSplit::splitId)
                                 .collect(Collectors.toList());
                 enumeratorMetrics.getTableMetrics(nextTable).addNewSplits(splitIds);
+
+                if (!chunkSplitter.hasNextChunk()) {
+                    remainingTables.remove(nextTable);
+                }
+                lock.notify();
+            }
+        } while (chunkSplitter.hasNextChunk());
+        long end = System.currentTimeMillis();
+        LOG.info(
+                "Split table {} into {} chunks, time cost: {}ms.",
+                nextTable,
+                chunkNum,
+                end - start);
+    }
+
+    @Override
+    public Optional<SourceSplitBase> getNext() {

Review Comment:
   Do we need `waitTableDiscoveryReady` here or not?
   
   ```java
       private void waitTableDiscoveryReady() {
           while (needToDiscoveryTables()) {
               LOG.debug("Current assigner is discovering tables, wait tables 
ready...");
               try {
                   Thread.sleep(200);
               } catch (InterruptedException e) {
                   // nothing to do
               }
           }
       }
       /** Indicates current assigner need to discovery tables or not. */
       public boolean needToDiscoveryTables() {
           return remainingTables.isEmpty()
                   && remainingSplits.isEmpty()
                   && alreadyProcessedTables.isEmpty();
       }
   ```
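
   If it is needed, it would presumably be the first statement of the new `getNext()`, before touching `remainingSplits`, e.g. (sketch under that assumption):

   ```java
   @Override
   public Optional<SourceSplitBase> getNext() {
       waitTableDiscoveryReady();
       // ... existing split-assignment logic unchanged ...
   }
   ```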



##########
flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/SnapshotSplitAssigner.java:
##########
@@ -451,6 +513,7 @@ public SnapshotPendingSplitsState snapshotState(long checkpointId) {
                     splitFinishedCheckpointIds.size());
         }
 
+        Map<String, Long> splitFinishedCheckpointIds1 = splitFinishedCheckpointIds;

Review Comment:
   `splitFinishedCheckpointIds1` is never used.


