This is an automated email from the ASF dual-hosted git repository.
gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 849d748d3d [Improve][Jdbc] Add quote identifier for sql (#6669)
849d748d3d is described below
commit 849d748d3dbf3021d1482988b6d995bcd20bb0a3
Author: hailin0 <[email protected]>
AuthorDate: Wed Apr 10 18:07:56 2024 +0800
[Improve][Jdbc] Add quote identifier for sql (#6669)
---
.../seatunnel/cdc/mysql/utils/MySqlUtils.java | 2 +-
.../seatunnel/cdc/mysql/utils/MySqlUtilsTest.java | 72 +++++++++++++++++++++
.../seatunnel/cdc/oracle/utils/OracleUtils.java | 2 +-
.../cdc/oracle/utils/OracleUtilsTest.java | 73 ++++++++++++++++++++++
.../cdc/postgres/utils/PostgresUtils.java | 2 +-
.../cdc/postgres/utils/PostgresUtilsTest.java | 73 ++++++++++++++++++++++
.../cdc/sqlserver/source/utils/SqlServerUtils.java | 2 +-
.../sqlserver/source/utils/SqlServerUtilsTest.java | 72 +++++++++++++++++++++
.../internal/dialect/psql/PostgresDialect.java | 5 ++
.../jdbc/source/DynamicChunkSplitter.java | 9 ++-
.../jdbc/source/DynamicChunkSplitterTest.java | 45 +++++++++++++
11 files changed, 350 insertions(+), 7 deletions(-)
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/utils/MySqlUtils.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/utils/MySqlUtils.java
index 706d7a8862..9b06ddda96 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/utils/MySqlUtils.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/utils/MySqlUtils.java
@@ -434,7 +434,7 @@ public class MySqlUtils {
SeaTunnelRowType rowType, StringBuilder sql, String predicate) {
for (Iterator<String> fieldNamesIt =
Arrays.stream(rowType.getFieldNames()).iterator();
fieldNamesIt.hasNext(); ) {
- sql.append(fieldNamesIt.next()).append(predicate);
+ sql.append(quote(fieldNamesIt.next())).append(predicate);
if (fieldNamesIt.hasNext()) {
sql.append(" AND ");
}
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/utils/MySqlUtilsTest.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/utils/MySqlUtilsTest.java
new file mode 100644
index 0000000000..ac03965900
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/utils/MySqlUtilsTest.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.cdc.mysql.utils;
+
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import io.debezium.relational.TableId;
+
+public class MySqlUtilsTest {
+
+ @Test
+ public void testSplitScanQuery() {
+ String splitScanSQL =
+ MySqlUtils.buildSplitScanQuery(
+ TableId.parse("db1.table1"),
+ new SeaTunnelRowType(
+ new String[] {"id"}, new SeaTunnelDataType[]
{BasicType.LONG_TYPE}),
+ false,
+ false);
+ Assertions.assertEquals(
+ "SELECT * FROM `db1`.`table1` WHERE `id` >= ? AND NOT (`id` =
?) AND `id` <= ?",
+ splitScanSQL);
+
+ splitScanSQL =
+ MySqlUtils.buildSplitScanQuery(
+ TableId.parse("db1.table1"),
+ new SeaTunnelRowType(
+ new String[] {"id"}, new SeaTunnelDataType[]
{BasicType.LONG_TYPE}),
+ true,
+ true);
+ Assertions.assertEquals("SELECT * FROM `db1`.`table1`", splitScanSQL);
+
+ splitScanSQL =
+ MySqlUtils.buildSplitScanQuery(
+ TableId.parse("db1.table1"),
+ new SeaTunnelRowType(
+ new String[] {"id"}, new SeaTunnelDataType[]
{BasicType.LONG_TYPE}),
+ true,
+ false);
+ Assertions.assertEquals(
+ "SELECT * FROM `db1`.`table1` WHERE `id` <= ? AND NOT (`id` =
?)", splitScanSQL);
+
+ splitScanSQL =
+ MySqlUtils.buildSplitScanQuery(
+ TableId.parse("db1.table1"),
+ new SeaTunnelRowType(
+ new String[] {"id"}, new SeaTunnelDataType[]
{BasicType.LONG_TYPE}),
+ false,
+ true);
+ Assertions.assertEquals("SELECT * FROM `db1`.`table1` WHERE `id` >=
?", splitScanSQL);
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/utils/OracleUtils.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/utils/OracleUtils.java
index 89e843c393..8d67c0f141 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/utils/OracleUtils.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/utils/OracleUtils.java
@@ -439,7 +439,7 @@ public class OracleUtils {
SeaTunnelRowType rowType, StringBuilder sql, String predicate) {
for (Iterator<String> fieldNamesIt =
Arrays.stream(rowType.getFieldNames()).iterator();
fieldNamesIt.hasNext(); ) {
- sql.append(fieldNamesIt.next()).append(predicate);
+ sql.append(quote(fieldNamesIt.next())).append(predicate);
if (fieldNamesIt.hasNext()) {
sql.append(" AND ");
}
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/utils/OracleUtilsTest.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/utils/OracleUtilsTest.java
new file mode 100644
index 0000000000..10c253da83
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/utils/OracleUtilsTest.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.cdc.oracle.utils;
+
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import io.debezium.relational.TableId;
+
+public class OracleUtilsTest {
+ @Test
+ public void testSplitScanQuery() {
+ String splitScanSQL =
+ OracleUtils.buildSplitScanQuery(
+ TableId.parse("db1.schema1.table1"),
+ new SeaTunnelRowType(
+ new String[] {"id"}, new SeaTunnelDataType[]
{BasicType.LONG_TYPE}),
+ false,
+ false);
+ Assertions.assertEquals(
+ "SELECT * FROM \"schema1\".\"table1\" WHERE \"id\" >= ? AND
NOT (\"id\" = ?) AND \"id\" <= ?",
+ splitScanSQL);
+
+ splitScanSQL =
+ OracleUtils.buildSplitScanQuery(
+ TableId.parse("db1.schema1.table1"),
+ new SeaTunnelRowType(
+ new String[] {"id"}, new SeaTunnelDataType[]
{BasicType.LONG_TYPE}),
+ true,
+ true);
+ Assertions.assertEquals("SELECT * FROM \"schema1\".\"table1\"",
splitScanSQL);
+
+ splitScanSQL =
+ OracleUtils.buildSplitScanQuery(
+ TableId.parse("db1.schema1.table1"),
+ new SeaTunnelRowType(
+ new String[] {"id"}, new SeaTunnelDataType[]
{BasicType.LONG_TYPE}),
+ true,
+ false);
+ Assertions.assertEquals(
+ "SELECT * FROM \"schema1\".\"table1\" WHERE \"id\" <= ? AND
NOT (\"id\" = ?)",
+ splitScanSQL);
+
+ splitScanSQL =
+ OracleUtils.buildSplitScanQuery(
+ TableId.parse("db1.schema1.table1"),
+ new SeaTunnelRowType(
+ new String[] {"id"}, new SeaTunnelDataType[]
{BasicType.LONG_TYPE}),
+ false,
+ true);
+ Assertions.assertEquals(
+ "SELECT * FROM \"schema1\".\"table1\" WHERE \"id\" >= ?",
splitScanSQL);
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/utils/PostgresUtils.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/utils/PostgresUtils.java
index b83e287540..576c7fb536 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/utils/PostgresUtils.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/utils/PostgresUtils.java
@@ -444,7 +444,7 @@ public class PostgresUtils {
SeaTunnelRowType rowType, StringBuilder sql, String predicate) {
for (Iterator<String> fieldNamesIt =
Arrays.stream(rowType.getFieldNames()).iterator();
fieldNamesIt.hasNext(); ) {
- sql.append(fieldNamesIt.next()).append(predicate);
+ sql.append(quote(fieldNamesIt.next())).append(predicate);
if (fieldNamesIt.hasNext()) {
sql.append(" AND ");
}
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/utils/PostgresUtilsTest.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/utils/PostgresUtilsTest.java
new file mode 100644
index 0000000000..e8e5bb22d2
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/utils/PostgresUtilsTest.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.cdc.postgres.utils;
+
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import io.debezium.relational.TableId;
+
+public class PostgresUtilsTest {
+ @Test
+ public void testSplitScanQuery() {
+ String splitScanSQL =
+ PostgresUtils.buildSplitScanQuery(
+ TableId.parse("db1.schema1.table1"),
+ new SeaTunnelRowType(
+ new String[] {"id"}, new SeaTunnelDataType[]
{BasicType.LONG_TYPE}),
+ false,
+ false);
+ Assertions.assertEquals(
+ "SELECT * FROM \"schema1\".\"table1\" WHERE \"id\" >= ? AND
NOT (\"id\" = ?) AND \"id\" <= ?",
+ splitScanSQL);
+
+ splitScanSQL =
+ PostgresUtils.buildSplitScanQuery(
+ TableId.parse("db1.schema1.table1"),
+ new SeaTunnelRowType(
+ new String[] {"id"}, new SeaTunnelDataType[]
{BasicType.LONG_TYPE}),
+ true,
+ true);
+ Assertions.assertEquals("SELECT * FROM \"schema1\".\"table1\"",
splitScanSQL);
+
+ splitScanSQL =
+ PostgresUtils.buildSplitScanQuery(
+ TableId.parse("db1.schema1.table1"),
+ new SeaTunnelRowType(
+ new String[] {"id"}, new SeaTunnelDataType[]
{BasicType.LONG_TYPE}),
+ true,
+ false);
+ Assertions.assertEquals(
+ "SELECT * FROM \"schema1\".\"table1\" WHERE \"id\" <= ? AND
NOT (\"id\" = ?)",
+ splitScanSQL);
+
+ splitScanSQL =
+ PostgresUtils.buildSplitScanQuery(
+ TableId.parse("db1.schema1.table1"),
+ new SeaTunnelRowType(
+ new String[] {"id"}, new SeaTunnelDataType[]
{BasicType.LONG_TYPE}),
+ false,
+ true);
+ Assertions.assertEquals(
+ "SELECT * FROM \"schema1\".\"table1\" WHERE \"id\" >= ?",
splitScanSQL);
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/utils/SqlServerUtils.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/utils/SqlServerUtils.java
index 2c83e02e0e..db1872fa64 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/utils/SqlServerUtils.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/utils/SqlServerUtils.java
@@ -467,7 +467,7 @@ public class SqlServerUtils {
SeaTunnelRowType rowType, StringBuilder sql, String predicate) {
for (Iterator<String> fieldNamesIt =
Arrays.stream(rowType.getFieldNames()).iterator();
fieldNamesIt.hasNext(); ) {
- sql.append(fieldNamesIt.next()).append(predicate);
+ sql.append(quote(fieldNamesIt.next())).append(predicate);
if (fieldNamesIt.hasNext()) {
sql.append(" AND ");
}
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/utils/SqlServerUtilsTest.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/utils/SqlServerUtilsTest.java
new file mode 100644
index 0000000000..470a05e54a
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/utils/SqlServerUtilsTest.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source.utils;
+
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import io.debezium.relational.TableId;
+
+public class SqlServerUtilsTest {
+ @Test
+ public void testSplitScanQuery() {
+ String splitScanSQL =
+ SqlServerUtils.buildSplitScanQuery(
+ TableId.parse("db1.schema1.table1"),
+ new SeaTunnelRowType(
+ new String[] {"id"}, new SeaTunnelDataType[]
{BasicType.LONG_TYPE}),
+ false,
+ false);
+ Assertions.assertEquals(
+ "SELECT * FROM [schema1].[table1] WHERE [id] >= ? AND NOT
([id] = ?) AND [id] <= ?",
+ splitScanSQL);
+
+ splitScanSQL =
+ SqlServerUtils.buildSplitScanQuery(
+ TableId.parse("db1.schema1.table1"),
+ new SeaTunnelRowType(
+ new String[] {"id"}, new SeaTunnelDataType[]
{BasicType.LONG_TYPE}),
+ true,
+ true);
+ Assertions.assertEquals("SELECT * FROM [schema1].[table1]",
splitScanSQL);
+
+ splitScanSQL =
+ SqlServerUtils.buildSplitScanQuery(
+ TableId.parse("db1.schema1.table1"),
+ new SeaTunnelRowType(
+ new String[] {"id"}, new SeaTunnelDataType[]
{BasicType.LONG_TYPE}),
+ true,
+ false);
+ Assertions.assertEquals(
+ "SELECT * FROM [schema1].[table1] WHERE [id] <= ? AND NOT
([id] = ?)",
+ splitScanSQL);
+
+ splitScanSQL =
+ SqlServerUtils.buildSplitScanQuery(
+ TableId.parse("db1.schema1.table1"),
+ new SeaTunnelRowType(
+ new String[] {"id"}, new SeaTunnelDataType[]
{BasicType.LONG_TYPE}),
+ false,
+ true);
+ Assertions.assertEquals("SELECT * FROM [schema1].[table1] WHERE [id]
>= ?", splitScanSQL);
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresDialect.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresDialect.java
index 51c5eb67d2..82d02119d1 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresDialect.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresDialect.java
@@ -137,6 +137,11 @@ public class PostgresDialect implements JdbcDialect {
return "\"" + getFieldIde(identifier, fieldIde) + "\"";
}
+ @Override
+ public String tableIdentifier(TablePath tablePath) {
+ return tablePath.getFullNameWithQuoted("\"");
+ }
+
@Override
public String quoteDatabaseIdentifier(String identifier) {
return "\"" + identifier + "\"";
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/DynamicChunkSplitter.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/DynamicChunkSplitter.java
index ff75f5b53c..2993f749c6 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/DynamicChunkSplitter.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/DynamicChunkSplitter.java
@@ -17,6 +17,8 @@
package org.apache.seatunnel.connectors.seatunnel.jdbc.source;
+import
org.apache.seatunnel.shade.com.google.common.annotations.VisibleForTesting;
+
import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
@@ -449,7 +451,8 @@ public class DynamicChunkSplitter extends ChunkSplitter {
return ObjectUtils.compare(obj1, obj2);
}
- private String createDynamicSplitQuerySQL(JdbcSourceSplit split) {
+ @VisibleForTesting
+ String createDynamicSplitQuerySQL(JdbcSourceSplit split) {
SeaTunnelRowType rowType =
new SeaTunnelRowType(
new String[] {split.getSplitKeyName()},
@@ -499,11 +502,11 @@ public class DynamicChunkSplitter extends ChunkSplitter {
return sql.toString();
}
- private static void addKeyColumnsToCondition(
+ private void addKeyColumnsToCondition(
SeaTunnelRowType rowType, StringBuilder sql, String predicate) {
for (Iterator<String> fieldNamesIt =
Arrays.stream(rowType.getFieldNames()).iterator();
fieldNamesIt.hasNext(); ) {
- sql.append(fieldNamesIt.next()).append(predicate);
+
sql.append(jdbcDialect.quoteIdentifier(fieldNamesIt.next())).append(predicate);
if (fieldNamesIt.hasNext()) {
sql.append(" AND ");
}
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/DynamicChunkSplitterTest.java
b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/DynamicChunkSplitterTest.java
index 8896e5f960..c71ae7b43d 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/DynamicChunkSplitterTest.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/DynamicChunkSplitterTest.java
@@ -18,7 +18,11 @@
package org.apache.seatunnel.connectors.seatunnel.jdbc.source;
import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.type.BasicType;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcConnectionConfig;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcSourceConfig;
+import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import java.util.Arrays;
@@ -30,6 +34,47 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
public class DynamicChunkSplitterTest {
+ @Test
+ public void testGenerateSplitQuerySQL() {
+ JdbcSourceConfig config =
+ JdbcSourceConfig.builder()
+ .jdbcConnectionConfig(
+ JdbcConnectionConfig.builder()
+
.url("jdbc:postgresql://localhost:5432/test")
+ .driverName("org.postgresql.Driver")
+ .build())
+ .build();
+ DynamicChunkSplitter splitter = new DynamicChunkSplitter(config);
+
+ JdbcSourceSplit split =
+ new JdbcSourceSplit(
+ TablePath.of("db1", "schema1", "table1"),
+ "split1",
+ null,
+ "id",
+ BasicType.INT_TYPE,
+ 1,
+ 10);
+ String splitQuerySQL = splitter.createDynamicSplitQuerySQL(split);
+ Assertions.assertEquals(
+ "SELECT * FROM \"db1\".\"schema1\".\"table1\" WHERE \"id\" >=
? AND NOT (\"id\" = ?) AND \"id\" <= ?",
+ splitQuerySQL);
+
+ split =
+ new JdbcSourceSplit(
+ TablePath.of("db1", "schema1", "table1"),
+ "split1",
+ "select * from table1",
+ "id",
+ BasicType.INT_TYPE,
+ 1,
+ 10);
+ splitQuerySQL = splitter.createDynamicSplitQuerySQL(split);
+ Assertions.assertEquals(
+ "SELECT * FROM (select * from table1) tmp WHERE \"id\" >= ?
AND NOT (\"id\" = ?) AND \"id\" <= ?",
+ splitQuerySQL);
+ }
+
@Test
public void testEfficientShardingThroughSampling() throws
NoSuchMethodException {
TablePath tablePath = new TablePath("db", "xe", "table");