This is an automated email from the ASF dual-hosted git repository.
lidongdai pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new a6ae81bf89 [Improve][Transform] Auto mark column length with jsonpath transform (#7636)
a6ae81bf89 is described below
commit a6ae81bf89d6184775e373e3803c52aa4f0245e1
Author: hailin0 <[email protected]>
AuthorDate: Tue Nov 19 21:34:16 2024 +0800
[Improve][Transform] Auto mark column length with jsonpath transform (#7636)
---
.../seatunnel/transform/jsonpath/ColumnConfig.java | 10 ++--
.../transform/jsonpath/JsonPathTransform.java | 25 ++-------
.../jsonpath/JsonPathTransformConfig.java | 23 ++++++--
.../jsonpath/JsonPathTransformFactory.java | 3 +-
.../seatunnel/transform/JsonPathTransformTest.java | 62 ++++++++++++++++++----
5 files changed, 86 insertions(+), 37 deletions(-)
diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/jsonpath/ColumnConfig.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/jsonpath/ColumnConfig.java
index 4d56be1582..c17b2236f1 100644
--- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/jsonpath/ColumnConfig.java
+++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/jsonpath/ColumnConfig.java
@@ -16,9 +16,11 @@
*/
package org.apache.seatunnel.transform.jsonpath;
+import org.apache.seatunnel.api.table.catalog.Column;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.transform.common.ErrorHandleWay;
+import lombok.Getter;
import lombok.ToString;
import java.io.Serializable;
@@ -31,19 +33,19 @@ public class ColumnConfig implements Serializable {
private final String destField;
- private final SeaTunnelDataType<?> destType;
+ @Getter private final Column destColumn;
private final ErrorHandleWay errorHandleWay;
public ColumnConfig(
String path,
String srcField,
String destField,
- SeaTunnelDataType<?> destType,
+ Column destColumn,
ErrorHandleWay errorHandleWay) {
this.path = path;
this.srcField = srcField;
this.destField = destField;
- this.destType = destType;
+ this.destColumn = destColumn;
this.errorHandleWay = errorHandleWay;
}
@@ -60,7 +62,7 @@ public class ColumnConfig implements Serializable {
}
public SeaTunnelDataType<?> getDestType() {
- return destType;
+ return destColumn.getDataType();
}
public ErrorHandleWay errorHandleWay() {
diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/jsonpath/JsonPathTransform.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/jsonpath/JsonPathTransform.java
index 7978aa2260..82c22bcb9a 100644
--- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/jsonpath/JsonPathTransform.java
+++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/jsonpath/JsonPathTransform.java
@@ -20,7 +20,6 @@ import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.JsonNode;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.Column;
-import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowAccessor;
@@ -54,7 +53,7 @@ public class JsonPathTransform extends MultipleFieldOutputTransform {
private final SeaTunnelRowType seaTunnelRowType;
private JsonToRowConverters.JsonToObjectConverter[] converters;
- private SeaTunnelRowType outputSeaTunnelRowType;
+ private Column[] outputColumns;
private int[] srcFieldIndexArr;
@@ -87,17 +86,10 @@ public class JsonPathTransform extends MultipleFieldOutputTransform {
}
private void initOutputSeaTunnelRowType() {
-
- SeaTunnelDataType<?>[] dataTypes =
+ this.outputColumns =
this.config.getColumnConfigs().stream()
- .map(ColumnConfig::getDestType)
- .toArray(SeaTunnelDataType<?>[]::new);
- this.outputSeaTunnelRowType =
- new SeaTunnelRowType(
- this.config.getColumnConfigs().stream()
- .map(ColumnConfig::getDestField)
- .toArray(String[]::new),
- dataTypes);
+ .map(ColumnConfig::getDestColumn)
+ .toArray(Column[]::new);
}
private void initSrcFieldIndexArr() {
@@ -189,13 +181,6 @@ public class JsonPathTransform extends MultipleFieldOutputTransform {
@Override
protected Column[] getOutputColumns() {
- int len = this.outputSeaTunnelRowType.getTotalFields();
- Column[] columns = new Column[len];
- for (int i = 0; i < len; i++) {
- String fieldName = this.outputSeaTunnelRowType.getFieldName(i);
- SeaTunnelDataType<?> fieldType = this.outputSeaTunnelRowType.getFieldType(i);
- columns[i] = PhysicalColumn.of(fieldName, fieldType, 200, true, "", "");
- }
- return columns;
+ return outputColumns;
}
}
diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/jsonpath/JsonPathTransformConfig.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/jsonpath/JsonPathTransformConfig.java
index 51a0d6ac34..5cf158071f 100644
--- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/jsonpath/JsonPathTransformConfig.java
+++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/jsonpath/JsonPathTransformConfig.java
@@ -21,10 +21,14 @@ import org.apache.seatunnel.shade.com.fasterxml.jackson.core.type.TypeReference;
import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.Options;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.Column;
+import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
import org.apache.seatunnel.api.table.catalog.SeaTunnelDataTypeConvertorUtil;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.transform.common.CommonOptions;
import org.apache.seatunnel.transform.common.ErrorHandleWay;
+import org.apache.seatunnel.transform.exception.TransformCommonError;
import org.apache.seatunnel.transform.exception.TransformException;
import org.apache.commons.lang3.StringUtils;
@@ -87,7 +91,7 @@ public class JsonPathTransformConfig implements Serializable {
this.errorHandleWay = errorHandleWay;
}
- public static JsonPathTransformConfig of(ReadonlyConfig config) {
+ public static JsonPathTransformConfig of(ReadonlyConfig config, CatalogTable table) {
if (!config.toConfig().hasPath(COLUMNS.key())) {
throw new TransformException(
COLUMNS_MUST_NOT_EMPTY,
COLUMNS_MUST_NOT_EMPTY.getErrorMessage());
@@ -106,10 +110,23 @@ public class JsonPathTransformConfig implements Serializable {
.map(ErrorHandleWay::valueOf)
.orElse(null);
- SeaTunnelDataType<?> dataType =
+ SeaTunnelDataType<?> srcFieldDataType =
SeaTunnelDataTypeConvertorUtil.deserializeSeaTunnelDataType(srcField, type);
+ if (!table.getTableSchema().contains(srcField)) {
+ throw TransformCommonError.cannotFindInputFieldError("JsonPath", srcField);
+ }
+ Column srcFieldColumn = table.getTableSchema().getColumn(srcField);
+ Column destFieldColumn =
+ PhysicalColumn.of(
+ destField,
+ srcFieldDataType,
+ srcFieldColumn.getColumnLength(),
+ true,
+ null,
+ null);
ColumnConfig columnConfig =
- new ColumnConfig(path, srcField, destField, dataType, columnErrorHandleWay);
+ new ColumnConfig(
+ path, srcField, destField, destFieldColumn, columnErrorHandleWay);
configs.add(columnConfig);
}
return new JsonPathTransformConfig(configs, rowErrorHandleWay);
diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/jsonpath/JsonPathTransformFactory.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/jsonpath/JsonPathTransformFactory.java
index def17f2564..923229042d 100644
--- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/jsonpath/JsonPathTransformFactory.java
+++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/jsonpath/JsonPathTransformFactory.java
@@ -47,7 +47,8 @@ public class JsonPathTransformFactory implements TableTransformFactory {
public TableTransform createTransform(TableTransformFactoryContext context) {
CatalogTable catalogTable = context.getCatalogTables().get(0);
ReadonlyConfig options = context.getOptions();
- JsonPathTransformConfig jsonPathTransformConfig = JsonPathTransformConfig.of(options);
+ JsonPathTransformConfig jsonPathTransformConfig =
+ JsonPathTransformConfig.of(options, catalogTable);
return () -> new JsonPathTransform(jsonPathTransformConfig, catalogTable);
}
}
diff --git a/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/JsonPathTransformTest.java b/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/JsonPathTransformTest.java
index 51c0c0ac30..61464c8135 100644
--- a/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/JsonPathTransformTest.java
+++ b/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/JsonPathTransformTest.java
@@ -21,6 +21,10 @@ import org.apache.seatunnel.shade.com.google.common.collect.ImmutableMap;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
+import org.apache.seatunnel.api.table.catalog.Column;
+import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
+import org.apache.seatunnel.api.table.catalog.TableIdentifier;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
import org.apache.seatunnel.api.table.type.BasicType;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
@@ -34,6 +38,7 @@ import org.apache.seatunnel.transform.jsonpath.JsonPathTransformConfig;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
@@ -58,7 +63,7 @@ public class JsonPathTransformTest {
new String[] {"data"},
new SeaTunnelDataType[]
{BasicType.STRING_TYPE}));
JsonPathTransform transform =
- new JsonPathTransform(JsonPathTransformConfig.of(config), table);
+ new JsonPathTransform(JsonPathTransformConfig.of(config, table), table);
CatalogTable outputTable = transform.getProducedCatalogTable();
SeaTunnelRow outputRow = transform.map(new SeaTunnelRow(new Object[]
{"{\"f1\": 1}"}));
@@ -84,7 +89,7 @@ public class JsonPathTransformTest {
new String[] {"data"},
new SeaTunnelDataType[]
{BasicType.STRING_TYPE}));
JsonPathTransform transform =
- new JsonPathTransform(JsonPathTransformConfig.of(config), table);
+ new JsonPathTransform(JsonPathTransformConfig.of(config, table), table);
CatalogTable outputTable = transform.getProducedCatalogTable();
final JsonPathTransform finalTransform = transform;
Assertions.assertThrows(
@@ -104,7 +109,7 @@ public class JsonPathTransformTest {
CommonOptions.COLUMN_ERROR_HANDLE_WAY_OPTION.key(),
ErrorHandleWay.FAIL.name())));
config = ReadonlyConfig.fromMap(configMap);
- transform = new JsonPathTransform(JsonPathTransformConfig.of(config), table);
+ transform = new JsonPathTransform(JsonPathTransformConfig.of(config, table), table);
outputTable = transform.getProducedCatalogTable();
JsonPathTransform finalTransform1 = transform;
Assertions.assertThrows(
@@ -124,7 +129,7 @@ public class JsonPathTransformTest {
CommonOptions.COLUMN_ERROR_HANDLE_WAY_OPTION.key(),
ErrorHandleWay.SKIP.name())));
config = ReadonlyConfig.fromMap(configMap);
- transform = new JsonPathTransform(JsonPathTransformConfig.of(config), table);
+ transform = new JsonPathTransform(JsonPathTransformConfig.of(config, table), table);
outputTable = transform.getProducedCatalogTable();
SeaTunnelRow outputRow = transform.map(new SeaTunnelRow(new Object[]
{"{\"f2\": 1}"}));
Assertions.assertNotNull(outputRow);
@@ -143,7 +148,7 @@ public class JsonPathTransformTest {
CommonOptions.COLUMN_ERROR_HANDLE_WAY_OPTION.key(),
ErrorHandleWay.SKIP_ROW.name())));
config = ReadonlyConfig.fromMap(configMap);
- transform = new JsonPathTransform(JsonPathTransformConfig.of(config), table);
+ transform = new JsonPathTransform(JsonPathTransformConfig.of(config, table), table);
outputTable = transform.getProducedCatalogTable();
outputRow = transform.map(new SeaTunnelRow(new Object[] {"{\"f2\":
1}"}));
Assertions.assertNull(outputRow);
@@ -157,7 +162,7 @@ public class JsonPathTransformTest {
JsonPathTransformConfig.PATH.key(), "$.f1",
JsonPathTransformConfig.DEST_FIELD.key(),
"f1")));
config = ReadonlyConfig.fromMap(configMap);
- transform = new JsonPathTransform(JsonPathTransformConfig.of(config), table);
+ transform = new JsonPathTransform(JsonPathTransformConfig.of(config, table), table);
outputTable = transform.getProducedCatalogTable();
outputRow = transform.map(new SeaTunnelRow(new Object[] {"{\"f2\":
1}"}));
Assertions.assertNull(outputRow);
@@ -176,7 +181,7 @@ public class JsonPathTransformTest {
CommonOptions.COLUMN_ERROR_HANDLE_WAY_OPTION.key(),
ErrorHandleWay.FAIL.name())));
config = ReadonlyConfig.fromMap(configMap);
- transform = new JsonPathTransform(JsonPathTransformConfig.of(config), table);
+ transform = new JsonPathTransform(JsonPathTransformConfig.of(config, table), table);
outputTable = transform.getProducedCatalogTable();
try {
outputRow = transform.map(new SeaTunnelRow(new Object[] {"{\"f2\":
1}"}));
@@ -199,7 +204,7 @@ public class JsonPathTransformTest {
CommonOptions.COLUMN_ERROR_HANDLE_WAY_OPTION.key(),
ErrorHandleWay.SKIP.name())));
config = ReadonlyConfig.fromMap(configMap);
- transform = new JsonPathTransform(JsonPathTransformConfig.of(config), table);
+ transform = new JsonPathTransform(JsonPathTransformConfig.of(config, table), table);
outputTable = transform.getProducedCatalogTable();
outputRow = transform.map(new SeaTunnelRow(new Object[] {"{\"f2\":
1}"}));
Assertions.assertNotNull(outputRow);
@@ -219,9 +224,48 @@ public class JsonPathTransformTest {
CommonOptions.COLUMN_ERROR_HANDLE_WAY_OPTION.key(),
ErrorHandleWay.SKIP_ROW.name())));
config = ReadonlyConfig.fromMap(configMap);
- transform = new JsonPathTransform(JsonPathTransformConfig.of(config), table);
+ transform = new JsonPathTransform(JsonPathTransformConfig.of(config, table), table);
outputTable = transform.getProducedCatalogTable();
outputRow = transform.map(new SeaTunnelRow(new Object[] {"{\"f2\":
1}"}));
Assertions.assertNull(outputRow);
}
+
+ @Test
+ public void testOutputColumn() {
+ Map<String, Object> configMap = new HashMap<>();
+ configMap.put(
+ JsonPathTransformConfig.COLUMNS.key(),
+ Arrays.asList(
+ ImmutableMap.of(
+ JsonPathTransformConfig.SRC_FIELD.key(), "data",
+ JsonPathTransformConfig.PATH.key(), "$.f1",
+ JsonPathTransformConfig.DEST_FIELD.key(), "f1")));
+ ReadonlyConfig config = ReadonlyConfig.fromMap(configMap);
+
+ CatalogTable table =
+ CatalogTable.of(
+ TableIdentifier.of("default", "default", "default", "default"),
+ TableSchema.builder()
+ .column(
+ PhysicalColumn.of(
+ "data",
+ BasicType.STRING_TYPE,
+ 1024,
+ true,
+ null,
+ null))
+ .build(),
+ new HashMap<>(),
+ new ArrayList<>(),
+ null);
+ JsonPathTransform transform =
+ new JsonPathTransform(JsonPathTransformConfig.of(config, table), table);
+ CatalogTable outputCatalogTable = transform.getProducedCatalogTable();
+ Column f1 = outputCatalogTable.getTableSchema().getColumn("f1");
+ Assertions.assertEquals(BasicType.STRING_TYPE, f1.getDataType());
+ Assertions.assertEquals(1024, f1.getColumnLength());
+
+ SeaTunnelRow outputRow = transform.map(new SeaTunnelRow(new Object[] {"{\"f1\": 1}"}));
+ Assertions.assertNotNull(outputRow);
+ }
}