This is an automated email from the ASF dual-hosted git repository.
fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new ac61409bd [bugfix][cdc-base] Fix cdc base shutdown thread not cleared (#4327)
ac61409bd is described below
commit ac61409bd845cd90ff04961ed25ace2820f92aea
Author: ic4y <[email protected]>
AuthorDate: Wed Mar 15 10:59:24 2023 +0800
[bugfix][cdc-base] Fix cdc base shutdown thread not cleared (#4327)
---------
Co-authored-by: TaoZex <[email protected]>
---
.../cdc/base/source/reader/external/FetchTask.java | 5 ++++
.../external/IncrementalSourceScanFetcher.java | 11 +++++++--
.../external/IncrementalSourceStreamFetcher.java | 11 +++++++--
.../seatunnel/cdc/mysql/source/MySqlDialect.java | 11 +--------
.../reader/fetch/MySqlSourceFetchTaskContext.java | 27 +++++++++++++++++-----
.../reader/fetch/binlog/MySqlBinlogFetchTask.java | 5 ++++
.../reader/fetch/scan/MySqlSnapshotFetchTask.java | 5 ++++
.../seatunnel/cdc/mysql/utils/MySqlUtils.java | 1 +
.../sqlserver/source/source/SqlServerDialect.java | 8 +------
.../fetch/SqlServerSourceFetchTaskContext.java | 25 +++++++++++++++-----
.../fetch/scan/SqlServerSnapshotFetchTask.java | 5 ++++
.../SqlServerTransactionLogFetchTask.java | 5 ++++
12 files changed, 86 insertions(+), 33 deletions(-)
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/FetchTask.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/FetchTask.java
index 7a8ba5fe3..1cc61bd77 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/FetchTask.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/FetchTask.java
@@ -41,6 +41,9 @@ public interface FetchTask<Split> {
/** Returns current task is running or not. */
boolean isRunning();
+ /** Close this task */
+ void shutdown();
+
/** Returns the split that the task used. */
Split getSplit();
@@ -63,5 +66,7 @@ public interface FetchTask<Split> {
void rewriteOutputBuffer(Map<Struct, SourceRecord> outputBuffer,
SourceRecord changeRecord);
List<SourceRecord> formatMessageTimestamp(Collection<SourceRecord>
snapshotRecords);
+
+ void close();
}
}
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceScanFetcher.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceScanFetcher.java
index 25bf35429..56f59ba08 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceScanFetcher.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceScanFetcher.java
@@ -192,13 +192,20 @@ public class IncrementalSourceScanFetcher implements
Fetcher<SourceRecords, Sour
@Override
public void close() {
try {
+ if (taskContext != null) {
+ taskContext.close();
+ }
+ if (snapshotSplitReadTask != null) {
+ snapshotSplitReadTask.shutdown();
+ }
if (executorService != null) {
executorService.shutdown();
- if (executorService.awaitTermination(
+ if (!executorService.awaitTermination(
READER_CLOSE_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
log.warn(
- "Failed to close the scan fetcher in {} seconds.",
+ "Failed to close the scan fetcher in {} seconds.
Service will execute force close(ExecutorService.shutdownNow)",
READER_CLOSE_TIMEOUT_SECONDS);
+ executorService.shutdownNow();
}
}
} catch (Exception e) {
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceStreamFetcher.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceStreamFetcher.java
index f14eac745..f3715e710 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceStreamFetcher.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceStreamFetcher.java
@@ -121,13 +121,20 @@ public class IncrementalSourceStreamFetcher implements
Fetcher<SourceRecords, So
@Override
public void close() {
try {
+ if (taskContext != null) {
+ taskContext.close();
+ }
+ if (streamFetchTask != null) {
+ streamFetchTask.shutdown();
+ }
if (executorService != null) {
executorService.shutdown();
- if (executorService.awaitTermination(
+ if (!executorService.awaitTermination(
READER_CLOSE_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
log.warn(
- "Failed to close the stream fetcher in {}
seconds.",
+ "Failed to close the stream fetcher in {} seconds.
Service will execute force close(ExecutorService.shutdownNow)",
READER_CLOSE_TIMEOUT_SECONDS);
+ executorService.shutdownNow();
}
}
} catch (Exception e) {
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlDialect.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlDialect.java
index 1352a78e0..15d3b6bf7 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlDialect.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlDialect.java
@@ -33,8 +33,6 @@ import
org.apache.seatunnel.connectors.seatunnel.cdc.mysql.source.reader.fetch.s
import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.utils.MySqlSchema;
import
org.apache.seatunnel.connectors.seatunnel.cdc.mysql.utils.TableDiscoveryUtils;
-import com.github.shyiko.mysql.binlog.BinaryLogClient;
-import io.debezium.connector.mysql.MySqlConnection;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.relational.TableId;
import io.debezium.relational.history.TableChanges;
@@ -42,8 +40,6 @@ import io.debezium.relational.history.TableChanges;
import java.sql.SQLException;
import java.util.List;
-import static
org.apache.seatunnel.connectors.seatunnel.cdc.mysql.utils.MySqlConnectionUtils.createBinaryClient;
-import static
org.apache.seatunnel.connectors.seatunnel.cdc.mysql.utils.MySqlConnectionUtils.createMySqlConnection;
import static
org.apache.seatunnel.connectors.seatunnel.cdc.mysql.utils.MySqlConnectionUtils.isTableIdCaseSensitive;
/** The {@link JdbcDataSourceDialect} implementation for MySQL datasource. */
@@ -104,12 +100,7 @@ public class MySqlDialect implements JdbcDataSourceDialect
{
@Override
public MySqlSourceFetchTaskContext createFetchTaskContext(
SourceSplitBase sourceSplitBase, JdbcSourceConfig
taskSourceConfig) {
- final MySqlConnection jdbcConnection =
- createMySqlConnection(taskSourceConfig.getDbzConfiguration());
- final BinaryLogClient binaryLogClient =
- createBinaryClient(taskSourceConfig.getDbzConfiguration());
- return new MySqlSourceFetchTaskContext(
- taskSourceConfig, this, jdbcConnection, binaryLogClient);
+ return new MySqlSourceFetchTaskContext(taskSourceConfig, this);
}
@Override
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/MySqlSourceFetchTaskContext.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/MySqlSourceFetchTaskContext.java
index b32f62f6b..c6aebc8aa 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/MySqlSourceFetchTaskContext.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/MySqlSourceFetchTaskContext.java
@@ -62,15 +62,21 @@ import io.debezium.relational.history.TableChanges;
import io.debezium.schema.DataCollectionId;
import io.debezium.schema.TopicSelector;
import io.debezium.util.Collect;
+import lombok.extern.slf4j.Slf4j;
+import java.io.IOException;
+import java.sql.SQLException;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import static
org.apache.seatunnel.connectors.seatunnel.cdc.mysql.source.offset.BinlogOffset.BINLOG_FILENAME_OFFSET_KEY;
+import static
org.apache.seatunnel.connectors.seatunnel.cdc.mysql.utils.MySqlConnectionUtils.createBinaryClient;
+import static
org.apache.seatunnel.connectors.seatunnel.cdc.mysql.utils.MySqlConnectionUtils.createMySqlConnection;
/** The context for fetch task that fetching data of snapshot split from MySQL
data source. */
+@Slf4j
public class MySqlSourceFetchTaskContext extends JdbcSourceFetchTaskContext {
private static final Logger LOG =
LoggerFactory.getLogger(MySqlSourceFetchTaskContext.class);
@@ -89,13 +95,10 @@ public class MySqlSourceFetchTaskContext extends
JdbcSourceFetchTaskContext {
private MySqlErrorHandler errorHandler;
public MySqlSourceFetchTaskContext(
- JdbcSourceConfig sourceConfig,
- JdbcDataSourceDialect dataSourceDialect,
- MySqlConnection connection,
- BinaryLogClient binaryLogClient) {
+ JdbcSourceConfig sourceConfig, JdbcDataSourceDialect
dataSourceDialect) {
super(sourceConfig, dataSourceDialect);
- this.connection = connection;
- this.binaryLogClient = binaryLogClient;
+ this.connection =
createMySqlConnection(sourceConfig.getDbzConfiguration());
+ this.binaryLogClient =
createBinaryClient(sourceConfig.getDbzConfiguration());
this.metadataProvider = new MySqlEventMetadataProvider();
}
@@ -159,6 +162,18 @@ public class MySqlSourceFetchTaskContext extends
JdbcSourceFetchTaskContext {
this.errorHandler = new
MySqlErrorHandler(connectorConfig.getLogicalName(), queue);
}
+ @Override
+ public void close() {
+ try {
+ this.connection.close();
+ this.binaryLogClient.disconnect();
+ } catch (SQLException e) {
+ log.warn("Failed to close connection", e);
+ } catch (IOException e) {
+ log.warn("Failed to close binaryLogClient", e);
+ }
+ }
+
@Override
public MySqlSourceConfig getSourceConfig() {
return (MySqlSourceConfig) sourceConfig;
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/binlog/MySqlBinlogFetchTask.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/binlog/MySqlBinlogFetchTask.java
index d59070a2f..f858864e7 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/binlog/MySqlBinlogFetchTask.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/binlog/MySqlBinlogFetchTask.java
@@ -81,6 +81,11 @@ public class MySqlBinlogFetchTask implements
FetchTask<SourceSplitBase> {
return taskRunning;
}
+ @Override
+ public void shutdown() {
+ taskRunning = false;
+ }
+
@Override
public SourceSplitBase getSplit() {
return split;
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/scan/MySqlSnapshotFetchTask.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/scan/MySqlSnapshotFetchTask.java
index 6c0bea13e..ac997f7ed 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/scan/MySqlSnapshotFetchTask.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/scan/MySqlSnapshotFetchTask.java
@@ -155,6 +155,11 @@ public class MySqlSnapshotFetchTask implements
FetchTask<SourceSplitBase> {
return taskRunning;
}
+ @Override
+ public void shutdown() {
+ taskRunning = false;
+ }
+
@Override
public SourceSplitBase getSplit() {
return split;
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/utils/MySqlUtils.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/utils/MySqlUtils.java
index 1f2fb0973..b9ffe6034 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/utils/MySqlUtils.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/utils/MySqlUtils.java
@@ -339,6 +339,7 @@ public class MySqlUtils {
private static PreparedStatement initStatement(JdbcConnection jdbc, String
sql, int fetchSize)
throws SQLException {
final Connection connection = jdbc.connection();
+ // Add MySQL metadata locks to prevent modification of table structure.
connection.setAutoCommit(false);
final PreparedStatement statement =
connection.prepareStatement(
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/SqlServerDialect.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/SqlServerDialect.java
index cdef63119..0494cd98e 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/SqlServerDialect.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/SqlServerDialect.java
@@ -34,7 +34,6 @@ import
org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source.source.rea
import
org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source.utils.SqlServerSchema;
import
org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source.utils.TableDiscoveryUtils;
-import io.debezium.connector.sqlserver.SqlServerConnection;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.relational.TableId;
import io.debezium.relational.history.TableChanges;
@@ -104,13 +103,8 @@ public class SqlServerDialect implements
JdbcDataSourceDialect {
@Override
public SqlServerSourceFetchTaskContext createFetchTaskContext(
SourceSplitBase sourceSplitBase, JdbcSourceConfig
taskSourceConfig) {
- final SqlServerConnection jdbcConnection =
-
createSqlServerConnection(taskSourceConfig.getDbzConfiguration());
- final SqlServerConnection metaDataConnection =
-
createSqlServerConnection(taskSourceConfig.getDbzConfiguration());
- return new SqlServerSourceFetchTaskContext(
- taskSourceConfig, this, jdbcConnection, metaDataConnection);
+ return new SqlServerSourceFetchTaskContext(taskSourceConfig, this);
}
@Override
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/reader/fetch/SqlServerSourceFetchTaskContext.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/reader/fetch/SqlServerSourceFetchTaskContext.java
index 6dd8e7d40..200806f39 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/reader/fetch/SqlServerSourceFetchTaskContext.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/reader/fetch/SqlServerSourceFetchTaskContext.java
@@ -58,13 +58,18 @@ import io.debezium.relational.history.TableChanges;
import io.debezium.schema.DataCollectionId;
import io.debezium.schema.TopicSelector;
import io.debezium.util.Collect;
+import lombok.extern.slf4j.Slf4j;
+import java.sql.SQLException;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import static
org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source.utils.SqlServerConnectionUtils.createSqlServerConnection;
+
/** The context for fetch task that fetching data of snapshot split from MySQL
data source. */
+@Slf4j
public class SqlServerSourceFetchTaskContext extends
JdbcSourceFetchTaskContext {
private final SqlServerConnection dataConnection;
@@ -83,13 +88,11 @@ public class SqlServerSourceFetchTaskContext extends
JdbcSourceFetchTaskContext
private SnapshotChangeEventSourceMetrics snapshotChangeEventSourceMetrics;
public SqlServerSourceFetchTaskContext(
- JdbcSourceConfig sourceConfig,
- JdbcDataSourceDialect dataSourceDialect,
- SqlServerConnection dataConnection,
- SqlServerConnection metadataConnection) {
+ JdbcSourceConfig sourceConfig, JdbcDataSourceDialect
dataSourceDialect) {
super(sourceConfig, dataSourceDialect);
- this.dataConnection = dataConnection;
- this.metadataConnection = metadataConnection;
+
+ this.dataConnection =
createSqlServerConnection(sourceConfig.getDbzConfiguration());
+ this.metadataConnection =
createSqlServerConnection(sourceConfig.getDbzConfiguration());
this.metadataProvider = new SqlServerEventMetadataProvider();
}
@@ -158,6 +161,16 @@ public class SqlServerSourceFetchTaskContext extends
JdbcSourceFetchTaskContext
this.errorHandler = new
SqlServerErrorHandler(connectorConfig.getLogicalName(), queue);
}
+ @Override
+ public void close() {
+ try {
+ this.dataConnection.close();
+ this.metadataConnection.close();
+ } catch (SQLException e) {
+ log.warn("Failed to close connection", e);
+ }
+ }
+
@Override
public SqlServerSourceConfig getSourceConfig() {
return (SqlServerSourceConfig) sourceConfig;
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/reader/fetch/scan/SqlServerSnapshotFetchTask.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/reader/fetch/scan/SqlServerSnapshotFetchTask.java
index a8246fa24..82d43971e 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/reader/fetch/scan/SqlServerSnapshotFetchTask.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/reader/fetch/scan/SqlServerSnapshotFetchTask.java
@@ -166,6 +166,11 @@ public class SqlServerSnapshotFetchTask implements
FetchTask<SourceSplitBase> {
return taskRunning;
}
+ @Override
+ public void shutdown() {
+ taskRunning = false;
+ }
+
@Override
public SourceSplitBase getSplit() {
return split;
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/reader/fetch/transactionlog/SqlServerTransactionLogFetchTask.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/reader/fetch/transactionlog/SqlServerTransactionLogFetchTask.java
index 21a6af3a5..f6968b621 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/reader/fetch/transactionlog/SqlServerTransactionLogFetchTask.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/reader/fetch/transactionlog/SqlServerTransactionLogFetchTask.java
@@ -78,6 +78,11 @@ public class SqlServerTransactionLogFetchTask implements
FetchTask<SourceSplitBa
return taskRunning;
}
+ @Override
+ public void shutdown() {
+ taskRunning = false;
+ }
+
@Override
public SourceSplitBase getSplit() {
return split;