github-actions[bot] commented on code in PR #63079:
URL: https://github.com/apache/doris/pull/63079#discussion_r3225908525
##########
fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java:
##########
@@ -382,10 +388,13 @@ private void initInsertJob() {
this.originTvfProps = currentTvf.getProperties().getMap();
this.offsetProvider = createOffsetProvider(sourceProperties);
this.offsetProvider.ensureInitialized(getJobId(), originTvfProps);
+ // cdc_stream TVF has TABLE; S3 TVF doesn't, so syncTables stays
null there.
+ String tvfTable = originTvfProps.get(DataSourceConfigKeys.TABLE);
+ this.syncTables = tvfTable == null ? null :
Collections.singletonList(tvfTable);
// Validate source-side resources (e.g. PG slot/publication
ownership) once at job
// creation so conflicts fail fast. No-op for standalone
cdc_stream TVF (no job).
StreamingJobUtils.validateTvfSource(tvfType, originTvfProps,
String.valueOf(getJobId()));
- this.offsetProvider.initOnCreate();
+ this.offsetProvider.initOnCreate(this.syncTables);
// validate offset props, only for s3 cause s3 tvf no offset prop
Review Comment:
This change makes `cdc_stream` TVF jobs use the inherited scheduler-driven
split state, but `JdbcTvfSourceOffsetProvider.updateOffset()` still mutates the
inherited `remainingSplits`, `finishedSplits`, and `chunkHighWatermarkMap`
without taking `splitsLock`. The scheduler can call `advanceSplitsIfNeed()` on
the job thread while task commit calls `updateOffset()` on the commit path;
inherited methods like `advanceSplits()`, `getNextOffset()`,
`hasMoreDataToConsume()`, and `hasReachedEnd()` all assume those split
collections are protected by `splitsLock`. For TVF jobs this can race with
`advanceSplits()`, which iterates over and appends to the same lists, causing
inconsistent split progress or a `ConcurrentModificationException`. Please make the TVF
override follow the same locking discipline as the base `updateOffset()` (and
keep the high-watermark update inside that critical section or otherwise use a
consistent snapshot).
##########
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/JdbcIncrementalSourceReader.java:
##########
@@ -133,54 +136,97 @@ public void initialize(String jobId, DataSource
dataSource, Map<String, String>
LOG.info("Initialized poll executor with parallelism: {}",
parallelism);
}
+ /**
+ * Fetch a batch of snapshot splits by driving flink-cdc {@link
ChunkSplitter} directly.
+ *
+ * <p>Stateless: each RPC builds a fresh splitter from the (table,
nextChunkStart, nextChunkId)
+ * triple supplied by FE, fetches up to {@code batchSize} chunks, then
closes the splitter.
+ *
+ * <p>Only INITIAL/SNAPSHOT startup modes call this RPC; binlog/latest
modes never reach here.
+ */
@Override
public List<AbstractSourceSplit> getSourceSplits(FetchTableSplitsRequest
ftsReq) {
- LOG.info("Get table {} splits for job {}", ftsReq.getSnapshotTable(),
ftsReq.getJobId());
+ LOG.info(
+ "Get table {} splits for job {} (nextSplitId={},
nextSplitStart={})",
+ ftsReq.getSnapshotTable(),
+ ftsReq.getJobId(),
+ ftsReq.getNextSplitId(),
+ java.util.Arrays.toString(ftsReq.getNextSplitStart()));
JdbcSourceConfig sourceConfig = getSourceConfig(ftsReq);
-
List<org.apache.flink.cdc.connectors.base.source.meta.split.SnapshotSplit>
- remainingSnapshotSplits = new ArrayList<>();
- StreamSplit remainingStreamSplit = null;
-
- // Check startup mode - for PostgreSQL, we use similar logic as MySQL
- String startupMode =
ftsReq.getConfig().get(DataSourceConfigKeys.OFFSET);
- if (DataSourceConfigKeys.OFFSET_INITIAL.equalsIgnoreCase(startupMode)
- ||
DataSourceConfigKeys.OFFSET_SNAPSHOT.equalsIgnoreCase(startupMode)) {
- remainingSnapshotSplits =
- startSplitChunks(sourceConfig, ftsReq.getSnapshotTable(),
ftsReq.getConfig());
+ String schema = ftsReq.getConfig().get(DataSourceConfigKeys.SCHEMA);
+ TableId tableId = new TableId(null, schema, ftsReq.getSnapshotTable());
+
+ // Build ChunkSplitterState from FE-side (nextSplitStart, nextSplitId).
+ // null -> NO_SPLITTING_TABLE_STATE so splitter analyzes the table
and may pick evenly path.
+ // non-null -> resume mid-table, forced into unevenly path.
+ Object[] pkValues = ftsReq.getNextSplitStart();
+ int batchSize = ftsReq.getBatchSize() == null ? 100 :
ftsReq.getBatchSize();
+ ChunkSplitterState state;
+ if (pkValues == null || pkValues.length == 0) {
+ state = ChunkSplitterState.NO_SPLITTING_TABLE_STATE;
} else {
- // For non-initial mode, create a stream split
- Offset startingOffset = createInitialOffset();
- remainingStreamSplit =
- new StreamSplit(
- STREAM_SPLIT_ID,
- startingOffset,
- createNoStoppingOffset(),
- new ArrayList<>(),
- new HashMap<>(),
- 0);
+ // Restore the original JDBC type (JSON downgrades Long to
Integer).
+ int sqlType = resolveSplitKeySqlType(sourceConfig, tableId);
+ Object castStart = SplitKeyTypeResolver.cast(pkValues[0], sqlType);
+ int splitId = ftsReq.getNextSplitId() == null ? 0 :
ftsReq.getNextSplitId();
+ state = new ChunkSplitterState(tableId,
ChunkBound.middleOf(castStart), splitId);
}
+ ChunkSplitter splitter =
getDialect(sourceConfig).createChunkSplitter(sourceConfig, state);
- List<AbstractSourceSplit> splits = new ArrayList<>();
- if (!remainingSnapshotSplits.isEmpty()) {
- for
(org.apache.flink.cdc.connectors.base.source.meta.split.SnapshotSplit
- snapshotSplit : remainingSnapshotSplits) {
- String splitId = snapshotSplit.splitId();
- String tableId = snapshotSplit.getTableId().identifier();
- Object[] splitStart = snapshotSplit.getSplitStart();
- Object[] splitEnd = snapshotSplit.getSplitEnd();
- List<String> splitKey =
snapshotSplit.getSplitKeyType().getFieldNames();
- SnapshotSplit split =
- new SnapshotSplit(splitId, tableId, splitKey,
splitStart, splitEnd, null);
- splits.add(split);
+ try {
+ splitter.open();
+ List<AbstractSourceSplit> result = new ArrayList<>();
+ while (result.size() < batchSize) {
+
Collection<org.apache.flink.cdc.connectors.base.source.meta.split.SnapshotSplit>
+ chunks = splitter.generateSplits(tableId);
+ for
(org.apache.flink.cdc.connectors.base.source.meta.split.SnapshotSplit chunk :
chunks) {
+ result.add(toDorisSnapshotSplit(chunk));
+ }
+ if (!splitter.hasNextChunk()) {
+ break;
+ }
+ }
+ LOG.info("Fetched {} splits for table {} (resume nextSplitId={});
hasNextChunk={}",
+ result.size(), tableId, ftsReq.getNextSplitId(),
splitter.hasNextChunk());
+ return result;
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to generate splits for " +
tableId, e);
+ } finally {
+ try {
+ splitter.close();
+ } catch (Exception e) {
+ LOG.warn("Failed to close splitter for {}", tableId, e);
}
- } else {
- Offset startingOffset = remainingStreamSplit.getStartingOffset();
- BinlogSplit streamSplit = new BinlogSplit();
- streamSplit.setSplitId(remainingStreamSplit.splitId());
- streamSplit.setStartingOffset(startingOffset.getOffset());
- splits.add(streamSplit);
}
- return splits;
+ }
+
+ /** Resolve and cache the split key column's JDBC type. */
+ private int resolveSplitKeySqlType(JdbcSourceConfig sourceConfig, TableId
tableId) {
+ String cacheKey =
+ sourceConfig.getHostname() + ":" + sourceConfig.getPort() +
"|" + tableId;
Review Comment:
The split-key SQL type cache key only includes `host:port|tableId`, but for
the generic JDBC/Postgres path `tableId` is built from `schema + table` and
does not include the database/JDBC URL (or the configured chunk key column). If
two CDC jobs point at different databases on the same host/port with the same
schema/table but different split-key types, the second job will reuse the first
job's cached JDBC type and cast `nextSplitStart` incorrectly; flink-cdc then
compares the wrong Java type and can return empty or overlapping chunks during
resume. Please include a source identity that distinguishes the JDBC
database/URL and chunk key column in the cache key, or make the cache scoped
per source config/job.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]