This is an automated email from the ASF dual-hosted git repository.
dailai pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new c02d4fed36 [Improve][Connector-v2] Use regex to match filedName
placeholders in jdbc sink (#8222)
c02d4fed36 is described below
commit c02d4fed36a6359ae7978a54a70d6c28286eead9
Author: dailai <[email protected]>
AuthorDate: Wed Dec 11 09:38:18 2024 +0800
[Improve][Connector-v2] Use regex to match filedName placeholders in jdbc
sink (#8222)
---
.../executor/FieldNamedPreparedStatement.java | 47 +++++------
.../executor/FieldNamedPreparedStatementTest.java | 96 ++++++++++++++++++++++
.../connectors/seatunnel/jdbc/JdbcMysqlIT.java | 6 +-
.../test/resources/jdbc_mysql_source_and_sink.conf | 2 +-
.../test/resources/jdbc_mysql_source_and_sink.sql | 2 +-
.../jdbc_mysql_source_and_sink_parallel.conf | 2 +-
.../jdbc_mysql_source_and_sink_parallel.sql | 4 +-
...mysql_source_and_sink_parallel_upper_lower.conf | 2 +-
.../transform/sql/zeta/ZetaSQLEngine.java | 19 +++--
.../seatunnel/transform/sql/SQLTransformTest.java | 3 +-
10 files changed, 142 insertions(+), 41 deletions(-)
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/executor/FieldNamedPreparedStatement.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/executor/FieldNamedPreparedStatement.java
index 88e658fc38..8b7f15f364 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/executor/FieldNamedPreparedStatement.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/executor/FieldNamedPreparedStatement.java
@@ -17,6 +17,8 @@
package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor;
+import
org.apache.seatunnel.shade.com.google.common.annotations.VisibleForTesting;
+
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
@@ -47,6 +49,8 @@ import java.util.Calendar;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
import static
org.apache.seatunnel.shade.com.google.common.base.Preconditions.checkArgument;
import static
org.apache.seatunnel.shade.com.google.common.base.Preconditions.checkNotNull;
@@ -669,29 +673,26 @@ public class FieldNamedPreparedStatement implements
PreparedStatement {
connection.prepareStatement(parsedSQL), indexMapping);
}
- private static String parseNamedStatement(String sql, Map<String,
List<Integer>> paramMap) {
- StringBuilder parsedSql = new StringBuilder();
- int fieldIndex = 1; // SQL statement parameter index starts from 1
- int length = sql.length();
- for (int i = 0; i < length; i++) {
- char c = sql.charAt(i);
- if (':' == c) {
- int j = i + 1;
- while (j < length &&
Character.isJavaIdentifierPart(sql.charAt(j))) {
- j++;
- }
- String parameterName = sql.substring(i + 1, j);
- checkArgument(
- !parameterName.isEmpty(),
- "Named parameters in SQL statement must not be
empty.");
- paramMap.computeIfAbsent(parameterName, n -> new
ArrayList<>()).add(fieldIndex);
- fieldIndex++;
- i = j - 1;
- parsedSql.append('?');
- } else {
- parsedSql.append(c);
- }
+ @VisibleForTesting
+ public static String parseNamedStatement(String sql, Map<String,
List<Integer>> paramMap) {
+ Pattern pattern =
+
Pattern.compile(":([\\p{L}\\p{Nl}\\p{Nd}\\p{Pc}\\$\\-\\.@%&*#~!?^+=<>|]+)");
+ Matcher matcher = pattern.matcher(sql);
+
+ StringBuffer result = new StringBuffer();
+ int fieldIndex = 1;
+
+ while (matcher.find()) {
+ String parameterName = matcher.group(1);
+ checkArgument(
+ !parameterName.isEmpty(),
+ "Named parameters in SQL statement must not be empty.");
+ paramMap.computeIfAbsent(parameterName, n -> new
ArrayList<>()).add(fieldIndex++);
+ matcher.appendReplacement(result, "?");
}
- return parsedSql.toString();
+
+ matcher.appendTail(result);
+
+ return result.toString();
}
}
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/executor/FieldNamedPreparedStatementTest.java
b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/executor/FieldNamedPreparedStatementTest.java
new file mode 100644
index 0000000000..b393c844ee
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/executor/FieldNamedPreparedStatementTest.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class FieldNamedPreparedStatementTest {
+
+ private static final String[] SPECIAL_FILEDNAMES =
+ new String[] {
+ "USER@TOKEN",
+ "字段%名称",
+ "field_name",
+ "field.name",
+ "field-name",
+ "$fieldName",
+ "field&key",
+ "field*value",
+ "field#1",
+ "field~test",
+ "field!data",
+ "field?question",
+ "field^caret",
+ "field+add",
+ "field=value",
+ "fieldmax",
+ "field|pipe"
+ };
+
+ @Test
+ public void testParseNamedStatementWithSpecialCharacters() {
+ String sql =
+ "INSERT INTO `nhp_emr_ws`.`cm_prescriptiondetails_cs`
(`USER@TOKEN`, `字段%名称`, `field_name`, `field.name`, `field-name`, `$fieldName`,
`field&key`, `field*value`, `field#1`, `field~test`, `field!data`,
`field?question`, `field^caret`, `field+add`, `field=value`, `fieldmax`,
`field|pipe`) VALUES (:USER@TOKEN, :字段%名称, :field_name, :field.name,
:field-name, :$fieldName, :field&key, :field*value, :field#1, :field~test,
:field!data, :field?question, :field^caret, :field+add, :f [...]
+
+ String exceptPreparedstatement =
+ "INSERT INTO `nhp_emr_ws`.`cm_prescriptiondetails_cs`
(`USER@TOKEN`, `字段%名称`, `field_name`, `field.name`, `field-name`, `$fieldName`,
`field&key`, `field*value`, `field#1`, `field~test`, `field!data`,
`field?question`, `field^caret`, `field+add`, `field=value`, `fieldmax`,
`field|pipe`) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) ON
DUPLICATE KEY UPDATE `USER@TOKEN`=VALUES(`USER@TOKEN`),
`字段%名称`=VALUES(`字段%名称`), `field_name`=VALUES(`field_name`), `field.nam [...]
+
+ Map<String, List<Integer>> paramMap = new HashMap<>();
+ String actualSQL =
FieldNamedPreparedStatement.parseNamedStatement(sql, paramMap);
+ assertEquals(exceptPreparedstatement, actualSQL);
+ for (int i = 0; i < SPECIAL_FILEDNAMES.length; i++) {
+ assertTrue(paramMap.containsKey(SPECIAL_FILEDNAMES[i]));
+ assertEquals(i + 1, paramMap.get(SPECIAL_FILEDNAMES[i]).get(0));
+ }
+ }
+
+ @Test
+ public void testParseNamedStatement() {
+ String sql = "UPDATE table SET col1 = :param1, col2 = :param1 WHERE
col3 = :param2";
+ Map<String, List<Integer>> paramMap = new HashMap<>();
+ String expectedSQL = "UPDATE table SET col1 = ?, col2 = ? WHERE col3 =
?";
+
+ String actualSQL =
FieldNamedPreparedStatement.parseNamedStatement(sql, paramMap);
+
+ assertEquals(expectedSQL, actualSQL);
+ assertTrue(paramMap.containsKey("param1"));
+ assertTrue(paramMap.containsKey("param2"));
+ assertEquals(1, paramMap.get("param1").get(0).intValue());
+ assertEquals(2, paramMap.get("param1").get(1).intValue());
+ assertEquals(3, paramMap.get("param2").get(0).intValue());
+ }
+
+ @Test
+ public void testParseNamedStatementWithNoNamedParameters() {
+ String sql = "SELECT * FROM table";
+ Map<String, List<Integer>> paramMap = new HashMap<>();
+ String expectedSQL = "SELECT * FROM table";
+
+ String actualSQL =
FieldNamedPreparedStatement.parseNamedStatement(sql, paramMap);
+
+ assertEquals(expectedSQL, actualSQL);
+ assertTrue(paramMap.isEmpty());
+ }
+}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcMysqlIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcMysqlIT.java
index 26181669fc..feac8d11ca 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcMysqlIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcMysqlIT.java
@@ -104,7 +104,7 @@ public class JdbcMysqlIT extends AbstractJdbcIT {
private static final String CREATE_SQL =
"CREATE TABLE IF NOT EXISTS %s\n"
+ "(\n"
- + " `c_bit_1` bit(1)
DEFAULT NULL,\n"
+ + " `c-bit_1` bit(1)
DEFAULT NULL,\n"
+ " `c_bit_8` bit(8)
DEFAULT NULL,\n"
+ " `c_bit_16` bit(16)
DEFAULT NULL,\n"
+ " `c_bit_32` bit(32)
DEFAULT NULL,\n"
@@ -191,7 +191,7 @@ public class JdbcMysqlIT extends AbstractJdbcIT {
String executeKey, TestContainer container, Container.ExecResult
execResult) {
String[] fieldNames =
new String[] {
- "c_bit_1",
+ "c-bit_1",
"c_bit_8",
"c_bit_16",
"c_bit_32",
@@ -249,7 +249,7 @@ public class JdbcMysqlIT extends AbstractJdbcIT {
Pair<String[], List<SeaTunnelRow>> initTestData() {
String[] fieldNames =
new String[] {
- "c_bit_1",
+ "c-bit_1",
"c_bit_8",
"c_bit_16",
"c_bit_32",
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_mysql_source_and_sink.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_mysql_source_and_sink.conf
index a781c8c3f2..45febb436f 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_mysql_source_and_sink.conf
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_mysql_source_and_sink.conf
@@ -46,7 +46,7 @@ sink {
user = "root"
password = "Abc!@#135_seatunnel"
- query = """insert into sink (c_bit_1, c_bit_8, c_bit_16, c_bit_32,
c_bit_64, c_boolean, c_tinyint, c_tinyint_unsigned, c_smallint,
c_smallint_unsigned,
+ query = """insert into sink (`c-bit_1`, c_bit_8, c_bit_16, c_bit_32,
c_bit_64, c_boolean, c_tinyint, c_tinyint_unsigned, c_smallint,
c_smallint_unsigned,
c_mediumint,
c_mediumint_unsigned, c_int, c_integer, c_bigint, c_bigint_unsigned,
c_decimal, c_decimal_unsigned,
c_float, c_float_unsigned, c_double, c_double_unsigned,
c_char, c_tinytext,
c_mediumtext, c_text, c_varchar, c_json, c_longtext, c_date,
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_mysql_source_and_sink.sql
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_mysql_source_and_sink.sql
index 84f049bec1..4b0240e3fe 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_mysql_source_and_sink.sql
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_mysql_source_and_sink.sql
@@ -51,7 +51,7 @@ CREATE TABLE sink_table WITH (
INSERT INTO sink_table
- SELECT c_bit_1, c_bit_8, c_bit_16, c_bit_32, c_bit_64, c_boolean, c_tinyint,
c_tinyint_unsigned, c_smallint, c_smallint_unsigned,
+ SELECT `c-bit_1`, c_bit_8, c_bit_16, c_bit_32, c_bit_64, c_boolean,
c_tinyint, c_tinyint_unsigned, c_smallint, c_smallint_unsigned,
c_mediumint, c_mediumint_unsigned, c_int, c_integer, c_bigint,
c_bigint_unsigned,
c_decimal, c_decimal_unsigned, c_float, c_float_unsigned, c_double,
c_double_unsigned,
c_char, c_tinytext, c_mediumtext, c_text, c_varchar, c_json,
c_longtext, c_date,
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_mysql_source_and_sink_parallel.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_mysql_source_and_sink_parallel.conf
index 48474c6dfa..e21c75992c 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_mysql_source_and_sink_parallel.conf
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_mysql_source_and_sink_parallel.conf
@@ -45,7 +45,7 @@ sink {
user = "root"
password = "Abc!@#135_seatunnel"
connection_check_timeout_sec = 100
- query = """insert into sink (c_bit_1, c_bit_8, c_bit_16, c_bit_32,
c_bit_64, c_boolean, c_tinyint, c_tinyint_unsigned, c_smallint,
c_smallint_unsigned,
+ query = """insert into sink (`c-bit_1`, c_bit_8, c_bit_16, c_bit_32,
c_bit_64, c_boolean, c_tinyint, c_tinyint_unsigned, c_smallint,
c_smallint_unsigned,
c_mediumint,
c_mediumint_unsigned, c_int, c_integer, c_bigint, c_bigint_unsigned,
c_decimal, c_decimal_unsigned,
c_float, c_float_unsigned, c_double, c_double_unsigned,
c_char, c_tinytext,
c_mediumtext, c_text, c_varchar, c_json, c_longtext, c_date,
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_mysql_source_and_sink_parallel.sql
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_mysql_source_and_sink_parallel.sql
index bc032b9c22..33a273f341 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_mysql_source_and_sink_parallel.sql
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_mysql_source_and_sink_parallel.sql
@@ -49,7 +49,7 @@ CREATE TABLE sink_table WITH (
CREATE TABLE temp1 AS
- SELECT c_bit_1, c_bit_8, c_bit_16, c_bit_32, c_bit_64, c_boolean,
c_tinyint, c_tinyint_unsigned, c_smallint, c_smallint_unsigned,
+ SELECT `c-bit_1`, c_bit_8, c_bit_16, c_bit_32, c_bit_64, c_boolean,
c_tinyint, c_tinyint_unsigned, c_smallint, c_smallint_unsigned,
c_mediumint, c_mediumint_unsigned, c_int, c_integer, c_bigint,
c_bigint_unsigned,
c_decimal, c_decimal_unsigned, c_float, c_float_unsigned, c_double,
c_double_unsigned,
c_char, c_tinytext, c_mediumtext, c_text, c_varchar, c_json,
c_longtext, c_date,
@@ -58,4 +58,4 @@ CREATE TABLE temp1 AS
INSERT INTO sink_table SELECT * FROM temp1;
-
+
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_mysql_source_and_sink_parallel_upper_lower.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_mysql_source_and_sink_parallel_upper_lower.conf
index cd486d4c4e..b6b942af18 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_mysql_source_and_sink_parallel_upper_lower.conf
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_mysql_source_and_sink_parallel_upper_lower.conf
@@ -46,7 +46,7 @@ sink {
user = "root"
password = "Abc!@#135_seatunnel"
connection_check_timeout_sec = 100
- query = """insert into sink (c_bit_1, c_bit_8, c_bit_16, c_bit_32,
c_bit_64, c_boolean, c_tinyint, c_tinyint_unsigned, c_smallint,
c_smallint_unsigned,
+ query = """insert into sink (`c-bit_1`, c_bit_8, c_bit_16, c_bit_32,
c_bit_64, c_boolean, c_tinyint, c_tinyint_unsigned, c_smallint,
c_smallint_unsigned,
c_mediumint,
c_mediumint_unsigned, c_int, c_integer, c_bigint, c_bigint_unsigned,
c_decimal, c_decimal_unsigned,
c_float, c_float_unsigned, c_double, c_double_unsigned,
c_char, c_tinytext,
c_mediumtext, c_text, c_varchar, c_json, c_longtext, c_date,
diff --git
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLEngine.java
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLEngine.java
index 2848cc9094..9318ff0b05 100644
---
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLEngine.java
+++
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLEngine.java
@@ -183,7 +183,7 @@ public class ZetaSQLEngine implements SQLEngine {
for (SelectItem selectItem : selectItems) {
if (selectItem.getExpression() instanceof AllColumns) {
for (int i = 0; i < inputRowType.getFieldNames().length; i++) {
- fieldNames[idx] = inputRowType.getFieldName(i);
+ fieldNames[idx] =
cleanEscape(inputRowType.getFieldName(i));
seaTunnelDataTypes[idx] = inputRowType.getFieldType(i);
if (inputColumnsMapping != null) {
inputColumnsMapping.set(idx,
inputRowType.getFieldName(i));
@@ -194,16 +194,12 @@ public class ZetaSQLEngine implements SQLEngine {
Expression expression = selectItem.getExpression();
if (selectItem.getAlias() != null) {
String aliasName = selectItem.getAlias().getName();
- if (aliasName.startsWith(ESCAPE_IDENTIFIER)
- && aliasName.endsWith(ESCAPE_IDENTIFIER)) {
- aliasName = aliasName.substring(1, aliasName.length()
- 1);
- }
- fieldNames[idx] = aliasName;
+ fieldNames[idx] = cleanEscape(aliasName);
} else {
if (expression instanceof Column) {
- fieldNames[idx] = ((Column)
expression).getColumnName();
+ fieldNames[idx] = cleanEscape(((Column)
expression).getColumnName());
} else {
- fieldNames[idx] = expression.toString();
+ fieldNames[idx] = cleanEscape(expression.toString());
}
}
@@ -225,6 +221,13 @@ public class ZetaSQLEngine implements SQLEngine {
fieldNames, seaTunnelDataTypes, lateralViews,
inputColumnsMapping);
}
+ private static String cleanEscape(String columnName) {
+ if (columnName.startsWith(ESCAPE_IDENTIFIER) &&
columnName.endsWith(ESCAPE_IDENTIFIER)) {
+ columnName = columnName.substring(1, columnName.length() - 1);
+ }
+ return columnName;
+ }
+
@Override
public List<SeaTunnelRow> transformBySQL(SeaTunnelRow inputRow,
SeaTunnelRowType outRowType) {
// ------Physical Query Plan Execution------
diff --git
a/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/sql/SQLTransformTest.java
b/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/sql/SQLTransformTest.java
index fcf14cc7b9..999b7fecd4 100644
---
a/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/sql/SQLTransformTest.java
+++
b/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/sql/SQLTransformTest.java
@@ -166,12 +166,13 @@ public class SQLTransformTest {
ReadonlyConfig.fromMap(
Collections.singletonMap(
"query",
- "select id, trim(`apply`) as `apply` from test
where `apply` = 'a'"));
+ "select `id`, trim(`apply`) as `apply` from
test where `apply` = 'a'"));
SQLTransform sqlTransform = new SQLTransform(config, table);
TableSchema tableSchema = sqlTransform.transformTableSchema();
List<SeaTunnelRow> result =
sqlTransform.transformRow(
new SeaTunnelRow(new Object[] {Integer.valueOf(1),
String.valueOf("a")}));
+ Assertions.assertEquals("id", tableSchema.getFieldNames()[0]);
Assertions.assertEquals("apply", tableSchema.getFieldNames()[1]);
Assertions.assertEquals("a", result.get(0).getField(1));
result =