github-actions[bot] commented on code in PR #64423:
URL: https://github.com/apache/doris/pull/64423#discussion_r3395273092


##########
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java:
##########
@@ -591,7 +602,21 @@ public void writeRecords(WriteRecordRequest writeRecordRequest) throws Exception
                     writeRecordRequest.getTaskId());
 
         } finally {
-            cleanupReaderResources(sourceReader, writeRecordRequest.getJobId(), readResult);
+            stillOwner =
+                    Env.getCurrentEnv()
+                            .isOwner(writeRecordRequest.getJobId(), writeRecordRequest.getTaskId());
+            // A displaced task must not touch the reader (finishSplitRecords would kill the
+            // successor's fetcher) nor commit anything.
+            if (stillOwner) {
+                cleanupReaderResources(sourceReader, writeRecordRequest.getJobId(), readResult);
+            }
+        }
+        if (!stillOwner) {
+            LOG.info(

Review Comment:
   This early return leaves the job-scoped `DorisBatchStreamLoad` in 
`batchStreamLoadMap` untouched. The path is reachable after a task has already 
called `writeRecord()` and then loses ownership because FE 
released/rebuilt/rebound the reader; in that case the old task skips 
`forceFlush()` and `commitOffset()`, but its rows can still remain in 
`bufferMap`/`flushQueue`. The next task reuses the same loader in 
`getOrCreateBatchStreamLoad()` and only clears `LoadStatistic`, so it can flush 
the previous task's uncommitted rows under the new task while FE never advanced 
the old offset. Please discard or isolate the stream-load buffers when skipping 
commit, without closing a successor task's active loader.
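
One way to act on this is shown below as a minimal sketch under stated assumptions. The real `DorisBatchStreamLoad` internals (`bufferMap`, `flushQueue`) are not shown in this hunk, so the sketch uses a hypothetical, heavily simplified `TaskScopedBatchLoader` that keys buffered rows by task id. A displaced task discards only its own rows, while the shared loader and the successor's buffer stay open. Rows already handed to an in-flight flush are not covered here and would need separate handling.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified stand-in for a job-scoped loader; not the DorisBatchStreamLoad API.
final class TaskScopedBatchLoader {
    // taskId -> rows written by that task and not yet flushed
    private final Map<String, List<String>> buffersByTask = new HashMap<>();

    synchronized void writeRecord(String taskId, String row) {
        buffersByTask.computeIfAbsent(taskId, k -> new ArrayList<>()).add(row);
    }

    // Removes and returns only this task's rows (a real loader would send them to BE here).
    synchronized List<String> forceFlush(String taskId) {
        List<String> rows = buffersByTask.remove(taskId);
        return rows == null ? new ArrayList<>() : rows;
    }

    // Drops a displaced task's rows; other tasks' buffers are left untouched.
    synchronized int discard(String taskId) {
        List<String> rows = buffersByTask.remove(taskId);
        return rows == null ? 0 : rows.size();
    }

    synchronized int bufferedRows(String taskId) {
        List<String> rows = buffersByTask.get(taskId);
        return rows == null ? 0 : rows.size();
    }
}

final class WriteRecordsFinishSketch {
    // Mirrors the stillOwner branch in writeRecords(): only the current owner takes the commit path.
    static List<String> finish(TaskScopedBatchLoader loader, String taskId, boolean stillOwner) {
        if (!stillOwner) {
            // FE never advanced this task's offset, so its rows must not be loaded later.
            loader.discard(taskId);
            return new ArrayList<>();
        }
        // Owner path: flush own rows; commitOffset() would follow in the real code.
        return loader.forceFlush(taskId);
    }
}
```

Keying the buffer by task id isolates a displaced task's rows instead of resetting the whole loader, so the successor's active loader is never closed.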



##########
fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java:
##########
@@ -646,6 +657,18 @@ offsetProvider, getConvertedSourceProperties(), targetDb, targetProperties, jobP
                 getCreateUser(), cloudCluster);
     }
 
+    // Binlog phase: prefer the bound BE in selectBackend; rebind + persist on change.
+    public Backend resolveBoundBackend() throws JobException {
+        Backend selected = StreamingJobUtils.selectBackend(cloudCluster, boundBackendId);

Review Comment:
   When the preferred BE is no longer load-available, this rebinds the job to a 
new BE but does not release the live reader on the old `boundBackendId`. 
`isLoadAvailable()` can be false while the BE process is still alive, and the 
binlog path intentionally keeps the old cdc_client reader open across tasks. 
Dispatching the next binlog task to the new BE can therefore create a second 
reader for the same source/PG replication slot or duplicate MySQL binlog 
consumption until the idle reaper eventually runs. Please cleanly release the 
previous bound BE's reader before persisting the new binding, or prevent rebind 
until that reader is known stopped.
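
The following is a minimal sketch of the ordering this comment asks for. It assumes a hypothetical `ReaderReleaser` RPC that returns `true` only once the old BE confirms its cdc_client reader has stopped. `BoundBackendSketch`, `releaseReader`, and the `persistedBindings` list (a stand-in for persisting the binding) are illustrative names, not Doris APIs. Backend selection is taken as an input rather than reimplemented. The binding changes and is persisted only after the release succeeds; otherwise the rebind is refused, so no second reader is started for the same slot or binlog stream.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of "release old reader, then persist new binding"; names are illustrative.
final class BoundBackendSketch {
    static final long UNBOUND = -1L;

    // Hypothetical RPC hook: returns true only once the BE confirms the job's reader is stopped.
    interface ReaderReleaser {
        boolean releaseReader(long backendId, long jobId);
    }

    private final long jobId;
    private long boundBackendId = UNBOUND;
    // Stand-in for persisting the binding (e.g. via the FE edit log).
    private final List<Long> persistedBindings = new ArrayList<>();

    BoundBackendSketch(long jobId) {
        this.jobId = jobId;
    }

    // selectedBackendId is whatever backend selection returned; selection itself is out of scope.
    synchronized long resolveBoundBackend(long selectedBackendId, ReaderReleaser releaser) {
        if (selectedBackendId == boundBackendId) {
            return boundBackendId; // preferred BE still usable: keep the binding
        }
        if (boundBackendId != UNBOUND && !releaser.releaseReader(boundBackendId, jobId)) {
            // Old reader may still hold the PG slot / MySQL binlog position: refuse to rebind.
            throw new IllegalStateException(
                    "reader on backend " + boundBackendId + " not confirmed stopped; rebind deferred");
        }
        boundBackendId = selectedBackendId;
        persistedBindings.add(selectedBackendId); // persist only after the old reader is gone
        return boundBackendId;
    }

    synchronized long boundBackendId() {
        return boundBackendId;
    }

    synchronized int persistedBindingCount() {
        return persistedBindings.size();
    }
}
```

Throwing keeps the job bound to the old BE until a later scheduling round succeeds. If the old BE process is actually dead, the release call would also need a timeout or liveness check, so the job is not blocked indefinitely.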



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
