This is an automated email from the ASF dual-hosted git repository.

lidongdai pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new a6ae81bf89 [Improve][Transform] Auto mark column length with jsonpath transform (#7636)
a6ae81bf89 is described below

commit a6ae81bf89d6184775e373e3803c52aa4f0245e1
Author: hailin0 <[email protected]>
AuthorDate: Tue Nov 19 21:34:16 2024 +0800

    [Improve][Transform] Auto mark column length with jsonpath transform (#7636)
---
 .../seatunnel/transform/jsonpath/ColumnConfig.java | 10 ++--
 .../transform/jsonpath/JsonPathTransform.java      | 25 ++-------
 .../jsonpath/JsonPathTransformConfig.java          | 23 ++++++--
 .../jsonpath/JsonPathTransformFactory.java         |  3 +-
 .../seatunnel/transform/JsonPathTransformTest.java | 62 ++++++++++++++++++----
 5 files changed, 86 insertions(+), 37 deletions(-)

diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/jsonpath/ColumnConfig.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/jsonpath/ColumnConfig.java
index 4d56be1582..c17b2236f1 100644
--- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/jsonpath/ColumnConfig.java
+++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/jsonpath/ColumnConfig.java
@@ -16,9 +16,11 @@
  */
 package org.apache.seatunnel.transform.jsonpath;
 
+import org.apache.seatunnel.api.table.catalog.Column;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.transform.common.ErrorHandleWay;
 
+import lombok.Getter;
 import lombok.ToString;
 
 import java.io.Serializable;
@@ -31,19 +33,19 @@ public class ColumnConfig implements Serializable {
 
     private final String destField;
 
-    private final SeaTunnelDataType<?> destType;
+    @Getter private final Column destColumn;
     private final ErrorHandleWay errorHandleWay;
 
     public ColumnConfig(
             String path,
             String srcField,
             String destField,
-            SeaTunnelDataType<?> destType,
+            Column destColumn,
             ErrorHandleWay errorHandleWay) {
         this.path = path;
         this.srcField = srcField;
         this.destField = destField;
-        this.destType = destType;
+        this.destColumn = destColumn;
         this.errorHandleWay = errorHandleWay;
     }
 
@@ -60,7 +62,7 @@ public class ColumnConfig implements Serializable {
     }
 
     public SeaTunnelDataType<?> getDestType() {
-        return destType;
+        return destColumn.getDataType();
     }
 
     public ErrorHandleWay errorHandleWay() {
diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/jsonpath/JsonPathTransform.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/jsonpath/JsonPathTransform.java
index 7978aa2260..82c22bcb9a 100644
--- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/jsonpath/JsonPathTransform.java
+++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/jsonpath/JsonPathTransform.java
@@ -20,7 +20,6 @@ import 
org.apache.seatunnel.shade.com.fasterxml.jackson.databind.JsonNode;
 
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.catalog.Column;
-import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowAccessor;
@@ -54,7 +53,7 @@ public class JsonPathTransform extends 
MultipleFieldOutputTransform {
     private final SeaTunnelRowType seaTunnelRowType;
 
     private JsonToRowConverters.JsonToObjectConverter[] converters;
-    private SeaTunnelRowType outputSeaTunnelRowType;
+    private Column[] outputColumns;
 
     private int[] srcFieldIndexArr;
 
@@ -87,17 +86,10 @@ public class JsonPathTransform extends 
MultipleFieldOutputTransform {
     }
 
     private void initOutputSeaTunnelRowType() {
-
-        SeaTunnelDataType<?>[] dataTypes =
+        this.outputColumns =
                 this.config.getColumnConfigs().stream()
-                        .map(ColumnConfig::getDestType)
-                        .toArray(SeaTunnelDataType<?>[]::new);
-        this.outputSeaTunnelRowType =
-                new SeaTunnelRowType(
-                        this.config.getColumnConfigs().stream()
-                                .map(ColumnConfig::getDestField)
-                                .toArray(String[]::new),
-                        dataTypes);
+                        .map(ColumnConfig::getDestColumn)
+                        .toArray(Column[]::new);
     }
 
     private void initSrcFieldIndexArr() {
@@ -189,13 +181,6 @@ public class JsonPathTransform extends 
MultipleFieldOutputTransform {
 
     @Override
     protected Column[] getOutputColumns() {
-        int len = this.outputSeaTunnelRowType.getTotalFields();
-        Column[] columns = new Column[len];
-        for (int i = 0; i < len; i++) {
-            String fieldName = this.outputSeaTunnelRowType.getFieldName(i);
-            SeaTunnelDataType<?> fieldType = 
this.outputSeaTunnelRowType.getFieldType(i);
-            columns[i] = PhysicalColumn.of(fieldName, fieldType, 200, true, 
"", "");
-        }
-        return columns;
+        return outputColumns;
     }
 }
diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/jsonpath/JsonPathTransformConfig.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/jsonpath/JsonPathTransformConfig.java
index 51a0d6ac34..5cf158071f 100644
--- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/jsonpath/JsonPathTransformConfig.java
+++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/jsonpath/JsonPathTransformConfig.java
@@ -21,10 +21,14 @@ import 
org.apache.seatunnel.shade.com.fasterxml.jackson.core.type.TypeReference;
 import org.apache.seatunnel.api.configuration.Option;
 import org.apache.seatunnel.api.configuration.Options;
 import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.Column;
+import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
 import org.apache.seatunnel.api.table.catalog.SeaTunnelDataTypeConvertorUtil;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.transform.common.CommonOptions;
 import org.apache.seatunnel.transform.common.ErrorHandleWay;
+import org.apache.seatunnel.transform.exception.TransformCommonError;
 import org.apache.seatunnel.transform.exception.TransformException;
 
 import org.apache.commons.lang3.StringUtils;
@@ -87,7 +91,7 @@ public class JsonPathTransformConfig implements Serializable {
         this.errorHandleWay = errorHandleWay;
     }
 
-    public static JsonPathTransformConfig of(ReadonlyConfig config) {
+    public static JsonPathTransformConfig of(ReadonlyConfig config, 
CatalogTable table) {
         if (!config.toConfig().hasPath(COLUMNS.key())) {
             throw new TransformException(
                     COLUMNS_MUST_NOT_EMPTY, 
COLUMNS_MUST_NOT_EMPTY.getErrorMessage());
@@ -106,10 +110,23 @@ public class JsonPathTransformConfig implements 
Serializable {
                             .map(ErrorHandleWay::valueOf)
                             .orElse(null);
 
-            SeaTunnelDataType<?> dataType =
+            SeaTunnelDataType<?> srcFieldDataType =
                     
SeaTunnelDataTypeConvertorUtil.deserializeSeaTunnelDataType(srcField, type);
+            if (!table.getTableSchema().contains(srcField)) {
+                throw 
TransformCommonError.cannotFindInputFieldError("JsonPath", srcField);
+            }
+            Column srcFieldColumn = table.getTableSchema().getColumn(srcField);
+            Column destFieldColumn =
+                    PhysicalColumn.of(
+                            destField,
+                            srcFieldDataType,
+                            srcFieldColumn.getColumnLength(),
+                            true,
+                            null,
+                            null);
             ColumnConfig columnConfig =
-                    new ColumnConfig(path, srcField, destField, dataType, 
columnErrorHandleWay);
+                    new ColumnConfig(
+                            path, srcField, destField, destFieldColumn, 
columnErrorHandleWay);
             configs.add(columnConfig);
         }
         return new JsonPathTransformConfig(configs, rowErrorHandleWay);
diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/jsonpath/JsonPathTransformFactory.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/jsonpath/JsonPathTransformFactory.java
index def17f2564..923229042d 100644
--- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/jsonpath/JsonPathTransformFactory.java
+++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/jsonpath/JsonPathTransformFactory.java
@@ -47,7 +47,8 @@ public class JsonPathTransformFactory implements 
TableTransformFactory {
     public TableTransform createTransform(TableTransformFactoryContext 
context) {
         CatalogTable catalogTable = context.getCatalogTables().get(0);
         ReadonlyConfig options = context.getOptions();
-        JsonPathTransformConfig jsonPathTransformConfig = 
JsonPathTransformConfig.of(options);
+        JsonPathTransformConfig jsonPathTransformConfig =
+                JsonPathTransformConfig.of(options, catalogTable);
         return () -> new JsonPathTransform(jsonPathTransformConfig, 
catalogTable);
     }
 }
diff --git a/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/JsonPathTransformTest.java b/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/JsonPathTransformTest.java
index 51c0c0ac30..61464c8135 100644
--- a/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/JsonPathTransformTest.java
+++ b/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/JsonPathTransformTest.java
@@ -21,6 +21,10 @@ import 
org.apache.seatunnel.shade.com.google.common.collect.ImmutableMap;
 import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
+import org.apache.seatunnel.api.table.catalog.Column;
+import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
+import org.apache.seatunnel.api.table.catalog.TableIdentifier;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
 import org.apache.seatunnel.api.table.type.BasicType;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
@@ -34,6 +38,7 @@ import 
org.apache.seatunnel.transform.jsonpath.JsonPathTransformConfig;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Map;
@@ -58,7 +63,7 @@ public class JsonPathTransformTest {
                                 new String[] {"data"},
                                 new SeaTunnelDataType[] 
{BasicType.STRING_TYPE}));
         JsonPathTransform transform =
-                new JsonPathTransform(JsonPathTransformConfig.of(config), 
table);
+                new JsonPathTransform(JsonPathTransformConfig.of(config, 
table), table);
 
         CatalogTable outputTable = transform.getProducedCatalogTable();
         SeaTunnelRow outputRow = transform.map(new SeaTunnelRow(new Object[] 
{"{\"f1\": 1}"}));
@@ -84,7 +89,7 @@ public class JsonPathTransformTest {
                                 new String[] {"data"},
                                 new SeaTunnelDataType[] 
{BasicType.STRING_TYPE}));
         JsonPathTransform transform =
-                new JsonPathTransform(JsonPathTransformConfig.of(config), 
table);
+                new JsonPathTransform(JsonPathTransformConfig.of(config, 
table), table);
         CatalogTable outputTable = transform.getProducedCatalogTable();
         final JsonPathTransform finalTransform = transform;
         Assertions.assertThrows(
@@ -104,7 +109,7 @@ public class JsonPathTransformTest {
                                 
CommonOptions.COLUMN_ERROR_HANDLE_WAY_OPTION.key(),
                                 ErrorHandleWay.FAIL.name())));
         config = ReadonlyConfig.fromMap(configMap);
-        transform = new JsonPathTransform(JsonPathTransformConfig.of(config), 
table);
+        transform = new JsonPathTransform(JsonPathTransformConfig.of(config, 
table), table);
         outputTable = transform.getProducedCatalogTable();
         JsonPathTransform finalTransform1 = transform;
         Assertions.assertThrows(
@@ -124,7 +129,7 @@ public class JsonPathTransformTest {
                                 
CommonOptions.COLUMN_ERROR_HANDLE_WAY_OPTION.key(),
                                 ErrorHandleWay.SKIP.name())));
         config = ReadonlyConfig.fromMap(configMap);
-        transform = new JsonPathTransform(JsonPathTransformConfig.of(config), 
table);
+        transform = new JsonPathTransform(JsonPathTransformConfig.of(config, 
table), table);
         outputTable = transform.getProducedCatalogTable();
         SeaTunnelRow outputRow = transform.map(new SeaTunnelRow(new Object[] 
{"{\"f2\": 1}"}));
         Assertions.assertNotNull(outputRow);
@@ -143,7 +148,7 @@ public class JsonPathTransformTest {
                                 
CommonOptions.COLUMN_ERROR_HANDLE_WAY_OPTION.key(),
                                 ErrorHandleWay.SKIP_ROW.name())));
         config = ReadonlyConfig.fromMap(configMap);
-        transform = new JsonPathTransform(JsonPathTransformConfig.of(config), 
table);
+        transform = new JsonPathTransform(JsonPathTransformConfig.of(config, 
table), table);
         outputTable = transform.getProducedCatalogTable();
         outputRow = transform.map(new SeaTunnelRow(new Object[] {"{\"f2\": 
1}"}));
         Assertions.assertNull(outputRow);
@@ -157,7 +162,7 @@ public class JsonPathTransformTest {
                                 JsonPathTransformConfig.PATH.key(), "$.f1",
                                 JsonPathTransformConfig.DEST_FIELD.key(), 
"f1")));
         config = ReadonlyConfig.fromMap(configMap);
-        transform = new JsonPathTransform(JsonPathTransformConfig.of(config), 
table);
+        transform = new JsonPathTransform(JsonPathTransformConfig.of(config, 
table), table);
         outputTable = transform.getProducedCatalogTable();
         outputRow = transform.map(new SeaTunnelRow(new Object[] {"{\"f2\": 
1}"}));
         Assertions.assertNull(outputRow);
@@ -176,7 +181,7 @@ public class JsonPathTransformTest {
                                 
CommonOptions.COLUMN_ERROR_HANDLE_WAY_OPTION.key(),
                                 ErrorHandleWay.FAIL.name())));
         config = ReadonlyConfig.fromMap(configMap);
-        transform = new JsonPathTransform(JsonPathTransformConfig.of(config), 
table);
+        transform = new JsonPathTransform(JsonPathTransformConfig.of(config, 
table), table);
         outputTable = transform.getProducedCatalogTable();
         try {
             outputRow = transform.map(new SeaTunnelRow(new Object[] {"{\"f2\": 
1}"}));
@@ -199,7 +204,7 @@ public class JsonPathTransformTest {
                                 
CommonOptions.COLUMN_ERROR_HANDLE_WAY_OPTION.key(),
                                 ErrorHandleWay.SKIP.name())));
         config = ReadonlyConfig.fromMap(configMap);
-        transform = new JsonPathTransform(JsonPathTransformConfig.of(config), 
table);
+        transform = new JsonPathTransform(JsonPathTransformConfig.of(config, 
table), table);
         outputTable = transform.getProducedCatalogTable();
         outputRow = transform.map(new SeaTunnelRow(new Object[] {"{\"f2\": 
1}"}));
         Assertions.assertNotNull(outputRow);
@@ -219,9 +224,48 @@ public class JsonPathTransformTest {
                                 
CommonOptions.COLUMN_ERROR_HANDLE_WAY_OPTION.key(),
                                 ErrorHandleWay.SKIP_ROW.name())));
         config = ReadonlyConfig.fromMap(configMap);
-        transform = new JsonPathTransform(JsonPathTransformConfig.of(config), 
table);
+        transform = new JsonPathTransform(JsonPathTransformConfig.of(config, 
table), table);
         outputTable = transform.getProducedCatalogTable();
         outputRow = transform.map(new SeaTunnelRow(new Object[] {"{\"f2\": 
1}"}));
         Assertions.assertNull(outputRow);
     }
+
+    @Test
+    public void testOutputColumn() {
+        Map<String, Object> configMap = new HashMap<>();
+        configMap.put(
+                JsonPathTransformConfig.COLUMNS.key(),
+                Arrays.asList(
+                        ImmutableMap.of(
+                                JsonPathTransformConfig.SRC_FIELD.key(), 
"data",
+                                JsonPathTransformConfig.PATH.key(), "$.f1",
+                                JsonPathTransformConfig.DEST_FIELD.key(), 
"f1")));
+        ReadonlyConfig config = ReadonlyConfig.fromMap(configMap);
+
+        CatalogTable table =
+                CatalogTable.of(
+                        TableIdentifier.of("default", "default", "default", 
"default"),
+                        TableSchema.builder()
+                                .column(
+                                        PhysicalColumn.of(
+                                                "data",
+                                                BasicType.STRING_TYPE,
+                                                1024,
+                                                true,
+                                                null,
+                                                null))
+                                .build(),
+                        new HashMap<>(),
+                        new ArrayList<>(),
+                        null);
+        JsonPathTransform transform =
+                new JsonPathTransform(JsonPathTransformConfig.of(config, 
table), table);
+        CatalogTable outputCatalogTable = transform.getProducedCatalogTable();
+        Column f1 = outputCatalogTable.getTableSchema().getColumn("f1");
+        Assertions.assertEquals(BasicType.STRING_TYPE, f1.getDataType());
+        Assertions.assertEquals(1024, f1.getColumnLength());
+
+        SeaTunnelRow outputRow = transform.map(new SeaTunnelRow(new Object[] 
{"{\"f1\": 1}"}));
+        Assertions.assertNotNull(outputRow);
+    }
 }

Reply via email to