This is an automated email from the ASF dual-hosted git repository.

gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new a2d2f2dda [Bugfix][zeta] Fix cdc connection does not close (#4922)
a2d2f2dda is described below

commit a2d2f2dda8895aa028ab63719aab913b09ce7c03
Author: ic4y <[email protected]>
AuthorDate: Fri Jul 7 14:17:40 2023 +0800

    [Bugfix][zeta] Fix cdc connection does not close (#4922)
---
 .../cdc/base/source/reader/IncrementalSourceSplitReader.java          | 3 ++-
 .../seatunnel/connectors/cdc/base/source/reader/external/Fetcher.java | 3 ++-
 .../cdc/base/source/reader/external/IncrementalSourceScanFetcher.java | 3 ++-
 .../base/source/reader/external/IncrementalSourceStreamFetcher.java   | 3 ++-
 .../source/source/reader/fetch/scan/SqlServerSnapshotFetchTask.java   | 4 +---
 .../seatunnel/common/source/reader/SourceReaderOptions.java           | 2 +-
 .../connectors/seatunnel/common/source/reader/fetcher/FetchTask.java  | 4 ++--
 7 files changed, 12 insertions(+), 10 deletions(-)

diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/IncrementalSourceSplitReader.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/IncrementalSourceSplitReader.java
index c509db376..932b5f0e4 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/IncrementalSourceSplitReader.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/IncrementalSourceSplitReader.java
@@ -17,6 +17,7 @@
 
 package org.apache.seatunnel.connectors.cdc.base.source.reader;
 
+import org.apache.seatunnel.common.utils.SeaTunnelException;
 import org.apache.seatunnel.connectors.cdc.base.config.SourceConfig;
 import org.apache.seatunnel.connectors.cdc.base.dialect.DataSourceDialect;
 import org.apache.seatunnel.connectors.cdc.base.source.reader.external.FetchTask;
@@ -66,7 +67,7 @@ public class IncrementalSourceSplitReader<C extends SourceConfig>
         Iterator<SourceRecords> dataIt = null;
         try {
             dataIt = currentFetcher.pollSplitRecords();
-        } catch (InterruptedException e) {
+        } catch (InterruptedException | SeaTunnelException e) {
             log.warn("fetch data failed.", e);
             throw new IOException(e);
         }
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/Fetcher.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/Fetcher.java
index 3bf20b6bd..a818daa44 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/Fetcher.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/Fetcher.java
@@ -17,6 +17,7 @@
 
 package org.apache.seatunnel.connectors.cdc.base.source.reader.external;
 
+import org.apache.seatunnel.common.utils.SeaTunnelException;
 import org.apache.seatunnel.connectors.cdc.base.source.split.IncrementalSplit;
 import org.apache.seatunnel.connectors.cdc.base.source.split.SnapshotSplit;
 
@@ -35,7 +36,7 @@ public interface Fetcher<T, Split> {
     * Fetched records from data source. The method should return null when reaching the end of the
     * split, the empty {@link Iterator} will be returned if the data of split is on pulling.
     */
-    Iterator<T> pollSplitRecords() throws InterruptedException;
+    Iterator<T> pollSplitRecords() throws InterruptedException, SeaTunnelException;
 
     /** Return the current fetch task is finished or not. */
     boolean isFinished();
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceScanFetcher.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceScanFetcher.java
index fc97d7d21..7a09ac6bc 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceScanFetcher.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceScanFetcher.java
@@ -107,7 +107,8 @@ public class IncrementalSourceScanFetcher implements Fetcher<SourceRecords, Sour
     }
 
     @Override
-    public Iterator<SourceRecords> pollSplitRecords() throws InterruptedException {
+    public Iterator<SourceRecords> pollSplitRecords()
+            throws InterruptedException, SeaTunnelException {
         checkReadException();
 
         if (hasNextElement.get()) {
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceStreamFetcher.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceStreamFetcher.java
index af576d7be..5257064dc 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceStreamFetcher.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceStreamFetcher.java
@@ -95,7 +95,8 @@ public class IncrementalSourceStreamFetcher implements Fetcher<SourceRecords, So
     }
 
     @Override
-    public Iterator<SourceRecords> pollSplitRecords() throws InterruptedException {
+    public Iterator<SourceRecords> pollSplitRecords()
+            throws InterruptedException, SeaTunnelException {
         checkReadException();
         final List<SourceRecord> sourceRecords = new ArrayList<>();
         if (streamFetchTask.isRunning()) {
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/reader/fetch/scan/SqlServerSnapshotFetchTask.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/reader/fetch/scan/SqlServerSnapshotFetchTask.java
index e2553a072..d448af95a 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/reader/fetch/scan/SqlServerSnapshotFetchTask.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/reader/fetch/scan/SqlServerSnapshotFetchTask.java
@@ -38,8 +38,6 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Map;
 
-import static org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source.utils.SqlServerConnectionUtils.createSqlServerConnection;
-
 @Slf4j
 public class SqlServerSnapshotFetchTask implements FetchTask<SourceSplitBase> {
 
@@ -148,7 +146,7 @@ public class SqlServerSnapshotFetchTask implements FetchTask<SourceSplitBase> {
         // task to read binlog and backfill for current split
         return new SqlServerTransactionLogFetchTask.TransactionLogSplitReadTask(
                 new SqlServerConnectorConfig(dezConf),
-                createSqlServerConnection(context.getSourceConfig().getDbzConfiguration()),
+                context.getDataConnection(),
                 context.getMetadataConnection(),
                 context.getDispatcher(),
                 context.getErrorHandler(),
diff --git a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/reader/SourceReaderOptions.java b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/reader/SourceReaderOptions.java
index 39dd9d004..6377a24dd 100644
--- a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/reader/SourceReaderOptions.java
+++ b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/reader/SourceReaderOptions.java
@@ -31,7 +31,7 @@ public class SourceReaderOptions {
     public static final Option<Long> SOURCE_READER_CLOSE_TIMEOUT =
             Options.key("source.reader.close.timeout")
                     .longType()
-                    .defaultValue(30000L)
+                    .defaultValue(60000L)
                     .withDescription("The timeout when closing the source reader");
 
     public static final Option<Integer> ELEMENT_QUEUE_CAPACITY =
diff --git a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/reader/fetcher/FetchTask.java b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/reader/fetcher/FetchTask.java
index 8130faa44..ef074d574 100644
--- a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/reader/fetcher/FetchTask.java
+++ b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/reader/fetcher/FetchTask.java
@@ -68,9 +68,9 @@ class FetchTask<E, SplitT extends SourceSplit> implements SplitFetcherTask {
                             fetcherIndex);
                 }
             }
-        } catch (InterruptedException e) {
+        } catch (IOException | InterruptedException e) {
             // this should only happen on shutdown
-            throw new IOException("Source fetch execution was interrupted", e);
+            throw new IOException("Source fetch execution was fail", e);
         } finally {
             // clean up the potential wakeup effect.
             if (isWakeup()) {

Reply via email to