Copilot commented on code in PR #61389:
URL: https://github.com/apache/doris/pull/61389#discussion_r2939719914
##########
fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java:
##########
@@ -116,7 +116,11 @@ public Offset getNextOffset(StreamingJobProperties jobProps, Map<String, String>
nextOffset.setSplits(snapshotSplits);
return nextOffset;
} else if (currentOffset != null && currentOffset.snapshotSplit()) {
- // snapshot to binlog
+ if (isSnapshotOnlyMode()) {
+ // snapshot-only mode: all splits done, signal job to stop
+ return null;
Review Comment:
`getNextOffset()` returns `null` to signal completion in snapshot-only mode.
The `SourceOffsetProvider` contract doesn't document that `null` is allowed,
and other task implementations (e.g. `StreamingInsertTask`) call
`runningOffset.toString()` without a null check. It’s safer to return a
non-null Offset (or a dedicated “end” offset) and rely on `hasReachedEnd()` to
stop scheduling / mark the job FINISHED.
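A minimal sketch of the "dedicated end offset" alternative. It assumes a provider that tracks pending snapshot splits. `EndOffsetSketch`, `SplitOffset`, `END`, and `commitOffset` are hypothetical names for illustration, not the Doris `SourceOffsetProvider`/`Offset` API. The point is that `getNextOffset()` never returns `null`, so callers can always call `toString()` on it, while scheduling is gated on `hasReachedEnd()`.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;

// Hypothetical sketch: these types illustrate the "never return null" idea and
// are not Doris' SourceOffsetProvider / Offset API.
public class EndOffsetSketch {

    /** Minimal offset abstraction; toString() is always safe on a returned value. */
    public interface Offset {
        boolean isEnd();
    }

    /** Offset pointing at one pending snapshot split. */
    public static final class SplitOffset implements Offset {
        final String splitId;

        SplitOffset(String splitId) {
            this.splitId = splitId;
        }

        @Override
        public boolean isEnd() {
            return false;
        }

        @Override
        public String toString() {
            return "split:" + splitId;
        }
    }

    /** Dedicated non-null sentinel meaning "source fully consumed". */
    public static final Offset END = new Offset() {
        @Override
        public boolean isEnd() {
            return true;
        }

        @Override
        public String toString() {
            return "END";
        }
    };

    private final Deque<String> pendingSplits;

    public EndOffsetSketch(List<String> splits) {
        this.pendingSplits = new ArrayDeque<>(splits);
    }

    /** Never returns null: once every split is committed, hands out END. */
    public Offset getNextOffset() {
        String head = pendingSplits.peek();
        return head == null ? END : new SplitOffset(head);
    }

    /** Called after a task commits; removes the split that task covered. */
    public void commitOffset(Offset offset) {
        if (offset instanceof SplitOffset) {
            pendingSplits.remove(((SplitOffset) offset).splitId);
        }
    }

    /** The scheduler checks this instead of null-checking getNextOffset(). */
    public boolean hasReachedEnd() {
        return pendingSplits.isEmpty();
    }
}
```

With this shape, code such as `runningOffset.toString()` stays safe, and the only place that decides "stop" is `hasReachedEnd()`.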
##########
fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/DataSourceConfigValidator.java:
##########
@@ -83,7 +83,8 @@ private static boolean isValidValue(String key, String value) {
if (key.equals(DataSourceConfigKeys.OFFSET)
&& !(value.equals(DataSourceConfigKeys.OFFSET_INITIAL)
- || value.equals(DataSourceConfigKeys.OFFSET_LATEST))) {
+ || value.equals(DataSourceConfigKeys.OFFSET_LATEST)
+ || value.equals(DataSourceConfigKeys.OFFSET_SNAPSHOT))) {
return false;
Review Comment:
Offset value validation is case-sensitive (`value.equals(...)`), but the
rest of the code treats offset modes case-insensitively (e.g.
`equalsIgnoreCase` in source readers). This will reject inputs like
`OFFSET='SNAPSHOT'` even though they are otherwise supported. Consider
normalizing `value` to lower-case or switching these comparisons to
`equalsIgnoreCase`.
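A minimal sketch of a case-insensitive check. It assumes the three constants resolve to the lower-case strings `initial`, `latest`, and `snapshot`; verify this against `DataSourceConfigKeys` before relying on it. `OffsetValueCheck` and `isValidOffsetValue` are hypothetical names.

```java
import java.util.Locale;
import java.util.Set;

// Hypothetical sketch: the literal strings below are assumed to match
// OFFSET_INITIAL, OFFSET_LATEST and OFFSET_SNAPSHOT; check DataSourceConfigKeys.
public class OffsetValueCheck {
    private static final Set<String> NAMED_OFFSETS = Set.of("initial", "latest", "snapshot");

    /** Accepts the named offset modes regardless of letter case. */
    public static boolean isValidOffsetValue(String value) {
        if (value == null) {
            return false;
        }
        // Locale.ROOT keeps the lower-casing stable under locales such as Turkish.
        return NAMED_OFFSETS.contains(value.toLowerCase(Locale.ROOT));
    }
}
```

Chaining `equalsIgnoreCase` against each constant works just as well. The set lookup only keeps the accepted modes in one place.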
##########
fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java:
##########
@@ -119,6 +124,10 @@ public void run() throws JobException {
log.info("streaming task has been canceled, task id is {}", getTaskId());
return;
}
+ if (this.runningOffset == null) {
+ // offset is null when source has reached end (e.g. snapshot-only mode completed)
+ return;
+ }
Review Comment:
If `runningOffset` is null, the task exits `before()`/`run()` early but
never transitions to SUCCESS (that normally happens via `successCallback()`
after BE calls `commitOffset`). This can leave the task stuck in RUNNING and
never removed from `StreamingTaskManager`, and the job may never reach the
`onStreamTaskSuccess()` path that marks FINISHED. If you keep the null-offset
approach, make sure the task marks itself SUCCESS and notifies the job (or
avoid scheduling tasks when `offsetProvider.hasReachedEnd()` is true).
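If the null-offset approach is kept, the task should complete explicitly instead of returning while still RUNNING. A minimal sketch, assuming a task lifecycle with a success callback. `TaskCompletionSketch`, `SketchTask`, `TaskStatus`, and `onSuccess` are hypothetical stand-ins for `successCallback()`/`onStreamTaskSuccess()`, not the actual `AbstractStreamingTask` API.

```java
import java.util.function.Consumer;

// Hypothetical sketch: TaskStatus, SketchTask and the onSuccess callback stand in
// for the real task lifecycle (successCallback / onStreamTaskSuccess).
public class TaskCompletionSketch {
    public enum TaskStatus { PENDING, RUNNING, SUCCESS }

    public static class SketchTask {
        private final Object runningOffset;            // null = source exhausted
        private final Consumer<SketchTask> onSuccess;  // e.g. job-side success handler
        private TaskStatus status = TaskStatus.PENDING;

        public SketchTask(Object runningOffset, Consumer<SketchTask> onSuccess) {
            this.runningOffset = runningOffset;
            this.onSuccess = onSuccess;
        }

        public void run() {
            status = TaskStatus.RUNNING;
            if (runningOffset == null) {
                // No data to read: finish explicitly instead of returning while RUNNING,
                // so the job can drop the task and check whether the source has ended.
                status = TaskStatus.SUCCESS;
                onSuccess.accept(this);
                return;
            }
            // Normal path (omitted): submit work; SUCCESS is set later when the
            // backend commits the offset.
        }

        public TaskStatus getStatus() {
            return status;
        }
    }
}
```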
##########
fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java:
##########
@@ -620,6 +620,12 @@ public void onStreamTaskSuccess(AbstractStreamingTask task) throws JobException
}
Env.getCurrentEnv().getJobManager().getStreamingTaskManager().removeRunningTask(task);
+ if (offsetProvider.hasReachedEnd()) {
+ // offset provider has reached a natural end, mark job as finished
+ log.info("Streaming insert job {} source data fully consumed, marking job as FINISHED", getJobId());
+ updateJobStatus(JobStatus.FINISHED);
+ return;
+ }
Review Comment:
`hasReachedEnd()` is only checked inside `onStreamTaskSuccess()`, which
requires a task to successfully commit an offset. In snapshot-only recovery
scenarios where FE restarts after snapshot completion, `hasMoreDataToConsume()`
can become false and the scheduler will only delay/reschedule tasks, never
reaching this callback to mark the job FINISHED. Consider adding a FINISHED
transition when `offsetProvider.hasReachedEnd()` becomes true even without
another successful task commit (e.g. in the scheduler’s pre-checks or job state
restoration).
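A minimal sketch of the suggested scheduler pre-check, which finishes the job on any tick where the source has ended, including the first tick after FE replay. `SchedulerPrecheckSketch`, `JobStatus`, `tick()`, and the `BooleanSupplier` standing in for `offsetProvider.hasReachedEnd()` are hypothetical, not the Doris scheduler API.

```java
import java.util.function.BooleanSupplier;

// Hypothetical sketch: JobStatus and the hasReachedEnd supplier stand in for the
// job's status and offsetProvider.hasReachedEnd(); not the Doris scheduler API.
public class SchedulerPrecheckSketch {
    public enum JobStatus { RUNNING, FINISHED }

    private final BooleanSupplier hasReachedEnd;
    private JobStatus status = JobStatus.RUNNING;
    private int tasksCreated = 0;

    public SchedulerPrecheckSketch(BooleanSupplier hasReachedEnd) {
        this.hasReachedEnd = hasReachedEnd;
    }

    /** Runs on every scheduling tick, including the first one after FE replay. */
    public void tick() {
        if (status != JobStatus.RUNNING) {
            return;
        }
        if (hasReachedEnd.getAsBoolean()) {
            // Finish here, not only in the task-success callback, so a job whose
            // snapshot completed before a restart is not rescheduled forever.
            status = JobStatus.FINISHED;
            return;
        }
        tasksCreated++; // stand-in for creating and submitting the next task
    }

    public JobStatus getStatus() {
        return status;
    }

    public int getTasksCreated() {
        return tasksCreated;
    }
}
```

Placing the check before task creation also covers the earlier concern about scheduling a task with no offset to read.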
##########
fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java:
##########
@@ -376,10 +383,13 @@ public void replayIfNeed(StreamingInsertJob job) throws JobException {
if (!lastSnapshotSplits.isEmpty()) {
currentOffset.setSplits(lastSnapshotSplits);
} else {
- // when snapshot to binlog phase fe restarts
- BinlogSplit binlogSplit = new BinlogSplit();
- binlogSplit.setFinishedSplits(finishedSplits);
- currentOffset.setSplits(Collections.singletonList(binlogSplit));
+ if (!isSnapshotOnlyMode()) {
+ // initial mode: rebuild binlog split for snapshot-to-binlog transition
+ BinlogSplit binlogSplit = new BinlogSplit();
+ binlogSplit.setFinishedSplits(finishedSplits);
+ currentOffset.setSplits(Collections.singletonList(binlogSplit));
+ }
+ // snapshot-only: leave splits empty, hasReachedEnd() will return true
Review Comment:
In snapshot-only mode this branch leaves `currentOffset` with an empty
`splits` list, but later logic calls `currentOffset.snapshotSplit()` (e.g.
`getNextOffset()`, `hasMoreDataToConsume()`, `hasReachedEnd()`), and
`JdbcOffset.snapshotSplit()` throws when `splits` is null/empty. This can break
crash recovery right after the snapshot completes. Consider ensuring
`currentOffset` is either null, or always contains a non-empty snapshot marker
split, and/or make `hasReachedEnd()`/`getNextOffset()` robust to an empty
`splits` list.
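A minimal sketch of making the end check robust to an empty `splits` list by testing for emptiness before calling `snapshotSplit()`. `EmptySplitsSketch`, `SketchJdbcOffset`, `hasSplits()`, and the `remainingSnapshotSplits` parameter are hypothetical. The throwing `snapshotSplit()` only mimics the behavior the comment describes for `JdbcOffset`.

```java
import java.util.List;

// Hypothetical sketch: SketchJdbcOffset mimics an offset holding a list of splits
// whose snapshotSplit() throws on an empty list, as described for JdbcOffset.
public class EmptySplitsSketch {
    public static class SketchJdbcOffset {
        private final List<String> splits;   // may be empty after replay in snapshot-only mode
        private final boolean snapshot;      // true if the splits are snapshot splits

        public SketchJdbcOffset(List<String> splits, boolean snapshot) {
            this.splits = splits;
            this.snapshot = snapshot;
        }

        public boolean hasSplits() {
            return splits != null && !splits.isEmpty();
        }

        public boolean snapshotSplit() {
            if (!hasSplits()) {
                throw new IllegalStateException("splits is null or empty");
            }
            return snapshot;
        }
    }

    /** Guarded end check: tests for empty splits before calling snapshotSplit(). */
    public static boolean hasReachedEnd(SketchJdbcOffset current, int remainingSnapshotSplits,
                                        boolean snapshotOnly) {
        if (!snapshotOnly || current == null) {
            return false;
        }
        if (!current.hasSplits()) {
            // Replay left no splits; treat as finished once no snapshot splits remain.
            return remainingSnapshotSplits == 0;
        }
        return current.snapshotSplit() && remainingSnapshotSplits == 0;
    }
}
```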
##########
fe/fe-common/src/main/java/org/apache/doris/job/cdc/DataSourceConfigKeys.java:
##########
@@ -27,11 +27,12 @@ public class DataSourceConfigKeys {
public static final String SCHEMA = "schema";
public static final String INCLUDE_TABLES = "include_tables";
public static final String EXCLUDE_TABLES = "exclude_tables";
- // initial,earliest,latest,{binlog,postion},\d{13}
+ // initial,earliest,latest,snapshot,{binlog,postion},\d{13}
Review Comment:
Typo in the offset mode comment: `postion` should be `position`, so the
placeholder should read `{binlog,position}`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]