This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 77ffa56f6d [Improve][Transform] Sql transform support inner strucy 
query (#6484)
77ffa56f6d is described below

commit 77ffa56f6da0f834928a4c9921035ec324e1c3dc
Author: Jarvis <[email protected]>
AuthorDate: Fri Mar 15 10:38:14 2024 +0800

    [Improve][Transform] Sql transform support inner strucy query (#6484)
---
 docs/en/transform-v2/sql.md                        |  60 +++++++++++
 docs/zh/transform-v2/sql.md                        |  58 +++++++++++
 .../seatunnel/api/table/type/SeaTunnelRowType.java |  10 +-
 .../apache/seatunnel/e2e/transform/TestSQLIT.java  |  13 +++
 .../test/resources/sql_transform/inner_query.conf  | 114 +++++++++++++++++++++
 .../transform/sql/zeta/ZetaSQLFunction.java        |  32 +++++-
 .../seatunnel/transform/sql/zeta/ZetaSQLType.java  |  35 ++++++-
 7 files changed, 317 insertions(+), 5 deletions(-)
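
For readers skimming the diffstat, the value lookup this commit adds to ZetaSQLFunction.java can be pictured with the standalone sketch below. It is an illustration under stated assumptions, not the SeaTunnel code: `NestedPathSketch`, `Row`, and `resolve` are hypothetical names, and the sketch decides "row or map" from the runtime value, whereas the diff checks the declared `SeaTunnelDataType`.

```java
import java.util.HashMap;
import java.util.Map;

public class NestedPathSketch {

    /** Hypothetical stand-in for a struct value: parallel field names and values. */
    public static final class Row {
        final String[] names;
        final Object[] values;

        public Row(String[] names, Object[] values) {
            this.names = names;
            this.values = values;
        }

        int indexOf(String name) {
            for (int i = 0; i < names.length; i++) {
                if (names[i].equals(name)) {
                    return i;
                }
            }
            return -1; // lenient lookup: the caller decides how to react
        }
    }

    /** Resolves a dotted path; a map ends the walk and a missing key yields null. */
    public static Object resolve(Row root, String path) {
        Object current = root;
        for (String part : path.split("\\.")) {
            if (current instanceof Map) {
                // The segment after a map is treated as a key, not a field.
                return ((Map<?, ?>) current).get(part);
            }
            if (!(current instanceof Row)) {
                throw new IllegalArgumentException("can't find field [" + path + "]");
            }
            Row row = (Row) current;
            int idx = row.indexOf(part);
            if (idx == -1) {
                throw new IllegalArgumentException("can't find field [" + path + "]");
            }
            current = row.values[idx];
        }
        return current;
    }

    public static void main(String[] args) {
        Map<String, String> cMap = new HashMap<>();
        cMap.put("innerQuery", "v1");
        Row inner = new Row(new String[] {"c_inner_int", "c_map"}, new Object[] {7, cMap});
        Row cRow = new Row(new String[] {"c_inner_row", "c_string"}, new Object[] {inner, "innerQuery"});
        Row root = new Row(new String[] {"name", "c_row"}, new Object[] {"n", cRow});

        System.out.println(resolve(root, "c_row.c_string"));
        System.out.println(resolve(root, "c_row.c_inner_row.c_map.innerQuery"));
        System.out.println(resolve(root, "c_row.c_inner_row.c_map.notExistKey"));
    }
}
```

The missing-key case returning null rather than throwing matches what the new e2e config asserts for `map_not_exist_val`.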

diff --git a/docs/en/transform-v2/sql.md b/docs/en/transform-v2/sql.md
index b6c2306c65..a3bdb9bbfc 100644
--- a/docs/en/transform-v2/sql.md
+++ b/docs/en/transform-v2/sql.md
@@ -24,6 +24,9 @@ The source table name, the query SQL table name must match 
this field.
 
 The query SQL. It is a simple SQL statement that supports basic functions and 
criteria filtering. Complex SQL is not supported yet, including multi-source 
table/row JOIN, AGGREGATE operations, and the like.
 
+The query expression can be `select [table_name.]column_a` to query the column 
named `column_a`; the table name is optional.  
+It can also be `select c_row.c_inner_row.column_b` to query the nested struct 
column named `column_b` inside `c_inner_row`, which is itself inside `c_row`. 
**This form of query expression must not include a table name.**
+
 ## Example
 
 The data read from source is a table like this:
@@ -56,6 +59,61 @@ Then the data in result table `fake1` will update to
 | 3  | Kin Dom_  | 25  |
 | 4  | Joy Dom_  | 23  |
 
+### Struct query
+
+If your upstream data schema looks like this:
+
+```hocon
+source {
+  FakeSource {
+    result_table_name = "fake"
+    row.num = 100
+    string.template = ["innerQuery"]
+    schema = {
+      fields {
+        name = "string"
+        c_date = "date"
+        c_row = {
+          c_inner_row = {
+            c_inner_int = "int"
+            c_inner_string = "string"
+            c_inner_timestamp = "timestamp"
+            c_map_1 = "map<string, string>"
+            c_map_2 = "map<string, map<string,string>>"
+          }
+          c_string = "string"
+        }
+      }
+    }
+  }
+}
+```
+
+These queries are all valid:
+
+```sql
+select 
+name,
+c_date,
+c_row,
+c_row.c_inner_row,
+c_row.c_string,
+c_row.c_inner_row.c_inner_int,
+c_row.c_inner_row.c_inner_string,
+c_row.c_inner_row.c_inner_timestamp,
+c_row.c_inner_row.c_map_1,
+c_row.c_inner_row.c_map_1.some_key
+```
+
+But this query is not valid:
+
+```sql
+select 
+c_row.c_inner_row.c_map_2.some_key.inner_map_key
+```
+
+The map must be the last struct in the path; nested maps cannot be queried.
+
 ## Job Config Example
 
 ```
@@ -94,6 +152,8 @@ sink {
 
 ## Changelog
 
+- Support struct query
+
 ### new version
 
 - Add SQL Transform Connector
diff --git a/docs/zh/transform-v2/sql.md b/docs/zh/transform-v2/sql.md
index ccbbc7f14c..1b56f1fef3 100644
--- a/docs/zh/transform-v2/sql.md
+++ b/docs/zh/transform-v2/sql.md
@@ -24,6 +24,9 @@ The SQL transform uses an in-memory SQL engine; we can use SQL functions and SQL 
 
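 The query SQL. It is a simple SQL statement that supports basic functions and criteria filtering. Complex SQL is not supported yet, including multi-source table/row JOIN, AGGREGATE operations, and the like.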
 
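+The query expression can be `select [table_name.]column_a`, which queries the column named `column_a`; `table_name` is optional.
+It can also be `select c_row.c_inner_row.column_b`, which queries `column_b` of 
`c_inner_row` under the `c_row` column. **A nested struct query must not include `table_name`.**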
+
 ## Example
 
 The data read from the source is a table like this:
@@ -56,6 +59,61 @@ transform {
 | 3  | Kin Dom_  | 25  |
 | 4  | Joy Dom_  | 23  |
 
+### Nested struct query
+
+For example, if your upstream data schema looks like this:
+
+```hocon
+source {
+  FakeSource {
+    result_table_name = "fake"
+    row.num = 100
+    string.template = ["innerQuery"]
+    schema = {
+      fields {
+        name = "string"
+        c_date = "date"
+        c_row = {
+          c_inner_row = {
+            c_inner_int = "int"
+            c_inner_string = "string"
+            c_inner_timestamp = "timestamp"
+            c_map_1 = "map<string, string>"
+            c_map_2 = "map<string, map<string,string>>"
+          }
+          c_string = "string"
+        }
+      }
+    }
+  }
+}
+```
+
+Then all of the following query expressions are valid:
+
+```sql
+select 
+name,
+c_date,
+c_row,
+c_row.c_inner_row,
+c_row.c_string,
+c_row.c_inner_row.c_inner_int,
+c_row.c_inner_row.c_inner_string,
+c_row.c_inner_row.c_inner_timestamp,
+c_row.c_inner_row.c_map_1,
+c_row.c_inner_row.c_map_1.some_key
+```
+
+But this query is not valid:
+
+```sql
+select 
+c_row.c_inner_row.c_map_2.some_key.inner_map_key
+```
+
+When querying a map, the map must be the last struct in the path; nested maps cannot be queried.
+
 ## Job Config Example
 
 ```
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelRowType.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelRowType.java
index 4eedb2255a..575def632d 100644
--- 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelRowType.java
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelRowType.java
@@ -74,12 +74,20 @@ public class SeaTunnelRowType implements 
CompositeType<SeaTunnelRow> {
     }
 
     public int indexOf(String fieldName) {
+        return indexOf(fieldName, true);
+    }
+
+    public int indexOf(String fieldName, boolean throwExceptionWhenNotFound) {
         for (int i = 0; i < fieldNames.length; i++) {
             if (fieldNames[i].equals(fieldName)) {
                 return i;
             }
         }
-        throw new IllegalArgumentException(String.format("can't find field 
[%s]", fieldName));
+        if (throwExceptionWhenNotFound) {
+            throw new IllegalArgumentException(String.format("can't find field 
[%s]", fieldName));
+        } else {
+            return -1;
+        }
     }
 
     @Override
diff --git 
a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/java/org/apache/seatunnel/e2e/transform/TestSQLIT.java
 
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/java/org/apache/seatunnel/e2e/transform/TestSQLIT.java
index 54e2f0ae13..df404a2852 100644
--- 
a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/java/org/apache/seatunnel/e2e/transform/TestSQLIT.java
+++ 
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/java/org/apache/seatunnel/e2e/transform/TestSQLIT.java
@@ -17,7 +17,9 @@
 
 package org.apache.seatunnel.e2e.transform;
 
+import org.apache.seatunnel.e2e.common.container.EngineType;
 import org.apache.seatunnel.e2e.common.container.TestContainer;
+import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
 
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.TestTemplate;
@@ -58,4 +60,15 @@ public class TestSQLIT extends TestSuiteBase {
         Container.ExecResult caseWhenSql = 
container.executeJob("/sql_transform/case_when.conf");
         Assertions.assertEquals(0, caseWhenSql.getExitCode());
     }
+
+    @TestTemplate
+    @DisabledOnContainer(
+            value = {},
+            type = {EngineType.SPARK},
+            disabledReason = "Spark translation has some issue on map convert")
+    public void testInnerQuery(TestContainer container) throws IOException, 
InterruptedException {
+        Container.ExecResult innerQuerySql =
+                container.executeJob("/sql_transform/inner_query.conf");
+        Assertions.assertEquals(0, innerQuerySql.getExitCode());
+    }
 }
diff --git 
a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/sql_transform/inner_query.conf
 
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/sql_transform/inner_query.conf
new file mode 100644
index 0000000000..65a6324f94
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/sql_transform/inner_query.conf
@@ -0,0 +1,114 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of batch processing in 
seatunnel config
+######
+
+env {
+  job.mode = "BATCH"
+}
+
+source {
+  FakeSource {
+    result_table_name = "fake"
+    row.num = 100
+    string.template = ["innerQuery"]
+    schema = {
+      fields {
+        name = "string"
+        c_date = "date"
+        c_row = {
+          c_inner_row = {
+            c_inner_int = "int"
+            c_inner_string = "string"
+            c_inner_timestamp = "timestamp"
+            c_map = "map<string, string>"
+          }
+          c_string = "string"
+        }
+      }
+    }
+  }
+}
+
+transform {
+    Sql {
+        source_table_name = "fake"
+        result_table_name = "tmp1"
+        query = """select c_date,
+        c_row.c_string c_string,
+        c_row.c_inner_row.c_inner_string c_inner_string,
+        c_row.c_inner_row.c_inner_timestamp c_inner_timestamp,
+        c_row.c_inner_row.c_map.innerQuery map_val,
+        c_row.c_inner_row.c_map.notExistKey map_not_exist_val
+        from fake"""
+    }
+}
+
+sink {
+  Console {
+    source_table_name = "tmp1"
+  }
+  Assert {
+    source_table_name = "tmp1"
+    rules = {
+      field_rules = [{
+        field_name = "c_date"
+        field_type = "date"
+        field_value = [
+            {rule_type = NOT_NULL}
+          ]
+        },
+        {
+          field_name = "c_string"
+          field_type = "string"
+          field_value = [
+            {equals_to = "innerQuery"}
+          ]
+        },
+        {
+          field_name = "c_inner_string"
+          field_type = "string"
+          field_value = [
+            {equals_to = "innerQuery"}
+          ]
+        },
+        {
+          field_name = "c_inner_timestamp"
+          field_type = "timestamp"
+          field_value = [
+            {rule_type = NOT_NULL}
+          ]
+        },
+        {
+          field_name = "map_val"
+          field_type = "string"
+          field_value = [
+            {rule_type = NOT_NULL}
+          ]
+        },
+        {
+          field_name = "map_not_exist_val"
+          field_type = "null"
+          field_value = [
+            {rule_type = NULL}
+          ]
+        }
+      ]
+    }
+  }
+}
\ No newline at end of file
diff --git 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLFunction.java
 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLFunction.java
index 23cf4844ed..30794af42f 100644
--- 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLFunction.java
+++ 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLFunction.java
@@ -18,7 +18,9 @@
 package org.apache.seatunnel.transform.sql.zeta;
 
 import org.apache.seatunnel.api.table.type.DecimalType;
+import org.apache.seatunnel.api.table.type.MapType;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.api.table.type.SqlType;
 import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
@@ -56,6 +58,7 @@ import java.math.BigDecimal;
 import java.math.RoundingMode;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 
 public class ZetaSQLFunction {
     // ============================internal functions=====================
@@ -199,8 +202,33 @@ public class ZetaSQLFunction {
             return ((StringValue) expression).getValue();
         }
         if (expression instanceof Column) {
-            int idx = inputRowType.indexOf(((Column) 
expression).getColumnName());
-            return inputFields[idx];
+            Column columnExp = (Column) expression;
+            String columnName = columnExp.getColumnName();
+            int index = inputRowType.indexOf(columnName, false);
+            if (index != -1) {
+                return inputFields[index];
+            } else {
+                String fullyQualifiedName = columnExp.getFullyQualifiedName();
+                String[] columnNames = fullyQualifiedName.split("\\.");
+                int deep = columnNames.length;
+                SeaTunnelDataType parDataType = inputRowType;
+                SeaTunnelRow parRowValues = new SeaTunnelRow(inputFields);
+                Object res = parRowValues;
+                for (int i = 0; i < deep; i++) {
+                    if (parDataType instanceof MapType) {
+                        return ((Map) res).get(columnNames[i]);
+                    }
+                    parRowValues = (SeaTunnelRow) res;
+                    int idx = ((SeaTunnelRowType) 
parDataType).indexOf(columnNames[i], false);
+                    if (idx == -1) {
+                        throw new IllegalArgumentException(
+                                String.format("can't find field [%s]", 
fullyQualifiedName));
+                    }
+                    parDataType = ((SeaTunnelRowType) 
parDataType).getFieldType(idx);
+                    res = parRowValues.getFields()[idx];
+                }
+                return res;
+            }
         }
         if (expression instanceof Function) {
             Function function = (Function) expression;
diff --git 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLType.java
 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLType.java
index 968566e45a..635ce3274f 100644
--- 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLType.java
+++ 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLType.java
@@ -20,6 +20,7 @@ package org.apache.seatunnel.transform.sql.zeta;
 import org.apache.seatunnel.api.table.type.BasicType;
 import org.apache.seatunnel.api.table.type.DecimalType;
 import org.apache.seatunnel.api.table.type.LocalTimeType;
+import org.apache.seatunnel.api.table.type.MapType;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.api.table.type.SqlType;
@@ -101,8 +102,38 @@ public class ZetaSQLType {
             return BasicType.STRING_TYPE;
         }
         if (expression instanceof Column) {
-            String columnName = ((Column) expression).getColumnName();
-            return inputRowType.getFieldType(inputRowType.indexOf(columnName));
+            Column columnExp = (Column) expression;
+            String columnName = columnExp.getColumnName();
+            int index = inputRowType.indexOf(columnName, false);
+            if (index != -1) {
+                return inputRowType.getFieldType(index);
+            } else {
+                // Fallback logic to handle struct queries.
+                String fullyQualifiedName = columnExp.getFullyQualifiedName();
+                String[] columnNames = fullyQualifiedName.split("\\.");
+                int deep = columnNames.length;
+                SeaTunnelRowType parRowType = inputRowType;
+                SeaTunnelDataType<?> filedTypeRes = null;
+                for (int i = 0; i < deep; i++) {
+                    int idx = parRowType.indexOf(columnNames[i], false);
+                    if (idx == -1) {
+                        throw new IllegalArgumentException(
+                                String.format("can't find field [%s]", 
fullyQualifiedName));
+                    }
+                    filedTypeRes = parRowType.getFieldType(idx);
+                    if (filedTypeRes instanceof SeaTunnelRowType) {
+                        parRowType = (SeaTunnelRowType) filedTypeRes;
+                    } else if (filedTypeRes instanceof MapType) {
+                        // A map type is only supported as the last struct in the path.
+                        if (i != deep - 2) {
+                            throw new IllegalArgumentException(
+                                    "For now, we only support map struct is 
the latest struct in inner query function! Please modify your query!");
+                        }
+                        return ((MapType<?, ?>) filedTypeRes).getValueType();
+                    }
+                }
+                return filedTypeRes;
+            }
         }
         if (expression instanceof Function) {
             return getFunctionType((Function) expression);
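
The type-side rule in the ZetaSQLType.java hunk above (a map may only appear as the second-to-last path segment) can be tried in isolation with the sketch below. It is a rough illustration only: `NestedTypeSketch`, `Type`, `Basic`, `MapOf`, and `Struct` are hypothetical stand-ins for the SeaTunnel type classes, and the error text is shortened.

```java
public class NestedTypeSketch {

    /** Hypothetical minimal type model; these are not the SeaTunnel type classes. */
    public interface Type {}

    public static final class Basic implements Type {
        public final String name;

        public Basic(String name) {
            this.name = name;
        }
    }

    public static final class MapOf implements Type {
        public final Type valueType;

        public MapOf(Type valueType) {
            this.valueType = valueType;
        }
    }

    public static final class Struct implements Type {
        final String[] names;
        final Type[] types;

        public Struct(String[] names, Type[] types) {
            this.names = names;
            this.types = types;
        }

        int indexOf(String name) {
            for (int i = 0; i < names.length; i++) {
                if (names[i].equals(name)) {
                    return i;
                }
            }
            return -1;
        }
    }

    /** Resolves the type of a dotted path, mirroring the `i != deep - 2` check. */
    public static Type resolveType(Struct root, String path) {
        String[] parts = path.split("\\.");
        Struct parent = root;
        Type result = null;
        for (int i = 0; i < parts.length; i++) {
            int idx = parent.indexOf(parts[i]);
            if (idx == -1) {
                throw new IllegalArgumentException("can't find field [" + path + "]");
            }
            result = parent.types[idx];
            if (result instanceof Struct) {
                parent = (Struct) result;
            } else if (result instanceof MapOf) {
                // Exactly one segment (the key) must follow the map.
                if (i != parts.length - 2) {
                    throw new IllegalArgumentException("map must be the last struct: " + path);
                }
                return ((MapOf) result).valueType;
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Basic str = new Basic("string");
        Struct inner = new Struct(
                new String[] {"c_inner_int", "c_map_1", "c_map_2"},
                new Type[] {new Basic("int"), new MapOf(str), new MapOf(new MapOf(str))});
        Struct cRow = new Struct(new String[] {"c_inner_row"}, new Type[] {inner});
        Struct root = new Struct(new String[] {"c_row"}, new Type[] {cRow});

        Type t = resolveType(root, "c_row.c_inner_row.c_map_1.some_key");
        System.out.println(((Basic) t).name);
        try {
            resolveType(root, "c_row.c_inner_row.c_map_2.some_key.inner_map_key");
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

One thing the sketch makes visible: with the check written this way, a path that ends at the map itself (no key segment) is also rejected, so that case may be worth confirming against the documented list of valid queries.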
