This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new e305f198bb [Feature][transform-v2] jsonpath support map array type
(#8577)
e305f198bb is described below
commit e305f198bb49f95e1431ef5d7372470859133bf9
Author: CosmosNi <[email protected]>
AuthorDate: Fri Feb 7 13:58:27 2025 +0800
[Feature][transform-v2] jsonpath support map array type (#8577)
Co-authored-by: nijiahui <[email protected]>
---
docs/en/transform-v2/jsonpath.md | 11 ++-
docs/zh/transform-v2/jsonpath.md | 13 ++-
.../catalog/SeaTunnelDataTypeConvertorUtil.java | 3 +
.../e2e/transform/TestJsonPathTransformIT.java | 7 ++
.../json_path_transform/json_path_array_map.conf | 99 ++++++++++++++++++++++
.../spark/serialization/SeaTunnelRowConverter.java | 8 ++
6 files changed, 136 insertions(+), 5 deletions(-)
diff --git a/docs/en/transform-v2/jsonpath.md b/docs/en/transform-v2/jsonpath.md
index f787487069..3d9595372b 100644
--- a/docs/en/transform-v2/jsonpath.md
+++ b/docs/en/transform-v2/jsonpath.md
@@ -83,7 +83,8 @@ The data read from source is a table like this json:
"c_decimal": 10.55,
"c_date": "2023-10-29",
"c_datetime": "16:12:43.459",
- "c_array":["item1", "item2", "item3"]
+ "c_array":["item1", "item2", "item3"],
+ "c_map_array": [{"c_string_1":"c_string_1","c_string_2":"c_string_2","c_string_3":"c_string_3"},{"c_string_1":"c_string_1","c_string_2":"c_string_2","c_string_3":"c_string_3"}]
}
}
```
@@ -143,11 +144,17 @@ transform {
"dest_field" = "c1_datetime"
"dest_type" = "time"
},
- {
+ {
"src_field" = "data"
"path" = "$.data.c_array"
"dest_field" = "c1_array"
"dest_type" = "array<string>"
+ },
+ {
+ "src_field" = "data"
+ "path" = "$.data.c_map_array"
+ "dest_field" = "c1_map_array"
+ "dest_type" = "array<map<string, string>>"
}
]
}
diff --git a/docs/zh/transform-v2/jsonpath.md b/docs/zh/transform-v2/jsonpath.md
index a83767e0c1..1b2dff2e8b 100644
--- a/docs/zh/transform-v2/jsonpath.md
+++ b/docs/zh/transform-v2/jsonpath.md
@@ -83,7 +83,8 @@
"c_decimal": 10.55,
"c_date": "2023-10-29",
"c_datetime": "16:12:43.459",
- "c_array":["item1", "item2", "item3"]
+ "c_array":["item1", "item2", "item3"],
+ "c_map_array": [{"c_string_1":"c_string_1","c_string_2":"c_string_2","c_string_3":"c_string_3"},{"c_string_1":"c_string_1","c_string_2":"c_string_2","c_string_3":"c_string_3"}]
}
}
```
@@ -143,11 +144,17 @@ transform {
"dest_field" = "c1_datetime"
"dest_type" = "time"
},
- {
+ {
"src_field" = "data"
"path" = "$.data.c_array"
"dest_field" = "c1_array"
- "dest_type" = "array<string>"
+ "dest_type" = "array<string>"
+ },
+ {
+ "src_field" = "data"
+ "path" = "$.data.c_map_array"
+ "dest_field" = "c1_map_array"
+ "dest_type" = "array<map<string, string>>"
}
]
}
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/SeaTunnelDataTypeConvertorUtil.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/SeaTunnelDataTypeConvertorUtil.java
index c5109f2e14..acd7a158d4 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/SeaTunnelDataTypeConvertorUtil.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/SeaTunnelDataTypeConvertorUtil.java
@@ -230,6 +230,9 @@ public class SeaTunnelDataTypeConvertorUtil {
return ArrayType.FLOAT_ARRAY_TYPE;
case DOUBLE:
return ArrayType.DOUBLE_ARRAY_TYPE;
+ case MAP:
+ MapType<?, ?> mapType = (MapType<?, ?>) dataType;
+ return new ArrayType<>(MapType.class, mapType);
default:
throw CommonError.unsupportedDataType("SeaTunnel",
genericType, field);
}
diff --git
a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/java/org/apache/seatunnel/e2e/transform/TestJsonPathTransformIT.java
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/java/org/apache/seatunnel/e2e/transform/TestJsonPathTransformIT.java
index 3bc4b0cc70..7e2dd35bd6 100644
---
a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/java/org/apache/seatunnel/e2e/transform/TestJsonPathTransformIT.java
+++
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/java/org/apache/seatunnel/e2e/transform/TestJsonPathTransformIT.java
@@ -59,4 +59,11 @@ public class TestJsonPathTransformIT extends TestSuiteBase {
container.executeJob("/json_path_transform/json_path_with_error_handle_way.conf");
Assertions.assertEquals(0, execResult.getExitCode());
}
+
+ /**
+ * Runs {@code json_path_array_map.conf}, in which the JsonPath transform extracts JSON arrays
+ * of objects into {@code array<map<string, string>>} and {@code array<map<string, int>>}
+ * columns.
+ *
+ * <p>The per-field value checks live in that config's Assert sink; this test only requires
+ * the job to finish with exit code 0.
+ */
+ @TestTemplate
+ public void testArrayType(TestContainer container) throws Exception {
+ Container.ExecResult execResult =
+ container.executeJob("/json_path_transform/json_path_array_map.conf");
+ Assertions.assertEquals(0, execResult.getExitCode());
+ }
}
diff --git
a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/json_path_transform/json_path_array_map.conf
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/json_path_transform/json_path_array_map.conf
new file mode 100644
index 0000000000..8b22a28535
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/json_path_transform/json_path_array_map.conf
@@ -0,0 +1,99 @@
+#
+ # Licensed to the Apache Software Foundation (ASF) under one or more
+ # contributor license agreements. See the NOTICE file distributed with
+ # this work for additional information regarding copyright ownership.
+ # The ASF licenses this file to You under the Apache License, Version 2.0
+ # (the "License"); you may not use this file except in compliance with
+ # the License. You may obtain a copy of the License at
+ #
+ # http://www.apache.org/licenses/LICENSE-2.0
+ #
+ # Unless required by applicable law or agreed to in writing, software
+ # distributed under the License is distributed on an "AS IS" BASIS,
+ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ # See the License for the specific language governing permissions and
+ # limitations under the License.
+ #
+ ######
+ ###### This config file demonstrates JsonPath extraction into array<map> columns in a batch job
+ ######
+ env {
+ job.mode = "BATCH"
+ }
+
+ source {
+ FakeSource {
+ plugin_output = "fake"
+ row.num = 100
+ string.fake.mode = "template"
+
string.template=["{"data":{"c_map_string_array":[{"c_string_1":"c_string_1","c_string_2":"c_string_2","c_string_3":"c_string_3"},{"c_string_1":"c_string_1","c_string_2":"c_string_2","c_string_3":"c_string_3"}],"c_map_int_array":[{"c_int_1":1,"c_int_2":2,"c_int_3":3},{"c_int_1":1,"c_int_2":2,"c_int_3":3}]}}"]
+ schema = {
+ fields {
+ data = "string"
+ }
+ }
+ }
+ }
+
+ transform {
+ JsonPath {
+ plugin_input = "fake"
+ plugin_output = "fake1"
+ columns = [
+ {
+ "src_field" = "data"
+ "path" = "$.data.c_map_string_array"
+ "dest_field" = "c_map_string_array_1"
+ "dest_type" = "array<map<string, string>>"
+ },
+ {
+ "src_field" = "data"
+ "path" = "$.data.c_map_int_array"
+ "dest_field" = "c_map_int_array_1"
+ "dest_type" = "array<map<string, int>>"
+ }
+ ]
+ }
+ Sql {
+ plugin_input = "fake1"
+ plugin_output = "fake2"
+ query = "select c_map_string_array_1,c_map_int_array_1 from dual"
+ }
+ }
+
+ sink {
+ Assert {
+ plugin_input = "fake2"
+ rules =
+ {
+ row_rules = [
+ {
+ rule_type = MIN_ROW
+ rule_value = 100
+ }
+ ],
+ field_rules = [
+ {
+ field_name = c_map_string_array_1
+ field_type = "array<map<string, string>>"
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ equals_to = [{c_string_1=c_string_1, c_string_2=c_string_2,
c_string_3=c_string_3}, {c_string_1=c_string_1, c_string_2=c_string_2,
c_string_3=c_string_3}]
+ }
+ ]
+ },
+ {
+ field_name = c_map_int_array_1
+ field_type = "array<map<string, int>>"
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ equals_to = [{c_int_1=1, c_int_2=2, c_int_3=3}, {c_int_1=1,
c_int_2=2, c_int_3=3}]
+ }
+ ]
+ }
+ ]
+ }
+ }
+ }
\ No newline at end of file
diff --git
a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/serialization/SeaTunnelRowConverter.java
b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/serialization/SeaTunnelRowConverter.java
index 4ea0da4821..1d03b6448d 100644
---
a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/serialization/SeaTunnelRowConverter.java
+++
b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/serialization/SeaTunnelRowConverter.java
@@ -24,6 +24,7 @@ import org.apache.seatunnel.api.table.type.RowKind;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.api.table.type.SqlType;
import org.apache.seatunnel.translation.serialization.RowConverter;
import org.apache.seatunnel.translation.spark.utils.OffsetDateTimeUtils;
@@ -170,6 +171,13 @@ public class SeaTunnelRowConverter extends
RowConverter<GenericRow> {
return new WrappedArray.ofRef<>(new Object[0]);
}
int num = arrayData.length;
+ if (SqlType.MAP.equals(arrayType.getElementType().getSqlType())) {
+ Object[] arrayMapData = new Object[num];
+ for (int i = 0; i < num; i++) {
+ arrayMapData[i] = convert(arrayData[i],
arrayType.getElementType());
+ }
+ return new WrappedArray.ofRef<>(arrayMapData);
+ }
for (int i = 0; i < num; i++) {
arrayData[i] = convert(arrayData[i], arrayType.getElementType());
}