github-actions[bot] commented on code in PR #64013:
URL: https://github.com/apache/doris/pull/64013#discussion_r3340489083
##########
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java:
##########
@@ -443,6 +443,9 @@ public void writeRecords(WriteRecordRequest
writeRecordRequest) throws Exception
ConfigUtil.parseAllTargetTableMappings(writeRecordRequest.getConfig());
SourceReader sourceReader =
Env.getCurrentEnv().getReader(writeRecordRequest);
+ // Claim reader ownership before the engine starts, so a stale
releaseReader RPC is a no-op.
+ Env.getCurrentEnv()
+ .setReaderOwner(writeRecordRequest.getJobId(),
writeRecordRequest.getTaskId());
DorisBatchStreamLoad batchStreamLoad = null;
Review Comment:
The ownership guard is still racy because the replacement task calls
`getReader()` before it claims the new `ownerTaskId`. A delayed release for
task A can arrive after task B has reused the existing job context/reader at
line 445 but before this `setReaderOwner(B)` executes; `getReaderIfOwner(jobId,
A)` still sees owner A, releases the reader, removes the context, and closes
the job stream-load, so task B continues with a reader/loader that was just
stopped. This is a distinct instance from the earlier unguarded-release issue:
the new guard helps after ownership is updated, but the acquire-and-claim
transition itself needs to be atomic, e.g. create/reuse the reader and set
owner under the same per-job lock before returning it to `writeRecords()`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]