This is an automated email from the ASF dual-hosted git repository.

gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 849d748d3d [Improve][Jdbc] Add quote identifier for sql (#6669)
849d748d3d is described below

commit 849d748d3dbf3021d1482988b6d995bcd20bb0a3
Author: hailin0 <[email protected]>
AuthorDate: Wed Apr 10 18:07:56 2024 +0800

    [Improve][Jdbc] Add quote identifier for sql (#6669)
---
 .../seatunnel/cdc/mysql/utils/MySqlUtils.java      |  2 +-
 .../seatunnel/cdc/mysql/utils/MySqlUtilsTest.java  | 72 +++++++++++++++++++++
 .../seatunnel/cdc/oracle/utils/OracleUtils.java    |  2 +-
 .../cdc/oracle/utils/OracleUtilsTest.java          | 73 ++++++++++++++++++++++
 .../cdc/postgres/utils/PostgresUtils.java          |  2 +-
 .../cdc/postgres/utils/PostgresUtilsTest.java      | 73 ++++++++++++++++++++++
 .../cdc/sqlserver/source/utils/SqlServerUtils.java |  2 +-
 .../sqlserver/source/utils/SqlServerUtilsTest.java | 72 +++++++++++++++++++++
 .../internal/dialect/psql/PostgresDialect.java     |  5 ++
 .../jdbc/source/DynamicChunkSplitter.java          |  9 ++-
 .../jdbc/source/DynamicChunkSplitterTest.java      | 45 +++++++++++++
 11 files changed, 350 insertions(+), 7 deletions(-)

diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/utils/MySqlUtils.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/utils/MySqlUtils.java
index 706d7a8862..9b06ddda96 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/utils/MySqlUtils.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/utils/MySqlUtils.java
@@ -434,7 +434,7 @@ public class MySqlUtils {
             SeaTunnelRowType rowType, StringBuilder sql, String predicate) {
         for (Iterator<String> fieldNamesIt = Arrays.stream(rowType.getFieldNames()).iterator();
                 fieldNamesIt.hasNext(); ) {
-            sql.append(fieldNamesIt.next()).append(predicate);
+            sql.append(quote(fieldNamesIt.next())).append(predicate);
             if (fieldNamesIt.hasNext()) {
                 sql.append(" AND ");
             }
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/utils/MySqlUtilsTest.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/utils/MySqlUtilsTest.java
new file mode 100644
index 0000000000..ac03965900
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/utils/MySqlUtilsTest.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.cdc.mysql.utils;
+
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import io.debezium.relational.TableId;
+
+public class MySqlUtilsTest {
+
+    @Test
+    public void testSplitScanQuery() {
+        String splitScanSQL =
+                MySqlUtils.buildSplitScanQuery(
+                        TableId.parse("db1.table1"),
+                        new SeaTunnelRowType(
+                                new String[] {"id"}, new SeaTunnelDataType[] {BasicType.LONG_TYPE}),
+                        false,
+                        false);
+        Assertions.assertEquals(
+                "SELECT * FROM `db1`.`table1` WHERE `id` >= ? AND NOT (`id` = ?) AND `id` <= ?",
+                splitScanSQL);
+
+        splitScanSQL =
+                MySqlUtils.buildSplitScanQuery(
+                        TableId.parse("db1.table1"),
+                        new SeaTunnelRowType(
+                                new String[] {"id"}, new SeaTunnelDataType[] {BasicType.LONG_TYPE}),
+                        true,
+                        true);
+        Assertions.assertEquals("SELECT * FROM `db1`.`table1`", splitScanSQL);
+
+        splitScanSQL =
+                MySqlUtils.buildSplitScanQuery(
+                        TableId.parse("db1.table1"),
+                        new SeaTunnelRowType(
+                                new String[] {"id"}, new SeaTunnelDataType[] {BasicType.LONG_TYPE}),
+                        true,
+                        false);
+        Assertions.assertEquals(
+                "SELECT * FROM `db1`.`table1` WHERE `id` <= ? AND NOT (`id` = ?)", splitScanSQL);
+
+        splitScanSQL =
+                MySqlUtils.buildSplitScanQuery(
+                        TableId.parse("db1.table1"),
+                        new SeaTunnelRowType(
+                                new String[] {"id"}, new SeaTunnelDataType[] {BasicType.LONG_TYPE}),
+                        false,
+                        true);
+        Assertions.assertEquals("SELECT * FROM `db1`.`table1` WHERE `id` >= ?", splitScanSQL);
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/utils/OracleUtils.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/utils/OracleUtils.java
index 89e843c393..8d67c0f141 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/utils/OracleUtils.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/utils/OracleUtils.java
@@ -439,7 +439,7 @@ public class OracleUtils {
             SeaTunnelRowType rowType, StringBuilder sql, String predicate) {
         for (Iterator<String> fieldNamesIt = Arrays.stream(rowType.getFieldNames()).iterator();
                 fieldNamesIt.hasNext(); ) {
-            sql.append(fieldNamesIt.next()).append(predicate);
+            sql.append(quote(fieldNamesIt.next())).append(predicate);
             if (fieldNamesIt.hasNext()) {
                 sql.append(" AND ");
             }
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/utils/OracleUtilsTest.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/utils/OracleUtilsTest.java
new file mode 100644
index 0000000000..10c253da83
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/utils/OracleUtilsTest.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.cdc.oracle.utils;
+
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import io.debezium.relational.TableId;
+
+public class OracleUtilsTest {
+    @Test
+    public void testSplitScanQuery() {
+        String splitScanSQL =
+                OracleUtils.buildSplitScanQuery(
+                        TableId.parse("db1.schema1.table1"),
+                        new SeaTunnelRowType(
+                                new String[] {"id"}, new SeaTunnelDataType[] {BasicType.LONG_TYPE}),
+                        false,
+                        false);
+        Assertions.assertEquals(
+                "SELECT * FROM \"schema1\".\"table1\" WHERE \"id\" >= ? AND NOT (\"id\" = ?) AND \"id\" <= ?",
+                splitScanSQL);
+
+        splitScanSQL =
+                OracleUtils.buildSplitScanQuery(
+                        TableId.parse("db1.schema1.table1"),
+                        new SeaTunnelRowType(
+                                new String[] {"id"}, new SeaTunnelDataType[] {BasicType.LONG_TYPE}),
+                        true,
+                        true);
+        Assertions.assertEquals("SELECT * FROM \"schema1\".\"table1\"", splitScanSQL);
+
+        splitScanSQL =
+                OracleUtils.buildSplitScanQuery(
+                        TableId.parse("db1.schema1.table1"),
+                        new SeaTunnelRowType(
+                                new String[] {"id"}, new SeaTunnelDataType[] {BasicType.LONG_TYPE}),
+                        true,
+                        false);
+        Assertions.assertEquals(
+                "SELECT * FROM \"schema1\".\"table1\" WHERE \"id\" <= ? AND NOT (\"id\" = ?)",
+                splitScanSQL);
+
+        splitScanSQL =
+                OracleUtils.buildSplitScanQuery(
+                        TableId.parse("db1.schema1.table1"),
+                        new SeaTunnelRowType(
+                                new String[] {"id"}, new SeaTunnelDataType[] {BasicType.LONG_TYPE}),
+                        false,
+                        true);
+        Assertions.assertEquals(
+                "SELECT * FROM \"schema1\".\"table1\" WHERE \"id\" >= ?", splitScanSQL);
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/utils/PostgresUtils.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/utils/PostgresUtils.java
index b83e287540..576c7fb536 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/utils/PostgresUtils.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/utils/PostgresUtils.java
@@ -444,7 +444,7 @@ public class PostgresUtils {
             SeaTunnelRowType rowType, StringBuilder sql, String predicate) {
         for (Iterator<String> fieldNamesIt = Arrays.stream(rowType.getFieldNames()).iterator();
                 fieldNamesIt.hasNext(); ) {
-            sql.append(fieldNamesIt.next()).append(predicate);
+            sql.append(quote(fieldNamesIt.next())).append(predicate);
             if (fieldNamesIt.hasNext()) {
                 sql.append(" AND ");
             }
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/utils/PostgresUtilsTest.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/utils/PostgresUtilsTest.java
new file mode 100644
index 0000000000..e8e5bb22d2
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/utils/PostgresUtilsTest.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.cdc.postgres.utils;
+
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import io.debezium.relational.TableId;
+
+public class PostgresUtilsTest {
+    @Test
+    public void testSplitScanQuery() {
+        String splitScanSQL =
+                PostgresUtils.buildSplitScanQuery(
+                        TableId.parse("db1.schema1.table1"),
+                        new SeaTunnelRowType(
+                                new String[] {"id"}, new SeaTunnelDataType[] {BasicType.LONG_TYPE}),
+                        false,
+                        false);
+        Assertions.assertEquals(
+                "SELECT * FROM \"schema1\".\"table1\" WHERE \"id\" >= ? AND NOT (\"id\" = ?) AND \"id\" <= ?",
+                splitScanSQL);
+
+        splitScanSQL =
+                PostgresUtils.buildSplitScanQuery(
+                        TableId.parse("db1.schema1.table1"),
+                        new SeaTunnelRowType(
+                                new String[] {"id"}, new SeaTunnelDataType[] {BasicType.LONG_TYPE}),
+                        true,
+                        true);
+        Assertions.assertEquals("SELECT * FROM \"schema1\".\"table1\"", splitScanSQL);
+
+        splitScanSQL =
+                PostgresUtils.buildSplitScanQuery(
+                        TableId.parse("db1.schema1.table1"),
+                        new SeaTunnelRowType(
+                                new String[] {"id"}, new SeaTunnelDataType[] {BasicType.LONG_TYPE}),
+                        true,
+                        false);
+        Assertions.assertEquals(
+                "SELECT * FROM \"schema1\".\"table1\" WHERE \"id\" <= ? AND NOT (\"id\" = ?)",
+                splitScanSQL);
+
+        splitScanSQL =
+                PostgresUtils.buildSplitScanQuery(
+                        TableId.parse("db1.schema1.table1"),
+                        new SeaTunnelRowType(
+                                new String[] {"id"}, new SeaTunnelDataType[] {BasicType.LONG_TYPE}),
+                        false,
+                        true);
+        Assertions.assertEquals(
+                "SELECT * FROM \"schema1\".\"table1\" WHERE \"id\" >= ?", splitScanSQL);
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/utils/SqlServerUtils.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/utils/SqlServerUtils.java
index 2c83e02e0e..db1872fa64 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/utils/SqlServerUtils.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/utils/SqlServerUtils.java
@@ -467,7 +467,7 @@ public class SqlServerUtils {
             SeaTunnelRowType rowType, StringBuilder sql, String predicate) {
         for (Iterator<String> fieldNamesIt = Arrays.stream(rowType.getFieldNames()).iterator();
                 fieldNamesIt.hasNext(); ) {
-            sql.append(fieldNamesIt.next()).append(predicate);
+            sql.append(quote(fieldNamesIt.next())).append(predicate);
             if (fieldNamesIt.hasNext()) {
                 sql.append(" AND ");
             }
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/utils/SqlServerUtilsTest.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/utils/SqlServerUtilsTest.java
new file mode 100644
index 0000000000..470a05e54a
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/utils/SqlServerUtilsTest.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source.utils;
+
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import io.debezium.relational.TableId;
+
+public class SqlServerUtilsTest {
+    @Test
+    public void testSplitScanQuery() {
+        String splitScanSQL =
+                SqlServerUtils.buildSplitScanQuery(
+                        TableId.parse("db1.schema1.table1"),
+                        new SeaTunnelRowType(
+                                new String[] {"id"}, new SeaTunnelDataType[] {BasicType.LONG_TYPE}),
+                        false,
+                        false);
+        Assertions.assertEquals(
+                "SELECT * FROM [schema1].[table1] WHERE [id] >= ? AND NOT ([id] = ?) AND [id] <= ?",
+                splitScanSQL);
+
+        splitScanSQL =
+                SqlServerUtils.buildSplitScanQuery(
+                        TableId.parse("db1.schema1.table1"),
+                        new SeaTunnelRowType(
+                                new String[] {"id"}, new SeaTunnelDataType[] {BasicType.LONG_TYPE}),
+                        true,
+                        true);
+        Assertions.assertEquals("SELECT * FROM [schema1].[table1]", splitScanSQL);
+
+        splitScanSQL =
+                SqlServerUtils.buildSplitScanQuery(
+                        TableId.parse("db1.schema1.table1"),
+                        new SeaTunnelRowType(
+                                new String[] {"id"}, new SeaTunnelDataType[] {BasicType.LONG_TYPE}),
+                        true,
+                        false);
+        Assertions.assertEquals(
+                "SELECT * FROM [schema1].[table1] WHERE [id] <= ? AND NOT ([id] = ?)",
+                splitScanSQL);
+
+        splitScanSQL =
+                SqlServerUtils.buildSplitScanQuery(
+                        TableId.parse("db1.schema1.table1"),
+                        new SeaTunnelRowType(
+                                new String[] {"id"}, new SeaTunnelDataType[] {BasicType.LONG_TYPE}),
+                        false,
+                        true);
+        Assertions.assertEquals("SELECT * FROM [schema1].[table1] WHERE [id] >= ?", splitScanSQL);
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresDialect.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresDialect.java
index 51c5eb67d2..82d02119d1 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresDialect.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresDialect.java
@@ -137,6 +137,11 @@ public class PostgresDialect implements JdbcDialect {
         return "\"" + getFieldIde(identifier, fieldIde) + "\"";
     }
 
+    @Override
+    public String tableIdentifier(TablePath tablePath) {
+        return tablePath.getFullNameWithQuoted("\"");
+    }
+
     @Override
     public String quoteDatabaseIdentifier(String identifier) {
         return "\"" + identifier + "\"";
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/DynamicChunkSplitter.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/DynamicChunkSplitter.java
index ff75f5b53c..2993f749c6 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/DynamicChunkSplitter.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/DynamicChunkSplitter.java
@@ -17,6 +17,8 @@
 
 package org.apache.seatunnel.connectors.seatunnel.jdbc.source;
 
+import org.apache.seatunnel.shade.com.google.common.annotations.VisibleForTesting;
+
 import org.apache.seatunnel.api.table.catalog.TablePath;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
@@ -449,7 +451,8 @@ public class DynamicChunkSplitter extends ChunkSplitter {
         return ObjectUtils.compare(obj1, obj2);
     }
 
-    private String createDynamicSplitQuerySQL(JdbcSourceSplit split) {
+    @VisibleForTesting
+    String createDynamicSplitQuerySQL(JdbcSourceSplit split) {
         SeaTunnelRowType rowType =
                 new SeaTunnelRowType(
                         new String[] {split.getSplitKeyName()},
@@ -499,11 +502,11 @@ public class DynamicChunkSplitter extends ChunkSplitter {
         return sql.toString();
     }
 
-    private static void addKeyColumnsToCondition(
+    private void addKeyColumnsToCondition(
             SeaTunnelRowType rowType, StringBuilder sql, String predicate) {
         for (Iterator<String> fieldNamesIt = Arrays.stream(rowType.getFieldNames()).iterator();
                 fieldNamesIt.hasNext(); ) {
-            sql.append(fieldNamesIt.next()).append(predicate);
+            sql.append(jdbcDialect.quoteIdentifier(fieldNamesIt.next())).append(predicate);
             if (fieldNamesIt.hasNext()) {
                 sql.append(" AND ");
             }
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/DynamicChunkSplitterTest.java b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/DynamicChunkSplitterTest.java
index 8896e5f960..c71ae7b43d 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/DynamicChunkSplitterTest.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/DynamicChunkSplitterTest.java
@@ -18,7 +18,11 @@
 package org.apache.seatunnel.connectors.seatunnel.jdbc.source;
 
 import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcConnectionConfig;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcSourceConfig;
 
+import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 
 import java.util.Arrays;
@@ -30,6 +34,47 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 
 public class DynamicChunkSplitterTest {
 
+    @Test
+    public void testGenerateSplitQuerySQL() {
+        JdbcSourceConfig config =
+                JdbcSourceConfig.builder()
+                        .jdbcConnectionConfig(
+                                JdbcConnectionConfig.builder()
+                                        .url("jdbc:postgresql://localhost:5432/test")
+                                        .driverName("org.postgresql.Driver")
+                                        .build())
+                        .build();
+        DynamicChunkSplitter splitter = new DynamicChunkSplitter(config);
+
+        JdbcSourceSplit split =
+                new JdbcSourceSplit(
+                        TablePath.of("db1", "schema1", "table1"),
+                        "split1",
+                        null,
+                        "id",
+                        BasicType.INT_TYPE,
+                        1,
+                        10);
+        String splitQuerySQL = splitter.createDynamicSplitQuerySQL(split);
+        Assertions.assertEquals(
+                "SELECT * FROM \"db1\".\"schema1\".\"table1\" WHERE \"id\" >= ? AND NOT (\"id\" = ?) AND \"id\" <= ?",
+                splitQuerySQL);
+
+        split =
+                new JdbcSourceSplit(
+                        TablePath.of("db1", "schema1", "table1"),
+                        "split1",
+                        "select * from table1",
+                        "id",
+                        BasicType.INT_TYPE,
+                        1,
+                        10);
+        splitQuerySQL = splitter.createDynamicSplitQuerySQL(split);
+        Assertions.assertEquals(
+                "SELECT * FROM (select * from table1) tmp WHERE \"id\" >= ? AND NOT (\"id\" = ?) AND \"id\" <= ?",
+                splitQuerySQL);
+    }
+
     @Test
     public void testEfficientShardingThroughSampling() throws NoSuchMethodException {
         TablePath tablePath = new TablePath("db", "xe", "table");

Reply via email to