This is an automated email from the ASF dual-hosted git repository.
tyrantlucifer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new c13c03199 [Hotfix][CDC] Fix chunk start/end parameter type error
(#4777)
c13c03199 is described below
commit c13c031995cba3d8a7b391c263f9155691ebdc87
Author: hailin0 <[email protected]>
AuthorDate: Sun May 21 22:16:26 2023 +0800
[Hotfix][CDC] Fix chunk start/end parameter type error (#4777)
Incorrect wrapping as Array<Array> types, but only Array type required
---
.../cdc/base/source/split/CompletedSnapshotSplitInfo.java | 8 ++++----
.../seatunnel/connectors/cdc/base/source/split/SnapshotSplit.java | 8 ++++----
.../seatunnel/cdc/mysql/source/eumerator/MySqlChunkSplitter.java | 4 +++-
.../source/reader/fetch/scan/MySqlSnapshotSplitReadTask.java | 4 ++--
.../sqlserver/source/source/eumerator/SqlServerChunkSplitter.java | 4 +++-
.../source/reader/fetch/scan/SqlServerSnapshotSplitReadTask.java | 4 ++--
6 files changed, 18 insertions(+), 14 deletions(-)
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/split/CompletedSnapshotSplitInfo.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/split/CompletedSnapshotSplitInfo.java
index 60e511ec2..39b78d03c 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/split/CompletedSnapshotSplitInfo.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/split/CompletedSnapshotSplitInfo.java
@@ -30,16 +30,16 @@ public class CompletedSnapshotSplitInfo implements
Serializable {
private final String splitId;
private final TableId tableId;
private final SeaTunnelRowType splitKeyType;
- private final Object splitStart;
- private final Object splitEnd;
+ private final Object[] splitStart;
+ private final Object[] splitEnd;
private final Offset watermark;
public CompletedSnapshotSplitInfo(
String splitId,
TableId tableId,
SeaTunnelRowType splitKeyType,
- Object splitStart,
- Object splitEnd,
+ Object[] splitStart,
+ Object[] splitEnd,
Offset watermark) {
this.splitId = splitId;
this.tableId = tableId;
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/split/SnapshotSplit.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/split/SnapshotSplit.java
index 733f8832f..776ad6891 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/split/SnapshotSplit.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/split/SnapshotSplit.java
@@ -28,8 +28,8 @@ public class SnapshotSplit extends SourceSplitBase {
private static final long serialVersionUID = 1L;
private final TableId tableId;
private final SeaTunnelRowType splitKeyType;
- private final Object splitStart;
- private final Object splitEnd;
+ private final Object[] splitStart;
+ private final Object[] splitEnd;
private final Offset highWatermark;
@@ -37,8 +37,8 @@ public class SnapshotSplit extends SourceSplitBase {
String splitId,
TableId tableId,
SeaTunnelRowType splitKeyType,
- Object splitStart,
- Object splitEnd,
+ Object[] splitStart,
+ Object[] splitEnd,
Offset highWatermark) {
super(splitId);
this.tableId = tableId;
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/eumerator/MySqlChunkSplitter.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/eumerator/MySqlChunkSplitter.java
index a0f2f9c26..c248edd69 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/eumerator/MySqlChunkSplitter.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/eumerator/MySqlChunkSplitter.java
@@ -295,8 +295,10 @@ public class MySqlChunkSplitter implements
JdbcSourceChunkSplitter {
Object chunkStart,
Object chunkEnd) {
// currently, we only support single split column
+ Object[] splitStart = chunkStart == null ? null : new Object[]
{chunkStart};
+ Object[] splitEnd = chunkEnd == null ? null : new Object[] {chunkEnd};
return new SnapshotSplit(
- splitId(tableId, chunkId), tableId, splitKeyType, chunkStart,
chunkEnd, null);
+ splitId(tableId, chunkId), tableId, splitKeyType, splitStart,
splitEnd, null);
}
//
------------------------------------------------------------------------------------------
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/scan/MySqlSnapshotSplitReadTask.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/scan/MySqlSnapshotSplitReadTask.java
index 2aafa3789..82104aeb1 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/scan/MySqlSnapshotSplitReadTask.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/scan/MySqlSnapshotSplitReadTask.java
@@ -205,8 +205,8 @@ public class MySqlSnapshotSplitReadTask extends
AbstractSnapshotChangeEventSourc
selectSql,
snapshotSplit.getSplitStart() == null,
snapshotSplit.getSplitEnd() == null,
- new Object[] {snapshotSplit.getSplitStart()},
- new Object[] {snapshotSplit.getSplitEnd()},
+ snapshotSplit.getSplitStart(),
+ snapshotSplit.getSplitEnd(),
snapshotSplit.getSplitKeyType().getTotalFields(),
connectorConfig.getSnapshotFetchSize());
ResultSet rs = selectStatement.executeQuery()) {
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/eumerator/SqlServerChunkSplitter.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/eumerator/SqlServerChunkSplitter.java
index dc751ca2e..d25269ee5 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/eumerator/SqlServerChunkSplitter.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/eumerator/SqlServerChunkSplitter.java
@@ -292,8 +292,10 @@ public class SqlServerChunkSplitter implements
JdbcSourceChunkSplitter {
Object chunkStart,
Object chunkEnd) {
// currently, we only support single split column
+ Object[] splitStart = chunkStart == null ? null : new Object[]
{chunkStart};
+ Object[] splitEnd = chunkEnd == null ? null : new Object[] {chunkEnd};
return new SnapshotSplit(
- splitId(tableId, chunkId), tableId, splitKeyType, chunkStart,
chunkEnd, null);
+ splitId(tableId, chunkId), tableId, splitKeyType, splitStart,
splitEnd, null);
}
//
------------------------------------------------------------------------------------------
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/reader/fetch/scan/SqlServerSnapshotSplitReadTask.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/reader/fetch/scan/SqlServerSnapshotSplitReadTask.java
index 8995ef4f5..d25934ff6 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/reader/fetch/scan/SqlServerSnapshotSplitReadTask.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/reader/fetch/scan/SqlServerSnapshotSplitReadTask.java
@@ -195,8 +195,8 @@ public class SqlServerSnapshotSplitReadTask extends
AbstractSnapshotChangeEventS
selectSql,
snapshotSplit.getSplitStart() == null,
snapshotSplit.getSplitEnd() == null,
- new Object[] {snapshotSplit.getSplitStart()},
- new Object[] {snapshotSplit.getSplitEnd()},
+ snapshotSplit.getSplitStart(),
+ snapshotSplit.getSplitEnd(),
snapshotSplit.getSplitKeyType().getTotalFields(),
connectorConfig.getSnapshotFetchSize());
ResultSet rs = selectStatement.executeQuery()) {