This is an automated email from the ASF dual-hosted git repository.
zhouyao2023 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 7532a94626 [Improve][Transform] Support errorHandleWay on jsonpath transform (#7524)
7532a94626 is described below
commit 7532a946263559a1d4a8bd1ad5906e755839e04a
Author: hailin0 <[email protected]>
AuthorDate: Wed Sep 11 09:25:51 2024 +0800
[Improve][Transform] Support errorHandleWay on jsonpath transform (#7524)
---
docs/en/transform-v2/jsonpath.md | 138 +++++++++++--
docs/zh/transform-v2/jsonpath.md | 135 ++++++++++--
.../flink/execution/TransformExecuteProcessor.java | 28 ++-
.../spark/execution/TransformExecuteProcessor.java | 29 ++-
.../e2e/transform/TestJsonPathTransformIT.java | 7 +
.../json_path_with_error_handle_way.conf | 104 ++++++++++
.../common/AbstractCatalogSupportTransform.java | 28 ++-
.../seatunnel/transform/common/CommonOptions.java | 47 +++++
.../transform/common/ErrorHandleWay.java} | 23 ++-
.../common/MultipleFieldOutputTransform.java | 5 +
.../exception/ErrorDataTransformException.java | 55 +++++
.../seatunnel/transform/jsonpath/ColumnConfig.java | 16 +-
.../transform/jsonpath/JsonPathTransform.java | 23 ++-
.../jsonpath/JsonPathTransformConfig.java | 21 +-
.../jsonpath/JsonPathTransformFactory.java | 6 +-
.../seatunnel/transform/JsonPathTransformTest.java | 227 +++++++++++++++++++++
16 files changed, 833 insertions(+), 59 deletions(-)
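
Note on precedence: the patch below resolves a JsonPath extraction failure in two stages. The column-level `column_error_handle_way` is consulted first, and `row_error_handle_way` is used only when the column has no setting of its own. The following is a minimal standalone sketch of that decision, not the code in the commit: the class `ErrorHandleWayDemo`, the `Outcome` enum, and the `resolve` method are hypothetical names introduced here for illustration, and the enum is redeclared locally so the example compiles by itself.

```java
public class ErrorHandleWayDemo {

    // Local copy of the three values defined by the commit's ErrorHandleWay enum.
    enum ErrorHandleWay { FAIL, SKIP, SKIP_ROW }

    // Hypothetical summary of what happens to a row whose column failed to extract.
    enum Outcome { THROW, NULL_COLUMN, DROP_ROW }

    /**
     * Mirrors the decision order in the patch: a configured column-level setting
     * always wins; the row-level setting applies only when the column has none.
     *
     * @param row    value of row_error_handle_way (FAIL or SKIP)
     * @param column value of column_error_handle_way, or null when not configured
     */
    static Outcome resolve(ErrorHandleWay row, ErrorHandleWay column) {
        if (column != null) {
            switch (column) {
                case SKIP:
                    return Outcome.NULL_COLUMN; // the failed column is filled with null
                case SKIP_ROW:
                    return Outcome.DROP_ROW; // the whole row is dropped
                default:
                    return Outcome.THROW; // an explicit FAIL overrides a row-level SKIP
            }
        }
        return row == ErrorHandleWay.SKIP ? Outcome.DROP_ROW : Outcome.THROW;
    }

    public static void main(String[] args) {
        System.out.println(resolve(ErrorHandleWay.FAIL, null));                    // THROW
        System.out.println(resolve(ErrorHandleWay.SKIP, null));                    // DROP_ROW
        System.out.println(resolve(ErrorHandleWay.FAIL, ErrorHandleWay.SKIP));     // NULL_COLUMN
        System.out.println(resolve(ErrorHandleWay.FAIL, ErrorHandleWay.SKIP_ROW)); // DROP_ROW
        System.out.println(resolve(ErrorHandleWay.SKIP, ErrorHandleWay.FAIL));     // THROW
    }
}
```

The last case is the least obvious one: with `row_error_handle_way = SKIP`, a column explicitly set to `FAIL` still aborts the job, because the exception carries the column's setting and the row-level fallback is never reached.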
diff --git a/docs/en/transform-v2/jsonpath.md b/docs/en/transform-v2/jsonpath.md
index 3baf5853b7..1948f5ca69 100644
--- a/docs/en/transform-v2/jsonpath.md
+++ b/docs/en/transform-v2/jsonpath.md
@@ -8,24 +8,33 @@
## Options
-| name | type | required | default value |
-|---------|-------|----------|---------------|
-| Columns | Array | Yes | |
+| name | type | required | default value |
+|----------------------|-------|----------|---------------|
+| columns | Array | Yes | |
+| row_error_handle_way | Enum | No | FAIL |
### common options [string]
Transform plugin common parameters, please refer to [Transform Plugin](common-options.md) for details
-### fields[array]
+### row_error_handle_way [Enum]
+
+This option is used to specify the processing method when an error occurs in the row, the default value is `FAIL`.
+
+- FAIL: When `FAIL` is selected, data format error will block and an exception will be thrown.
+- SKIP: When `SKIP` is selected, data format error will skip this row data.
+
+### columns[array]
#### option
-| name | type | required | default value |
-|------------|--------|----------|---------------|
-| src_field | String | Yes | |
-| dest_field | String | Yes | |
-| path | String | Yes | |
-| dest_type | String | No | String |
+| name | type | required | default value |
+|-------------------------|--------|----------|---------------|
+| src_field | String | Yes | |
+| dest_field | String | Yes | |
+| path | String | Yes | |
+| dest_type | String | No | String |
+| column_error_handle_way | Enum | No | |
#### src_field
@@ -51,6 +60,14 @@ Support SeatunnelDateType
> Jsonpath
+#### column_error_handle_way [Enum]
+
+This option is used to specify the processing method when an error occurs in the column.
+
+- FAIL: When `FAIL` is selected, data format error will block and an exception will be thrown.
+- SKIP: When `SKIP` is selected, data format error will skip this column data.
+- SKIP_ROW: When `SKIP_ROW` is selected, data format error will skip this row data.
+
## Read Json Example
The data read from source is a table like this json:
@@ -155,23 +172,25 @@ Suppose a column in a row of data is of type SeatunnelRow and that the name of t
The JsonPath transform converts the values of seatunnel into an array,
-```json
+```hocon
transform {
JsonPath {
source_table_name = "fake"
result_table_name = "fake1"
+
+ row_error_handle_way = FAIL
columns = [
{
"src_field" = "col"
"path" = "$[0]"
"dest_field" = "name"
- "dest_type" = "string"
+ "dest_type" = "string"
},
- {
+ {
"src_field" = "col"
"path" = "$[1]"
"dest_field" = "age"
- "dest_type" = "int"
+ "dest_type" = "int"
}
]
}
@@ -184,6 +203,97 @@ Then the data result table `fake1` will like this
|------|-----|----------|-------|
| a | 18 | ["a",18] | ... |
+
+## Configure error data handle way
+
+You can configure `row_error_handle_way` and `column_error_handle_way` to handle abnormal data. Both are optional.
+
+`row_error_handle_way` is used to handle all data anomalies in the row data, while `column_error_handle_way` is used to handle data anomalies in a column. It has a higher priority than `row_error_handle_way`.
+
+### Skip error data rows
+
+Configure to skip row data with exceptions in any column
+
+```hocon
+transform {
+ JsonPath {
+
+ row_error_handle_way = SKIP
+
+ columns = [
+ {
+ "src_field" = "json_data"
+ "path" = "$.f1"
+ "dest_field" = "json_data_f1"
+ },
+ {
+ "src_field" = "json_data"
+ "path" = "$.f2"
+ "dest_field" = "json_data_f2"
+ }
+ ]
+ }
+}
+```
+
+### Skip error data column
+
+Configure only `json_data_f1` column data exceptions to skip and fill in null values, other column data exceptions will continue to throw exception interrupt handlers
+
+
+```hocon
+transform {
+ JsonPath {
+
+ row_error_handle_way = FAIL
+
+ columns = [
+ {
+ "src_field" = "json_data"
+ "path" = "$.f1"
+ "dest_field" = "json_data_f1"
+
+ "column_error_handle_way" = "SKIP"
+ },
+ {
+ "src_field" = "json_data"
+ "path" = "$.f2"
+ "dest_field" = "json_data_f2"
+ }
+ ]
+ }
+}
+```
+
+### Skip the row for specified column error
+
+Configure to skip the row of data only for `json_data_f1` column data exceptions, and continue to throw exceptions to interrupt the handler for other column data exceptions
+
+
+```hocon
+transform {
+ JsonPath {
+
+ row_error_handle_way = FAIL
+
+ columns = [
+ {
+ "src_field" = "json_data"
+ "path" = "$.f1"
+ "dest_field" = "json_data_f1"
+
+ "column_error_handle_way" = "SKIP_ROW"
+ },
+ {
+ "src_field" = "json_data"
+ "path" = "$.f2"
+ "dest_field" = "json_data_f2"
+ }
+ ]
+ }
+}
+```
+
## Changelog
* Add JsonPath Transform
diff --git a/docs/zh/transform-v2/jsonpath.md b/docs/zh/transform-v2/jsonpath.md
index 449f0f6a77..4074d78bda 100644
--- a/docs/zh/transform-v2/jsonpath.md
+++ b/docs/zh/transform-v2/jsonpath.md
@@ -8,24 +8,33 @@
## 属性
-| 名称 | 类型 | 是否必须 | 默认值 |
-|---------|-------|------|-----|
-| Columns | Array | Yes | |
+| 名称 | 类型 | 是否必须 | 默认值 |
+|----------------------|-------|------|------|
+| columns | Array | Yes | |
+| row_error_handle_way | Enum | No | FAIL |
### common options [string]
转换插件的常见参数, 请参考 [Transform Plugin](common-options.md) 了解详情
-### fields[array]
+### row_error_handle_way [Enum]
+
+该选项用于指定当该行发生错误时的处理方式,默认值为 `FAIL`。
+
+- FAIL:选择`FAIL`时,数据格式错误会阻塞并抛出异常。
+- SKIP:选择`SKIP`时,数据格式错误会跳过该行数据。
+
+### columns[array]
#### 属性
-| 名称 | 类型 | 是否必须 | 默认值 |
-|------------|--------|------|--------|
-| src_field | String | Yes | |
-| dest_field | String | Yes | |
-| path | String | Yes | |
-| dest_type | String | No | String |
+| 名称 | 类型 | 是否必须 | 默认值 |
+|-------------------------|--------|------|--------|
+| src_field | String | Yes | |
+| dest_field | String | Yes | |
+| path | String | Yes | |
+| dest_type | String | No | String |
+| column_error_handle_way | Enum | No | |
#### src_field
@@ -51,6 +60,14 @@
> Jsonpath
+#### column_error_handle_way [Enum]
+
+该选项用于指定当列发生错误时的处理方式。
+
+- FAIL:选择`FAIL`时,数据格式错误会阻塞并抛出异常。
+- SKIP:选择`SKIP`时,数据格式错误会跳过此列数据。
+- SKIP_ROW:选择`SKIP_ROW`时,数据格式错误会跳过此行数据。
+
## 读取 JSON 示例
从源读取的数据是像这样的 JSON
@@ -155,23 +172,25 @@ transform {
JsonPath 转换将 seatunnel 的值转换为一个数组。
-```json
+```hocon
transform {
JsonPath {
source_table_name = "fake"
result_table_name = "fake1"
+
+ row_error_handle_way = FAIL
columns = [
{
"src_field" = "col"
"path" = "$[0]"
"dest_field" = "name"
- "dest_type" = "string"
+ "dest_type" = "string"
},
- {
+ {
"src_field" = "col"
"path" = "$[1]"
"dest_field" = "age"
- "dest_type" = "int"
+ "dest_type" = "int"
}
]
}
@@ -184,6 +203,94 @@ transform {
|------|-----|----------|-------|
| a | 18 | ["a",18] | ... |
+## 配置异常数据处理策略
+
+您可以配置 `row_error_handle_way` 与 `column_error_handle_way` 来处理异常数据,两者都是非必填项。
+
+`row_error_handle_way` 配置对行数据内所有数据异常进行处理,`column_error_handle_way` 配置对某列数据异常进行处理,优先级高于 `row_error_handle_way`。
+
+### 跳过异常数据行
+
+配置跳过任意列有异常的整行数据
+
+```hocon
+transform {
+ JsonPath {
+
+ row_error_handle_way = SKIP
+
+ columns = [
+ {
+ "src_field" = "json_data"
+ "path" = "$.f1"
+ "dest_field" = "json_data_f1"
+ },
+ {
+ "src_field" = "json_data"
+ "path" = "$.f2"
+ "dest_field" = "json_data_f2"
+ }
+ ]
+ }
+}
+```
+
+### 跳过部分异常数据列
+
+配置仅对 `json_data_f1` 列数据异常跳过,填充空值,其他列数据异常继续抛出异常中断处理程序
+
+```hocon
+transform {
+ JsonPath {
+
+ row_error_handle_way = FAIL
+
+ columns = [
+ {
+ "src_field" = "json_data"
+ "path" = "$.f1"
+ "dest_field" = "json_data_f1"
+
+ "column_error_handle_way" = "SKIP"
+ },
+ {
+ "src_field" = "json_data"
+ "path" = "$.f2"
+ "dest_field" = "json_data_f2"
+ }
+ ]
+ }
+}
+```
+
+### 部分列异常跳过整行
+
+配置仅对 `json_data_f1` 列数据异常跳过整行数据,其他列数据异常继续抛出异常中断处理程序
+
+```hocon
+transform {
+ JsonPath {
+
+ row_error_handle_way = FAIL
+
+ columns = [
+ {
+ "src_field" = "json_data"
+ "path" = "$.f1"
+ "dest_field" = "json_data_f1"
+
+ "column_error_handle_way" = "SKIP_ROW"
+ },
+ {
+ "src_field" = "json_data"
+ "path" = "$.f2"
+ "dest_field" = "json_data_f2"
+ }
+ ]
+ }
+}
+```
+
## 更新日志
* 添加 JsonPath 转换
diff --git a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/TransformExecuteProcessor.java b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/TransformExecuteProcessor.java
index c92eaf42a9..00a5046e43 100644
--- a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/TransformExecuteProcessor.java
+++ b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/TransformExecuteProcessor.java
@@ -36,8 +36,11 @@ import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.operators.StreamMap;
import java.net.URL;
+import java.util.ArrayList;
import java.util.Collections;
+import java.util.LinkedHashMap;
import java.util.List;
+import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
@@ -85,12 +88,22 @@ public class TransformExecuteProcessor
return upstreamDataStreams;
}
DataStreamTableInfo input = upstreamDataStreams.get(0);
+ Map<String, DataStreamTableInfo> outputTables =
+ upstreamDataStreams.stream()
+ .collect(
+ Collectors.toMap(
+ DataStreamTableInfo::getTableName,
+ e -> e,
+ (a, b) -> b,
+ LinkedHashMap::new));
+
ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
for (int i = 0; i < plugins.size(); i++) {
try {
Config pluginConfig = pluginConfigs.get(i);
DataStreamTableInfo stream =
- fromSourceTable(pluginConfig, upstreamDataStreams).orElse(input);
+ fromSourceTable(pluginConfig, new ArrayList<>(outputTables.values()))
+ .orElse(input);
TableTransformFactory factory = plugins.get(i);
TableTransformFactoryContext context =
new TableTransformFactoryContext(
@@ -103,14 +116,17 @@ public class TransformExecuteProcessor
transform.setJobContext(jobContext);
DataStream<SeaTunnelRow> inputStream =
flinkTransform(transform, stream.getDataStream());
+ String resultTableName =
+ pluginConfig.hasPath(RESULT_TABLE_NAME.key())
+ ? pluginConfig.getString(RESULT_TABLE_NAME.key())
+ : null;
// TODO transform support multi tables
- upstreamDataStreams.add(
+ outputTables.put(
+ resultTableName,
new DataStreamTableInfo(
inputStream,
Collections.singletonList(transform.getProducedCatalogTable()),
- pluginConfig.hasPath(RESULT_TABLE_NAME.key())
- ? pluginConfig.getString(RESULT_TABLE_NAME.key())
- : null));
+ resultTableName));
} catch (Exception e) {
throw new TaskExecuteException(
String.format(
@@ -119,7 +135,7 @@ public class TransformExecuteProcessor
e);
}
}
- return upstreamDataStreams;
+ return new ArrayList<>(outputTables.values());
}
protected DataStream<SeaTunnelRow> flinkTransform(
diff --git a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/TransformExecuteProcessor.java b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/TransformExecuteProcessor.java
index 43ef1afcee..3736576817 100644
--- a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/TransformExecuteProcessor.java
+++ b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/TransformExecuteProcessor.java
@@ -50,7 +50,9 @@ import java.net.URL;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
+import java.util.LinkedHashMap;
import java.util.List;
+import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;
@@ -102,11 +104,23 @@ public class TransformExecuteProcessor
}
ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
DatasetTableInfo input = upstreamDataStreams.get(0);
+
+ Map<String, DatasetTableInfo> outputTables =
+ upstreamDataStreams.stream()
+ .collect(
+ Collectors.toMap(
+ DatasetTableInfo::getTableName,
+ e -> e,
+ (a, b) -> b,
+ LinkedHashMap::new));
for (int i = 0; i < plugins.size(); i++) {
try {
Config pluginConfig = pluginConfigs.get(i);
DatasetTableInfo dataset =
- fromSourceTable(pluginConfig, sparkRuntimeEnvironment, upstreamDataStreams)
+ fromSourceTable(
+ pluginConfig,
+ sparkRuntimeEnvironment,
+ new ArrayList<>(outputTables.values()))
.orElse(input);
TableTransformFactory factory = plugins.get(i);
TableTransformFactoryContext context =
@@ -119,13 +133,16 @@ public class TransformExecuteProcessor
Dataset<Row> inputDataset = sparkTransform(transform, dataset);
registerInputTempView(pluginConfig, inputDataset);
- upstreamDataStreams.add(
+ String resultTableName =
+ pluginConfig.hasPath(RESULT_TABLE_NAME.key())
+ ? pluginConfig.getString(RESULT_TABLE_NAME.key())
+ : null;
+ outputTables.put(
+ resultTableName,
new DatasetTableInfo(
inputDataset,
Collections.singletonList(transform.getProducedCatalogTable()),
- pluginConfig.hasPath(RESULT_TABLE_NAME.key())
- ? pluginConfig.getString(RESULT_TABLE_NAME.key())
- : null));
+ resultTableName));
} catch (Exception e) {
throw new TaskExecuteException(
String.format(
@@ -134,7 +151,7 @@ public class TransformExecuteProcessor
e);
}
}
- return upstreamDataStreams;
+ return new ArrayList<>(outputTables.values());
}
private Dataset<Row> sparkTransform(SeaTunnelTransform transform, DatasetTableInfo tableInfo) {
diff --git a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/java/org/apache/seatunnel/e2e/transform/TestJsonPathTransformIT.java b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/java/org/apache/seatunnel/e2e/transform/TestJsonPathTransformIT.java
index 9c3c7aa22e..ff9b968cda 100644
--- a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/java/org/apache/seatunnel/e2e/transform/TestJsonPathTransformIT.java
+++ b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/java/org/apache/seatunnel/e2e/transform/TestJsonPathTransformIT.java
@@ -44,4 +44,11 @@ public class TestJsonPathTransformIT extends TestSuiteBase {
container.executeJob("/json_path_transform/nested_row_test.conf");
Assertions.assertEquals(0, execResult.getExitCode());
}
+
+ @TestTemplate
+ public void testErrorHandleWay(TestContainer container) throws Exception {
+ Container.ExecResult execResult =
+ container.executeJob("/json_path_transform/json_path_with_error_handle_way.conf");
+ Assertions.assertEquals(0, execResult.getExitCode());
+ }
}
diff --git a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/json_path_transform/json_path_with_error_handle_way.conf b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/json_path_transform/json_path_with_error_handle_way.conf
new file mode 100644
index 0000000000..8c39caf377
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/json_path_transform/json_path_with_error_handle_way.conf
@@ -0,0 +1,104 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in seatunnel config
+######
+
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ FakeSource {
+ schema = {
+ fields {
+ id = "bigint"
+ data = "string"
+ }
+ }
+ rows = [
+ {
+ kind = INSERT
+ fields = [1, "{\"f1\": \"v1\"}"]
+ },
+ {
+ kind = INSERT
+ fields = [2, "{\"f1\": \"v1\", \"f2\": \"v2\"}"]
+ }
+ ]
+ }
+}
+
+transform {
+ JsonPath {
+
+ row_error_handle_way = FAIL
+ columns = [
+ {
+ src_field = "data"
+ path = "$.f1"
+ dest_field = "data_f1"
+ },
+ {
+ src_field = "data"
+ path = "$.f2"
+ dest_field = "data_f2"
+ column_error_handle_way = SKIP
+ }
+ ]
+ }
+}
+
+sink {
+ Assert {
+ rules =
+ {
+ row_rules = [
+ {
+ rule_type = MAX_ROW
+ rule_value = 2
+ },
+ {
+ rule_type = MIN_ROW
+ rule_value = 2
+ }
+ ],
+ field_rules = [
+ {
+ field_name = id
+ field_type = "bigint"
+ field_value = [
+ {
+ rule_type = MIN
+ rule_value = 1
+ },
+ {
+ rule_type = MAX
+ rule_value = 2
+ }
+ ]
+ },
+ {
+ field_name = data_f1
+ field_type = "string"
+ field_value = [{equals_to = "v1"}]
+ }
+ ]
+ }
+ }
+}
\ No newline at end of file
diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/AbstractCatalogSupportTransform.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/AbstractCatalogSupportTransform.java
index 632d3af1e4..a0fa464af7 100644
--- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/AbstractCatalogSupportTransform.java
+++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/AbstractCatalogSupportTransform.java
@@ -22,21 +22,47 @@ import org.apache.seatunnel.api.table.catalog.TableIdentifier;
import org.apache.seatunnel.api.table.catalog.TableSchema;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.transform.SeaTunnelTransform;
+import org.apache.seatunnel.transform.exception.ErrorDataTransformException;
import lombok.NonNull;
+import lombok.extern.slf4j.Slf4j;
+@Slf4j
public abstract class AbstractCatalogSupportTransform implements SeaTunnelTransform<SeaTunnelRow> {
+ protected final ErrorHandleWay rowErrorHandleWay;
protected CatalogTable inputCatalogTable;
protected volatile CatalogTable outputCatalogTable;
public AbstractCatalogSupportTransform(@NonNull CatalogTable inputCatalogTable) {
+ this(inputCatalogTable, CommonOptions.ROW_ERROR_HANDLE_WAY_OPTION.defaultValue());
+ }
+
+ public AbstractCatalogSupportTransform(
+ @NonNull CatalogTable inputCatalogTable, ErrorHandleWay rowErrorHandleWay) {
this.inputCatalogTable = inputCatalogTable;
+ this.rowErrorHandleWay = rowErrorHandleWay;
}
@Override
public SeaTunnelRow map(SeaTunnelRow row) {
- return transformRow(row);
+ try {
+ return transformRow(row);
+ } catch (ErrorDataTransformException e) {
+ if (e.getErrorHandleWay() != null) {
+ ErrorHandleWay errorHandleWay = e.getErrorHandleWay();
+ if (errorHandleWay.allowSkipThisRow()) {
+ log.debug("Skip row due to error", e);
+ return null;
+ }
+ throw e;
+ }
+ if (rowErrorHandleWay.allowSkip()) {
+ log.debug("Skip row due to error", e);
+ return null;
+ }
+ throw e;
+ }
}
/**
diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/CommonOptions.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/CommonOptions.java
new file mode 100644
index 0000000000..eb4b0f4a71
--- /dev/null
+++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/CommonOptions.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.transform.common;
+
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
+
+import java.util.Arrays;
+
+public final class CommonOptions {
+
+ public static Option<ErrorHandleWay> ROW_ERROR_HANDLE_WAY_OPTION =
+ Options.key("row_error_handle_way")
+ .singleChoice(
+ ErrorHandleWay.class,
+ Arrays.asList(ErrorHandleWay.FAIL, ErrorHandleWay.SKIP))
+ .defaultValue(ErrorHandleWay.FAIL)
+ .withDescription(
+ "The processing method of data format error. The default value is fail, and the optional value is (fail, skip). "
+ + "When fail is selected, data format error will block and an exception will be thrown. "
+ + "When skip is selected, data format error will skip this line data.");
+
+ public static Option<ErrorHandleWay> COLUMN_ERROR_HANDLE_WAY_OPTION =
+ Options.key("column_error_handle_way")
+ .enumType(ErrorHandleWay.class)
+ .noDefaultValue()
+ .withDescription(
+ "The processing method of data format error. "
+ + "When fail is selected, data format error will block and an exception will be thrown. "
+ + "When skip is selected, data format error will skip this column data."
+ + "When skip_row is selected, data format error will skip this line data.");
+}
diff --git a/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/JsonPathTransformFactoryTest.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/ErrorHandleWay.java
similarity index 64%
rename from seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/JsonPathTransformFactoryTest.java
rename to seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/ErrorHandleWay.java
index 2f1314cace..0f87d6d45f 100644
--- a/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/JsonPathTransformFactoryTest.java
+++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/ErrorHandleWay.java
@@ -14,17 +14,22 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.seatunnel.transform;
-import org.apache.seatunnel.transform.jsonpath.JsonPathTransformFactory;
+package org.apache.seatunnel.transform.common;
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.Test;
+public enum ErrorHandleWay {
+ // Fail the transformation when error occurs
+ FAIL,
+ // Skip the data when error occurs
+ SKIP,
+ // Skip the row when error occurs
+ SKIP_ROW;
-public class JsonPathTransformFactoryTest {
- @Test
- public void testOptionRule() {
- JsonPathTransformFactory jsonPathTransformFactory = new JsonPathTransformFactory();
- Assertions.assertNotNull(jsonPathTransformFactory.optionRule());
+ public boolean allowSkipThisRow() {
+ return this == SKIP_ROW;
+ }
+
+ public boolean allowSkip() {
+ return this == SKIP;
}
}
diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/MultipleFieldOutputTransform.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/MultipleFieldOutputTransform.java
index bfe6336182..51e1172412 100644
--- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/MultipleFieldOutputTransform.java
+++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/MultipleFieldOutputTransform.java
@@ -45,6 +45,11 @@ public abstract class MultipleFieldOutputTransform extends AbstractCatalogSuppor
super(inputCatalogTable);
}
+ public MultipleFieldOutputTransform(
+ @NonNull CatalogTable inputCatalogTable, ErrorHandleWay errorHandleWay) {
+ super(inputCatalogTable, errorHandleWay);
+ }
+
@Override
protected SeaTunnelRow transformRow(SeaTunnelRow inputRow) {
Object[] fieldValues = getOutputFieldValues(new SeaTunnelRowAccessor(inputRow));
diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/exception/ErrorDataTransformException.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/exception/ErrorDataTransformException.java
new file mode 100644
index 0000000000..58b7cd5ca1
--- /dev/null
+++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/exception/ErrorDataTransformException.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.transform.exception;
+
+import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
+import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
+import org.apache.seatunnel.transform.common.ErrorHandleWay;
+
+import lombok.Getter;
+
+import java.util.Map;
+
+public class ErrorDataTransformException extends SeaTunnelRuntimeException {
+ @Getter private final ErrorHandleWay errorHandleWay;
+
+ public ErrorDataTransformException(SeaTunnelErrorCode seaTunnelErrorCode, String errorMessage) {
+ this(null, seaTunnelErrorCode, errorMessage);
+ }
+
+ public ErrorDataTransformException(
+ ErrorHandleWay errorHandleWay,
+ SeaTunnelErrorCode seaTunnelErrorCode,
+ String errorMessage) {
+ super(seaTunnelErrorCode, errorMessage);
+ this.errorHandleWay = errorHandleWay;
+ }
+
+ public ErrorDataTransformException(
+ SeaTunnelErrorCode seaTunnelErrorCode, Map<String, String> params) {
+ this(null, seaTunnelErrorCode, params);
+ }
+
+ public ErrorDataTransformException(
+ ErrorHandleWay errorHandleWay,
+ SeaTunnelErrorCode seaTunnelErrorCode,
+ Map<String, String> params) {
+ super(seaTunnelErrorCode, params);
+ this.errorHandleWay = errorHandleWay;
+ }
+}
diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/jsonpath/ColumnConfig.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/jsonpath/ColumnConfig.java
index 1025a12651..4d56be1582 100644
--- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/jsonpath/ColumnConfig.java
+++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/jsonpath/ColumnConfig.java
@@ -17,9 +17,13 @@
package org.apache.seatunnel.transform.jsonpath;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.transform.common.ErrorHandleWay;
+
+import lombok.ToString;
import java.io.Serializable;
+@ToString
public class ColumnConfig implements Serializable {
private final String path;
@@ -28,13 +32,19 @@ public class ColumnConfig implements Serializable {
private final String destField;
private final SeaTunnelDataType<?> destType;
+ private final ErrorHandleWay errorHandleWay;
public ColumnConfig(
- String path, String srcField, String destField, SeaTunnelDataType<?> destType) {
+ String path,
+ String srcField,
+ String destField,
+ SeaTunnelDataType<?> destType,
+ ErrorHandleWay errorHandleWay) {
this.path = path;
this.srcField = srcField;
this.destField = destField;
this.destType = destType;
+ this.errorHandleWay = errorHandleWay;
}
public String getPath() {
@@ -52,4 +62,8 @@ public class ColumnConfig implements Serializable {
public SeaTunnelDataType<?> getDestType() {
return destType;
}
+
+ public ErrorHandleWay errorHandleWay() {
+ return errorHandleWay;
+ }
}
diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/jsonpath/JsonPathTransform.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/jsonpath/JsonPathTransform.java
index 34ebee0ba5..3e14e8488d 100644
--- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/jsonpath/JsonPathTransform.java
+++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/jsonpath/JsonPathTransform.java
@@ -29,8 +29,8 @@ import org.apache.seatunnel.common.utils.JsonUtils;
import org.apache.seatunnel.format.json.JsonToRowConverters;
import org.apache.seatunnel.transform.common.MultipleFieldOutputTransform;
import org.apache.seatunnel.transform.common.SeaTunnelRowAccessor;
+import org.apache.seatunnel.transform.exception.ErrorDataTransformException;
import org.apache.seatunnel.transform.exception.TransformCommonError;
-import org.apache.seatunnel.transform.exception.TransformException;
import com.jayway.jsonpath.JsonPath;
import com.jayway.jsonpath.JsonPathException;
@@ -59,7 +59,7 @@ public class JsonPathTransform extends MultipleFieldOutputTransform {
private int[] srcFieldIndexArr;
public JsonPathTransform(JsonPathTransformConfig config, CatalogTable catalogTable) {
- super(catalogTable);
+ super(catalogTable, config.getErrorHandleWay());
this.config = config;
this.seaTunnelRowType = catalogTable.getSeaTunnelRowType();
init();
@@ -122,11 +122,12 @@ public class JsonPathTransform extends MultipleFieldOutputTransform {
Object[] fieldValues = new Object[size];
for (int i = 0; i < size; i++) {
int pos = this.srcFieldIndexArr[i];
+ ColumnConfig fieldConfig = configs.get(i);
fieldValues[i] =
doTransform(
seaTunnelRowType.getFieldType(pos),
inputRow.getField(pos),
- configs.get(i),
+ fieldConfig,
converters[i]);
}
return fieldValues;
@@ -168,7 +169,21 @@ public class JsonPathTransform extends MultipleFieldOutputTransform {
JsonNode jsonNode = JsonUtils.toJsonNode(result);
return converter.convert(jsonNode, null);
} catch (JsonPathException e) {
- throw new TransformException(JSON_PATH_COMPILE_ERROR, e.getMessage());
+ if (columnConfig.errorHandleWay() != null
+ && columnConfig.errorHandleWay().allowSkip()) {
+ log.debug(
+ "JsonPath transform error, ignore error, config: {}, value: {}",
+ columnConfig,
+ jsonString,
+ e);
+ return null;
+ }
+ throw new ErrorDataTransformException(
+ columnConfig.errorHandleWay(),
+ JSON_PATH_COMPILE_ERROR,
+ String.format(
+ "JsonPath transform error, config: %s, value: %s, error: %s",
+ columnConfig, jsonString, e.getMessage()));
}
}
diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/jsonpath/JsonPathTransformConfig.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/jsonpath/JsonPathTransformConfig.java
index 7c1036cbb3..51a0d6ac34 100644
--- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/jsonpath/JsonPathTransformConfig.java
+++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/jsonpath/JsonPathTransformConfig.java
@@ -23,14 +23,19 @@ import org.apache.seatunnel.api.configuration.Options;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.table.catalog.SeaTunnelDataTypeConvertorUtil;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.transform.common.CommonOptions;
+import org.apache.seatunnel.transform.common.ErrorHandleWay;
import org.apache.seatunnel.transform.exception.TransformException;
import org.apache.commons.lang3.StringUtils;
+import lombok.Getter;
+
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import static org.apache.seatunnel.transform.exception.JsonPathTransformErrorCode.COLUMNS_MUST_NOT_EMPTY;
import static org.apache.seatunnel.transform.exception.JsonPathTransformErrorCode.DEST_FIELD_MUST_NOT_EMPTY;
@@ -70,13 +75,16 @@ public class JsonPathTransformConfig implements Serializable {
.withDescription("columns");
private final List<ColumnConfig> columnConfigs;
+ @Getter private final ErrorHandleWay errorHandleWay;
public List<ColumnConfig> getColumnConfigs() {
return columnConfigs;
}
- public JsonPathTransformConfig(List<ColumnConfig> columnConfigs) {
+ public JsonPathTransformConfig(
+ List<ColumnConfig> columnConfigs, ErrorHandleWay errorHandleWay) {
this.columnConfigs = columnConfigs;
+ this.errorHandleWay = errorHandleWay;
}
public static JsonPathTransformConfig of(ReadonlyConfig config) {
@@ -84,6 +92,7 @@ public class JsonPathTransformConfig implements Serializable {
throw new TransformException(
COLUMNS_MUST_NOT_EMPTY,
COLUMNS_MUST_NOT_EMPTY.getErrorMessage());
}
+ ErrorHandleWay rowErrorHandleWay = config.get(CommonOptions.ROW_ERROR_HANDLE_WAY_OPTION);
List<Map<String, String>> columns = config.get(COLUMNS);
List<ColumnConfig> configs = new ArrayList<>(columns.size());
for (Map<String, String> map : columns) {
@@ -92,12 +101,18 @@ public class JsonPathTransformConfig implements Serializable {
String srcField = map.get(SRC_FIELD.key());
String destField = map.get(DEST_FIELD.key());
String type = map.getOrDefault(DEST_TYPE.key(), DEST_TYPE.defaultValue());
+ ErrorHandleWay columnErrorHandleWay =
+ Optional.ofNullable(map.get(CommonOptions.COLUMN_ERROR_HANDLE_WAY_OPTION.key()))
+ .map(ErrorHandleWay::valueOf)
+ .orElse(null);
+
SeaTunnelDataType<?> dataType =
SeaTunnelDataTypeConvertorUtil.deserializeSeaTunnelDataType(srcField, type);
- ColumnConfig columnConfig = new ColumnConfig(path, srcField, destField, dataType);
+ ColumnConfig columnConfig =
+ new ColumnConfig(path, srcField, destField, dataType, columnErrorHandleWay);
configs.add(columnConfig);
}
- return new JsonPathTransformConfig(configs);
+ return new JsonPathTransformConfig(configs, rowErrorHandleWay);
}
private static void checkColumnConfig(Map<String, String> map) {
diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/jsonpath/JsonPathTransformFactory.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/jsonpath/JsonPathTransformFactory.java
index 796f8dbe6d..def17f2564 100644
--- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/jsonpath/JsonPathTransformFactory.java
+++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/jsonpath/JsonPathTransformFactory.java
@@ -24,6 +24,7 @@ import org.apache.seatunnel.api.table.connector.TableTransform;
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.api.table.factory.TableTransformFactory;
import org.apache.seatunnel.api.table.factory.TableTransformFactoryContext;
+import org.apache.seatunnel.transform.common.CommonOptions;
import com.google.auto.service.AutoService;
@@ -36,7 +37,10 @@ public class JsonPathTransformFactory implements TableTransformFactory {
@Override
public OptionRule optionRule() {
- return OptionRule.builder().required(JsonPathTransformConfig.COLUMNS).build();
+ return OptionRule.builder()
+ .required(JsonPathTransformConfig.COLUMNS)
+ .optional(CommonOptions.ROW_ERROR_HANDLE_WAY_OPTION)
+ .build();
}
@Override
diff --git a/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/JsonPathTransformTest.java b/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/JsonPathTransformTest.java
new file mode 100644
index 0000000000..51c0c0ac30
--- /dev/null
+++ b/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/JsonPathTransformTest.java
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seatunnel.transform;
+
+import org.apache.seatunnel.shade.com.google.common.collect.ImmutableMap;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.transform.common.CommonOptions;
+import org.apache.seatunnel.transform.common.ErrorHandleWay;
+import org.apache.seatunnel.transform.exception.ErrorDataTransformException;
+import org.apache.seatunnel.transform.jsonpath.JsonPathTransform;
+import org.apache.seatunnel.transform.jsonpath.JsonPathTransformConfig;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+public class JsonPathTransformTest {
+
+ @Test
+ public void testJsonPath() {
+ Map<String, Object> configMap = new HashMap<>();
+ configMap.put(
+ JsonPathTransformConfig.COLUMNS.key(),
+ Arrays.asList(
+ ImmutableMap.of(
+ JsonPathTransformConfig.SRC_FIELD.key(), "data",
+ JsonPathTransformConfig.PATH.key(), "$.f1",
+ JsonPathTransformConfig.DEST_FIELD.key(), "f1")));
+ ReadonlyConfig config = ReadonlyConfig.fromMap(configMap);
+ CatalogTable table =
+ CatalogTableUtil.getCatalogTable(
+ "test",
+ new SeaTunnelRowType(
+ new String[] {"data"},
+ new SeaTunnelDataType[] {BasicType.STRING_TYPE}));
+ JsonPathTransform transform =
+ new JsonPathTransform(JsonPathTransformConfig.of(config), table);
+
+ CatalogTable outputTable = transform.getProducedCatalogTable();
+ SeaTunnelRow outputRow = transform.map(new SeaTunnelRow(new Object[] {"{\"f1\": 1}"}));
+ Assertions.assertEquals(
+ "1", outputRow.getField(outputTable.getSeaTunnelRowType().indexOf("f1")));
+ }
+
+ @Test
+ public void testErrorHandleWay() {
+ Map<String, Object> configMap = new HashMap<>();
+ configMap.put(
+ JsonPathTransformConfig.COLUMNS.key(),
+ Arrays.asList(
+ ImmutableMap.of(
+ JsonPathTransformConfig.SRC_FIELD.key(), "data",
+ JsonPathTransformConfig.PATH.key(), "$.f1",
+ JsonPathTransformConfig.DEST_FIELD.key(), "f1")));
+ ReadonlyConfig config = ReadonlyConfig.fromMap(configMap);
+ CatalogTable table =
+ CatalogTableUtil.getCatalogTable(
+ "test",
+ new SeaTunnelRowType(
+ new String[] {"data"},
+ new SeaTunnelDataType[] {BasicType.STRING_TYPE}));
+ JsonPathTransform transform =
+ new JsonPathTransform(JsonPathTransformConfig.of(config), table);
+ CatalogTable outputTable = transform.getProducedCatalogTable();
+ final JsonPathTransform finalTransform = transform;
+ Assertions.assertThrows(
+ ErrorDataTransformException.class,
+ () -> finalTransform.map(new SeaTunnelRow(new Object[] {"{\"f2\": 1}"})));
+
+ configMap.put(
+ JsonPathTransformConfig.COLUMNS.key(),
+ Arrays.asList(
+ ImmutableMap.of(
+ JsonPathTransformConfig.SRC_FIELD.key(),
+ "data",
+ JsonPathTransformConfig.PATH.key(),
+ "$.f1",
+ JsonPathTransformConfig.DEST_FIELD.key(),
+ "f1",
+ CommonOptions.COLUMN_ERROR_HANDLE_WAY_OPTION.key(),
+ ErrorHandleWay.FAIL.name())));
+ config = ReadonlyConfig.fromMap(configMap);
+ transform = new JsonPathTransform(JsonPathTransformConfig.of(config), table);
+ outputTable = transform.getProducedCatalogTable();
+ JsonPathTransform finalTransform1 = transform;
+ Assertions.assertThrows(
+ ErrorDataTransformException.class,
+ () -> finalTransform1.map(new SeaTunnelRow(new Object[] {"{\"f2\": 1}"})));
+
+ configMap.put(
+ JsonPathTransformConfig.COLUMNS.key(),
+ Arrays.asList(
+ ImmutableMap.of(
+ JsonPathTransformConfig.SRC_FIELD.key(),
+ "data",
+ JsonPathTransformConfig.PATH.key(),
+ "$.f1",
+ JsonPathTransformConfig.DEST_FIELD.key(),
+ "f1",
+ CommonOptions.COLUMN_ERROR_HANDLE_WAY_OPTION.key(),
+ ErrorHandleWay.SKIP.name())));
+ config = ReadonlyConfig.fromMap(configMap);
+ transform = new JsonPathTransform(JsonPathTransformConfig.of(config), table);
+ outputTable = transform.getProducedCatalogTable();
+ SeaTunnelRow outputRow = transform.map(new SeaTunnelRow(new Object[] {"{\"f2\": 1}"}));
+ Assertions.assertNotNull(outputRow);
+ Assertions.assertNull(outputRow.getField(outputTable.getSeaTunnelRowType().indexOf("f1")));
+
+ configMap.put(
+ JsonPathTransformConfig.COLUMNS.key(),
+ Arrays.asList(
+ ImmutableMap.of(
+ JsonPathTransformConfig.SRC_FIELD.key(),
+ "data",
+ JsonPathTransformConfig.PATH.key(),
+ "$.f1",
+ JsonPathTransformConfig.DEST_FIELD.key(),
+ "f1",
+ CommonOptions.COLUMN_ERROR_HANDLE_WAY_OPTION.key(),
+ ErrorHandleWay.SKIP_ROW.name())));
+ config = ReadonlyConfig.fromMap(configMap);
+ transform = new JsonPathTransform(JsonPathTransformConfig.of(config), table);
+ outputTable = transform.getProducedCatalogTable();
+ outputRow = transform.map(new SeaTunnelRow(new Object[] {"{\"f2\": 1}"}));
+ Assertions.assertNull(outputRow);
+
+ configMap.put(CommonOptions.ROW_ERROR_HANDLE_WAY_OPTION.key(), ErrorHandleWay.SKIP.name());
+ configMap.put(
+ JsonPathTransformConfig.COLUMNS.key(),
+ Arrays.asList(
+ ImmutableMap.of(
+ JsonPathTransformConfig.SRC_FIELD.key(), "data",
+ JsonPathTransformConfig.PATH.key(), "$.f1",
+ JsonPathTransformConfig.DEST_FIELD.key(), "f1")));
+ config = ReadonlyConfig.fromMap(configMap);
+ transform = new JsonPathTransform(JsonPathTransformConfig.of(config), table);
+ outputTable = transform.getProducedCatalogTable();
+ outputRow = transform.map(new SeaTunnelRow(new Object[] {"{\"f2\": 1}"}));
+ Assertions.assertNull(outputRow);
+
+ configMap.put(CommonOptions.ROW_ERROR_HANDLE_WAY_OPTION.key(), ErrorHandleWay.SKIP.name());
+ configMap.put(
+ JsonPathTransformConfig.COLUMNS.key(),
+ Arrays.asList(
+ ImmutableMap.of(
+ JsonPathTransformConfig.SRC_FIELD.key(),
+ "data",
+ JsonPathTransformConfig.PATH.key(),
+ "$.f1",
+ JsonPathTransformConfig.DEST_FIELD.key(),
+ "f1",
+ CommonOptions.COLUMN_ERROR_HANDLE_WAY_OPTION.key(),
+ ErrorHandleWay.FAIL.name())));
+ config = ReadonlyConfig.fromMap(configMap);
+ transform = new JsonPathTransform(JsonPathTransformConfig.of(config), table);
+ outputTable = transform.getProducedCatalogTable();
+ try {
+ outputRow = transform.map(new SeaTunnelRow(new Object[] {"{\"f2\": 1}"}));
+ Assertions.fail("should throw exception");
+ } catch (Exception e) {
+ // ignore
+ }
+
+ configMap.put(CommonOptions.ROW_ERROR_HANDLE_WAY_OPTION.key(), ErrorHandleWay.FAIL.name());
+ configMap.put(
+ JsonPathTransformConfig.COLUMNS.key(),
+ Arrays.asList(
+ ImmutableMap.of(
+ JsonPathTransformConfig.SRC_FIELD.key(),
+ "data",
+ JsonPathTransformConfig.PATH.key(),
+ "$.f1",
+ JsonPathTransformConfig.DEST_FIELD.key(),
+ "f1",
+ CommonOptions.COLUMN_ERROR_HANDLE_WAY_OPTION.key(),
+ ErrorHandleWay.SKIP.name())));
+ config = ReadonlyConfig.fromMap(configMap);
+ transform = new JsonPathTransform(JsonPathTransformConfig.of(config), table);
+ outputTable = transform.getProducedCatalogTable();
+ outputRow = transform.map(new SeaTunnelRow(new Object[] {"{\"f2\": 1}"}));
+ Assertions.assertNotNull(outputRow);
+ Assertions.assertNull(outputRow.getField(outputTable.getSeaTunnelRowType().indexOf("f1")));
+
+ configMap.put(CommonOptions.ROW_ERROR_HANDLE_WAY_OPTION.key(), ErrorHandleWay.FAIL.name());
+ configMap.put(
+ JsonPathTransformConfig.COLUMNS.key(),
+ Arrays.asList(
+ ImmutableMap.of(
+ JsonPathTransformConfig.SRC_FIELD.key(),
+ "data",
+ JsonPathTransformConfig.PATH.key(),
+ "$.f1",
+ JsonPathTransformConfig.DEST_FIELD.key(),
+ "f1",
+ CommonOptions.COLUMN_ERROR_HANDLE_WAY_OPTION.key(),
+ ErrorHandleWay.SKIP_ROW.name())));
+ config = ReadonlyConfig.fromMap(configMap);
+ transform = new JsonPathTransform(JsonPathTransformConfig.of(config), table);
+ outputTable = transform.getProducedCatalogTable();
+ outputRow = transform.map(new SeaTunnelRow(new Object[] {"{\"f2\": 1}"}));
+ Assertions.assertNull(outputRow);
+ }
+}