github-actions[bot] commented on code in PR #64301:
URL: https://github.com/apache/doris/pull/64301#discussion_r3393132446


##########
fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java:
##########
@@ -1404,20 +1405,49 @@ public void gsonPostProcess() throws IOException {
         }
     }
 
+    /**
+     * Actively pull the running task's fail reason from cdc_client and fail it immediately,
+     * so a hard write failure (e.g. data quality error) does not wait out the whole timeout.
+     * Only applies to StreamingMultiTask.
+     */
+    public void detectTaskFailure() throws JobException {
+        AbstractStreamingTask task = this.runningStreamTask;

Review Comment:
   This is a separate instance of the stale-snapshot race already raised on
   `processTimeoutTasks`: the new failure-detection path also reads
   `runningStreamTask` without the job read lock, and it calls
   `/api/getFailReason` before the identity check. That RPC is not a pure read;
   `PipelineCoordinator.getTaskFailReason()` removes the stored error. Updates
   to `runningStreamTask` happen under the write lock and the field is not
   volatile, so the scheduler has no happens-before edge here. It can therefore
   poll and consume the fail state of an obsolete task and then drop it when
   the identity/status check fails, or miss the current task until a later
   tick. Please take the same short read-locked snapshot for this path, and
   avoid clearing the cdc_client error until FE has confirmed the task is still
   current and running.
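
   A minimal sketch of the ordering suggested above, not the Doris
   implementation. `Task`, `FailStore`, `Job`, and the split between a
   non-destructive `peek` and a separate `clear` are all hypothetical names
   introduced for illustration; the real cdc_client endpoint currently removes
   the error on read. The sketch snapshots the field under the read lock, reads
   the error without consuming it, re-checks identity and status under the
   write lock, and only then clears the stored error.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;

final class SnapshotThenConfirmSketch {
    /** Hypothetical stand-in for a running streaming task. */
    static final class Task {
        final long id;
        volatile boolean running = true;
        Task(long id) { this.id = id; }
    }

    /** Hypothetical error store with separate peek (read) and clear (consume). */
    static final class FailStore {
        private final Map<Long, String> errors = new ConcurrentHashMap<>();
        void report(long taskId, String reason) { errors.put(taskId, reason); }
        String peek(long taskId) { return errors.get(taskId); }
        void clear(long taskId) { errors.remove(taskId); }
    }

    /** Hypothetical job that owns the running-task field and its lock. */
    static final class Job {
        private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
        private final FailStore store;
        private Task runningTask;   // guarded by lock
        String lastFailure;         // written under the write lock

        Job(FailStore store) { this.store = store; }

        void setRunningTask(Task task) {
            lock.writeLock().lock();
            try {
                runningTask = task;
            } finally {
                lock.writeLock().unlock();
            }
        }

        /** Short read-locked snapshot: gives a happens-before edge with writers. */
        private Task snapshot() {
            lock.readLock().lock();
            try {
                return runningTask;
            } finally {
                lock.readLock().unlock();
            }
        }

        /** Returns true only if the current, still-running task was failed. */
        boolean detectTaskFailure() {
            Task task = snapshot();
            if (task == null) {
                return false;
            }
            String reason = store.peek(task.id);   // non-destructive read
            if (reason == null || reason.isEmpty()) {
                return false;
            }
            lock.writeLock().lock();
            try {
                // Stale snapshot or already-finished task: leave the error in place.
                if (runningTask != task || !task.running) {
                    return false;
                }
                task.running = false;
                lastFailure = reason;
            } finally {
                lock.writeLock().unlock();
            }
            store.clear(task.id);                  // consume only after confirmation
            return true;
        }
    }
}
```

   With this ordering, a stale snapshot costs one wasted peek rather than a
   lost error: the stored reason stays available for whichever tick next sees
   the task as current.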



##########
fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java:
##########
@@ -1404,20 +1405,49 @@ public void gsonPostProcess() throws IOException {
         }
     }
 
+    /**
+     * Actively pull the running task's fail reason from cdc_client and fail it immediately,
+     * so a hard write failure (e.g. data quality error) does not wait out the whole timeout.
+     * Only applies to StreamingMultiTask.
+     */
+    public void detectTaskFailure() throws JobException {
+        AbstractStreamingTask task = this.runningStreamTask;
+        if (!(task instanceof StreamingMultiTblTask)) {
+            return;
+        }
+        StreamingMultiTblTask runningMultiTask = (StreamingMultiTblTask) task;
+        String failReason = runningMultiTask.getFailReason();

Review Comment:
   `getFailReason()` does not handle a missing backend. After the
   `runningBackendId > 0` check it calls
   `Env.getCurrentSystemInfo().getBackend(runningBackendId)` and later
   dereferences `backend.getHost()`/`getBrpcPort()` without a null check, while
   only timeout/execution/interruption are caught. This new call runs on every
   RUNNING scheduler tick, so if the selected BE is removed or restarted while
   the CDC task is still recorded as running, `detectTaskFailure()` throws an
   NPE and the tick fails before `processTimeoutTasks()` can reschedule the
   task. Please mirror the null handling in `fetchProgress()`, or otherwise
   turn this case into an empty fail reason with a useful log.
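
   A minimal sketch of the null handling requested above, under stated
   assumptions. `Backend`, `BackendRegistry`, and `FailReasonClient` are
   hypothetical stand-ins, not the Doris classes, and the RPC is reduced to an
   interface call. The point is only that a missing backend yields an empty
   fail reason plus a warning instead of an NPE.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.logging.Logger;

final class MissingBackendSketch {
    private static final Logger LOG = Logger.getLogger(MissingBackendSketch.class.getName());

    /** Hypothetical backend descriptor. */
    static final class Backend {
        final String host;
        final int brpcPort;
        Backend(String host, int brpcPort) {
            this.host = host;
            this.brpcPort = brpcPort;
        }
    }

    /** Hypothetical registry; returns null when the backend is unknown. */
    static final class BackendRegistry {
        private final Map<Long, Backend> backends = new HashMap<>();
        void add(long id, Backend backend) { backends.put(id, backend); }
        Backend getBackend(long id) { return backends.get(id); }
    }

    /** Hypothetical abstraction over the fail-reason RPC. */
    interface FailReasonClient {
        String fetch(String host, int port, long taskId);
    }

    /** Returns the fail reason, or "" when no backend is assigned or it is gone. */
    static String getFailReason(BackendRegistry registry, long runningBackendId,
                                long taskId, FailReasonClient client) {
        if (runningBackendId <= 0) {
            return "";
        }
        Backend backend = registry.getBackend(runningBackendId);
        if (backend == null) {
            // Backend removed or restarted: log and let the timeout path reschedule.
            LOG.warning("backend " + runningBackendId + " not found for task " + taskId
                    + ", skipping fail-reason poll");
            return "";
        }
        String reason = client.fetch(backend.host, backend.brpcPort, taskId);
        return reason == null ? "" : reason;
    }
}
```

   Returning an empty string keeps the scheduler tick alive, so the timeout
   path still runs and can reschedule the task on a live backend.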



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

