github-actions[bot] commented on code in PR #64423:
URL: https://github.com/apache/doris/pull/64423#discussion_r3432548995


##########
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/controller/ClientController.java:
##########
@@ -129,12 +130,25 @@ public Object compareOffset(@RequestBody CompareOffsetRequest compareOffsetReque
     /** Close job */
     @RequestMapping(path = "/api/close", method = RequestMethod.POST)
     public Object close(@RequestBody JobBaseConfig jobConfig) {
-        LOG.info("Closing job {}", jobConfig.getJobId());
+        String jobId = jobConfig.getJobId();
+        LOG.info("Closing job {}", jobId);
         Env env = Env.getCurrentEnv();
-        SourceReader reader = env.getReader(jobConfig);
-        reader.close(jobConfig);
-        env.close(jobConfig.getJobId());
-        pipelineCoordinator.closeJobStreamLoad(jobConfig.getJobId());
+        // Don't rebuild a reader to close it; an absent reader (owner BE gone) just needs its slot
+        // dropped.
+        SourceReader reader = env.getReaderIfPresent(jobId);
+        try {
+            if (reader != null) {
+                reader.release(jobConfig);
+            }
+            SourceReader dropper = reader != null ? reader : env.getMetaReader(jobConfig);
+            env.releaseSourceResourcesOrRetry(dropper, jobConfig);
+        } catch (Exception ex) {
+            LOG.warn("Close job {} teardown failed: {}", jobId, ex.getMessage());
+            env.scheduleSlotDrop(jobConfig);
+        } finally {
+            env.close(jobId);

Review Comment:
   This removes the local context even when `reader.release(jobConfig)` failed 
above. A concrete path is `/api/close` on the BE that owns a PG reader: 
`release()` enters `finishSplitRecords()`, a fetcher close throws before the 
stream reader is stopped, the catch only schedules a slot drop, and this 
finally block untracks the still-live reader. The background retry then uses a
new throwaway reader, so the slot remains active in this same cdc_client and
every drop attempt fails until the retry window expires; by that point FE has
already deleted the job metadata. Please keep the context (or otherwise
deterministically stop/close the reader) when local release fails, and untrack
it only after the engine is known to be stopped.
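A minimal sketch of the ordering being requested, under stated assumptions: `StoppableReader`, `forceStop()`, `isStopped()` and `JobRegistry` below are hypothetical stand-ins, not the cdc_client `SourceReader`/`Env` API. The idea is to untrack only after the reader reports it is stopped, and otherwise hand the *same* reader to the retry path instead of building a throwaway one.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for a CDC reader; not the real Doris SourceReader API.
interface StoppableReader {
    void release() throws Exception; // graceful teardown, may throw part-way
    void forceStop();                 // hypothetical hard stop of the replication stream
    boolean isStopped();
}

// Sketch: a job's local context is removed only once its reader is known stopped.
class JobRegistry {
    final Map<String, StoppableReader> contexts = new ConcurrentHashMap<>();
    // Readers that still may hold the slot; a retry should reuse these, not new readers.
    final Map<String, StoppableReader> pendingRetry = new ConcurrentHashMap<>();

    /** Returns true if the job's local context was removed. */
    boolean close(String jobId) {
        StoppableReader reader = contexts.get(jobId);
        if (reader == null) {
            return true; // nothing tracked locally; slot drop is handled elsewhere
        }
        try {
            reader.release();
        } catch (Exception releaseFailure) {
            // Graceful release failed midway: attempt a deterministic stop instead.
            try {
                reader.forceStop();
            } catch (RuntimeException stopFailure) {
                // Fall through: isStopped() below decides whether we may untrack.
            }
        }
        if (reader.isStopped()) {
            contexts.remove(jobId);
            pendingRetry.remove(jobId);
            return true;
        }
        // The stream may still hold the slot: keep the context and retry with this reader.
        pendingRetry.put(jobId, reader);
        return false;
    }
}
```

With this shape, a reader whose fetcher close throws stays visible in `contexts`, so a later retry (or the next `/api/close`) can still reach the live stream that owns the slot.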



##########
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/common/Env.java:
##########
@@ -192,12 +263,143 @@ public void close(String jobId) {
         }
     }
 
+    /** Liveness evidence (FE heartbeat or active poll): keep this job's reader alive. */
+    public void keepAlive(String jobId) {
+        JobContext context = jobContexts.get(jobId);
+        if (context != null) {
+            context.lastAliveTime = System.currentTimeMillis();
+        }
+    }
+
+    // Release (keep slot) readers FE no longer drives; maxIntervalMs<=0 = untracked (e.g. TVF),
+    // skip.
+    private void releaseIdleReaders() {
+        long now = System.currentTimeMillis();
+        for (String jobId : jobContexts.keySet()) {
+            Lock lock = jobLocks.get(jobId);
+            if (lock == null || !lock.tryLock()) {
+                continue;
+            }
+            SourceReader toRelease = null;
+            JobBaseConfig releaseConfig = null;
+            try {
+                JobContext context = jobContexts.get(jobId);
+                if (context == null || context.lastAliveTime <= 0 || context.maxIntervalMs <= 0) {
+                    continue;
+                }
+                long timeout =
+                        Math.max(
+                                (long) Constants.IDLE_READER_TIMEOUT_MULTIPLIER
+                                        * context.maxIntervalMs,
+                                Constants.IDLE_READER_MIN_TIMEOUT_MS);
+                if (now - context.lastAliveTime <= timeout) {
+                    continue;
+                }
+                LOG.info(
+                        "Releasing idle reader for job {}, idle {} ms, keep slot",
+                        jobId,
+                        now - context.lastAliveTime);
+                jobContexts.remove(jobId);
+                toRelease = context.reader;
+                releaseConfig = context.jobConfig;
+            } finally {
+                lock.unlock();
+            }
+            // Release outside the lock so blocking IO never stalls getReaderAndClaim/detach.
+            if (toRelease != null && releaseConfig != null) {
+                try {
+                    toRelease.release(releaseConfig);
+                } catch (Exception ex) {
+                    LOG.warn("Failed to release idle reader for job {}", jobId, ex);
+                }
+            }
+        }
+    }
+
+    // Each chore is guarded independently: one failing must not skip the other, and an uncaught
+    // throwable here would silently cancel the whole periodic task.
+    private void runBackgroundCleanup() {
+        try {
+            releaseIdleReaders();
+        } catch (Exception e) {
+            LOG.warn("releaseIdleReaders failed", e);
+        }
+        try {
+            retryPendingSlotDrops();
+        } catch (Exception e) {
+            LOG.warn("retryPendingSlotDrops failed", e);
+        }
+    }
+
+    /**
+     * Run source-side cleanup; if incomplete (e.g. slot still held by a dead BE), retry in
+     * background.
+     */
+    public void releaseSourceResourcesOrRetry(SourceReader reader, JobBaseConfig jobConfig) {
+        if (!releaseSourceResources(reader, jobConfig)) {
+            scheduleSlotDrop(jobConfig);
+        }
+    }
+
+    public void scheduleSlotDrop(JobBaseConfig jobConfig) {
+        long deadline = System.currentTimeMillis() + Constants.SLOT_DROP_RETRY_WINDOW_MS;
+        pendingSlotDrops.putIfAbsent(jobConfig.getJobId(), new SlotDropTask(jobConfig, deadline));

Review Comment:
   `pendingSlotDrops` is the only retry state left once FE has removed the job in
`cleanMeta`, and it lives only in this cdc_client's heap. If DROP JOB routes
`/api/close` to an alive BE while the Doris-owned PG slot is still active on a
dead BE, `releaseSourceResources()` returns false and this schedules a retry.
If this cdc_client/BE then restarts before PG frees the stale walsender, the map
is empty after the restart and no FE job metadata remains to issue another
close, so the slot/publication is left behind permanently. Please make the
cleanup retry FE-driven/durable, or fail/retry DROP JOB until the source
resources are actually gone.
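A minimal sketch of the FE-driven option, under stated assumptions: `CloseClient`, `closeAndConfirmReleased()` and `DropJobRetrier` are hypothetical names, not existing Doris FE or cdc_client APIs. The job stays in a DROPPING state with its metadata retained, FE retries on each tick, and metadata is cleaned only after the source side confirms the slot/publication are gone; past a deadline the job is surfaced as CLEANUP_FAILED instead of leaking silently. The in-memory maps stand in for state FE would need to persist.

```java
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical: asks an alive BE's cdc_client to close the job and reports whether the
// source resources (replication slot, publication) are confirmed gone.
interface CloseClient {
    boolean closeAndConfirmReleased(String jobId);
}

// Sketch of FE-driven cleanup: the job's record survives until release is confirmed,
// so a cdc_client restart cannot lose the retry.
class DropJobRetrier {
    enum State { DROPPING, DROPPED, CLEANUP_FAILED }

    private final CloseClient client;
    private final long giveUpAfterMs;
    // Stand-ins for FE-persisted state; kept in memory only to keep the sketch small.
    final Map<String, Long> droppingSince = new LinkedHashMap<>();
    final Map<String, State> states = new LinkedHashMap<>();

    DropJobRetrier(CloseClient client, long giveUpAfterMs) {
        this.client = client;
        this.giveUpAfterMs = giveUpAfterMs;
    }

    void requestDrop(String jobId, long nowMs) {
        droppingSince.putIfAbsent(jobId, nowMs);
        states.put(jobId, State.DROPPING);
    }

    /** Called periodically by FE; metadata is only cleaned after confirmed release. */
    void tick(long nowMs) {
        Iterator<Map.Entry<String, Long>> it = droppingSince.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            String jobId = entry.getKey();
            boolean released;
            try {
                released = client.closeAndConfirmReleased(jobId);
            } catch (RuntimeException ex) {
                released = false; // transient failure: try again next tick
            }
            if (released) {
                it.remove();
                states.put(jobId, State.DROPPED); // safe point to run cleanMeta
            } else if (nowMs - entry.getValue() > giveUpAfterMs) {
                it.remove();
                states.put(jobId, State.CLEANUP_FAILED); // surface to the user
            }
        }
    }
}
```

The alternative the comment mentions, persisting `pendingSlotDrops` on the cdc_client side, would also survive a restart, but keeping the record in FE means any alive BE can pick up the retry.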



-- 
This is an automated message from the Apache Git Service.