github-actions[bot] commented on code in PR #64013:
URL: https://github.com/apache/doris/pull/64013#discussion_r3345673449


##########
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/controller/ClientController.java:
##########
@@ -138,6 +138,27 @@ public Object close(@RequestBody JobBaseConfig jobConfig) {
         return RestResponse.success(true);
     }
 
+    /** Release a job's reader on this backend: stop engine, keep the replication slot. */
+    @RequestMapping(path = "/api/releaseReader/{taskId}", method = RequestMethod.POST)
+    public Object releaseReader(
+            @PathVariable("taskId") String taskId, @RequestBody JobBaseConfig jobConfig) {
+        LOG.info("Releasing reader (keep slot) for job {} task {}", jobConfig.getJobId(), taskId);
+        Env env = Env.getCurrentEnv();
+        // Only the owning task may release; detach removes the context under the per-job lock so a
+        // racing claim rebuilds a fresh reader, and a stale RPC is a no-op.
+        SourceReader reader = env.detachReaderIfOwner(jobConfig.getJobId(), taskId);
+        if (reader == null) {
+            LOG.info(
+                    "No owned reader for job {} task {}, skip release",
+                    jobConfig.getJobId(),
+                    taskId);
+            return RestResponse.success(true);
+        }
+        reader.release(jobConfig);
+        pipelineCoordinator.closeJobStreamLoad(jobConfig.getJobId());

Review Comment:
   This stream-load cleanup is still keyed only by `jobId`, so a valid delayed
release from the old owning task can close the replacement task's loader.
`detachReaderIfOwner(jobId, oldTaskId)` removes the old reader context under
the per-job lock, but `reader.release()` runs outside that lock. Before this
line executes, the replacement task can call `getReaderAndClaim()`, and
`getOrCreateBatchStreamLoad()` then stores a fresh `DorisBatchStreamLoad` in
`batchStreamLoadMap` under the same job id. When the old release reaches
`closeJobStreamLoad(jobId)`, it removes and closes that new loader, so the
active replacement task fails with `load thread already exit` or loses its
pending stream-load state. This is distinct from the existing stale-release
threads about the reader: here the release comes from the correct old owner,
and the unguarded resource is the job-level stream-load map. Please make
stream-load cleanup task- or generation-aware, or close the old loader
atomically before a replacement task can publish a new one.
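A minimal sketch of the task-aware cleanup suggested above, assuming the job-level loader map can record which task created each loader. `TaskScopedLoaderRegistry`, `StreamLoader` (a stand-in for `DorisBatchStreamLoad`), `getOrCreate` and `closeIfOwner` are hypothetical names, not the actual cdc_client API, and job ids are assumed to be `long`. Cleanup removes a loader only if the releasing task still owns it, so a delayed release from the old task cannot close the replacement task's loader.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/**
 * Hypothetical sketch: a job-level loader registry whose cleanup is task-aware.
 * StreamLoader stands in for DorisBatchStreamLoad; none of these names are the
 * real cdc_client API. Job ids are assumed to be longs.
 */
public class TaskScopedLoaderRegistry {

    /** Stand-in loader that remembers which task created it. */
    public static final class StreamLoader {
        private final String ownerTaskId;
        private volatile boolean closed;

        StreamLoader(String ownerTaskId) {
            this.ownerTaskId = ownerTaskId;
        }

        public String ownerTaskId() {
            return ownerTaskId;
        }

        public boolean isClosed() {
            return closed;
        }

        void close() {
            // A real loader would flush pending batches and stop its load thread here.
            closed = true;
        }
    }

    private final ConcurrentMap<Long, StreamLoader> loaders = new ConcurrentHashMap<>();

    /**
     * Returns the loader owned by taskId, creating one if needed. If the job's
     * current loader belongs to another (older) task, it is swapped out with an
     * atomic compare-and-replace and then closed, so it cannot linger.
     */
    public StreamLoader getOrCreate(long jobId, String taskId) {
        while (true) {
            StreamLoader current = loaders.get(jobId);
            if (current != null && current.ownerTaskId().equals(taskId)) {
                return current;
            }
            // Assumes construction is cheap and side-effect free; a loader that
            // starts threads on creation would need lazy start-up instead.
            StreamLoader fresh = new StreamLoader(taskId);
            if (current == null) {
                if (loaders.putIfAbsent(jobId, fresh) == null) {
                    return fresh;
                }
            } else if (loaders.replace(jobId, current, fresh)) {
                current.close(); // retire the previous task's loader outside the map's lock
                return fresh;
            }
            // Another thread changed the mapping concurrently; re-read and retry.
        }
    }

    /**
     * Closes and unregisters the job's loader only if taskId still owns it.
     * A delayed release from an older task finds a different loader and is a no-op.
     */
    public boolean closeIfOwner(long jobId, String taskId) {
        StreamLoader current = loaders.get(jobId);
        if (current == null || !current.ownerTaskId().equals(taskId)) {
            return false;
        }
        // remove(key, value) succeeds only if this exact loader is still mapped.
        if (loaders.remove(jobId, current)) {
            current.close();
            return true;
        }
        return false;
    }

    public StreamLoader get(long jobId) {
        return loaders.get(jobId);
    }
}
```

Under this sketch, `releaseReader` would call something like `closeIfOwner(jobId, taskId)` instead of `closeJobStreamLoad(jobId)`. Closing happens after the atomic map update rather than inside a `compute` callback, so a slow flush does not block other map operations. If task ids can be reused across claims, a per-claim generation counter would be a safer ownership key. The other option from the comment, holding the existing per-job lock across both the reader detach and the stream-load close, avoids the extra ownership field at the cost of a longer critical section.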



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

