github-actions[bot] commented on code in PR #64423:
URL: https://github.com/apache/doris/pull/64423#discussion_r3432548995


##########
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/controller/ClientController.java:
##########
@@ -129,12 +130,25 @@ public Object compareOffset(@RequestBody CompareOffsetRequest compareOffsetReque
     /** Close job */
     @RequestMapping(path = "/api/close", method = RequestMethod.POST)
     public Object close(@RequestBody JobBaseConfig jobConfig) {
-        LOG.info("Closing job {}", jobConfig.getJobId());
+        String jobId = jobConfig.getJobId();
+        LOG.info("Closing job {}", jobId);
         Env env = Env.getCurrentEnv();
-        SourceReader reader = env.getReader(jobConfig);
-        reader.close(jobConfig);
-        env.close(jobConfig.getJobId());
-        pipelineCoordinator.closeJobStreamLoad(jobConfig.getJobId());
+        // Don't rebuild a reader to close it; an absent reader (owner BE gone) just needs its slot
+        // dropped.
+        SourceReader reader = env.getReaderIfPresent(jobId);
+        try {
+            if (reader != null) {
+                reader.release(jobConfig);
+            }
+            SourceReader dropper = reader != null ? reader : env.getMetaReader(jobConfig);
+            env.releaseSourceResourcesOrRetry(dropper, jobConfig);
+        } catch (Exception ex) {
+            LOG.warn("Close job {} teardown failed: {}", jobId, ex.getMessage());
+            env.scheduleSlotDrop(jobConfig);
+        } finally {
+            env.close(jobId);

Review Comment:
   This removes the local context even when  failed above. A concrete path is  
on the BE that owns a PG reader:  enters , a fetcher close throws before the 
stream reader is stopped, the catch only schedules a slot drop, and this 
finally block untracks the still-live reader. The background retry uses a new 
throwaway reader, so the slot remains active in this same cdc_client and every 
drop attempt fails until the retry window gives up, after FE has already 
deleted the job metadata. Please keep the context, or otherwise 
deterministically stop/close the reader, when local release fails and only 
untrack after the engine is known stopped.
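The fix requested above can be modeled in a minimal, hypothetical sketch. `StoppableReader`, `ReaderRegistry` and `closeJob` are illustrative names, not the cdc_client API. The job context is untracked only after `release` succeeds. When release fails, the context stays registered, so a retry reuses the same live reader instead of a throwaway one that cannot free the slot. Per-job locking, the slot drop itself and the absent-reader path are deliberately left out.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical reader whose streaming engine holds the replication slot. */
interface StoppableReader {
    /** Stops the engine; may throw partway through teardown. */
    void release() throws Exception;
}

/** Hypothetical per-process registry of live readers, keyed by job id. */
class ReaderRegistry {
    private final Map<String, StoppableReader> contexts = new ConcurrentHashMap<>();

    void register(String jobId, StoppableReader reader) {
        contexts.put(jobId, reader);
    }

    boolean isTracked(String jobId) {
        return contexts.containsKey(jobId);
    }

    /**
     * Returns true once the reader is known stopped and untracked. On failure the
     * context stays registered, so a later retry calls release() on the same live
     * reader instead of building a throwaway one that cannot free the slot.
     */
    boolean closeJob(String jobId) {
        StoppableReader reader = contexts.get(jobId);
        if (reader == null) {
            // Nothing tracked locally; the absent-reader slot drop is out of scope here.
            return true;
        }
        try {
            reader.release();
        } catch (Exception ex) {
            // The engine may still be running and holding the slot: keep the context.
            return false;
        }
        // Untrack only after a successful stop, and only if no newer reader replaced it.
        contexts.remove(jobId, reader);
        return true;
    }
}
```

In this model a failed `/api/close` leaves the reader in place. A second close attempt, or a background retry that looks the reader up in the registry, can then finish stopping the engine that actually holds the slot.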



##########
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/common/Env.java:
##########
@@ -192,12 +263,143 @@ public void close(String jobId) {
         }
     }
 
+    /** Liveness evidence (FE heartbeat or active poll): keep this job's reader alive. */
+    public void keepAlive(String jobId) {
+        JobContext context = jobContexts.get(jobId);
+        if (context != null) {
+            context.lastAliveTime = System.currentTimeMillis();
+        }
+    }
+
+    // Release (keep slot) readers FE no longer drives; maxIntervalMs<=0 = untracked (e.g. TVF),
+    // skip.
+    private void releaseIdleReaders() {
+        long now = System.currentTimeMillis();
+        for (String jobId : jobContexts.keySet()) {
+            Lock lock = jobLocks.get(jobId);
+            if (lock == null || !lock.tryLock()) {
+                continue;
+            }
+            SourceReader toRelease = null;
+            JobBaseConfig releaseConfig = null;
+            try {
+                JobContext context = jobContexts.get(jobId);
+                if (context == null || context.lastAliveTime <= 0 || context.maxIntervalMs <= 0) {
+                    continue;
+                }
+                long timeout =
+                        Math.max(
+                                (long) Constants.IDLE_READER_TIMEOUT_MULTIPLIER
+                                        * context.maxIntervalMs,
+                                Constants.IDLE_READER_MIN_TIMEOUT_MS);
+                if (now - context.lastAliveTime <= timeout) {
+                    continue;
+                }
+                LOG.info(
+                        "Releasing idle reader for job {}, idle {} ms, keep slot",
+                        jobId,
+                        now - context.lastAliveTime);
+                jobContexts.remove(jobId);
+                toRelease = context.reader;
+                releaseConfig = context.jobConfig;
+            } finally {
+                lock.unlock();
+            }
+            // Release outside the lock so blocking IO never stalls getReaderAndClaim/detach.
+            if (toRelease != null && releaseConfig != null) {
+                try {
+                    toRelease.release(releaseConfig);
+                } catch (Exception ex) {
+                    LOG.warn("Failed to release idle reader for job {}", jobId, ex);
+                }
+            }
+        }
+    }
+
+    // Each chore is guarded independently: one failing must not skip the other, and an uncaught
+    // throwable here would silently cancel the whole periodic task.
+    private void runBackgroundCleanup() {
+        try {
+            releaseIdleReaders();
+        } catch (Exception e) {
+            LOG.warn("releaseIdleReaders failed", e);
+        }
+        try {
+            retryPendingSlotDrops();
+        } catch (Exception e) {
+            LOG.warn("retryPendingSlotDrops failed", e);
+        }
+    }
+
+    /**
+     * Run source-side cleanup; if incomplete (e.g. slot still held by a dead BE), retry in
+     * background.
+     */
+    public void releaseSourceResourcesOrRetry(SourceReader reader, JobBaseConfig jobConfig) {
+        if (!releaseSourceResources(reader, jobConfig)) {
+            scheduleSlotDrop(jobConfig);
+        }
+    }
+
+    public void scheduleSlotDrop(JobBaseConfig jobConfig) {
+        long deadline = System.currentTimeMillis() + Constants.SLOT_DROP_RETRY_WINDOW_MS;
+        pendingSlotDrops.putIfAbsent(jobConfig.getJobId(), new SlotDropTask(jobConfig, deadline));

Review Comment:
    is the only retry state after FE has already removed the job in , but it is 
only in this cdc_client heap. If DROP JOB routes  to an alive BE while the 
Doris-owned PG slot is still active on a dead BE,  returns false and this 
schedules retry; if this cdc_client/BE restarts before PG frees the stale 
walsender, the map is empty on restart and no FE job metadata remains to issue 
another close, leaving the slot/publication behind permanently. Please make the 
cleanup retry FE-driven/durable, or fail/retry DROP JOB until the source 
resources are actually gone.
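The FE-driven alternative suggested above can be shown in a hypothetical sketch. DROP JOB first records a `DROPPING` state, and the job metadata is deleted only after source cleanup reports success. A cdc_client or BE restart therefore loses no retry state. `JobState`, `DurableDropper` and the cleanup predicate are illustrative names. The in-memory map only stands in for FE's persisted job metadata; a real version would have to journal every state change so that it survives an FE restart, which is an assumption here.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

/** Hypothetical persisted job states; DROPPING must survive FE restarts. */
enum JobState { RUNNING, DROPPING }

/** Hypothetical FE-side drop flow: job metadata outlives a failed source cleanup. */
class DurableDropper {
    // Stand-in for FE's persisted job metadata (assumption: a real version journals every change).
    private final Map<String, JobState> persisted = new HashMap<>();
    // Stand-in for the RPC asking an alive BE to drop the slot/publication; true = gone.
    private final Predicate<String> cleanupSourceResources;

    DurableDropper(Predicate<String> cleanupSourceResources) {
        this.cleanupSourceResources = cleanupSourceResources;
    }

    void createJob(String jobId) {
        persisted.put(jobId, JobState.RUNNING);
    }

    JobState stateOf(String jobId) {
        return persisted.get(jobId);
    }

    /** DROP JOB: record DROPPING first, then attempt cleanup once. */
    boolean dropJob(String jobId) {
        persisted.replace(jobId, JobState.DROPPING);
        return tryFinishDrop(jobId);
    }

    /** Deletes the metadata only after the source side confirms cleanup. */
    boolean tryFinishDrop(String jobId) {
        if (persisted.get(jobId) != JobState.DROPPING) {
            return false;
        }
        if (!cleanupSourceResources.test(jobId)) {
            return false; // e.g. slot still held by a stale walsender; retry later
        }
        persisted.remove(jobId);
        return true;
    }

    /** Periodic FE pass (also run after restart): retries every DROPPING job. */
    void retryPendingDrops() {
        // Collect ids first, because tryFinishDrop mutates the map.
        List<String> pending = new ArrayList<>();
        for (Map.Entry<String, JobState> entry : persisted.entrySet()) {
            if (entry.getValue() == JobState.DROPPING) {
                pending.add(entry.getKey());
            }
        }
        for (String jobId : pending) {
            tryFinishDrop(jobId);
        }
    }
}
```

Under this design, the retry loop that currently lives only in the cdc_client heap moves to an FE daemon calling `retryPendingDrops()`. A job whose slot cannot be freed stays visible as `DROPPING` instead of disappearing silently.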



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

