This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 77ffa56f6d [Improve][Transform] Sql transform support inner strucy
query (#6484)
77ffa56f6d is described below
commit 77ffa56f6da0f834928a4c9921035ec324e1c3dc
Author: Jarvis <[email protected]>
AuthorDate: Fri Mar 15 10:38:14 2024 +0800
[Improve][Transform] Sql transform support inner strucy query (#6484)
---
docs/en/transform-v2/sql.md | 60 +++++++++++
docs/zh/transform-v2/sql.md | 58 +++++++++++
.../seatunnel/api/table/type/SeaTunnelRowType.java | 10 +-
.../apache/seatunnel/e2e/transform/TestSQLIT.java | 13 +++
.../test/resources/sql_transform/inner_query.conf | 114 +++++++++++++++++++++
.../transform/sql/zeta/ZetaSQLFunction.java | 32 +++++-
.../seatunnel/transform/sql/zeta/ZetaSQLType.java | 35 ++++++-
7 files changed, 317 insertions(+), 5 deletions(-)
diff --git a/docs/en/transform-v2/sql.md b/docs/en/transform-v2/sql.md
index b6c2306c65..a3bdb9bbfc 100644
--- a/docs/en/transform-v2/sql.md
+++ b/docs/en/transform-v2/sql.md
@@ -24,6 +24,9 @@ The source table name, the query SQL table name must match
this field.
The query SQL, it's a simple SQL supported base function and criteria filter
operation. But the complex SQL unsupported yet, include: multi source
table/rows JOIN and AGGREGATE operation and the like.
+The query expression can be `select [table_name.]column_a` to query the column
named `column_a`; the table name is optional.
+It can also be `select c_row.c_inner_row.column_b` to query the nested struct column
named `column_b` inside the `c_inner_row` column of the `c_row` column. **In this
form of query expression, a table name is not allowed.**
+
## Example
The data read from source is a table like this:
@@ -56,6 +59,61 @@ Then the data in result table `fake1` will update to
| 3 | Kin Dom_ | 25 |
| 4 | Joy Dom_ | 23 |
+### Struct query
+
+If your upstream data schema looks like this:
+
+```hocon
+source {
+ FakeSource {
+ result_table_name = "fake"
+ row.num = 100
+ string.template = ["innerQuery"]
+ schema = {
+ fields {
+ name = "string"
+ c_date = "date"
+ c_row = {
+ c_inner_row = {
+ c_inner_int = "int"
+ c_inner_string = "string"
+ c_inner_timestamp = "timestamp"
+ c_map_1 = "map<string, string>"
+ c_map_2 = "map<string, map<string,string>>"
+ }
+ c_string = "string"
+ }
+ }
+ }
+ }
+}
+```
+
+All of the following query expressions are valid:
+
+```sql
+select
+name,
+c_date,
+c_row,
+c_row.c_inner_row,
+c_row.c_string,
+c_row.c_inner_row.c_inner_int,
+c_row.c_inner_row.c_inner_string,
+c_row.c_inner_row.c_inner_timestamp,
+c_row.c_inner_row.c_map_1,
+c_row.c_inner_row.c_map_1.some_key
+```
+
+But this query is not valid:
+
+```sql
+select
+c_row.c_inner_row.c_map_2.some_key.inner_map_key
+```
+
+A map must be the last struct level in the query expression; querying into a nested map is not supported.
+
## Job Config Example
```
@@ -94,6 +152,8 @@ sink {
## Changelog
+- Support struct query
+
### new version
- Add SQL Transform Connector
diff --git a/docs/zh/transform-v2/sql.md b/docs/zh/transform-v2/sql.md
index ccbbc7f14c..1b56f1fef3 100644
--- a/docs/zh/transform-v2/sql.md
+++ b/docs/zh/transform-v2/sql.md
@@ -24,6 +24,9 @@ SQL 转换使用内存中的 SQL 引擎,我们可以通过 SQL 函数和 SQL
查询 SQL,它是一个简单的 SQL,支持基本的函数和条件过滤操作。但是,复杂的 SQL 尚不支持,包括:多源表/行连接和聚合操作等。
+查询表达式可以是`select [table_name.]column_a`,这时会去查询列为`column_a`的列,`table_name`为可选项
+也可以是`select
c_row.c_inner_row.column_b`,这时会去查询列`c_row`下的`c_inner_row`的`column_b`。**嵌套结构查询中,不能存在`table_name`**
+
## 示例
源端数据读取的表格如下:
@@ -56,6 +59,61 @@ transform {
| 3 | Kin Dom_ | 25 |
| 4 | Joy Dom_ | 23 |
+### 嵌套结构查询
+
+例如你的上游数据结构是这样:
+
+```hocon
+source {
+ FakeSource {
+ result_table_name = "fake"
+ row.num = 100
+ string.template = ["innerQuery"]
+ schema = {
+ fields {
+ name = "string"
+ c_date = "date"
+ c_row = {
+ c_inner_row = {
+ c_inner_int = "int"
+ c_inner_string = "string"
+ c_inner_timestamp = "timestamp"
+ c_map_1 = "map<string, string>"
+ c_map_2 = "map<string, map<string,string>>"
+ }
+ c_string = "string"
+ }
+ }
+ }
+ }
+}
+```
+
+那么下列所有的查询表达式都是有效的
+
+```sql
+select
+name,
+c_date,
+c_row,
+c_row.c_inner_row,
+c_row.c_string,
+c_row.c_inner_row.c_inner_int,
+c_row.c_inner_row.c_inner_string,
+c_row.c_inner_row.c_inner_timestamp,
+c_row.c_inner_row.c_map_1,
+c_row.c_inner_row.c_map_1.some_key
+```
+
+但是这个查询语句是无效的
+
+```sql
+select
+c_row.c_inner_row.c_map_2.some_key.inner_map_key
+```
+
+当查询map结构时,map结构应该为最后一个数据结构,不能查询嵌套map
+
## 作业配置示例
```
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelRowType.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelRowType.java
index 4eedb2255a..575def632d 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelRowType.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelRowType.java
@@ -74,12 +74,20 @@ public class SeaTunnelRowType implements
CompositeType<SeaTunnelRow> {
}
public int indexOf(String fieldName) {
+ return indexOf(fieldName, true);
+ }
+
+ public int indexOf(String fieldName, boolean throwExceptionWhenNotFound) {
for (int i = 0; i < fieldNames.length; i++) {
if (fieldNames[i].equals(fieldName)) {
return i;
}
}
- throw new IllegalArgumentException(String.format("can't find field
[%s]", fieldName));
+ if (throwExceptionWhenNotFound) {
+ throw new IllegalArgumentException(String.format("can't find field
[%s]", fieldName));
+ } else {
+ return -1;
+ }
}
@Override
diff --git
a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/java/org/apache/seatunnel/e2e/transform/TestSQLIT.java
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/java/org/apache/seatunnel/e2e/transform/TestSQLIT.java
index 54e2f0ae13..df404a2852 100644
---
a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/java/org/apache/seatunnel/e2e/transform/TestSQLIT.java
+++
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/java/org/apache/seatunnel/e2e/transform/TestSQLIT.java
@@ -17,7 +17,9 @@
package org.apache.seatunnel.e2e.transform;
+import org.apache.seatunnel.e2e.common.container.EngineType;
import org.apache.seatunnel.e2e.common.container.TestContainer;
+import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.TestTemplate;
@@ -58,4 +60,15 @@ public class TestSQLIT extends TestSuiteBase {
Container.ExecResult caseWhenSql =
container.executeJob("/sql_transform/case_when.conf");
Assertions.assertEquals(0, caseWhenSql.getExitCode());
}
+
+ @TestTemplate
+ @DisabledOnContainer(
+ value = {},
+ type = {EngineType.SPARK},
+ disabledReason = "Spark translation has some issue on map convert")
+ public void testInnerQuery(TestContainer container) throws IOException,
InterruptedException {
+ Container.ExecResult innerQuerySql =
+ container.executeJob("/sql_transform/inner_query.conf");
+ Assertions.assertEquals(0, innerQuerySql.getExitCode());
+ }
}
diff --git
a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/sql_transform/inner_query.conf
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/sql_transform/inner_query.conf
new file mode 100644
index 0000000000..65a6324f94
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/sql_transform/inner_query.conf
@@ -0,0 +1,114 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of batch processing with a SQL transform
struct (inner) query in seatunnel config
+######
+
+env {
+ job.mode = "BATCH"
+}
+
+source {
+ FakeSource {
+ result_table_name = "fake"
+ row.num = 100
+ string.template = ["innerQuery"]
+ schema = {
+ fields {
+ name = "string"
+ c_date = "date"
+ c_row = {
+ c_inner_row = {
+ c_inner_int = "int"
+ c_inner_string = "string"
+ c_inner_timestamp = "timestamp"
+ c_map = "map<string, string>"
+ }
+ c_string = "string"
+ }
+ }
+ }
+ }
+}
+
+transform {
+ Sql {
+ source_table_name = "fake"
+ result_table_name = "tmp1"
+ query = """select c_date,
+ c_row.c_string c_string,
+ c_row.c_inner_row.c_inner_string c_inner_string,
+ c_row.c_inner_row.c_inner_timestamp c_inner_timestamp,
+ c_row.c_inner_row.c_map.innerQuery map_val,
+ c_row.c_inner_row.c_map.notExistKey map_not_exist_val
+ from fake"""
+ }
+}
+
+sink {
+ Console {
+ source_table_name = "tmp1"
+ }
+ Assert {
+ source_table_name = "tmp1"
+ rules = {
+ field_rules = [{
+ field_name = "c_date"
+ field_type = "date"
+ field_value = [
+ {rule_type = NOT_NULL}
+ ]
+ },
+ {
+ field_name = "c_string"
+ field_type = "string"
+ field_value = [
+ {equals_to = "innerQuery"}
+ ]
+ },
+ {
+ field_name = "c_inner_string"
+ field_type = "string"
+ field_value = [
+ {equals_to = "innerQuery"}
+ ]
+ },
+ {
+ field_name = "c_inner_timestamp"
+ field_type = "timestamp"
+ field_value = [
+ {rule_type = NOT_NULL}
+ ]
+ },
+ {
+ field_name = "map_val"
+ field_type = "string"
+ field_value = [
+ {rule_type = NOT_NULL}
+ ]
+ },
+ {
+ field_name = "map_not_exist_val"
+ field_type = "null"
+ field_value = [
+ {rule_type = NULL}
+ ]
+ }
+ ]
+ }
+ }
+}
\ No newline at end of file
diff --git
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLFunction.java
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLFunction.java
index 23cf4844ed..30794af42f 100644
---
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLFunction.java
+++
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLFunction.java
@@ -18,7 +18,9 @@
package org.apache.seatunnel.transform.sql.zeta;
import org.apache.seatunnel.api.table.type.DecimalType;
+import org.apache.seatunnel.api.table.type.MapType;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.api.table.type.SqlType;
import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
@@ -56,6 +58,7 @@ import java.math.BigDecimal;
import java.math.RoundingMode;
import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
public class ZetaSQLFunction {
// ============================internal functions=====================
@@ -199,8 +202,33 @@ public class ZetaSQLFunction {
return ((StringValue) expression).getValue();
}
if (expression instanceof Column) {
- int idx = inputRowType.indexOf(((Column)
expression).getColumnName());
- return inputFields[idx];
+ Column columnExp = (Column) expression;
+ String columnName = columnExp.getColumnName();
+ int index = inputRowType.indexOf(columnName, false);
+ if (index != -1) {
+ return inputFields[index];
+ } else {
+ String fullyQualifiedName = columnExp.getFullyQualifiedName();
+ String[] columnNames = fullyQualifiedName.split("\\.");
+ int deep = columnNames.length;
+ SeaTunnelDataType parDataType = inputRowType;
+ SeaTunnelRow parRowValues = new SeaTunnelRow(inputFields);
+ Object res = parRowValues;
+ for (int i = 0; i < deep; i++) {
+ if (parDataType instanceof MapType) {
+ return ((Map) res).get(columnNames[i]);
+ }
+ parRowValues = (SeaTunnelRow) res;
+ int idx = ((SeaTunnelRowType)
parDataType).indexOf(columnNames[i], false);
+ if (idx == -1) {
+ throw new IllegalArgumentException(
+ String.format("can't find field [%s]",
fullyQualifiedName));
+ }
+ parDataType = ((SeaTunnelRowType)
parDataType).getFieldType(idx);
+ res = parRowValues.getFields()[idx];
+ }
+ return res;
+ }
}
if (expression instanceof Function) {
Function function = (Function) expression;
diff --git
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLType.java
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLType.java
index 968566e45a..635ce3274f 100644
---
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLType.java
+++
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLType.java
@@ -20,6 +20,7 @@ package org.apache.seatunnel.transform.sql.zeta;
import org.apache.seatunnel.api.table.type.BasicType;
import org.apache.seatunnel.api.table.type.DecimalType;
import org.apache.seatunnel.api.table.type.LocalTimeType;
+import org.apache.seatunnel.api.table.type.MapType;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.api.table.type.SqlType;
@@ -101,8 +102,38 @@ public class ZetaSQLType {
return BasicType.STRING_TYPE;
}
if (expression instanceof Column) {
- String columnName = ((Column) expression).getColumnName();
- return inputRowType.getFieldType(inputRowType.indexOf(columnName));
+ Column columnExp = (Column) expression;
+ String columnName = columnExp.getColumnName();
+ int index = inputRowType.indexOf(columnName, false);
+ if (index != -1) {
+ return inputRowType.getFieldType(index);
+ } else {
+ // fallback logic to handle struct query.
+ String fullyQualifiedName = columnExp.getFullyQualifiedName();
+ String[] columnNames = fullyQualifiedName.split("\\.");
+ int deep = columnNames.length;
+ SeaTunnelRowType parRowType = inputRowType;
+ SeaTunnelDataType<?> filedTypeRes = null;
+ for (int i = 0; i < deep; i++) {
+ int idx = parRowType.indexOf(columnNames[i], false);
+ if (idx == -1) {
+ throw new IllegalArgumentException(
+ String.format("can't find field [%s]",
fullyQualifiedName));
+ }
+ filedTypeRes = parRowType.getFieldType(idx);
+ if (filedTypeRes instanceof SeaTunnelRowType) {
+ parRowType = (SeaTunnelRowType) filedTypeRes;
+ } else if (filedTypeRes instanceof MapType) {
+ // for map type: only supported when the map is the last struct, directly followed by the key.
+ if (i != deep - 2) {
+ throw new IllegalArgumentException(
+ "For now, we only support map struct is
the latest struct in inner query function! Please modify your query!");
+ }
+ return ((MapType<?, ?>) filedTypeRes).getValueType();
+ }
+ }
+ return filedTypeRes;
+ }
}
if (expression instanceof Function) {
return getFunctionType((Function) expression);