This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-4.0 by this push:
     new 2375fb414be branch-4.0: [fix](streaming-job) Fix PG replication slot leak when streaming task is cancelled during pause/resume  #62010 (#62736)
2375fb414be is described below

commit 2375fb414be2e9941127e97a9ad67c17a1664afb
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Sat May 9 16:20:48 2026 +0800

    branch-4.0: [fix](streaming-job) Fix PG replication slot leak when streaming task is cancelled during pause/resume  #62010 (#62736)
    
    Cherry-picked from #62010
    
    Co-authored-by: wudi <[email protected]>
---
 .../cdcclient/source/reader/JdbcIncrementalSourceReader.java | 12 ++++++++++++
 .../cdcclient/source/reader/mysql/MySqlSourceReader.java     | 12 ++++++++++++
 2 files changed, 24 insertions(+)

diff --git a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/JdbcIncrementalSourceReader.java b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/JdbcIncrementalSourceReader.java
index 36111d0fbf4..c43595826e3 100644
--- a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/JdbcIncrementalSourceReader.java
+++ b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/JdbcIncrementalSourceReader.java
@@ -336,6 +336,18 @@ public abstract class JdbcIncrementalSourceReader implements SourceReader {
             Map<String, Object> offsetMeta, JobBaseRecordRequest baseReq) throws Exception {
         Tuple2<SourceSplitBase, Boolean> splitFlag = createStreamSplit(offsetMeta, baseReq);
         this.streamSplit = splitFlag.f0.asStreamSplit();
+
+        // Close previous stream reader to release resources (e.g. PG replication slot)
+        // before creating a new one. This prevents connection leaks when a cancelled
+        // task's reader is still active while a new task arrives.
+        if (this.streamReader != null) {
+            LOG.info(
+                    "Closing previous stream reader before creating new one for job {}",
+                    baseReq.getJobId());
+            closeReaderInternal(this.streamReader);
+            this.streamReader = null;
+        }
+
         this.streamReader = getBinlogSplitReader(baseReq);
 
         LOG.info("Prepare stream split: {}", this.streamSplit.toString());
diff --git a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java
index 83c9b349e4e..5675b3dd835 100644
--- a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java
+++ b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java
@@ -341,6 +341,18 @@ public class MySqlSourceReader implements SourceReader {
             Map<String, Object> offsetMeta, JobBaseRecordRequest baseReq) throws Exception {
         Tuple2<MySqlSplit, Boolean> splitFlag = createBinlogSplit(offsetMeta, baseReq);
         this.binlogSplit = (MySqlBinlogSplit) splitFlag.f0;
+
+        // Close previous binlog reader to release resources before creating a new one.
+        // This prevents connection leaks when a cancelled task's reader is still active
+        // while a new task arrives.
+        if (this.binlogReader != null) {
+            LOG.info(
+                    "Closing previous binlog reader before creating new one for job {}",
+                    baseReq.getJobId());
+            this.binlogReader.close();
+            this.binlogReader = null;
+        }
+
         this.binlogReader = getBinlogSplitReader(baseReq);
 
         LOG.info("Prepare binlog split: {}", this.binlogSplit.toString());


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to