This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new e305f198bb [Feature][transform-v2] jsonpath support map array type (#8577)
e305f198bb is described below

commit e305f198bb49f95e1431ef5d7372470859133bf9
Author: CosmosNi <[email protected]>
AuthorDate: Fri Feb 7 13:58:27 2025 +0800

    [Feature][transform-v2] jsonpath support map array type (#8577)
    
    Co-authored-by: nijiahui <[email protected]>
---
 docs/en/transform-v2/jsonpath.md                   | 11 ++-
 docs/zh/transform-v2/jsonpath.md                   | 13 ++-
 .../catalog/SeaTunnelDataTypeConvertorUtil.java    |  3 +
 .../e2e/transform/TestJsonPathTransformIT.java     |  7 ++
 .../json_path_transform/json_path_array_map.conf   | 99 ++++++++++++++++++++++
 .../spark/serialization/SeaTunnelRowConverter.java |  8 ++
 6 files changed, 136 insertions(+), 5 deletions(-)

diff --git a/docs/en/transform-v2/jsonpath.md b/docs/en/transform-v2/jsonpath.md
index f787487069..3d9595372b 100644
--- a/docs/en/transform-v2/jsonpath.md
+++ b/docs/en/transform-v2/jsonpath.md
@@ -83,7 +83,8 @@ The data read from source is a table like this json:
     "c_decimal": 10.55,
     "c_date": "2023-10-29",
     "c_datetime": "16:12:43.459",
-    "c_array":["item1", "item2", "item3"]
+    "c_array":["item1", "item2", "item3"],
+    "c_map_array": [{"c_string_1":"c_string_1","c_string_2":"c_string_2","c_string_3":"c_string_3"},{"c_string_1":"c_string_1","c_string_2":"c_string_2","c_string_3":"c_string_3"}]
   }
 }
 ```
@@ -143,11 +144,17 @@ transform {
          "dest_field" = "c1_datetime"
          "dest_type" = "time"
       },
-                       {
+      {
          "src_field" = "data"
          "path" = "$.data.c_array"
          "dest_field" = "c1_array"
          "dest_type" = "array<string>"        
+      },
+      {
+        "src_field" = "data"
+        "path" = "$.data.c_map_array"
+        "dest_field" = "c1_map_array"
+        "dest_type" = "array<map<string, string>>"
       }
     ]
   }
diff --git a/docs/zh/transform-v2/jsonpath.md b/docs/zh/transform-v2/jsonpath.md
index a83767e0c1..1b2dff2e8b 100644
--- a/docs/zh/transform-v2/jsonpath.md
+++ b/docs/zh/transform-v2/jsonpath.md
@@ -83,7 +83,8 @@
     "c_decimal": 10.55,
     "c_date": "2023-10-29",
     "c_datetime": "16:12:43.459",
-    "c_array":["item1", "item2", "item3"]
+    "c_array":["item1", "item2", "item3"],
+    "c_map_array": [{"c_string_1":"c_string_1","c_string_2":"c_string_2","c_string_3":"c_string_3"},{"c_string_1":"c_string_1","c_string_2":"c_string_2","c_string_3":"c_string_3"}]
   }
 }
 ```
@@ -143,11 +144,17 @@ transform {
          "dest_field" = "c1_datetime"
          "dest_type" = "time"
       },
-                       {
+         {
          "src_field" = "data"
          "path" = "$.data.c_array"
          "dest_field" = "c1_array"
-         "dest_type" = "array<string>"        
+         "dest_type" = "array<string>"
+      },
+      {
+        "src_field" = "data"
+        "path" = "$.data.c_map_array"
+        "dest_field" = "c1_map_array"
+        "dest_type" = "array<map<string, string>>"
       }
     ]
   }
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/SeaTunnelDataTypeConvertorUtil.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/SeaTunnelDataTypeConvertorUtil.java
index c5109f2e14..acd7a158d4 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/SeaTunnelDataTypeConvertorUtil.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/SeaTunnelDataTypeConvertorUtil.java
@@ -230,6 +230,9 @@ public class SeaTunnelDataTypeConvertorUtil {
                 return ArrayType.FLOAT_ARRAY_TYPE;
             case DOUBLE:
                 return ArrayType.DOUBLE_ARRAY_TYPE;
+            case MAP:
+                MapType<?, ?> mapType = (MapType<?, ?>) dataType;
+                return new ArrayType<>(MapType.class, mapType);
             default:
                 throw CommonError.unsupportedDataType("SeaTunnel", genericType, field);
         }
diff --git a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/java/org/apache/seatunnel/e2e/transform/TestJsonPathTransformIT.java b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/java/org/apache/seatunnel/e2e/transform/TestJsonPathTransformIT.java
index 3bc4b0cc70..7e2dd35bd6 100644
--- a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/java/org/apache/seatunnel/e2e/transform/TestJsonPathTransformIT.java
+++ b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/java/org/apache/seatunnel/e2e/transform/TestJsonPathTransformIT.java
@@ -59,4 +59,11 @@ public class TestJsonPathTransformIT extends TestSuiteBase {
                 container.executeJob("/json_path_transform/json_path_with_error_handle_way.conf");
         Assertions.assertEquals(0, execResult.getExitCode());
     }
+
+    @TestTemplate
+    public void testArrayType(TestContainer container) throws Exception {
+        Container.ExecResult execResult =
+                container.executeJob("/json_path_transform/json_path_array_map.conf");
+        Assertions.assertEquals(0, execResult.getExitCode());
+    }
 }
diff --git a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/json_path_transform/json_path_array_map.conf b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/json_path_transform/json_path_array_map.conf
new file mode 100644
index 0000000000..8b22a28535
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/json_path_transform/json_path_array_map.conf
@@ -0,0 +1,99 @@
+#
+ # Licensed to the Apache Software Foundation (ASF) under one or more
+ # contributor license agreements.  See the NOTICE file distributed with
+ # this work for additional information regarding copyright ownership.
+ # The ASF licenses this file to You under the Apache License, Version 2.0
+ # (the "License"); you may not use this file except in compliance with
+ # the License.  You may obtain a copy of the License at
+ #
+ #    http://www.apache.org/licenses/LICENSE-2.0
+ #
+ # Unless required by applicable law or agreed to in writing, software
+ # distributed under the License is distributed on an "AS IS" BASIS,
+ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ # See the License for the specific language governing permissions and
+ # limitations under the License.
+ #
+ ######
+ ###### This config file is a demonstration of batch processing in seatunnel config
+ ######
+ env {
+   job.mode = "BATCH"
+ }
+
+ source {
+   FakeSource {
+     plugin_output = "fake"
+     row.num = 100
+     string.fake.mode = "template"
+     string.template=["{"data":{"c_map_string_array":[{"c_string_1":"c_string_1","c_string_2":"c_string_2","c_string_3":"c_string_3"},{"c_string_1":"c_string_1","c_string_2":"c_string_2","c_string_3":"c_string_3"}],"c_map_int_array":[{"c_int_1":1,"c_int_2":2,"c_int_3":3},{"c_int_1":1,"c_int_2":2,"c_int_3":3}]}}"]
+     schema = {
+       fields {
+         data = "string"
+       }
+     }
+   }
+ }
+
+ transform {
+   JsonPath {
+     plugin_input = "fake"
+     plugin_output = "fake1"
+     columns = [
+      {
+         "src_field" = "data"
+         "path" = "$.data.c_map_string_array"
+         "dest_field" = "c_map_string_array_1"
+         "dest_type" = "array<map<string, string>>"
+      },
+     {
+        "src_field" = "data"
+        "path" = "$.data.c_map_int_array"
+        "dest_field" = "c_map_int_array_1"
+        "dest_type" = "array<map<string, int>>"
+     }
+     ]
+   }
+     Sql {
+     plugin_input = "fake1"
+     plugin_output = "fake2"
+       query = "select c_map_string_array_1,c_map_int_array_1 from dual"
+     }
+ }
+
+ sink {
+   Assert {
+     plugin_input = "fake2"
+     rules =
+       {
+         row_rules = [
+           {
+             rule_type = MIN_ROW
+             rule_value = 100
+           }
+         ],
+         field_rules = [
+           {
+             field_name = c_map_string_array_1
+             field_type = "array<map<string, string>>"
+             field_value = [
+               {
+                 rule_type = NOT_NULL
+                 equals_to = [{c_string_1=c_string_1, c_string_2=c_string_2, c_string_3=c_string_3}, {c_string_1=c_string_1, c_string_2=c_string_2, c_string_3=c_string_3}]
+               }
+             ]
+           },
+           {
+             field_name = c_map_int_array_1
+             field_type = "array<map<string, int>>"
+             field_value = [
+               {
+                 rule_type = NOT_NULL
+                 equals_to = [{c_int_1=1, c_int_2=2, c_int_3=3}, {c_int_1=1, c_int_2=2, c_int_3=3}]
+               }
+             ]
+           }
+         ]
+       }
+   }
+ }
\ No newline at end of file
diff --git a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/serialization/SeaTunnelRowConverter.java b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/serialization/SeaTunnelRowConverter.java
index 4ea0da4821..1d03b6448d 100644
--- a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/serialization/SeaTunnelRowConverter.java
+++ b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/serialization/SeaTunnelRowConverter.java
@@ -24,6 +24,7 @@ import org.apache.seatunnel.api.table.type.RowKind;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.api.table.type.SqlType;
 import org.apache.seatunnel.translation.serialization.RowConverter;
 import org.apache.seatunnel.translation.spark.utils.OffsetDateTimeUtils;
 
@@ -170,6 +171,13 @@ public class SeaTunnelRowConverter extends RowConverter<GenericRow> {
             return new WrappedArray.ofRef<>(new Object[0]);
         }
         int num = arrayData.length;
+        if (SqlType.MAP.equals(arrayType.getElementType().getSqlType())) {
+            Object[] arrayMapData = new Object[num];
+            for (int i = 0; i < num; i++) {
+                arrayMapData[i] = convert(arrayData[i], arrayType.getElementType());
+            }
+            return new WrappedArray.ofRef<>(arrayMapData);
+        }
         for (int i = 0; i < num; i++) {
             arrayData[i] = convert(arrayData[i], arrayType.getElementType());
         }

Reply via email to