This is an automated email from the ASF dual-hosted git repository.
lidongdai pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 24ffaeb14a [Improve] Support JSON multi-level nested parsing (#10000)
24ffaeb14a is described below
commit 24ffaeb14ae47cd0cd555815461fb5ecb538002e
Author: misi <[email protected]>
AuthorDate: Tue Nov 18 21:13:42 2025 +0800
[Improve] Support JSON multi-level nested parsing (#10000)
Co-authored-by: misi <[email protected]>
---
.../apache/seatunnel/e2e/transform/TestSQLIT.java | 4 +
.../resources/sql_transform/func_null_return.conf | 120 +++++++++++
.../transform/sql/zeta/ZetaSQLFunction.java | 3 +
.../transform/sql/zeta/CastFunctionTest.java | 229 +++++++++++++++++++++
4 files changed, 356 insertions(+)
diff --git
a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/java/org/apache/seatunnel/e2e/transform/TestSQLIT.java
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/java/org/apache/seatunnel/e2e/transform/TestSQLIT.java
index 5b04022a67..2e0a2a0851 100644
---
a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/java/org/apache/seatunnel/e2e/transform/TestSQLIT.java
+++
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/java/org/apache/seatunnel/e2e/transform/TestSQLIT.java
@@ -84,6 +84,10 @@ public class TestSQLIT extends TestSuiteBase {
Container.ExecResult multiIfSql =
container.executeJob("/sql_transform/func_multi_if.conf");
Assertions.assertEquals(0, multiIfSql.getExitCode());
+
+ Container.ExecResult nullReturnSql =
+ container.executeJob("/sql_transform/func_null_return.conf");
+ Assertions.assertEquals(0, nullReturnSql.getExitCode());
}
@TestTemplate
diff --git
a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/sql_transform/func_null_return.conf
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/sql_transform/func_null_return.conf
new file mode 100644
index 0000000000..c219e2b5ba
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/sql_transform/func_null_return.conf
@@ -0,0 +1,120 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file checks that the SQL transform passes null top-level and nested field values through as null (batch job)
+######
+
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ FakeSource {
+ plugin_output = "fake"
+ schema = {
+ fields {
+ id = "int"
+ name = "string"
+ nullable_field = "string"
+ nested_data = {
+ fields = {
+ inner_field = "string"
+ inner_nullable = "string"
+ }
+ }
+ }
+ }
+ rows = [
+ {fields = [1, "Test Name", null, {inner_field: "inner_value",
inner_nullable: null}], kind = INSERT},
+ {fields = [2, "Another Name", "Some Value", {inner_field:
"another_inner", inner_nullable: "non_null"}], kind = INSERT},
+ {fields = [3, "Third Name", null, {inner_field: null, inner_nullable:
null}], kind = INSERT}
+ ]
+ }
+}
+
+transform {
+ sql {
+ plugin_input = "fake"
+ plugin_output = "fake1"
+ query = "select id, name, nullable_field, nested_data.inner_field,
nested_data.inner_nullable, nested_data.inner_field as copied_field from fake"
+ }
+}
+
+sink {
+ Assert {
+ plugin_input = "fake1"
+ rules = {
+ field_rules = [
+ {
+ field_name = "id"
+ field_type = "int"
+ field_value = [
+ {equals_to = 1},
+ {equals_to = 2},
+ {equals_to = 3}
+ ]
+ },
+ {
+ field_name = "name"
+ field_type = "string"
+ field_value = [
+ {equals_to = "Test Name"},
+ {equals_to = "Another Name"},
+ {equals_to = "Third Name"}
+ ]
+ },
+ {
+ field_name = "nullable_field"
+ field_type = "string"
+ field_value = [
+ {is_null = true},
+ {equals_to = "Some Value"},
+ {is_null = true}
+ ]
+ },
+ {
+ field_name = "inner_field"
+ field_type = "string"
+ field_value = [
+ {equals_to = "inner_value"},
+ {equals_to = "another_inner"},
+ {is_null = true}
+ ]
+ },
+ {
+ field_name = "inner_nullable"
+ field_type = "string"
+ field_value = [
+ {is_null = true},
+ {equals_to = "non_null"},
+ {is_null = true}
+ ]
+ },
+ {
+ field_name = "copied_field"
+ field_type = "string"
+ field_value = [
+ {equals_to = "inner_value"},
+ {equals_to = "another_inner"},
+ {is_null = true}
+ ]
+ }
+ ]
+ }
+ }
+}
\ No newline at end of file
diff --git
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLFunction.java
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLFunction.java
index 4adadaf60f..de1a78eae2 100644
---
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLFunction.java
+++
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLFunction.java
@@ -340,6 +340,9 @@ public class ZetaSQLFunction {
}
parDataType = ((SeaTunnelRowType)
parDataType).getFieldType(idx);
res = parRowValues.getFields()[idx];
+ if (res == null) {
+ return null;
+ }
}
return res;
}
diff --git
a/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/sql/zeta/CastFunctionTest.java
b/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/sql/zeta/CastFunctionTest.java
index b5c5ae377e..c854bc021e 100644
---
a/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/sql/zeta/CastFunctionTest.java
+++
b/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/sql/zeta/CastFunctionTest.java
@@ -18,6 +18,7 @@
package org.apache.seatunnel.transform.sql.zeta;
import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.MapType;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
@@ -54,4 +55,232 @@ public class CastFunctionTest {
Assertions.assertEquals(Byte.parseByte("1"), f2Object);
Assertions.assertEquals(Short.parseShort("1"), f3Object);
}
+
+    /**
+     * Selecting a sub-field of a MAP column whose value is null must yield null.
+     *
+     * <p>Despite the method name, the query performs no CAST; it exercises plain nested-field
+     * access on a null parent value.
+     */
+    @Test
+    public void testCastFunctionWithNullNestedField() {
+        SQLEngine sqlEngine = SQLEngineFactory.getSQLEngine(SQLEngineFactory.EngineType.ZETA);
+
+        SeaTunnelRowType rowType =
+                new SeaTunnelRowType(
+                        new String[] {"user"},
+                        new SeaTunnelDataType[] {
+                            new MapType<>(BasicType.STRING_TYPE, BasicType.STRING_TYPE)
+                        });
+
+        // The "user" map itself is null, so "user.address" has no parent value to read from.
+        SeaTunnelRow inputRow = new SeaTunnelRow(new Object[] {null});
+
+        sqlEngine.init("test", null, rowType, "select user.address as address from test");
+
+        SeaTunnelRowType outRowType = sqlEngine.typeMapping(null);
+
+        SeaTunnelRow outRow = sqlEngine.transformBySQL(inputRow, outRowType).get(0);
+
+        Object addressField = outRow.getField(0);
+        Assertions.assertNull(
+                addressField,
+                "When the parent of a nested field is null, result should be null");
+    }
+
+    @Test
+    public void testCastFunctionWithNestedField() {
+        // Schema: a single MAP<STRING, STRING> column named "user".
+        SeaTunnelRowType schema =
+                new SeaTunnelRowType(
+                        new String[] {"user"},
+                        new SeaTunnelDataType[] {
+                            new MapType<>(BasicType.STRING_TYPE, BasicType.STRING_TYPE)
+                        });
+
+        // One input row whose map carries both keys referenced by the query.
+        java.util.Map<String, String> user = new java.util.HashMap<>();
+        user.put("address", "123 Main St");
+        user.put("age", "25");
+        SeaTunnelRow row = new SeaTunnelRow(new Object[] {user});
+
+        String query = "select user.address as address, cast(user.age as INT) as age from test";
+        SQLEngine engine = SQLEngineFactory.getSQLEngine(SQLEngineFactory.EngineType.ZETA);
+        engine.init("test", null, schema, query);
+
+        SeaTunnelRow result = engine.transformBySQL(row, engine.typeMapping(null)).get(0);
+
+        // Column 0 reads the map entry verbatim; column 1 converts the string "25" to INT.
+        Assertions.assertEquals("123 Main St", result.getField(0));
+        Assertions.assertEquals(25, result.getField(1));
+    }
+
+    @Test
+    public void testCastFunctionWithNormalValues() {
+        SeaTunnelRowType schema =
+                new SeaTunnelRowType(
+                        new String[] {"str_field", "int_field"},
+                        new SeaTunnelDataType[] {BasicType.STRING_TYPE, BasicType.INT_TYPE});
+        SeaTunnelRow row = new SeaTunnelRow(new Object[] {"123", 456});
+
+        // Each source column is projected once as-is and once through CAST to the other type.
+        String query =
+                "select str_field, cast(str_field as INT) as int_from_str, "
+                        + "int_field, cast(int_field as STRING) as str_from_int from test";
+
+        SQLEngine engine = SQLEngineFactory.getSQLEngine(SQLEngineFactory.EngineType.ZETA);
+        engine.init("test", null, schema, query);
+        SeaTunnelRowType outputSchema = engine.typeMapping(null);
+        SeaTunnelRow result = engine.transformBySQL(row, outputSchema).get(0);
+
+        // STRING column: passthrough, then STRING -> INT.
+        Assertions.assertEquals("123", result.getField(0));
+        Assertions.assertEquals(123, result.getField(1));
+        // INT column: passthrough, then INT -> STRING.
+        Assertions.assertEquals(456, result.getField(2));
+        Assertions.assertEquals("456", result.getField(3));
+    }
+
+    /** Reading one level into a non-null ROW column returns the inner field values. */
+    @Test
+    public void testNormalNestedRowFieldAccess() {
+        SQLEngine sqlEngine = SQLEngineFactory.getSQLEngine(SQLEngineFactory.EngineType.ZETA);
+
+        // Schema: user ROW<street STRING, city STRING>
+        SeaTunnelRowType innerRowType =
+                new SeaTunnelRowType(
+                        new String[] {"street", "city"},
+                        new SeaTunnelDataType[] {BasicType.STRING_TYPE, BasicType.STRING_TYPE});
+
+        SeaTunnelRowType rowType =
+                new SeaTunnelRowType(
+                        new String[] {"user"}, new SeaTunnelDataType[] {innerRowType});
+
+        // Create nested row data
+        SeaTunnelRow innerRow = new SeaTunnelRow(new Object[] {"123 Main St", "New York"});
+        SeaTunnelRow inputRow = new SeaTunnelRow(new Object[] {innerRow});
+
+        sqlEngine.init(
+                "test", null, rowType, "select user.street as street, user.city as city from test");
+
+        SeaTunnelRowType outRowType = sqlEngine.typeMapping(null);
+        SeaTunnelRow outRow = sqlEngine.transformBySQL(inputRow, outRowType).get(0);
+
+        // Verify one-level ROW field access on a non-null parent row.
+        Assertions.assertEquals("123 Main St", outRow.getField(0));
+        Assertions.assertEquals("New York", outRow.getField(1));
+    }
+
+    /** Reading two levels into nested ROW columns returns the innermost field value. */
+    @Test
+    public void testMultiLevelNestedRowFieldAccess() {
+        SQLEngine sqlEngine = SQLEngineFactory.getSQLEngine(SQLEngineFactory.EngineType.ZETA);
+
+        // Schema: user ROW<name STRING, address ROW<street STRING, zipcode STRING>>
+        SeaTunnelRowType addressRowType =
+                new SeaTunnelRowType(
+                        new String[] {"street", "zipcode"},
+                        new SeaTunnelDataType[] {BasicType.STRING_TYPE, BasicType.STRING_TYPE});
+
+        SeaTunnelRowType userRowType =
+                new SeaTunnelRowType(
+                        new String[] {"name", "address"},
+                        new SeaTunnelDataType[] {BasicType.STRING_TYPE, addressRowType});
+
+        SeaTunnelRowType rowType =
+                new SeaTunnelRowType(
+                        new String[] {"user"}, new SeaTunnelDataType[] {userRowType});
+
+        // Create multi-level nested row data
+        SeaTunnelRow addressRow = new SeaTunnelRow(new Object[] {"123 Main St", "10001"});
+        SeaTunnelRow userRow = new SeaTunnelRow(new Object[] {"John Doe", addressRow});
+        SeaTunnelRow inputRow = new SeaTunnelRow(new Object[] {userRow});
+
+        sqlEngine.init(
+                "test",
+                null,
+                rowType,
+                "select user.address.street as street, user.name as name from test");
+
+        SeaTunnelRowType outRowType = sqlEngine.typeMapping(null);
+        SeaTunnelRow outRow = sqlEngine.transformBySQL(inputRow, outRowType).get(0);
+
+        // A two-level path (user.address.street) and a one-level path (user.name) in one query.
+        Assertions.assertEquals("123 Main St", outRow.getField(0));
+        Assertions.assertEquals("John Doe", outRow.getField(1));
+    }
+
+    /** Dotted access into a non-null MAP column returns the value stored under each key. */
+    @Test
+    public void testMapFieldNormalAccess() {
+        SQLEngine sqlEngine = SQLEngineFactory.getSQLEngine(SQLEngineFactory.EngineType.ZETA);
+
+        SeaTunnelRowType rowType =
+                new SeaTunnelRowType(
+                        new String[] {"user"},
+                        new SeaTunnelDataType[] {
+                            new MapType<>(BasicType.STRING_TYPE, BasicType.STRING_TYPE)
+                        });
+
+        // Non-null map containing both keys the query reads.
+        java.util.Map<String, String> userData = new java.util.HashMap<>();
+        userData.put("name", "John Doe");
+        userData.put("email", "[email protected]");
+        SeaTunnelRow inputRow = new SeaTunnelRow(new Object[] {userData});
+
+        sqlEngine.init(
+                "test", null, rowType, "select user.name as name, user.email as email from test");
+
+        SeaTunnelRowType outRowType = sqlEngine.typeMapping(null);
+        SeaTunnelRow outRow = sqlEngine.transformBySQL(inputRow, outRowType).get(0);
+
+        // Verify key lookup on a MAP<STRING, STRING> column (MAP access, not ROW field access).
+        Assertions.assertEquals("John Doe", outRow.getField(0));
+        Assertions.assertEquals("[email protected]", outRow.getField(1));
+    }
+
+    @Test
+    public void testNestedFieldWithNullIntermediateValue() {
+        // Schema: user ROW<name STRING, address ROW<street STRING, zipcode STRING>>
+        SeaTunnelRowType addressType =
+                new SeaTunnelRowType(
+                        new String[] {"street", "zipcode"},
+                        new SeaTunnelDataType[] {BasicType.STRING_TYPE, BasicType.STRING_TYPE});
+        SeaTunnelRowType userType =
+                new SeaTunnelRowType(
+                        new String[] {"name", "address"},
+                        new SeaTunnelDataType[] {BasicType.STRING_TYPE, addressType});
+        SeaTunnelRowType schema =
+                new SeaTunnelRowType(new String[] {"user"}, new SeaTunnelDataType[] {userType});
+
+        // One engine instance serves both input rows below.
+        SQLEngine engine = SQLEngineFactory.getSQLEngine(SQLEngineFactory.EngineType.ZETA);
+        engine.init(
+                "test",
+                null,
+                schema,
+                "select user.address.street as street, user.name as name from test");
+        SeaTunnelRowType outputSchema = engine.typeMapping(null);
+
+        // Case 1: every level is populated, so the full path resolves.
+        SeaTunnelRow beijingAddress = new SeaTunnelRow(new Object[] {"beijing", "10001"});
+        SeaTunnelRow zhangsan = new SeaTunnelRow(new Object[] {"zhangsan", beijingAddress});
+        SeaTunnelRow populated = new SeaTunnelRow(new Object[] {zhangsan});
+
+        SeaTunnelRow first = engine.transformBySQL(populated, outputSchema).get(0);
+        Assertions.assertEquals("beijing", first.getField(0));
+        Assertions.assertEquals("zhangsan", first.getField(1));
+
+        // Case 2: user.address is null, so user.address.street must come back as null while the
+        // sibling field user.name is still returned.
+        SeaTunnelRow lisi = new SeaTunnelRow(new Object[] {"lisi", null});
+        SeaTunnelRow withNullAddress = new SeaTunnelRow(new Object[] {lisi});
+
+        SeaTunnelRow second = engine.transformBySQL(withNullAddress, outputSchema).get(0);
+        Assertions.assertNull(
+                second.getField(0),
+                "When accessing nested field where intermediate value is null, result should be null");
+        Assertions.assertEquals("lisi", second.getField(1));
+    }
}