ruanhang1993 commented on code in PR #3510:
URL: https://github.com/apache/flink-cdc/pull/3510#discussion_r1883350098
##########
flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/state/PendingSplitsStateSerializer.java:
##########
@@ -285,6 +303,20 @@ private SnapshotPendingSplitsState deserializeSnapshotPendingSplitsState(
         if (version >= 7) {
             splitFinishedCheckpointIds = readSplitFinishedCheckpointIds(in);
         }
+
+        // The modification of 8th version: add ChunkSplitterState to SnapshotPendingSplitsState,
+        // which contains the asynchronously splitting chunk info.
+        TableId splittingTableId = null;
+        Object nextChunkStart = null;
+        Integer nextChunkId = null;
+        if (version >= 8) {
+            boolean hasTableIsSplitting = in.readBoolean();
+            if (hasTableIsSplitting) {
+                splittingTableId = TableId.parse(in.readUTF());
+                nextChunkStart = SerializerUtils.serializedStringToRow(in.readUTF())[0];
+                nextChunkId = in.readInt();
+            }
+        }
Review Comment:
```suggestion
        ChunkSplitterState chunkSplitterState = ChunkSplitterState.NO_SPLITTING_TABLE_STATE;
        if (version >= 8) {
            boolean hasTableIsSplitting = in.readBoolean();
            if (hasTableIsSplitting) {
                TableId splittingTableId = ...;
                Object nextChunkStart = ...;
                Integer nextChunkId = ...;
                chunkSplitterState = new ChunkSplitterState(
                        splittingTableId,
                        ChunkSplitterState.ChunkBound.middleOf(nextChunkStart),
                        nextChunkId);
            }
        }
        return new SnapshotPendingSplitsState(..., chunkSplitterState);
```
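For context, a sketch of how the version-8 branch could read with the deserialization calls from the diff above dropped into the suggested shape; the remaining `SnapshotPendingSplitsState` constructor arguments stay elided, as in the suggestion:
```java
// Sketch only: the reads mirror the current patch, the shape follows the suggestion above.
ChunkSplitterState chunkSplitterState = ChunkSplitterState.NO_SPLITTING_TABLE_STATE;
if (version >= 8) {
    boolean hasTableIsSplitting = in.readBoolean();
    if (hasTableIsSplitting) {
        TableId splittingTableId = TableId.parse(in.readUTF());
        Object nextChunkStart = SerializerUtils.serializedStringToRow(in.readUTF())[0];
        Integer nextChunkId = in.readInt();
        chunkSplitterState = new ChunkSplitterState(
                splittingTableId,
                ChunkSplitterState.ChunkBound.middleOf(nextChunkStart),
                nextChunkId);
    }
}
```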
##########
flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/fetch/PostgresScanFetchTaskTest.java:
##########
@@ -314,15 +317,29 @@ private List<String> formatResult(List<SourceRecord> records, DataType dataType)
     }
     private List<SnapshotSplit> getSnapshotSplits(
-            PostgresSourceConfig sourceConfig, JdbcDataSourceDialect sourceDialect) {
+            PostgresSourceConfig sourceConfig, JdbcDataSourceDialect sourceDialect)
+            throws Exception {
         List<TableId> discoverTables = sourceDialect.discoverDataCollections(sourceConfig);
-        final ChunkSplitter chunkSplitter = sourceDialect.createChunkSplitter(sourceConfig);
-
+        OffsetFactory offsetFactory = new PostgresOffsetFactory();
+        final SplitAssigner snapshotSplitAssigner =
+                new SnapshotSplitAssigner<JdbcSourceConfig>(
+                        sourceConfig,
+                        DEFAULT_PARALLELISM,
+                        discoverTables,
+                        sourceDialect.isDataCollectionIdCaseSensitive(sourceConfig),
+                        sourceDialect,
+                        offsetFactory);
+        snapshotSplitAssigner.open();
         List<SnapshotSplit> snapshotSplitList = new ArrayList<>();
-        for (TableId table : discoverTables) {
-            Collection<SnapshotSplit> snapshotSplits = chunkSplitter.generateSplits(table);
-            snapshotSplitList.addAll(snapshotSplits);
+        while (true) {
+            Optional<SourceSplitBase> split = snapshotSplitAssigner.getNext();
+            if (split.isPresent()) {
+                snapshotSplitList.add(split.get().asSnapshotSplit());
+            } else {
+                break;
+            }
Review Comment:
```suggestion
        Optional<SourceSplitBase> split = snapshotSplitAssigner.getNext();
        while (split.isPresent()) {
            snapshotSplitList.add(split.get().asSnapshotSplit());
            split = snapshotSplitAssigner.getNext();
        }
```
##########
flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/SnapshotSplitAssigner.java:
##########
@@ -510,7 +574,28 @@ && allSnapshotSplitsFinished()) {
     @Override
     public void close() throws IOException {
-        dialect.close();
+        closeExecutorService();
+        if (chunkSplitter != null) {
+            try {
+                chunkSplitter.close();
+            } catch (Exception e) {
+                LOG.warn("Fail to close the chunk splitter.");
+            }
+        }
+        // dialect.close();
Review Comment:
Why not close dialect here?
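For example, it could be closed the same way the chunk splitter is handled above, so a failing `dialect.close()` does not abort the rest of the cleanup (a sketch only; the warning message is illustrative):
```java
// Sketch: close the dialect as well, logging instead of failing the whole close().
if (dialect != null) {
    try {
        dialect.close();
    } catch (Exception e) {
        LOG.warn("Failed to close the dialect.", e);
    }
}
```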
##########
flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/splitter/JdbcSourceChunkSplitter.java:
##########
@@ -60,58 +61,102 @@ public abstract class JdbcSourceChunkSplitter implements ChunkSplitter {
     protected final JdbcSourceConfig sourceConfig;
     protected final JdbcDataSourceDialect dialect;
-    public JdbcSourceChunkSplitter(JdbcSourceConfig sourceConfig, JdbcDataSourceDialect dialect) {
+    private final Object lock = new Object();
+    @Nullable private TableId currentSplittingTableId;
+    @Nullable private ChunkSplitterState.ChunkBound nextChunkStart;
+    @Nullable private Integer nextChunkId;
+
+    private JdbcConnection jdbcConnection;
+    private Table currentSplittingTable;
+    private TableChanges.TableChange currentSchema;
+    private Column splitColumn;
+    private RowType splitType;
+    private Object[] minMaxOfSplitColumn;
+    private long approximateRowCnt;
+
+    public JdbcSourceChunkSplitter(
+            JdbcSourceConfig sourceConfig,
+            JdbcDataSourceDialect dialect,
+            ChunkSplitterState chunkSplitterState) {
+        this(
+                sourceConfig,
+                dialect,
+                chunkSplitterState.getCurrentSplittingTableId(),
+                chunkSplitterState.getNextChunkStart(),
+                chunkSplitterState.getNextChunkId());
+    }
+
+    public JdbcSourceChunkSplitter(
+            JdbcSourceConfig sourceConfig,
+            JdbcDataSourceDialect dialect,
+            @Nullable TableId currentSplittingTableId,
+            @Nullable ChunkSplitterState.ChunkBound nextChunkStart,
+            @Nullable Integer nextChunkId) {
         this.sourceConfig = sourceConfig;
         this.dialect = dialect;
+        this.currentSplittingTableId = currentSplittingTableId;
+        this.nextChunkStart = nextChunkStart;
+        this.nextChunkId = nextChunkId;
     }
-    /** Generates all snapshot splits (chunks) for the give table path. */
     @Override
-    public Collection<SnapshotSplit> generateSplits(TableId tableId) {
-        try (JdbcConnection jdbc = dialect.openJdbcConnection(sourceConfig)) {
-
-            LOG.info("Start splitting table {} into chunks...", tableId);
-            long start = System.currentTimeMillis();
+    public void open() {
+        this.jdbcConnection = dialect.openJdbcConnection(sourceConfig);
+    }
-            Table table =
-                    Objects.requireNonNull(dialect.queryTableSchema(jdbc, tableId)).getTable();
-            Column splitColumn = getSplitColumn(table, sourceConfig.getChunkKeyColumn());
-            final List<ChunkRange> chunks;
-            try {
-                chunks = splitTableIntoChunks(jdbc, tableId, splitColumn);
-            } catch (SQLException e) {
-                throw new FlinkRuntimeException("Failed to split chunks for table " + tableId, e);
+    /** Generates all snapshot splits (chunks) for the give table path. */
+    @Override
+    public Collection<SnapshotSplit> generateSplits(TableId tableId) throws Exception {
+        if (!hasNextChunk()) {
+            // split a new table.
+            analyzeTable(tableId);
+            Optional<List<SnapshotSplit>> evenlySplitChunks = trySplitAllEvenlySizedChunks(tableId);
+            if (evenlySplitChunks.isPresent()) {
+                return evenlySplitChunks.get();
+            } else {
+                synchronized (lock) {
+                    this.currentSplittingTableId = tableId;
+                    this.nextChunkStart = ChunkSplitterState.ChunkBound.START_BOUND;
+                    this.nextChunkId = 0;
+                    return Collections.singletonList(splitOneUnevenlySizedChunk(tableId));
+                }
             }
-
-            // convert chunks into splits
-            List<SnapshotSplit> splits = new ArrayList<>();
-            RowType splitType = getSplitType(splitColumn);
-            for (int i = 0; i < chunks.size(); i++) {
-                ChunkRange chunk = chunks.get(i);
-                SnapshotSplit split =
-                        createSnapshotSplit(
-                                jdbc,
-                                tableId,
-                                i,
-                                splitType,
-                                chunk.getChunkStart(),
-                                chunk.getChunkEnd());
-                splits.add(split);
+        } else {
+            Preconditions.checkState(
+                    currentSplittingTableId.equals(tableId),
+                    "Can not split a new table before the previous table splitting finish.");
+            if (currentSplittingTable == null) {
+                analyzeTable(currentSplittingTableId);
             }
+            synchronized (lock) {
+                return Collections.singletonList(splitOneUnevenlySizedChunk(tableId));
+            }
+        }
+    }
-            long end = System.currentTimeMillis();
-            LOG.info(
-                    "Split table {} into {} chunks, time cost: {}ms.",
-                    tableId,
-                    splits.size(),
-                    end - start);
-            return splits;
-        } catch (Exception e) {
-            throw new FlinkRuntimeException(
-                    String.format("Generate Splits for table %s error", tableId), e);
+    @Override
+    public boolean hasNextChunk() {
+        return currentSplittingTableId != null;
+    }
+
+    @Override
+    public ChunkSplitterState snapshotState(long checkpointId) {
+        // don't chunk new spilt when snapshot state.
+        synchronized (lock) {
+            return new ChunkSplitterState(currentSplittingTableId, nextChunkStart, nextChunkId);
         }
     }
+    @Override
+    public TableId getCurrentSplittingTableId() {
+        return currentSplittingTableId;
+    }
+
+    @Override
+    public void close() throws Exception {
+        jdbcConnection.close();
Review Comment:
```suggestion
if (jdbcConnection != null) {
jdbcConnection.close();
}
```
##########
flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/SnapshotSplitAssigner.java:
##########
@@ -319,45 +335,91 @@ private int computeTablesPendingSnapshot() {
         return numTablesPendingSnapshot;
     }
-    @Override
-    public Optional<SourceSplitBase> getNext() {
-        if (!remainingSplits.isEmpty()) {
-            // return remaining splits firstly
-            Iterator<SchemalessSnapshotSplit> iterator = remainingSplits.iterator();
-            SchemalessSnapshotSplit split = iterator.next();
-            iterator.remove();
-            assignedSplits.put(split.splitId(), split);
-            enumeratorMetrics
-                    .getTableMetrics(split.getTableId())
-                    .finishProcessSplit(split.splitId());
-            return Optional.of(split.toSnapshotSplit(tableSchemas.get(split.getTableId())));
-        } else {
-            // it's turn for new table
-            TableId nextTable = remainingTables.pollFirst();
-            if (nextTable != null) {
-                // split the given table into chunks (snapshot splits)
-                Collection<SnapshotSplit> splits = chunkSplitter.generateSplits(nextTable);
-                final Map<TableId, TableChanges.TableChange> tableSchema = new HashMap<>();
-                if (!splits.isEmpty()) {
+    private void startAsynchronouslySplit() {
+        if (chunkSplitter.hasNextChunk() || !remainingTables.isEmpty()) {
+            if (executor == null) {
+                ThreadFactory threadFactory =
+                        new ThreadFactoryBuilder().setNameFormat("snapshot-splitting").build();
+                this.executor = Executors.newSingleThreadExecutor(threadFactory);
+            }
+            executor.submit(this::splitChunksForRemainingTables);
Review Comment:
```suggestion
            try {
                LOG.info("submit async snapshot-splitting task");
                this.splittingExecutorService.submit(this::splitChunksForRemainingTables);
            } catch (RejectedExecutionException e) {
                LOG.error("submit async snapshot-splitting task error", e);
                throw new FlinkRuntimeException("submit async snapshot-splitting task error", e);
            }
```
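Side note: the suggestion references `splittingExecutorService`, while the diff declares the field as `executor`; if the field keeps its current name, the same guard would read as below. `ExecutorService.submit` throws `RejectedExecutionException` once the executor has been shut down, e.g. when `close()` races with a late split request, so surfacing it keeps the failure visible. A sketch only:
```java
// Sketch with the field name from the diff; behavior matches the suggestion above.
try {
    LOG.info("Submitting the asynchronous snapshot-splitting task.");
    executor.submit(this::splitChunksForRemainingTables);
} catch (RejectedExecutionException e) {
    // submit() rejects new tasks once the single-thread executor has been shut down.
    throw new FlinkRuntimeException("Failed to submit the snapshot-splitting task.", e);
}
```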
##########
flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/SnapshotSplitAssigner.java:
##########
@@ -319,45 +335,91 @@ private int computeTablesPendingSnapshot() {
         return numTablesPendingSnapshot;
     }
-    @Override
-    public Optional<SourceSplitBase> getNext() {
-        if (!remainingSplits.isEmpty()) {
-            // return remaining splits firstly
-            Iterator<SchemalessSnapshotSplit> iterator = remainingSplits.iterator();
-            SchemalessSnapshotSplit split = iterator.next();
-            iterator.remove();
-            assignedSplits.put(split.splitId(), split);
-            enumeratorMetrics
-                    .getTableMetrics(split.getTableId())
-                    .finishProcessSplit(split.splitId());
-            return Optional.of(split.toSnapshotSplit(tableSchemas.get(split.getTableId())));
-        } else {
-            // it's turn for new table
-            TableId nextTable = remainingTables.pollFirst();
-            if (nextTable != null) {
-                // split the given table into chunks (snapshot splits)
-                Collection<SnapshotSplit> splits = chunkSplitter.generateSplits(nextTable);
-                final Map<TableId, TableChanges.TableChange> tableSchema = new HashMap<>();
-                if (!splits.isEmpty()) {
+    private void startAsynchronouslySplit() {
+        if (chunkSplitter.hasNextChunk() || !remainingTables.isEmpty()) {
+            if (executor == null) {
+                ThreadFactory threadFactory =
+                        new ThreadFactoryBuilder().setNameFormat("snapshot-splitting").build();
+                this.executor = Executors.newSingleThreadExecutor(threadFactory);
+            }
+            executor.submit(this::splitChunksForRemainingTables);
+        }
+    }
+
+    private void splitTable(TableId nextTable) {
+        LOG.info("Start splitting table {} into chunks...", nextTable);
+        long start = System.currentTimeMillis();
+        int chunkNum = 0;
+        boolean hasRecordSchema = false;
+        // split the given table into chunks (snapshot splits)
+        do {
+            synchronized (lock) {
+                Collection<SnapshotSplit> splits;
+                try {
+                    splits = chunkSplitter.generateSplits(nextTable);
+                } catch (Exception e) {
+                    throw new IllegalStateException(
+                            "Error when splitting chunks for " + nextTable, e);
+                }
+
+                if (!hasRecordSchema && !splits.isEmpty()) {
+                    hasRecordSchema = true;
+                    final Map<TableId, TableChanges.TableChange> tableSchema = new HashMap<>();
                     tableSchema.putAll(splits.iterator().next().getTableSchemas());
+                    tableSchemas.putAll(tableSchema);
                 }
                 final List<SchemalessSnapshotSplit> schemalessSnapshotSplits =
                         splits.stream()
                                 .map(SnapshotSplit::toSchemalessSnapshotSplit)
                                 .collect(Collectors.toList());
+                chunkNum += splits.size();
                 remainingSplits.addAll(schemalessSnapshotSplits);
-                tableSchemas.putAll(tableSchema);
-                if (!alreadyProcessedTables.contains(nextTable)) {
-                    enumeratorMetrics.startSnapshotTables(1);
-                }
-                alreadyProcessedTables.add(nextTable);
                 List<String> splitIds =
                         schemalessSnapshotSplits.stream()
                                 .map(SchemalessSnapshotSplit::splitId)
                                 .collect(Collectors.toList());
                 enumeratorMetrics.getTableMetrics(nextTable).addNewSplits(splitIds);
+
+                if (!chunkSplitter.hasNextChunk()) {
+                    remainingTables.remove(nextTable);
+                }
+                lock.notify();
+            }
+        } while (chunkSplitter.hasNextChunk());
+        long end = System.currentTimeMillis();
+        LOG.info(
+                "Split table {} into {} chunks, time cost: {}ms.",
+                nextTable,
+                chunkNum,
+                end - start);
+    }
+
+    @Override
+    public Optional<SourceSplitBase> getNext() {
Review Comment:
Do we need `waitTableDiscoveryReady` here or not?
```java
    private void waitTableDiscoveryReady() {
        while (needToDiscoveryTables()) {
            LOG.debug("Current assigner is discovering tables, wait tables ready...");
            try {
                Thread.sleep(200);
            } catch (InterruptedException e) {
                // nothing to do
            }
        }
    }

    /** Indicates current assigner need to discovery tables or not. */
    public boolean needToDiscoveryTables() {
        return remainingTables.isEmpty()
                && remainingSplits.isEmpty()
                && alreadyProcessedTables.isEmpty();
    }
```
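If it is needed, one possible placement would be at the top of the new `getNext()`, before remaining splits are polled; a sketch only, since the actual body of the new `getNext()` is not shown in this hunk:
```java
// Sketch: only the placement of the wait is shown; the real assignment logic is elided.
@Override
public Optional<SourceSplitBase> getNext() {
    waitTableDiscoveryReady();
    // ... drain remainingSplits / trigger asynchronous splitting as before ...
    return Optional.empty(); // placeholder for the elided assignment logic
}
```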
##########
flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/SnapshotSplitAssigner.java:
##########
@@ -451,6 +513,7 @@ public SnapshotPendingSplitsState snapshotState(long checkpointId) {
                     splitFinishedCheckpointIds.size());
         }
+        Map<String, Long> splitFinishedCheckpointIds1 = splitFinishedCheckpointIds;
Review Comment:
`splitFinishedCheckpointIds1` is never used.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]