This is an automated email from the ASF dual-hosted git repository.

ruanhang1993 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git


The following commit(s) were added to refs/heads/master by this push:
     new 0c49959f6 [FLINK-35524][cdc-base] Clear connection pools when reader 
exits. (#3388)
0c49959f6 is described below

commit 0c49959f678d6ea4269f78b3080f35cbf1f58601
Author: Hongshun Wang <[email protected]>
AuthorDate: Tue Aug 6 15:24:39 2024 +0800

    [FLINK-35524][cdc-base] Clear connection pools when reader exits. (#3388)
---
 .../cdc/connectors/base/dialect/DataSourceDialect.java      |  7 ++++++-
 .../cdc/connectors/base/dialect/JdbcDataSourceDialect.java  |  6 ++++++
 .../base/relational/connection/JdbcConnectionPools.java     |  9 +++++++++
 .../base/source/assigner/HybridSplitAssigner.java           |  3 ++-
 .../base/source/assigner/SnapshotSplitAssigner.java         |  5 ++++-
 .../cdc/connectors/base/source/assigner/SplitAssigner.java  |  6 ++++--
 .../base/source/assigner/StreamSplitAssigner.java           |  5 ++++-
 .../base/source/enumerator/IncrementalSourceEnumerator.java |  3 ++-
 .../mysql/source/assigners/MySqlBinlogSplitAssigner.java    |  7 ++++++-
 .../mysql/source/assigners/MySqlSnapshotSplitAssigner.java  |  3 +++
 .../mysql/source/assigners/MySqlSplitAssigner.java          |  6 ++++--
 .../mysql/source/connection/JdbcConnectionPools.java        |  8 ++++++++
 .../mysql/source/enumerator/MySqlSourceEnumerator.java      |  3 ++-
 .../source/assigners/MySqlBinlogSplitAssignerTest.java      | 13 +++++++------
 14 files changed, 67 insertions(+), 17 deletions(-)

diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/dialect/DataSourceDialect.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/dialect/DataSourceDialect.java
index 8d5e810bf..ce5b34335 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/dialect/DataSourceDialect.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/dialect/DataSourceDialect.java
@@ -27,6 +27,8 @@ import 
org.apache.flink.cdc.connectors.base.source.reader.external.FetchTask;
 import io.debezium.relational.TableId;
 import io.debezium.relational.history.TableChanges;
 
+import java.io.Closeable;
+import java.io.IOException;
 import java.io.Serializable;
 import java.util.List;
 import java.util.Map;
@@ -37,7 +39,7 @@ import java.util.Map;
  * @param <C> The source config of data source.
  */
 @Experimental
-public interface DataSourceDialect<C extends SourceConfig> extends 
Serializable {
+public interface DataSourceDialect<C extends SourceConfig> extends 
Serializable, Closeable {
 
     /** Get the name of dialect. */
     String getName();
@@ -78,4 +80,7 @@ public interface DataSourceDialect<C extends SourceConfig> 
extends Serializable
 
     /** Check if the tableId is included in SourceConfig. */
     boolean isIncludeDataCollection(C sourceConfig, TableId tableId);
+
+    @Override
+    default void close() throws IOException {}
 }
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/dialect/JdbcDataSourceDialect.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/dialect/JdbcDataSourceDialect.java
index 557c187d2..136e73dda 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/dialect/JdbcDataSourceDialect.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/dialect/JdbcDataSourceDialect.java
@@ -21,6 +21,7 @@ import org.apache.flink.cdc.common.annotation.Experimental;
 import org.apache.flink.cdc.connectors.base.config.JdbcSourceConfig;
 import org.apache.flink.cdc.connectors.base.config.SourceConfig;
 import 
org.apache.flink.cdc.connectors.base.relational.connection.JdbcConnectionPoolFactory;
+import 
org.apache.flink.cdc.connectors.base.relational.connection.JdbcConnectionPools;
 import org.apache.flink.cdc.connectors.base.source.meta.split.SourceSplitBase;
 import org.apache.flink.cdc.connectors.base.source.reader.external.FetchTask;
 
@@ -28,6 +29,7 @@ import io.debezium.jdbc.JdbcConnection;
 import io.debezium.relational.TableId;
 import io.debezium.relational.history.TableChanges.TableChange;
 
+import java.io.IOException;
 import java.util.List;
 import java.util.Map;
 
@@ -59,4 +61,8 @@ public interface JdbcDataSourceDialect extends 
DataSourceDialect<JdbcSourceConfi
 
     @Override
     FetchTask<SourceSplitBase> createFetchTask(SourceSplitBase 
sourceSplitBase);
+
+    default void close() throws IOException {
+        JdbcConnectionPools.getInstance(getPooledDataSourceFactory()).clear();
+    }
 }
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/relational/connection/JdbcConnectionPools.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/relational/connection/JdbcConnectionPools.java
index 4c4df6f1b..625cb8db9 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/relational/connection/JdbcConnectionPools.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/relational/connection/JdbcConnectionPools.java
@@ -25,6 +25,7 @@ import com.zaxxer.hikari.HikariDataSource;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -83,4 +84,12 @@ public class JdbcConnectionPools implements 
ConnectionPools<HikariDataSource, Jd
         }
         return jdbcConnectionPoolFactory.getJdbcUrl(sourceConfig);
     }
+
+    public void clear() throws IOException {
+        synchronized (pools) {
+            pools.values().stream().forEach(HikariDataSource::close);
+            pools.clear();
+            POOL_FACTORY_MAP.clear();
+        }
+    }
 }
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/HybridSplitAssigner.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/HybridSplitAssigner.java
index ae715b279..6764daabd 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/HybridSplitAssigner.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/HybridSplitAssigner.java
@@ -32,6 +32,7 @@ import io.debezium.relational.TableId;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Comparator;
@@ -218,7 +219,7 @@ public class HybridSplitAssigner<C extends SourceConfig> 
implements SplitAssigne
     }
 
     @Override
-    public void close() {
+    public void close() throws IOException {
         snapshotSplitAssigner.close();
     }
 
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/SnapshotSplitAssigner.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/SnapshotSplitAssigner.java
index d424e89b7..cd0e77200 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/SnapshotSplitAssigner.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/SnapshotSplitAssigner.java
@@ -37,6 +37,7 @@ import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
@@ -398,7 +399,9 @@ public class SnapshotSplitAssigner<C extends SourceConfig> 
implements SplitAssig
     }
 
     @Override
-    public void close() {}
+    public void close() throws IOException {
+        dialect.close();
+    }
 
     @Override
     public boolean noMoreSplits() {
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/SplitAssigner.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/SplitAssigner.java
index 4dd9e4904..0740ff402 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/SplitAssigner.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/SplitAssigner.java
@@ -24,6 +24,8 @@ import 
org.apache.flink.cdc.connectors.base.source.meta.offset.Offset;
 import 
org.apache.flink.cdc.connectors.base.source.meta.split.FinishedSnapshotSplitInfo;
 import org.apache.flink.cdc.connectors.base.source.meta.split.SourceSplitBase;
 
+import java.io.Closeable;
+import java.io.IOException;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
@@ -34,7 +36,7 @@ import java.util.Optional;
  * determines split processing order.
  */
 @Experimental
-public interface SplitAssigner {
+public interface SplitAssigner extends Closeable {
 
     /**
      * Called to open the assigner to acquire any resources, like threads or 
network connections.
@@ -120,5 +122,5 @@ public interface SplitAssigner {
      * Called to close the assigner, in case it holds on to any resources, 
like threads or network
      * connections.
      */
-    void close();
+    void close() throws IOException;
 }
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/StreamSplitAssigner.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/StreamSplitAssigner.java
index 74bd04fca..1e7b2fa1e 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/StreamSplitAssigner.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/StreamSplitAssigner.java
@@ -28,6 +28,7 @@ import 
org.apache.flink.cdc.connectors.base.source.meta.split.FinishedSnapshotSp
 import org.apache.flink.cdc.connectors.base.source.meta.split.SourceSplitBase;
 import org.apache.flink.cdc.connectors.base.source.meta.split.StreamSplit;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -133,7 +134,9 @@ public class StreamSplitAssigner implements SplitAssigner {
     }
 
     @Override
-    public void close() {}
+    public void close() throws IOException {
+        dialect.close();
+    }
 
     // 
------------------------------------------------------------------------------------------
 
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/enumerator/IncrementalSourceEnumerator.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/enumerator/IncrementalSourceEnumerator.java
index 05dcc6bd2..057833484 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/enumerator/IncrementalSourceEnumerator.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/enumerator/IncrementalSourceEnumerator.java
@@ -49,6 +49,7 @@ import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
@@ -187,7 +188,7 @@ public class IncrementalSourceEnumerator
     }
 
     @Override
-    public void close() {
+    public void close() throws IOException {
         LOG.info("Closing enumerator...");
         splitAssigner.close();
     }
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlBinlogSplitAssigner.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlBinlogSplitAssigner.java
index dd80e23b8..aa35e47a6 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlBinlogSplitAssigner.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlBinlogSplitAssigner.java
@@ -20,12 +20,14 @@ package 
org.apache.flink.cdc.connectors.mysql.source.assigners;
 import 
org.apache.flink.cdc.connectors.mysql.source.assigners.state.BinlogPendingSplitsState;
 import 
org.apache.flink.cdc.connectors.mysql.source.assigners.state.PendingSplitsState;
 import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfig;
+import 
org.apache.flink.cdc.connectors.mysql.source.connection.JdbcConnectionPools;
 import org.apache.flink.cdc.connectors.mysql.source.offset.BinlogOffset;
 import 
org.apache.flink.cdc.connectors.mysql.source.split.FinishedSnapshotSplitInfo;
 import org.apache.flink.cdc.connectors.mysql.source.split.MySqlBinlogSplit;
 import org.apache.flink.cdc.connectors.mysql.source.split.MySqlSplit;
 import org.apache.flink.util.CollectionUtil;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -121,7 +123,10 @@ public class MySqlBinlogSplitAssigner implements 
MySqlSplitAssigner {
     public void onBinlogSplitUpdated() {}
 
     @Override
-    public void close() {}
+    public void close() throws IOException {
+        // clear jdbc connection pools
+        JdbcConnectionPools.getInstance().clear();
+    }
 
     // 
------------------------------------------------------------------------------------------
 
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssigner.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssigner.java
index e2dd426a6..e209921b5 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssigner.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssigner.java
@@ -23,6 +23,7 @@ import 
org.apache.flink.cdc.connectors.mysql.source.assigners.state.ChunkSplitte
 import 
org.apache.flink.cdc.connectors.mysql.source.assigners.state.SnapshotPendingSplitsState;
 import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfig;
 import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceOptions;
+import 
org.apache.flink.cdc.connectors.mysql.source.connection.JdbcConnectionPools;
 import org.apache.flink.cdc.connectors.mysql.source.offset.BinlogOffset;
 import 
org.apache.flink.cdc.connectors.mysql.source.split.FinishedSnapshotSplitInfo;
 import 
org.apache.flink.cdc.connectors.mysql.source.split.MySqlSchemalessSnapshotSplit;
@@ -489,6 +490,8 @@ public class MySqlSnapshotSplitAssigner implements 
MySqlSplitAssigner {
         if (chunkSplitter != null) {
             try {
                 chunkSplitter.close();
+                // clear jdbc connection pools
+                JdbcConnectionPools.getInstance().clear();
             } catch (Exception e) {
                 LOG.warn("Fail to close the chunk splitter.");
             }
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSplitAssigner.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSplitAssigner.java
index 2673d9d7e..ac5f1c0c0 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSplitAssigner.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSplitAssigner.java
@@ -24,6 +24,8 @@ import 
org.apache.flink.cdc.connectors.mysql.source.offset.BinlogOffset;
 import 
org.apache.flink.cdc.connectors.mysql.source.split.FinishedSnapshotSplitInfo;
 import org.apache.flink.cdc.connectors.mysql.source.split.MySqlSplit;
 
+import java.io.Closeable;
+import java.io.IOException;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
@@ -34,7 +36,7 @@ import java.util.Optional;
  * determines split processing order.
  */
 @Internal
-public interface MySqlSplitAssigner {
+public interface MySqlSplitAssigner extends Closeable {
 
     /**
      * Called to open the assigner to acquire any resources, like threads or 
network connections.
@@ -120,5 +122,5 @@ public interface MySqlSplitAssigner {
      * Called to close the assigner, in case it holds on to any resources, 
like threads or network
      * connections.
      */
-    void close();
+    void close() throws IOException;
 }
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/connection/JdbcConnectionPools.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/connection/JdbcConnectionPools.java
index 4396156fe..9505a559a 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/connection/JdbcConnectionPools.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/connection/JdbcConnectionPools.java
@@ -23,6 +23,7 @@ import com.zaxxer.hikari.HikariDataSource;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -51,4 +52,11 @@ public class JdbcConnectionPools implements ConnectionPools {
             return pools.get(poolId);
         }
     }
+
+    public void clear() throws IOException {
+        synchronized (pools) {
+            pools.values().stream().forEach(HikariDataSource::close);
+            pools.clear();
+        }
+    }
 }
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/enumerator/MySqlSourceEnumerator.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/enumerator/MySqlSourceEnumerator.java
index 50d7607b6..3ab2ab509 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/enumerator/MySqlSourceEnumerator.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/enumerator/MySqlSourceEnumerator.java
@@ -49,6 +49,7 @@ import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
@@ -190,7 +191,7 @@ public class MySqlSourceEnumerator implements 
SplitEnumerator<MySqlSplit, Pendin
     }
 
     @Override
-    public void close() {
+    public void close() throws IOException {
         LOG.info("Closing enumerator...");
         splitAssigner.close();
     }
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlBinlogSplitAssignerTest.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlBinlogSplitAssignerTest.java
index 536dfbee5..9822f1afd 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlBinlogSplitAssignerTest.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlBinlogSplitAssignerTest.java
@@ -26,6 +26,7 @@ import 
org.apache.flink.cdc.connectors.mysql.table.StartupOptions;
 
 import org.junit.Test;
 
+import java.io.IOException;
 import java.time.ZoneId;
 import java.util.Optional;
 
@@ -40,36 +41,36 @@ import static org.junit.Assert.assertTrue;
 public class MySqlBinlogSplitAssignerTest {
 
     @Test
-    public void testStartFromEarliest() {
+    public void testStartFromEarliest() throws IOException {
         checkAssignedBinlogOffset(StartupOptions.earliest(), 
BinlogOffset.ofEarliest());
     }
 
     @Test
-    public void testStartFromLatestOffset() {
+    public void testStartFromLatestOffset() throws IOException {
         checkAssignedBinlogOffset(StartupOptions.latest(), 
BinlogOffset.ofLatest());
     }
 
     @Test
-    public void testStartFromTimestamp() {
+    public void testStartFromTimestamp() throws IOException {
         checkAssignedBinlogOffset(
                 StartupOptions.timestamp(15213000L), 
BinlogOffset.ofTimestampSec(15213L));
     }
 
     @Test
-    public void testStartFromBinlogFile() {
+    public void testStartFromBinlogFile() throws IOException {
         checkAssignedBinlogOffset(
                 StartupOptions.specificOffset("foo-file", 15213),
                 BinlogOffset.ofBinlogFilePosition("foo-file", 15213L));
     }
 
     @Test
-    public void testStartFromGtidSet() {
+    public void testStartFromGtidSet() throws IOException {
         checkAssignedBinlogOffset(
                 StartupOptions.specificOffset("foo-gtid"), 
BinlogOffset.ofGtidSet("foo-gtid"));
     }
 
     private void checkAssignedBinlogOffset(
-            StartupOptions startupOptions, BinlogOffset expectedOffset) {
+            StartupOptions startupOptions, BinlogOffset expectedOffset) throws 
IOException {
         // Set starting from the given option
         MySqlBinlogSplitAssigner assigner = new 
MySqlBinlogSplitAssigner(getConfig(startupOptions));
         // Get splits from assigner

Reply via email to