Hisoka-X commented on code in PR #7524:
URL: https://github.com/apache/seatunnel/pull/7524#discussion_r1746400764
##########
docs/en/transform-v2/jsonpath.md:
##########
@@ -51,6 +60,14 @@ Support SeatunnelDateType
> Jsonpath
+#### column_error_handle_way [Enum]
Review Comment:
Add a `column_error_handle_way` config in example? And please share more
about priority between `column_error_handle_way` and `row_error_handle_way`.
##########
seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/AbstractCatalogSupportTransform.java:
##########
@@ -22,21 +22,47 @@
import org.apache.seatunnel.api.table.catalog.TableSchema;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.transform.SeaTunnelTransform;
+import org.apache.seatunnel.transform.exception.ErrorDataTransformException;
import lombok.NonNull;
+import lombok.extern.slf4j.Slf4j;
+@Slf4j
public abstract class AbstractCatalogSupportTransform implements
SeaTunnelTransform<SeaTunnelRow> {
+ protected final ErrorHandleWay rowErrorHandleWay;
protected CatalogTable inputCatalogTable;
protected volatile CatalogTable outputCatalogTable;
public AbstractCatalogSupportTransform(@NonNull CatalogTable
inputCatalogTable) {
+ this(inputCatalogTable,
CommonOptions.ROW_ERROR_HANDLE_WAY_OPTION.defaultValue());
+ }
+
+ public AbstractCatalogSupportTransform(
+ @NonNull CatalogTable inputCatalogTable, ErrorHandleWay
rowErrorHandleWay) {
this.inputCatalogTable = inputCatalogTable;
+ this.rowErrorHandleWay = rowErrorHandleWay;
}
@Override
public SeaTunnelRow map(SeaTunnelRow row) {
- return transformRow(row);
+ try {
+ return transformRow(row);
+ } catch (ErrorDataTransformException e) {
+ if (e.getErrorHandleWay() != null) {
+ ErrorHandleWay errorHandleWay = e.getErrorHandleWay();
+ if (errorHandleWay.allowSkip() ||
errorHandleWay.allowSkipThisRow()) {
Review Comment:
```suggestion
if (errorHandleWay.allowSkipThisRow()) {
```
The `errorHandleWay.allowSkip()` always be false.
##########
seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/JsonPathTransformFactoryTest.java:
##########
@@ -16,15 +16,217 @@
*/
package org.apache.seatunnel.transform;
-import org.apache.seatunnel.transform.jsonpath.JsonPathTransformFactory;
+import org.apache.seatunnel.shade.com.google.common.collect.ImmutableMap;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.transform.common.CommonOptions;
+import org.apache.seatunnel.transform.common.ErrorHandleWay;
+import org.apache.seatunnel.transform.jsonpath.JsonPathTransform;
+import org.apache.seatunnel.transform.jsonpath.JsonPathTransformConfig;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
public class JsonPathTransformFactoryTest {
+
+ @Test
+ public void testJsonPath() {
+ Map<String, Object> configMap = new HashMap<>();
+ configMap.put(
+ JsonPathTransformConfig.COLUMNS.key(),
+ Arrays.asList(
+ ImmutableMap.of(
+ JsonPathTransformConfig.SRC_FIELD.key(),
"data",
+ JsonPathTransformConfig.PATH.key(), "$.f1",
+ JsonPathTransformConfig.DEST_FIELD.key(),
"f1")));
+ ReadonlyConfig config = ReadonlyConfig.fromMap(configMap);
+ CatalogTable table =
+ CatalogTableUtil.getCatalogTable(
+ "test",
+ new SeaTunnelRowType(
+ new String[] {"data"},
+ new SeaTunnelDataType[]
{BasicType.STRING_TYPE}));
+ JsonPathTransform transform =
+ new JsonPathTransform(JsonPathTransformConfig.of(config),
table);
+
+ CatalogTable outputTable = transform.getProducedCatalogTable();
+ SeaTunnelRow outputRow = transform.map(new SeaTunnelRow(new Object[]
{"{\"f1\": 1}"}));
+ Assertions.assertEquals(
+ "1",
outputRow.getField(outputTable.getSeaTunnelRowType().indexOf("f1")));
+ }
+
@Test
- public void testOptionRule() {
- JsonPathTransformFactory jsonPathTransformFactory = new
JsonPathTransformFactory();
- Assertions.assertNotNull(jsonPathTransformFactory.optionRule());
+ public void testErrorHandleWay() {
+ Map<String, Object> configMap = new HashMap<>();
+ configMap.put(
+ JsonPathTransformConfig.COLUMNS.key(),
+ Arrays.asList(
+ ImmutableMap.of(
+ JsonPathTransformConfig.SRC_FIELD.key(),
"data",
+ JsonPathTransformConfig.PATH.key(), "$.f1",
+ JsonPathTransformConfig.DEST_FIELD.key(),
"f1")));
+ ReadonlyConfig config = ReadonlyConfig.fromMap(configMap);
+ CatalogTable table =
+ CatalogTableUtil.getCatalogTable(
+ "test",
+ new SeaTunnelRowType(
+ new String[] {"data"},
+ new SeaTunnelDataType[]
{BasicType.STRING_TYPE}));
+ JsonPathTransform transform =
+ new JsonPathTransform(JsonPathTransformConfig.of(config),
table);
+
+ CatalogTable outputTable = transform.getProducedCatalogTable();
+ SeaTunnelRow outputRow;
+ try {
+ outputRow = transform.map(new SeaTunnelRow(new Object[] {"{\"f2\":
1}"}));
+ Assertions.fail("should throw exception");
+ } catch (Exception e) {
+ // ignore
+ }
Review Comment:
The `Assertions.fail("should throw exception");` will be catch too?
I think you can use `Assertions.assertThrows()`
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]