Copilot commented on code in PR #63079:
URL: https://github.com/apache/doris/pull/63079#discussion_r3238878651
##########
fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java:
##########
@@ -533,78 +631,238 @@ public String getPersistInfo() {
return GsonUtils.GSON.toJson(this);
}
- public void splitChunks(List<String> createTbls) throws JobException {
- // todo: When splitting takes a long time, it needs to be changed to
asynchronous.
- if (checkNeedSplitChunks(sourceProperties)) {
- Map<String, List<SnapshotSplit>> tableSplits = new
LinkedHashMap<>();
- for (String tbl : createTbls) {
- List<SnapshotSplit> snapshotSplits = requestTableSplits(tbl);
- tableSplits.put(tbl, snapshotSplits);
- }
- // save chunk list to system table
- saveChunkMeta(tableSplits);
- this.remainingSplits = tableSplits.values().stream()
- .flatMap(List::stream)
- .collect(Collectors.toList());
- } else {
- // The source reader is automatically initialized when the split
is obtained.
- // In latest mode, a separate init is required.init source reader
+ // ============ Async split progress (driven by scheduler each tick)
============
+
+ /**
+ * One-time setup at CREATE.
+ * - initial/snapshot mode: init split progress; scheduler will drive
advanceSplits() each tick.
+ * - latest mode (and other non-splitting modes): open the remote reader
(e.g. PG slot) so the
+ * binlog phase can start immediately; no snapshot splitting will happen.
+ */
+ @Override
+ public void initOnCreate(List<String> syncTables) throws JobException {
+ if (!checkNeedSplitChunks(sourceProperties)) {
initSourceReader();
+ return;
+ }
+ synchronized (splitsLock) {
+ this.cachedSyncTables = syncTables;
+ this.committedSplitProgress = new SplitProgress();
+ this.cdcSplitProgress = new SplitProgress();
+ }
+ }
+
+ @Override
+ public boolean noMoreSplits() {
+ if (!checkNeedSplitChunks(sourceProperties)) {
+ return true;
+ }
+ synchronized (splitsLock) {
+ return cdcSplitProgress.getCurrentSplittingTable() == null
+ && computeCdcRemainingTables().isEmpty();
}
}
- private void saveChunkMeta(Map<String, List<SnapshotSplit>> tableSplits)
throws JobException {
+ /** Tables not yet touched by cdc fetching. Caller must hold splitsLock. */
+ private List<String> computeCdcRemainingTables() {
+ if (cachedSyncTables == null || cachedSyncTables.isEmpty()) {
+ return Collections.emptyList();
+ }
+ // SnapshotSplit.tableId is qualified ("schema.table"/"db.table");
cachedSyncTables is bare — normalize.
+ Set<String> touched = new HashSet<>();
+ for (SnapshotSplit s : finishedSplits) {
+ touched.add(getTableName(s.getTableId()));
+ }
+ for (SnapshotSplit s : remainingSplits) {
+ touched.add(getTableName(s.getTableId()));
+ }
+ if (cdcSplitProgress.getCurrentSplittingTable() != null) {
+
touched.add(getTableName(cdcSplitProgress.getCurrentSplittingTable()));
+ }
+ List<String> result = new ArrayList<>(cachedSyncTables.size());
+ for (String t : cachedSyncTables) {
+ if (!touched.contains(getTableName(t))) {
+ result.add(t);
+ }
+ }
+ return result;
+ }
+
+ @Override
+ public void advanceSplits() throws JobException {
+ // Phase 1 (locked, fast): pick next table & snapshot the resume
cursor.
+ String tbl;
+ Object[] startVal;
+ Integer splitId;
+ synchronized (splitsLock) {
+ if (cdcSplitProgress.getCurrentSplittingTable() == null) {
+ List<String> remaining = computeCdcRemainingTables();
+ if (remaining.isEmpty()) {
+ return;
+ }
+ cdcSplitProgress.setCurrentSplittingTable(remaining.get(0));
+ cdcSplitProgress.setNextSplitStart(null);
+ cdcSplitProgress.setNextSplitId(null);
+ }
+ tbl = cdcSplitProgress.getCurrentSplittingTable();
+ startVal = cdcSplitProgress.getNextSplitStart();
+ splitId = cdcSplitProgress.getNextSplitId();
+ }
+
+ // Phase 2 (unlocked, slow): RPC. Keeps updateOffset / scheduler tick
unblocked
+ // so task dispatch can drain remainingSplits while we fetch the next
batch.
+ List<SnapshotSplit> batch = rpcFetchSplitsBatch(tbl, startVal,
splitId);
+ if (batch == null || batch.isEmpty()) {
Review Comment:
When `rpcFetchSplitsBatch` returns a null or empty batch, `advanceSplits`
returns early after Phase 1 has already set
`cdcSplitProgress.currentSplittingTable` without advancing the resume cursor.
On the next tick, Phase 1 sees `currentSplittingTable != null` and re-issues
the same RPC with the same (table, nextSplitStart, nextSplitId) cursor, which
is (table, null, null) for a freshly picked table. If the cdc_client keeps
returning an empty batch for that table (whether because of a transient
cdc_client issue, a view, or a genuinely empty result), the job will loop
forever on the same table. It will never advance to the next table or
transition to the binlog phase: there is no retry budget, no fallback that
marks the table as done, and no error is surfaced.
Consider either (a) treating an empty batch as "table done", i.e. clearing the
split progress and moving on to the next remaining table, or (b) surfacing an
error after N consecutive empty batches for the same (table, splitId) pair.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]