This is an automated email from the ASF dual-hosted git repository.
gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 3287b1d85 [Bugfix][connector-cdc-mysql] Fix listener not released when
BinlogClient reuse (#5011)
3287b1d85 is described below
commit 3287b1d8521b9fda08fa4774cf958071031ac313
Author: happyboy1024 <[email protected]>
AuthorDate: Tue Jul 11 16:28:01 2023 +0800
[Bugfix][connector-cdc-mysql] Fix listener not released when BinlogClient
reuse (#5011)
---
.../reader/fetch/MySqlSourceFetchTaskContext.java | 18 +++++++++++++++++-
1 file changed, 17 insertions(+), 1 deletion(-)
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/MySqlSourceFetchTaskContext.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/MySqlSourceFetchTaskContext.java
index c6aebc8aa..0b6ea40ea 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/MySqlSourceFetchTaskContext.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/MySqlSourceFetchTaskContext.java
@@ -18,6 +18,7 @@
package
org.apache.seatunnel.connectors.seatunnel.cdc.mysql.source.reader.fetch;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.utils.ReflectionUtils;
import org.apache.seatunnel.connectors.cdc.base.config.JdbcSourceConfig;
import org.apache.seatunnel.connectors.cdc.base.dialect.JdbcDataSourceDialect;
import
org.apache.seatunnel.connectors.cdc.base.relational.JdbcSourceEventDispatcher;
@@ -70,6 +71,7 @@ import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import static
org.apache.seatunnel.connectors.seatunnel.cdc.mysql.source.offset.BinlogOffset.BINLOG_FILENAME_OFFSET_KEY;
import static
org.apache.seatunnel.connectors.seatunnel.cdc.mysql.utils.MySqlConnectionUtils.createBinaryClient;
@@ -326,13 +328,27 @@ public class MySqlSourceFetchTaskContext extends JdbcSourceFetchTaskContext {
MySqlDatabaseSchema schema,
BinaryLogClient reusedBinaryLogClient) {
super(config, schema);
- this.reusedBinaryLogClient = reusedBinaryLogClient;
+ this.reusedBinaryLogClient =
resetBinaryLogClient(reusedBinaryLogClient);
}
@Override
public BinaryLogClient getBinaryLogClient() {
return reusedBinaryLogClient;
}
+
+ /** reset the listener of binaryLogClient before fetch task start. */
+ private BinaryLogClient resetBinaryLogClient(BinaryLogClient
binaryLogClient) {
+ Optional<Object> eventListenersField =
+ ReflectionUtils.getField(
+ binaryLogClient, BinaryLogClient.class,
"eventListeners");
+ eventListenersField.ifPresent(o ->
((List<BinaryLogClient.EventListener>) o).clear());
+ Optional<Object> lifecycleListeners =
+ ReflectionUtils.getField(
+ binaryLogClient, BinaryLogClient.class,
"lifecycleListeners");
+ lifecycleListeners.ifPresent(
+ o -> ((List<BinaryLogClient.LifecycleListener>)
o).clear());
+ return binaryLogClient;
+ }
}
/** Copied from debezium for accessing here. */