github-actions[bot] commented on code in PR #63079:
URL: https://github.com/apache/doris/pull/63079#discussion_r3225908525


##########
fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java:
##########
@@ -382,10 +388,13 @@ private void initInsertJob() {
             this.originTvfProps = currentTvf.getProperties().getMap();
             this.offsetProvider = createOffsetProvider(sourceProperties);
             this.offsetProvider.ensureInitialized(getJobId(), originTvfProps);
+            // cdc_stream TVF has TABLE; S3 TVF doesn't, so syncTables stays null there.
+            String tvfTable = originTvfProps.get(DataSourceConfigKeys.TABLE);
+            this.syncTables = tvfTable == null ? null : Collections.singletonList(tvfTable);
             // Validate source-side resources (e.g. PG slot/publication ownership) once at job
             // creation so conflicts fail fast. No-op for standalone cdc_stream TVF (no job).
             StreamingJobUtils.validateTvfSource(tvfType, originTvfProps, String.valueOf(getJobId()));
-            this.offsetProvider.initOnCreate();
+            this.offsetProvider.initOnCreate(this.syncTables);
             // validate offset props, only for s3 cause s3 tvf no offset prop

Review Comment:
   This change makes `cdc_stream` TVF jobs use the inherited scheduler-driven 
split state, but `JdbcTvfSourceOffsetProvider.updateOffset()` still mutates the 
inherited `remainingSplits`, `finishedSplits`, and `chunkHighWatermarkMap` 
without taking `splitsLock`. The scheduler can call `advanceSplitsIfNeed()` on 
the job thread while task commit calls `updateOffset()` on the commit path; 
inherited methods like `advanceSplits()`, `getNextOffset()`, 
`hasMoreDataToConsume()`, and `hasReachedEnd()` all assume those split 
collections are protected by `splitsLock`. For TVF jobs this can race with 
`advanceSplits()` iterating/adding to the same lists, causing inconsistent 
split progress or `ConcurrentModificationException`. Please make the TVF 
override follow the same locking discipline as the base `updateOffset()` (and 
keep the high-watermark update inside that critical section or otherwise use a 
consistent snapshot).
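
   A minimal sketch of the locking discipline requested above, assuming the
   split state can be modelled with plain strings. `SplitStateSketch` and its
   method signatures are hypothetical stand-ins, not the actual Doris classes;
   only the field names (`splitsLock`, `remainingSplits`, `finishedSplits`,
   `chunkHighWatermarkMap`) are taken from the comment.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical stand-in for the offset provider's split state; not the Doris class. */
class SplitStateSketch {
    private final Object splitsLock = new Object();
    private final List<String> remainingSplits = new ArrayList<>();
    private final List<String> finishedSplits = new ArrayList<>();
    private final Map<String, String> chunkHighWatermarkMap = new HashMap<>();

    /** Scheduler path: append newly fetched splits under the lock. */
    void advanceSplits(List<String> fetched) {
        synchronized (splitsLock) {
            remainingSplits.addAll(fetched);
        }
    }

    /** Commit path: move the split and record its high watermark in one critical section. */
    void updateOffset(String splitId, String highWatermark) {
        synchronized (splitsLock) {
            if (remainingSplits.remove(splitId)) {
                finishedSplits.add(splitId);
                chunkHighWatermarkMap.put(splitId, highWatermark);
            }
        }
    }

    /** Readers take the same lock so they never observe a half-applied update. */
    boolean hasMoreDataToConsume() {
        synchronized (splitsLock) {
            return !remainingSplits.isEmpty();
        }
    }

    /** Returns a copy, so callers can iterate without holding the lock. */
    Map<String, String> snapshotHighWatermarks() {
        synchronized (splitsLock) {
            return new HashMap<>(chunkHighWatermarkMap);
        }
    }
}
```

   Because every mutation and every read goes through `splitsLock`, a scheduler
   thread calling `advanceSplits()` cannot interleave with a commit-path
   `updateOffset()` partway through its list and map updates.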



##########
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/JdbcIncrementalSourceReader.java:
##########
@@ -133,54 +136,97 @@ public void initialize(String jobId, DataSource dataSource, Map<String, String>
         LOG.info("Initialized poll executor with parallelism: {}", parallelism);
     }
 
+    /**
+     * Fetch a batch of snapshot splits by driving flink-cdc {@link ChunkSplitter} directly.
+     *
+     * <p>Stateless: each RPC builds a fresh splitter from the (table, nextChunkStart, nextChunkId)
+     * triple supplied by FE, fetches up to {@code batchSize} chunks, then closes the splitter.
+     *
+     * <p>Only INITIAL/SNAPSHOT startup modes call this RPC; binlog/latest modes never reach here.
+     */
     @Override
     public List<AbstractSourceSplit> getSourceSplits(FetchTableSplitsRequest ftsReq) {
-        LOG.info("Get table {} splits for job {}", ftsReq.getSnapshotTable(), ftsReq.getJobId());
+        LOG.info(
+                "Get table {} splits for job {} (nextSplitId={}, nextSplitStart={})",
+                ftsReq.getSnapshotTable(),
+                ftsReq.getJobId(),
+                ftsReq.getNextSplitId(),
+                java.util.Arrays.toString(ftsReq.getNextSplitStart()));
         JdbcSourceConfig sourceConfig = getSourceConfig(ftsReq);
-        List<org.apache.flink.cdc.connectors.base.source.meta.split.SnapshotSplit>
-                remainingSnapshotSplits = new ArrayList<>();
-        StreamSplit remainingStreamSplit = null;
-
-        // Check startup mode - for PostgreSQL, we use similar logic as MySQL
-        String startupMode = ftsReq.getConfig().get(DataSourceConfigKeys.OFFSET);
-        if (DataSourceConfigKeys.OFFSET_INITIAL.equalsIgnoreCase(startupMode)
-                || DataSourceConfigKeys.OFFSET_SNAPSHOT.equalsIgnoreCase(startupMode)) {
-            remainingSnapshotSplits =
-                    startSplitChunks(sourceConfig, ftsReq.getSnapshotTable(), ftsReq.getConfig());
+        String schema = ftsReq.getConfig().get(DataSourceConfigKeys.SCHEMA);
+        TableId tableId = new TableId(null, schema, ftsReq.getSnapshotTable());
+
+        // Build ChunkSplitterState from FE-side (nextSplitStart, nextSplitId).
+        //   null -> NO_SPLITTING_TABLE_STATE so splitter analyzes the table and may pick evenly path.
+        //   non-null -> resume mid-table, forced into unevenly path.
+        Object[] pkValues = ftsReq.getNextSplitStart();
+        int batchSize = ftsReq.getBatchSize() == null ? 100 : ftsReq.getBatchSize();
+        ChunkSplitterState state;
+        if (pkValues == null || pkValues.length == 0) {
+            state = ChunkSplitterState.NO_SPLITTING_TABLE_STATE;
         } else {
-            // For non-initial mode, create a stream split
-            Offset startingOffset = createInitialOffset();
-            remainingStreamSplit =
-                    new StreamSplit(
-                            STREAM_SPLIT_ID,
-                            startingOffset,
-                            createNoStoppingOffset(),
-                            new ArrayList<>(),
-                            new HashMap<>(),
-                            0);
+            // Restore the original JDBC type (JSON downgrades Long to Integer).
+            int sqlType = resolveSplitKeySqlType(sourceConfig, tableId);
+            Object castStart = SplitKeyTypeResolver.cast(pkValues[0], sqlType);
+            int splitId = ftsReq.getNextSplitId() == null ? 0 : ftsReq.getNextSplitId();
+            state = new ChunkSplitterState(tableId, ChunkBound.middleOf(castStart), splitId);
         }
+        ChunkSplitter splitter = getDialect(sourceConfig).createChunkSplitter(sourceConfig, state);
 
-        List<AbstractSourceSplit> splits = new ArrayList<>();
-        if (!remainingSnapshotSplits.isEmpty()) {
-            for (org.apache.flink.cdc.connectors.base.source.meta.split.SnapshotSplit
-                    snapshotSplit : remainingSnapshotSplits) {
-                String splitId = snapshotSplit.splitId();
-                String tableId = snapshotSplit.getTableId().identifier();
-                Object[] splitStart = snapshotSplit.getSplitStart();
-                Object[] splitEnd = snapshotSplit.getSplitEnd();
-                List<String> splitKey = snapshotSplit.getSplitKeyType().getFieldNames();
-                SnapshotSplit split =
-                        new SnapshotSplit(splitId, tableId, splitKey, splitStart, splitEnd, null);
-                splits.add(split);
+        try {
+            splitter.open();
+            List<AbstractSourceSplit> result = new ArrayList<>();
+            while (result.size() < batchSize) {
+                Collection<org.apache.flink.cdc.connectors.base.source.meta.split.SnapshotSplit>
+                        chunks = splitter.generateSplits(tableId);
+                for (org.apache.flink.cdc.connectors.base.source.meta.split.SnapshotSplit chunk : chunks) {
+                    result.add(toDorisSnapshotSplit(chunk));
+                }
+                if (!splitter.hasNextChunk()) {
+                    break;
+                }
+            }
+            LOG.info("Fetched {} splits for table {} (resume nextSplitId={}); hasNextChunk={}",
+                    result.size(), tableId, ftsReq.getNextSplitId(), splitter.hasNextChunk());
+            return result;
+        } catch (Exception e) {
+            throw new RuntimeException("Failed to generate splits for " + tableId, e);
+        } finally {
+            try {
+                splitter.close();
+            } catch (Exception e) {
+                LOG.warn("Failed to close splitter for {}", tableId, e);
             }
-        } else {
-            Offset startingOffset = remainingStreamSplit.getStartingOffset();
-            BinlogSplit streamSplit = new BinlogSplit();
-            streamSplit.setSplitId(remainingStreamSplit.splitId());
-            streamSplit.setStartingOffset(startingOffset.getOffset());
-            splits.add(streamSplit);
         }
-        return splits;
+    }
+
+    /** Resolve and cache the split key column's JDBC type. */
+    private int resolveSplitKeySqlType(JdbcSourceConfig sourceConfig, TableId tableId) {
+        String cacheKey =
+                sourceConfig.getHostname() + ":" + sourceConfig.getPort() + "|" + tableId;

Review Comment:
   The split-key SQL type cache key only includes `host:port|tableId`, but for 
the generic JDBC/Postgres path `tableId` is built from `schema + table` and 
does not include the database/JDBC URL (or the configured chunk key column). If 
two CDC jobs point at different databases on the same host/port with the same 
schema/table but different split-key types, the second job will reuse the first 
job's cached JDBC type and cast `nextSplitStart` incorrectly; flink-cdc then 
compares the wrong Java type and can return empty or overlapping chunks during 
resume. Please include a source identity that distinguishes the JDBC 
database/URL and chunk key column in the cache key, or make the cache scoped 
per source config/job.
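
   One way to express the wider cache identity suggested above, as a sketch
   only. `SplitKeyTypeCacheKey` and `SplitKeyTypeCache` are hypothetical names,
   and the choice of JDBC URL plus chunk key column as the distinguishing
   fields is an assumption drawn from the comment, not from the Doris or
   flink-cdc code.

```java
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

/** Hypothetical composite key; field names are illustrative only. */
final class SplitKeyTypeCacheKey {
    private final String jdbcUrl;        // distinguishes databases on the same host:port
    private final String tableId;        // schema + table, as in the current key
    private final String chunkKeyColumn; // may be null when the primary key is used

    SplitKeyTypeCacheKey(String jdbcUrl, String tableId, String chunkKeyColumn) {
        this.jdbcUrl = Objects.requireNonNull(jdbcUrl);
        this.tableId = Objects.requireNonNull(tableId);
        this.chunkKeyColumn = chunkKeyColumn;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof SplitKeyTypeCacheKey)) {
            return false;
        }
        SplitKeyTypeCacheKey other = (SplitKeyTypeCacheKey) o;
        return jdbcUrl.equals(other.jdbcUrl)
                && tableId.equals(other.tableId)
                && Objects.equals(chunkKeyColumn, other.chunkKeyColumn);
    }

    @Override
    public int hashCode() {
        return Objects.hash(jdbcUrl, tableId, chunkKeyColumn);
    }
}

/** Hypothetical cache keyed by the composite identity above. */
final class SplitKeyTypeCache {
    private final Map<SplitKeyTypeCacheKey, Integer> types = new ConcurrentHashMap<>();

    /** Runs the loader only on a cache miss; the loader must return a non-null JDBC type code. */
    int resolve(SplitKeyTypeCacheKey key, Supplier<Integer> loader) {
        return types.computeIfAbsent(key, k -> loader.get());
    }
}
```

   With a key like this, two jobs reading `public.orders` from different
   databases on the same host and port get separate cache entries, so neither
   can pick up the other's split-key type.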



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
