github-actions[bot] commented on code in PR #64301:
URL: https://github.com/apache/doris/pull/64301#discussion_r3393132446
##########
fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java:
##########
@@ -1404,20 +1405,49 @@ public void gsonPostProcess() throws IOException {
}
}
+ /**
+ * Actively pull the running task's fail reason from cdc_client and fail it immediately,
+ * so a hard write failure (e.g. data quality error) does not wait out the whole timeout.
+ * Only applies to StreamingMultiTask.
+ */
+ public void detectTaskFailure() throws JobException {
+ AbstractStreamingTask task = this.runningStreamTask;
Review Comment:
Separate instance of the stale-snapshot race already raised on
`processTimeoutTasks`: this new failure-detection path also reads
`runningStreamTask` without the job read lock and then calls
`/api/getFailReason` before the identity check. That RPC is not just a read;
`PipelineCoordinator.getTaskFailReason()` removes the stored error. Since
updates to `runningStreamTask` happen under the write lock and the field is not
volatile, the scheduler has no happens-before edge here, so it can poll and
consume fail state for an obsolete task, then drop it when the identity/status
check fails, or skip the current task until a later tick. Please take the same
short read-locked snapshot for this path, and avoid clearing the cdc_client
error until FE has confirmed the task is still current and running.
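A minimal sketch of the suggested shape. It assumes a simplified job guarded by a `ReentrantReadWriteLock`, and a cdc_client that could expose a non-destructive read plus a separate acknowledgement. `SketchJob`, `StreamTask`, `CdcFailReasonClient`, `peekFailReason` and `ackFailReason` are all hypothetical. Today `/api/getFailReason` removes the error in the same call, so this would need a matching cdc_client change or some other way to defer the removal.

```java
import java.util.Optional;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical, simplified stand-ins for the FE task state and the cdc_client RPCs.
enum TaskStatus { RUNNING, FAILED }

interface CdcFailReasonClient {
    // Assumed non-destructive read: returns the stored fail reason, if any, without removing it.
    Optional<String> peekFailReason(long taskId);

    // Assumed explicit acknowledgement: removes the stored fail reason.
    void ackFailReason(long taskId);
}

final class StreamTask {
    final long taskId;
    // Written only under the job write lock; volatile so unlocked status reads see the latest value.
    volatile TaskStatus status = TaskStatus.RUNNING;
    volatile String failMsg;

    StreamTask(long taskId) {
        this.taskId = taskId;
    }
}

final class SketchJob {
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    private final CdcFailReasonClient client;
    // Guarded by lock: written under the write lock, read under the read lock.
    private StreamTask runningStreamTask;

    SketchJob(CdcFailReasonClient client) {
        this.client = client;
    }

    void setRunningTask(StreamTask task) {
        lock.writeLock().lock();
        try {
            runningStreamTask = task;
        } finally {
            lock.writeLock().unlock();
        }
    }

    /** Returns true if this call failed the task that is still current. */
    boolean detectTaskFailure() {
        // 1. Short read-locked snapshot: gives a happens-before edge with the writer.
        StreamTask task;
        lock.readLock().lock();
        try {
            task = runningStreamTask;
        } finally {
            lock.readLock().unlock();
        }
        if (task == null || task.status != TaskStatus.RUNNING) {
            return false;
        }
        // 2. RPC outside the lock; it only reads, so polling an obsolete task consumes nothing.
        Optional<String> reason = client.peekFailReason(task.taskId);
        if (!reason.isPresent()) {
            return false;
        }
        // 3. Confirm the task is still current and running before acting on the error.
        lock.writeLock().lock();
        try {
            if (runningStreamTask != task || task.status != TaskStatus.RUNNING) {
                return false;
            }
            task.failMsg = reason.get();
            task.status = TaskStatus.FAILED;
        } finally {
            lock.writeLock().unlock();
        }
        // 4. Only now clear the error on the cdc_client side.
        client.ackFailReason(task.taskId);
        return true;
    }
}
```

The RPC stays outside the lock so a slow cdc_client does not block writers; the identity/status re-check under the write lock is what makes the fail decision safe, and the acknowledgement only happens after it.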
##########
fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java:
##########
@@ -1404,20 +1405,49 @@ public void gsonPostProcess() throws IOException {
}
}
+ /**
+ * Actively pull the running task's fail reason from cdc_client and fail it immediately,
+ * so a hard write failure (e.g. data quality error) does not wait out the whole timeout.
+ * Only applies to StreamingMultiTask.
+ */
+ public void detectTaskFailure() throws JobException {
+ AbstractStreamingTask task = this.runningStreamTask;
+ if (!(task instanceof StreamingMultiTblTask)) {
+ return;
+ }
+ StreamingMultiTblTask runningMultiTask = (StreamingMultiTblTask) task;
+ String failReason = runningMultiTask.getFailReason();
Review Comment:
`getFailReason()` does not handle a missing backend: after checking
`runningBackendId > 0` it calls `Env.getCurrentSystemInfo().getBackend(runningBackendId)`
and later dereferences `backend.getHost()`/`getBrpcPort()` without a null check,
while only timeout, execution, and interruption exceptions are caught. This new call runs on
every RUNNING scheduler tick, so if the selected BE is removed or restarted
while the CDC task is still recorded as running, `detectTaskFailure()` throws
an NPE and the tick fails before `processTimeoutTasks()` can reschedule the
task. Please mirror the null handling in `fetchProgress()` or otherwise turn
this into an empty fail reason with a useful log.
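A hedged sketch of the null handling, using hypothetical stand-ins (`BackendRegistry`, `BackendInfo`, `FailReasonRpc`, `FailReasonFetcher`) instead of the real `Env.getCurrentSystemInfo()` lookup, and `java.util.logging` only to keep it self-contained. A missing backend and a failed RPC both degrade to an empty fail reason, so the tick can fall through to `processTimeoutTasks()`.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import java.util.logging.Logger;

// Hypothetical stand-in for a backend's address.
final class BackendInfo {
    final String host;
    final int brpcPort;

    BackendInfo(String host, int brpcPort) {
        this.host = host;
        this.brpcPort = brpcPort;
    }
}

// Hypothetical lookup; returns null when the backend id is unknown, e.g. a removed BE.
interface BackendRegistry {
    BackendInfo getBackend(long backendId);
}

// Hypothetical fail-reason RPC with the three checked failures the current code already catches.
interface FailReasonRpc {
    String fetch(String host, int brpcPort, long taskId)
            throws TimeoutException, ExecutionException, InterruptedException;
}

final class FailReasonFetcher {
    private static final Logger LOG = Logger.getLogger(FailReasonFetcher.class.getName());
    private final BackendRegistry registry;
    private final FailReasonRpc rpc;

    FailReasonFetcher(BackendRegistry registry, FailReasonRpc rpc) {
        this.registry = registry;
        this.rpc = rpc;
    }

    /** Never throws: a missing backend or an RPC failure becomes an empty fail reason. */
    String getFailReason(long taskId, long runningBackendId) {
        if (runningBackendId <= 0) {
            return "";
        }
        BackendInfo backend = registry.getBackend(runningBackendId);
        if (backend == null) {
            // Backend removed or not re-registered yet: skip this tick instead of throwing an NPE.
            LOG.warning("backend " + runningBackendId + " not found for task " + taskId
                    + ", skip fail reason check");
            return "";
        }
        try {
            String reason = rpc.fetch(backend.host, backend.brpcPort, taskId);
            return reason == null ? "" : reason;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the scheduler thread
            return "";
        } catch (TimeoutException | ExecutionException e) {
            LOG.warning("get fail reason failed for task " + taskId + ": " + e);
            return "";
        }
    }
}
```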
--
This is an automated message from the Apache Git Service.