github-actions[bot] commented on code in PR #64013:
URL: https://github.com/apache/doris/pull/64013#discussion_r3340684688
##########
fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java:
##########
@@ -306,28 +308,65 @@ public void successCallback(CommitOffsetRequest offsetRequest) throws JobExcepti
@Override
protected void onFail(String errMsg) throws JobException {
+ // Release this task's reader before reschedule so it stops competing for the shared slot.
+ releaseRemoteReader();
super.onFail(errMsg);
}
@Override
public void cancel(boolean needWaitCancelComplete) {
// No manual cancellation is required; the task ID will be checked for consistency in the beforeCommit function.
+ releaseRemoteReader();
super.cancel(needWaitCancelComplete);
}
@Override
public void closeOrReleaseResources() {
- // no need
+ // No-op: reader is shared across tasks; release on failure/cancel is done in onFail()/cancel().
+ }
+
+ /**
+ * Best-effort: stop this job's reader on {@link #runningBackendId} so a reschedule to another
+ * backend never leaves two readers competing for the same source (e.g. one PG replication slot,
+ * which is kept, not dropped). Failures are swallowed and must not block rescheduling.
+ */
+ public void releaseRemoteReader() {
+ if (runningBackendId <= 0) {
+ return;
+ }
+ Backend backend = Env.getCurrentSystemInfo().getBackend(runningBackendId);
+ if (backend == null) {
+ log.info("Skip releasing remote reader: backend {} not found, job {} task {}",
+ runningBackendId, getJobId(), getTaskId());
+ return;
+ }
+ try {
+ JobBaseConfig releaseParams = new JobBaseConfig(
+ String.valueOf(getJobId()), dataSourceType.name(), sourceProperties, getFrontendAddress());
+ InternalService.PRequestCdcClientRequest request = InternalService.PRequestCdcClientRequest.newBuilder()
+ .setApi("/api/releaseReader/" + getTaskId())
+ .setParams(new Gson().toJson(releaseParams)).build();
+ TNetworkAddress address = new TNetworkAddress(backend.getHost(), backend.getBrpcPort());
+ // Fire-and-forget: this runs under the job write lock, so never block on the result.
+ BackendServiceProxy.getInstance()
+ .requestCdcClient(address, request, Config.streaming_cdc_light_rpc_timeout_sec);
+ log.info("Sent release reader request to backend {}:{} for job {} task {}",
Review Comment:
This is still only fire-and-forget, so the old backend may keep its CDC
engine running after FE has already continued failure/cancel handling.

For example, `processTimeoutTasks()` calls `runningMultiTask.onFail()`, this
method enqueues `/api/releaseReader/{taskId}` and returns without
`future.get()`, then `super.onFail()` pauses the job, and auto-resume/manual
resume can schedule a new task on another backend before the old backend has
actually executed `reader.release()`/`finishSplitRecords()`. That leaves the
previous reader and the replacement reader competing for the same source/PG
replication slot, which is exactly what this method's contract says it
prevents.

This is distinct from the existing stale-release ownership threads: here the
release belongs to the correct task, but FE does not wait for that valid
release to complete before allowing the next task lifecycle to proceed. Please
make the handoff synchronous enough for correctness, or add another mechanism
that prevents a replacement task from starting until the owning reader has
been stopped.
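
One possible shape for such a gate, as a minimal sketch rather than a concrete patch: it assumes the release RPC returns a `java.util.concurrent.Future` (as the missing `future.get()` above suggests). `ReleaseHandoffSketch`, `ReleaseResult`, `awaitRelease`, and `mayScheduleReplacement` are hypothetical names, not existing Doris APIs. The bounded wait would probably have to run outside the job write lock, for example in the scheduler just before a replacement task is created.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper, not part of Doris: illustrates gating a reschedule
// on the outcome of the release RPC instead of firing and forgetting.
final class ReleaseHandoffSketch {

    /** Outcome of waiting for the old backend to stop its reader. */
    public enum ReleaseResult { RELEASED, TIMED_OUT, FAILED }

    /**
     * Waits at most timeoutSec seconds for the release request to finish.
     * Never throws, so callers can still proceed with failure handling.
     */
    static ReleaseResult awaitRelease(Future<?> releaseFuture, long timeoutSec) {
        try {
            releaseFuture.get(timeoutSec, TimeUnit.SECONDS);
            return ReleaseResult.RELEASED;
        } catch (TimeoutException e) {
            // The old backend has not confirmed the release in time.
            return ReleaseResult.TIMED_OUT;
        } catch (InterruptedException e) {
            // Preserve the interrupt flag for the calling thread.
            Thread.currentThread().interrupt();
            return ReleaseResult.FAILED;
        } catch (ExecutionException e) {
            // The RPC itself failed; the old reader may still be running.
            return ReleaseResult.FAILED;
        }
    }

    /** A replacement task may start only after a confirmed release. */
    static boolean mayScheduleReplacement(ReleaseResult result) {
        return result == ReleaseResult.RELEASED;
    }
}
```

With a gate like this, `TIMED_OUT` or `FAILED` would keep the job paused (or trigger a retry of the release) rather than letting auto-resume place a second reader on another backend.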
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]