github-actions[bot] commented on code in PR #64423:
URL: https://github.com/apache/doris/pull/64423#discussion_r3397216999


##########
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java:
##########
@@ -591,7 +602,21 @@ public void writeRecords(WriteRecordRequest writeRecordRequest) throws Exception
                     writeRecordRequest.getTaskId());
 
         } finally {

Review Comment:
   This is only a point-in-time ownership read. A stale task can observe 
`stillOwner == true`, then the FE timeout/auto-resume path can dispatch a 
successor that calls `getReaderAndClaim()` before this method reaches 
`cleanupReaderResources()`, `forceFlush()`, `resetTaskId()`, or 
`commitOffset()`. Those later operations use the job-scoped 
`DorisBatchStreamLoad`; the successor can already have changed 
`currentTaskId`/load props and cleared stats, so the old task can flush or 
commit mixed state after it no longer owns the reader. This is distinct from 
the existing `!stillOwner` early-return thread: here the stale task first 
observes `true` and then loses ownership before the guarded operations execute. 
The same lease gap exists in the catch-side `closeJobStreamLoad()` guard above. 
Please make the owner-to-cleanup/flush/commit decision atomic with ownership 
transition, or revalidate under an owned lease immediately before touching the 
reader/loader and committing.
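
One way to read the "atomic with ownership transition" request is to run the check and the guarded operations under the same lock that a successor must take to claim the reader. The sketch below is only an illustration under that assumption: `ReaderLease`, `claim()`, and `runIfOwner()` are hypothetical names, not existing classes or methods in `PipelineCoordinator`, and the string actions stand in for the real cleanup/flush/commit calls.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical lease guarding a job-scoped reader/loader; not a Doris class. */
class ReaderLease {
    private final ReentrantLock lock = new ReentrantLock();
    private String ownerTaskId; // guarded by lock

    /** A successor claims ownership; blocks while an owner's guarded action runs. */
    void claim(String taskId) {
        lock.lock();
        try {
            ownerTaskId = taskId;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Runs the action only if taskId still owns the lease. The ownership check
     * and the action happen under one lock, so claim() cannot interleave.
     */
    boolean runIfOwner(String taskId, Runnable action) {
        lock.lock();
        try {
            if (!taskId.equals(ownerTaskId)) {
                return false; // stale task: skip cleanup/flush/commit
            }
            action.run();
            return true;
        } finally {
            lock.unlock();
        }
    }
}

class ReaderLeaseDemo {
    public static void main(String[] args) {
        ReaderLease lease = new ReaderLease();
        List<String> log = new ArrayList<>();

        lease.claim("task-1");
        // Stand-in for cleanup + forceFlush + commitOffset of the current owner.
        lease.runIfOwner("task-1", () -> log.add("task-1 flushed and committed"));

        lease.claim("task-2"); // successor takes over
        boolean ran = lease.runIfOwner("task-1", () -> log.add("stale flush"));

        System.out.println(log + ", stale task ran: " + ran);
    }
}
```

The trade-off in this sketch is that a successor's claim waits for the previous owner's flush and commit to finish. If that wait is unacceptable, the alternative named in the comment (revalidating ownership immediately before each operation) would need a versioned or fenced check instead of one long critical section.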



##########
fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java:
##########
@@ -805,6 +847,7 @@ public void onStreamTaskFail(AbstractStreamingTask task) throws JobException {
 
     public void onStreamTaskSuccess(AbstractStreamingTask task) throws JobException {
         try {
+            this.needRebuildReader = false;
             resetFailureInfo(null);

Review Comment:
   This copies the persisted binding into the job, but the existing 
`JdbcSourceOffsetProvider` keeps its transient `boundBackendId` unchanged. 
`gsonPostProcess()` covers full-image load because `createOffsetProvider()` 
sets the bound BE, but journal replay uses this method on an already-created 
provider. If this follower becomes master before another binlog dispatch calls 
`resolveBoundBackend()`, `handleRunningState().fetchMeta()` or the scheduler's 
`hasMoreDataToConsume()` can route provider RPCs 
(`fetchEndOffset`/`compareOffset`, and later cleanup) with `boundBackendId == 
0` or a stale value, probing or closing a random BE instead of the persisted 
bound BE. Please call `offsetProvider.setBoundBackendId(boundBackendId)` 
whenever replay/copy updates this field, so the provider's transient copy 
stays in sync with the persisted binding.
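
A minimal sketch of the requested fix, assuming the job routes every update of the persisted field through a single setter that also pushes the value into the provider. `StreamingJobSketch`, `OffsetProviderSketch`, and `replayFrom()` are hypothetical stand-ins for `StreamingInsertJob`, `JdbcSourceOffsetProvider`, and the replay/copy method; only `setBoundBackendId` and `boundBackendId` are names taken from the comment above.

```java
/** Hypothetical stand-in for the offset provider; the bound BE id is not persisted. */
class OffsetProviderSketch {
    private transient long boundBackendId;

    void setBoundBackendId(long boundBackendId) {
        this.boundBackendId = boundBackendId;
    }

    long getBoundBackendId() {
        return boundBackendId;
    }
}

/** Hypothetical stand-in for the streaming job that owns the provider. */
class StreamingJobSketch {
    private long boundBackendId; // persisted with the job
    private final OffsetProviderSketch offsetProvider = new OffsetProviderSketch();

    StreamingJobSketch(long boundBackendId) {
        setBoundBackendId(boundBackendId);
    }

    /** Journal replay path: copies state onto an already-created job and provider. */
    void replayFrom(StreamingJobSketch replayed) {
        setBoundBackendId(replayed.boundBackendId);
    }

    /** Single write path, so the provider can never lag behind the job field. */
    private void setBoundBackendId(long boundBackendId) {
        this.boundBackendId = boundBackendId;
        offsetProvider.setBoundBackendId(boundBackendId);
    }

    long providerBoundBackendId() {
        return offsetProvider.getBoundBackendId();
    }
}

class ReplaySyncDemo {
    public static void main(String[] args) {
        StreamingJobSketch follower = new StreamingJobSketch(0L);
        StreamingJobSketch replayed = new StreamingJobSketch(10042L);

        follower.replayFrom(replayed);
        // The provider now sees the replayed binding without waiting for a dispatch.
        System.out.println(follower.providerBoundBackendId());
    }
}
```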



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

