github-actions[bot] commented on code in PR #63079:
URL: https://github.com/apache/doris/pull/63079#discussion_r3232538593
##########
fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java:
##########
@@ -533,78 +631,216 @@ public String getPersistInfo() {
return GsonUtils.GSON.toJson(this);
}
- public void splitChunks(List<String> createTbls) throws JobException {
- // todo: When splitting takes a long time, it needs to be changed to
asynchronous.
- if (checkNeedSplitChunks(sourceProperties)) {
- Map<String, List<SnapshotSplit>> tableSplits = new
LinkedHashMap<>();
- for (String tbl : createTbls) {
- List<SnapshotSplit> snapshotSplits = requestTableSplits(tbl);
- tableSplits.put(tbl, snapshotSplits);
- }
- // save chunk list to system table
- saveChunkMeta(tableSplits);
- this.remainingSplits = tableSplits.values().stream()
- .flatMap(List::stream)
- .collect(Collectors.toList());
- } else {
- // The source reader is automatically initialized when the split
is obtained.
- // In latest mode, a separate init is required.init source reader
+ // ============ Async split progress (driven by scheduler each tick)
============
+
+ /**
+ * One-time setup at CREATE.
+ * - initial/snapshot mode: init split progress; scheduler will drive
advanceSplits() each tick.
+ * - latest mode (and other non-splitting modes): open the remote reader
(e.g. PG slot) so the
+ * binlog phase can start immediately; no snapshot splitting will happen.
+ */
+ @Override
+ public void initOnCreate(List<String> syncTables) throws JobException {
+ if (!checkNeedSplitChunks(sourceProperties)) {
initSourceReader();
+ return;
+ }
+ synchronized (splitsLock) {
+ this.cachedSyncTables = syncTables;
+ this.committedSplitProgress = new SplitProgress();
+ this.cdcSplitProgress = new SplitProgress();
+ }
+ }
+
+ @Override
+ public boolean noMoreSplits() {
+ if (!checkNeedSplitChunks(sourceProperties)) {
+ return true;
+ }
+ synchronized (splitsLock) {
+ return cdcSplitProgress.getCurrentSplittingTable() == null
+ && computeCdcRemainingTables().isEmpty();
+ }
+ }
+
+ /** Tables not yet touched by cdc fetching. Caller must hold splitsLock. */
+ private List<String> computeCdcRemainingTables() {
+ if (cachedSyncTables == null || cachedSyncTables.isEmpty()) {
+ return Collections.emptyList();
+ }
+ // SnapshotSplit.tableId is qualified ("schema.table"/"db.table");
cachedSyncTables is bare — normalize.
+ Set<String> touched = new HashSet<>();
+ for (SnapshotSplit s : finishedSplits) {
+ touched.add(getTableName(s.getTableId()));
+ }
+ for (SnapshotSplit s : remainingSplits) {
+ touched.add(getTableName(s.getTableId()));
+ }
+ if (cdcSplitProgress.getCurrentSplittingTable() != null) {
+
touched.add(getTableName(cdcSplitProgress.getCurrentSplittingTable()));
}
+ List<String> result = new ArrayList<>(cachedSyncTables.size());
+ for (String t : cachedSyncTables) {
+ if (!touched.contains(getTableName(t))) {
+ result.add(t);
+ }
+ }
+ return result;
}
- private void saveChunkMeta(Map<String, List<SnapshotSplit>> tableSplits)
throws JobException {
+ @Override
+ public void advanceSplits() throws JobException {
+ // Phase 1 (locked, fast): pick next table & snapshot the resume
cursor.
+ String tbl;
+ Object[] startVal;
+ Integer splitId;
+ synchronized (splitsLock) {
+ if (cdcSplitProgress.getCurrentSplittingTable() == null) {
+ List<String> remaining = computeCdcRemainingTables();
+ if (remaining.isEmpty()) {
+ return;
+ }
+ cdcSplitProgress.setCurrentSplittingTable(remaining.get(0));
+ cdcSplitProgress.setNextSplitStart(null);
+ cdcSplitProgress.setNextSplitId(null);
+ }
+ tbl = cdcSplitProgress.getCurrentSplittingTable();
+ startVal = cdcSplitProgress.getNextSplitStart();
+ splitId = cdcSplitProgress.getNextSplitId();
+ }
+
+ // Phase 2 (unlocked, slow): RPC. Keeps updateOffset / scheduler tick
unblocked
+ // so task dispatch can drain remainingSplits while we fetch the next
batch.
+ List<SnapshotSplit> batch = rpcFetchSplitsBatch(tbl, startVal,
splitId);
+ if (batch == null || batch.isEmpty()) {
+ return;
+ }
+
+ // Phase 3 (locked, fast): reconcile + merge + advance
cdcSplitProgress.
+ List<SnapshotSplit> splitsOfTbl;
+ int newSplitCount;
+ synchronized (splitsLock) {
+ // Defensive reconciliation: replayIfNeed / pause-resume may have
moved the cursor
+ // while we were waiting on the RPC; drop this batch and let the
next loop iteration
+ // re-pick from the new state.
+ if (!tbl.equals(cdcSplitProgress.getCurrentSplittingTable())
+ || !Objects.equals(splitId,
cdcSplitProgress.getNextSplitId())) {
+ log.info("advanceSplits discard batch for job {} table {}:
state moved on "
+ + "during RPC (now table={}, splitId={})",
+ getJobId(), tbl,
+ cdcSplitProgress.getCurrentSplittingTable(),
+ cdcSplitProgress.getNextSplitId());
+ return;
+ }
+ Set<String> existingIds = new HashSet<>();
+ finishedSplits.forEach(s -> existingIds.add(s.getSplitId()));
+ remainingSplits.forEach(s -> existingIds.add(s.getSplitId()));
+ List<SnapshotSplit> newSplits = new ArrayList<>();
+ for (SnapshotSplit s : batch) {
+ if (!existingIds.contains(s.getSplitId())) {
+ newSplits.add(s);
+ }
+ }
+ if (newSplits.size() < batch.size()) {
+ log.info("advanceSplits dedup'd {} duplicate splits (batch={},
new={}) for job {} table {}",
+ batch.size() - newSplits.size(), batch.size(),
newSplits.size(), getJobId(), tbl);
+ }
+ remainingSplits.addAll(newSplits);
+ newSplitCount = newSplits.size();
+ splitsOfTbl = Stream.concat(finishedSplits.stream(),
remainingSplits.stream())
+ .filter(s -> tbl.equals(s.getTableId()))
+ .sorted(Comparator.comparingInt(s ->
splitIdOf(s.getSplitId())))
+ .collect(Collectors.toList());
+ applySplitToProgress(cdcSplitProgress, batch.get(batch.size() -
1));
+ log.info("advanceSplits jobId={} table={} request(nextStart={},
nextSplitId={}) "
+ + "got {} new splits, cdcSplitProgress ->
(table={}, nextStart={}, nextSplitId={})",
+ getJobId(), tbl, Arrays.toString(startVal), splitId,
newSplitCount,
+ cdcSplitProgress.getCurrentSplittingTable(),
+ Arrays.toString(cdcSplitProgress.getNextSplitStart()),
+ cdcSplitProgress.getNextSplitId());
+ }
+
+ // Phase 4 (unlocked, slow): persist chunk_list. Failure here is
recoverable — on
+ // restart, replayIfNeed reloads from system table at
committedSplitProgress and
+ // cdc_client will re-cut these chunks; mergeBySplitId dedups on the
next iteration.
try {
- StreamingJobUtils.createMetaTableIfNotExist();
- StreamingJobUtils.insertSplitsToMeta(getJobId(), tableSplits);
+ StreamingJobUtils.upsertChunkList(getJobId(), tbl, splitsOfTbl);
} catch (Exception e) {
- log.warn("save chunk meta error: ", e);
Review Comment:
Swallowing this exception lets the in-memory split cursor and the durable
`streaming_job_meta` table diverge. After phase 3, `remainingSplits` and
`cdcSplitProgress` already include this batch, so the scheduler can dispatch
and commit those splits. If the FE restarts after that, `replayIfNeed()` relies
on `restoreSplitsToJob()` to rebuild the split boundaries and the finished-split
metadata needed for the snapshot-to-binlog transition. When the UPSERT was
skipped, the committed high-watermarks may exist in `offsetProviderPersist`
while the corresponding split definitions are missing from the meta table. In
that case recovery can re-cut or skip chunks, or build a binlog split without
the list of finished snapshot splits. Please treat the meta-table write as part
of advancing the durable split state: either fail/pause the job and avoid
exposing the batch until the UPSERT succeeds, or persist the batch before
merging it into the schedulable `remainingSplits`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]