This is an automated email from the ASF dual-hosted git repository.

fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new d908f0af40 [Feature][Transform] add JsonPath transform (#5632)
d908f0af40 is described below

commit d908f0af407b37e0ecee0e93c4cf20f3a3804522
Author: FuYouJ <[email protected]>
AuthorDate: Tue Nov 7 12:12:42 2023 +0800

    [Feature][Transform] add JsonPath transform (#5632)
---
 docs/en/transform-v2/jsonpath.md                   | 190 ++++++++++++++++++++
 pom.xml                                            |   6 +
 release-note.md                                    |   1 +
 .../apache/seatunnel/common/utils/JsonUtils.java   |   5 +-
 .../connector-http/connector-http-base/pom.xml     |   1 -
 seatunnel-dist/release-docs/LICENSE                |   4 +-
 .../e2e/transform/TestJsonPathTransformIT.java     |  47 +++++
 .../resources/json_path_transform/array_test.conf  |  78 +++++++++
 .../json_path_basic_type_test.conf                 | 193 +++++++++++++++++++++
 .../json_path_transform/nested_row_test.conf       |  83 +++++++++
 seatunnel-shade/seatunnel-jackson/pom.xml          |  17 ++
 seatunnel-transforms-v2/pom.xml                    |  11 ++
 .../exception/JsonPathTransformErrorCode.java      |  54 ++++++
 .../seatunnel/transform/jsonpath/ColumnConfig.java |  55 ++++++
 .../transform/jsonpath/JsonPathTransform.java      | 187 ++++++++++++++++++++
 .../jsonpath/JsonPathTransformConfig.java          | 120 +++++++++++++
 .../jsonpath/JsonPathTransformFactory.java         |  49 ++++++
 .../transform/JsonPathTransformFactoryTest.java    |  30 ++++
 tools/dependencies/known-dependencies.txt          |   7 +-
 19 files changed, 1134 insertions(+), 4 deletions(-)

diff --git a/docs/en/transform-v2/jsonpath.md b/docs/en/transform-v2/jsonpath.md
new file mode 100644
index 0000000000..3baf5853b7
--- /dev/null
+++ b/docs/en/transform-v2/jsonpath.md
@@ -0,0 +1,190 @@
+# JsonPath
+
+> JsonPath transform plugin
+
+## Description
+
+> Supports using JsonPath expressions to select data from a field
+
+## Options
+
+|  name   | type  | required | default value |
+|---------|-------|----------|---------------|
+| columns | Array | Yes      |               |
+
+### common options [string]
+
+Transform plugin common parameters, please refer to [Transform 
Plugin](common-options.md) for details
+
+### columns [array]
+
+#### option
+
+|    name    |  type  | required | default value |
+|------------|--------|----------|---------------|
+| src_field  | String | Yes      |               |
+| dest_field | String | Yes      |               |
+| path       | String | Yes      |               |
+| dest_type  | String | No       | String        |
+
+#### src_field
+
+> The source field that contains the JSON data you want to parse
+
+Supported SeaTunnelDataType values:
+
+* STRING
+* BYTES
+* ARRAY
+* MAP
+* ROW
+
+#### dest_field
+
+> The output field that stores the value selected by the JsonPath expression
+
+#### dest_type
+
+> the type of dest field
+
+#### path
+
+> The JsonPath expression used to select the value from the source field
+
+## Read Json Example
+
+The data read from source is a table like this json:
+
+```json
+{
+  "data": {
+    "c_string": "this is a string",
+    "c_boolean": true,
+    "c_integer": 42,
+    "c_float": 3.14,
+    "c_double": 3.14,
+    "c_decimal": 10.55,
+    "c_date": "2023-10-29",
+    "c_datetime": "16:12:43.459",
+    "c_array":["item1", "item2", "item3"]
+  }
+}
+```
+
+Assuming we want to use JsonPath to extract properties.
+
+```json
+transform {
+  JsonPath {
+    source_table_name = "fake"
+    result_table_name = "fake1"
+    columns = [
+     {
+        "src_field" = "data"
+        "path" = "$.data.c_string"
+        "dest_field" = "c1_string"
+     },
+     {
+        "src_field" = "data"
+        "path" = "$.data.c_boolean"
+        "dest_field" = "c1_boolean"
+        "dest_type" = "boolean"
+     },
+     {
+        "src_field" = "data"
+        "path" = "$.data.c_integer"
+        "dest_field" = "c1_integer"
+        "dest_type" = "int"
+     },
+     {
+        "src_field" = "data"
+        "path" = "$.data.c_float"
+        "dest_field" = "c1_float"
+        "dest_type" = "float"
+     },
+     {
+        "src_field" = "data"
+        "path" = "$.data.c_double"
+        "dest_field" = "c1_double"
+        "dest_type" = "double"
+     },
+      {
+         "src_field" = "data"
+         "path" = "$.data.c_decimal"
+         "dest_field" = "c1_decimal"
+         "dest_type" = "decimal(4,2)"
+      },
+      {
+         "src_field" = "data"
+         "path" = "$.data.c_date"
+         "dest_field" = "c1_date"
+         "dest_type" = "date"
+      },
+      {
+         "src_field" = "data"
+         "path" = "$.data.c_datetime"
+         "dest_field" = "c1_datetime"
+         "dest_type" = "time"
+      },
+      {
+         "src_field" = "data"
+         "path" = "$.data.c_array"
+         "dest_field" = "c1_array"
+         "dest_type" = "array<string>"
+      }
+    ]
+  }
+}
+```
+
+Then the data result table `fake1` will look like this
+
+|             data             |    c1_string     | c1_boolean | c1_integer | 
c1_float | c1_double | c1_decimal |  c1_date   | c1_datetime  |          
c1_array           |
+|------------------------------|------------------|------------|------------|----------|-----------|------------|------------|--------------|-----------------------------|
+| too much content not to show | this is a string | true       | 42         | 
3.14     | 3.14      | 10.55      | 2023-10-29 | 16:12:43.459 | ["item1", 
"item2", "item3"] |
+
+## Read SeatunnelRow Example
+
+Suppose a column in a row of data is of type SeaTunnelRow and that the name of 
the column is `col`
+
+<table>
+<tr><th colspan="2">SeatunnelRow(col)</th><th>other</th></tr>
+<tr><td>name</td><td>age</td><td>....</td></tr>
+<tr><td>a</td><td>18</td><td>....</td></tr>
+</table>
+
+The JsonPath transform converts the SeaTunnelRow value into an array, so each field can be selected by its index:
+
+```json
+transform {
+  JsonPath {
+    source_table_name = "fake"
+    result_table_name = "fake1"
+    columns = [
+     {
+        "src_field" = "col"
+        "path" = "$[0]"
+        "dest_field" = "name"
+        "dest_type" = "string"
+     },
+     {
+        "src_field" = "col"
+        "path" = "$[1]"
+        "dest_field" = "age"
+        "dest_type" = "int"
+     }
+    ]
+  }
+}
+```
+
+Then the data result table `fake1` will look like this
+
+| name | age |   col    | other |
+|------|-----|----------|-------|
+| a    | 18  | ["a",18] | ...   |
+
+## Changelog
+
+* Add JsonPath Transform
+
diff --git a/pom.xml b/pom.xml
index 41afca5217..f3e1dd5594 100644
--- a/pom.xml
+++ b/pom.xml
@@ -129,6 +129,7 @@
         <testcontainer.version>1.17.6</testcontainer.version>
         <spotless.version>2.29.0</spotless.version>
         <jsqlparser.version>4.5</jsqlparser.version>
+        <json-path.version>2.7.0</json-path.version>
         <!-- Option args -->
         <skipUT>false</skipUT>
         <skipIT>true</skipIT>
@@ -355,6 +356,11 @@
                 <artifactId>jackson-annotations</artifactId>
                 <version>${jackson.version}</version>
             </dependency>
+            <dependency>
+                <groupId>com.fasterxml.jackson.datatype</groupId>
+                <artifactId>jackson-datatype-jsr310</artifactId>
+                <version>${jackson.version}</version>
+            </dependency>
 
             <dependency>
                 <groupId>com.fasterxml.jackson.core</groupId>
diff --git a/release-note.md b/release-note.md
index 545039062b..1e352b80c7 100644
--- a/release-note.md
+++ b/release-note.md
@@ -12,6 +12,7 @@
 ### Transformer
 - [Spark] Support transform-v2 for spark (#3409)
 - [ALL]Add FieldMapper Transform #3781
+- [All]Add JsonPath Transform #5633
 ### Connectors
 - [Elasticsearch] Support https protocol & compatible with opensearch
 - [Hbase] Add hbase sink connector #4049
diff --git 
a/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/JsonUtils.java
 
b/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/JsonUtils.java
index 84ac374b25..885d29a6b7 100644
--- 
a/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/JsonUtils.java
+++ 
b/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/JsonUtils.java
@@ -34,6 +34,7 @@ import 
org.apache.seatunnel.shade.com.fasterxml.jackson.databind.node.JsonNodeTy
 import 
org.apache.seatunnel.shade.com.fasterxml.jackson.databind.node.ObjectNode;
 import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.node.TextNode;
 import 
org.apache.seatunnel.shade.com.fasterxml.jackson.databind.type.CollectionType;
+import 
org.apache.seatunnel.shade.com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
 
 import org.apache.commons.lang3.StringUtils;
 
@@ -59,7 +60,9 @@ public class JsonUtils {
                     .configure(ACCEPT_EMPTY_ARRAY_AS_NULL_OBJECT, true)
                     .configure(READ_UNKNOWN_ENUM_VALUES_AS_NULL, true)
                     .configure(REQUIRE_SETTERS_FOR_GETTERS, true)
-                    .setTimeZone(TimeZone.getDefault());
+                    .setTimeZone(TimeZone.getDefault())
+                    // support java8 time api
+                    .registerModule(new JavaTimeModule());
 
     private static final ObjectMapper DEFAULT_OBJECT_MAPPER = new 
ObjectMapper();
 
diff --git a/seatunnel-connectors-v2/connector-http/connector-http-base/pom.xml 
b/seatunnel-connectors-v2/connector-http/connector-http-base/pom.xml
index 2164e8f8df..f18d6f9fef 100644
--- a/seatunnel-connectors-v2/connector-http/connector-http-base/pom.xml
+++ b/seatunnel-connectors-v2/connector-http/connector-http-base/pom.xml
@@ -33,7 +33,6 @@
         <httpclient.version>4.5.13</httpclient.version>
         <httpcore.version>4.4.4</httpcore.version>
         <guava-retrying.version>2.0.0</guava-retrying.version>
-        <json-path.version>2.7.0</json-path.version>
     </properties>
 
     <dependencies>
diff --git a/seatunnel-dist/release-docs/LICENSE 
b/seatunnel-dist/release-docs/LICENSE
index 08f41da2a5..310d061833 100644
--- a/seatunnel-dist/release-docs/LICENSE
+++ b/seatunnel-dist/release-docs/LICENSE
@@ -242,6 +242,7 @@ The text of each license is the standard Apache 2.0 license.
      (The Apache Software License, Version 2.0) jackson-databind 
(com.fasterxml.jackson.core:jackson-databind:2.13.3  - 
http://github.com/FasterXML/jackson)
      (The Apache Software License, Version 2.0) Jackson-dataformat-properties 
(com.fasterxml.jackson.dataformat:jackson-dataformat-properties:2.13.3 - 
https://github.com/FasterXML/jackson-dataformats-text)
      (The Apache Software License, Version 2.0) Data Mapper for Jackson 
(org.codehaus.jackson:jackson-mapper-asl:1.9.13 - http://jackson.codehaus.org)
+     (The Apache Software License, Version 2.0) jackson-datatype-jsr310 
(com.fasterxml.jackson.datatype:jackson-datatype-jsr310:2.13.3 - 
https://mvnrepository.com/artifact/com.fasterxml.jackson.datatype/jackson-datatype-jsr310/2.13.3)
      (Apache License, Version 2.0) jcommander (com.beust:jcommander:1.81 - 
https://jcommander.org)
      (The Apache Software License, Version 2.0) FindBugs-jsr305 
(com.google.code.findbugs:jsr305:1.3.9 - http://findbugs.sourceforge.net/)
      (The Apache Software License, Version 2.0) FindBugs-jsr305 
(com.google.code.findbugs:jsr305:3.0.0 - http://findbugs.sourceforge.net/)
@@ -273,8 +274,9 @@ The text of each license is the standard Apache 2.0 license.
      (Apache-2.0) failureaccess (com.google.guava:failureaccess:1.0 
https://mvnrepository.com/artifact/com.google.guava/failureaccess/1.0)
      (Apache-2.0) j2objc-annotations (com.google.j2objc:j2objc-annotations:1.1 
https://mvnrepository.com/artifact/com.google.j2objc/j2objc-annotations/1.1)
      (Apache-2.0) listenablefuture 
(com.google.guava:listenablefuture:9999.0-empty-to-avoid-conflict-with-guava 
https://mvnrepository.com/artifact/com.google.guava/listenablefuture/9999.0-empty-to-avoid-conflict-with-guava)
-     (Apache-2.0) accessors-smart (com.google.guava:accessors-smart:2.4.7 - 
https://mvnrepository.com/artifact/net.minidev/accessors-smart)
+     (Apache-2.0) accessors-smart (net.minidev:accessors-smart:2.4.7 - 
https://mvnrepository.com/artifact/net.minidev/accessors-smart)
      (Apache-2.0) json-smart (net.minidev:json-smart:2.4.7 - 
https://mvnrepository.com/artifact/net.minidev/json-smart)
+     (Apache-2.0) json-path (com.jayway.jsonpath:json-path:2.7.0 - 
https://mvnrepository.com/artifact/com.jayway.jsonpath/json-path)
 
 ========================================================================
 MOZILLA PUBLIC LICENSE License
diff --git 
a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/java/org/apache/seatunnel/e2e/transform/TestJsonPathTransformIT.java
 
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/java/org/apache/seatunnel/e2e/transform/TestJsonPathTransformIT.java
new file mode 100644
index 0000000000..9c3c7aa22e
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/java/org/apache/seatunnel/e2e/transform/TestJsonPathTransformIT.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seatunnel.e2e.transform;
+
+import org.apache.seatunnel.e2e.common.container.TestContainer;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.TestTemplate;
+import org.testcontainers.containers.Container;
+
+/**
+ * End-to-end tests for the JsonPath transform.
+ *
+ * <p>Each test submits one job config from {@code /json_path_transform/} to the supplied
+ * container and expects the job to finish with exit code 0.
+ */
+public class TestJsonPathTransformIT extends TestSuiteBase {
+
+    /** Runs the job that extracts basic-typed values from a JSON string field. */
+    @TestTemplate
+    public void testBasicType(TestContainer container) throws Exception {
+        assertJobSucceeds(container, "/json_path_transform/json_path_basic_type_test.conf");
+    }
+
+    /** Runs the job that selects an element from an array field. */
+    @TestTemplate
+    public void testArray(TestContainer container) throws Exception {
+        assertJobSucceeds(container, "/json_path_transform/array_test.conf");
+    }
+
+    /** Runs the job that selects a value from a nested row field. */
+    @TestTemplate
+    public void testNestedRow(TestContainer container) throws Exception {
+        assertJobSucceeds(container, "/json_path_transform/nested_row_test.conf");
+    }
+
+    /**
+     * Executes the given job config in the container and asserts a zero exit code.
+     *
+     * @param container the engine container the job is submitted to
+     * @param confFile classpath location of the job config
+     * @throws Exception if the container fails to execute the job
+     */
+    private static void assertJobSucceeds(TestContainer container, String confFile)
+            throws Exception {
+        Container.ExecResult jobResult = container.executeJob(confFile);
+        Assertions.assertEquals(0, jobResult.getExitCode());
+    }
+}
diff --git 
a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/json_path_transform/array_test.conf
 
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/json_path_transform/array_test.conf
new file mode 100644
index 0000000000..42dc48de27
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/json_path_transform/array_test.conf
@@ -0,0 +1,78 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of batch processing in 
seatunnel config
+######
+
+env {
+  job.mode = "BATCH"
+}
+
+source {
+  FakeSource {
+    result_table_name = "fake"
+    row.num = 100
+    schema = {
+      fields {
+        c_array = "array<string>"
+      }
+    }
+  }
+}
+
+transform {
+  JsonPath {
+    source_table_name = "fake"
+    result_table_name = "fake1"
+    columns = [
+     {
+        "src_field" = "c_array"
+        "path" = "$[0]"
+        "dest_field" = "test_str"
+     }
+    ]
+  }
+}
+
+sink {
+  Console {
+    source_table_name = "fake1"
+  }
+  Assert {
+    source_table_name = "fake1"
+    rules =
+      {
+        row_rules = [
+          {
+            rule_type = MIN_ROW
+            rule_value = 100
+          }
+        ],
+        field_rules = [
+          {
+            field_name = test_str
+            field_type = string
+            field_value = [
+              {
+                rule_type = NOT_NULL
+              }
+            ]
+          }
+        ]
+      }
+  }
+}
\ No newline at end of file
diff --git 
a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/json_path_transform/json_path_basic_type_test.conf
 
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/json_path_transform/json_path_basic_type_test.conf
new file mode 100644
index 0000000000..97fa1ae036
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/json_path_transform/json_path_basic_type_test.conf
@@ -0,0 +1,193 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of batch processing in 
seatunnel config
+######
+
+env {
+  job.mode = "BATCH"
+}
+
+source {
+  FakeSource {
+    result_table_name = "fake"
+    row.num = 100
+    string.fake.mode = "template"
+    string.template=["{"data":{"c_string": "this is a string","c_boolean": 
"true","c_integer": "42","c_float": "3.14","c_double": "3.14","c_decimal": 
"10.55","c_date":"'2023-10-29'","c_datetime":\"16:12:43.459\"}}"]
+    schema = {
+      fields {
+        data = "string"
+      }
+    }
+  }
+}
+
+transform {
+  JsonPath {
+    source_table_name = "fake"
+    result_table_name = "fake1"
+    columns = [
+     {
+        "src_field" = "data"
+        "path" = "$.data.c_string"
+        "dest_field" = "c1_string"
+     },
+     {
+        "src_field" = "data"
+        "path" = "$.data.c_boolean"
+        "dest_field" = "c1_boolean"
+        "dest_type" = "boolean"
+     },
+     {
+        "src_field" = "data"
+        "path" = "$.data.c_integer"
+        "dest_field" = "c1_integer"
+        "dest_type" = "int"
+     },
+     {
+        "src_field" = "data"
+        "path" = "$.data.c_float"
+        "dest_field" = "c1_float"
+        "dest_type" = "float"
+     },
+     {
+        "src_field" = "data"
+        "path" = "$.data.c_double"
+        "dest_field" = "c1_double"
+        "dest_type" = "double"
+     },
+      {
+         "src_field" = "data"
+         "path" = "$.data.c_decimal"
+         "dest_field" = "c1_decimal"
+         "dest_type" = "decimal(4,2)"
+      },
+      {
+         "src_field" = "data"
+         "path" = "$.data.c_date"
+         "dest_field" = "c1_date"
+         "dest_type" = "date"
+      },
+      {
+         "src_field" = "data"
+         "path" = "$.data.c_datetime"
+         "dest_field" = "c1_datetime"
+         "dest_type" = "time"
+      }
+    ]
+  }
+}
+
+sink {
+  Console {
+    source_table_name = "fake1"
+  }
+  Assert {
+    source_table_name = "fake1"
+    rules =
+      {
+        row_rules = [
+          {
+            rule_type = MIN_ROW
+            rule_value = 100
+          }
+        ],
+        field_rules = [
+          {
+            field_name = c1_string
+            field_type = string
+            field_value = [
+              {
+                rule_type = NOT_NULL
+                equals_to = "this is a string"
+              }
+            ]
+          },
+          {
+            field_name = c1_boolean
+            field_type = boolean
+            field_value = [
+              {
+                rule_type = NOT_NULL
+                equals_to = "true"
+              }
+            ]
+          },
+          {
+            field_name = c1_integer
+            field_type = int
+            field_value = [
+              {
+                rule_type = NOT_NULL
+                equals_to = 42
+              }
+            ]
+          },
+          {
+            field_name = c1_float
+            field_type = float
+            field_value = [
+              {
+                rule_type = NOT_NULL
+                equals_to = 3.14
+              }
+            ]
+          },
+          {
+            field_name = c1_double
+            field_type = double
+            field_value = [
+              {
+                rule_type = NOT_NULL
+                equals_to = 3.14
+              }
+            ]
+          },
+          {
+            field_name = c1_decimal
+            field_type = decimal
+            field_value = [
+              {
+                rule_type = NOT_NULL
+                equals_to = 10.55
+              }
+            ]
+          },
+          {
+            field_name = c1_date
+            field_type = date
+            field_value = [
+              {
+                rule_type = NOT_NULL
+                equals_to = "2023-10-29"
+              }
+            ]
+          },
+          {
+            field_name = c1_datetime
+            field_type = time
+            field_value = [
+              {
+                rule_type = NOT_NULL
+                equals_to = "16:12:43.459"
+              }
+            ]
+          }
+        ]
+      }
+  }
+}
\ No newline at end of file
diff --git 
a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/json_path_transform/nested_row_test.conf
 
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/json_path_transform/nested_row_test.conf
new file mode 100644
index 0000000000..7ed0bc05a1
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/json_path_transform/nested_row_test.conf
@@ -0,0 +1,83 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of batch processing in 
seatunnel config
+######
+
+env {
+  job.mode = "BATCH"
+}
+
+source {
+FakeSource {
+  row.num = 10
+  schema = {
+    fields {
+      c_row = {
+        c_map = "map<string, map<string, string>>"
+        c_array = "array<int>"
+        c_string = string
+      }
+    }
+  }
+  result_table_name = "fake"
+}
+}
+
+transform {
+  JsonPath {
+    source_table_name = "fake"
+    result_table_name = "fake1"
+    columns = [
+     {
+        "src_field" = "c_row"
+        "path" = "$[2]"
+        "dest_field" = "test_str"
+        "dest_type" = "string"
+     }
+    ]
+  }
+}
+
+sink {
+  Console {
+    source_table_name = "fake1"
+  }
+  Assert {
+    source_table_name = "fake1"
+    rules =
+      {
+        row_rules = [
+          {
+            rule_type = MIN_ROW
+            rule_value = 10
+          }
+        ],
+        field_rules = [
+          {
+            field_name = test_str
+            field_type = string
+            field_value = [
+              {
+                rule_type = NOT_NULL
+              }
+            ]
+          }
+        ]
+      }
+  }
+}
\ No newline at end of file
diff --git a/seatunnel-shade/seatunnel-jackson/pom.xml 
b/seatunnel-shade/seatunnel-jackson/pom.xml
index f3bf3050b3..b328e29ed4 100644
--- a/seatunnel-shade/seatunnel-jackson/pom.xml
+++ b/seatunnel-shade/seatunnel-jackson/pom.xml
@@ -31,6 +31,23 @@
             <artifactId>jackson-dataformat-properties</artifactId>
             <version>${jackson.version}</version>
         </dependency>
+        <dependency>
+            <groupId>com.fasterxml.jackson.datatype</groupId>
+            <artifactId>jackson-datatype-jsr310</artifactId>
+            <version>${jackson.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-core</artifactId>
+            <version>${jackson.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-databind</artifactId>
+            <version>${jackson.version}</version>
+        </dependency>
     </dependencies>
 
     <build>
diff --git a/seatunnel-transforms-v2/pom.xml b/seatunnel-transforms-v2/pom.xml
index 5a6e592330..833b14d372 100644
--- a/seatunnel-transforms-v2/pom.xml
+++ b/seatunnel-transforms-v2/pom.xml
@@ -50,6 +50,17 @@
             <artifactId>jsqlparser</artifactId>
             <version>${jsqlparser.version}</version>
         </dependency>
+        <dependency>
+            <groupId>com.jayway.jsonpath</groupId>
+            <artifactId>json-path</artifactId>
+            <version>${json-path.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>seatunnel-format-json</artifactId>
+            <version>${revision}</version>
+        </dependency>
+
     </dependencies>
 
     <build>
diff --git 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/exception/JsonPathTransformErrorCode.java
 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/exception/JsonPathTransformErrorCode.java
new file mode 100644
index 0000000000..52b75a1ed0
--- /dev/null
+++ 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/exception/JsonPathTransformErrorCode.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seatunnel.transform.exception;
+
+import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
+
+/**
+ * Error codes reported by the JsonPath transform.
+ *
+ * <p>Each constant pairs a code string with a human-readable description. Codes are unique
+ * per constant so that a failure can be identified from its code alone.
+ */
+public enum JsonPathTransformErrorCode implements SeaTunnelErrorCode {
+    COLUMNS_MUST_NOT_EMPTY(
+            "JSONPATH_ERROR_CODE-01", "JsonPathTransform config columns must not empty"),
+    SRC_FIELD_MUST_NOT_EMPTY(
+            "JSONPATH_ERROR_CODE-02", "JsonPathTransform src_field must not empty"),
+    PATH_MUST_NOT_EMPTY(
+            "JSONPATH_ERROR_CODE-03", "JsonPathTransform config field path must not empty"),
+    DEST_FIELD_MUST_NOT_EMPTY(
+            "JSONPATH_ERROR_CODE-04", "JsonPathTransform dest_field must not empty"),
+
+    JSON_PATH_COMPILE_ERROR("JSONPATH_ERROR_CODE-05", "JsonPathTransform path is invalid"),
+    DEST_TYPE_MUST_NOT_EMPTY(
+            "JSONPATH_ERROR_CODE-06", "JsonPathTransform dest_type must not empty"),
+    // Has its own code, distinct from SRC_FIELD_MUST_NOT_EMPTY (-02), so that a missing
+    // source column and an empty src_field option can be told apart by code.
+    SRC_FIELD_NOT_FOUND(
+            "JSONPATH_ERROR_CODE-07", "JsonPathTransform src_field not found in source"),
+    ;
+
+    /** Code string identifying this error. */
+    private final String code;
+
+    /** Human-readable explanation of this error. */
+    private final String description;
+
+    /**
+     * @param code code string identifying the error
+     * @param description human-readable explanation of the error
+     */
+    JsonPathTransformErrorCode(String code, String description) {
+        this.code = code;
+        this.description = description;
+    }
+
+    /** @return the code string of this error */
+    @Override
+    public String getCode() {
+        return code;
+    }
+
+    /** @return the human-readable description of this error */
+    @Override
+    public String getDescription() {
+        return description;
+    }
+}
diff --git 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/jsonpath/ColumnConfig.java
 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/jsonpath/ColumnConfig.java
new file mode 100644
index 0000000000..1025a12651
--- /dev/null
+++ 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/jsonpath/ColumnConfig.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seatunnel.transform.jsonpath;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+
+import java.io.Serializable;
+
+/**
+ * Immutable description of one output column of the JsonPath transform: which source field
+ * to read, which JsonPath expression to apply, and the name and type of the resulting field.
+ */
+public class ColumnConfig implements Serializable {
+
+    /** Name of the source field the path is applied to. */
+    private final String srcField;
+
+    /** JsonPath expression, kept as the raw string. */
+    private final String path;
+
+    /** Name of the output field. */
+    private final String destField;
+
+    /** Data type of the output field. */
+    private final SeaTunnelDataType<?> destType;
+
+    /**
+     * @param path JsonPath expression
+     * @param srcField name of the source field
+     * @param destField name of the output field
+     * @param destType data type of the output field
+     */
+    public ColumnConfig(
+            String path, String srcField, String destField, SeaTunnelDataType<?> destType) {
+        this.srcField = srcField;
+        this.path = path;
+        this.destField = destField;
+        this.destType = destType;
+    }
+
+    /** @return name of the source field */
+    public String getSrcField() {
+        return this.srcField;
+    }
+
+    /** @return JsonPath expression */
+    public String getPath() {
+        return this.path;
+    }
+
+    /** @return name of the output field */
+    public String getDestField() {
+        return this.destField;
+    }
+
+    /** @return data type of the output field */
+    public SeaTunnelDataType<?> getDestType() {
+        return this.destType;
+    }
+}
diff --git 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/jsonpath/JsonPathTransform.java
 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/jsonpath/JsonPathTransform.java
new file mode 100644
index 0000000000..874d29da8b
--- /dev/null
+++ 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/jsonpath/JsonPathTransform.java
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seatunnel.transform.jsonpath;
+
+import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.JsonNode;
+
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.Column;
+import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.utils.JsonUtils;
+import org.apache.seatunnel.format.json.JsonToRowConverters;
+import org.apache.seatunnel.transform.common.MultipleFieldOutputTransform;
+import org.apache.seatunnel.transform.common.SeaTunnelRowAccessor;
+import org.apache.seatunnel.transform.exception.TransformException;
+
+import com.jayway.jsonpath.JsonPath;
+import com.jayway.jsonpath.JsonPathException;
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+import static 
org.apache.seatunnel.transform.exception.JsonPathTransformErrorCode.JSON_PATH_COMPILE_ERROR;
+import static 
org.apache.seatunnel.transform.exception.JsonPathTransformErrorCode.SRC_FIELD_NOT_FOUND;
+
+/**
+ * Transform that evaluates the configured JSONPath expressions against source fields of the
+ * input row and produces one output field per configured column, converted to the column's
+ * destination type.
+ */
+@Slf4j
+public class JsonPathTransform extends MultipleFieldOutputTransform {
+
+    public static final String PLUGIN_NAME = "JsonPath";
+
+    /** Compiled JSONPath expressions keyed by their expression text; shared by all instances. */
+    private static final Map<String, JsonPath> JSON_PATH_CACHE = new ConcurrentHashMap<>();
+
+    private final JsonPathTransformConfig config;
+    private final SeaTunnelRowType seaTunnelRowType;
+
+    /** One converter per configured column, in configuration order. */
+    private JsonToRowConverters.JsonToRowConverter[] converters;
+
+    private SeaTunnelRowType outputSeaTunnelRowType;
+
+    /** For each configured column, the index of its source field in the input row type. */
+    private int[] srcFieldIndexArr;
+
+    /**
+     * @param config column configuration of this transform
+     * @param catalogTable input table; its row type is used to resolve the source fields
+     * @throws TransformException if a configured source field is not part of the input row type
+     */
+    public JsonPathTransform(JsonPathTransformConfig config, CatalogTable catalogTable) {
+        super(catalogTable);
+        this.config = config;
+        this.seaTunnelRowType = catalogTable.getSeaTunnelRowType();
+        init();
+    }
+
+    @Override
+    public String getPluginName() {
+        return PLUGIN_NAME;
+    }
+
+    /** Resolves source field positions, then builds the output row type and the converters. */
+    private void init() {
+        initSrcFieldIndexArr();
+        initOutputSeaTunnelRowType();
+        initConverters();
+    }
+
+    /** Creates one JSON-to-value converter per configured destination type. */
+    private void initConverters() {
+        JsonToRowConverters jsonToRowConverters = new JsonToRowConverters(false, false);
+        this.converters =
+                this.config.getColumnConfigs().stream()
+                        .map(ColumnConfig::getDestType)
+                        .map(jsonToRowConverters::createConverter)
+                        .toArray(JsonToRowConverters.JsonToRowConverter[]::new);
+    }
+
+    /** Builds the output row type from the configured destination field names and types. */
+    private void initOutputSeaTunnelRowType() {
+        List<ColumnConfig> columnConfigs = this.config.getColumnConfigs();
+        String[] fieldNames =
+                columnConfigs.stream().map(ColumnConfig::getDestField).toArray(String[]::new);
+        SeaTunnelDataType<?>[] dataTypes =
+                columnConfigs.stream()
+                        .map(ColumnConfig::getDestType)
+                        .toArray(SeaTunnelDataType<?>[]::new);
+        this.outputSeaTunnelRowType = new SeaTunnelRowType(fieldNames, dataTypes);
+    }
+
+    /**
+     * Resolves the position of every configured source field in the input row type.
+     *
+     * @throws TransformException with {@code SRC_FIELD_NOT_FOUND} if a source field is missing
+     */
+    private void initSrcFieldIndexArr() {
+        List<ColumnConfig> columnConfigs = this.config.getColumnConfigs();
+        Set<String> fieldNameSet = new HashSet<>(Arrays.asList(seaTunnelRowType.getFieldNames()));
+        this.srcFieldIndexArr = new int[columnConfigs.size()];
+
+        for (int i = 0; i < columnConfigs.size(); i++) {
+            String srcField = columnConfigs.get(i).getSrcField();
+            if (!fieldNameSet.contains(srcField)) {
+                throw new TransformException(
+                        SRC_FIELD_NOT_FOUND,
+                        String.format(
+                                "JsonPathTransform config not found src_field:[%s]", srcField));
+            }
+            this.srcFieldIndexArr[i] = seaTunnelRowType.indexOf(srcField);
+        }
+    }
+
+    /**
+     * Computes the output values for one input row, one per configured column and in
+     * configuration order.
+     */
+    @Override
+    protected Object[] getOutputFieldValues(SeaTunnelRowAccessor inputRow) {
+        List<ColumnConfig> configs = this.config.getColumnConfigs();
+        int size = configs.size();
+        Object[] fieldValues = new Object[size];
+        for (int i = 0; i < size; i++) {
+            int pos = this.srcFieldIndexArr[i];
+            fieldValues[i] =
+                    doTransform(
+                            seaTunnelRowType.getFieldType(pos),
+                            inputRow.getField(pos),
+                            configs.get(i),
+                            converters[i]);
+        }
+        return fieldValues;
+    }
+
+    /**
+     * Evaluates one column's JSONPath against a source value and converts the result.
+     *
+     * @param inputDataType declared type of the source field
+     * @param value source field value; {@code null} yields {@code null}
+     * @param columnConfig column whose path is evaluated
+     * @param converter converter to the column's destination type
+     * @return the converted value selected by the path
+     * @throws TransformException with {@code JSON_PATH_COMPILE_ERROR} if the path cannot be
+     *     compiled or evaluated
+     * @throws UnsupportedOperationException if the source field type is not supported
+     */
+    private Object doTransform(
+            SeaTunnelDataType<?> inputDataType,
+            Object value,
+            ColumnConfig columnConfig,
+            JsonToRowConverters.JsonToRowConverter converter) {
+        if (value == null) {
+            return null;
+        }
+        try {
+            // Compile inside the try block so that an invalid path is reported as a
+            // TransformException as well, and reuse the returned instance instead of doing a
+            // second cache lookup.
+            JsonPath jsonPath =
+                    JSON_PATH_CACHE.computeIfAbsent(columnConfig.getPath(), JsonPath::compile);
+            String jsonString = asJsonString(inputDataType, value);
+            Object result = jsonPath.read(jsonString);
+            JsonNode jsonNode = JsonUtils.toJsonNode(result);
+            return converter.convert(jsonNode);
+        } catch (JsonPathException e) {
+            throw new TransformException(JSON_PATH_COMPILE_ERROR, e.getMessage());
+        }
+    }
+
+    /** Renders a non-null source value as the JSON text the path is evaluated against. */
+    private static String asJsonString(SeaTunnelDataType<?> inputDataType, Object value) {
+        switch (inputDataType.getSqlType()) {
+            case STRING:
+                return value.toString();
+            case BYTES:
+                // Decode as UTF-8 explicitly rather than with the platform default charset, so
+                // the result does not depend on the JVM the job happens to run on.
+                return new String((byte[]) value, java.nio.charset.StandardCharsets.UTF_8);
+            case ARRAY:
+            case MAP:
+                return JsonUtils.toJsonString(value);
+            case ROW:
+                // Only the row's field values are serialized (no field names); presumably this
+                // yields a JSON array - verify against JsonUtils.toJsonString.
+                return JsonUtils.toJsonString(((SeaTunnelRow) value).getFields());
+            default:
+                throw new UnsupportedOperationException(
+                        "JsonPathTransform unsupported sourceDataType: "
+                                + inputDataType.getSqlType());
+        }
+    }
+
+    /** Describes the output fields as physical columns, one per field of the output row type. */
+    @Override
+    protected Column[] getOutputColumns() {
+        int len = this.outputSeaTunnelRowType.getTotalFields();
+        Column[] columns = new Column[len];
+        for (int i = 0; i < len; i++) {
+            String fieldName = this.outputSeaTunnelRowType.getFieldName(i);
+            SeaTunnelDataType<?> fieldType = this.outputSeaTunnelRowType.getFieldType(i);
+            columns[i] = PhysicalColumn.of(fieldName, fieldType, 200, true, "", "");
+        }
+        return columns;
+    }
+}
diff --git 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/jsonpath/JsonPathTransformConfig.java
 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/jsonpath/JsonPathTransformConfig.java
new file mode 100644
index 0000000000..b50f0ed73d
--- /dev/null
+++ 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/jsonpath/JsonPathTransformConfig.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seatunnel.transform.jsonpath;
+
+import 
org.apache.seatunnel.shade.com.fasterxml.jackson.core.type.TypeReference;
+
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.table.catalog.SeaTunnelDataTypeConvertorUtil;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.transform.exception.TransformException;
+
+import org.apache.commons.lang3.StringUtils;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import static 
org.apache.seatunnel.transform.exception.JsonPathTransformErrorCode.COLUMNS_MUST_NOT_EMPTY;
+import static 
org.apache.seatunnel.transform.exception.JsonPathTransformErrorCode.DEST_FIELD_MUST_NOT_EMPTY;
+import static 
org.apache.seatunnel.transform.exception.JsonPathTransformErrorCode.PATH_MUST_NOT_EMPTY;
+import static 
org.apache.seatunnel.transform.exception.JsonPathTransformErrorCode.SRC_FIELD_MUST_NOT_EMPTY;
+
+/**
+ * Configuration of the JsonPath transform: the option definitions and the parsed list of
+ * per-column settings.
+ */
+public class JsonPathTransformConfig implements Serializable {
+
+    public static final Option<String> PATH =
+            Options.key("path")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("JSONPath for Selecting Field from JSON.");
+
+    public static final Option<String> SRC_FIELD =
+            Options.key("src_field")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("JSON source field.");
+
+    public static final Option<String> DEST_FIELD =
+            Options.key("dest_field")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("output field.");
+
+    public static final Option<String> DEST_TYPE =
+            Options.key("dest_type")
+                    .stringType()
+                    .defaultValue("string")
+                    .withDescription("output field type,default string");
+
+    public static final Option<List<Map<String, String>>> COLUMNS =
+            Options.key("columns")
+                    .type(new TypeReference<List<Map<String, String>>>() {})
+                    .noDefaultValue()
+                    .withDescription("columns");
+
+    private final List<ColumnConfig> columnConfigs;
+
+    /** @return the parsed column settings, in configuration order */
+    public List<ColumnConfig> getColumnConfigs() {
+        return columnConfigs;
+    }
+
+    public JsonPathTransformConfig(List<ColumnConfig> columnConfigs) {
+        this.columnConfigs = columnConfigs;
+    }
+
+    /**
+     * Parses the {@code columns} option into a {@link JsonPathTransformConfig}.
+     *
+     * @param config transform options
+     * @return the parsed configuration
+     * @throws TransformException if {@code columns} is missing or empty, or if a column entry
+     *     has a blank {@code path}, {@code src_field} or {@code dest_field}
+     */
+    public static JsonPathTransformConfig of(ReadonlyConfig config) {
+        List<Map<String, String>> columns =
+                config.toConfig().hasPath(COLUMNS.key()) ? config.get(COLUMNS) : null;
+        // An explicitly empty list is rejected just like a missing key: a transform without
+        // any column would silently produce no output fields.
+        if (columns == null || columns.isEmpty()) {
+            throw new TransformException(
+                    COLUMNS_MUST_NOT_EMPTY, COLUMNS_MUST_NOT_EMPTY.getErrorMessage());
+        }
+        List<ColumnConfig> configs = new ArrayList<>(columns.size());
+        for (Map<String, String> map : columns) {
+            checkColumnConfig(map);
+            String path = map.get(PATH.key());
+            String srcField = map.get(SRC_FIELD.key());
+            String destField = map.get(DEST_FIELD.key());
+            // dest_type is optional and falls back to the option's default value.
+            String type = map.getOrDefault(DEST_TYPE.key(), DEST_TYPE.defaultValue());
+            SeaTunnelDataType<?> dataType =
+                    SeaTunnelDataTypeConvertorUtil.deserializeSeaTunnelDataType(type);
+            configs.add(new ColumnConfig(path, srcField, destField, dataType));
+        }
+        return new JsonPathTransformConfig(configs);
+    }
+
+    /**
+     * Validates the mandatory keys of one column entry.
+     *
+     * @throws TransformException if {@code path}, {@code src_field} or {@code dest_field} is
+     *     missing or blank
+     */
+    private static void checkColumnConfig(Map<String, String> map) {
+        String path = map.get(PATH.key());
+        if (StringUtils.isBlank(path)) {
+            throw new TransformException(
+                    PATH_MUST_NOT_EMPTY, PATH_MUST_NOT_EMPTY.getErrorMessage());
+        }
+        String srcField = map.get(SRC_FIELD.key());
+        if (StringUtils.isBlank(srcField)) {
+            throw new TransformException(
+                    SRC_FIELD_MUST_NOT_EMPTY, SRC_FIELD_MUST_NOT_EMPTY.getErrorMessage());
+        }
+        String destField = map.get(DEST_FIELD.key());
+        if (StringUtils.isBlank(destField)) {
+            throw new TransformException(
+                    DEST_FIELD_MUST_NOT_EMPTY, DEST_FIELD_MUST_NOT_EMPTY.getErrorMessage());
+        }
+    }
+}
diff --git 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/jsonpath/JsonPathTransformFactory.java
 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/jsonpath/JsonPathTransformFactory.java
new file mode 100644
index 0000000000..796f8dbe6d
--- /dev/null
+++ 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/jsonpath/JsonPathTransformFactory.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.transform.jsonpath;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.connector.TableTransform;
+import org.apache.seatunnel.api.table.factory.Factory;
+import org.apache.seatunnel.api.table.factory.TableTransformFactory;
+import org.apache.seatunnel.api.table.factory.TableTransformFactoryContext;
+
+import com.google.auto.service.AutoService;
+
+/** Factory that creates {@link JsonPathTransform} instances from the transform options. */
+@AutoService(Factory.class)
+public class JsonPathTransformFactory implements TableTransformFactory {
+    /** @return the plugin name, shared with {@link JsonPathTransform#PLUGIN_NAME} */
+    @Override
+    public String factoryIdentifier() {
+        // Reuse the transform's constant so the identifier and the plugin name cannot drift.
+        return JsonPathTransform.PLUGIN_NAME;
+    }
+
+    /** @return a rule that requires the {@code columns} option */
+    @Override
+    public OptionRule optionRule() {
+        return OptionRule.builder().required(JsonPathTransformConfig.COLUMNS).build();
+    }
+
+    /**
+     * Parses the options eagerly and returns a provider that builds the transform for the
+     * first catalog table of the context.
+     */
+    @Override
+    public TableTransform createTransform(TableTransformFactoryContext context) {
+        CatalogTable catalogTable = context.getCatalogTables().get(0);
+        ReadonlyConfig options = context.getOptions();
+        JsonPathTransformConfig jsonPathTransformConfig = JsonPathTransformConfig.of(options);
+        return () -> new JsonPathTransform(jsonPathTransformConfig, catalogTable);
+    }
+}
diff --git 
a/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/JsonPathTransformFactoryTest.java
 
b/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/JsonPathTransformFactoryTest.java
new file mode 100644
index 0000000000..2f1314cace
--- /dev/null
+++ 
b/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/JsonPathTransformFactoryTest.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seatunnel.transform;
+
+import org.apache.seatunnel.transform.jsonpath.JsonPathTransformFactory;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+/** Smoke test for {@link JsonPathTransformFactory}. */
+public class JsonPathTransformFactoryTest {
+    /** A freshly created factory must expose a non-null option rule. */
+    @Test
+    public void testOptionRule() {
+        Assertions.assertNotNull(new JsonPathTransformFactory().optionRule());
+    }
+}
diff --git a/tools/dependencies/known-dependencies.txt 
b/tools/dependencies/known-dependencies.txt
index 60aea7fa79..52205f52e4 100755
--- a/tools/dependencies/known-dependencies.txt
+++ b/tools/dependencies/known-dependencies.txt
@@ -11,6 +11,7 @@ jackson-annotations-2.13.3.jar
 jackson-core-2.13.3.jar
 jackson-databind-2.13.3.jar
 jackson-dataformat-properties-2.13.3.jar
+jackson-datatype-jsr310-2.13.3.jar
 jcl-over-slf4j-1.7.25.jar
 jcommander-1.81.jar
 log4j-api-2.17.1.jar
@@ -34,4 +35,8 @@ j2objc-annotations-1.1.jar
 jsr305-1.3.9.jar
 jsr305-3.0.0.jar
 jsr305-3.0.2.jar
-listenablefuture-9999.0-empty-to-avoid-conflict-with-guava.jar
\ No newline at end of file
+listenablefuture-9999.0-empty-to-avoid-conflict-with-guava.jar
+json-path-2.7.0.jar
+json-smart-2.4.7.jar
+accessors-smart-2.4.7.jar
+asm-9.1.jar
\ No newline at end of file

Reply via email to