This is an automated email from the ASF dual-hosted git repository.
liugddx pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 82f5d8c712 [Improve][SQL-Transform] Remove escape identifier from output fields (#7297)
82f5d8c712 is described below
commit 82f5d8c71212b0ceb4a57d9cc7cd97cab5aa6d4e
Author: hailin0 <[email protected]>
AuthorDate: Sat Aug 3 12:02:19 2024 +0800
[Improve][SQL-Transform] Remove escape identifier from output fields (#7297)
---
.../test/resources/sql_transform/func_system.conf | 9 +-
.../transform/sql/zeta/ZetaSQLEngine.java | 10 +-
.../transform/sql/zeta/ZetaSQLFunction.java | 26 +++-
.../seatunnel/transform/sql/zeta/ZetaSQLType.java | 16 ++-
.../seatunnel/transform/sql/SQLTransformTest.java | 136 +++++++++++++++++++++
5 files changed, 191 insertions(+), 6 deletions(-)
diff --git
a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/sql_transform/func_system.conf
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/sql_transform/func_system.conf
index 14f41665e3..a189c7c2dd 100644
---
a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/sql_transform/func_system.conf
+++
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/sql_transform/func_system.conf
@@ -49,7 +49,7 @@ transform {
Sql {
source_table_name = "fake"
result_table_name = "fake1"
- query = "select cast(id as STRING) as id, cast(id as INT) as id2, cast(id
as DOUBLE) as id3 , cast(c1 as double) as c1_1, cast(c1 as DECIMAL(10,2)) as
c1_2, cast(c2 as DATE) as c2_1, coalesce(c3,'Unknown') c3_1,
ifnull(c3,'Unknown') c3_2, ifnull(nullif(name,'Joy Ding'),'NULL') name1,
nullif(name,'Joy Ding_') name2, cast(c4 as timestamp) as c4_1, cast(c4 as
decimal(17,4)) as c4_2, cast(c5 as date) as c5, cast(c6 as time) as c6,
cast(name as bytes) as c7 from fake"
+ query = "select cast(id as STRING) as id, cast(id as INT) as id2, cast(id
as DOUBLE) as id3 , cast(c1 as double) as c1_1, cast(c1 as DECIMAL(10,2)) as
c1_2, cast(c2 as DATE) as c2_1, coalesce(c3,'Unknown') c3_1,
ifnull(c3,'Unknown') c3_2, ifnull(nullif(name,'Joy Ding'),'NULL') name1,
nullif(name,'Joy Ding_') name2, cast(c4 as timestamp) as c4_1, cast(c4 as
decimal(17,4)) as c4_2, cast(c5 as date) as c5, cast(c6 as time) as c6,
cast(name as bytes) as c7, name as `apply` from fake"
}
}
@@ -164,6 +164,13 @@ sink {
rule_type = NOT_NULL
}
]
+ },
+ {
+ field_name = "apply"
+ field_type = "string"
+ field_value = [
+ {equals_to = "Joy Ding"}
+ ]
}
]
}
diff --git
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLEngine.java
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLEngine.java
index 42f5d8205d..993b4e0a3c 100644
---
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLEngine.java
+++
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLEngine.java
@@ -50,6 +50,8 @@ import java.util.stream.Collectors;
public class ZetaSQLEngine implements SQLEngine {
private static final Logger log =
LoggerFactory.getLogger(ZetaSQLEngine.class);
+ public static final String ESCAPE_IDENTIFIER = "`";
+
private String inputTableName;
@Nullable private String catalogTableName;
private SeaTunnelRowType inputRowType;
@@ -193,9 +195,13 @@ public class ZetaSQLEngine implements SQLEngine {
} else if (selectItem instanceof SelectExpressionItem) {
SelectExpressionItem expressionItem = (SelectExpressionItem)
selectItem;
Expression expression = expressionItem.getExpression();
-
if (expressionItem.getAlias() != null) {
- fieldNames[idx] = expressionItem.getAlias().getName();
+ String aliasName = expressionItem.getAlias().getName();
+ if (aliasName.startsWith(ESCAPE_IDENTIFIER)
+ && aliasName.endsWith(ESCAPE_IDENTIFIER)) {
+ aliasName = aliasName.substring(1, aliasName.length()
- 1);
+ }
+ fieldNames[idx] = aliasName;
} else {
if (expression instanceof Column) {
fieldNames[idx] = ((Column)
expression).getColumnName();
diff --git
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLFunction.java
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLFunction.java
index 44b9ca20b7..a6221e4a27 100644
---
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLFunction.java
+++
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLFunction.java
@@ -227,6 +227,13 @@ public class ZetaSQLFunction {
Column columnExp = (Column) expression;
String columnName = columnExp.getColumnName();
int index = inputRowType.indexOf(columnName, false);
+ if (index == -1
+ && columnName.startsWith(ZetaSQLEngine.ESCAPE_IDENTIFIER)
+ && columnName.endsWith(ZetaSQLEngine.ESCAPE_IDENTIFIER)) {
+ columnName = columnName.substring(1, columnName.length() - 1);
+ index = inputRowType.indexOf(columnName, false);
+ }
+
if (index != -1) {
return inputFields[index];
} else {
@@ -237,11 +244,26 @@ public class ZetaSQLFunction {
SeaTunnelRow parRowValues = new SeaTunnelRow(inputFields);
Object res = parRowValues;
for (int i = 0; i < deep; i++) {
+ String key = columnNames[i];
if (parDataType instanceof MapType) {
- return ((Map) res).get(columnNames[i]);
+ Map<String, Object> mapValue = ((Map) res);
+ if (mapValue.containsKey(key)) {
+ return mapValue.get(key);
+ } else if
(key.startsWith(ZetaSQLEngine.ESCAPE_IDENTIFIER)
+ &&
key.endsWith(ZetaSQLEngine.ESCAPE_IDENTIFIER)) {
+ key = key.substring(1, key.length() - 1);
+ return mapValue.get(key);
+ }
+ return null;
}
parRowValues = (SeaTunnelRow) res;
- int idx = ((SeaTunnelRowType)
parDataType).indexOf(columnNames[i], false);
+ int idx = ((SeaTunnelRowType) parDataType).indexOf(key,
false);
+ if (idx == -1
+ && key.startsWith(ZetaSQLEngine.ESCAPE_IDENTIFIER)
+ && key.endsWith(ZetaSQLEngine.ESCAPE_IDENTIFIER)) {
+ key = key.substring(1, key.length() - 1);
+ idx = ((SeaTunnelRowType) parDataType).indexOf(key,
false);
+ }
if (idx == -1) {
throw new IllegalArgumentException(
String.format("can't find field [%s]",
fullyQualifiedName));
diff --git
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLType.java
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLType.java
index 45b269bae6..9b527ae8c2 100644
---
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLType.java
+++
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLType.java
@@ -111,6 +111,13 @@ public class ZetaSQLType {
Column columnExp = (Column) expression;
String columnName = columnExp.getColumnName();
int index = inputRowType.indexOf(columnName, false);
+ if (index == -1
+ && columnName.startsWith(ZetaSQLEngine.ESCAPE_IDENTIFIER)
+ && columnName.endsWith(ZetaSQLEngine.ESCAPE_IDENTIFIER)) {
+ columnName = columnName.substring(1, columnName.length() - 1);
+ index = inputRowType.indexOf(columnName, false);
+ }
+
if (index != -1) {
return inputRowType.getFieldType(index);
} else {
@@ -121,7 +128,14 @@ public class ZetaSQLType {
SeaTunnelRowType parRowType = inputRowType;
SeaTunnelDataType<?> filedTypeRes = null;
for (int i = 0; i < deep; i++) {
- int idx = parRowType.indexOf(columnNames[i], false);
+ String key = columnNames[i];
+ int idx = parRowType.indexOf(key, false);
+ if (idx == -1
+ && key.startsWith(ZetaSQLEngine.ESCAPE_IDENTIFIER)
+ && key.endsWith(ZetaSQLEngine.ESCAPE_IDENTIFIER)) {
+ key = key.substring(1, key.length() - 1);
+ idx = parRowType.indexOf(key, false);
+ }
if (idx == -1) {
throw new IllegalArgumentException(
String.format("can't find field [%s]",
fullyQualifiedName));
diff --git
a/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/sql/SQLTransformTest.java
b/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/sql/SQLTransformTest.java
index 854fae5cb3..ff253eac21 100644
---
a/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/sql/SQLTransformTest.java
+++
b/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/sql/SQLTransformTest.java
@@ -19,18 +19,22 @@ package org.apache.seatunnel.transform.sql;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
import org.apache.seatunnel.api.table.catalog.TableIdentifier;
import org.apache.seatunnel.api.table.catalog.TableSchema;
import org.apache.seatunnel.api.table.type.BasicType;
import org.apache.seatunnel.api.table.type.LocalTimeType;
+import org.apache.seatunnel.api.table.type.MapType;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.Objects;
@@ -144,4 +148,136 @@ public class SQLTransformTest {
new ArrayList<>(),
"It has column information.");
}
+
+ @Test
+ public void testEscapeIdentifier() {
+ String tableName = "test";
+ String[] fields = new String[] {"id", "apply"};
+ CatalogTable table =
+ CatalogTableUtil.getCatalogTable(
+ tableName,
+ new SeaTunnelRowType(
+ fields,
+ new SeaTunnelDataType[] {
+ BasicType.INT_TYPE, BasicType.STRING_TYPE
+ }));
+ ReadonlyConfig config =
+ ReadonlyConfig.fromMap(
+ Collections.singletonMap(
+ "query",
+ "select id, trim(`apply`) as `apply` from test
where `apply` = 'a'"));
+ SQLTransform sqlTransform = new SQLTransform(config, table);
+ TableSchema tableSchema = sqlTransform.transformTableSchema();
+ SeaTunnelRow result =
+ sqlTransform.transformRow(
+ new SeaTunnelRow(new Object[] {Integer.valueOf(1),
String.valueOf("a")}));
+ Assertions.assertEquals("apply", tableSchema.getFieldNames()[1]);
+ Assertions.assertEquals("a", result.getField(1));
+ result =
+ sqlTransform.transformRow(
+ new SeaTunnelRow(new Object[] {Integer.valueOf(1),
String.valueOf("b")}));
+ Assertions.assertNull(result);
+
+ config =
+ ReadonlyConfig.fromMap(
+ Collections.singletonMap(
+ "query",
+ "select id, IFNULL(`apply`, '1') as `apply`
from test where `apply` = 'a'"));
+ sqlTransform = new SQLTransform(config, table);
+ tableSchema = sqlTransform.transformTableSchema();
+ result =
+ sqlTransform.transformRow(
+ new SeaTunnelRow(new Object[] {Integer.valueOf(1),
String.valueOf("a")}));
+ Assertions.assertEquals("apply", tableSchema.getFieldNames()[1]);
+ Assertions.assertEquals(
+ BasicType.STRING_TYPE,
tableSchema.getColumns().get(1).getDataType());
+ Assertions.assertEquals("a", result.getField(1));
+
+ table =
+ CatalogTableUtil.getCatalogTable(
+ tableName,
+ new SeaTunnelRowType(
+ fields,
+ new SeaTunnelDataType[] {BasicType.INT_TYPE,
BasicType.LONG_TYPE}));
+ config =
+ ReadonlyConfig.fromMap(
+ Collections.singletonMap(
+ "query",
+ "select id, `apply` + 1 as `apply` from test
where `apply` > 0"));
+ sqlTransform = new SQLTransform(config, table);
+ tableSchema = sqlTransform.transformTableSchema();
+ result =
+ sqlTransform.transformRow(
+ new SeaTunnelRow(new Object[] {Integer.valueOf(1),
Long.valueOf(1)}));
+ Assertions.assertEquals("apply", tableSchema.getFieldNames()[1]);
+ Assertions.assertEquals(BasicType.LONG_TYPE,
tableSchema.getColumns().get(1).getDataType());
+ Assertions.assertEquals(Long.valueOf(2), result.getField(1));
+ result =
+ sqlTransform.transformRow(
+ new SeaTunnelRow(new Object[] {Integer.valueOf(1),
Long.valueOf(0)}));
+ Assertions.assertNull(result);
+
+ table =
+ CatalogTableUtil.getCatalogTable(
+ tableName,
+ new SeaTunnelRowType(
+ fields,
+ new SeaTunnelDataType[] {
+ BasicType.INT_TYPE,
+ new MapType<String, String>(
+ BasicType.STRING_TYPE,
BasicType.STRING_TYPE)
+ }));
+ config =
+ ReadonlyConfig.fromMap(
+ Collections.singletonMap(
+ "query",
+ "select id, `apply`.k1 as `apply` from test
where `apply`.k1 = 'a'"));
+ sqlTransform = new SQLTransform(config, table);
+ tableSchema = sqlTransform.transformTableSchema();
+ result =
+ sqlTransform.transformRow(
+ new SeaTunnelRow(
+ new Object[] {
+ Integer.valueOf(1),
Collections.singletonMap("k1", "a")
+ }));
+ Assertions.assertEquals("apply", tableSchema.getFieldNames()[1]);
+ Assertions.assertEquals(
+ BasicType.STRING_TYPE,
tableSchema.getColumns().get(1).getDataType());
+ Assertions.assertEquals("a", result.getField(1));
+ result =
+ sqlTransform.transformRow(
+ new SeaTunnelRow(
+ new Object[] {
+ Integer.valueOf(1),
Collections.singletonMap("k1", "b")
+ }));
+ Assertions.assertNull(result);
+
+ table =
+ CatalogTableUtil.getCatalogTable(
+ tableName,
+ new SeaTunnelRowType(
+ new String[] {"id", "map"},
+ new SeaTunnelDataType[] {
+ BasicType.INT_TYPE,
+ new MapType<String, String>(
+ BasicType.STRING_TYPE,
BasicType.STRING_TYPE)
+ }));
+ config =
+ ReadonlyConfig.fromMap(
+ Collections.singletonMap(
+ "query",
+ "select id, map.`apply` as `apply` from test
where map.`apply` = 'a'"));
+ sqlTransform = new SQLTransform(config, table);
+ tableSchema = sqlTransform.transformTableSchema();
+ result =
+ sqlTransform.transformRow(
+ new SeaTunnelRow(
+ new Object[] {
+ Integer.valueOf(1),
Collections.singletonMap("apply", "a")
+ }));
+ Assertions.assertEquals("apply", tableSchema.getFieldNames()[1]);
+ Assertions.assertEquals(
+ BasicType.STRING_TYPE,
tableSchema.getColumns().get(1).getDataType());
+ Assertions.assertEquals("a", result.getField(1));
+ }
}