This is an automated email from the ASF dual-hosted git repository.
gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new a2d2f2dda [Bugfix][zeta] Fix cdc connection does not close (#4922)
a2d2f2dda is described below
commit a2d2f2dda8895aa028ab63719aab913b09ce7c03
Author: ic4y <[email protected]>
AuthorDate: Fri Jul 7 14:17:40 2023 +0800
[Bugfix][zeta] Fix cdc connection does not close (#4922)
---
.../cdc/base/source/reader/IncrementalSourceSplitReader.java | 3 ++-
.../seatunnel/connectors/cdc/base/source/reader/external/Fetcher.java | 3 ++-
.../cdc/base/source/reader/external/IncrementalSourceScanFetcher.java | 3 ++-
.../base/source/reader/external/IncrementalSourceStreamFetcher.java | 3 ++-
.../source/source/reader/fetch/scan/SqlServerSnapshotFetchTask.java | 4 +---
.../seatunnel/common/source/reader/SourceReaderOptions.java | 2 +-
.../connectors/seatunnel/common/source/reader/fetcher/FetchTask.java | 4 ++--
7 files changed, 12 insertions(+), 10 deletions(-)
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/IncrementalSourceSplitReader.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/IncrementalSourceSplitReader.java
index c509db376..932b5f0e4 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/IncrementalSourceSplitReader.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/IncrementalSourceSplitReader.java
@@ -17,6 +17,7 @@
package org.apache.seatunnel.connectors.cdc.base.source.reader;
+import org.apache.seatunnel.common.utils.SeaTunnelException;
import org.apache.seatunnel.connectors.cdc.base.config.SourceConfig;
import org.apache.seatunnel.connectors.cdc.base.dialect.DataSourceDialect;
import org.apache.seatunnel.connectors.cdc.base.source.reader.external.FetchTask;
@@ -66,7 +67,7 @@ public class IncrementalSourceSplitReader<C extends SourceConfig>
Iterator<SourceRecords> dataIt = null;
try {
dataIt = currentFetcher.pollSplitRecords();
- } catch (InterruptedException e) {
+ } catch (InterruptedException | SeaTunnelException e) {
log.warn("fetch data failed.", e);
throw new IOException(e);
}
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/Fetcher.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/Fetcher.java
index 3bf20b6bd..a818daa44 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/Fetcher.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/Fetcher.java
@@ -17,6 +17,7 @@
package org.apache.seatunnel.connectors.cdc.base.source.reader.external;
+import org.apache.seatunnel.common.utils.SeaTunnelException;
import org.apache.seatunnel.connectors.cdc.base.source.split.IncrementalSplit;
import org.apache.seatunnel.connectors.cdc.base.source.split.SnapshotSplit;
@@ -35,7 +36,7 @@ public interface Fetcher<T, Split> {
* Fetched records from data source. The method should return null when reaching the end of the
* split, the empty {@link Iterator} will be returned if the data of split is on pulling.
*/
- Iterator<T> pollSplitRecords() throws InterruptedException;
+ Iterator<T> pollSplitRecords() throws InterruptedException, SeaTunnelException;
/** Return the current fetch task is finished or not. */
boolean isFinished();
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceScanFetcher.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceScanFetcher.java
index fc97d7d21..7a09ac6bc 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceScanFetcher.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceScanFetcher.java
@@ -107,7 +107,8 @@ public class IncrementalSourceScanFetcher implements Fetcher<SourceRecords, Sour
}
@Override
- public Iterator<SourceRecords> pollSplitRecords() throws InterruptedException {
+ public Iterator<SourceRecords> pollSplitRecords()
+ throws InterruptedException, SeaTunnelException {
checkReadException();
if (hasNextElement.get()) {
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceStreamFetcher.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceStreamFetcher.java
index af576d7be..5257064dc 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceStreamFetcher.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceStreamFetcher.java
@@ -95,7 +95,8 @@ public class IncrementalSourceStreamFetcher implements Fetcher<SourceRecords, So
}
@Override
- public Iterator<SourceRecords> pollSplitRecords() throws InterruptedException {
+ public Iterator<SourceRecords> pollSplitRecords()
+ throws InterruptedException, SeaTunnelException {
checkReadException();
final List<SourceRecord> sourceRecords = new ArrayList<>();
if (streamFetchTask.isRunning()) {
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/reader/fetch/scan/SqlServerSnapshotFetchTask.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/reader/fetch/scan/SqlServerSnapshotFetchTask.java
index e2553a072..d448af95a 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/reader/fetch/scan/SqlServerSnapshotFetchTask.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/reader/fetch/scan/SqlServerSnapshotFetchTask.java
@@ -38,8 +38,6 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.Map;
-import static org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source.utils.SqlServerConnectionUtils.createSqlServerConnection;
-
@Slf4j
public class SqlServerSnapshotFetchTask implements FetchTask<SourceSplitBase> {
@@ -148,7 +146,7 @@ public class SqlServerSnapshotFetchTask implements FetchTask<SourceSplitBase> {
// task to read binlog and backfill for current split
return new SqlServerTransactionLogFetchTask.TransactionLogSplitReadTask(
new SqlServerConnectorConfig(dezConf),
- createSqlServerConnection(context.getSourceConfig().getDbzConfiguration()),
+ context.getDataConnection(),
context.getMetadataConnection(),
context.getDispatcher(),
context.getErrorHandler(),
diff --git a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/reader/SourceReaderOptions.java b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/reader/SourceReaderOptions.java
index 39dd9d004..6377a24dd 100644
--- a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/reader/SourceReaderOptions.java
+++ b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/reader/SourceReaderOptions.java
@@ -31,7 +31,7 @@ public class SourceReaderOptions {
public static final Option<Long> SOURCE_READER_CLOSE_TIMEOUT =
Options.key("source.reader.close.timeout")
.longType()
- .defaultValue(30000L)
+ .defaultValue(60000L)
.withDescription("The timeout when closing the source reader");
public static final Option<Integer> ELEMENT_QUEUE_CAPACITY =
diff --git a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/reader/fetcher/FetchTask.java b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/reader/fetcher/FetchTask.java
index 8130faa44..ef074d574 100644
--- a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/reader/fetcher/FetchTask.java
+++ b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/reader/fetcher/FetchTask.java
@@ -68,9 +68,9 @@ class FetchTask<E, SplitT extends SourceSplit> implements SplitFetcherTask {
fetcherIndex);
}
}
- } catch (InterruptedException e) {
+ } catch (IOException | InterruptedException e) {
// this should only happen on shutdown
- throw new IOException("Source fetch execution was interrupted", e);
+ throw new IOException("Source fetch execution was fail", e);
} finally {
// clean up the potential wakeup effect.
if (isWakeup()) {