This is an automated email from the ASF dual-hosted git repository.

zhouyao2023 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 868ba4d7c7 [Hotfix][Jdbc/CDC] Fix postgresql uuid type in jdbc read 
(#6684)
868ba4d7c7 is described below

commit 868ba4d7c76eab8619798b4f90afc4c188c3997d
Author: hailin0 <[email protected]>
AuthorDate: Thu May 9 22:16:36 2024 +0800

    [Hotfix][Jdbc/CDC] Fix postgresql uuid type in jdbc read (#6684)
---
 .../splitter/AbstractJdbcSourceChunkSplitter.java  | 22 +++----
 .../splitter/JdbcSourceChunkSplitter.java          | 69 ++++++++++++++-----
 .../jdbc/source/JdbcSourceChunkSplitterTest.java   |  2 +-
 .../AbstractJdbcSourceChunkSplitterTest.java       |  3 +-
 .../mysql/source/eumerator/MySqlChunkSplitter.java |  8 +--
 .../source/eumerator/OracleChunkSplitter.java      |  8 +--
 .../source/enumerator/PostgresChunkSplitter.java   | 49 +++++++++++---
 .../snapshot/PostgresSnapshotSplitReadTask.java    |  2 +-
 .../cdc/postgres/utils/PostgresUtils.java          | 77 +++++++++++++++-------
 .../cdc/postgres/utils/PostgresUtilsTest.java      | 25 +++++--
 .../source/eumerator/SqlServerChunkSplitter.java   |  9 ++-
 .../seatunnel/jdbc/internal/JdbcInputFormat.java   |  2 +-
 .../jdbc/internal/dialect/JdbcDialect.java         | 15 +++++
 .../internal/dialect/psql/PostgresDialect.java     | 75 ++++++++++++++++++++-
 .../seatunnel/jdbc/source/ChunkSplitter.java       | 21 ++++--
 .../jdbc/source/DynamicChunkSplitter.java          | 44 ++++++++-----
 .../seatunnel/jdbc/source/FixedChunkSplitter.java  | 18 ++++-
 .../jdbc/source/DynamicChunkSplitterTest.java      | 43 +++++++++++-
 18 files changed, 376 insertions(+), 116 deletions(-)

diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/splitter/AbstractJdbcSourceChunkSplitter.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/splitter/AbstractJdbcSourceChunkSplitter.java
index f124e5fc71..e10c70795b 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/splitter/AbstractJdbcSourceChunkSplitter.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/splitter/AbstractJdbcSourceChunkSplitter.java
@@ -116,7 +116,7 @@ public abstract class AbstractJdbcSourceChunkSplitter 
implements JdbcSourceChunk
     private List<ChunkRange> splitTableIntoChunks(
             JdbcConnection jdbc, TableId tableId, Column splitColumn) throws 
SQLException {
         final String splitColumnName = splitColumn.name();
-        final Object[] minMax = queryMinMax(jdbc, tableId, splitColumnName);
+        final Object[] minMax = queryMinMax(jdbc, tableId, splitColumn);
         final Object min = minMax[0];
         final Object max = minMax[1];
         if (min == null || max == null || min.equals(max)) {
@@ -177,8 +177,7 @@ public abstract class AbstractJdbcSourceChunkSplitter 
implements JdbcSourceChunk
                             tableId,
                             inverseSamplingRate);
                     Object[] sample =
-                            sampleDataFromColumn(
-                                    jdbc, tableId, splitColumnName, 
inverseSamplingRate);
+                            sampleDataFromColumn(jdbc, tableId, splitColumn, 
inverseSamplingRate);
                     log.info(
                             "Sample data from table {} end, the sample size is 
{}",
                             tableId,
@@ -186,11 +185,10 @@ public abstract class AbstractJdbcSourceChunkSplitter 
implements JdbcSourceChunk
                     return efficientShardingThroughSampling(
                             tableId, sample, approximateRowCnt, shardCount);
                 }
-                return splitUnevenlySizedChunks(
-                        jdbc, tableId, splitColumnName, min, max, chunkSize);
+                return splitUnevenlySizedChunks(jdbc, tableId, splitColumn, 
min, max, chunkSize);
             }
         } else {
-            return splitUnevenlySizedChunks(jdbc, tableId, splitColumnName, 
min, max, chunkSize);
+            return splitUnevenlySizedChunks(jdbc, tableId, splitColumn, min, 
max, chunkSize);
         }
     }
 
@@ -198,7 +196,7 @@ public abstract class AbstractJdbcSourceChunkSplitter 
implements JdbcSourceChunk
     protected List<ChunkRange> splitUnevenlySizedChunks(
             JdbcConnection jdbc,
             TableId tableId,
-            String splitColumnName,
+            Column splitColumn,
             Object min,
             Object max,
             int chunkSize)
@@ -207,7 +205,7 @@ public abstract class AbstractJdbcSourceChunkSplitter 
implements JdbcSourceChunk
                 "Use unevenly-sized chunks for table {}, the chunk size is 
{}", tableId, chunkSize);
         final List<ChunkRange> splits = new ArrayList<>();
         Object chunkStart = null;
-        Object chunkEnd = nextChunkEnd(jdbc, min, tableId, splitColumnName, 
max, chunkSize);
+        Object chunkEnd = nextChunkEnd(jdbc, min, tableId, splitColumn, max, 
chunkSize);
         int count = 0;
         while (chunkEnd != null && ObjectCompare(chunkEnd, max) <= 0) {
             // we start from [null, min + chunk_size) and avoid [null, min)
@@ -215,7 +213,7 @@ public abstract class AbstractJdbcSourceChunkSplitter 
implements JdbcSourceChunk
             // may sleep a while to avoid DDOS on MySQL server
             maySleep(count++, tableId);
             chunkStart = chunkEnd;
-            chunkEnd = nextChunkEnd(jdbc, chunkEnd, tableId, splitColumnName, 
max, chunkSize);
+            chunkEnd = nextChunkEnd(jdbc, chunkEnd, tableId, splitColumn, max, 
chunkSize);
         }
         // add the ending split
         splits.add(ChunkRange.of(chunkStart, null));
@@ -226,17 +224,17 @@ public abstract class AbstractJdbcSourceChunkSplitter 
implements JdbcSourceChunk
             JdbcConnection jdbc,
             Object previousChunkEnd,
             TableId tableId,
-            String splitColumnName,
+            Column splitColumn,
             Object max,
             int chunkSize)
             throws SQLException {
         // chunk end might be null when max values are removed
         Object chunkEnd =
-                queryNextChunkMax(jdbc, tableId, splitColumnName, chunkSize, 
previousChunkEnd);
+                queryNextChunkMax(jdbc, tableId, splitColumn, chunkSize, 
previousChunkEnd);
         if (Objects.equals(previousChunkEnd, chunkEnd)) {
             // we don't allow equal chunk start and end,
             // should query the next one larger than chunkEnd
-            chunkEnd = queryMin(jdbc, tableId, splitColumnName, chunkEnd);
+            chunkEnd = queryMin(jdbc, tableId, splitColumn, chunkEnd);
         }
         if (ObjectCompare(chunkEnd, max) >= 0) {
             return null;
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/splitter/JdbcSourceChunkSplitter.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/splitter/JdbcSourceChunkSplitter.java
index b271be0d76..3981ddfa7c 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/splitter/JdbcSourceChunkSplitter.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/splitter/JdbcSourceChunkSplitter.java
@@ -23,6 +23,7 @@ import 
org.apache.seatunnel.connectors.cdc.base.source.split.SnapshotSplit;
 
 import io.debezium.jdbc.JdbcConnection;
 import io.debezium.relational.Column;
+import io.debezium.relational.Table;
 import io.debezium.relational.TableId;
 
 import java.sql.SQLException;
@@ -35,16 +36,29 @@ public interface JdbcSourceChunkSplitter extends 
ChunkSplitter {
     @Override
     Collection<SnapshotSplit> generateSplits(TableId tableId);
 
+    /** @deprecated instead by {@link this#queryMinMax(JdbcConnection, 
TableId, Column)} */
+    @Deprecated
+    Object[] queryMinMax(JdbcConnection jdbc, TableId tableId, String 
columnName)
+            throws SQLException;
+
     /**
      * Query the maximum and minimum value of the column in the table. e.g. 
query string <code>
      * SELECT MIN(%s) FROM %s WHERE %s > ?</code>
      *
      * @param jdbc JDBC connection.
      * @param tableId table identity.
-     * @param columnName column name.
+     * @param column column.
      * @return maximum and minimum value.
      */
-    Object[] queryMinMax(JdbcConnection jdbc, TableId tableId, String 
columnName)
+    default Object[] queryMinMax(JdbcConnection jdbc, TableId tableId, Column 
column)
+            throws SQLException {
+        return queryMinMax(jdbc, tableId, column.name());
+    }
+
+    /** @deprecated instead by {@link this#queryMin(JdbcConnection, TableId, 
Column, Object)} */
+    @Deprecated
+    Object queryMin(
+            JdbcConnection jdbc, TableId tableId, String columnName, Object 
excludedLowerBound)
             throws SQLException;
 
     /**
@@ -54,12 +68,19 @@ public interface JdbcSourceChunkSplitter extends 
ChunkSplitter {
      *
      * @param jdbc JDBC connection.
      * @param tableId table identity.
-     * @param columnName column name.
+     * @param column column.
      * @param excludedLowerBound the minimum value should be greater than this 
value.
      * @return minimum value.
      */
-    Object queryMin(
-            JdbcConnection jdbc, TableId tableId, String columnName, Object 
excludedLowerBound)
+    default Object queryMin(
+            JdbcConnection jdbc, TableId tableId, Column column, Object 
excludedLowerBound)
+            throws SQLException {
+        return queryMin(jdbc, tableId, column.name(), excludedLowerBound);
+    }
+
+    @Deprecated
+    Object[] sampleDataFromColumn(
+            JdbcConnection jdbc, TableId tableId, String columnName, int 
samplingRate)
             throws SQLException;
 
     /**
@@ -68,14 +89,29 @@ public interface JdbcSourceChunkSplitter extends 
ChunkSplitter {
      *
      * @param jdbc The JDBC connection object used to connect to the database.
      * @param tableId The ID of the table in which the column resides.
-     * @param columnName The name of the column to be sampled.
+     * @param column The column to be sampled.
      * @param samplingRate samplingRate The inverse of the fraction of the 
data to be sampled from
      *     the column. For example, a value of 1000 would mean 1/1000 of the 
data will be sampled.
      * @return Returns a List of sampled data from the specified column.
      * @throws SQLException If an SQL error occurs during the sampling 
operation.
      */
-    Object[] sampleDataFromColumn(
-            JdbcConnection jdbc, TableId tableId, String columnName, int 
samplingRate)
+    default Object[] sampleDataFromColumn(
+            JdbcConnection jdbc, TableId tableId, Column column, int 
samplingRate)
+            throws SQLException {
+        return sampleDataFromColumn(jdbc, tableId, column.name(), 
samplingRate);
+    }
+
+    /**
+     * @deprecated instead by {@link this#queryNextChunkMax(JdbcConnection, 
TableId, Column, int,
+     *     Object)}
+     */
+    @Deprecated
+    Object queryNextChunkMax(
+            JdbcConnection jdbc,
+            TableId tableId,
+            String columnName,
+            int chunkSize,
+            Object includedLowerBound)
             throws SQLException;
 
     /**
@@ -85,18 +121,20 @@ public interface JdbcSourceChunkSplitter extends 
ChunkSplitter {
      *
      * @param jdbc JDBC connection.
      * @param tableId table identity.
-     * @param columnName column name.
+     * @param column column.
      * @param chunkSize chunk size.
      * @param includedLowerBound the previous chunk end value.
      * @return next chunk end value.
      */
-    Object queryNextChunkMax(
+    default Object queryNextChunkMax(
             JdbcConnection jdbc,
             TableId tableId,
-            String columnName,
+            Column column,
             int chunkSize,
             Object includedLowerBound)
-            throws SQLException;
+            throws SQLException {
+        return queryNextChunkMax(jdbc, tableId, column.name(), chunkSize, 
includedLowerBound);
+    }
 
     /**
      * Approximate total number of entries in the lookup table.
@@ -110,17 +148,14 @@ public interface JdbcSourceChunkSplitter extends 
ChunkSplitter {
     /**
      * Build the scan query sql of the {@link SnapshotSplit}.
      *
-     * @param tableId table identity.
+     * @param table table.
      * @param splitKeyType primary key type.
      * @param isFirstSplit whether the first split.
      * @param isLastSplit whether the last split.
      * @return query sql.
      */
     String buildSplitScanQuery(
-            TableId tableId,
-            SeaTunnelRowType splitKeyType,
-            boolean isFirstSplit,
-            boolean isLastSplit);
+            Table table, SeaTunnelRowType splitKeyType, boolean isFirstSplit, 
boolean isLastSplit);
 
     /**
      * Checks whether split column is evenly distributed across its range.
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/test/java/jdbc/source/JdbcSourceChunkSplitterTest.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/test/java/jdbc/source/JdbcSourceChunkSplitterTest.java
index 86500f248f..32617fe18c 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/test/java/jdbc/source/JdbcSourceChunkSplitterTest.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/test/java/jdbc/source/JdbcSourceChunkSplitterTest.java
@@ -106,7 +106,7 @@ public class JdbcSourceChunkSplitterTest {
 
         @Override
         public String buildSplitScanQuery(
-                TableId tableId,
+                Table table,
                 SeaTunnelRowType splitKeyType,
                 boolean isFirstSplit,
                 boolean isLastSplit) {
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/test/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/splitter/AbstractJdbcSourceChunkSplitterTest.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/test/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/splitter/AbstractJdbcSourceChunkSplitterTest.java
index 076bafae15..6f646eb6be 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/test/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/splitter/AbstractJdbcSourceChunkSplitterTest.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/test/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/splitter/AbstractJdbcSourceChunkSplitterTest.java
@@ -24,6 +24,7 @@ import org.junit.jupiter.api.Test;
 
 import io.debezium.jdbc.JdbcConnection;
 import io.debezium.relational.Column;
+import io.debezium.relational.Table;
 import io.debezium.relational.TableId;
 
 import java.sql.SQLException;
@@ -217,7 +218,7 @@ public class AbstractJdbcSourceChunkSplitterTest {
 
         @Override
         public String buildSplitScanQuery(
-                TableId tableId,
+                Table table,
                 SeaTunnelRowType splitKeyType,
                 boolean isFirstSplit,
                 boolean isLastSplit) {
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/eumerator/MySqlChunkSplitter.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/eumerator/MySqlChunkSplitter.java
index c078f7cf28..b4982f2cbe 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/eumerator/MySqlChunkSplitter.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/eumerator/MySqlChunkSplitter.java
@@ -27,6 +27,7 @@ import 
org.apache.seatunnel.connectors.seatunnel.cdc.mysql.utils.MySqlUtils;
 
 import io.debezium.jdbc.JdbcConnection;
 import io.debezium.relational.Column;
+import io.debezium.relational.Table;
 import io.debezium.relational.TableId;
 import lombok.extern.slf4j.Slf4j;
 
@@ -79,11 +80,8 @@ public class MySqlChunkSplitter extends 
AbstractJdbcSourceChunkSplitter {
 
     @Override
     public String buildSplitScanQuery(
-            TableId tableId,
-            SeaTunnelRowType splitKeyType,
-            boolean isFirstSplit,
-            boolean isLastSplit) {
-        return MySqlUtils.buildSplitScanQuery(tableId, splitKeyType, 
isFirstSplit, isLastSplit);
+            Table table, SeaTunnelRowType splitKeyType, boolean isFirstSplit, 
boolean isLastSplit) {
+        return MySqlUtils.buildSplitScanQuery(table.id(), splitKeyType, 
isFirstSplit, isLastSplit);
     }
 
     @Override
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/source/eumerator/OracleChunkSplitter.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/source/eumerator/OracleChunkSplitter.java
index 8500c0c055..52df70cbc8 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/source/eumerator/OracleChunkSplitter.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/source/eumerator/OracleChunkSplitter.java
@@ -28,6 +28,7 @@ import 
org.apache.seatunnel.connectors.seatunnel.cdc.oracle.utils.OracleUtils;
 
 import io.debezium.jdbc.JdbcConnection;
 import io.debezium.relational.Column;
+import io.debezium.relational.Table;
 import io.debezium.relational.TableId;
 import lombok.extern.slf4j.Slf4j;
 import oracle.sql.ROWID;
@@ -84,11 +85,8 @@ public class OracleChunkSplitter extends 
AbstractJdbcSourceChunkSplitter {
 
     @Override
     public String buildSplitScanQuery(
-            TableId tableId,
-            SeaTunnelRowType splitKeyType,
-            boolean isFirstSplit,
-            boolean isLastSplit) {
-        return OracleUtils.buildSplitScanQuery(tableId, splitKeyType, 
isFirstSplit, isLastSplit);
+            Table table, SeaTunnelRowType splitKeyType, boolean isFirstSplit, 
boolean isLastSplit) {
+        return OracleUtils.buildSplitScanQuery(table.id(), splitKeyType, 
isFirstSplit, isLastSplit);
     }
 
     @Override
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/source/enumerator/PostgresChunkSplitter.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/source/enumerator/PostgresChunkSplitter.java
index db1109a453..2aab573d2e 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/source/enumerator/PostgresChunkSplitter.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/source/enumerator/PostgresChunkSplitter.java
@@ -27,6 +27,7 @@ import 
org.apache.seatunnel.connectors.seatunnel.cdc.postgres.utils.PostgresUtil
 
 import io.debezium.jdbc.JdbcConnection;
 import io.debezium.relational.Column;
+import io.debezium.relational.Table;
 import io.debezium.relational.TableId;
 import lombok.extern.slf4j.Slf4j;
 
@@ -43,14 +44,27 @@ public class PostgresChunkSplitter extends 
AbstractJdbcSourceChunkSplitter {
     @Override
     public Object[] queryMinMax(JdbcConnection jdbc, TableId tableId, String 
columnName)
             throws SQLException {
-        return PostgresUtils.queryMinMax(jdbc, tableId, columnName);
+        return PostgresUtils.queryMinMax(jdbc, tableId, columnName, null);
+    }
+
+    @Override
+    public Object[] queryMinMax(JdbcConnection jdbc, TableId tableId, Column 
column)
+            throws SQLException {
+        return PostgresUtils.queryMinMax(jdbc, tableId, column.name(), column);
     }
 
     @Override
     public Object queryMin(
             JdbcConnection jdbc, TableId tableId, String columnName, Object 
excludedLowerBound)
             throws SQLException {
-        return PostgresUtils.queryMin(jdbc, tableId, columnName, 
excludedLowerBound);
+        return PostgresUtils.queryMin(jdbc, tableId, columnName, null, 
excludedLowerBound);
+    }
+
+    @Override
+    public Object queryMin(
+            JdbcConnection jdbc, TableId tableId, Column column, Object 
excludedLowerBound)
+            throws SQLException {
+        return PostgresUtils.queryMin(jdbc, tableId, column.name(), column, 
excludedLowerBound);
     }
 
     @Override
@@ -58,7 +72,15 @@ public class PostgresChunkSplitter extends 
AbstractJdbcSourceChunkSplitter {
             JdbcConnection jdbc, TableId tableId, String columnName, int 
inverseSamplingRate)
             throws SQLException {
         return PostgresUtils.skipReadAndSortSampleData(
-                jdbc, tableId, columnName, inverseSamplingRate);
+                jdbc, tableId, columnName, null, inverseSamplingRate);
+    }
+
+    @Override
+    public Object[] sampleDataFromColumn(
+            JdbcConnection jdbc, TableId tableId, Column column, int 
inverseSamplingRate)
+            throws SQLException {
+        return PostgresUtils.skipReadAndSortSampleData(
+                jdbc, tableId, column.name(), column, inverseSamplingRate);
     }
 
     @Override
@@ -70,7 +92,19 @@ public class PostgresChunkSplitter extends 
AbstractJdbcSourceChunkSplitter {
             Object includedLowerBound)
             throws SQLException {
         return PostgresUtils.queryNextChunkMax(
-                jdbc, tableId, columnName, chunkSize, includedLowerBound);
+                jdbc, tableId, columnName, null, chunkSize, 
includedLowerBound);
+    }
+
+    @Override
+    public Object queryNextChunkMax(
+            JdbcConnection jdbc,
+            TableId tableId,
+            Column column,
+            int chunkSize,
+            Object includedLowerBound)
+            throws SQLException {
+        return PostgresUtils.queryNextChunkMax(
+                jdbc, tableId, column.name(), column, chunkSize, 
includedLowerBound);
     }
 
     @Override
@@ -80,11 +114,8 @@ public class PostgresChunkSplitter extends 
AbstractJdbcSourceChunkSplitter {
 
     @Override
     public String buildSplitScanQuery(
-            TableId tableId,
-            SeaTunnelRowType splitKeyType,
-            boolean isFirstSplit,
-            boolean isLastSplit) {
-        return PostgresUtils.buildSplitScanQuery(tableId, splitKeyType, 
isFirstSplit, isLastSplit);
+            Table table, SeaTunnelRowType splitKeyType, boolean isFirstSplit, 
boolean isLastSplit) {
+        return PostgresUtils.buildSplitScanQuery(table, splitKeyType, 
isFirstSplit, isLastSplit);
     }
 
     @Override
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/source/reader/snapshot/PostgresSnapshotSplitReadTask.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/source/reader/snapshot/PostgresSnapshotSplitReadTask.java
index abf9c6fafe..dc2c52ccca 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/source/reader/snapshot/PostgresSnapshotSplitReadTask.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/source/reader/snapshot/PostgresSnapshotSplitReadTask.java
@@ -179,7 +179,7 @@ public class PostgresSnapshotSplitReadTask extends 
AbstractSnapshotChangeEventSo
 
         final String selectSql =
                 PostgresUtils.buildSplitScanQuery(
-                        snapshotSplit.getTableId(),
+                        table,
                         snapshotSplit.getSplitKeyType(),
                         snapshotSplit.getSplitStart() == null,
                         snapshotSplit.getSplitEnd() == null);
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/utils/PostgresUtils.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/utils/PostgresUtils.java
index 576c7fb536..b5cd090453 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/utils/PostgresUtils.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/utils/PostgresUtils.java
@@ -23,6 +23,8 @@ import org.apache.seatunnel.common.utils.SeaTunnelException;
 import org.apache.seatunnel.connectors.cdc.base.source.offset.Offset;
 import org.apache.seatunnel.connectors.cdc.base.utils.SourceRecordUtils;
 import 
org.apache.seatunnel.connectors.seatunnel.cdc.postgres.source.offset.LsnOffset;
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.psql.PostgresDialect;
 
 import org.apache.kafka.connect.source.SourceRecord;
 
@@ -51,15 +53,20 @@ import java.util.Optional;
 @Slf4j
 public class PostgresUtils {
     private static final int DEFAULT_FETCH_SIZE = 1024;
+    private static final JdbcDialect JDBC_DIALECT = new PostgresDialect();
 
     private PostgresUtils() {}
 
-    public static Object[] queryMinMax(JdbcConnection jdbc, TableId tableId, 
String columnName)
+    public static Object[] queryMinMax(
+            JdbcConnection jdbc, TableId tableId, String columnName, Column 
column)
             throws SQLException {
+        columnName = quote(columnName);
+        if (column != null) {
+            columnName = JDBC_DIALECT.convertType(columnName, 
column.typeName());
+        }
         final String minMaxQuery =
                 String.format(
-                        "SELECT MIN(%s), MAX(%s) FROM %s",
-                        quote(columnName), quote(columnName), quote(tableId));
+                        "SELECT MIN(%s), MAX(%s) FROM %s", columnName, 
columnName, quote(tableId));
         return jdbc.queryAndMap(
                 minMaxQuery,
                 rs -> {
@@ -96,12 +103,20 @@ public class PostgresUtils {
     }
 
     public static Object queryMin(
-            JdbcConnection jdbc, TableId tableId, String columnName, Object 
excludedLowerBound)
+            JdbcConnection jdbc,
+            TableId tableId,
+            String columnName,
+            Column column,
+            Object excludedLowerBound)
             throws SQLException {
+        columnName = quote(columnName);
+        if (column != null) {
+            columnName = JDBC_DIALECT.convertType(columnName, 
column.typeName());
+        }
         final String minQuery =
                 String.format(
                         "SELECT MIN(%s) FROM %s WHERE %s > ?",
-                        quote(columnName), quote(tableId), quote(columnName));
+                        columnName, quote(tableId), columnName);
         return jdbc.prepareQueryAndMap(
                 minQuery,
                 ps -> ps.setObject(1, excludedLowerBound),
@@ -141,10 +156,17 @@ public class PostgresUtils {
     }
 
     public static Object[] skipReadAndSortSampleData(
-            JdbcConnection jdbc, TableId tableId, String columnName, int 
inverseSamplingRate)
+            JdbcConnection jdbc,
+            TableId tableId,
+            String columnName,
+            Column column,
+            int inverseSamplingRate)
             throws SQLException {
-        final String sampleQuery =
-                String.format("SELECT %s FROM %s", quote(columnName), 
quote(tableId));
+        columnName = quote(columnName);
+        if (column != null) {
+            columnName = JDBC_DIALECT.convertType(columnName, 
column.typeName());
+        }
+        final String sampleQuery = String.format("SELECT %s FROM %s", 
columnName, quote(tableId));
 
         Statement stmt = null;
         ResultSet rs = null;
@@ -198,10 +220,14 @@ public class PostgresUtils {
             JdbcConnection jdbc,
             TableId tableId,
             String splitColumnName,
+            Column splitColumn,
             int chunkSize,
             Object includedLowerBound)
             throws SQLException {
         String quotedColumn = quote(splitColumnName);
+        if (splitColumn != null) {
+            quotedColumn = JDBC_DIALECT.convertType(quotedColumn, 
splitColumn.typeName());
+        }
         String query =
                 String.format(
                         "SELECT MAX(%s) FROM ("
@@ -273,8 +299,8 @@ public class PostgresUtils {
 
     /** Get split scan query for the given table. */
     public static String buildSplitScanQuery(
-            TableId tableId, SeaTunnelRowType rowType, boolean isFirstSplit, 
boolean isLastSplit) {
-        return buildSplitQuery(tableId, rowType, isFirstSplit, isLastSplit, 
-1, true);
+            Table table, SeaTunnelRowType rowType, boolean isFirstSplit, 
boolean isLastSplit) {
+        return buildSplitQuery(table, rowType, isFirstSplit, isLastSplit, -1, 
true);
     }
 
     /** Get table split data PreparedStatement. */
@@ -328,7 +354,7 @@ public class PostgresUtils {
     }
 
     private static String buildSplitQuery(
-            TableId tableId,
+            Table table,
             SeaTunnelRowType rowType,
             boolean isFirstSplit,
             boolean isLastSplit,
@@ -340,37 +366,37 @@ public class PostgresUtils {
             condition = null;
         } else if (isFirstSplit) {
             final StringBuilder sql = new StringBuilder();
-            addPrimaryKeyColumnsToCondition(rowType, sql, " <= ?");
+            addPrimaryKeyColumnsToCondition(table, rowType, sql, " <= ?");
             if (isScanningData) {
                 sql.append(" AND NOT (");
-                addPrimaryKeyColumnsToCondition(rowType, sql, " = ?");
+                addPrimaryKeyColumnsToCondition(table, rowType, sql, " = ?");
                 sql.append(")");
             }
             condition = sql.toString();
         } else if (isLastSplit) {
             final StringBuilder sql = new StringBuilder();
-            addPrimaryKeyColumnsToCondition(rowType, sql, " >= ?");
+            addPrimaryKeyColumnsToCondition(table, rowType, sql, " >= ?");
             condition = sql.toString();
         } else {
             final StringBuilder sql = new StringBuilder();
-            addPrimaryKeyColumnsToCondition(rowType, sql, " >= ?");
+            addPrimaryKeyColumnsToCondition(table, rowType, sql, " >= ?");
             if (isScanningData) {
                 sql.append(" AND NOT (");
-                addPrimaryKeyColumnsToCondition(rowType, sql, " = ?");
+                addPrimaryKeyColumnsToCondition(table, rowType, sql, " = ?");
                 sql.append(")");
             }
             sql.append(" AND ");
-            addPrimaryKeyColumnsToCondition(rowType, sql, " <= ?");
+            addPrimaryKeyColumnsToCondition(table, rowType, sql, " <= ?");
             condition = sql.toString();
         }
 
         if (isScanningData) {
             return buildSelectWithRowLimits(
-                    tableId, limitSize, "*", Optional.ofNullable(condition), 
Optional.empty());
+                    table.id(), limitSize, "*", 
Optional.ofNullable(condition), Optional.empty());
         } else {
             final String orderBy = String.join(", ", rowType.getFieldNames());
             return buildSelectWithBoundaryRowLimits(
-                    tableId,
+                    table.id(),
                     limitSize,
                     getPrimaryKeyColumnsProjection(rowType),
                     getMaxPrimaryKeyColumnsProjection(rowType),
@@ -441,11 +467,14 @@ public class PostgresUtils {
     }
 
     private static void addPrimaryKeyColumnsToCondition(
-            SeaTunnelRowType rowType, StringBuilder sql, String predicate) {
-        for (Iterator<String> fieldNamesIt = 
Arrays.stream(rowType.getFieldNames()).iterator();
-                fieldNamesIt.hasNext(); ) {
-            sql.append(quote(fieldNamesIt.next())).append(predicate);
-            if (fieldNamesIt.hasNext()) {
+            Table table, SeaTunnelRowType rowType, StringBuilder sql, String 
predicate) {
+        for (int i = 0; i < rowType.getTotalFields(); i++) {
+            String fieldName = quote(rowType.getFieldName(i));
+            fieldName =
+                    JDBC_DIALECT.convertType(
+                            fieldName, 
table.columnWithName(rowType.getFieldName(i)).typeName());
+            sql.append(fieldName).append(predicate);
+            if (i < rowType.getTotalFields() - 1) {
                 sql.append(" AND ");
             }
         }
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/utils/PostgresUtilsTest.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/utils/PostgresUtilsTest.java
index e8e5bb22d2..6ce08f953c 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/utils/PostgresUtilsTest.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/utils/PostgresUtilsTest.java
@@ -24,14 +24,21 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 
+import io.debezium.relational.Column;
+import io.debezium.relational.Table;
 import io.debezium.relational.TableId;
 
 public class PostgresUtilsTest {
     @Test
     public void testSplitScanQuery() {
+        Table table =
+                Table.editor()
+                        .tableId(TableId.parse("db1.schema1.table1"))
+                        
.addColumn(Column.editor().name("id").type("int8").create())
+                        .create();
         String splitScanSQL =
                 PostgresUtils.buildSplitScanQuery(
-                        TableId.parse("db1.schema1.table1"),
+                        table,
                         new SeaTunnelRowType(
                                 new String[] {"id"}, new SeaTunnelDataType[] 
{BasicType.LONG_TYPE}),
                         false,
@@ -42,7 +49,7 @@ public class PostgresUtilsTest {
 
         splitScanSQL =
                 PostgresUtils.buildSplitScanQuery(
-                        TableId.parse("db1.schema1.table1"),
+                        table,
                         new SeaTunnelRowType(
                                 new String[] {"id"}, new SeaTunnelDataType[] 
{BasicType.LONG_TYPE}),
                         true,
@@ -51,7 +58,7 @@ public class PostgresUtilsTest {
 
         splitScanSQL =
                 PostgresUtils.buildSplitScanQuery(
-                        TableId.parse("db1.schema1.table1"),
+                        table,
                         new SeaTunnelRowType(
                                 new String[] {"id"}, new SeaTunnelDataType[] 
{BasicType.LONG_TYPE}),
                         true,
@@ -60,14 +67,20 @@ public class PostgresUtilsTest {
                 "SELECT * FROM \"schema1\".\"table1\" WHERE \"id\" <= ? AND 
NOT (\"id\" = ?)",
                 splitScanSQL);
 
+        table =
+                Table.editor()
+                        .tableId(TableId.parse("db1.schema1.table1"))
+                        
.addColumn(Column.editor().name("id").type("uuid").create())
+                        .create();
         splitScanSQL =
                 PostgresUtils.buildSplitScanQuery(
-                        TableId.parse("db1.schema1.table1"),
+                        table,
                         new SeaTunnelRowType(
-                                new String[] {"id"}, new SeaTunnelDataType[] 
{BasicType.LONG_TYPE}),
+                                new String[] {"id"},
+                                new SeaTunnelDataType[] 
{BasicType.STRING_TYPE}),
                         false,
                         true);
         Assertions.assertEquals(
-                "SELECT * FROM \"schema1\".\"table1\" WHERE \"id\" >= ?", 
splitScanSQL);
+                "SELECT * FROM \"schema1\".\"table1\" WHERE \"id\"::text >= 
?", splitScanSQL);
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/eumerator/SqlServerChunkSplitter.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/eumerator/SqlServerChunkSplitter.java
index 1dc97020be..b6698f5319 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/eumerator/SqlServerChunkSplitter.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/eumerator/SqlServerChunkSplitter.java
@@ -27,6 +27,7 @@ import 
org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source.utils.SqlS
 
 import io.debezium.jdbc.JdbcConnection;
 import io.debezium.relational.Column;
+import io.debezium.relational.Table;
 import io.debezium.relational.TableId;
 import lombok.extern.slf4j.Slf4j;
 
@@ -80,11 +81,9 @@ public class SqlServerChunkSplitter extends 
AbstractJdbcSourceChunkSplitter {
 
     @Override
     public String buildSplitScanQuery(
-            TableId tableId,
-            SeaTunnelRowType splitKeyType,
-            boolean isFirstSplit,
-            boolean isLastSplit) {
-        return SqlServerUtils.buildSplitScanQuery(tableId, splitKeyType, 
isFirstSplit, isLastSplit);
+            Table table, SeaTunnelRowType splitKeyType, boolean isFirstSplit, 
boolean isLastSplit) {
+        return SqlServerUtils.buildSplitScanQuery(
+                table.id(), splitKeyType, isFirstSplit, isLastSplit);
     }
 
     @Override
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/JdbcInputFormat.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/JdbcInputFormat.java
index c2fec61341..8588ef16df 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/JdbcInputFormat.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/JdbcInputFormat.java
@@ -95,7 +95,7 @@ public class JdbcInputFormat implements Serializable {
             splitTableSchema = 
tables.get(inputSplit.getTablePath()).getTableSchema();
             splitTableId = inputSplit.getTablePath().toString();
 
-            statement = chunkSplitter.generateSplitStatement(inputSplit);
+            statement = chunkSplitter.generateSplitStatement(inputSplit, 
splitTableSchema);
             resultSet = statement.executeQuery();
             hasNext = resultSet.next();
         } catch (SQLException se) {
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialect.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialect.java
index ea284e3fd9..db9b90dade 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialect.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialect.java
@@ -75,6 +75,10 @@ public interface JdbcDialect extends Serializable {
      */
     JdbcDialectTypeMapper getJdbcDialectTypeMapper();
 
+    default String hashModForField(String nativeType, String fieldName, int 
mod) {
+        return hashModForField(fieldName, mod);
+    }
+
     default String hashModForField(String fieldName, int mod) {
         return "ABS(MD5(" + quoteIdentifier(fieldName) + ") % " + mod + ")";
     }
@@ -405,4 +409,15 @@ public interface JdbcDialect extends Serializable {
             JdbcConnectionConfig jdbcConnectionConfig) {
         return new SimpleJdbcConnectionProvider(jdbcConnectionConfig);
     }
+
+    /**
+     * Cast column type e.g. CAST(column AS type)
+     *
+     * @param columnName
+     * @param columnType
+     * @return the text of converted column type.
+     */
+    default String convertType(String columnName, String columnType) {
+        return columnName;
+    }
 }
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresDialect.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresDialect.java
index 82d02119d1..a2cd997e00 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresDialect.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresDialect.java
@@ -17,6 +17,7 @@
 
 package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.psql;
 
+import org.apache.seatunnel.api.table.catalog.Column;
 import org.apache.seatunnel.api.table.catalog.TablePath;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.JdbcRowConverter;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.DatabaseIdentifier;
@@ -36,6 +37,7 @@ import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
 import java.util.Arrays;
+import java.util.Map;
 import java.util.Optional;
 import java.util.stream.Collectors;
 
@@ -68,9 +70,73 @@ public class PostgresDialect implements JdbcDialect {
         return new PostgresTypeMapper();
     }
 
+    @Override
+    public String hashModForField(String nativeType, String fieldName, int 
mod) {
+        String quoteFieldName = quoteIdentifier(fieldName);
+        if (StringUtils.isNotBlank(nativeType)) {
+            quoteFieldName = convertType(quoteFieldName, nativeType);
+        }
+        return "(ABS(HASHTEXT(" + quoteFieldName + ")) % " + mod + ")";
+    }
+
     @Override
     public String hashModForField(String fieldName, int mod) {
-        return "(ABS(HASHTEXT(" + quoteIdentifier(fieldName) + ")) % " + mod + 
")";
+        return hashModForField(null, fieldName, mod);
+    }
+
+    @Override
+    public Object queryNextChunkMax(
+            Connection connection,
+            JdbcSourceTable table,
+            String columnName,
+            int chunkSize,
+            Object includedLowerBound)
+            throws SQLException {
+        Map<String, Column> columns =
+                table.getCatalogTable().getTableSchema().getColumns().stream()
+                        .collect(Collectors.toMap(c -> c.getName(), c -> c));
+        Column column = columns.get(columnName);
+
+        String quotedColumn = quoteIdentifier(columnName);
+        quotedColumn = convertType(quotedColumn, column.getSourceType());
+        String sqlQuery;
+        if (StringUtils.isNotBlank(table.getQuery())) {
+            sqlQuery =
+                    String.format(
+                            "SELECT MAX(%s) FROM ("
+                                    + "SELECT %s FROM (%s) AS T1 WHERE %s >= ? 
ORDER BY %s ASC LIMIT %s"
+                                    + ") AS T2",
+                            quotedColumn,
+                            quotedColumn,
+                            table.getQuery(),
+                            quotedColumn,
+                            quotedColumn,
+                            chunkSize);
+        } else {
+            sqlQuery =
+                    String.format(
+                            "SELECT MAX(%s) FROM ("
+                                    + "SELECT %s FROM %s WHERE %s >= ? ORDER 
BY %s ASC LIMIT %s"
+                                    + ") AS T",
+                            quotedColumn,
+                            quotedColumn,
+                            tableIdentifier(table.getTablePath()),
+                            quotedColumn,
+                            quotedColumn,
+                            chunkSize);
+        }
+        try (PreparedStatement ps = connection.prepareStatement(sqlQuery)) {
+            ps.setObject(1, includedLowerBound);
+            try (ResultSet rs = ps.executeQuery()) {
+                if (rs.next()) {
+                    return rs.getObject(1);
+                } else {
+                    // this should never happen
+                    throw new SQLException(
+                            String.format("No result returned after running 
query [%s]", sqlQuery));
+                }
+            }
+        }
     }
 
     @Override
@@ -189,4 +255,11 @@ public class PostgresDialect implements JdbcDialect {
         }
         return SQLUtils.countForSubquery(connection, table.getQuery());
     }
+
+    public String convertType(String columnName, String columnType) {
+        if (PostgresTypeConverter.PG_UUID.equals(columnType)) {
+            return columnName + "::text";
+        }
+        return columnName;
+    }
 }
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/ChunkSplitter.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/ChunkSplitter.java
index 89ec64b817..198dfe47cb 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/ChunkSplitter.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/ChunkSplitter.java
@@ -113,15 +113,16 @@ public abstract class ChunkSplitter implements 
AutoCloseable, Serializable {
     protected abstract Collection<JdbcSourceSplit> createSplits(
             JdbcSourceTable table, SeaTunnelRowType splitKeyType) throws 
SQLException;
 
-    public PreparedStatement generateSplitStatement(JdbcSourceSplit split) 
throws SQLException {
+    public PreparedStatement generateSplitStatement(JdbcSourceSplit split, 
TableSchema schema)
+            throws SQLException {
         if (split.getSplitKeyName() == null) {
             return createSingleSplitStatement(split);
         }
-        return createSplitStatement(split);
+        return createSplitStatement(split, schema);
     }
 
-    protected abstract PreparedStatement createSplitStatement(JdbcSourceSplit 
split)
-            throws SQLException;
+    protected abstract PreparedStatement createSplitStatement(
+            JdbcSourceSplit split, TableSchema schema) throws SQLException;
 
     protected PreparedStatement createPreparedStatement(String sql) throws 
SQLException {
         Connection connection = getOrEstablishConnection();
@@ -174,7 +175,13 @@ public abstract class ChunkSplitter implements 
AutoCloseable, Serializable {
     protected Object queryMin(JdbcSourceTable table, String columnName, Object 
excludedLowerBound)
             throws SQLException {
         String minQuery;
+        Map<String, Column> columns =
+                table.getCatalogTable().getTableSchema().getColumns().stream()
+                        .collect(Collectors.toMap(c -> c.getName(), c -> c));
+        Column column = columns.get(columnName);
+
         columnName = jdbcDialect.quoteIdentifier(columnName);
+        columnName = jdbcDialect.convertType(columnName, 
column.getSourceType());
         if (StringUtils.isNotBlank(table.getQuery())) {
             minQuery =
                     String.format(
@@ -206,7 +213,13 @@ public abstract class ChunkSplitter implements 
AutoCloseable, Serializable {
     protected Pair<Object, Object> queryMinMax(JdbcSourceTable table, String 
columnName)
             throws SQLException {
         String sqlQuery;
+        Map<String, Column> columns =
+                table.getCatalogTable().getTableSchema().getColumns().stream()
+                        .collect(Collectors.toMap(c -> c.getName(), c -> c));
+        Column column = columns.get(columnName);
+
         columnName = jdbcDialect.quoteIdentifier(columnName);
+        columnName = jdbcDialect.convertType(columnName, 
column.getSourceType());
         if (StringUtils.isNotBlank(table.getQuery())) {
             sqlQuery =
                     String.format(
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/DynamicChunkSplitter.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/DynamicChunkSplitter.java
index 2993f749c6..9dc26d1ef2 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/DynamicChunkSplitter.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/DynamicChunkSplitter.java
@@ -19,7 +19,9 @@ package org.apache.seatunnel.connectors.seatunnel.jdbc.source;
 
 import 
org.apache.seatunnel.shade.com.google.common.annotations.VisibleForTesting;
 
+import org.apache.seatunnel.api.table.catalog.Column;
 import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.common.exception.CommonError;
@@ -41,12 +43,12 @@ import java.sql.SQLException;
 import java.sql.Timestamp;
 import java.time.LocalDate;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.Objects;
+import java.util.stream.Collectors;
 
 import static java.math.BigDecimal.ROUND_CEILING;
 import static 
org.apache.seatunnel.shade.com.google.common.base.Preconditions.checkArgument;
@@ -65,8 +67,9 @@ public class DynamicChunkSplitter extends ChunkSplitter {
     }
 
     @Override
-    protected PreparedStatement createSplitStatement(JdbcSourceSplit split) 
throws SQLException {
-        return createDynamicSplitStatement(split);
+    protected PreparedStatement createSplitStatement(JdbcSourceSplit split, 
TableSchema schema)
+            throws SQLException {
+        return createDynamicSplitStatement(split, schema);
     }
 
     private Collection<JdbcSourceSplit> createDynamicSplits(
@@ -92,9 +95,9 @@ public class DynamicChunkSplitter extends ChunkSplitter {
         return splits;
     }
 
-    private PreparedStatement createDynamicSplitStatement(JdbcSourceSplit 
split)
+    private PreparedStatement createDynamicSplitStatement(JdbcSourceSplit 
split, TableSchema schema)
             throws SQLException {
-        String splitQuery = createDynamicSplitQuerySQL(split);
+        String splitQuery = createDynamicSplitQuerySQL(split, schema);
         PreparedStatement statement = createPreparedStatement(splitQuery);
         prepareDynamicSplitStatement(statement, split);
         return statement;
@@ -452,7 +455,7 @@ public class DynamicChunkSplitter extends ChunkSplitter {
     }
 
     @VisibleForTesting
-    String createDynamicSplitQuerySQL(JdbcSourceSplit split) {
+    String createDynamicSplitQuerySQL(JdbcSourceSplit split, TableSchema 
schema) {
         SeaTunnelRowType rowType =
                 new SeaTunnelRowType(
                         new String[] {split.getSplitKeyName()},
@@ -465,23 +468,23 @@ public class DynamicChunkSplitter extends ChunkSplitter {
             condition = null;
         } else if (isFirstSplit) {
             StringBuilder sql = new StringBuilder();
-            addKeyColumnsToCondition(rowType, sql, " <= ?");
+            addKeyColumnsToCondition(schema, rowType, sql, " <= ?");
             sql.append(" AND NOT (");
-            addKeyColumnsToCondition(rowType, sql, " = ?");
+            addKeyColumnsToCondition(schema, rowType, sql, " = ?");
             sql.append(")");
             condition = sql.toString();
         } else if (isLastSplit) {
             StringBuilder sql = new StringBuilder();
-            addKeyColumnsToCondition(rowType, sql, " >= ?");
+            addKeyColumnsToCondition(schema, rowType, sql, " >= ?");
             condition = sql.toString();
         } else {
             StringBuilder sql = new StringBuilder();
-            addKeyColumnsToCondition(rowType, sql, " >= ?");
+            addKeyColumnsToCondition(schema, rowType, sql, " >= ?");
             sql.append(" AND NOT (");
-            addKeyColumnsToCondition(rowType, sql, " = ?");
+            addKeyColumnsToCondition(schema, rowType, sql, " = ?");
             sql.append(")");
             sql.append(" AND ");
-            addKeyColumnsToCondition(rowType, sql, " <= ?");
+            addKeyColumnsToCondition(schema, rowType, sql, " <= ?");
             condition = sql.toString();
         }
 
@@ -503,11 +506,16 @@ public class DynamicChunkSplitter extends ChunkSplitter {
     }
 
     private void addKeyColumnsToCondition(
-            SeaTunnelRowType rowType, StringBuilder sql, String predicate) {
-        for (Iterator<String> fieldNamesIt = 
Arrays.stream(rowType.getFieldNames()).iterator();
-                fieldNamesIt.hasNext(); ) {
-            
sql.append(jdbcDialect.quoteIdentifier(fieldNamesIt.next())).append(predicate);
-            if (fieldNamesIt.hasNext()) {
+            TableSchema schema, SeaTunnelRowType rowType, StringBuilder sql, 
String predicate) {
+        Map<String, Column> columns =
+                schema.getColumns().stream().collect(Collectors.toMap(c -> 
c.getName(), c -> c));
+        for (int i = 0; i < rowType.getTotalFields(); i++) {
+            String fieldName = 
jdbcDialect.quoteIdentifier(rowType.getFieldName(i));
+            fieldName =
+                    jdbcDialect.convertType(
+                            fieldName, 
columns.get(rowType.getFieldName(i)).getSourceType());
+            sql.append(fieldName).append(predicate);
+            if (i < rowType.getTotalFields() - 1) {
                 sql.append(" AND ");
             }
         }
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/FixedChunkSplitter.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/FixedChunkSplitter.java
index aa93d2d8a5..edeef96f0a 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/FixedChunkSplitter.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/FixedChunkSplitter.java
@@ -17,6 +17,8 @@
 
 package org.apache.seatunnel.connectors.seatunnel.jdbc.source;
 
+import org.apache.seatunnel.api.table.catalog.Column;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
 import org.apache.seatunnel.api.table.type.DecimalType;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
@@ -89,7 +91,8 @@ public class FixedChunkSplitter extends ChunkSplitter {
     }
 
     @Override
-    protected PreparedStatement createSplitStatement(JdbcSourceSplit split) 
throws SQLException {
+    protected PreparedStatement createSplitStatement(JdbcSourceSplit split, 
TableSchema schema)
+            throws SQLException {
         if (SqlType.STRING.equals(split.getSplitKeyType().getSqlType())) {
             return createStringColumnSplitStatement(split);
         }
@@ -103,6 +106,11 @@ public class FixedChunkSplitter extends ChunkSplitter {
     private Collection<JdbcSourceSplit> createStringColumnSplits(
             JdbcSourceTable table, String splitKeyName, SeaTunnelDataType 
splitKeyType) {
         List<JdbcSourceSplit> splits = new 
ArrayList<>(table.getPartitionNumber());
+        Column column =
+                table.getCatalogTable().getTableSchema().getColumns().stream()
+                        .filter(c -> c.getName().equals(splitKeyName))
+                        .findAny()
+                        .get();
         for (int i = 0; i < table.getPartitionNumber(); i++) {
             String splitQuery;
             if (StringUtils.isNotBlank(table.getQuery())) {
@@ -111,14 +119,18 @@ public class FixedChunkSplitter extends ChunkSplitter {
                                 "SELECT * FROM (%s) st_jdbc_splitter WHERE %s 
= ?",
                                 table.getQuery(),
                                 jdbcDialect.hashModForField(
-                                        splitKeyName, 
table.getPartitionNumber()));
+                                        column.getSourceType(),
+                                        splitKeyName,
+                                        table.getPartitionNumber()));
             } else {
                 splitQuery =
                         String.format(
                                 "SELECT * FROM %s WHERE %s = ?",
                                 
jdbcDialect.tableIdentifier(table.getTablePath()),
                                 jdbcDialect.hashModForField(
-                                        splitKeyName, 
table.getPartitionNumber()));
+                                        column.getSourceType(),
+                                        splitKeyName,
+                                        table.getPartitionNumber()));
             }
 
             JdbcSourceSplit split =
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/DynamicChunkSplitterTest.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/DynamicChunkSplitterTest.java
index c71ae7b43d..70963a9f72 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/DynamicChunkSplitterTest.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/DynamicChunkSplitterTest.java
@@ -17,7 +17,9 @@
 
 package org.apache.seatunnel.connectors.seatunnel.jdbc.source;
 
+import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
 import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
 import org.apache.seatunnel.api.table.type.BasicType;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcConnectionConfig;
 import org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcSourceConfig;
@@ -35,7 +37,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 public class DynamicChunkSplitterTest {
 
     @Test
-    public void testGenerateSplitQuerySQL() {
+    public void testPostgresGenerateSplitQuerySQL() {
         JdbcSourceConfig config =
                 JdbcSourceConfig.builder()
                         .jdbcConnectionConfig(
@@ -44,6 +46,17 @@ public class DynamicChunkSplitterTest {
                                         .driverName("org.postgresql.Driver")
                                         .build())
                         .build();
+        TableSchema tableSchema =
+                TableSchema.builder()
+                        .columns(
+                                Arrays.asList(
+                                        PhysicalColumn.builder()
+                                                .name("id")
+                                                .sourceType("int4")
+                                                .dataType(BasicType.INT_TYPE)
+                                                .build()))
+                        .build();
+
         DynamicChunkSplitter splitter = new DynamicChunkSplitter(config);
 
         JdbcSourceSplit split =
@@ -55,7 +68,7 @@ public class DynamicChunkSplitterTest {
                         BasicType.INT_TYPE,
                         1,
                         10);
-        String splitQuerySQL = splitter.createDynamicSplitQuerySQL(split);
+        String splitQuerySQL = splitter.createDynamicSplitQuerySQL(split, 
tableSchema);
         Assertions.assertEquals(
                 "SELECT * FROM \"db1\".\"schema1\".\"table1\" WHERE \"id\" >= 
? AND NOT (\"id\" = ?) AND \"id\" <= ?",
                 splitQuerySQL);
@@ -69,10 +82,34 @@ public class DynamicChunkSplitterTest {
                         BasicType.INT_TYPE,
                         1,
                         10);
-        splitQuerySQL = splitter.createDynamicSplitQuerySQL(split);
+        splitQuerySQL = splitter.createDynamicSplitQuerySQL(split, 
tableSchema);
         Assertions.assertEquals(
                 "SELECT * FROM (select * from table1) tmp WHERE \"id\" >= ? 
AND NOT (\"id\" = ?) AND \"id\" <= ?",
                 splitQuerySQL);
+
+        tableSchema =
+                TableSchema.builder()
+                        .columns(
+                                Arrays.asList(
+                                        PhysicalColumn.builder()
+                                                .name("id")
+                                                .sourceType("uuid")
+                                                .dataType(BasicType.INT_TYPE)
+                                                .build()))
+                        .build();
+        split =
+                new JdbcSourceSplit(
+                        TablePath.of("db1", "schema1", "table1"),
+                        "split1",
+                        "select * from table1",
+                        "id",
+                        BasicType.INT_TYPE,
+                        1,
+                        10);
+        splitQuerySQL = splitter.createDynamicSplitQuerySQL(split, 
tableSchema);
+        Assertions.assertEquals(
+                "SELECT * FROM (select * from table1) tmp WHERE \"id\"::text 
>= ? AND NOT (\"id\"::text = ?) AND \"id\"::text <= ?",
+                splitQuerySQL);
     }
 
     @Test

Reply via email to