Copilot commented on code in PR #63079:
URL: https://github.com/apache/doris/pull/63079#discussion_r3238878651


##########
fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java:
##########
@@ -533,78 +631,238 @@ public String getPersistInfo() {
         return GsonUtils.GSON.toJson(this);
     }
 
-    public void splitChunks(List<String> createTbls) throws JobException {
-        // todo: When splitting takes a long time, it needs to be changed to 
asynchronous.
-        if (checkNeedSplitChunks(sourceProperties)) {
-            Map<String, List<SnapshotSplit>> tableSplits = new 
LinkedHashMap<>();
-            for (String tbl : createTbls) {
-                List<SnapshotSplit> snapshotSplits = requestTableSplits(tbl);
-                tableSplits.put(tbl, snapshotSplits);
-            }
-            // save chunk list to system table
-            saveChunkMeta(tableSplits);
-            this.remainingSplits = tableSplits.values().stream()
-                    .flatMap(List::stream)
-                    .collect(Collectors.toList());
-        } else {
-            // The source reader is automatically initialized when the split 
is obtained.
-            // In latest mode, a separate init is required.init source reader
+    // ============ Async split progress (driven by scheduler each tick) 
============
+
+    /**
+     * One-time setup at CREATE.
+     * - initial/snapshot mode: init split progress; scheduler will drive 
advanceSplits() each tick.
+     * - latest mode (and other non-splitting modes): open the remote reader 
(e.g. PG slot) so the
+     *   binlog phase can start immediately; no snapshot splitting will happen.
+     */
+    @Override
+    public void initOnCreate(List<String> syncTables) throws JobException {
+        if (!checkNeedSplitChunks(sourceProperties)) {
             initSourceReader();
+            return;
+        }
+        synchronized (splitsLock) {
+            this.cachedSyncTables = syncTables;
+            this.committedSplitProgress = new SplitProgress();
+            this.cdcSplitProgress = new SplitProgress();
+        }
+    }
+
+    @Override
+    public boolean noMoreSplits() {
+        if (!checkNeedSplitChunks(sourceProperties)) {
+            return true;
+        }
+        synchronized (splitsLock) {
+            return cdcSplitProgress.getCurrentSplittingTable() == null
+                    && computeCdcRemainingTables().isEmpty();
         }
     }
 
-    private void saveChunkMeta(Map<String, List<SnapshotSplit>> tableSplits) 
throws JobException {
+    /** Tables not yet touched by cdc fetching. Caller must hold splitsLock. */
+    private List<String> computeCdcRemainingTables() {
+        if (cachedSyncTables == null || cachedSyncTables.isEmpty()) {
+            return Collections.emptyList();
+        }
+        // SnapshotSplit.tableId is qualified ("schema.table"/"db.table"); 
cachedSyncTables is bare — normalize.
+        Set<String> touched = new HashSet<>();
+        for (SnapshotSplit s : finishedSplits) {
+            touched.add(getTableName(s.getTableId()));
+        }
+        for (SnapshotSplit s : remainingSplits) {
+            touched.add(getTableName(s.getTableId()));
+        }
+        if (cdcSplitProgress.getCurrentSplittingTable() != null) {
+            
touched.add(getTableName(cdcSplitProgress.getCurrentSplittingTable()));
+        }
+        List<String> result = new ArrayList<>(cachedSyncTables.size());
+        for (String t : cachedSyncTables) {
+            if (!touched.contains(getTableName(t))) {
+                result.add(t);
+            }
+        }
+        return result;
+    }
+
+    @Override
+    public void advanceSplits() throws JobException {
+        // Phase 1 (locked, fast): pick next table & snapshot the resume 
cursor.
+        String tbl;
+        Object[] startVal;
+        Integer splitId;
+        synchronized (splitsLock) {
+            if (cdcSplitProgress.getCurrentSplittingTable() == null) {
+                List<String> remaining = computeCdcRemainingTables();
+                if (remaining.isEmpty()) {
+                    return;
+                }
+                cdcSplitProgress.setCurrentSplittingTable(remaining.get(0));
+                cdcSplitProgress.setNextSplitStart(null);
+                cdcSplitProgress.setNextSplitId(null);
+            }
+            tbl = cdcSplitProgress.getCurrentSplittingTable();
+            startVal = cdcSplitProgress.getNextSplitStart();
+            splitId = cdcSplitProgress.getNextSplitId();
+        }
+
+        // Phase 2 (unlocked, slow): RPC. Keeps updateOffset / scheduler tick 
unblocked
+        // so task dispatch can drain remainingSplits while we fetch the next 
batch.
+        List<SnapshotSplit> batch = rpcFetchSplitsBatch(tbl, startVal, 
splitId);
+        if (batch == null || batch.isEmpty()) {

Review Comment:
   When `rpcFetchSplitsBatch` returns an empty batch, `advanceSplits` returns 
early, but Phase 1 has already recorded the table in 
`cdcSplitProgress.currentSplittingTable` (for the first batch of a table, with 
nextSplitStart/nextSplitId both null). On the next tick, Phase 1 sees 
`currentSplittingTable != null` and re-issues the same RPC with the same 
(table, nextSplitStart, nextSplitId) parameters. If the cdc_client keeps 
returning an empty batch for that table (e.g. a persistent cdc_client error, a 
view, or an unexpectedly empty result), the job loops forever on the same table 
and never advances to the next table or transitions to the binlog phase. There is 
no retry budget, no fallback that marks the table as done, and no error is 
surfaced. Consider either (a) treating an empty batch as table-done (clear the 
progress and pick the next table), or (b) surfacing an error after N consecutive 
empty batches for the same (table, splitId) pair.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to