This is an automated email from the ASF dual-hosted git repository.
zhouyao2023 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 868ba4d7c7 [Hotfix][Jdbc/CDC] Fix postgresql uuid type in jdbc read
(#6684)
868ba4d7c7 is described below
commit 868ba4d7c76eab8619798b4f90afc4c188c3997d
Author: hailin0 <[email protected]>
AuthorDate: Thu May 9 22:16:36 2024 +0800
[Hotfix][Jdbc/CDC] Fix postgresql uuid type in jdbc read (#6684)
---
.../splitter/AbstractJdbcSourceChunkSplitter.java | 22 +++----
.../splitter/JdbcSourceChunkSplitter.java | 69 ++++++++++++++-----
.../jdbc/source/JdbcSourceChunkSplitterTest.java | 2 +-
.../AbstractJdbcSourceChunkSplitterTest.java | 3 +-
.../mysql/source/eumerator/MySqlChunkSplitter.java | 8 +--
.../source/eumerator/OracleChunkSplitter.java | 8 +--
.../source/enumerator/PostgresChunkSplitter.java | 49 +++++++++++---
.../snapshot/PostgresSnapshotSplitReadTask.java | 2 +-
.../cdc/postgres/utils/PostgresUtils.java | 77 +++++++++++++++-------
.../cdc/postgres/utils/PostgresUtilsTest.java | 25 +++++--
.../source/eumerator/SqlServerChunkSplitter.java | 9 ++-
.../seatunnel/jdbc/internal/JdbcInputFormat.java | 2 +-
.../jdbc/internal/dialect/JdbcDialect.java | 15 +++++
.../internal/dialect/psql/PostgresDialect.java | 75 ++++++++++++++++++++-
.../seatunnel/jdbc/source/ChunkSplitter.java | 21 ++++--
.../jdbc/source/DynamicChunkSplitter.java | 44 ++++++++-----
.../seatunnel/jdbc/source/FixedChunkSplitter.java | 18 ++++-
.../jdbc/source/DynamicChunkSplitterTest.java | 43 +++++++++++-
18 files changed, 376 insertions(+), 116 deletions(-)
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/splitter/AbstractJdbcSourceChunkSplitter.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/splitter/AbstractJdbcSourceChunkSplitter.java
index f124e5fc71..e10c70795b 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/splitter/AbstractJdbcSourceChunkSplitter.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/splitter/AbstractJdbcSourceChunkSplitter.java
@@ -116,7 +116,7 @@ public abstract class AbstractJdbcSourceChunkSplitter
implements JdbcSourceChunk
private List<ChunkRange> splitTableIntoChunks(
JdbcConnection jdbc, TableId tableId, Column splitColumn) throws
SQLException {
final String splitColumnName = splitColumn.name();
- final Object[] minMax = queryMinMax(jdbc, tableId, splitColumnName);
+ final Object[] minMax = queryMinMax(jdbc, tableId, splitColumn);
final Object min = minMax[0];
final Object max = minMax[1];
if (min == null || max == null || min.equals(max)) {
@@ -177,8 +177,7 @@ public abstract class AbstractJdbcSourceChunkSplitter
implements JdbcSourceChunk
tableId,
inverseSamplingRate);
Object[] sample =
- sampleDataFromColumn(
- jdbc, tableId, splitColumnName,
inverseSamplingRate);
+ sampleDataFromColumn(jdbc, tableId, splitColumn,
inverseSamplingRate);
log.info(
"Sample data from table {} end, the sample size is
{}",
tableId,
@@ -186,11 +185,10 @@ public abstract class AbstractJdbcSourceChunkSplitter
implements JdbcSourceChunk
return efficientShardingThroughSampling(
tableId, sample, approximateRowCnt, shardCount);
}
- return splitUnevenlySizedChunks(
- jdbc, tableId, splitColumnName, min, max, chunkSize);
+ return splitUnevenlySizedChunks(jdbc, tableId, splitColumn,
min, max, chunkSize);
}
} else {
- return splitUnevenlySizedChunks(jdbc, tableId, splitColumnName,
min, max, chunkSize);
+ return splitUnevenlySizedChunks(jdbc, tableId, splitColumn, min,
max, chunkSize);
}
}
@@ -198,7 +196,7 @@ public abstract class AbstractJdbcSourceChunkSplitter
implements JdbcSourceChunk
protected List<ChunkRange> splitUnevenlySizedChunks(
JdbcConnection jdbc,
TableId tableId,
- String splitColumnName,
+ Column splitColumn,
Object min,
Object max,
int chunkSize)
@@ -207,7 +205,7 @@ public abstract class AbstractJdbcSourceChunkSplitter
implements JdbcSourceChunk
"Use unevenly-sized chunks for table {}, the chunk size is
{}", tableId, chunkSize);
final List<ChunkRange> splits = new ArrayList<>();
Object chunkStart = null;
- Object chunkEnd = nextChunkEnd(jdbc, min, tableId, splitColumnName,
max, chunkSize);
+ Object chunkEnd = nextChunkEnd(jdbc, min, tableId, splitColumn, max,
chunkSize);
int count = 0;
while (chunkEnd != null && ObjectCompare(chunkEnd, max) <= 0) {
// we start from [null, min + chunk_size) and avoid [null, min)
@@ -215,7 +213,7 @@ public abstract class AbstractJdbcSourceChunkSplitter
implements JdbcSourceChunk
// may sleep a while to avoid DDOS on MySQL server
maySleep(count++, tableId);
chunkStart = chunkEnd;
- chunkEnd = nextChunkEnd(jdbc, chunkEnd, tableId, splitColumnName,
max, chunkSize);
+ chunkEnd = nextChunkEnd(jdbc, chunkEnd, tableId, splitColumn, max,
chunkSize);
}
// add the ending split
splits.add(ChunkRange.of(chunkStart, null));
@@ -226,17 +224,17 @@ public abstract class AbstractJdbcSourceChunkSplitter
implements JdbcSourceChunk
JdbcConnection jdbc,
Object previousChunkEnd,
TableId tableId,
- String splitColumnName,
+ Column splitColumn,
Object max,
int chunkSize)
throws SQLException {
// chunk end might be null when max values are removed
Object chunkEnd =
- queryNextChunkMax(jdbc, tableId, splitColumnName, chunkSize,
previousChunkEnd);
+ queryNextChunkMax(jdbc, tableId, splitColumn, chunkSize,
previousChunkEnd);
if (Objects.equals(previousChunkEnd, chunkEnd)) {
// we don't allow equal chunk start and end,
// should query the next one larger than chunkEnd
- chunkEnd = queryMin(jdbc, tableId, splitColumnName, chunkEnd);
+ chunkEnd = queryMin(jdbc, tableId, splitColumn, chunkEnd);
}
if (ObjectCompare(chunkEnd, max) >= 0) {
return null;
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/splitter/JdbcSourceChunkSplitter.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/splitter/JdbcSourceChunkSplitter.java
index b271be0d76..3981ddfa7c 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/splitter/JdbcSourceChunkSplitter.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/splitter/JdbcSourceChunkSplitter.java
@@ -23,6 +23,7 @@ import
org.apache.seatunnel.connectors.cdc.base.source.split.SnapshotSplit;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.relational.Column;
+import io.debezium.relational.Table;
import io.debezium.relational.TableId;
import java.sql.SQLException;
@@ -35,16 +36,29 @@ public interface JdbcSourceChunkSplitter extends
ChunkSplitter {
@Override
Collection<SnapshotSplit> generateSplits(TableId tableId);
+ /** @deprecated Use {@link #queryMinMax(JdbcConnection,
TableId, Column)} instead. */
+ @Deprecated
+ Object[] queryMinMax(JdbcConnection jdbc, TableId tableId, String
columnName)
+ throws SQLException;
+
/**
* Query the maximum and minimum value of the column in the table. e.g.
query string <code>
* SELECT MIN(%s) FROM %s WHERE %s > ?</code>
*
* @param jdbc JDBC connection.
* @param tableId table identity.
- * @param columnName column name.
+ * @param column column.
* @return maximum and minimum value.
*/
- Object[] queryMinMax(JdbcConnection jdbc, TableId tableId, String
columnName)
+ default Object[] queryMinMax(JdbcConnection jdbc, TableId tableId, Column
column)
+ throws SQLException {
+ return queryMinMax(jdbc, tableId, column.name());
+ }
+
+ /** @deprecated Use {@link #queryMin(JdbcConnection, TableId,
Column, Object)} instead. */
+ @Deprecated
+ Object queryMin(
+ JdbcConnection jdbc, TableId tableId, String columnName, Object
excludedLowerBound)
throws SQLException;
/**
@@ -54,12 +68,19 @@ public interface JdbcSourceChunkSplitter extends
ChunkSplitter {
*
* @param jdbc JDBC connection.
* @param tableId table identity.
- * @param columnName column name.
+ * @param column column.
* @param excludedLowerBound the minimum value should be greater than this
value.
* @return minimum value.
*/
- Object queryMin(
- JdbcConnection jdbc, TableId tableId, String columnName, Object
excludedLowerBound)
+ default Object queryMin(
+ JdbcConnection jdbc, TableId tableId, Column column, Object
excludedLowerBound)
+ throws SQLException {
+ return queryMin(jdbc, tableId, column.name(), excludedLowerBound);
+ }
+
+ @Deprecated
+ Object[] sampleDataFromColumn(
+ JdbcConnection jdbc, TableId tableId, String columnName, int
samplingRate)
throws SQLException;
/**
@@ -68,14 +89,29 @@ public interface JdbcSourceChunkSplitter extends
ChunkSplitter {
*
* @param jdbc The JDBC connection object used to connect to the database.
* @param tableId The ID of the table in which the column resides.
- * @param columnName The name of the column to be sampled.
+ * @param column The column to be sampled.
* @param samplingRate samplingRate The inverse of the fraction of the
data to be sampled from
* the column. For example, a value of 1000 would mean 1/1000 of the
data will be sampled.
* @return Returns a List of sampled data from the specified column.
* @throws SQLException If an SQL error occurs during the sampling
operation.
*/
- Object[] sampleDataFromColumn(
- JdbcConnection jdbc, TableId tableId, String columnName, int
samplingRate)
+ default Object[] sampleDataFromColumn(
+ JdbcConnection jdbc, TableId tableId, Column column, int
samplingRate)
+ throws SQLException {
+ return sampleDataFromColumn(jdbc, tableId, column.name(),
samplingRate);
+ }
+
+ /**
+ * @deprecated Use {@link #queryNextChunkMax(JdbcConnection,
TableId, Column, int,
+ * Object)} instead.
+ */
+ @Deprecated
+ Object queryNextChunkMax(
+ JdbcConnection jdbc,
+ TableId tableId,
+ String columnName,
+ int chunkSize,
+ Object includedLowerBound)
throws SQLException;
/**
@@ -85,18 +121,20 @@ public interface JdbcSourceChunkSplitter extends
ChunkSplitter {
*
* @param jdbc JDBC connection.
* @param tableId table identity.
- * @param columnName column name.
+ * @param column column.
* @param chunkSize chunk size.
* @param includedLowerBound the previous chunk end value.
* @return next chunk end value.
*/
- Object queryNextChunkMax(
+ default Object queryNextChunkMax(
JdbcConnection jdbc,
TableId tableId,
- String columnName,
+ Column column,
int chunkSize,
Object includedLowerBound)
- throws SQLException;
+ throws SQLException {
+ return queryNextChunkMax(jdbc, tableId, column.name(), chunkSize,
includedLowerBound);
+ }
/**
* Approximate total number of entries in the lookup table.
@@ -110,17 +148,14 @@ public interface JdbcSourceChunkSplitter extends
ChunkSplitter {
/**
* Build the scan query sql of the {@link SnapshotSplit}.
*
- * @param tableId table identity.
+ * @param table table.
* @param splitKeyType primary key type.
* @param isFirstSplit whether the first split.
* @param isLastSplit whether the last split.
* @return query sql.
*/
String buildSplitScanQuery(
- TableId tableId,
- SeaTunnelRowType splitKeyType,
- boolean isFirstSplit,
- boolean isLastSplit);
+ Table table, SeaTunnelRowType splitKeyType, boolean isFirstSplit,
boolean isLastSplit);
/**
* Checks whether split column is evenly distributed across its range.
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/test/java/jdbc/source/JdbcSourceChunkSplitterTest.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/test/java/jdbc/source/JdbcSourceChunkSplitterTest.java
index 86500f248f..32617fe18c 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/test/java/jdbc/source/JdbcSourceChunkSplitterTest.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/test/java/jdbc/source/JdbcSourceChunkSplitterTest.java
@@ -106,7 +106,7 @@ public class JdbcSourceChunkSplitterTest {
@Override
public String buildSplitScanQuery(
- TableId tableId,
+ Table table,
SeaTunnelRowType splitKeyType,
boolean isFirstSplit,
boolean isLastSplit) {
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/test/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/splitter/AbstractJdbcSourceChunkSplitterTest.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/test/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/splitter/AbstractJdbcSourceChunkSplitterTest.java
index 076bafae15..6f646eb6be 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/test/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/splitter/AbstractJdbcSourceChunkSplitterTest.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/test/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/splitter/AbstractJdbcSourceChunkSplitterTest.java
@@ -24,6 +24,7 @@ import org.junit.jupiter.api.Test;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.relational.Column;
+import io.debezium.relational.Table;
import io.debezium.relational.TableId;
import java.sql.SQLException;
@@ -217,7 +218,7 @@ public class AbstractJdbcSourceChunkSplitterTest {
@Override
public String buildSplitScanQuery(
- TableId tableId,
+ Table table,
SeaTunnelRowType splitKeyType,
boolean isFirstSplit,
boolean isLastSplit) {
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/eumerator/MySqlChunkSplitter.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/eumerator/MySqlChunkSplitter.java
index c078f7cf28..b4982f2cbe 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/eumerator/MySqlChunkSplitter.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/eumerator/MySqlChunkSplitter.java
@@ -27,6 +27,7 @@ import
org.apache.seatunnel.connectors.seatunnel.cdc.mysql.utils.MySqlUtils;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.relational.Column;
+import io.debezium.relational.Table;
import io.debezium.relational.TableId;
import lombok.extern.slf4j.Slf4j;
@@ -79,11 +80,8 @@ public class MySqlChunkSplitter extends
AbstractJdbcSourceChunkSplitter {
@Override
public String buildSplitScanQuery(
- TableId tableId,
- SeaTunnelRowType splitKeyType,
- boolean isFirstSplit,
- boolean isLastSplit) {
- return MySqlUtils.buildSplitScanQuery(tableId, splitKeyType,
isFirstSplit, isLastSplit);
+ Table table, SeaTunnelRowType splitKeyType, boolean isFirstSplit,
boolean isLastSplit) {
+ return MySqlUtils.buildSplitScanQuery(table.id(), splitKeyType,
isFirstSplit, isLastSplit);
}
@Override
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/source/eumerator/OracleChunkSplitter.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/source/eumerator/OracleChunkSplitter.java
index 8500c0c055..52df70cbc8 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/source/eumerator/OracleChunkSplitter.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/source/eumerator/OracleChunkSplitter.java
@@ -28,6 +28,7 @@ import
org.apache.seatunnel.connectors.seatunnel.cdc.oracle.utils.OracleUtils;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.relational.Column;
+import io.debezium.relational.Table;
import io.debezium.relational.TableId;
import lombok.extern.slf4j.Slf4j;
import oracle.sql.ROWID;
@@ -84,11 +85,8 @@ public class OracleChunkSplitter extends
AbstractJdbcSourceChunkSplitter {
@Override
public String buildSplitScanQuery(
- TableId tableId,
- SeaTunnelRowType splitKeyType,
- boolean isFirstSplit,
- boolean isLastSplit) {
- return OracleUtils.buildSplitScanQuery(tableId, splitKeyType,
isFirstSplit, isLastSplit);
+ Table table, SeaTunnelRowType splitKeyType, boolean isFirstSplit,
boolean isLastSplit) {
+ return OracleUtils.buildSplitScanQuery(table.id(), splitKeyType,
isFirstSplit, isLastSplit);
}
@Override
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/source/enumerator/PostgresChunkSplitter.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/source/enumerator/PostgresChunkSplitter.java
index db1109a453..2aab573d2e 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/source/enumerator/PostgresChunkSplitter.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/source/enumerator/PostgresChunkSplitter.java
@@ -27,6 +27,7 @@ import
org.apache.seatunnel.connectors.seatunnel.cdc.postgres.utils.PostgresUtil
import io.debezium.jdbc.JdbcConnection;
import io.debezium.relational.Column;
+import io.debezium.relational.Table;
import io.debezium.relational.TableId;
import lombok.extern.slf4j.Slf4j;
@@ -43,14 +44,27 @@ public class PostgresChunkSplitter extends
AbstractJdbcSourceChunkSplitter {
@Override
public Object[] queryMinMax(JdbcConnection jdbc, TableId tableId, String
columnName)
throws SQLException {
- return PostgresUtils.queryMinMax(jdbc, tableId, columnName);
+ return PostgresUtils.queryMinMax(jdbc, tableId, columnName, null);
+ }
+
+ @Override
+ public Object[] queryMinMax(JdbcConnection jdbc, TableId tableId, Column
column)
+ throws SQLException {
+ return PostgresUtils.queryMinMax(jdbc, tableId, column.name(), column);
}
@Override
public Object queryMin(
JdbcConnection jdbc, TableId tableId, String columnName, Object
excludedLowerBound)
throws SQLException {
- return PostgresUtils.queryMin(jdbc, tableId, columnName,
excludedLowerBound);
+ return PostgresUtils.queryMin(jdbc, tableId, columnName, null,
excludedLowerBound);
+ }
+
+ @Override
+ public Object queryMin(
+ JdbcConnection jdbc, TableId tableId, Column column, Object
excludedLowerBound)
+ throws SQLException {
+ return PostgresUtils.queryMin(jdbc, tableId, column.name(), column,
excludedLowerBound);
}
@Override
@@ -58,7 +72,15 @@ public class PostgresChunkSplitter extends
AbstractJdbcSourceChunkSplitter {
JdbcConnection jdbc, TableId tableId, String columnName, int
inverseSamplingRate)
throws SQLException {
return PostgresUtils.skipReadAndSortSampleData(
- jdbc, tableId, columnName, inverseSamplingRate);
+ jdbc, tableId, columnName, null, inverseSamplingRate);
+ }
+
+ @Override
+ public Object[] sampleDataFromColumn(
+ JdbcConnection jdbc, TableId tableId, Column column, int
inverseSamplingRate)
+ throws SQLException {
+ return PostgresUtils.skipReadAndSortSampleData(
+ jdbc, tableId, column.name(), column, inverseSamplingRate);
}
@Override
@@ -70,7 +92,19 @@ public class PostgresChunkSplitter extends
AbstractJdbcSourceChunkSplitter {
Object includedLowerBound)
throws SQLException {
return PostgresUtils.queryNextChunkMax(
- jdbc, tableId, columnName, chunkSize, includedLowerBound);
+ jdbc, tableId, columnName, null, chunkSize,
includedLowerBound);
+ }
+
+ @Override
+ public Object queryNextChunkMax(
+ JdbcConnection jdbc,
+ TableId tableId,
+ Column column,
+ int chunkSize,
+ Object includedLowerBound)
+ throws SQLException {
+ return PostgresUtils.queryNextChunkMax(
+ jdbc, tableId, column.name(), column, chunkSize,
includedLowerBound);
}
@Override
@@ -80,11 +114,8 @@ public class PostgresChunkSplitter extends
AbstractJdbcSourceChunkSplitter {
@Override
public String buildSplitScanQuery(
- TableId tableId,
- SeaTunnelRowType splitKeyType,
- boolean isFirstSplit,
- boolean isLastSplit) {
- return PostgresUtils.buildSplitScanQuery(tableId, splitKeyType,
isFirstSplit, isLastSplit);
+ Table table, SeaTunnelRowType splitKeyType, boolean isFirstSplit,
boolean isLastSplit) {
+ return PostgresUtils.buildSplitScanQuery(table, splitKeyType,
isFirstSplit, isLastSplit);
}
@Override
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/source/reader/snapshot/PostgresSnapshotSplitReadTask.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/source/reader/snapshot/PostgresSnapshotSplitReadTask.java
index abf9c6fafe..dc2c52ccca 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/source/reader/snapshot/PostgresSnapshotSplitReadTask.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/source/reader/snapshot/PostgresSnapshotSplitReadTask.java
@@ -179,7 +179,7 @@ public class PostgresSnapshotSplitReadTask extends
AbstractSnapshotChangeEventSo
final String selectSql =
PostgresUtils.buildSplitScanQuery(
- snapshotSplit.getTableId(),
+ table,
snapshotSplit.getSplitKeyType(),
snapshotSplit.getSplitStart() == null,
snapshotSplit.getSplitEnd() == null);
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/utils/PostgresUtils.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/utils/PostgresUtils.java
index 576c7fb536..b5cd090453 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/utils/PostgresUtils.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/utils/PostgresUtils.java
@@ -23,6 +23,8 @@ import org.apache.seatunnel.common.utils.SeaTunnelException;
import org.apache.seatunnel.connectors.cdc.base.source.offset.Offset;
import org.apache.seatunnel.connectors.cdc.base.utils.SourceRecordUtils;
import
org.apache.seatunnel.connectors.seatunnel.cdc.postgres.source.offset.LsnOffset;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.psql.PostgresDialect;
import org.apache.kafka.connect.source.SourceRecord;
@@ -51,15 +53,20 @@ import java.util.Optional;
@Slf4j
public class PostgresUtils {
private static final int DEFAULT_FETCH_SIZE = 1024;
+ private static final JdbcDialect JDBC_DIALECT = new PostgresDialect();
private PostgresUtils() {}
- public static Object[] queryMinMax(JdbcConnection jdbc, TableId tableId,
String columnName)
+ public static Object[] queryMinMax(
+ JdbcConnection jdbc, TableId tableId, String columnName, Column
column)
throws SQLException {
+ columnName = quote(columnName);
+ if (column != null) {
+ columnName = JDBC_DIALECT.convertType(columnName,
column.typeName());
+ }
final String minMaxQuery =
String.format(
- "SELECT MIN(%s), MAX(%s) FROM %s",
- quote(columnName), quote(columnName), quote(tableId));
+ "SELECT MIN(%s), MAX(%s) FROM %s", columnName,
columnName, quote(tableId));
return jdbc.queryAndMap(
minMaxQuery,
rs -> {
@@ -96,12 +103,20 @@ public class PostgresUtils {
}
public static Object queryMin(
- JdbcConnection jdbc, TableId tableId, String columnName, Object
excludedLowerBound)
+ JdbcConnection jdbc,
+ TableId tableId,
+ String columnName,
+ Column column,
+ Object excludedLowerBound)
throws SQLException {
+ columnName = quote(columnName);
+ if (column != null) {
+ columnName = JDBC_DIALECT.convertType(columnName,
column.typeName());
+ }
final String minQuery =
String.format(
"SELECT MIN(%s) FROM %s WHERE %s > ?",
- quote(columnName), quote(tableId), quote(columnName));
+ columnName, quote(tableId), columnName);
return jdbc.prepareQueryAndMap(
minQuery,
ps -> ps.setObject(1, excludedLowerBound),
@@ -141,10 +156,17 @@ public class PostgresUtils {
}
public static Object[] skipReadAndSortSampleData(
- JdbcConnection jdbc, TableId tableId, String columnName, int
inverseSamplingRate)
+ JdbcConnection jdbc,
+ TableId tableId,
+ String columnName,
+ Column column,
+ int inverseSamplingRate)
throws SQLException {
- final String sampleQuery =
- String.format("SELECT %s FROM %s", quote(columnName),
quote(tableId));
+ columnName = quote(columnName);
+ if (column != null) {
+ columnName = JDBC_DIALECT.convertType(columnName,
column.typeName());
+ }
+ final String sampleQuery = String.format("SELECT %s FROM %s",
columnName, quote(tableId));
Statement stmt = null;
ResultSet rs = null;
@@ -198,10 +220,14 @@ public class PostgresUtils {
JdbcConnection jdbc,
TableId tableId,
String splitColumnName,
+ Column splitColumn,
int chunkSize,
Object includedLowerBound)
throws SQLException {
String quotedColumn = quote(splitColumnName);
+ if (splitColumn != null) {
+ quotedColumn = JDBC_DIALECT.convertType(quotedColumn,
splitColumn.typeName());
+ }
String query =
String.format(
"SELECT MAX(%s) FROM ("
@@ -273,8 +299,8 @@ public class PostgresUtils {
/** Get split scan query for the given table. */
public static String buildSplitScanQuery(
- TableId tableId, SeaTunnelRowType rowType, boolean isFirstSplit,
boolean isLastSplit) {
- return buildSplitQuery(tableId, rowType, isFirstSplit, isLastSplit,
-1, true);
+ Table table, SeaTunnelRowType rowType, boolean isFirstSplit,
boolean isLastSplit) {
+ return buildSplitQuery(table, rowType, isFirstSplit, isLastSplit, -1,
true);
}
/** Get table split data PreparedStatement. */
@@ -328,7 +354,7 @@ public class PostgresUtils {
}
private static String buildSplitQuery(
- TableId tableId,
+ Table table,
SeaTunnelRowType rowType,
boolean isFirstSplit,
boolean isLastSplit,
@@ -340,37 +366,37 @@ public class PostgresUtils {
condition = null;
} else if (isFirstSplit) {
final StringBuilder sql = new StringBuilder();
- addPrimaryKeyColumnsToCondition(rowType, sql, " <= ?");
+ addPrimaryKeyColumnsToCondition(table, rowType, sql, " <= ?");
if (isScanningData) {
sql.append(" AND NOT (");
- addPrimaryKeyColumnsToCondition(rowType, sql, " = ?");
+ addPrimaryKeyColumnsToCondition(table, rowType, sql, " = ?");
sql.append(")");
}
condition = sql.toString();
} else if (isLastSplit) {
final StringBuilder sql = new StringBuilder();
- addPrimaryKeyColumnsToCondition(rowType, sql, " >= ?");
+ addPrimaryKeyColumnsToCondition(table, rowType, sql, " >= ?");
condition = sql.toString();
} else {
final StringBuilder sql = new StringBuilder();
- addPrimaryKeyColumnsToCondition(rowType, sql, " >= ?");
+ addPrimaryKeyColumnsToCondition(table, rowType, sql, " >= ?");
if (isScanningData) {
sql.append(" AND NOT (");
- addPrimaryKeyColumnsToCondition(rowType, sql, " = ?");
+ addPrimaryKeyColumnsToCondition(table, rowType, sql, " = ?");
sql.append(")");
}
sql.append(" AND ");
- addPrimaryKeyColumnsToCondition(rowType, sql, " <= ?");
+ addPrimaryKeyColumnsToCondition(table, rowType, sql, " <= ?");
condition = sql.toString();
}
if (isScanningData) {
return buildSelectWithRowLimits(
- tableId, limitSize, "*", Optional.ofNullable(condition),
Optional.empty());
+ table.id(), limitSize, "*",
Optional.ofNullable(condition), Optional.empty());
} else {
final String orderBy = String.join(", ", rowType.getFieldNames());
return buildSelectWithBoundaryRowLimits(
- tableId,
+ table.id(),
limitSize,
getPrimaryKeyColumnsProjection(rowType),
getMaxPrimaryKeyColumnsProjection(rowType),
@@ -441,11 +467,14 @@ public class PostgresUtils {
}
private static void addPrimaryKeyColumnsToCondition(
- SeaTunnelRowType rowType, StringBuilder sql, String predicate) {
- for (Iterator<String> fieldNamesIt =
Arrays.stream(rowType.getFieldNames()).iterator();
- fieldNamesIt.hasNext(); ) {
- sql.append(quote(fieldNamesIt.next())).append(predicate);
- if (fieldNamesIt.hasNext()) {
+ Table table, SeaTunnelRowType rowType, StringBuilder sql, String
predicate) {
+ for (int i = 0; i < rowType.getTotalFields(); i++) {
+ String fieldName = quote(rowType.getFieldName(i));
+ fieldName =
+ JDBC_DIALECT.convertType(
+ fieldName,
table.columnWithName(rowType.getFieldName(i)).typeName());
+ sql.append(fieldName).append(predicate);
+ if (i < rowType.getTotalFields() - 1) {
sql.append(" AND ");
}
}
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/utils/PostgresUtilsTest.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/utils/PostgresUtilsTest.java
index e8e5bb22d2..6ce08f953c 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/utils/PostgresUtilsTest.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/utils/PostgresUtilsTest.java
@@ -24,14 +24,21 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
+import io.debezium.relational.Column;
+import io.debezium.relational.Table;
import io.debezium.relational.TableId;
public class PostgresUtilsTest {
@Test
public void testSplitScanQuery() {
+ Table table =
+ Table.editor()
+ .tableId(TableId.parse("db1.schema1.table1"))
+
.addColumn(Column.editor().name("id").type("int8").create())
+ .create();
String splitScanSQL =
PostgresUtils.buildSplitScanQuery(
- TableId.parse("db1.schema1.table1"),
+ table,
new SeaTunnelRowType(
new String[] {"id"}, new SeaTunnelDataType[]
{BasicType.LONG_TYPE}),
false,
@@ -42,7 +49,7 @@ public class PostgresUtilsTest {
splitScanSQL =
PostgresUtils.buildSplitScanQuery(
- TableId.parse("db1.schema1.table1"),
+ table,
new SeaTunnelRowType(
new String[] {"id"}, new SeaTunnelDataType[]
{BasicType.LONG_TYPE}),
true,
@@ -51,7 +58,7 @@ public class PostgresUtilsTest {
splitScanSQL =
PostgresUtils.buildSplitScanQuery(
- TableId.parse("db1.schema1.table1"),
+ table,
new SeaTunnelRowType(
new String[] {"id"}, new SeaTunnelDataType[]
{BasicType.LONG_TYPE}),
true,
@@ -60,14 +67,20 @@ public class PostgresUtilsTest {
"SELECT * FROM \"schema1\".\"table1\" WHERE \"id\" <= ? AND
NOT (\"id\" = ?)",
splitScanSQL);
+ table =
+ Table.editor()
+ .tableId(TableId.parse("db1.schema1.table1"))
+
.addColumn(Column.editor().name("id").type("uuid").create())
+ .create();
splitScanSQL =
PostgresUtils.buildSplitScanQuery(
- TableId.parse("db1.schema1.table1"),
+ table,
new SeaTunnelRowType(
- new String[] {"id"}, new SeaTunnelDataType[]
{BasicType.LONG_TYPE}),
+ new String[] {"id"},
+ new SeaTunnelDataType[]
{BasicType.STRING_TYPE}),
false,
true);
Assertions.assertEquals(
- "SELECT * FROM \"schema1\".\"table1\" WHERE \"id\" >= ?",
splitScanSQL);
+ "SELECT * FROM \"schema1\".\"table1\" WHERE \"id\"::text >=
?", splitScanSQL);
}
}
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/eumerator/SqlServerChunkSplitter.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/eumerator/SqlServerChunkSplitter.java
index 1dc97020be..b6698f5319 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/eumerator/SqlServerChunkSplitter.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/eumerator/SqlServerChunkSplitter.java
@@ -27,6 +27,7 @@ import
org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source.utils.SqlS
import io.debezium.jdbc.JdbcConnection;
import io.debezium.relational.Column;
+import io.debezium.relational.Table;
import io.debezium.relational.TableId;
import lombok.extern.slf4j.Slf4j;
@@ -80,11 +81,9 @@ public class SqlServerChunkSplitter extends
AbstractJdbcSourceChunkSplitter {
@Override
public String buildSplitScanQuery(
- TableId tableId,
- SeaTunnelRowType splitKeyType,
- boolean isFirstSplit,
- boolean isLastSplit) {
- return SqlServerUtils.buildSplitScanQuery(tableId, splitKeyType,
isFirstSplit, isLastSplit);
+ Table table, SeaTunnelRowType splitKeyType, boolean isFirstSplit,
boolean isLastSplit) {
+ return SqlServerUtils.buildSplitScanQuery(
+ table.id(), splitKeyType, isFirstSplit, isLastSplit);
}
@Override
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/JdbcInputFormat.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/JdbcInputFormat.java
index c2fec61341..8588ef16df 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/JdbcInputFormat.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/JdbcInputFormat.java
@@ -95,7 +95,7 @@ public class JdbcInputFormat implements Serializable {
splitTableSchema =
tables.get(inputSplit.getTablePath()).getTableSchema();
splitTableId = inputSplit.getTablePath().toString();
- statement = chunkSplitter.generateSplitStatement(inputSplit);
+ statement = chunkSplitter.generateSplitStatement(inputSplit,
splitTableSchema);
resultSet = statement.executeQuery();
hasNext = resultSet.next();
} catch (SQLException se) {
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialect.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialect.java
index ea284e3fd9..db9b90dade 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialect.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialect.java
@@ -75,6 +75,10 @@ public interface JdbcDialect extends Serializable {
*/
JdbcDialectTypeMapper getJdbcDialectTypeMapper();
+ default String hashModForField(String nativeType, String fieldName, int
mod) {
+ return hashModForField(fieldName, mod);
+ }
+
default String hashModForField(String fieldName, int mod) {
return "ABS(MD5(" + quoteIdentifier(fieldName) + ") % " + mod + ")";
}
@@ -405,4 +409,15 @@ public interface JdbcDialect extends Serializable {
JdbcConnectionConfig jdbcConnectionConfig) {
return new SimpleJdbcConnectionProvider(jdbcConnectionConfig);
}
+
+ /**
+ * Cast column type e.g. CAST(column AS type)
+ *
+ * @param columnName
+ * @param columnType
+ * @return the text of converted column type.
+ */
+ default String convertType(String columnName, String columnType) {
+ return columnName;
+ }
}
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresDialect.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresDialect.java
index 82d02119d1..a2cd997e00 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresDialect.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresDialect.java
@@ -17,6 +17,7 @@
package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.psql;
+import org.apache.seatunnel.api.table.catalog.Column;
import org.apache.seatunnel.api.table.catalog.TablePath;
import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.JdbcRowConverter;
import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.DatabaseIdentifier;
@@ -36,6 +37,7 @@ import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Arrays;
+import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
@@ -68,9 +70,73 @@ public class PostgresDialect implements JdbcDialect {
return new PostgresTypeMapper();
}
+ @Override
+ public String hashModForField(String nativeType, String fieldName, int
mod) {
+ String quoteFieldName = quoteIdentifier(fieldName);
+ if (StringUtils.isNotBlank(nativeType)) {
+ quoteFieldName = convertType(quoteFieldName, nativeType);
+ }
+ return "(ABS(HASHTEXT(" + quoteFieldName + ")) % " + mod + ")";
+ }
+
@Override
public String hashModForField(String fieldName, int mod) {
- return "(ABS(HASHTEXT(" + quoteIdentifier(fieldName) + ")) % " + mod +
")";
+ return hashModForField(null, fieldName, mod);
+ }
+
+ @Override
+ public Object queryNextChunkMax(
+ Connection connection,
+ JdbcSourceTable table,
+ String columnName,
+ int chunkSize,
+ Object includedLowerBound)
+ throws SQLException {
+ Map<String, Column> columns =
+ table.getCatalogTable().getTableSchema().getColumns().stream()
+ .collect(Collectors.toMap(c -> c.getName(), c -> c));
+ Column column = columns.get(columnName);
+
+ String quotedColumn = quoteIdentifier(columnName);
+ quotedColumn = convertType(quotedColumn, column.getSourceType());
+ String sqlQuery;
+ if (StringUtils.isNotBlank(table.getQuery())) {
+ sqlQuery =
+ String.format(
+ "SELECT MAX(%s) FROM ("
+ + "SELECT %s FROM (%s) AS T1 WHERE %s >= ? ORDER BY %s ASC LIMIT %s"
+ + ") AS T2",
+ quotedColumn,
+ quotedColumn,
+ table.getQuery(),
+ quotedColumn,
+ quotedColumn,
+ chunkSize);
+ } else {
+ sqlQuery =
+ String.format(
+ "SELECT MAX(%s) FROM ("
+ + "SELECT %s FROM %s WHERE %s >= ? ORDER BY %s ASC LIMIT %s"
+ + ") AS T",
+ quotedColumn,
+ quotedColumn,
+ tableIdentifier(table.getTablePath()),
+ quotedColumn,
+ quotedColumn,
+ chunkSize);
+ }
+ try (PreparedStatement ps = connection.prepareStatement(sqlQuery)) {
+ ps.setObject(1, includedLowerBound);
+ try (ResultSet rs = ps.executeQuery()) {
+ if (rs.next()) {
+ return rs.getObject(1);
+ } else {
+ // this should never happen
+ throw new SQLException(
+ String.format("No result returned after running query [%s]", sqlQuery));
+ }
+ }
+ }
}
@Override
@@ -189,4 +255,11 @@ public class PostgresDialect implements JdbcDialect {
}
return SQLUtils.countForSubquery(connection, table.getQuery());
}
+
+ public String convertType(String columnName, String columnType) {
+ if (PostgresTypeConverter.PG_UUID.equals(columnType)) {
+ return columnName + "::text";
+ }
+ return columnName;
+ }
}
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/ChunkSplitter.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/ChunkSplitter.java
index 89ec64b817..198dfe47cb 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/ChunkSplitter.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/ChunkSplitter.java
@@ -113,15 +113,16 @@ public abstract class ChunkSplitter implements
AutoCloseable, Serializable {
protected abstract Collection<JdbcSourceSplit> createSplits(
JdbcSourceTable table, SeaTunnelRowType splitKeyType) throws
SQLException;
- public PreparedStatement generateSplitStatement(JdbcSourceSplit split)
throws SQLException {
+ public PreparedStatement generateSplitStatement(JdbcSourceSplit split,
TableSchema schema)
+ throws SQLException {
if (split.getSplitKeyName() == null) {
return createSingleSplitStatement(split);
}
- return createSplitStatement(split);
+ return createSplitStatement(split, schema);
}
- protected abstract PreparedStatement createSplitStatement(JdbcSourceSplit
split)
- throws SQLException;
+ protected abstract PreparedStatement createSplitStatement(
+ JdbcSourceSplit split, TableSchema schema) throws SQLException;
protected PreparedStatement createPreparedStatement(String sql) throws
SQLException {
Connection connection = getOrEstablishConnection();
@@ -174,7 +175,13 @@ public abstract class ChunkSplitter implements
AutoCloseable, Serializable {
protected Object queryMin(JdbcSourceTable table, String columnName, Object
excludedLowerBound)
throws SQLException {
String minQuery;
+ Map<String, Column> columns =
+ table.getCatalogTable().getTableSchema().getColumns().stream()
+ .collect(Collectors.toMap(c -> c.getName(), c -> c));
+ Column column = columns.get(columnName);
+
columnName = jdbcDialect.quoteIdentifier(columnName);
+ columnName = jdbcDialect.convertType(columnName,
column.getSourceType());
if (StringUtils.isNotBlank(table.getQuery())) {
minQuery =
String.format(
@@ -206,7 +213,13 @@ public abstract class ChunkSplitter implements
AutoCloseable, Serializable {
protected Pair<Object, Object> queryMinMax(JdbcSourceTable table, String
columnName)
throws SQLException {
String sqlQuery;
+ Map<String, Column> columns =
+ table.getCatalogTable().getTableSchema().getColumns().stream()
+ .collect(Collectors.toMap(c -> c.getName(), c -> c));
+ Column column = columns.get(columnName);
+
columnName = jdbcDialect.quoteIdentifier(columnName);
+ columnName = jdbcDialect.convertType(columnName,
column.getSourceType());
if (StringUtils.isNotBlank(table.getQuery())) {
sqlQuery =
String.format(
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/DynamicChunkSplitter.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/DynamicChunkSplitter.java
index 2993f749c6..9dc26d1ef2 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/DynamicChunkSplitter.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/DynamicChunkSplitter.java
@@ -19,7 +19,9 @@ package org.apache.seatunnel.connectors.seatunnel.jdbc.source;
import
org.apache.seatunnel.shade.com.google.common.annotations.VisibleForTesting;
+import org.apache.seatunnel.api.table.catalog.Column;
import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.exception.CommonError;
@@ -41,12 +43,12 @@ import java.sql.SQLException;
import java.sql.Timestamp;
import java.time.LocalDate;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
-import java.util.Iterator;
import java.util.List;
+import java.util.Map;
import java.util.Objects;
+import java.util.stream.Collectors;
import static java.math.BigDecimal.ROUND_CEILING;
import static
org.apache.seatunnel.shade.com.google.common.base.Preconditions.checkArgument;
@@ -65,8 +67,9 @@ public class DynamicChunkSplitter extends ChunkSplitter {
}
@Override
- protected PreparedStatement createSplitStatement(JdbcSourceSplit split)
throws SQLException {
- return createDynamicSplitStatement(split);
+ protected PreparedStatement createSplitStatement(JdbcSourceSplit split,
TableSchema schema)
+ throws SQLException {
+ return createDynamicSplitStatement(split, schema);
}
private Collection<JdbcSourceSplit> createDynamicSplits(
@@ -92,9 +95,9 @@ public class DynamicChunkSplitter extends ChunkSplitter {
return splits;
}
- private PreparedStatement createDynamicSplitStatement(JdbcSourceSplit
split)
+ private PreparedStatement createDynamicSplitStatement(JdbcSourceSplit
split, TableSchema schema)
throws SQLException {
- String splitQuery = createDynamicSplitQuerySQL(split);
+ String splitQuery = createDynamicSplitQuerySQL(split, schema);
PreparedStatement statement = createPreparedStatement(splitQuery);
prepareDynamicSplitStatement(statement, split);
return statement;
@@ -452,7 +455,7 @@ public class DynamicChunkSplitter extends ChunkSplitter {
}
@VisibleForTesting
- String createDynamicSplitQuerySQL(JdbcSourceSplit split) {
+ String createDynamicSplitQuerySQL(JdbcSourceSplit split, TableSchema
schema) {
SeaTunnelRowType rowType =
new SeaTunnelRowType(
new String[] {split.getSplitKeyName()},
@@ -465,23 +468,23 @@ public class DynamicChunkSplitter extends ChunkSplitter {
condition = null;
} else if (isFirstSplit) {
StringBuilder sql = new StringBuilder();
- addKeyColumnsToCondition(rowType, sql, " <= ?");
+ addKeyColumnsToCondition(schema, rowType, sql, " <= ?");
sql.append(" AND NOT (");
- addKeyColumnsToCondition(rowType, sql, " = ?");
+ addKeyColumnsToCondition(schema, rowType, sql, " = ?");
sql.append(")");
condition = sql.toString();
} else if (isLastSplit) {
StringBuilder sql = new StringBuilder();
- addKeyColumnsToCondition(rowType, sql, " >= ?");
+ addKeyColumnsToCondition(schema, rowType, sql, " >= ?");
condition = sql.toString();
} else {
StringBuilder sql = new StringBuilder();
- addKeyColumnsToCondition(rowType, sql, " >= ?");
+ addKeyColumnsToCondition(schema, rowType, sql, " >= ?");
sql.append(" AND NOT (");
- addKeyColumnsToCondition(rowType, sql, " = ?");
+ addKeyColumnsToCondition(schema, rowType, sql, " = ?");
sql.append(")");
sql.append(" AND ");
- addKeyColumnsToCondition(rowType, sql, " <= ?");
+ addKeyColumnsToCondition(schema, rowType, sql, " <= ?");
condition = sql.toString();
}
@@ -503,11 +506,16 @@ public class DynamicChunkSplitter extends ChunkSplitter {
}
private void addKeyColumnsToCondition(
- SeaTunnelRowType rowType, StringBuilder sql, String predicate) {
- for (Iterator<String> fieldNamesIt =
Arrays.stream(rowType.getFieldNames()).iterator();
- fieldNamesIt.hasNext(); ) {
- sql.append(jdbcDialect.quoteIdentifier(fieldNamesIt.next())).append(predicate);
- if (fieldNamesIt.hasNext()) {
+ TableSchema schema, SeaTunnelRowType rowType, StringBuilder sql,
String predicate) {
+ Map<String, Column> columns =
+ schema.getColumns().stream().collect(Collectors.toMap(c ->
c.getName(), c -> c));
+ for (int i = 0; i < rowType.getTotalFields(); i++) {
+ String fieldName =
jdbcDialect.quoteIdentifier(rowType.getFieldName(i));
+ fieldName =
+ jdbcDialect.convertType(
+ fieldName,
columns.get(rowType.getFieldName(i)).getSourceType());
+ sql.append(fieldName).append(predicate);
+ if (i < rowType.getTotalFields() - 1) {
sql.append(" AND ");
}
}
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/FixedChunkSplitter.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/FixedChunkSplitter.java
index aa93d2d8a5..edeef96f0a 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/FixedChunkSplitter.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/FixedChunkSplitter.java
@@ -17,6 +17,8 @@
package org.apache.seatunnel.connectors.seatunnel.jdbc.source;
+import org.apache.seatunnel.api.table.catalog.Column;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
import org.apache.seatunnel.api.table.type.DecimalType;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
@@ -89,7 +91,8 @@ public class FixedChunkSplitter extends ChunkSplitter {
}
@Override
- protected PreparedStatement createSplitStatement(JdbcSourceSplit split)
throws SQLException {
+ protected PreparedStatement createSplitStatement(JdbcSourceSplit split,
TableSchema schema)
+ throws SQLException {
if (SqlType.STRING.equals(split.getSplitKeyType().getSqlType())) {
return createStringColumnSplitStatement(split);
}
@@ -103,6 +106,11 @@ public class FixedChunkSplitter extends ChunkSplitter {
private Collection<JdbcSourceSplit> createStringColumnSplits(
JdbcSourceTable table, String splitKeyName, SeaTunnelDataType
splitKeyType) {
List<JdbcSourceSplit> splits = new
ArrayList<>(table.getPartitionNumber());
+ Column column =
+ table.getCatalogTable().getTableSchema().getColumns().stream()
+ .filter(c -> c.getName().equals(splitKeyName))
+ .findAny()
+ .get();
for (int i = 0; i < table.getPartitionNumber(); i++) {
String splitQuery;
if (StringUtils.isNotBlank(table.getQuery())) {
@@ -111,14 +119,18 @@ public class FixedChunkSplitter extends ChunkSplitter {
"SELECT * FROM (%s) st_jdbc_splitter WHERE %s = ?",
table.getQuery(),
jdbcDialect.hashModForField(
- splitKeyName,
table.getPartitionNumber()));
+ column.getSourceType(),
+ splitKeyName,
+ table.getPartitionNumber()));
} else {
splitQuery =
String.format(
"SELECT * FROM %s WHERE %s = ?",
jdbcDialect.tableIdentifier(table.getTablePath()),
jdbcDialect.hashModForField(
- splitKeyName,
table.getPartitionNumber()));
+ column.getSourceType(),
+ splitKeyName,
+ table.getPartitionNumber()));
}
JdbcSourceSplit split =
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/DynamicChunkSplitterTest.java
b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/DynamicChunkSplitterTest.java
index c71ae7b43d..70963a9f72 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/DynamicChunkSplitterTest.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/DynamicChunkSplitterTest.java
@@ -17,7 +17,9 @@
package org.apache.seatunnel.connectors.seatunnel.jdbc.source;
+import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
import org.apache.seatunnel.api.table.type.BasicType;
import
org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcConnectionConfig;
import org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcSourceConfig;
@@ -35,7 +37,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
public class DynamicChunkSplitterTest {
@Test
- public void testGenerateSplitQuerySQL() {
+ public void testPostgresGenerateSplitQuerySQL() {
JdbcSourceConfig config =
JdbcSourceConfig.builder()
.jdbcConnectionConfig(
@@ -44,6 +46,17 @@ public class DynamicChunkSplitterTest {
.driverName("org.postgresql.Driver")
.build())
.build();
+ TableSchema tableSchema =
+ TableSchema.builder()
+ .columns(
+ Arrays.asList(
+ PhysicalColumn.builder()
+ .name("id")
+ .sourceType("int4")
+ .dataType(BasicType.INT_TYPE)
+ .build()))
+ .build();
+
DynamicChunkSplitter splitter = new DynamicChunkSplitter(config);
JdbcSourceSplit split =
@@ -55,7 +68,7 @@ public class DynamicChunkSplitterTest {
BasicType.INT_TYPE,
1,
10);
- String splitQuerySQL = splitter.createDynamicSplitQuerySQL(split);
+ String splitQuerySQL = splitter.createDynamicSplitQuerySQL(split,
tableSchema);
Assertions.assertEquals(
"SELECT * FROM \"db1\".\"schema1\".\"table1\" WHERE \"id\" >= ? AND NOT (\"id\" = ?) AND \"id\" <= ?",
splitQuerySQL);
@@ -69,10 +82,34 @@ public class DynamicChunkSplitterTest {
BasicType.INT_TYPE,
1,
10);
- splitQuerySQL = splitter.createDynamicSplitQuerySQL(split);
+ splitQuerySQL = splitter.createDynamicSplitQuerySQL(split,
tableSchema);
Assertions.assertEquals(
"SELECT * FROM (select * from table1) tmp WHERE \"id\" >= ? AND NOT (\"id\" = ?) AND \"id\" <= ?",
splitQuerySQL);
+
+ tableSchema =
+ TableSchema.builder()
+ .columns(
+ Arrays.asList(
+ PhysicalColumn.builder()
+ .name("id")
+ .sourceType("uuid")
+ .dataType(BasicType.INT_TYPE)
+ .build()))
+ .build();
+ split =
+ new JdbcSourceSplit(
+ TablePath.of("db1", "schema1", "table1"),
+ "split1",
+ "select * from table1",
+ "id",
+ BasicType.INT_TYPE,
+ 1,
+ 10);
+ splitQuerySQL = splitter.createDynamicSplitQuerySQL(split,
tableSchema);
+ Assertions.assertEquals(
+ "SELECT * FROM (select * from table1) tmp WHERE \"id\"::text >= ? AND NOT (\"id\"::text = ?) AND \"id\"::text <= ?",
+ splitQuerySQL);
}
@Test