github-actions[bot] commented on code in PR #64013:
URL: https://github.com/apache/doris/pull/64013#discussion_r3340186054
##########
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/controller/ClientController.java:
##########
@@ -138,6 +138,22 @@ public Object close(@RequestBody JobBaseConfig jobConfig) {
return RestResponse.success(true);
}
+ /** Release a job's reader on this backend: stop engine, keep the replication slot. */
+ @RequestMapping(path = "/api/releaseReader", method = RequestMethod.POST)
+ public Object releaseReader(@RequestBody JobBaseConfig jobConfig) {
+ LOG.info("Releasing reader (keep slot) for job {}", jobConfig.getJobId());
+ Env env = Env.getCurrentEnv();
+ SourceReader reader = env.getReaderIfPresent(jobConfig.getJobId());
Review Comment:
This release is keyed only by `jobId`, so a stale release RPC can kill the
next task's reader. The FE sends `/api/releaseReader` fire-and-forget from
timeout/cancel, and `StreamingJobUtils.selectBackend()` may choose the same
backend for the restarted job before the old RPC is delivered. At that point
`Env.getReaderIfPresent(jobId)` returns the new task's reader because `Env` is
keyed by job id, and this handler calls `reader.release(...)`,
`env.close(jobId)`, and `closeJobStreamLoad(jobId)`, interrupting the active
replacement task. Please include and validate a task/generation identity (for
example, the task id currently assigned to the writer) before releasing, and
make stale releases no-ops.
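The following is a minimal sketch of the guard described above. It assumes the release request carries the id of the task that issued it. `ReaderRegistry`, `ActiveReader`, `register`, `releaseIfOwner`, and the `taskId` parameter are hypothetical names, not the existing `Env`/`SourceReader` API. The map stays keyed by job id, but each entry records the task that owns it. Release first checks that owner and then calls `ConcurrentMap.remove(key, value)`, so it only removes the exact entry it checked:

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/**
 * Hypothetical per-backend reader registry: keyed by job id, but each entry
 * remembers which task (generation) owns it, so stale releases become no-ops.
 */
public class ReaderRegistry {

    /** A reader plus the task id that created it (illustrative stand-in for SourceReader). */
    static final class ActiveReader {
        final long jobId;
        final long taskId;
        volatile boolean released;

        ActiveReader(long jobId, long taskId) {
            this.jobId = jobId;
            this.taskId = taskId;
        }

        void release() {
            // Stands in for "stop engine, keep the replication slot".
            released = true;
        }
    }

    private final ConcurrentMap<Long, ActiveReader> readers = new ConcurrentHashMap<>();

    /** Registers the reader for a new task. Assumption: any previous reader for the job is released. */
    ActiveReader register(long jobId, long taskId) {
        ActiveReader reader = new ActiveReader(jobId, taskId);
        ActiveReader previous = readers.put(jobId, reader);
        if (previous != null) {
            previous.release();
        }
        return reader;
    }

    /**
     * Releases the job's reader only if it still belongs to {@code taskId}.
     * Returns true if a reader was released, false for a stale or duplicate request.
     */
    boolean releaseIfOwner(long jobId, long taskId) {
        ActiveReader current = readers.get(jobId);
        if (current == null || current.taskId != taskId) {
            return false; // stale release from an older task: ignore it
        }
        // Compare-and-remove: fails if a newer task registered after the lookup above.
        if (!readers.remove(jobId, current)) {
            return false;
        }
        current.release();
        return true;
    }

    ActiveReader get(long jobId) {
        return readers.get(jobId);
    }

    public static void main(String[] args) {
        ReaderRegistry registry = new ReaderRegistry();
        long jobId = 42L;

        registry.register(jobId, 1L);                             // task 1 starts
        ActiveReader newReader = registry.register(jobId, 2L);    // restarted job lands on the same backend

        // The delayed fire-and-forget release from task 1's timeout/cancel arrives now.
        System.out.println("stale release applied: " + registry.releaseIfOwner(jobId, 1L));
        System.out.println("task 2 reader still active: " + !newReader.released);

        // A release naming the current task does take effect.
        System.out.println("current release applied: " + registry.releaseIfOwner(jobId, 2L));
    }
}
```

Running `main` should print `false`, `true`, `true` for the three lines. The owner check handles the common stale case, and the compare-and-remove closes the window where a new task registers between the lookup and the removal. In the actual handler, `env.close(jobId)` and `closeJobStreamLoad(jobId)` would likewise need to run only on the path where the guard succeeds.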
--
This is an automated message from the Apache Git Service.