github-actions[bot] commented on code in PR #64013:
URL: https://github.com/apache/doris/pull/64013#discussion_r3340684688


##########
fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java:
##########
@@ -306,28 +308,65 @@ public void successCallback(CommitOffsetRequest offsetRequest) throws JobExcepti
 
     @Override
     protected void onFail(String errMsg) throws JobException {
+        // Release this task's reader before reschedule so it stops competing for the shared slot.
+        releaseRemoteReader();
         super.onFail(errMsg);
     }
 
     @Override
     public void cancel(boolean needWaitCancelComplete) {
         // No manual cancellation is required; the task ID will be checked for consistency in the beforeCommit function.
+        releaseRemoteReader();
         super.cancel(needWaitCancelComplete);
     }
 
     @Override
     public void closeOrReleaseResources() {
-        // no need
+        // No-op: reader is shared across tasks; release on failure/cancel is done in onFail()/cancel().
+    }
+
+    /**
+     * Best-effort: stop this job's reader on {@link #runningBackendId} so a reschedule to another
+     * backend never leaves two readers competing for the same source (e.g. one PG replication slot,
+     * which is kept, not dropped). Failures are swallowed and must not block rescheduling.
+     */
+    public void releaseRemoteReader() {
+        if (runningBackendId <= 0) {
+            return;
+        }
+        Backend backend = Env.getCurrentSystemInfo().getBackend(runningBackendId);
+        if (backend == null) {
+            log.info("Skip releasing remote reader: backend {} not found, job {} task {}",
+                    runningBackendId, getJobId(), getTaskId());
+            return;
+        }
+        try {
+            JobBaseConfig releaseParams = new JobBaseConfig(
+                    String.valueOf(getJobId()), dataSourceType.name(), sourceProperties, getFrontendAddress());
+            InternalService.PRequestCdcClientRequest request = InternalService.PRequestCdcClientRequest.newBuilder()
+                    .setApi("/api/releaseReader/" + getTaskId())
+                    .setParams(new Gson().toJson(releaseParams)).build();
+            TNetworkAddress address = new TNetworkAddress(backend.getHost(), backend.getBrpcPort());
+            // Fire-and-forget: this runs under the job write lock, so never block on the result.
+            BackendServiceProxy.getInstance()
+                    .requestCdcClient(address, request, Config.streaming_cdc_light_rpc_timeout_sec);
+            log.info("Sent release reader request to backend {}:{} for job {} task {}",

Review Comment:
   This is still only fire-and-forget, so the old backend may keep its CDC 
engine running after FE has already continued failure/cancel handling. For 
example, `processTimeoutTasks()` calls `runningMultiTask.onFail()`; this method 
enqueues `/api/releaseReader/{taskId}` and returns without `future.get()`. 
`super.onFail()` then pauses the job, and auto-resume or a manual resume can 
schedule a new task on another backend before the old backend has actually 
executed `reader.release()`/`finishSplitRecords()`. That leaves the previous 
reader and the replacement reader competing for the same source/PG replication 
slot, which is exactly what this method's contract says it prevents. This is 
distinct from the existing stale-release ownership threads: here the release 
belongs to the correct task, but FE does not wait for that valid release to 
complete before allowing the next task lifecycle to proceed. Please make the 
handoff synchronous enough for correctness, or add another mechanism that 
prevents a replacement task from starting until the owning reader has been 
stopped.
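
   One possible shape for the "another mechanism" option is sketched below. 
It is only an illustration under stated assumptions, not code from this PR: 
`ReaderHandoffGate`, `recordRelease()` and `awaitHandoff()` are hypothetical 
names that do not exist in Doris, and the sketch assumes the release RPC 
returns a `Future` that completes once the backend has stopped the reader. 
The idea is that `onFail()`/`cancel()` only record the in-flight release 
(so nothing blocks under the job write lock), while the scheduler performs a 
bounded wait outside the lock before starting a replacement task.

```java
import java.util.Map;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical sketch: none of these names exist in Doris. */
public class ReaderHandoffGate {
    // jobId -> in-flight release RPC sent to the backend that previously owned the reader.
    private final Map<Long, Future<?>> pendingReleases = new ConcurrentHashMap<>();

    /** Intended to be called from onFail()/cancel() under the job lock: records the future, never blocks. */
    public void recordRelease(long jobId, Future<?> releaseFuture) {
        pendingReleases.put(jobId, releaseFuture);
    }

    /**
     * Intended to be called by the scheduler, outside the job lock, before a replacement task starts.
     * Returns true only if no release is pending or the pending release completed successfully.
     */
    public boolean awaitHandoff(long jobId, long timeout, TimeUnit unit) throws InterruptedException {
        Future<?> pending = pendingReleases.get(jobId);
        if (pending == null) {
            return true;
        }
        try {
            pending.get(timeout, unit);
            // Release confirmed: open the gate for this job.
            pendingReleases.remove(jobId, pending);
            return true;
        } catch (TimeoutException | ExecutionException | CancellationException e) {
            // Release not confirmed: keep the gate closed so the caller can retry or re-send.
            return false;
        }
    }
}
```

   In this sketch a failed or timed-out release keeps the gate closed until a 
new release attempt is recorded and succeeds. A real implementation would 
also need a policy for backends that are dead or unreachable, which the 
sketch deliberately leaves open.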



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

