Copilot commented on code in PR #61389:
URL: https://github.com/apache/doris/pull/61389#discussion_r2939719914


##########
fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java:
##########
@@ -116,7 +116,11 @@ public Offset getNextOffset(StreamingJobProperties jobProps, Map<String, String>
             nextOffset.setSplits(snapshotSplits);
             return nextOffset;
         } else if (currentOffset != null && currentOffset.snapshotSplit()) {
-            // snapshot to binlog
+            if (isSnapshotOnlyMode()) {
+                // snapshot-only mode: all splits done, signal job to stop
+                return null;

Review Comment:
   `getNextOffset()` returns `null` to signal completion in snapshot-only mode. 
The `SourceOffsetProvider` contract doesn't document that `null` is allowed, 
and other task implementations (e.g. `StreamingInsertTask`) call 
`runningOffset.toString()` without a null check. It’s safer to return a 
non-null Offset (or a dedicated “end” offset) and rely on `hasReachedEnd()` to 
stop scheduling / mark the job FINISHED.
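
   One way to read this suggestion is a dedicated sentinel offset. The sketch below is a minimal illustration under stated assumptions: `SketchOffset`, `EndOffset`, `SplitOffset`, and `SketchOffsetProvider` are hypothetical stand-ins, not the actual Doris `Offset` or `SourceOffsetProvider` types, and the real interfaces may need different methods.

```java
// Hypothetical stand-ins; the real Doris Offset / SourceOffsetProvider types differ.
interface SketchOffset {
    boolean isEnd();
}

// Singleton sentinel returned instead of null once the source is exhausted.
final class EndOffset implements SketchOffset {
    static final EndOffset INSTANCE = new EndOffset();

    private EndOffset() {
    }

    @Override
    public boolean isEnd() {
        return true;
    }

    @Override
    public String toString() {
        return "EndOffset{}";
    }
}

final class SplitOffset implements SketchOffset {
    private final String splitId;

    SplitOffset(String splitId) {
        this.splitId = splitId;
    }

    @Override
    public boolean isEnd() {
        return false;
    }

    @Override
    public String toString() {
        return "SplitOffset{" + splitId + "}";
    }
}

final class SketchOffsetProvider {
    private final boolean snapshotOnly;
    private boolean snapshotDone;

    SketchOffsetProvider(boolean snapshotOnly) {
        this.snapshotOnly = snapshotOnly;
    }

    void markSnapshotDone() {
        snapshotDone = true;
    }

    // Never returns null, so callers such as runningOffset.toString() stay safe.
    SketchOffset getNextOffset() {
        if (snapshotOnly && snapshotDone) {
            return EndOffset.INSTANCE;
        }
        return new SplitOffset("next");
    }

    // The scheduler would consult this instead of testing the offset for null.
    boolean hasReachedEnd() {
        return snapshotOnly && snapshotDone;
    }
}
```

   With this shape, existing `toString()` callers need no null guard, and the stop decision stays in `hasReachedEnd()`.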
   



##########
fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/DataSourceConfigValidator.java:
##########
@@ -83,7 +83,8 @@ private static boolean isValidValue(String key, String value) {
 
         if (key.equals(DataSourceConfigKeys.OFFSET)
                 && !(value.equals(DataSourceConfigKeys.OFFSET_INITIAL)
-                || value.equals(DataSourceConfigKeys.OFFSET_LATEST))) {
+                || value.equals(DataSourceConfigKeys.OFFSET_LATEST)
+                || value.equals(DataSourceConfigKeys.OFFSET_SNAPSHOT))) {
             return false;

Review Comment:
   Offset value validation is case-sensitive (`value.equals(...)`), but the 
rest of the code treats offset modes case-insensitively (e.g. 
`equalsIgnoreCase` in source readers). This will reject inputs like 
`OFFSET='SNAPSHOT'` even though they are otherwise supported. Consider 
normalizing `value` to lower-case or switching these comparisons to 
`equalsIgnoreCase`.
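
   A minimal sketch of the normalization option follows. It assumes the offset-mode constants resolve to the literals `initial`, `latest`, and `snapshot`; `OffsetModeCheck` and `isValidOffsetMode` are hypothetical names, not part of `DataSourceConfigValidator`.

```java
import java.util.Locale;
import java.util.Set;

final class OffsetModeCheck {
    // Assumed literal values; the real constants live in DataSourceConfigKeys.
    private static final Set<String> MODES = Set.of("initial", "latest", "snapshot");

    private OffsetModeCheck() {
    }

    // Lower-cases with a fixed locale so the check does not depend on the JVM default.
    static boolean isValidOffsetMode(String value) {
        if (value == null) {
            return false;
        }
        return MODES.contains(value.toLowerCase(Locale.ROOT));
    }
}
```

   Using `Locale.ROOT` avoids surprises such as the Turkish dotless `i` when the FE runs under a non-English default locale.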



##########
fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java:
##########
@@ -119,6 +124,10 @@ public void run() throws JobException {
             log.info("streaming task has been canceled, task id is {}", getTaskId());
             return;
         }
+        if (this.runningOffset == null) {
+            // offset is null when source has reached end (e.g. snapshot-only mode completed)
+            return;
+        }

Review Comment:
   If `runningOffset` is null, the task exits `before()`/`run()` early but 
never transitions to SUCCESS (that normally happens via `successCallback()` 
after BE calls `commitOffset`). This can leave the task stuck in RUNNING and 
never removed from `StreamingTaskManager`, and the job may never reach the 
`onStreamTaskSuccess()` path that marks FINISHED. If you keep the null-offset 
approach, make sure the task marks itself SUCCESS and notifies the job (or 
avoid scheduling tasks when `offsetProvider.hasReachedEnd()` is true).
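
   If the null-offset approach is kept, the first option could look roughly like the sketch below. All types here (`SketchTaskStatus`, `SketchTaskListener`, `SketchStreamingTask`) are hypothetical stand-ins for the Doris task, status, and job-callback types; the real task would go through its existing success path rather than a bare listener.

```java
// Hypothetical stand-ins for the Doris task, status, and job-callback types.
enum SketchTaskStatus { PENDING, RUNNING, SUCCESS }

interface SketchTaskListener {
    void onTaskSuccess(SketchStreamingTask task);
}

final class SketchStreamingTask {
    private final Object runningOffset; // null models "source has reached its end"
    private final SketchTaskListener listener;
    private SketchTaskStatus status = SketchTaskStatus.PENDING;

    SketchStreamingTask(Object runningOffset, SketchTaskListener listener) {
        this.runningOffset = runningOffset;
        this.listener = listener;
    }

    void run() {
        status = SketchTaskStatus.RUNNING;
        if (runningOffset == null) {
            // Nothing is sent to the backend, so no commit callback will ever arrive.
            // Complete the task here and notify the job instead of returning silently.
            status = SketchTaskStatus.SUCCESS;
            listener.onTaskSuccess(this);
            return;
        }
        // Normal path (omitted): dispatch to the backend; the commit callback
        // would later set SUCCESS and notify the listener.
    }

    SketchTaskStatus getStatus() {
        return status;
    }
}
```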



##########
fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java:
##########
@@ -620,6 +620,12 @@ public void onStreamTaskSuccess(AbstractStreamingTask task) throws JobException
             }
 
             Env.getCurrentEnv().getJobManager().getStreamingTaskManager().removeRunningTask(task);
+            if (offsetProvider.hasReachedEnd()) {
+                // offset provider has reached a natural end, mark job as finished
+                log.info("Streaming insert job {} source data fully consumed, marking job as FINISHED", getJobId());
+                updateJobStatus(JobStatus.FINISHED);
+                return;
+            }

Review Comment:
   `hasReachedEnd()` is only checked inside `onStreamTaskSuccess()`, which 
requires a task to successfully commit an offset. In snapshot-only recovery 
scenarios where FE restarts after snapshot completion, `hasMoreDataToConsume()` 
can become false and the scheduler will only delay/reschedule tasks, never 
reaching this callback to mark the job FINISHED. Consider adding a FINISHED 
transition when `offsetProvider.hasReachedEnd()` becomes true even without 
another successful task commit (e.g. in the scheduler’s pre-checks or job state 
restoration).
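
   A scheduler pre-check along these lines might look like the sketch below. `SketchJobStatus`, `EndAwareProvider`, `SketchJob`, and `preSchedule()` are hypothetical names chosen for illustration; only `hasReachedEnd()` and `hasMoreDataToConsume()` mirror method names mentioned in this review.

```java
// Hypothetical stand-ins; only the two provider method names come from the review.
enum SketchJobStatus { RUNNING, FINISHED }

interface EndAwareProvider {
    boolean hasReachedEnd();

    boolean hasMoreDataToConsume();
}

final class SketchJob {
    private final EndAwareProvider provider;
    private SketchJobStatus status = SketchJobStatus.RUNNING;

    SketchJob(EndAwareProvider provider) {
        this.provider = provider;
    }

    // Called by the scheduler before creating a task.
    // Returns true only when a new task should actually be scheduled.
    boolean preSchedule() {
        if (status == SketchJobStatus.FINISHED) {
            return false;
        }
        if (provider.hasReachedEnd()) {
            // Covers the restart case: no further task commit is needed to finish.
            status = SketchJobStatus.FINISHED;
            return false;
        }
        return provider.hasMoreDataToConsume();
    }

    SketchJobStatus getStatus() {
        return status;
    }
}
```

   Checking the end condition before `hasMoreDataToConsume()` is the point of the sketch: after a restart the latter may already be false, so it cannot be the only gate.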



##########
fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java:
##########
@@ -376,10 +383,13 @@ public void replayIfNeed(StreamingInsertJob job) throws JobException {
                         if (!lastSnapshotSplits.isEmpty()) {
                             currentOffset.setSplits(lastSnapshotSplits);
                         } else {
-                            // when snapshot to binlog phase fe restarts
-                            BinlogSplit binlogSplit = new BinlogSplit();
-                            binlogSplit.setFinishedSplits(finishedSplits);
-                            currentOffset.setSplits(Collections.singletonList(binlogSplit));
+                            if (!isSnapshotOnlyMode()) {
+                                // initial mode: rebuild binlog split for snapshot-to-binlog transition
+                                BinlogSplit binlogSplit = new BinlogSplit();
+                                binlogSplit.setFinishedSplits(finishedSplits);
+                                currentOffset.setSplits(Collections.singletonList(binlogSplit));
+                            }
+                            // snapshot-only: leave splits empty, hasReachedEnd() will return true

Review Comment:
   In snapshot-only mode this branch leaves `currentOffset` with an empty 
`splits` list, but later logic calls `currentOffset.snapshotSplit()` (e.g. 
`getNextOffset()`, `hasMoreDataToConsume()`, `hasReachedEnd()`), and 
`JdbcOffset.snapshotSplit()` throws when `splits` is null/empty. This can break 
crash recovery right after the snapshot completes. Consider ensuring 
`currentOffset` is either null, or always contains a non-empty snapshot marker 
split, and/or make `hasReachedEnd()`/`getNextOffset()` robust to an empty 
`splits` list.
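
   The last option, guarding callers against an empty `splits` list, could be sketched as follows. `SketchJdbcOffset`, `SketchEndCheck`, and `hasSplits()` are hypothetical; splits are modelled as plain strings, and treating "empty splits in snapshot-only mode" as the end condition is an assumption taken from the code comment in the hunk above.

```java
import java.util.List;

// Hypothetical model of JdbcOffset; splits are simplified to strings.
final class SketchJdbcOffset {
    private final List<String> splits;

    SketchJdbcOffset(List<String> splits) {
        this.splits = splits;
    }

    boolean hasSplits() {
        return splits != null && !splits.isEmpty();
    }

    // Mirrors the behaviour described in the review: throws on null/empty splits.
    boolean snapshotSplit() {
        if (!hasSplits()) {
            throw new IllegalStateException("splits is null or empty");
        }
        return splits.get(0).startsWith("snapshot");
    }
}

final class SketchEndCheck {
    private SketchEndCheck() {
    }

    // Checks hasSplits() first, so snapshotSplit() is never called on an empty offset.
    static boolean hasReachedEnd(boolean snapshotOnly, SketchJdbcOffset current) {
        if (!snapshotOnly || current == null) {
            return false;
        }
        // Assumption: after replay, empty splits mean every snapshot split finished.
        return !current.hasSplits();
    }
}
```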



##########
fe/fe-common/src/main/java/org/apache/doris/job/cdc/DataSourceConfigKeys.java:
##########
@@ -27,11 +27,12 @@ public class DataSourceConfigKeys {
     public static final String SCHEMA = "schema";
     public static final String INCLUDE_TABLES = "include_tables";
     public static final String EXCLUDE_TABLES = "exclude_tables";
-    // initial,earliest,latest,{binlog,postion},\d{13}
+    // initial,earliest,latest,snapshot,{binlog,postion},\d{13}

Review Comment:
   Typo in the offset mode comment: `postion` should be `position`, so the 
`{binlog,postion}` placeholder should read `{binlog,position}`.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

