Copilot commented on code in PR #63079:
URL: https://github.com/apache/doris/pull/63079#discussion_r3216293298


##########
fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java:
##########
@@ -533,6 +618,164 @@ public String getPersistInfo() {
         return GsonUtils.GSON.toJson(this);
     }
 
+    // ============ Async split progress (driven by scheduler each tick) ============
+
+    /** Initialize at CREATE time. syncTables is the list of source tables this job syncs. */
+    @Override
+    public void initSplitProgress(List<String> syncTables) {
+        synchronized (splitsLock) {
+            this.cachedSyncTables = syncTables;
+            this.committedSplitProgress = new SplitProgress();
+            this.cdcSplitProgress = new SplitProgress();
+        }
+    }

Review Comment:
   JdbcSourceOffsetProvider.initSplitProgress()/noMoreSplits()/advanceSplits() 
currently drive snapshot split fetching for any job with non-empty syncTables, 
regardless of startup offset mode. This will incorrectly trigger snapshot 
splitting for OFFSET_LATEST/OFFSET_EARLIEST/JSON specific-offset jobs (default 
is latest), which should be binlog-only and previously initialized the source 
reader instead of fetching snapshot splits. Suggest: if 
!checkNeedSplitChunks(sourceProperties), make initSplitProgress/advanceSplits a 
no-op (and ensure noMoreSplits returns true), and preserve the old latest-mode 
behavior by calling initSourceReader() once.
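
   One possible shape for that gate, as a sketch only. GatedSplitProvider is a 
hypothetical stand-in for the provider, the "offset" key and the 
"initial"/"snapshot" values are assumptions about how the startup mode is 
stored, and the real check would be checkNeedSplitChunks(sourceProperties):

```java
import java.util.List;
import java.util.Locale;
import java.util.Map;

/** Hypothetical stand-in for the provider; only the gating logic is shown. */
class GatedSplitProvider {
    private final boolean needSplitChunks;
    private boolean readerInitialized;
    private List<String> cachedSyncTables;
    int fetchCalls; // counts simulated snapshot-split RPCs

    GatedSplitProvider(Map<String, String> sourceProperties) {
        // Assumption: the startup mode lives under "offset", and only
        // "initial"/"snapshot" need chunk splitting; unset is treated as latest.
        String mode = sourceProperties.getOrDefault("offset", "latest").toLowerCase(Locale.ROOT);
        this.needSplitChunks = mode.equals("initial") || mode.equals("snapshot");
    }

    void initSplitProgress(List<String> syncTables) {
        if (!needSplitChunks) {
            // Binlog-only job: keep the old behavior and initialize the reader once.
            initSourceReader();
            return;
        }
        this.cachedSyncTables = syncTables;
    }

    boolean noMoreSplits() {
        // Binlog-only job: there are never snapshot splits to wait for.
        // The snapshot branch is simplified to "no tables were registered".
        return !needSplitChunks || cachedSyncTables == null || cachedSyncTables.isEmpty();
    }

    void advanceSplits() {
        if (!needSplitChunks) {
            return; // no-op for latest/earliest/specific-offset jobs
        }
        fetchCalls++; // stands in for rpcFetchSplitsBatch(...)
    }

    boolean isReaderInitialized() {
        return readerInitialized;
    }

    private void initSourceReader() {
        readerInitialized = true; // placeholder for the real reader initialization
    }
}
```

   With this shape a job created without an explicit offset never issues a 
split RPC and reports noMoreSplits() == true immediately, while an "initial" 
job still goes through the scheduler-driven path.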



##########
fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java:
##########
@@ -246,8 +252,8 @@ private void initSourceJob() {
             StreamingJobUtils.resolveAndValidateSource(
                     dataSourceType, sourceProperties, String.valueOf(getJobId()), createTbls);
             this.offsetProvider = createOffsetProvider(getConvertedSourceProperties());
-            JdbcSourceOffsetProvider rdsOffsetProvider = (JdbcSourceOffsetProvider) this.offsetProvider;
-            rdsOffsetProvider.splitChunks(createTbls);
+            // Initialize split progress; advanceSplits is driven later by the scheduler each tick.
+            this.offsetProvider.initSplitProgress(this.syncTables);
         } catch (Exception ex) {

Review Comment:
   initSourceJob() now always calls 
offsetProvider.initSplitProgress(syncTables) and relies on scheduler-driven 
advanceSplits(), but this changes behavior for non-snapshot startup modes (e.g. 
default OFFSET_LATEST). Previously splitChunks() would skip snapshot splitting 
and call initSourceReader() for latest mode; with the current flow, scheduler 
ticks can start fetching snapshot splits for latest/binlog-only jobs (or fail 
because reader init was skipped). Consider gating split-progress 
initialization/scheduler split-advancing to INITIAL/SNAPSHOT modes (or making 
the JdbcSourceOffsetProvider implementation a no-op for non-splitting modes 
while still initializing the reader for latest).



##########
fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java:
##########
@@ -533,6 +618,164 @@ public String getPersistInfo() {
         return GsonUtils.GSON.toJson(this);
     }
 
+    @Override
+    public boolean noMoreSplits() {
+        synchronized (splitsLock) {
+            return cdcSplitProgress.getCurrentSplittingTable() == null
+                    && computeCdcRemainingTables().isEmpty();
+        }
+    }
+
+    /** Tables not yet touched by cdc fetching. Caller must hold splitsLock. */
+    private List<String> computeCdcRemainingTables() {
+        if (cachedSyncTables == null || cachedSyncTables.isEmpty()) {
+            return Collections.emptyList();
+        }
+        Set<String> touched = new HashSet<>();
+        for (SnapshotSplit s : finishedSplits) {
+            touched.add(s.getTableId());
+        }
+        for (SnapshotSplit s : remainingSplits) {
+            touched.add(s.getTableId());
+        }
+        if (cdcSplitProgress.getCurrentSplittingTable() != null) {
+            touched.add(cdcSplitProgress.getCurrentSplittingTable());
+        }
+        List<String> result = new ArrayList<>(cachedSyncTables.size());
+        for (String t : cachedSyncTables) {
+            if (!touched.contains(t)) {
+                result.add(t);
+            }
+        }
+        return result;
+    }
+
+    @Override
+    public void advanceSplits() throws JobException {
+        synchronized (splitsLock) {
+            // 1. Pick next table if not currently splitting one.
+            if (cdcSplitProgress.getCurrentSplittingTable() == null) {
+                List<String> remaining = computeCdcRemainingTables();
+                if (remaining.isEmpty()) {
+                    return;
+                }
+                cdcSplitProgress.setCurrentSplittingTable(remaining.get(0));
+                cdcSplitProgress.setNextSplitStart(null);
+                cdcSplitProgress.setNextSplitId(null);
+            }
+            String tbl = cdcSplitProgress.getCurrentSplittingTable();
+            Object[] startVal = cdcSplitProgress.getNextSplitStart();
+            Integer splitId = cdcSplitProgress.getNextSplitId();
+
+            // 2. RPC. OK to hold lock during RPC: no async splitting thread; updateOffset on
+            //    task commit waits briefly (max_interval >> RPC time anyway).
+            List<SnapshotSplit> batch = rpcFetchSplitsBatch(tbl, startVal, splitId);
+            if (batch == null || batch.isEmpty()) {
+                return;
+            }
+
+            // 3. mergeBySplitId (defensive dedup).
+            Set<String> existingIds = new HashSet<>();
+            finishedSplits.forEach(s -> existingIds.add(s.getSplitId()));
+            remainingSplits.forEach(s -> existingIds.add(s.getSplitId()));
+            List<SnapshotSplit> newSplits = new ArrayList<>();
+            for (SnapshotSplit s : batch) {
+                if (!existingIds.contains(s.getSplitId())) {
+                    newSplits.add(s);
+                }
+            }
+            remainingSplits.addAll(newSplits);
+
+            // 4. UPSERT this table's full chunk_list to system table.
+            List<SnapshotSplit> splitsOfTbl = Stream.concat(
+                            finishedSplits.stream(), remainingSplits.stream())
+                    .filter(s -> tbl.equals(s.getTableId()))
+                    .sorted(Comparator.comparingInt(s -> splitIdOf(s.getSplitId())))
+                    .collect(Collectors.toList());
+            try {
+                StreamingJobUtils.upsertChunkList(getJobId(), tbl, splitsOfTbl);
+            } catch (Exception e) {
+                throw new JobException("UPSERT chunk_list failed for " + tbl + ": " + e.getMessage());
+            }
+
+            // 5. Advance cdcSplitProgress to the last split in the batch.
+            applySplitToProgress(cdcSplitProgress, batch.get(batch.size() - 1));
+        }
+    }
+
+    /** Parse the trailing integer id from flink-cdc splitId format "tableId:id". */
+    private static int splitIdOf(String splitId) {
+        int colon = splitId.lastIndexOf(':');
+        return Integer.parseInt(splitId.substring(colon + 1));
+    }

Review Comment:
   splitIdOf() assumes the flink-cdc snapshot splitId is always in the form 
"tableId:id". If the ':' is missing or the suffix is not an integer, 
Integer.parseInt throws NumberFormatException (lastIndexOf returns -1, so 
substring itself does not throw), and a null splitId throws 
NullPointerException. Since this is called during replay + progress 
advancement, a malformed/changed splitId format would crash the scheduler tick. 
Consider validating the presence of ':' and catching parse failures, then 
surfacing a JobException with a clear message (including the offending splitId) 
instead of letting runtime exceptions propagate.
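
   A minimal sketch of that validation. SplitIdFormatException is a 
hypothetical checked exception standing in for JobException, and SplitIds is 
an illustrative holder class, not part of the PR:

```java
/** Hypothetical checked exception standing in for Doris' JobException. */
class SplitIdFormatException extends Exception {
    SplitIdFormatException(String message, Throwable cause) {
        super(message, cause);
    }
}

class SplitIds {
    /** Parses the trailing integer of a "tableId:id" split id, or fails naming the offending value. */
    static int splitIdOf(String splitId) throws SplitIdFormatException {
        if (splitId == null) {
            throw new SplitIdFormatException("splitId is null", null);
        }
        int colon = splitId.lastIndexOf(':');
        // Reject both a missing ':' and an empty suffix such as "tbl:".
        if (colon < 0 || colon == splitId.length() - 1) {
            throw new SplitIdFormatException(
                    "Unexpected splitId format (want \"tableId:id\"): '" + splitId + "'", null);
        }
        try {
            return Integer.parseInt(splitId.substring(colon + 1));
        } catch (NumberFormatException e) {
            throw new SplitIdFormatException(
                    "Non-numeric split index in splitId '" + splitId + "'", e);
        }
    }
}
```

   Note that a checked exception cannot be thrown from the 
Comparator.comparingInt lambda used in advanceSplits(), so with this approach 
the ids would have to be parsed (and validated) before sorting.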



##########
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/JdbcIncrementalSourceReader.java:
##########
@@ -133,54 +132,78 @@ public void initialize(String jobId, DataSource dataSource, Map<String, String>
         LOG.info("Initialized poll executor with parallelism: {}", parallelism);
     }
 
+    /**
+     * Fetch a batch of snapshot splits by driving flink-cdc {@link ChunkSplitter} directly.
+     *
+     * <p>Stateless: each RPC builds a fresh splitter from the (table, nextChunkStart, nextChunkId)
+     * triple supplied by FE, fetches up to {@code batchSize} chunks, then closes the splitter.
+     *
+     * <p>Only INITIAL/SNAPSHOT startup modes call this RPC; binlog/latest modes never reach here.
+     */
     @Override
     public List<AbstractSourceSplit> getSourceSplits(FetchTableSplitsRequest ftsReq) {
-        LOG.info("Get table {} splits for job {}", ftsReq.getSnapshotTable(), ftsReq.getJobId());
+        LOG.info("Get table {} splits for job {} (nextSplitId={}, hasNextSplitStart={})",
+                ftsReq.getSnapshotTable(), ftsReq.getJobId(),
+                ftsReq.getNextSplitId(),
+                ftsReq.getNextSplitStart() != null);
         JdbcSourceConfig sourceConfig = getSourceConfig(ftsReq);
-        List<org.apache.flink.cdc.connectors.base.source.meta.split.SnapshotSplit>
-                remainingSnapshotSplits = new ArrayList<>();
-        StreamSplit remainingStreamSplit = null;
-
-        // Check startup mode - for PostgreSQL, we use similar logic as MySQL
-        String startupMode = ftsReq.getConfig().get(DataSourceConfigKeys.OFFSET);
-        if (DataSourceConfigKeys.OFFSET_INITIAL.equalsIgnoreCase(startupMode)
-                || DataSourceConfigKeys.OFFSET_SNAPSHOT.equalsIgnoreCase(startupMode)) {
-            remainingSnapshotSplits =
-                    startSplitChunks(sourceConfig, ftsReq.getSnapshotTable(), ftsReq.getConfig());
-        } else {
-            // For non-initial mode, create a stream split
-            Offset startingOffset = createInitialOffset();
-            remainingStreamSplit =
-                    new StreamSplit(
-                            STREAM_SPLIT_ID,
-                            startingOffset,
-                            createNoStoppingOffset(),
-                            new ArrayList<>(),
-                            new HashMap<>(),
-                            0);
-        }
+        String schema = ftsReq.getConfig().get(DataSourceConfigKeys.SCHEMA);
+        TableId tableId = new TableId(null, schema, ftsReq.getSnapshotTable());
+
+        // Reconstruct flink-cdc ChunkBound from FE-side nextSplitStart.
+        //   null / empty -> START_BOUND (first time splitting this table, start from min(pk))
+        //   non-empty    -> middleOf(value[0]) (continue from previous split's splitEnd)
+        // END_BOUND never appears in transit: when a table is fully split, FE clears
+        // committedSplitProgress.currentSplittingTable to null, so the next RPC will be for
+        // a different table or no RPC at all.
+        Object[] pkValues = ftsReq.getNextSplitStart();
+        ChunkBound bound = (pkValues == null || pkValues.length == 0)
+                ? ChunkBound.START_BOUND
+                : ChunkBound.middleOf(pkValues[0]);

Review Comment:
   ChunkBound reconstruction uses nextSplitStart[0] only 
(ChunkBound.middleOf(pkValues[0])). If nextSplitStart ever contains multiple PK 
values (e.g. composite key or future multi-column split key support), this will 
silently ignore the remaining components and can resume from an incorrect 
position. Consider validating pkValues length and failing fast with a clear 
error when it is not the expected shape, rather than implicitly dropping values.
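
   A fail-fast check along those lines could look like the sketch below. 
SplitStartValidator and singleSplitKeyOrNull are hypothetical names; in the 
real method the returned value would feed ChunkBound.middleOf(...), and a null 
result would map to START_BOUND:

```java
/** Hypothetical helper; the real code would pass the result to ChunkBound.middleOf(...). */
class SplitStartValidator {
    /**
     * Returns null when splitting should start from the beginning of the table,
     * the single split-key value when resuming, and fails fast for any other shape.
     */
    static Object singleSplitKeyOrNull(Object[] pkValues, String table) {
        if (pkValues == null || pkValues.length == 0) {
            return null; // maps to START_BOUND
        }
        if (pkValues.length != 1) {
            // Refuse to silently drop the extra components of a multi-column key.
            throw new IllegalArgumentException(
                    "Expected exactly one split-key value for table " + table
                            + " but got " + pkValues.length);
        }
        return pkValues[0];
    }
}
```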
   



##########
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/JdbcIncrementalSourceReader.java:
##########
@@ -133,54 +132,78 @@ public void initialize(String jobId, DataSource dataSource, Map<String, String>
         LOG.info("Initialized poll executor with parallelism: {}", parallelism);
     }
 
+
+        int splitId = ftsReq.getNextSplitId() == null ? 0 : ftsReq.getNextSplitId();
+        int batchSize = ftsReq.getBatchSize() == null ? 100 : ftsReq.getBatchSize();
+
+        ChunkSplitterState state = new ChunkSplitterState(tableId, bound, splitId);
+        ChunkSplitter splitter = getDialect(sourceConfig).createChunkSplitter(sourceConfig, state);
+        splitter.open();

Review Comment:
   getSourceSplits() now always reconstructs a flink-cdc ChunkSplitter and 
generates snapshot splits, but the previous implementation returned a 
StreamSplit/BinlogSplit for non INITIAL/SNAPSHOT startup modes. If FE ever 
calls /api/fetchSplits for OFFSET_LATEST/OFFSET_EARLIEST/JSON specific offsets 
(including via regressions), this will force an unintended snapshot phase. 
Please reintroduce explicit startup-mode handling here (generate snapshot 
splits only for INITIAL/SNAPSHOT; otherwise return a StreamSplit/BinlogSplit or 
fail fast with a clear error).



##########
fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java:
##########
@@ -533,6 +618,164 @@ public String getPersistInfo() {
         return GsonUtils.GSON.toJson(this);
     }
 
+    @Override
+    public void advanceSplits() throws JobException {
+        synchronized (splitsLock) {
+            // 1. Pick next table if not currently splitting one.
+            if (cdcSplitProgress.getCurrentSplittingTable() == null) {
+                List<String> remaining = computeCdcRemainingTables();
+                if (remaining.isEmpty()) {
+                    return;
+                }
+                cdcSplitProgress.setCurrentSplittingTable(remaining.get(0));
+                cdcSplitProgress.setNextSplitStart(null);
+                cdcSplitProgress.setNextSplitId(null);
+            }
+            String tbl = cdcSplitProgress.getCurrentSplittingTable();
+            Object[] startVal = cdcSplitProgress.getNextSplitStart();
+            Integer splitId = cdcSplitProgress.getNextSplitId();
+
+            // 2. RPC. OK to hold lock during RPC: no async splitting thread; updateOffset on
+            //    task commit waits briefly (max_interval >> RPC time anyway).
+            List<SnapshotSplit> batch = rpcFetchSplitsBatch(tbl, startVal, splitId);
+            if (batch == null || batch.isEmpty()) {
+                return;
+            }
+
+            // 3. mergeBySplitId (defensive dedup).
+            Set<String> existingIds = new HashSet<>();
+            finishedSplits.forEach(s -> existingIds.add(s.getSplitId()));
+            remainingSplits.forEach(s -> existingIds.add(s.getSplitId()));
+            List<SnapshotSplit> newSplits = new ArrayList<>();
+            for (SnapshotSplit s : batch) {
+                if (!existingIds.contains(s.getSplitId())) {
+                    newSplits.add(s);
+                }
+            }
+            remainingSplits.addAll(newSplits);
+
+            // 4. UPSERT this table's full chunk_list to system table.
+            List<SnapshotSplit> splitsOfTbl = Stream.concat(
+                            finishedSplits.stream(), remainingSplits.stream())
+                    .filter(s -> tbl.equals(s.getTableId()))
+                    .sorted(Comparator.comparingInt(s -> splitIdOf(s.getSplitId())))
+                    .collect(Collectors.toList());
+            try {
+                StreamingJobUtils.upsertChunkList(getJobId(), tbl, splitsOfTbl);
+            } catch (Exception e) {
+                throw new JobException("UPSERT chunk_list failed for " + tbl + ": " + e.getMessage());
+            }
+
+            // 5. Advance cdcSplitProgress to the last split in the batch.
+            applySplitToProgress(cdcSplitProgress, batch.get(batch.size() - 1));
+        }
+    }
+

Review Comment:
   advanceSplits() holds splitsLock while performing rpcFetchSplitsBatch() 
(BRPC) and while UPSERTing chunk_list into the internal meta table. Both 
operations can block for up to the heavy RPC timeout / query latency, which can 
stall updateOffset() (task commit) and other scheduler logic that also 
synchronizes on splitsLock. Consider releasing the lock around the RPC + 
meta-table write: snapshot the current (table,start,id) under lock, perform 
remote work unlocked, then re-lock to merge results if progress is unchanged 
(or use a small state version/compare-and-swap) to avoid long lock holds.
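
   The snapshot / unlocked work / re-lock-and-verify pattern could be sketched 
as follows. Everything here is a simplified, hypothetical model: splits are 
plain strings, slowFetch stands in for the RPC plus the meta-table write, and 
progressVersion is an assumed counter that does not exist in the PR:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

/** Hypothetical, simplified model of the split progress; not the Doris classes. */
class UnlockedFetchSketch {
    private final Object splitsLock = new Object();
    private final List<String> remainingSplits = new ArrayList<>();
    private long progressVersion; // bumped on every change to the progress state
    private int nextSplitId;

    /**
     * Snapshots the cursor under the lock, runs the slow fetch unlocked, then
     * re-locks and merges only if nobody changed the progress in between.
     * Returns true if the batch was merged.
     */
    boolean advanceSplits(Function<Integer, List<String>> slowFetch) {
        final long seenVersion;
        final int startId;
        synchronized (splitsLock) {
            seenVersion = progressVersion;
            startId = nextSplitId;
        }

        // Remote work (RPC, meta-table write) happens without holding splitsLock.
        List<String> batch = slowFetch.apply(startId);
        if (batch == null || batch.isEmpty()) {
            return false;
        }

        synchronized (splitsLock) {
            if (progressVersion != seenVersion) {
                return false; // progress moved; drop the stale batch and retry next tick
            }
            remainingSplits.addAll(batch);
            nextSplitId = startId + batch.size();
            progressVersion++;
            return true;
        }
    }

    /** Stands in for any other writer of the progress state (e.g. a reset). */
    void resetProgress() {
        synchronized (splitsLock) {
            remainingSplits.clear();
            nextSplitId = 0;
            progressVersion++;
        }
    }

    List<String> snapshotRemaining() {
        synchronized (splitsLock) {
            return new ArrayList<>(remainingSplits);
        }
    }
}
```

   One caveat with this shape: if the meta-table write runs in the unlocked 
section and the merge is then skipped, the persisted chunk list and the 
in-memory state can diverge until the next tick, so the write would need to be 
idempotent or moved after the version check.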
   



##########
fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java:
##########
@@ -533,6 +618,164 @@ public String getPersistInfo() {
         return GsonUtils.GSON.toJson(this);
     }
 
+    @Override
+    public void advanceSplits() throws JobException {
+        synchronized (splitsLock) {
+            // 1. Pick next table if not currently splitting one.
+            if (cdcSplitProgress.getCurrentSplittingTable() == null) {
+                List<String> remaining = computeCdcRemainingTables();
+                if (remaining.isEmpty()) {
+                    return;
+                }
+                cdcSplitProgress.setCurrentSplittingTable(remaining.get(0));
+                cdcSplitProgress.setNextSplitStart(null);
+                cdcSplitProgress.setNextSplitId(null);
+            }
+            String tbl = cdcSplitProgress.getCurrentSplittingTable();
+            Object[] startVal = cdcSplitProgress.getNextSplitStart();
+            Integer splitId = cdcSplitProgress.getNextSplitId();
+
+            // 2. RPC. OK to hold lock during RPC: no async splitting thread; updateOffset on
+            //    task commit waits briefly (max_interval >> RPC time anyway).
+            List<SnapshotSplit> batch = rpcFetchSplitsBatch(tbl, startVal, splitId);
+            if (batch == null || batch.isEmpty()) {
+                return;
+            }
+
+            // 3. mergeBySplitId (defensive dedup).
+            Set<String> existingIds = new HashSet<>();
+            finishedSplits.forEach(s -> existingIds.add(s.getSplitId()));
+            remainingSplits.forEach(s -> existingIds.add(s.getSplitId()));
+            List<SnapshotSplit> newSplits = new ArrayList<>();
+            for (SnapshotSplit s : batch) {
+                if (!existingIds.contains(s.getSplitId())) {
+                    newSplits.add(s);
+                }
+            }
+            remainingSplits.addAll(newSplits);
+
+            // 4. UPSERT this table's full chunk_list to system table.
+            List<SnapshotSplit> splitsOfTbl = Stream.concat(
+                            finishedSplits.stream(), remainingSplits.stream())
+                    .filter(s -> tbl.equals(s.getTableId()))
+                    .sorted(Comparator.comparingInt(s -> splitIdOf(s.getSplitId())))
+                    .collect(Collectors.toList());
+            try {

Review Comment:
   advanceSplits() rebuilds existingIds from every finished and remaining split 
and re-materializes + sorts the full per-table split list on every tick 
(Stream.concat(finished, remaining) -> filter -> sort -> collect). With N splits 
already recorded for a table, that is roughly O(N log N) work per batch, which 
can become noticeable scheduler overhead for large tables. Consider maintaining 
a per-table ordered list/map incrementally (append-only by splitId) so each tick 
only processes the new batch, or at least avoid re-sorting the full list every 
time.
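
   As an illustration of the incremental structure, the sketch below keeps one 
sorted map per table, so adding a split costs O(log N) and duplicate detection 
comes for free. PerTableSplitIndex is hypothetical and reduces a split to 
(tableId, numeric id, payload string); the real code would still need to track 
finished versus remaining splits:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical index; a split is reduced to (tableId, numeric id, payload). */
class PerTableSplitIndex {
    // tableId -> (numeric split id -> split payload), kept sorted by split id.
    private final Map<String, TreeMap<Integer, String>> byTable = new HashMap<>();

    /** Adds a split; returns false if that id is already known for the table. */
    boolean add(String tableId, int splitId, String payload) {
        TreeMap<Integer, String> splits = byTable.computeIfAbsent(tableId, t -> new TreeMap<>());
        // Payloads are assumed non-null, so a null return means "newly inserted".
        return splits.putIfAbsent(splitId, payload) == null;
    }

    /** Ordered copy of one table's splits, produced without re-sorting. */
    List<String> orderedSplits(String tableId) {
        TreeMap<Integer, String> splits = byTable.get(tableId);
        return splits == null ? new ArrayList<>() : new ArrayList<>(splits.values());
    }
}
```

   Producing the list for the UPSERT is still O(N) per tick because the full 
chunk list is written each time; only the sort and the global id scan go away.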



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

