This is an automated email from the ASF dual-hosted git repository.

gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 1b406b5a3 [bugfix][connector-v2] fix cdc mysql reader err (#3465)
1b406b5a3 is described below

commit 1b406b5a31ab23cce3c16953c2001c992da21b5e
Author: ic4y <[email protected]>
AuthorDate: Fri Nov 18 15:19:34 2022 +0800

    [bugfix][connector-v2] fix cdc mysql reader err (#3465)
---
 .../source/reader/fetch/binlog/MySqlBinlogFetchTask.java  |  2 +-
 .../source/reader/fetch/scan/MySqlSnapshotFetchTask.java  |  2 +-
 .../reader/fetch/scan/MySqlSnapshotSplitReadTask.java     | 15 ++++++++-------
 .../seatunnel/cdc/mysql/utils/MySqlTypeUtils.java         |  2 +-
 4 files changed, 11 insertions(+), 10 deletions(-)

diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/binlog/MySqlBinlogFetchTask.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/binlog/MySqlBinlogFetchTask.java
index 21da9227a..fb1db65a7 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/binlog/MySqlBinlogFetchTask.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/binlog/MySqlBinlogFetchTask.java
@@ -35,7 +35,7 @@ public class MySqlBinlogFetchTask implements FetchTask<SourceSplitBase> {
     }
 
     @Override
-    public void execute(Context context) throws Exception {
+    public void execute(FetchTask.Context context) throws Exception {
         MySqlSourceFetchTaskContext sourceFetchContext = (MySqlSourceFetchTaskContext) context;
         taskRunning = true;
 
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/scan/MySqlSnapshotFetchTask.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/scan/MySqlSnapshotFetchTask.java
index 832bc8b6d..00bda7733 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/scan/MySqlSnapshotFetchTask.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/scan/MySqlSnapshotFetchTask.java
@@ -36,7 +36,7 @@ public class MySqlSnapshotFetchTask implements FetchTask<SourceSplitBase> {
     }
 
     @Override
-    public void execute(Context context) throws Exception {
+    public void execute(FetchTask.Context context) throws Exception {
         MySqlSourceFetchTaskContext sourceFetchContext = (MySqlSourceFetchTaskContext) context;
         taskRunning = true;
         snapshotSplitReadTask =
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/scan/MySqlSnapshotSplitReadTask.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/scan/MySqlSnapshotSplitReadTask.java
index 0699ec394..3cc104035 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/scan/MySqlSnapshotSplitReadTask.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/scan/MySqlSnapshotSplitReadTask.java
@@ -31,6 +31,7 @@ import io.debezium.connector.mysql.MySqlOffsetContext;
 import io.debezium.connector.mysql.MySqlValueConverters;
 import io.debezium.pipeline.EventDispatcher;
 import io.debezium.pipeline.source.AbstractSnapshotChangeEventSource;
+import io.debezium.pipeline.source.spi.ChangeEventSource;
 import io.debezium.pipeline.source.spi.SnapshotProgressListener;
 import io.debezium.pipeline.spi.ChangeRecordEmitter;
 import io.debezium.pipeline.spi.OffsetContext;
@@ -97,7 +98,7 @@ public class MySqlSnapshotSplitReadTask extends AbstractSnapshotChangeEventSourc
 
     @Override
     public SnapshotResult execute(
-        ChangeEventSourceContext context, OffsetContext previousOffset)
+        ChangeEventSource.ChangeEventSourceContext context, OffsetContext previousOffset)
         throws InterruptedException {
         SnapshottingTask snapshottingTask = getSnapshottingTask(previousOffset);
         final SnapshotContext ctx;
@@ -119,10 +120,10 @@ public class MySqlSnapshotSplitReadTask extends AbstractSnapshotChangeEventSourc
 
     @Override
     protected SnapshotResult doExecute(
-        ChangeEventSourceContext context,
+        ChangeEventSource.ChangeEventSourceContext context,
         OffsetContext previousOffset,
-        SnapshotContext snapshotContext,
-        SnapshottingTask snapshottingTask)
+        AbstractSnapshotChangeEventSource.SnapshotContext snapshotContext,
+        AbstractSnapshotChangeEventSource.SnapshottingTask snapshottingTask)
         throws Exception {
         final RelationalSnapshotChangeEventSource.RelationalSnapshotContext ctx =
             (RelationalSnapshotChangeEventSource.RelationalSnapshotContext) snapshotContext;
@@ -152,12 +153,12 @@ public class MySqlSnapshotSplitReadTask extends AbstractSnapshotChangeEventSourc
     }
 
     @Override
-    protected SnapshottingTask getSnapshottingTask(OffsetContext previousOffset) {
+    protected AbstractSnapshotChangeEventSource.SnapshottingTask getSnapshottingTask(OffsetContext previousOffset) {
         return new SnapshottingTask(false, true);
     }
 
     @Override
-    protected SnapshotContext prepare(ChangeEventSourceContext changeEventSourceContext)
+    protected AbstractSnapshotChangeEventSource.SnapshotContext prepare(ChangeEventSource.ChangeEventSourceContext changeEventSourceContext)
         throws Exception {
         return new MySqlSnapshotContext();
     }
@@ -249,7 +250,7 @@ public class MySqlSnapshotSplitReadTask extends AbstractSnapshotChangeEventSourc
     }
 
     protected ChangeRecordEmitter getChangeRecordEmitter(
-        SnapshotContext snapshotContext, TableId tableId, Object[] row) {
+        AbstractSnapshotChangeEventSource.SnapshotContext snapshotContext, TableId tableId, Object[] row) {
         snapshotContext.offset.event(tableId, clock.currentTime());
         return new SnapshotChangeRecordEmitter(snapshotContext.offset, row, clock);
     }
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/utils/MySqlTypeUtils.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/utils/MySqlTypeUtils.java
index 8250e3cf2..10658ed2a 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/utils/MySqlTypeUtils.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/utils/MySqlTypeUtils.java
@@ -26,7 +26,7 @@ import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import io.debezium.relational.Column;
 import lombok.extern.slf4j.Slf4j;
 
-/** Utilities for converting from MySQL types to Flink types. */
+/** Utilities for converting from MySQL types to SeaTunnel types. */
 
 @Slf4j
 public class MySqlTypeUtils {

Reply via email to