This is an automated email from the ASF dual-hosted git repository.
gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 1b406b5a3 [bugfix][connector-v2] fix cdc mysql reader err (#3465)
1b406b5a3 is described below
commit 1b406b5a31ab23cce3c16953c2001c992da21b5e
Author: ic4y <[email protected]>
AuthorDate: Fri Nov 18 15:19:34 2022 +0800
[bugfix][connector-v2] fix cdc mysql reader err (#3465)
---
.../source/reader/fetch/binlog/MySqlBinlogFetchTask.java | 2 +-
.../source/reader/fetch/scan/MySqlSnapshotFetchTask.java | 2 +-
.../reader/fetch/scan/MySqlSnapshotSplitReadTask.java | 15 ++++++++-------
.../seatunnel/cdc/mysql/utils/MySqlTypeUtils.java | 2 +-
4 files changed, 11 insertions(+), 10 deletions(-)
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/binlog/MySqlBinlogFetchTask.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/binlog/MySqlBinlogFetchTask.java
index 21da9227a..fb1db65a7 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/binlog/MySqlBinlogFetchTask.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/binlog/MySqlBinlogFetchTask.java
@@ -35,7 +35,7 @@ public class MySqlBinlogFetchTask implements
FetchTask<SourceSplitBase> {
}
@Override
- public void execute(Context context) throws Exception {
+ public void execute(FetchTask.Context context) throws Exception {
MySqlSourceFetchTaskContext sourceFetchContext =
(MySqlSourceFetchTaskContext) context;
taskRunning = true;
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/scan/MySqlSnapshotFetchTask.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/scan/MySqlSnapshotFetchTask.java
index 832bc8b6d..00bda7733 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/scan/MySqlSnapshotFetchTask.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/scan/MySqlSnapshotFetchTask.java
@@ -36,7 +36,7 @@ public class MySqlSnapshotFetchTask implements
FetchTask<SourceSplitBase> {
}
@Override
- public void execute(Context context) throws Exception {
+ public void execute(FetchTask.Context context) throws Exception {
MySqlSourceFetchTaskContext sourceFetchContext =
(MySqlSourceFetchTaskContext) context;
taskRunning = true;
snapshotSplitReadTask =
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/scan/MySqlSnapshotSplitReadTask.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/scan/MySqlSnapshotSplitReadTask.java
index 0699ec394..3cc104035 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/scan/MySqlSnapshotSplitReadTask.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/scan/MySqlSnapshotSplitReadTask.java
@@ -31,6 +31,7 @@ import io.debezium.connector.mysql.MySqlOffsetContext;
import io.debezium.connector.mysql.MySqlValueConverters;
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.source.AbstractSnapshotChangeEventSource;
+import io.debezium.pipeline.source.spi.ChangeEventSource;
import io.debezium.pipeline.source.spi.SnapshotProgressListener;
import io.debezium.pipeline.spi.ChangeRecordEmitter;
import io.debezium.pipeline.spi.OffsetContext;
@@ -97,7 +98,7 @@ public class MySqlSnapshotSplitReadTask extends
AbstractSnapshotChangeEventSourc
@Override
public SnapshotResult execute(
- ChangeEventSourceContext context, OffsetContext previousOffset)
+ ChangeEventSource.ChangeEventSourceContext context, OffsetContext
previousOffset)
throws InterruptedException {
SnapshottingTask snapshottingTask =
getSnapshottingTask(previousOffset);
final SnapshotContext ctx;
@@ -119,10 +120,10 @@ public class MySqlSnapshotSplitReadTask extends
AbstractSnapshotChangeEventSourc
@Override
protected SnapshotResult doExecute(
- ChangeEventSourceContext context,
+ ChangeEventSource.ChangeEventSourceContext context,
OffsetContext previousOffset,
- SnapshotContext snapshotContext,
- SnapshottingTask snapshottingTask)
+ AbstractSnapshotChangeEventSource.SnapshotContext snapshotContext,
+ AbstractSnapshotChangeEventSource.SnapshottingTask snapshottingTask)
throws Exception {
final RelationalSnapshotChangeEventSource.RelationalSnapshotContext
ctx =
(RelationalSnapshotChangeEventSource.RelationalSnapshotContext)
snapshotContext;
@@ -152,12 +153,12 @@ public class MySqlSnapshotSplitReadTask extends
AbstractSnapshotChangeEventSourc
}
@Override
- protected SnapshottingTask getSnapshottingTask(OffsetContext
previousOffset) {
+ protected AbstractSnapshotChangeEventSource.SnapshottingTask
getSnapshottingTask(OffsetContext previousOffset) {
return new SnapshottingTask(false, true);
}
@Override
- protected SnapshotContext prepare(ChangeEventSourceContext
changeEventSourceContext)
+ protected AbstractSnapshotChangeEventSource.SnapshotContext
prepare(ChangeEventSource.ChangeEventSourceContext changeEventSourceContext)
throws Exception {
return new MySqlSnapshotContext();
}
@@ -249,7 +250,7 @@ public class MySqlSnapshotSplitReadTask extends
AbstractSnapshotChangeEventSourc
}
protected ChangeRecordEmitter getChangeRecordEmitter(
- SnapshotContext snapshotContext, TableId tableId, Object[] row) {
+ AbstractSnapshotChangeEventSource.SnapshotContext snapshotContext,
TableId tableId, Object[] row) {
snapshotContext.offset.event(tableId, clock.currentTime());
return new SnapshotChangeRecordEmitter(snapshotContext.offset, row,
clock);
}
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/utils/MySqlTypeUtils.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/utils/MySqlTypeUtils.java
index 8250e3cf2..10658ed2a 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/utils/MySqlTypeUtils.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/utils/MySqlTypeUtils.java
@@ -26,7 +26,7 @@ import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import io.debezium.relational.Column;
import lombok.extern.slf4j.Slf4j;
-/** Utilities for converting from MySQL types to Flink types. */
+/** Utilities for converting from MySQL types to SeaTunnel types. */
@Slf4j
public class MySqlTypeUtils {