This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-4.0 by this push:
new 2375fb414be branch-4.0: [fix](streaming-job) Fix PG replication slot
leak when streaming task is cancelled during pause/resume #62010 (#62736)
2375fb414be is described below
commit 2375fb414be2e9941127e97a9ad67c17a1664afb
Author: github-actions[bot]
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Sat May 9 16:20:48 2026 +0800
branch-4.0: [fix](streaming-job) Fix PG replication slot leak when
streaming task is cancelled during pause/resume #62010 (#62736)
Cherry-picked from #62010
Co-authored-by: wudi <[email protected]>
---
.../cdcclient/source/reader/JdbcIncrementalSourceReader.java | 12 ++++++++++++
.../cdcclient/source/reader/mysql/MySqlSourceReader.java | 12 ++++++++++++
2 files changed, 24 insertions(+)
diff --git
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/JdbcIncrementalSourceReader.java
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/JdbcIncrementalSourceReader.java
index 36111d0fbf4..c43595826e3 100644
---
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/JdbcIncrementalSourceReader.java
+++
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/JdbcIncrementalSourceReader.java
@@ -336,6 +336,18 @@ public abstract class JdbcIncrementalSourceReader implements SourceReader {
Map<String, Object> offsetMeta, JobBaseRecordRequest baseReq)
throws Exception {
Tuple2<SourceSplitBase, Boolean> splitFlag =
createStreamSplit(offsetMeta, baseReq);
this.streamSplit = splitFlag.f0.asStreamSplit();
+
+ // Close previous stream reader to release resources (e.g. PG replication slot)
+ // before creating a new one. This prevents connection leaks when a cancelled
+ // task's reader is still active while a new task arrives.
+ if (this.streamReader != null) {
+ LOG.info(
+ "Closing previous stream reader before creating new one for job {}",
+ baseReq.getJobId());
+ closeReaderInternal(this.streamReader);
+ this.streamReader = null;
+ }
+
this.streamReader = getBinlogSplitReader(baseReq);
LOG.info("Prepare stream split: {}", this.streamSplit.toString());
diff --git
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java
index 83c9b349e4e..5675b3dd835 100644
---
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java
+++
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java
@@ -341,6 +341,18 @@ public class MySqlSourceReader implements SourceReader {
Map<String, Object> offsetMeta, JobBaseRecordRequest baseReq)
throws Exception {
Tuple2<MySqlSplit, Boolean> splitFlag = createBinlogSplit(offsetMeta,
baseReq);
this.binlogSplit = (MySqlBinlogSplit) splitFlag.f0;
+
+ // Close previous binlog reader to release resources before creating a new one.
+ // This prevents connection leaks when a cancelled task's reader is still active
+ // while a new task arrives.
+ if (this.binlogReader != null) {
+ LOG.info(
+ "Closing previous binlog reader before creating new one for job {}",
+ baseReq.getJobId());
+ this.binlogReader.close();
+ this.binlogReader = null;
+ }
+
this.binlogReader = getBinlogSplitReader(baseReq);
LOG.info("Prepare binlog split: {}", this.binlogSplit.toString());
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]