github-actions[bot] commented on code in PR #64301:
URL: https://github.com/apache/doris/pull/64301#discussion_r3387854282
##########
fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java:
##########
@@ -1409,14 +1410,17 @@ public void gsonPostProcess() throws IOException {
* Only applies to StreamingMultiTask.
*/
public void processTimeoutTasks() throws JobException {
- if (!(runningStreamTask instanceof StreamingMultiTblTask)) {
+ AbstractStreamingTask task = this.runningStreamTask;
+ if (!(task instanceof StreamingMultiTblTask)) {
return;
}
+ StreamingMultiTblTask runningMultiTask = (StreamingMultiTblTask) task;
+ StreamingTaskProgress progress = runningMultiTask.fetchProgress();
writeLock();
Review Comment:
`runningStreamTask` is mutated under the job lock in paths like
`createStreamingTask`, `clearRunningStreamTask`, and `onStreamTaskSuccess`, but
this new progress path snapshots it without even a read lock and then uses that
snapshot to issue `/api/getProgress` before acquiring the write lock. Since the
field is not `volatile`, the scheduler thread's unlocked read has no happens-before
edge with those updates. After a task success, auto-resume, or pause, it can keep
polling an obsolete task/backend or skip checking the new running task's timeout.
The later identity check only prevents failing the wrong task, and only after the
RPC has already happened. Please take a short read lock to publish a current task
snapshot, release it, then call `fetchProgress()` and keep the existing identity
check under the write lock.
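The sequence requested above can be sketched as follows. This is a minimal, self-contained sketch and not the Doris code. `TimeoutCheckSketch`, `Task`, `MultiTblTask`, `Job`, `elapsedMs`, `duringFetch`, `lastTimedOut` and `timeoutMs` are hypothetical names. The job lock is assumed to be a `ReentrantReadWriteLock`. `fetchProgress()` only simulates the `/api/getProgress` RPC, and its optional `duringFetch` hook mimics another thread replacing the running task while the RPC is in flight.

```java
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class TimeoutCheckSketch {

    // Hypothetical stand-in for AbstractStreamingTask (not the Doris class).
    static class Task {
        final long id;
        Task(long id) { this.id = id; }
    }

    // Hypothetical stand-in for StreamingMultiTblTask. fetchProgress() simulates
    // the /api/getProgress RPC: it runs an optional hook (to mimic a concurrent
    // state change) and returns the task's elapsed time in milliseconds.
    static class MultiTblTask extends Task {
        final long elapsedMs;
        final Runnable duringFetch;
        MultiTblTask(long id, long elapsedMs, Runnable duringFetch) {
            super(id);
            this.elapsedMs = elapsedMs;
            this.duringFetch = duringFetch;
        }
        long fetchProgress() {
            if (duringFetch != null) {
                duringFetch.run();
            }
            return elapsedMs;
        }
    }

    static class Job {
        private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
        private final long timeoutMs;
        // Guarded by lock; intentionally not volatile, as in the review.
        private Task runningStreamTask;
        private Task lastTimedOut;

        Job(long timeoutMs) { this.timeoutMs = timeoutMs; }

        void setRunningTask(Task t) {
            lock.writeLock().lock();
            try { runningStreamTask = t; } finally { lock.writeLock().unlock(); }
        }

        Task runningTask() {
            lock.readLock().lock();
            try { return runningStreamTask; } finally { lock.readLock().unlock(); }
        }

        Task lastTimedOut() {
            lock.readLock().lock();
            try { return lastTimedOut; } finally { lock.readLock().unlock(); }
        }

        void processTimeoutTasks() {
            // 1. Short read lock: acquiring it orders this read after every
            //    earlier write-locked update of runningStreamTask.
            Task task;
            lock.readLock().lock();
            try { task = runningStreamTask; } finally { lock.readLock().unlock(); }
            if (!(task instanceof MultiTblTask)) {
                return;
            }
            MultiTblTask multi = (MultiTblTask) task;
            // 2. Slow RPC with no lock held, so writers are not blocked meanwhile.
            long elapsedMs = multi.fetchProgress();
            // 3. Re-check identity under the write lock before failing the task.
            lock.writeLock().lock();
            try {
                if (runningStreamTask != multi) {
                    return; // Task was replaced while the RPC was in flight.
                }
                if (elapsedMs > timeoutMs) {
                    lastTimedOut = multi;
                    runningStreamTask = null;
                }
            } finally {
                lock.writeLock().unlock();
            }
        }
    }

    public static void main(String[] args) {
        Job job = new Job(5_000L);
        job.setRunningTask(new MultiTblTask(1L, 10_000L, null));
        job.processTimeoutTasks();
        System.out.println("timed out task: " + job.lastTimedOut().id);
    }
}
```

The read lock is what gives the snapshot its happens-before edge: `java.util.concurrent.locks.Lock` implementations must provide the same memory-synchronization semantics as a built-in monitor. The RPC itself runs with no lock held, and the identity check under the write lock covers a task that was replaced while the RPC was outstanding.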
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]