This is an automated email from the ASF dual-hosted git repository.

liugddx pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 82f5d8c712 [Improve][SQL-Transform] Remove escape identifier from output fields (#7297)
82f5d8c712 is described below

commit 82f5d8c71212b0ceb4a57d9cc7cd97cab5aa6d4e
Author: hailin0 <[email protected]>
AuthorDate: Sat Aug 3 12:02:19 2024 +0800

    [Improve][SQL-Transform] Remove escape identifier from output fields (#7297)
---
 .../test/resources/sql_transform/func_system.conf  |   9 +-
 .../transform/sql/zeta/ZetaSQLEngine.java          |  10 +-
 .../transform/sql/zeta/ZetaSQLFunction.java        |  26 +++-
 .../seatunnel/transform/sql/zeta/ZetaSQLType.java  |  16 ++-
 .../seatunnel/transform/sql/SQLTransformTest.java  | 136 +++++++++++++++++++++
 5 files changed, 191 insertions(+), 6 deletions(-)

diff --git a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/sql_transform/func_system.conf b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/sql_transform/func_system.conf
index 14f41665e3..a189c7c2dd 100644
--- a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/sql_transform/func_system.conf
+++ b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/sql_transform/func_system.conf
@@ -49,7 +49,7 @@ transform {
   Sql {
     source_table_name = "fake"
     result_table_name = "fake1"
-    query = "select cast(id as STRING) as id, cast(id as INT) as id2, cast(id 
as DOUBLE) as id3 , cast(c1 as double) as c1_1, cast(c1 as DECIMAL(10,2)) as 
c1_2, cast(c2 as DATE) as c2_1, coalesce(c3,'Unknown') c3_1, 
ifnull(c3,'Unknown') c3_2, ifnull(nullif(name,'Joy Ding'),'NULL') name1, 
nullif(name,'Joy Ding_') name2, cast(c4 as timestamp) as c4_1, cast(c4 as 
decimal(17,4)) as c4_2, cast(c5 as date) as c5, cast(c6 as time) as c6, 
cast(name as bytes) as c7 from fake"
+    query = "select cast(id as STRING) as id, cast(id as INT) as id2, cast(id 
as DOUBLE) as id3 , cast(c1 as double) as c1_1, cast(c1 as DECIMAL(10,2)) as 
c1_2, cast(c2 as DATE) as c2_1, coalesce(c3,'Unknown') c3_1, 
ifnull(c3,'Unknown') c3_2, ifnull(nullif(name,'Joy Ding'),'NULL') name1, 
nullif(name,'Joy Ding_') name2, cast(c4 as timestamp) as c4_1, cast(c4 as 
decimal(17,4)) as c4_2, cast(c5 as date) as c5, cast(c6 as time) as c6, 
cast(name as bytes) as c7, name as `apply` from fake"
   }
 }
 
@@ -164,6 +164,13 @@ sink {
               rule_type = NOT_NULL
             }
           ]
+        },
+        {
+          field_name = "apply"
+          field_type = "string"
+          field_value = [
+            {equals_to = "Joy Ding"}
+          ]
         }
       ]
     }
diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLEngine.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLEngine.java
index 42f5d8205d..993b4e0a3c 100644
--- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLEngine.java
+++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLEngine.java
@@ -50,6 +50,8 @@ import java.util.stream.Collectors;
 
 public class ZetaSQLEngine implements SQLEngine {
     private static final Logger log = 
LoggerFactory.getLogger(ZetaSQLEngine.class);
+    public static final String ESCAPE_IDENTIFIER = "`";
+
     private String inputTableName;
     @Nullable private String catalogTableName;
     private SeaTunnelRowType inputRowType;
@@ -193,9 +195,13 @@ public class ZetaSQLEngine implements SQLEngine {
             } else if (selectItem instanceof SelectExpressionItem) {
                 SelectExpressionItem expressionItem = (SelectExpressionItem) 
selectItem;
                 Expression expression = expressionItem.getExpression();
-
                 if (expressionItem.getAlias() != null) {
-                    fieldNames[idx] = expressionItem.getAlias().getName();
+                    String aliasName = expressionItem.getAlias().getName();
+                    if (aliasName.startsWith(ESCAPE_IDENTIFIER)
+                            && aliasName.endsWith(ESCAPE_IDENTIFIER)) {
+                        aliasName = aliasName.substring(1, aliasName.length() 
- 1);
+                    }
+                    fieldNames[idx] = aliasName;
                 } else {
                     if (expression instanceof Column) {
                         fieldNames[idx] = ((Column) 
expression).getColumnName();
diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLFunction.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLFunction.java
index 44b9ca20b7..a6221e4a27 100644
--- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLFunction.java
+++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLFunction.java
@@ -227,6 +227,13 @@ public class ZetaSQLFunction {
             Column columnExp = (Column) expression;
             String columnName = columnExp.getColumnName();
             int index = inputRowType.indexOf(columnName, false);
+            if (index == -1
+                    && columnName.startsWith(ZetaSQLEngine.ESCAPE_IDENTIFIER)
+                    && columnName.endsWith(ZetaSQLEngine.ESCAPE_IDENTIFIER)) {
+                columnName = columnName.substring(1, columnName.length() - 1);
+                index = inputRowType.indexOf(columnName, false);
+            }
+
             if (index != -1) {
                 return inputFields[index];
             } else {
@@ -237,11 +244,26 @@ public class ZetaSQLFunction {
                 SeaTunnelRow parRowValues = new SeaTunnelRow(inputFields);
                 Object res = parRowValues;
                 for (int i = 0; i < deep; i++) {
+                    String key = columnNames[i];
                     if (parDataType instanceof MapType) {
-                        return ((Map) res).get(columnNames[i]);
+                        Map<String, Object> mapValue = ((Map) res);
+                        if (mapValue.containsKey(key)) {
+                            return mapValue.get(key);
+                        } else if 
(key.startsWith(ZetaSQLEngine.ESCAPE_IDENTIFIER)
+                                && 
key.endsWith(ZetaSQLEngine.ESCAPE_IDENTIFIER)) {
+                            key = key.substring(1, key.length() - 1);
+                            return mapValue.get(key);
+                        }
+                        return null;
                     }
                     parRowValues = (SeaTunnelRow) res;
-                    int idx = ((SeaTunnelRowType) 
parDataType).indexOf(columnNames[i], false);
+                    int idx = ((SeaTunnelRowType) parDataType).indexOf(key, 
false);
+                    if (idx == -1
+                            && key.startsWith(ZetaSQLEngine.ESCAPE_IDENTIFIER)
+                            && key.endsWith(ZetaSQLEngine.ESCAPE_IDENTIFIER)) {
+                        key = key.substring(1, key.length() - 1);
+                        idx = ((SeaTunnelRowType) parDataType).indexOf(key, 
false);
+                    }
                     if (idx == -1) {
                         throw new IllegalArgumentException(
                                 String.format("can't find field [%s]", 
fullyQualifiedName));
diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLType.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLType.java
index 45b269bae6..9b527ae8c2 100644
--- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLType.java
+++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLType.java
@@ -111,6 +111,13 @@ public class ZetaSQLType {
             Column columnExp = (Column) expression;
             String columnName = columnExp.getColumnName();
             int index = inputRowType.indexOf(columnName, false);
+            if (index == -1
+                    && columnName.startsWith(ZetaSQLEngine.ESCAPE_IDENTIFIER)
+                    && columnName.endsWith(ZetaSQLEngine.ESCAPE_IDENTIFIER)) {
+                columnName = columnName.substring(1, columnName.length() - 1);
+                index = inputRowType.indexOf(columnName, false);
+            }
+
             if (index != -1) {
                 return inputRowType.getFieldType(index);
             } else {
@@ -121,7 +128,14 @@ public class ZetaSQLType {
                 SeaTunnelRowType parRowType = inputRowType;
                 SeaTunnelDataType<?> filedTypeRes = null;
                 for (int i = 0; i < deep; i++) {
-                    int idx = parRowType.indexOf(columnNames[i], false);
+                    String key = columnNames[i];
+                    int idx = parRowType.indexOf(key, false);
+                    if (idx == -1
+                            && key.startsWith(ZetaSQLEngine.ESCAPE_IDENTIFIER)
+                            && key.endsWith(ZetaSQLEngine.ESCAPE_IDENTIFIER)) {
+                        key = key.substring(1, key.length() - 1);
+                        idx = parRowType.indexOf(key, false);
+                    }
                     if (idx == -1) {
                         throw new IllegalArgumentException(
                                 String.format("can't find field [%s]", 
fullyQualifiedName));
diff --git a/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/sql/SQLTransformTest.java b/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/sql/SQLTransformTest.java
index 854fae5cb3..ff253eac21 100644
--- a/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/sql/SQLTransformTest.java
+++ b/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/sql/SQLTransformTest.java
@@ -19,18 +19,22 @@ package org.apache.seatunnel.transform.sql;
 
 import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
 import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
 import org.apache.seatunnel.api.table.catalog.TableIdentifier;
 import org.apache.seatunnel.api.table.catalog.TableSchema;
 import org.apache.seatunnel.api.table.type.BasicType;
 import org.apache.seatunnel.api.table.type.LocalTimeType;
+import org.apache.seatunnel.api.table.type.MapType;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Objects;
 
@@ -144,4 +148,136 @@ public class SQLTransformTest {
                 new ArrayList<>(),
                 "It has column information.");
     }
+
+    @Test
+    public void testEscapeIdentifier() {
+        String tableName = "test";
+        String[] fields = new String[] {"id", "apply"};
+        CatalogTable table =
+                CatalogTableUtil.getCatalogTable(
+                        tableName,
+                        new SeaTunnelRowType(
+                                fields,
+                                new SeaTunnelDataType[] {
+                                    BasicType.INT_TYPE, BasicType.STRING_TYPE
+                                }));
+        ReadonlyConfig config =
+                ReadonlyConfig.fromMap(
+                        Collections.singletonMap(
+                                "query",
+                                "select id, trim(`apply`) as `apply` from test 
where `apply` = 'a'"));
+        SQLTransform sqlTransform = new SQLTransform(config, table);
+        TableSchema tableSchema = sqlTransform.transformTableSchema();
+        SeaTunnelRow result =
+                sqlTransform.transformRow(
+                        new SeaTunnelRow(new Object[] {Integer.valueOf(1), 
String.valueOf("a")}));
+        Assertions.assertEquals("apply", tableSchema.getFieldNames()[1]);
+        Assertions.assertEquals("a", result.getField(1));
+        result =
+                sqlTransform.transformRow(
+                        new SeaTunnelRow(new Object[] {Integer.valueOf(1), 
String.valueOf("b")}));
+        Assertions.assertNull(result);
+
+        config =
+                ReadonlyConfig.fromMap(
+                        Collections.singletonMap(
+                                "query",
+                                "select id, IFNULL(`apply`, '1') as `apply` 
from test  where `apply` = 'a'"));
+        sqlTransform = new SQLTransform(config, table);
+        tableSchema = sqlTransform.transformTableSchema();
+        result =
+                sqlTransform.transformRow(
+                        new SeaTunnelRow(new Object[] {Integer.valueOf(1), 
String.valueOf("a")}));
+        Assertions.assertEquals("apply", tableSchema.getFieldNames()[1]);
+        Assertions.assertEquals(
+                BasicType.STRING_TYPE, 
tableSchema.getColumns().get(1).getDataType());
+        Assertions.assertEquals("a", result.getField(1));
+
+        table =
+                CatalogTableUtil.getCatalogTable(
+                        tableName,
+                        new SeaTunnelRowType(
+                                fields,
+                                new SeaTunnelDataType[] {BasicType.INT_TYPE, 
BasicType.LONG_TYPE}));
+        config =
+                ReadonlyConfig.fromMap(
+                        Collections.singletonMap(
+                                "query",
+                                "select id, `apply` + 1 as `apply` from test 
where `apply` > 0"));
+        sqlTransform = new SQLTransform(config, table);
+        tableSchema = sqlTransform.transformTableSchema();
+        result =
+                sqlTransform.transformRow(
+                        new SeaTunnelRow(new Object[] {Integer.valueOf(1), 
Long.valueOf(1)}));
+        Assertions.assertEquals("apply", tableSchema.getFieldNames()[1]);
+        Assertions.assertEquals(BasicType.LONG_TYPE, 
tableSchema.getColumns().get(1).getDataType());
+        Assertions.assertEquals(Long.valueOf(2), result.getField(1));
+        result =
+                sqlTransform.transformRow(
+                        new SeaTunnelRow(new Object[] {Integer.valueOf(1), 
Long.valueOf(0)}));
+        Assertions.assertNull(result);
+
+        table =
+                CatalogTableUtil.getCatalogTable(
+                        tableName,
+                        new SeaTunnelRowType(
+                                fields,
+                                new SeaTunnelDataType[] {
+                                    BasicType.INT_TYPE,
+                                    new MapType<String, String>(
+                                            BasicType.STRING_TYPE, 
BasicType.STRING_TYPE)
+                                }));
+        config =
+                ReadonlyConfig.fromMap(
+                        Collections.singletonMap(
+                                "query",
+                                "select id, `apply`.k1 as `apply` from test 
where `apply`.k1 = 'a'"));
+        sqlTransform = new SQLTransform(config, table);
+        tableSchema = sqlTransform.transformTableSchema();
+        result =
+                sqlTransform.transformRow(
+                        new SeaTunnelRow(
+                                new Object[] {
+                                    Integer.valueOf(1), 
Collections.singletonMap("k1", "a")
+                                }));
+        Assertions.assertEquals("apply", tableSchema.getFieldNames()[1]);
+        Assertions.assertEquals(
+                BasicType.STRING_TYPE, 
tableSchema.getColumns().get(1).getDataType());
+        Assertions.assertEquals("a", result.getField(1));
+        result =
+                sqlTransform.transformRow(
+                        new SeaTunnelRow(
+                                new Object[] {
+                                    Integer.valueOf(1), 
Collections.singletonMap("k1", "b")
+                                }));
+        Assertions.assertNull(result);
+
+        table =
+                CatalogTableUtil.getCatalogTable(
+                        tableName,
+                        new SeaTunnelRowType(
+                                new String[] {"id", "map"},
+                                new SeaTunnelDataType[] {
+                                    BasicType.INT_TYPE,
+                                    new MapType<String, String>(
+                                            BasicType.STRING_TYPE, 
BasicType.STRING_TYPE)
+                                }));
+        config =
+                ReadonlyConfig.fromMap(
+                        Collections.singletonMap(
+                                "query",
+                                "select id, map.`apply` as `apply` from test 
where map.`apply` = 'a'"));
+        sqlTransform = new SQLTransform(config, table);
+        tableSchema = sqlTransform.transformTableSchema();
+        result =
+                sqlTransform.transformRow(
+                        new SeaTunnelRow(
+                                new Object[] {
+                                    Integer.valueOf(1), 
Collections.singletonMap("apply", "a")
+                                }));
+        Assertions.assertEquals("apply", tableSchema.getFieldNames()[1]);
+        Assertions.assertEquals(
+                BasicType.STRING_TYPE, 
tableSchema.getColumns().get(1).getDataType());
+        Assertions.assertEquals("a", result.getField(1));
+    }
 }

Reply via email to