github-actions[bot] commented on code in PR #63079:
URL: https://github.com/apache/doris/pull/63079#discussion_r3232538593


##########
fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java:
##########
@@ -533,78 +631,216 @@ public String getPersistInfo() {
         return GsonUtils.GSON.toJson(this);
     }
 
-    public void splitChunks(List<String> createTbls) throws JobException {
-        // todo: When splitting takes a long time, it needs to be changed to 
asynchronous.
-        if (checkNeedSplitChunks(sourceProperties)) {
-            Map<String, List<SnapshotSplit>> tableSplits = new 
LinkedHashMap<>();
-            for (String tbl : createTbls) {
-                List<SnapshotSplit> snapshotSplits = requestTableSplits(tbl);
-                tableSplits.put(tbl, snapshotSplits);
-            }
-            // save chunk list to system table
-            saveChunkMeta(tableSplits);
-            this.remainingSplits = tableSplits.values().stream()
-                    .flatMap(List::stream)
-                    .collect(Collectors.toList());
-        } else {
-            // The source reader is automatically initialized when the split 
is obtained.
-            // In latest mode, a separate init is required.init source reader
+    // ============ Async split progress (driven by scheduler each tick) 
============
+
+    /**
+     * One-time setup at CREATE.
+     * - initial/snapshot mode: init split progress; scheduler will drive 
advanceSplits() each tick.
+     * - latest mode (and other non-splitting modes): open the remote reader 
(e.g. PG slot) so the
+     *   binlog phase can start immediately; no snapshot splitting will happen.
+     */
+    @Override
+    public void initOnCreate(List<String> syncTables) throws JobException {
+        if (!checkNeedSplitChunks(sourceProperties)) {
             initSourceReader();
+            return;
+        }
+        synchronized (splitsLock) {
+            this.cachedSyncTables = syncTables;
+            this.committedSplitProgress = new SplitProgress();
+            this.cdcSplitProgress = new SplitProgress();
+        }
+    }
+
+    @Override
+    public boolean noMoreSplits() {
+        if (!checkNeedSplitChunks(sourceProperties)) {
+            return true;
+        }
+        synchronized (splitsLock) {
+            return cdcSplitProgress.getCurrentSplittingTable() == null
+                    && computeCdcRemainingTables().isEmpty();
+        }
+    }
+
+    /** Tables not yet touched by cdc fetching. Caller must hold splitsLock. */
+    private List<String> computeCdcRemainingTables() {
+        if (cachedSyncTables == null || cachedSyncTables.isEmpty()) {
+            return Collections.emptyList();
+        }
+        // SnapshotSplit.tableId is qualified ("schema.table"/"db.table"); 
cachedSyncTables is bare — normalize.
+        Set<String> touched = new HashSet<>();
+        for (SnapshotSplit s : finishedSplits) {
+            touched.add(getTableName(s.getTableId()));
+        }
+        for (SnapshotSplit s : remainingSplits) {
+            touched.add(getTableName(s.getTableId()));
+        }
+        if (cdcSplitProgress.getCurrentSplittingTable() != null) {
+            
touched.add(getTableName(cdcSplitProgress.getCurrentSplittingTable()));
         }
+        List<String> result = new ArrayList<>(cachedSyncTables.size());
+        for (String t : cachedSyncTables) {
+            if (!touched.contains(getTableName(t))) {
+                result.add(t);
+            }
+        }
+        return result;
     }
 
-    private void saveChunkMeta(Map<String, List<SnapshotSplit>> tableSplits) 
throws JobException {
+    @Override
+    public void advanceSplits() throws JobException {
+        // Phase 1 (locked, fast): pick next table & snapshot the resume 
cursor.
+        String tbl;
+        Object[] startVal;
+        Integer splitId;
+        synchronized (splitsLock) {
+            if (cdcSplitProgress.getCurrentSplittingTable() == null) {
+                List<String> remaining = computeCdcRemainingTables();
+                if (remaining.isEmpty()) {
+                    return;
+                }
+                cdcSplitProgress.setCurrentSplittingTable(remaining.get(0));
+                cdcSplitProgress.setNextSplitStart(null);
+                cdcSplitProgress.setNextSplitId(null);
+            }
+            tbl = cdcSplitProgress.getCurrentSplittingTable();
+            startVal = cdcSplitProgress.getNextSplitStart();
+            splitId = cdcSplitProgress.getNextSplitId();
+        }
+
+        // Phase 2 (unlocked, slow): RPC. Keeps updateOffset / scheduler tick 
unblocked
+        // so task dispatch can drain remainingSplits while we fetch the next 
batch.
+        List<SnapshotSplit> batch = rpcFetchSplitsBatch(tbl, startVal, 
splitId);
+        if (batch == null || batch.isEmpty()) {
+            return;
+        }
+
+        // Phase 3 (locked, fast): reconcile + merge + advance 
cdcSplitProgress.
+        List<SnapshotSplit> splitsOfTbl;
+        int newSplitCount;
+        synchronized (splitsLock) {
+            // Defensive reconciliation: replayIfNeed / pause-resume may have 
moved the cursor
+            // while we were waiting on the RPC; drop this batch and let the 
next loop iteration
+            // re-pick from the new state.
+            if (!tbl.equals(cdcSplitProgress.getCurrentSplittingTable())
+                    || !Objects.equals(splitId, 
cdcSplitProgress.getNextSplitId())) {
+                log.info("advanceSplits discard batch for job {} table {}: 
state moved on "
+                                + "during RPC (now table={}, splitId={})",
+                        getJobId(), tbl,
+                        cdcSplitProgress.getCurrentSplittingTable(),
+                        cdcSplitProgress.getNextSplitId());
+                return;
+            }
+            Set<String> existingIds = new HashSet<>();
+            finishedSplits.forEach(s -> existingIds.add(s.getSplitId()));
+            remainingSplits.forEach(s -> existingIds.add(s.getSplitId()));
+            List<SnapshotSplit> newSplits = new ArrayList<>();
+            for (SnapshotSplit s : batch) {
+                if (!existingIds.contains(s.getSplitId())) {
+                    newSplits.add(s);
+                }
+            }
+            if (newSplits.size() < batch.size()) {
+                log.info("advanceSplits dedup'd {} duplicate splits (batch={}, 
new={}) for job {} table {}",
+                        batch.size() - newSplits.size(), batch.size(), 
newSplits.size(), getJobId(), tbl);
+            }
+            remainingSplits.addAll(newSplits);
+            newSplitCount = newSplits.size();
+            splitsOfTbl = Stream.concat(finishedSplits.stream(), 
remainingSplits.stream())
+                    .filter(s -> tbl.equals(s.getTableId()))
+                    .sorted(Comparator.comparingInt(s -> 
splitIdOf(s.getSplitId())))
+                    .collect(Collectors.toList());
+            applySplitToProgress(cdcSplitProgress, batch.get(batch.size() - 
1));
+            log.info("advanceSplits jobId={} table={} request(nextStart={}, 
nextSplitId={}) "
+                            + "got {} new splits, cdcSplitProgress -> 
(table={}, nextStart={}, nextSplitId={})",
+                    getJobId(), tbl, Arrays.toString(startVal), splitId, 
newSplitCount,
+                    cdcSplitProgress.getCurrentSplittingTable(),
+                    Arrays.toString(cdcSplitProgress.getNextSplitStart()),
+                    cdcSplitProgress.getNextSplitId());
+        }
+
+        // Phase 4 (unlocked, slow): persist chunk_list. Failure here is 
recoverable — on
+        // restart, replayIfNeed reloads from system table at 
committedSplitProgress and
+        // cdc_client will re-cut these chunks; mergeBySplitId dedups on the 
next iteration.
         try {
-            StreamingJobUtils.createMetaTableIfNotExist();
-            StreamingJobUtils.insertSplitsToMeta(getJobId(), tableSplits);
+            StreamingJobUtils.upsertChunkList(getJobId(), tbl, splitsOfTbl);
         } catch (Exception e) {
-            log.warn("save chunk meta error: ", e);

Review Comment:
   Ignoring this failure makes the in-memory split cursor and the durable 
`streaming_job_meta` diverge. After phase 3, `remainingSplits` and 
`cdcSplitProgress` already include this batch, so the scheduler can dispatch 
and commit those splits; if FE restarts after that, `replayIfNeed()` relies on 
`restoreSplitsToJob()` to rebuild split boundaries and finished-split metadata 
for the snapshot-to-binlog transition. If the UPSERT failed, the committed 
high-watermarks may exist in `offsetProviderPersist` while the corresponding 
split definitions are absent from the meta table, so recovery can re-cut or skip 
chunks, or build a binlog split without the finished snapshot split list. Please 
treat the meta-table write as part of advancing the durable split state: either 
fail/pause and avoid exposing the batch until the UPSERT succeeds, or persist 
before merging it into schedulable `remainingSplits`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to