This is an automated email from the ASF dual-hosted git repository.

zhouyao2023 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 7532a94626 [Improve][Transform] Support errorHandleWay on jsonpath transform (#7524)
7532a94626 is described below

commit 7532a946263559a1d4a8bd1ad5906e755839e04a
Author: hailin0 <[email protected]>
AuthorDate: Wed Sep 11 09:25:51 2024 +0800

    [Improve][Transform] Support errorHandleWay on jsonpath transform (#7524)
---
 docs/en/transform-v2/jsonpath.md                   | 138 +++++++++++--
 docs/zh/transform-v2/jsonpath.md                   | 135 ++++++++++--
 .../flink/execution/TransformExecuteProcessor.java |  28 ++-
 .../spark/execution/TransformExecuteProcessor.java |  29 ++-
 .../e2e/transform/TestJsonPathTransformIT.java     |   7 +
 .../json_path_with_error_handle_way.conf           | 104 ++++++++++
 .../common/AbstractCatalogSupportTransform.java    |  28 ++-
 .../seatunnel/transform/common/CommonOptions.java  |  47 +++++
 .../transform/common/ErrorHandleWay.java}          |  23 ++-
 .../common/MultipleFieldOutputTransform.java       |   5 +
 .../exception/ErrorDataTransformException.java     |  55 +++++
 .../seatunnel/transform/jsonpath/ColumnConfig.java |  16 +-
 .../transform/jsonpath/JsonPathTransform.java      |  23 ++-
 .../jsonpath/JsonPathTransformConfig.java          |  21 +-
 .../jsonpath/JsonPathTransformFactory.java         |   6 +-
 .../seatunnel/transform/JsonPathTransformTest.java | 227 +++++++++++++++++++++
 16 files changed, 833 insertions(+), 59 deletions(-)

diff --git a/docs/en/transform-v2/jsonpath.md b/docs/en/transform-v2/jsonpath.md
index 3baf5853b7..1948f5ca69 100644
--- a/docs/en/transform-v2/jsonpath.md
+++ b/docs/en/transform-v2/jsonpath.md
@@ -8,24 +8,33 @@
 
 ## Options
 
-|  name   | type  | required | default value |
-|---------|-------|----------|---------------|
-| Columns | Array | Yes      |               |
+| name                 | type  | required | default value |
+|----------------------|-------|----------|---------------|
+| columns              | Array | Yes      |               |
+| row_error_handle_way | Enum  | No       | FAIL          |
 
 ### common options [string]
 
 Transform plugin common parameters, please refer to [Transform Plugin](common-options.md) for details
 
-### fields[array]
+### row_error_handle_way [Enum]
+
+This option specifies how to handle an error that occurs while processing a row. The default value is `FAIL`.
+
+- FAIL: A data format error stops processing and an exception is thrown.
+- SKIP: A data format error causes the affected row to be skipped.
+
+### columns[array]
 
 #### option
 
-|    name    |  type  | required | default value |
-|------------|--------|----------|---------------|
-| src_field  | String | Yes      |               |
-| dest_field | String | Yes      |               |
-| path       | String | Yes      |               |
-| dest_type  | String | No       | String        |
+| name                    | type   | required | default value |
+|-------------------------|--------|----------|---------------|
+| src_field               | String | Yes      |               |
+| dest_field              | String | Yes      |               |
+| path                    | String | Yes      |               |
+| dest_type               | String | No       | String        |
+| column_error_handle_way | Enum   | No       |               |
 
 #### src_field
 
@@ -51,6 +60,14 @@ Support SeatunnelDateType
 
 > Jsonpath
 
+#### column_error_handle_way [Enum]
+
+This option specifies how to handle an error that occurs while processing this column.
+
+- FAIL: A data format error stops processing and an exception is thrown.
+- SKIP: A data format error causes this column's value to be skipped (the destination field is filled with null).
+- SKIP_ROW: A data format error causes the entire row to be skipped.
+
 ## Read Json Example
 
 The data read from source is a table like this json:
@@ -155,23 +172,25 @@ Suppose a column in a row of data is of type SeatunnelRow and that the name of t
 
 The JsonPath transform converts the values of seatunnel into an array,
 
-```json
+```hocon
 transform {
   JsonPath {
     source_table_name = "fake"
     result_table_name = "fake1"
+  
+    row_error_handle_way = FAIL
     columns = [
      {
         "src_field" = "col"
         "path" = "$[0]"
         "dest_field" = "name"
-                       "dest_type" = "string"
+        "dest_type" = "string"
      },
-               {
+     {
         "src_field" = "col"
         "path" = "$[1]"
         "dest_field" = "age"
-                       "dest_type" = "int"
+        "dest_type" = "int"
      }
     ]
   }
@@ -184,6 +203,97 @@ Then the data result table `fake1` will like this
 |------|-----|----------|-------|
 | a    | 18  | ["a",18] | ...   |
 
+
+## Configure error data handle way
+
+You can configure `row_error_handle_way` and `column_error_handle_way` to handle abnormal data. Both are optional.
+
+`row_error_handle_way` applies to every data error in a row, while `column_error_handle_way` applies to errors in a single column. When both are set, `column_error_handle_way` takes precedence over `row_error_handle_way`.
+
+### Skip error data rows
+
+Skip the entire row when any column raises an error:
+
+```hocon
+transform {
+  JsonPath {
+
+    row_error_handle_way = SKIP
+    
+    columns = [
+     {
+        "src_field" = "json_data"
+        "path" = "$.f1"
+        "dest_field" = "json_data_f1"
+     },
+     {
+        "src_field" = "json_data"
+        "path" = "$.f2"
+        "dest_field" = "json_data_f2"
+     }
+    ]
+  }
+}
+```
+
+### Skip error data column
+
+Skip errors in the `json_data_f1` column only and fill that column with null. Errors in any other column still throw an exception and interrupt processing.
+
+
+```hocon
+transform {
+  JsonPath {
+
+    row_error_handle_way = FAIL
+    
+    columns = [
+     {
+        "src_field" = "json_data"
+        "path" = "$.f1"
+        "dest_field" = "json_data_f1"
+        
+        "column_error_handle_way" = "SKIP"
+     },
+     {
+        "src_field" = "json_data"
+        "path" = "$.f2"
+        "dest_field" = "json_data_f2"
+     }
+    ]
+  }
+}
+```
+
+### Skip the row for specified column error
+
+Skip the entire row only when the `json_data_f1` column raises an error. Errors in any other column still throw an exception and interrupt processing.
+
+
+```hocon
+transform {
+  JsonPath {
+
+    row_error_handle_way = FAIL
+    
+    columns = [
+     {
+        "src_field" = "json_data"
+        "path" = "$.f1"
+        "dest_field" = "json_data_f1"
+        
+        "column_error_handle_way" = "SKIP_ROW"
+     },
+     {
+        "src_field" = "json_data"
+        "path" = "$.f2"
+        "dest_field" = "json_data_f2"
+     }
+    ]
+  }
+}
+```
+
 ## Changelog
 
 * Add JsonPath Transform
diff --git a/docs/zh/transform-v2/jsonpath.md b/docs/zh/transform-v2/jsonpath.md
index 449f0f6a77..4074d78bda 100644
--- a/docs/zh/transform-v2/jsonpath.md
+++ b/docs/zh/transform-v2/jsonpath.md
@@ -8,24 +8,33 @@
 
 ## 属性
 
-|   名称    |  类型   | 是否必须 | 默认值 |
-|---------|-------|------|-----|
-| Columns | Array | Yes  |     |
+| 名称                   | 类型    | 是否必须 | 默认值  |
+|----------------------|-------|------|------|    
+| columns              | Array | Yes  |      | 
+| row_error_handle_way | Enum  | No   | FAIL |
 
 ### common options [string]
 
 转换插件的常见参数, 请参考  [Transform Plugin](common-options.md) 了解详情
 
-### fields[array]
+### row_error_handle_way [Enum]
+
+该选项用于指定当该行发生错误时的处理方式,默认值为 `FAIL`。
+
+- FAIL:选择`FAIL`时,数据格式错误会阻塞并抛出异常。
+- SKIP:选择`SKIP`时,数据格式错误会跳过该行数据。
+
+### columns[array]
 
 #### 属性
 
-|     名称     |   类型   | 是否必须 |  默认值   |
-|------------|--------|------|--------|
-| src_field  | String | Yes  |        |
-| dest_field | String | Yes  |        |
-| path       | String | Yes  |        |
-| dest_type  | String | No   | String |
+| 名称                      | 类型     | 是否必须 | 默认值    |
+|-------------------------|--------|------|--------|
+| src_field               | String | Yes  |        |
+| dest_field              | String | Yes  |        |
+| path                    | String | Yes  |        |
+| dest_type               | String | No   | String |
+| column_error_handle_way | Enum   | No   |        |
 
 #### src_field
 
@@ -51,6 +60,14 @@
 
 > Jsonpath
 
+#### column_error_handle_way [Enum]
+
+该选项用于指定当列发生错误时的处理方式。
+
+- FAIL:选择`FAIL`时,数据格式错误会阻塞并抛出异常。
+- SKIP:选择`SKIP`时,数据格式错误会跳过此列数据。
+- SKIP_ROW:选择`SKIP_ROW`时,数据格式错误会跳过此行数据。
+
 ## 读取 JSON 示例
 
 从源读取的数据是像这样的 JSON
@@ -155,23 +172,25 @@ transform {
 
 JsonPath 转换将 seatunnel 的值转换为一个数组。
 
-```json
+```hocon
 transform {
   JsonPath {
     source_table_name = "fake"
     result_table_name = "fake1"
+
+    row_error_handle_way = FAIL
     columns = [
      {
         "src_field" = "col"
         "path" = "$[0]"
         "dest_field" = "name"
-                       "dest_type" = "string"
+        "dest_type" = "string"
      },
-               {
+     {
         "src_field" = "col"
         "path" = "$[1]"
         "dest_field" = "age"
-                       "dest_type" = "int"
+        "dest_type" = "int"
      }
     ]
   }
@@ -184,6 +203,94 @@ transform {
 |------|-----|----------|-------|
 | a    | 18  | ["a",18] | ...   |
 
+## 配置异常数据处理策略
+
+您可以配置 `row_error_handle_way` 与 `column_error_handle_way` 来处理异常数据,两者都是非必填项。
+
+`row_error_handle_way` 配置对行数据内所有数据异常进行处理,`column_error_handle_way` 配置对某列数据异常进行处理,优先级高于 `row_error_handle_way`。
+
+### 跳过异常数据行
+
+配置跳过任意列有异常的整行数据
+
+```hocon
+transform {
+  JsonPath {
+
+    row_error_handle_way = SKIP
+    
+    columns = [
+     {
+        "src_field" = "json_data"
+        "path" = "$.f1"
+        "dest_field" = "json_data_f1"
+     },
+     {
+        "src_field" = "json_data"
+        "path" = "$.f2"
+        "dest_field" = "json_data_f2"
+     }
+    ]
+  }
+}
+```
+
+### 跳过部分异常数据列
+
+配置仅对 `json_data_f1` 列数据异常跳过,填充空值,其他列数据异常继续抛出异常中断处理程序
+
+```hocon
+transform {
+  JsonPath {
+
+    row_error_handle_way = FAIL
+    
+    columns = [
+     {
+        "src_field" = "json_data"
+        "path" = "$.f1"
+        "dest_field" = "json_data_f1"
+        
+        "column_error_handle_way" = "SKIP"
+     },
+     {
+        "src_field" = "json_data"
+        "path" = "$.f2"
+        "dest_field" = "json_data_f2"
+     }
+    ]
+  }
+}
+```
+
+### 部分列异常跳过整行
+
+配置仅对 `json_data_f1` 列数据异常跳过整行数据,其他列数据异常继续抛出异常中断处理程序
+
+```hocon
+transform {
+  JsonPath {
+
+    row_error_handle_way = FAIL
+    
+    columns = [
+     {
+        "src_field" = "json_data"
+        "path" = "$.f1"
+        "dest_field" = "json_data_f1"
+        
+        "column_error_handle_way" = "SKIP_ROW"
+     },
+     {
+        "src_field" = "json_data"
+        "path" = "$.f2"
+        "dest_field" = "json_data_f2"
+     }
+    ]
+  }
+}
+```
+
 ## 更新日志
 
 * 添加 JsonPath 转换
diff --git a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/TransformExecuteProcessor.java b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/TransformExecuteProcessor.java
index c92eaf42a9..00a5046e43 100644
--- a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/TransformExecuteProcessor.java
+++ b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/TransformExecuteProcessor.java
@@ -36,8 +36,11 @@ import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.operators.StreamMap;
 
 import java.net.URL;
+import java.util.ArrayList;
 import java.util.Collections;
+import java.util.LinkedHashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 import java.util.stream.Collectors;
 
@@ -85,12 +88,22 @@ public class TransformExecuteProcessor
             return upstreamDataStreams;
         }
         DataStreamTableInfo input = upstreamDataStreams.get(0);
+        Map<String, DataStreamTableInfo> outputTables =
+                upstreamDataStreams.stream()
+                        .collect(
+                                Collectors.toMap(
+                                        DataStreamTableInfo::getTableName,
+                                        e -> e,
+                                        (a, b) -> b,
+                                        LinkedHashMap::new));
+
         ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
         for (int i = 0; i < plugins.size(); i++) {
             try {
                 Config pluginConfig = pluginConfigs.get(i);
                 DataStreamTableInfo stream =
-                        fromSourceTable(pluginConfig, upstreamDataStreams).orElse(input);
+                        fromSourceTable(pluginConfig, new ArrayList<>(outputTables.values()))
+                                .orElse(input);
                 TableTransformFactory factory = plugins.get(i);
                 TableTransformFactoryContext context =
                         new TableTransformFactoryContext(
@@ -103,14 +116,17 @@ public class TransformExecuteProcessor
                 transform.setJobContext(jobContext);
                 DataStream<SeaTunnelRow> inputStream =
                         flinkTransform(transform, stream.getDataStream());
+                String resultTableName =
+                        pluginConfig.hasPath(RESULT_TABLE_NAME.key())
+                                ? pluginConfig.getString(RESULT_TABLE_NAME.key())
+                                : null;
                 // TODO transform support multi tables
-                upstreamDataStreams.add(
+                outputTables.put(
+                        resultTableName,
                         new DataStreamTableInfo(
                                 inputStream,
                                 Collections.singletonList(transform.getProducedCatalogTable()),
-                                pluginConfig.hasPath(RESULT_TABLE_NAME.key())
-                                        ? pluginConfig.getString(RESULT_TABLE_NAME.key())
-                                        : null));
+                                resultTableName));
             } catch (Exception e) {
                 throw new TaskExecuteException(
                         String.format(
@@ -119,7 +135,7 @@ public class TransformExecuteProcessor
                         e);
             }
         }
-        return upstreamDataStreams;
+        return new ArrayList<>(outputTables.values());
     }
 
     protected DataStream<SeaTunnelRow> flinkTransform(
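The change above replaces appending to `upstreamDataStreams` with a `LinkedHashMap` keyed by table name. The effect can be illustrated in isolation. This is a minimal sketch, not the SeaTunnel code: `TableInfo` and `collectOutputs` are hypothetical stand-ins for `DataStreamTableInfo` and the plugin loop, and the sketch relies only on standard `java.util` behaviour.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class OutputTableRegistrySketch {

    /** Hypothetical stand-in for DataStreamTableInfo: a table name plus some payload. */
    public static final class TableInfo {
        public final String tableName;
        public final String payload;

        public TableInfo(String tableName, String payload) {
            this.tableName = tableName;
            this.payload = payload;
        }

        public String getTableName() {
            return tableName;
        }
    }

    /** Indexes upstream tables by name, then registers each produced table under its name. */
    public static List<TableInfo> collectOutputs(
            List<TableInfo> upstream, List<TableInfo> produced) {
        Map<String, TableInfo> outputTables =
                upstream.stream()
                        .collect(
                                Collectors.toMap(
                                        TableInfo::getTableName,
                                        e -> e,
                                        (a, b) -> b, // on duplicate names, keep the later entry
                                        LinkedHashMap::new)); // preserve insertion order
        for (TableInfo table : produced) {
            // put() replaces an entry with the same name instead of appending a duplicate
            outputTables.put(table.getTableName(), table);
        }
        return new ArrayList<>(outputTables.values());
    }

    public static void main(String[] args) {
        List<TableInfo> upstream = new ArrayList<>();
        upstream.add(new TableInfo("fake", "source"));
        List<TableInfo> produced = new ArrayList<>();
        produced.add(new TableInfo("fake1", "first transform"));
        produced.add(new TableInfo("fake1", "second transform"));
        for (TableInfo t : collectOutputs(upstream, produced)) {
            System.out.println(t.tableName + " -> " + t.payload);
        }
    }
}
```

With a plain list, both `fake1` entries would be kept. With the map, only the most recently registered table for each name remains, and the original ordering of names is preserved.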
diff --git a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/TransformExecuteProcessor.java b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/TransformExecuteProcessor.java
index 43ef1afcee..3736576817 100644
--- a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/TransformExecuteProcessor.java
+++ b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/TransformExecuteProcessor.java
@@ -50,7 +50,9 @@ import java.net.URL;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Iterator;
+import java.util.LinkedHashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.stream.Collectors;
@@ -102,11 +104,23 @@ public class TransformExecuteProcessor
         }
         ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
         DatasetTableInfo input = upstreamDataStreams.get(0);
+
+        Map<String, DatasetTableInfo> outputTables =
+                upstreamDataStreams.stream()
+                        .collect(
+                                Collectors.toMap(
+                                        DatasetTableInfo::getTableName,
+                                        e -> e,
+                                        (a, b) -> b,
+                                        LinkedHashMap::new));
         for (int i = 0; i < plugins.size(); i++) {
             try {
                 Config pluginConfig = pluginConfigs.get(i);
                 DatasetTableInfo dataset =
-                        fromSourceTable(pluginConfig, sparkRuntimeEnvironment, upstreamDataStreams)
+                        fromSourceTable(
+                                        pluginConfig,
+                                        sparkRuntimeEnvironment,
+                                        new ArrayList<>(outputTables.values()))
                                 .orElse(input);
                 TableTransformFactory factory = plugins.get(i);
                 TableTransformFactoryContext context =
@@ -119,13 +133,16 @@ public class TransformExecuteProcessor
 
                 Dataset<Row> inputDataset = sparkTransform(transform, dataset);
                 registerInputTempView(pluginConfig, inputDataset);
-                upstreamDataStreams.add(
+                String resultTableName =
+                        pluginConfig.hasPath(RESULT_TABLE_NAME.key())
+                                ? pluginConfig.getString(RESULT_TABLE_NAME.key())
+                                : null;
+                outputTables.put(
+                        resultTableName,
                         new DatasetTableInfo(
                                 inputDataset,
                                 Collections.singletonList(transform.getProducedCatalogTable()),
-                                pluginConfig.hasPath(RESULT_TABLE_NAME.key())
-                                        ? pluginConfig.getString(RESULT_TABLE_NAME.key())
-                                        : null));
+                                resultTableName));
             } catch (Exception e) {
                 throw new TaskExecuteException(
                         String.format(
@@ -134,7 +151,7 @@ public class TransformExecuteProcessor
                         e);
             }
         }
-        return upstreamDataStreams;
+        return new ArrayList<>(outputTables.values());
     }
 
     private Dataset<Row> sparkTransform(SeaTunnelTransform transform, DatasetTableInfo tableInfo) {
diff --git a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/java/org/apache/seatunnel/e2e/transform/TestJsonPathTransformIT.java b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/java/org/apache/seatunnel/e2e/transform/TestJsonPathTransformIT.java
index 9c3c7aa22e..ff9b968cda 100644
--- a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/java/org/apache/seatunnel/e2e/transform/TestJsonPathTransformIT.java
+++ b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/java/org/apache/seatunnel/e2e/transform/TestJsonPathTransformIT.java
@@ -44,4 +44,11 @@ public class TestJsonPathTransformIT extends TestSuiteBase {
                 container.executeJob("/json_path_transform/nested_row_test.conf");
         Assertions.assertEquals(0, execResult.getExitCode());
     }
+
+    @TestTemplate
+    public void testErrorHandleWay(TestContainer container) throws Exception {
+        Container.ExecResult execResult =
+                container.executeJob("/json_path_transform/json_path_with_error_handle_way.conf");
+        Assertions.assertEquals(0, execResult.getExitCode());
+    }
 }
diff --git a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/json_path_transform/json_path_with_error_handle_way.conf b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/json_path_transform/json_path_with_error_handle_way.conf
new file mode 100644
index 0000000000..8c39caf377
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/json_path_transform/json_path_with_error_handle_way.conf
@@ -0,0 +1,104 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in seatunnel config
+######
+
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+}
+
+source {
+  FakeSource {
+    schema = {
+      fields {
+        id = "bigint"
+        data = "string"
+      }
+    }
+    rows = [
+      {
+        kind = INSERT
+        fields = [1, "{\"f1\": \"v1\"}"]
+      },
+      {
+        kind = INSERT
+        fields = [2, "{\"f1\": \"v1\", \"f2\": \"v2\"}"]
+      }
+    ]
+  }
+}
+
+transform {
+  JsonPath {
+
+    row_error_handle_way = FAIL
+    columns = [
+        {
+            src_field = "data"
+            path = "$.f1"
+            dest_field = "data_f1"
+        },
+        {
+            src_field = "data"
+            path = "$.f2"
+            dest_field = "data_f2"
+            column_error_handle_way = SKIP
+        }
+    ]
+  }
+}
+
+sink {
+  Assert {
+      rules =
+        {
+          row_rules = [
+            {
+              rule_type = MAX_ROW
+              rule_value = 2
+            },
+            {
+              rule_type = MIN_ROW
+              rule_value = 2
+            }
+          ],
+          field_rules = [
+              {
+                field_name = id
+                field_type = "bigint"
+                field_value = [
+                  {
+                    rule_type = MIN
+                    rule_value = 1
+                  },
+                  {
+                    rule_type = MAX
+                    rule_value = 2
+                  }
+                ]
+              },
+              {
+                field_name = data_f1
+                field_type = "string"
+                field_value = [{equals_to = "v1"}]
+              }
+          ]
+        }
+    }
+}
\ No newline at end of file
diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/AbstractCatalogSupportTransform.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/AbstractCatalogSupportTransform.java
index 632d3af1e4..a0fa464af7 100644
--- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/AbstractCatalogSupportTransform.java
+++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/AbstractCatalogSupportTransform.java
@@ -22,21 +22,47 @@ import org.apache.seatunnel.api.table.catalog.TableIdentifier;
 import org.apache.seatunnel.api.table.catalog.TableSchema;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.transform.SeaTunnelTransform;
+import org.apache.seatunnel.transform.exception.ErrorDataTransformException;
 
 import lombok.NonNull;
+import lombok.extern.slf4j.Slf4j;
 
+@Slf4j
 public abstract class AbstractCatalogSupportTransform implements SeaTunnelTransform<SeaTunnelRow> {
+    protected final ErrorHandleWay rowErrorHandleWay;
     protected CatalogTable inputCatalogTable;
 
     protected volatile CatalogTable outputCatalogTable;
 
     public AbstractCatalogSupportTransform(@NonNull CatalogTable inputCatalogTable) {
+        this(inputCatalogTable, CommonOptions.ROW_ERROR_HANDLE_WAY_OPTION.defaultValue());
+    }
+
+    public AbstractCatalogSupportTransform(
+            @NonNull CatalogTable inputCatalogTable, ErrorHandleWay rowErrorHandleWay) {
         this.inputCatalogTable = inputCatalogTable;
+        this.rowErrorHandleWay = rowErrorHandleWay;
     }
 
     @Override
     public SeaTunnelRow map(SeaTunnelRow row) {
-        return transformRow(row);
+        try {
+            return transformRow(row);
+        } catch (ErrorDataTransformException e) {
+            if (e.getErrorHandleWay() != null) {
+                ErrorHandleWay errorHandleWay = e.getErrorHandleWay();
+                if (errorHandleWay.allowSkipThisRow()) {
+                    log.debug("Skip row due to error", e);
+                    return null;
+                }
+                throw e;
+            }
+            if (rowErrorHandleWay.allowSkip()) {
+                log.debug("Skip row due to error", e);
+                return null;
+            }
+            throw e;
+        }
     }
 
     /**
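The decision made in the new `map` method can be read as a small truth table. The following is a minimal, self-contained sketch of that logic only. `RowErrorDecisionSketch` and `shouldDropRow` are hypothetical names introduced for illustration, and the nested enum mirrors the `ErrorHandleWay` enum added later in this commit.

```java
public class RowErrorDecisionSketch {

    /** Mirrors the ErrorHandleWay enum introduced by this commit. */
    public enum ErrorHandleWay {
        FAIL,
        SKIP,
        SKIP_ROW;

        public boolean allowSkipThisRow() {
            return this == SKIP_ROW;
        }

        public boolean allowSkip() {
            return this == SKIP;
        }
    }

    /**
     * Returns true when the row would be dropped (map returns null) and false when the
     * exception would be rethrown.
     *
     * @param fromException the way carried by the exception, or null if it carries none
     * @param rowLevel the transform-wide row_error_handle_way
     */
    public static boolean shouldDropRow(ErrorHandleWay fromException, ErrorHandleWay rowLevel) {
        if (fromException != null) {
            // A way attached to the exception wins over the row-level setting.
            return fromException.allowSkipThisRow();
        }
        return rowLevel.allowSkip();
    }

    public static void main(String[] args) {
        for (ErrorHandleWay rowLevel : new ErrorHandleWay[] {ErrorHandleWay.FAIL, ErrorHandleWay.SKIP}) {
            System.out.println("none + " + rowLevel + " -> drop=" + shouldDropRow(null, rowLevel));
            for (ErrorHandleWay carried : ErrorHandleWay.values()) {
                System.out.println(
                        carried + " + " + rowLevel + " -> drop=" + shouldDropRow(carried, rowLevel));
            }
        }
    }
}
```

Note that an exception carrying `FAIL` or `SKIP` is rethrown even when the row-level setting is `SKIP`. Only `SKIP_ROW` on the exception, or `SKIP` at row level with no way attached to the exception, drops the row.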
diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/CommonOptions.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/CommonOptions.java
new file mode 100644
index 0000000000..eb4b0f4a71
--- /dev/null
+++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/CommonOptions.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.transform.common;
+
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
+
+import java.util.Arrays;
+
+public final class CommonOptions {
+
+    public static Option<ErrorHandleWay> ROW_ERROR_HANDLE_WAY_OPTION =
+            Options.key("row_error_handle_way")
+                    .singleChoice(
+                            ErrorHandleWay.class,
+                            Arrays.asList(ErrorHandleWay.FAIL, ErrorHandleWay.SKIP))
+                    .defaultValue(ErrorHandleWay.FAIL)
+                    .withDescription(
+                            "The processing method of data format error. The default value is fail, and the optional value is (fail, skip). "
+                                    + "When fail is selected, data format error will block and an exception will be thrown. "
+                                    + "When skip is selected, data format error will skip this line data.");
+
+    public static Option<ErrorHandleWay> COLUMN_ERROR_HANDLE_WAY_OPTION =
+            Options.key("column_error_handle_way")
+                    .enumType(ErrorHandleWay.class)
+                    .noDefaultValue()
+                    .withDescription(
+                            "The processing method of data format error. "
+                                    + "When fail is selected, data format error will block and an exception will be thrown. "
+                                    + "When skip is selected, data format error will skip this column data. "
+                                    + "When skip_row is selected, data format error will skip this line data.");
+}
diff --git a/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/JsonPathTransformFactoryTest.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/ErrorHandleWay.java
similarity index 64%
rename from seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/JsonPathTransformFactoryTest.java
rename to seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/ErrorHandleWay.java
index 2f1314cace..0f87d6d45f 100644
--- a/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/JsonPathTransformFactoryTest.java
+++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/ErrorHandleWay.java
@@ -14,17 +14,22 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.seatunnel.transform;
 
-import org.apache.seatunnel.transform.jsonpath.JsonPathTransformFactory;
+package org.apache.seatunnel.transform.common;
 
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.Test;
+public enum ErrorHandleWay {
+    // Fail the transformation when error occurs
+    FAIL,
+    // Skip the data when error occurs
+    SKIP,
+    // Skip the row when error occurs
+    SKIP_ROW;
 
-public class JsonPathTransformFactoryTest {
-    @Test
-    public void testOptionRule() {
-        JsonPathTransformFactory jsonPathTransformFactory = new JsonPathTransformFactory();
-        Assertions.assertNotNull(jsonPathTransformFactory.optionRule());
+    public boolean allowSkipThisRow() {
+        return this == SKIP_ROW;
+    }
+
+    public boolean allowSkip() {
+        return this == SKIP;
     }
 }
diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/MultipleFieldOutputTransform.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/MultipleFieldOutputTransform.java
index bfe6336182..51e1172412 100644
--- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/MultipleFieldOutputTransform.java
+++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/MultipleFieldOutputTransform.java
@@ -45,6 +45,11 @@ public abstract class MultipleFieldOutputTransform extends AbstractCatalogSuppor
         super(inputCatalogTable);
     }
 
+    public MultipleFieldOutputTransform(
+            @NonNull CatalogTable inputCatalogTable, ErrorHandleWay errorHandleWay) {
+        super(inputCatalogTable, errorHandleWay);
+    }
+
     @Override
     protected SeaTunnelRow transformRow(SeaTunnelRow inputRow) {
         Object[] fieldValues = getOutputFieldValues(new SeaTunnelRowAccessor(inputRow));
diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/exception/ErrorDataTransformException.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/exception/ErrorDataTransformException.java
new file mode 100644
index 0000000000..58b7cd5ca1
--- /dev/null
+++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/exception/ErrorDataTransformException.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.transform.exception;
+
+import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
+import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
+import org.apache.seatunnel.transform.common.ErrorHandleWay;
+
+import lombok.Getter;
+
+import java.util.Map;
+
+public class ErrorDataTransformException extends SeaTunnelRuntimeException {
+    @Getter private final ErrorHandleWay errorHandleWay;
+
+    public ErrorDataTransformException(SeaTunnelErrorCode seaTunnelErrorCode, String errorMessage) {
+        this(null, seaTunnelErrorCode, errorMessage);
+    }
+
+    public ErrorDataTransformException(
+            ErrorHandleWay errorHandleWay,
+            SeaTunnelErrorCode seaTunnelErrorCode,
+            String errorMessage) {
+        super(seaTunnelErrorCode, errorMessage);
+        this.errorHandleWay = errorHandleWay;
+    }
+
+    public ErrorDataTransformException(
+            SeaTunnelErrorCode seaTunnelErrorCode, Map<String, String> params) {
+        this(null, seaTunnelErrorCode, params);
+    }
+
+    public ErrorDataTransformException(
+            ErrorHandleWay errorHandleWay,
+            SeaTunnelErrorCode seaTunnelErrorCode,
+            Map<String, String> params) {
+        super(seaTunnelErrorCode, params);
+        this.errorHandleWay = errorHandleWay;
+    }
+}
diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/jsonpath/ColumnConfig.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/jsonpath/ColumnConfig.java
index 1025a12651..4d56be1582 100644
--- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/jsonpath/ColumnConfig.java
+++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/jsonpath/ColumnConfig.java
@@ -17,9 +17,13 @@
 package org.apache.seatunnel.transform.jsonpath;
 
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.transform.common.ErrorHandleWay;
+
+import lombok.ToString;
 
 import java.io.Serializable;
 
+@ToString
 public class ColumnConfig implements Serializable {
     private final String path;
 
@@ -28,13 +32,19 @@ public class ColumnConfig implements Serializable {
     private final String destField;
 
     private final SeaTunnelDataType<?> destType;
+    private final ErrorHandleWay errorHandleWay;
 
     public ColumnConfig(
-            String path, String srcField, String destField, SeaTunnelDataType<?> destType) {
+            String path,
+            String srcField,
+            String destField,
+            SeaTunnelDataType<?> destType,
+            ErrorHandleWay errorHandleWay) {
         this.path = path;
         this.srcField = srcField;
         this.destField = destField;
         this.destType = destType;
+        this.errorHandleWay = errorHandleWay;
     }
 
     public String getPath() {
@@ -52,4 +62,8 @@ public class ColumnConfig implements Serializable {
     public SeaTunnelDataType<?> getDestType() {
         return destType;
     }
+
+    public ErrorHandleWay errorHandleWay() {
+        return errorHandleWay;
+    }
 }
diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/jsonpath/JsonPathTransform.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/jsonpath/JsonPathTransform.java
index 34ebee0ba5..3e14e8488d 100644
--- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/jsonpath/JsonPathTransform.java
+++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/jsonpath/JsonPathTransform.java
@@ -29,8 +29,8 @@ import org.apache.seatunnel.common.utils.JsonUtils;
 import org.apache.seatunnel.format.json.JsonToRowConverters;
 import org.apache.seatunnel.transform.common.MultipleFieldOutputTransform;
 import org.apache.seatunnel.transform.common.SeaTunnelRowAccessor;
+import org.apache.seatunnel.transform.exception.ErrorDataTransformException;
 import org.apache.seatunnel.transform.exception.TransformCommonError;
-import org.apache.seatunnel.transform.exception.TransformException;
 
 import com.jayway.jsonpath.JsonPath;
 import com.jayway.jsonpath.JsonPathException;
@@ -59,7 +59,7 @@ public class JsonPathTransform extends MultipleFieldOutputTransform {
     private int[] srcFieldIndexArr;
 
     public JsonPathTransform(JsonPathTransformConfig config, CatalogTable catalogTable) {
-        super(catalogTable);
+        super(catalogTable, config.getErrorHandleWay());
         this.config = config;
         this.seaTunnelRowType = catalogTable.getSeaTunnelRowType();
         init();
@@ -122,11 +122,12 @@ public class JsonPathTransform extends MultipleFieldOutputTransform {
         Object[] fieldValues = new Object[size];
         for (int i = 0; i < size; i++) {
             int pos = this.srcFieldIndexArr[i];
+            ColumnConfig fieldConfig = configs.get(i);
             fieldValues[i] =
                     doTransform(
                             seaTunnelRowType.getFieldType(pos),
                             inputRow.getField(pos),
-                            configs.get(i),
+                            fieldConfig,
                             converters[i]);
         }
         return fieldValues;
@@ -168,7 +169,21 @@ public class JsonPathTransform extends MultipleFieldOutputTransform {
             JsonNode jsonNode = JsonUtils.toJsonNode(result);
             return converter.convert(jsonNode, null);
         } catch (JsonPathException e) {
-            throw new TransformException(JSON_PATH_COMPILE_ERROR, e.getMessage());
+            if (columnConfig.errorHandleWay() != null
+                    && columnConfig.errorHandleWay().allowSkip()) {
+                log.debug(
+                        "JsonPath transform error, ignore error, config: {}, value: {}",
+                        columnConfig,
+                        jsonString,
+                        e);
+                return null;
+            }
+            throw new ErrorDataTransformException(
+                    columnConfig.errorHandleWay(),
+                    JSON_PATH_COMPILE_ERROR,
+                    String.format(
+                            "JsonPath transform error, config: %s, value: %s, error: %s",
+                            columnConfig, jsonString, e.getMessage()));
         }
     }
 
diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/jsonpath/JsonPathTransformConfig.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/jsonpath/JsonPathTransformConfig.java
index 7c1036cbb3..51a0d6ac34 100644
--- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/jsonpath/JsonPathTransformConfig.java
+++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/jsonpath/JsonPathTransformConfig.java
@@ -23,14 +23,19 @@ import org.apache.seatunnel.api.configuration.Options;
 import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.api.table.catalog.SeaTunnelDataTypeConvertorUtil;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.transform.common.CommonOptions;
+import org.apache.seatunnel.transform.common.ErrorHandleWay;
 import org.apache.seatunnel.transform.exception.TransformException;
 
 import org.apache.commons.lang3.StringUtils;
 
+import lombok.Getter;
+
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 
 import static org.apache.seatunnel.transform.exception.JsonPathTransformErrorCode.COLUMNS_MUST_NOT_EMPTY;
 import static org.apache.seatunnel.transform.exception.JsonPathTransformErrorCode.DEST_FIELD_MUST_NOT_EMPTY;
@@ -70,13 +75,16 @@ public class JsonPathTransformConfig implements Serializable {
                     .withDescription("columns");
 
     private final List<ColumnConfig> columnConfigs;
+    @Getter private final ErrorHandleWay errorHandleWay;
 
     public List<ColumnConfig> getColumnConfigs() {
         return columnConfigs;
     }
 
-    public JsonPathTransformConfig(List<ColumnConfig> columnConfigs) {
+    public JsonPathTransformConfig(
+            List<ColumnConfig> columnConfigs, ErrorHandleWay errorHandleWay) {
         this.columnConfigs = columnConfigs;
+        this.errorHandleWay = errorHandleWay;
     }
 
     public static JsonPathTransformConfig of(ReadonlyConfig config) {
@@ -84,6 +92,7 @@ public class JsonPathTransformConfig implements Serializable {
             throw new TransformException(
                     COLUMNS_MUST_NOT_EMPTY, COLUMNS_MUST_NOT_EMPTY.getErrorMessage());
         }
+        ErrorHandleWay rowErrorHandleWay = config.get(CommonOptions.ROW_ERROR_HANDLE_WAY_OPTION);
         List<Map<String, String>> columns = config.get(COLUMNS);
         List<ColumnConfig> configs = new ArrayList<>(columns.size());
         for (Map<String, String> map : columns) {
@@ -92,12 +101,18 @@ public class JsonPathTransformConfig implements Serializable {
             String srcField = map.get(SRC_FIELD.key());
             String destField = map.get(DEST_FIELD.key());
             String type = map.getOrDefault(DEST_TYPE.key(), DEST_TYPE.defaultValue());
+            ErrorHandleWay columnErrorHandleWay =
+                    Optional.ofNullable(map.get(CommonOptions.COLUMN_ERROR_HANDLE_WAY_OPTION.key()))
+                            .map(ErrorHandleWay::valueOf)
+                            .orElse(null);
+
             SeaTunnelDataType<?> dataType =
                     SeaTunnelDataTypeConvertorUtil.deserializeSeaTunnelDataType(srcField, type);
-            ColumnConfig columnConfig = new ColumnConfig(path, srcField, destField, dataType);
+            ColumnConfig columnConfig =
+                    new ColumnConfig(path, srcField, destField, dataType, columnErrorHandleWay);
             configs.add(columnConfig);
         }
-        return new JsonPathTransformConfig(configs);
+        return new JsonPathTransformConfig(configs, rowErrorHandleWay);
     }
 
     private static void checkColumnConfig(Map<String, String> map) {
diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/jsonpath/JsonPathTransformFactory.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/jsonpath/JsonPathTransformFactory.java
index 796f8dbe6d..def17f2564 100644
--- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/jsonpath/JsonPathTransformFactory.java
+++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/jsonpath/JsonPathTransformFactory.java
@@ -24,6 +24,7 @@ import org.apache.seatunnel.api.table.connector.TableTransform;
 import org.apache.seatunnel.api.table.factory.Factory;
 import org.apache.seatunnel.api.table.factory.TableTransformFactory;
 import org.apache.seatunnel.api.table.factory.TableTransformFactoryContext;
+import org.apache.seatunnel.transform.common.CommonOptions;
 
 import com.google.auto.service.AutoService;
 
@@ -36,7 +37,10 @@ public class JsonPathTransformFactory implements TableTransformFactory {
 
     @Override
     public OptionRule optionRule() {
-        return OptionRule.builder().required(JsonPathTransformConfig.COLUMNS).build();
+        return OptionRule.builder()
+                .required(JsonPathTransformConfig.COLUMNS)
+                .optional(CommonOptions.ROW_ERROR_HANDLE_WAY_OPTION)
+                .build();
     }
 
     @Override
diff --git a/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/JsonPathTransformTest.java b/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/JsonPathTransformTest.java
new file mode 100644
index 0000000000..51c0c0ac30
--- /dev/null
+++ b/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/JsonPathTransformTest.java
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seatunnel.transform;
+
+import org.apache.seatunnel.shade.com.google.common.collect.ImmutableMap;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.transform.common.CommonOptions;
+import org.apache.seatunnel.transform.common.ErrorHandleWay;
+import org.apache.seatunnel.transform.exception.ErrorDataTransformException;
+import org.apache.seatunnel.transform.jsonpath.JsonPathTransform;
+import org.apache.seatunnel.transform.jsonpath.JsonPathTransformConfig;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+public class JsonPathTransformTest {
+
+    @Test
+    public void testJsonPath() {
+        Map<String, Object> configMap = new HashMap<>();
+        configMap.put(
+                JsonPathTransformConfig.COLUMNS.key(),
+                Arrays.asList(
+                        ImmutableMap.of(
+                                JsonPathTransformConfig.SRC_FIELD.key(), "data",
+                                JsonPathTransformConfig.PATH.key(), "$.f1",
+                                JsonPathTransformConfig.DEST_FIELD.key(), "f1")));
+        ReadonlyConfig config = ReadonlyConfig.fromMap(configMap);
+        CatalogTable table =
+                CatalogTableUtil.getCatalogTable(
+                        "test",
+                        new SeaTunnelRowType(
+                                new String[] {"data"},
+                                new SeaTunnelDataType[] {BasicType.STRING_TYPE}));
+        JsonPathTransform transform =
+                new JsonPathTransform(JsonPathTransformConfig.of(config), table);
+
+        CatalogTable outputTable = transform.getProducedCatalogTable();
+        SeaTunnelRow outputRow = transform.map(new SeaTunnelRow(new Object[] {"{\"f1\": 1}"}));
+        Assertions.assertEquals(
+                "1", outputRow.getField(outputTable.getSeaTunnelRowType().indexOf("f1")));
+    }
+
+    @Test
+    public void testErrorHandleWay() {
+        Map<String, Object> configMap = new HashMap<>();
+        configMap.put(
+                JsonPathTransformConfig.COLUMNS.key(),
+                Arrays.asList(
+                        ImmutableMap.of(
+                                JsonPathTransformConfig.SRC_FIELD.key(), "data",
+                                JsonPathTransformConfig.PATH.key(), "$.f1",
+                                JsonPathTransformConfig.DEST_FIELD.key(), "f1")));
+        ReadonlyConfig config = ReadonlyConfig.fromMap(configMap);
+        CatalogTable table =
+                CatalogTableUtil.getCatalogTable(
+                        "test",
+                        new SeaTunnelRowType(
+                                new String[] {"data"},
+                                new SeaTunnelDataType[] {BasicType.STRING_TYPE}));
+        JsonPathTransform transform =
+                new JsonPathTransform(JsonPathTransformConfig.of(config), table);
+        CatalogTable outputTable = transform.getProducedCatalogTable();
+        final JsonPathTransform finalTransform = transform;
+        Assertions.assertThrows(
+                ErrorDataTransformException.class,
+                () -> finalTransform.map(new SeaTunnelRow(new Object[] {"{\"f2\": 1}"})));
+
+        configMap.put(
+                JsonPathTransformConfig.COLUMNS.key(),
+                Arrays.asList(
+                        ImmutableMap.of(
+                                JsonPathTransformConfig.SRC_FIELD.key(),
+                                "data",
+                                JsonPathTransformConfig.PATH.key(),
+                                "$.f1",
+                                JsonPathTransformConfig.DEST_FIELD.key(),
+                                "f1",
+                                CommonOptions.COLUMN_ERROR_HANDLE_WAY_OPTION.key(),
+                                ErrorHandleWay.FAIL.name())));
+        config = ReadonlyConfig.fromMap(configMap);
+        transform = new JsonPathTransform(JsonPathTransformConfig.of(config), table);
+        outputTable = transform.getProducedCatalogTable();
+        JsonPathTransform finalTransform1 = transform;
+        Assertions.assertThrows(
+                ErrorDataTransformException.class,
+                () -> finalTransform1.map(new SeaTunnelRow(new Object[] {"{\"f2\": 1}"})));
+
+        configMap.put(
+                JsonPathTransformConfig.COLUMNS.key(),
+                Arrays.asList(
+                        ImmutableMap.of(
+                                JsonPathTransformConfig.SRC_FIELD.key(),
+                                "data",
+                                JsonPathTransformConfig.PATH.key(),
+                                "$.f1",
+                                JsonPathTransformConfig.DEST_FIELD.key(),
+                                "f1",
+                                CommonOptions.COLUMN_ERROR_HANDLE_WAY_OPTION.key(),
+                                ErrorHandleWay.SKIP.name())));
+        config = ReadonlyConfig.fromMap(configMap);
+        transform = new JsonPathTransform(JsonPathTransformConfig.of(config), table);
+        outputTable = transform.getProducedCatalogTable();
+        SeaTunnelRow outputRow = transform.map(new SeaTunnelRow(new Object[] {"{\"f2\": 1}"}));
+        Assertions.assertNotNull(outputRow);
+        Assertions.assertNull(outputRow.getField(outputTable.getSeaTunnelRowType().indexOf("f1")));
+
+        configMap.put(
+                JsonPathTransformConfig.COLUMNS.key(),
+                Arrays.asList(
+                        ImmutableMap.of(
+                                JsonPathTransformConfig.SRC_FIELD.key(),
+                                "data",
+                                JsonPathTransformConfig.PATH.key(),
+                                "$.f1",
+                                JsonPathTransformConfig.DEST_FIELD.key(),
+                                "f1",
+                                CommonOptions.COLUMN_ERROR_HANDLE_WAY_OPTION.key(),
+                                ErrorHandleWay.SKIP_ROW.name())));
+        config = ReadonlyConfig.fromMap(configMap);
+        transform = new JsonPathTransform(JsonPathTransformConfig.of(config), table);
+        outputTable = transform.getProducedCatalogTable();
+        outputRow = transform.map(new SeaTunnelRow(new Object[] {"{\"f2\": 1}"}));
+        Assertions.assertNull(outputRow);
+
+        configMap.put(CommonOptions.ROW_ERROR_HANDLE_WAY_OPTION.key(), ErrorHandleWay.SKIP.name());
+        configMap.put(
+                JsonPathTransformConfig.COLUMNS.key(),
+                Arrays.asList(
+                        ImmutableMap.of(
+                                JsonPathTransformConfig.SRC_FIELD.key(), "data",
+                                JsonPathTransformConfig.PATH.key(), "$.f1",
+                                JsonPathTransformConfig.DEST_FIELD.key(), "f1")));
+        config = ReadonlyConfig.fromMap(configMap);
+        transform = new JsonPathTransform(JsonPathTransformConfig.of(config), table);
+        outputTable = transform.getProducedCatalogTable();
+        outputRow = transform.map(new SeaTunnelRow(new Object[] {"{\"f2\": 1}"}));
+        Assertions.assertNull(outputRow);
+
+        configMap.put(CommonOptions.ROW_ERROR_HANDLE_WAY_OPTION.key(), ErrorHandleWay.SKIP.name());
+        configMap.put(
+                JsonPathTransformConfig.COLUMNS.key(),
+                Arrays.asList(
+                        ImmutableMap.of(
+                                JsonPathTransformConfig.SRC_FIELD.key(),
+                                "data",
+                                JsonPathTransformConfig.PATH.key(),
+                                "$.f1",
+                                JsonPathTransformConfig.DEST_FIELD.key(),
+                                "f1",
+                                CommonOptions.COLUMN_ERROR_HANDLE_WAY_OPTION.key(),
+                                ErrorHandleWay.FAIL.name())));
+        config = ReadonlyConfig.fromMap(configMap);
+        transform = new JsonPathTransform(JsonPathTransformConfig.of(config), table);
+        outputTable = transform.getProducedCatalogTable();
+        try {
+            outputRow = transform.map(new SeaTunnelRow(new Object[] {"{\"f2\": 1}"}));
+            Assertions.fail("should throw exception");
+        } catch (Exception e) {
+            // ignore
+        }
+
+        configMap.put(CommonOptions.ROW_ERROR_HANDLE_WAY_OPTION.key(), ErrorHandleWay.FAIL.name());
+        configMap.put(
+                JsonPathTransformConfig.COLUMNS.key(),
+                Arrays.asList(
+                        ImmutableMap.of(
+                                JsonPathTransformConfig.SRC_FIELD.key(),
+                                "data",
+                                JsonPathTransformConfig.PATH.key(),
+                                "$.f1",
+                                JsonPathTransformConfig.DEST_FIELD.key(),
+                                "f1",
+                                CommonOptions.COLUMN_ERROR_HANDLE_WAY_OPTION.key(),
+                                ErrorHandleWay.SKIP.name())));
+        config = ReadonlyConfig.fromMap(configMap);
+        transform = new JsonPathTransform(JsonPathTransformConfig.of(config), table);
+        outputTable = transform.getProducedCatalogTable();
+        outputRow = transform.map(new SeaTunnelRow(new Object[] {"{\"f2\": 1}"}));
+        Assertions.assertNotNull(outputRow);
+        Assertions.assertNull(outputRow.getField(outputTable.getSeaTunnelRowType().indexOf("f1")));
+
+        configMap.put(CommonOptions.ROW_ERROR_HANDLE_WAY_OPTION.key(), ErrorHandleWay.FAIL.name());
+        configMap.put(
+                JsonPathTransformConfig.COLUMNS.key(),
+                Arrays.asList(
+                        ImmutableMap.of(
+                                JsonPathTransformConfig.SRC_FIELD.key(),
+                                "data",
+                                JsonPathTransformConfig.PATH.key(),
+                                "$.f1",
+                                JsonPathTransformConfig.DEST_FIELD.key(),
+                                "f1",
+                                CommonOptions.COLUMN_ERROR_HANDLE_WAY_OPTION.key(),
+                                ErrorHandleWay.SKIP_ROW.name())));
+        config = ReadonlyConfig.fromMap(configMap);
+        transform = new JsonPathTransform(JsonPathTransformConfig.of(config), table);
+        outputTable = transform.getProducedCatalogTable();
+        outputRow = transform.map(new SeaTunnelRow(new Object[] {"{\"f2\": 1}"}));
+        Assertions.assertNull(outputRow);
+    }
+}
