github-actions[bot] commented on code in PR #64423:
URL: https://github.com/apache/doris/pull/64423#discussion_r3401480106


##########
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/common/Env.java:
##########
@@ -192,12 +244,68 @@ public void close(String jobId) {
         }
     }
 
+    /** Liveness evidence (FE heartbeat or active poll): keep this job's reader alive. */
+    public void keepAlive(String jobId) {
+        JobContext context = jobContexts.get(jobId);
+        if (context != null) {
+            context.lastAliveTime = System.currentTimeMillis();
+        }
+    }
+
+    // Release (keep slot) readers FE no longer drives; maxIntervalMs<=0 = untracked (e.g. TVF),
+    // skip.
+    private void releaseIdleReaders() {
+        long now = System.currentTimeMillis();
+        for (String jobId : jobContexts.keySet()) {
+            Lock lock = jobLocks.get(jobId);
+            if (lock == null || !lock.tryLock()) {
+                continue;
+            }
+            SourceReader toRelease = null;
+            JobBaseConfig releaseConfig = null;
+            try {
+                JobContext context = jobContexts.get(jobId);
+                if (context == null || context.lastAliveTime <= 0 || context.maxIntervalMs <= 0) {
+                    continue;
+                }
+                long timeout =
+                        Math.max(
+                                (long) Constants.IDLE_READER_TIMEOUT_MULTIPLIER
+                                        * context.maxIntervalMs,
+                                Constants.IDLE_READER_MIN_TIMEOUT_MS);
+                if (now - context.lastAliveTime <= timeout) {
+                    continue;
+                }
+                LOG.info(
+                        "Releasing idle reader for job {}, idle {} ms, keep slot",
+                        jobId,
+                        now - context.lastAliveTime);
+                jobContexts.remove(jobId);
+                toRelease = context.reader;
+                releaseConfig = context.jobConfig;
+            } finally {
+                lock.unlock();
+            }
+            // Release outside the lock so blocking IO never stalls getReaderAndClaim/detach.
+            if (toRelease != null && releaseConfig != null) {
+                try {
+                    toRelease.release(releaseConfig);
+                } catch (Exception ex) {

Review Comment:
   The idle reaper has the same untracking-before-release problem on a
different path. It removes the job's entry from `jobContexts` before the
keep-slot release has succeeded; if `toRelease.release()` fails, the live reader
is no longer reachable by `isOwner()`, `detachReaderIfOwner()`, or the next
reaper pass, and the next `/api/writeRecords` will create a second reader for
the same source. Please remove the context only after release succeeds, or keep
a tombstone/failed-release state that prevents a new claim from creating a
second live reader.
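   A minimal sketch of the tombstone option, under a simplified model:
`ReleasableReader`, `IdleReaperSketch`, `Ctx`, `State`, `track()` and
`canClaim()` are hypothetical stand-ins, not the cdc_client API, and `timeoutMs`
is assumed to be the `max(IDLE_READER_TIMEOUT_MULTIPLIER * maxIntervalMs,
IDLE_READER_MIN_TIMEOUT_MS)` value the diff computes. The context is marked
RELEASING under the lock, released outside it, and removed only on success; a
failed release leaves a RELEASE_FAILED tombstone that blocks new claims and is
retried on the next pass.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical stand-in for SourceReader; only the blocking release matters here. */
interface ReleasableReader {
    void release() throws Exception;
}

/** Sketch: the reaper untracks a job only after its keep-slot release succeeded. */
final class IdleReaperSketch {
    enum State { ACTIVE, RELEASING, RELEASE_FAILED }

    static final class Ctx {
        final ReleasableReader reader;
        volatile long lastAliveTime;
        volatile State state = State.ACTIVE;

        Ctx(ReleasableReader reader, long lastAliveTime) {
            this.reader = reader;
            this.lastAliveTime = lastAliveTime;
        }
    }

    final Map<String, Ctx> contexts = new ConcurrentHashMap<>();
    final Map<String, Lock> locks = new ConcurrentHashMap<>();

    void track(String jobId, ReleasableReader reader, long now) {
        locks.computeIfAbsent(jobId, k -> new ReentrantLock());
        contexts.put(jobId, new Ctx(reader, now));
    }

    /** A job that is being released, or whose release failed, must not get a second reader. */
    boolean canClaim(String jobId) {
        Ctx ctx = contexts.get(jobId);
        return ctx == null || ctx.state == State.ACTIVE;
    }

    void releaseIdle(long now, long timeoutMs) {
        for (String jobId : contexts.keySet()) {
            Lock lock = locks.get(jobId);
            if (lock == null || !lock.tryLock()) {
                continue;
            }
            Ctx ctx;
            try {
                ctx = contexts.get(jobId);
                if (ctx == null || ctx.state == State.RELEASING
                        || now - ctx.lastAliveTime <= timeoutMs) {
                    continue;
                }
                // Mark but do NOT remove: the reader stays reachable while release runs.
                ctx.state = State.RELEASING;
            } finally {
                lock.unlock();
            }
            boolean released;
            try {
                ctx.reader.release(); // blocking IO stays outside the lock
                released = true;
            } catch (Exception ex) {
                released = false;
            }
            lock.lock();
            try {
                if (released) {
                    contexts.remove(jobId, ctx); // untrack only after success
                } else {
                    ctx.state = State.RELEASE_FAILED; // tombstone, retried on the next pass
                }
            } finally {
                lock.unlock();
            }
        }
    }
}
```

In the real `getReaderAndClaim()`, a check like `canClaim()` would run under
the same per-job lock and reject the claim (so the FE retries) instead of
creating a new `JobContext`.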



##########
fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java:
##########
@@ -1002,7 +1010,8 @@ private void initSourceReader() throws JobException {
     public void cleanMeta(Long jobId) throws JobException {
         // clean meta table
         StreamingJobUtils.deleteJobMeta(jobId);
-        Backend backend = StreamingJobUtils.selectBackend(cloudCluster);
+        // Route to the bound BE so close tears down its live reader; dead/unbound falls back to random.
+        Backend backend = StreamingJobUtils.selectBackend(cloudCluster, boundBackendId);
         JobBaseConfig requestParams =

Review Comment:
   For final cleanup, this fallback can send `/api/close` to a random
load-available BE instead of the BE that owns the live reader.
`selectBackend(cloudCluster, boundBackendId)` deliberately ignores the bound BE
when `isLoadAvailable()` is false, but that BE's cdc_client process can still
be alive and holding the reused PG slot/MySQL reader. On DROP/cleanMeta,
closing a random BE is a no-op for that live reader, and the idle reaper only
calls `release()` (keep slot), so source-side state can leak after the job
metadata is deleted. This is separate from the rebind thread because it
concerns final job cleanup; please send the close to the remembered bound BE
whenever it exists and is reachable, or fail/retry cleanup instead of silently
falling back.
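   A hedged sketch of the fail/retry option for cleanup routing. `BackendInfo`,
`CleanupRouting`, `selectCleanupBackend()` and `CleanupRetryException` are
hypothetical and are not the Doris `Backend`/`StreamingJobUtils` API; `alive`
stands in for whatever reachability signal the FE trusts (an assumption). The
point it shows: when a bound BE exists, load availability is ignored, and an
unreachable bound BE yields a retryable error rather than a random fallback.

```java
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;

/** Hypothetical minimal view of a BE; not the Doris Backend class. */
final class BackendInfo {
    final long id;
    final boolean alive;         // assumed reachability signal (e.g. heartbeat)
    final boolean loadAvailable; // may be false while cdc_client still runs

    BackendInfo(long id, boolean alive, boolean loadAvailable) {
        this.id = id;
        this.alive = alive;
        this.loadAvailable = loadAvailable;
    }
}

/** Unchecked so a caller such as cleanMeta can surface it as a retryable failure. */
class CleanupRetryException extends RuntimeException {
    CleanupRetryException(String message) {
        super(message);
    }
}

final class CleanupRouting {
    /** Picks the BE for the final close without silently abandoning a bound reader. */
    static BackendInfo selectCleanupBackend(List<BackendInfo> backends, Long boundBackendId) {
        if (boundBackendId != null) {
            for (BackendInfo be : backends) {
                if (be.id == boundBackendId && be.alive) {
                    return be; // loadAvailable deliberately ignored for cleanup
                }
            }
            // Bound BE unknown or unreachable: fail so cleanup is retried,
            // instead of closing on a BE that does not own the reader.
            throw new CleanupRetryException(
                    "bound BE " + boundBackendId + " not reachable; retry cleanup");
        }
        // Never bound: no live reader to tear down, so any load-available BE will do.
        List<BackendInfo> candidates = backends.stream()
                .filter(be -> be.loadAvailable)
                .collect(Collectors.toList());
        if (candidates.isEmpty()) {
            throw new CleanupRetryException("no load-available BE");
        }
        return candidates.get(ThreadLocalRandom.current().nextInt(candidates.size()));
    }
}
```

Retrying forever on a BE that has been permanently dropped would block cleanup,
so a real implementation probably needs a retry bound or an explicit "BE is
gone" signal; the PR does not specify that policy.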



##########
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/common/Env.java:
##########
@@ -90,20 +107,55 @@ public SourceReader getReaderAndClaim(JobBaseConfig jobConfig, String taskId) {
         DataSource ds = resolveDataSource(jobConfig.getDataSource());
         String jobId = jobConfig.getJobId();
         Lock lock = jobLocks.computeIfAbsent(jobId, k -> new ReentrantLock());
+        SourceReader staleReader = null;
+        JobBaseConfig staleConfig = null;
+        SourceReader reader;
         lock.lock();
         try {
             JobContext context = jobContexts.get(jobId);
+            if (context != null
+                    && jobConfig instanceof WriteRecordRequest
+                    && ((WriteRecordRequest) jobConfig).isRebuildReader()) {
+                // FE declared the previous task abnormal: swap in a fresh reader instance so the
+                // old task's thread can never reach the new fetcher.
+                LOG.info(
+                        "Rebuild reader for job {} on FE request, discard current instance", jobId);
+                jobContexts.remove(jobId);
+                staleReader = context.reader;
+                staleConfig = context.jobConfig != null ? context.jobConfig : jobConfig;
+                context = null;
+            }
             if (context == null) {
                 LOG.info("Creating new reader for job {}, dataSource {}", jobId, ds);
                 context = new JobContext(jobId, ds, jobConfig.getConfig());
                 context.initializeReader();
                 jobContexts.put(jobId, context);
             }
             context.ownerTaskId = taskId;
-            return context.getReader(ds);
+            context.jobConfig = jobConfig;
+            if (jobConfig instanceof WriteRecordRequest) {
+                context.maxIntervalMs = ((WriteRecordRequest) jobConfig).getMaxInterval() * 1000;
+            }
+            context.lastAliveTime = System.currentTimeMillis();
+            reader = context.getReader(ds);
         } finally {
             lock.unlock();
         }
+        if (staleReader != null) {
+            // free the engine/slot connection before the caller submits the new fetcher
+            try {
+                staleReader.release(staleConfig);
+            } catch (Exception ex) {

Review Comment:
   This rebuild path removes the old context and installs a new reader before 
`staleReader.release()` is known to have stopped the previous engine. If 
`release()` throws (for example from `finishSplitRecords()` / reader close) or 
leaves the old Debezium fetcher alive, the exception is only logged and this 
method still returns the fresh reader, so the caller can submit a second binlog 
stream for the same job/slot while the stale task continues polling. This is 
distinct from the existing ownership-gate threads: those prevent stale commits, 
but they do not prevent duplicate source consumption. Please make the rebuild 
fail/retry, or keep the old reader tracked, until the old engine is 
deterministically stopped.
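   A minimal sketch of the "fail/retry the rebuild" option, assuming `release()`
returns only after the old engine has actually stopped. `StoppableReader`,
`RebuildSketch`, `claim()` and `RebuildRetryException` are hypothetical names,
not cdc_client code. The stale reader is released before a new one is created;
if release throws, the old reader stays tracked and the error propagates so the
FE can retry.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Supplier;

/** Hypothetical reader stand-in; release() must return only once the engine has stopped. */
interface StoppableReader {
    void release() throws Exception;
}

/** Thrown back to the FE so it retries the rebuild instead of starting a second stream. */
class RebuildRetryException extends RuntimeException {
    RebuildRetryException(String message, Throwable cause) {
        super(message, cause);
    }
}

final class RebuildSketch {
    final Map<String, StoppableReader> readers = new ConcurrentHashMap<>();
    final Map<String, Lock> locks = new ConcurrentHashMap<>();

    StoppableReader claim(String jobId, boolean rebuild, Supplier<StoppableReader> factory) {
        Lock lock = locks.computeIfAbsent(jobId, k -> new ReentrantLock());
        lock.lock();
        try {
            StoppableReader current = readers.get(jobId);
            if (current != null && rebuild) {
                try {
                    // Stop the old engine first; only this job's lock is held.
                    current.release();
                } catch (Exception ex) {
                    // Old reader stays tracked, so no second reader is handed out.
                    throw new RebuildRetryException("stale reader for " + jobId + " not stopped", ex);
                }
                readers.remove(jobId);
                current = null;
            }
            if (current == null) {
                current = factory.get();
                readers.put(jobId, current);
            }
            return current;
        } finally {
            lock.unlock();
        }
    }
}
```

This holds the per-job lock during `release()`, giving up the diff's "release
outside the lock" property: other claims for the same job wait, but that is
exactly the window in which a second stream must not start. Locks of other
jobs are unaffected.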



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
