This is an automated email from the ASF dual-hosted git repository.
fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new d908f0af40 [Feature][Transform] add JsonPath transform (#5632)
d908f0af40 is described below
commit d908f0af407b37e0ecee0e93c4cf20f3a3804522
Author: FuYouJ <[email protected]>
AuthorDate: Tue Nov 7 12:12:42 2023 +0800
[Feature][Transform] add JsonPath transform (#5632)
---
docs/en/transform-v2/jsonpath.md | 190 ++++++++++++++++++++
pom.xml | 6 +
release-note.md | 1 +
.../apache/seatunnel/common/utils/JsonUtils.java | 5 +-
.../connector-http/connector-http-base/pom.xml | 1 -
seatunnel-dist/release-docs/LICENSE | 4 +-
.../e2e/transform/TestJsonPathTransformIT.java | 47 +++++
.../resources/json_path_transform/array_test.conf | 78 +++++++++
.../json_path_basic_type_test.conf | 193 +++++++++++++++++++++
.../json_path_transform/nested_row_test.conf | 83 +++++++++
seatunnel-shade/seatunnel-jackson/pom.xml | 17 ++
seatunnel-transforms-v2/pom.xml | 11 ++
.../exception/JsonPathTransformErrorCode.java | 54 ++++++
.../seatunnel/transform/jsonpath/ColumnConfig.java | 55 ++++++
.../transform/jsonpath/JsonPathTransform.java | 187 ++++++++++++++++++++
.../jsonpath/JsonPathTransformConfig.java | 120 +++++++++++++
.../jsonpath/JsonPathTransformFactory.java | 49 ++++++
.../transform/JsonPathTransformFactoryTest.java | 30 ++++
tools/dependencies/known-dependencies.txt | 7 +-
19 files changed, 1134 insertions(+), 4 deletions(-)
diff --git a/docs/en/transform-v2/jsonpath.md b/docs/en/transform-v2/jsonpath.md
new file mode 100644
index 0000000000..3baf5853b7
--- /dev/null
+++ b/docs/en/transform-v2/jsonpath.md
@@ -0,0 +1,190 @@
+# JsonPath
+
+> JsonPath transform plugin
+
+## Description
+
+> Selects data from a field by using a JsonPath expression
+
+## Options
+
+| name | type | required | default value |
+|---------|-------|----------|---------------|
+| columns | Array | Yes | |
+
+### common options [string]
+
+Transform plugin common parameters. Please refer to [Transform Plugin](common-options.md) for details.
+
+### columns [array]
+
+#### option
+
+| name | type | required | default value |
+|------------|--------|----------|---------------|
+| src_field | String | Yes | |
+| dest_field | String | Yes | |
+| path | String | Yes | |
+| dest_type | String | No | String |
+
+#### src_field
+
+> The JSON source field you want to parse
+
+Supported SeaTunnelDataType values:
+
+* STRING
+* BYTES
+* ARRAY
+* MAP
+* ROW
+
+#### dest_field
+
+> The output field that receives the value selected by the JsonPath expression
+
+#### dest_type
+
+> The type of the dest field
+
+#### path
+
+> The JsonPath expression evaluated against `src_field`
+
+## Read Json Example
+
+The data read from the source is a table whose content looks like this JSON:
+
+```json
+{
+ "data": {
+ "c_string": "this is a string",
+ "c_boolean": true,
+ "c_integer": 42,
+ "c_float": 3.14,
+ "c_double": 3.14,
+ "c_decimal": 10.55,
+ "c_date": "2023-10-29",
+ "c_datetime": "16:12:43.459",
+ "c_array":["item1", "item2", "item3"]
+ }
+}
+```
+
+Assume we want to use JsonPath to extract the properties:
+
+```json
+transform {
+ JsonPath {
+ source_table_name = "fake"
+ result_table_name = "fake1"
+ columns = [
+ {
+ "src_field" = "data"
+ "path" = "$.data.c_string"
+ "dest_field" = "c1_string"
+ },
+ {
+ "src_field" = "data"
+ "path" = "$.data.c_boolean"
+ "dest_field" = "c1_boolean"
+ "dest_type" = "boolean"
+ },
+ {
+ "src_field" = "data"
+ "path" = "$.data.c_integer"
+ "dest_field" = "c1_integer"
+ "dest_type" = "int"
+ },
+ {
+ "src_field" = "data"
+ "path" = "$.data.c_float"
+ "dest_field" = "c1_float"
+ "dest_type" = "float"
+ },
+ {
+ "src_field" = "data"
+ "path" = "$.data.c_double"
+ "dest_field" = "c1_double"
+ "dest_type" = "double"
+ },
+ {
+ "src_field" = "data"
+ "path" = "$.data.c_decimal"
+ "dest_field" = "c1_decimal"
+ "dest_type" = "decimal(4,2)"
+ },
+ {
+ "src_field" = "data"
+ "path" = "$.data.c_date"
+ "dest_field" = "c1_date"
+ "dest_type" = "date"
+ },
+ {
+ "src_field" = "data"
+ "path" = "$.data.c_datetime"
+ "dest_field" = "c1_datetime"
+ "dest_type" = "time"
+ },
+ {
+ "src_field" = "data"
+ "path" = "$.data.c_array"
+ "dest_field" = "c1_array"
+ "dest_type" = "array<string>"
+ }
+ ]
+ }
+}
+```
+
+Then the data in result table `fake1` will look like this:
+
+| data | c1_string | c1_boolean | c1_integer | c1_float | c1_double | c1_decimal | c1_date | c1_datetime | c1_array |
+|------------------------------|------------------|------------|------------|----------|-----------|------------|------------|--------------|-----------------------------|
+| too much content to show | this is a string | true | 42 | 3.14 | 3.14 | 10.55 | 2023-10-29 | 16:12:43.459 | ["item1", "item2", "item3"] |
+
+## Read SeaTunnelRow Example
+
+Suppose a column in a row of data is of type SeaTunnelRow and is named `col`:
+
+<table>
+<tr><th colspan="2">SeatunnelRow(col)</th><th>other</th></tr>
+<tr><td>name</td><td>age</td><td>....</td></tr>
+<tr><td>a</td><td>18</td><td>....</td></tr>
+</table>
+
+The JsonPath transform converts the field values of the SeaTunnelRow into an array, so each field is addressed by its position:
+
+```json
+transform {
+ JsonPath {
+ source_table_name = "fake"
+ result_table_name = "fake1"
+ columns = [
+ {
+ "src_field" = "col"
+ "path" = "$[0]"
+ "dest_field" = "name"
+ "dest_type" = "string"
+ },
+ {
+ "src_field" = "col"
+ "path" = "$[1]"
+ "dest_field" = "age"
+ "dest_type" = "int"
+ }
+ ]
+ }
+}
+```
+
+Then the data in result table `fake1` will look like this:
+
+| name | age | col | other |
+|------|-----|----------|-------|
+| a | 18 | ["a",18] | ... |
+
+## Changelog
+
+* Add JsonPath Transform
+
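The row example in the documentation above addresses the fields of a row by position (`$[0]`, `$[1]`). A minimal Java sketch of that positional lookup follows. It is an illustration under stated assumptions, not the transform's implementation: `RowPathSketch` and `select` are hypothetical names, the row is modelled as a plain `Object[]`, and a regular expression stands in for the real JsonPath engine, so only the `$[n]` form is handled.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Illustration only: resolves the `$[n]` subset of JsonPath against a row's values. */
final class RowPathSketch {
    // Accepts paths of the form $[0], $[1], ... and nothing else.
    private static final Pattern INDEX_PATH = Pattern.compile("^\\$\\[(\\d+)\\]$");

    /** Returns the value at the position named by the path, e.g. "$[1]" selects rowValues[1]. */
    static Object select(Object[] rowValues, String path) {
        Matcher m = INDEX_PATH.matcher(path);
        if (!m.matches()) {
            throw new IllegalArgumentException("This sketch only supports $[n] paths: " + path);
        }
        int index = Integer.parseInt(m.group(1));
        if (index >= rowValues.length) {
            throw new IndexOutOfBoundsException(
                    "Row has " + rowValues.length + " fields, path asks for index " + index);
        }
        return rowValues[index];
    }

    public static void main(String[] args) {
        // The `col` row from the documentation example: name, age.
        Object[] col = {"a", 18};
        System.out.println(select(col, "$[0]"));
        System.out.println(select(col, "$[1]"));
    }
}
```

In the actual transform the selected value would additionally be converted to the configured `dest_type`; that step is left out here.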
diff --git a/pom.xml b/pom.xml
index 41afca5217..f3e1dd5594 100644
--- a/pom.xml
+++ b/pom.xml
@@ -129,6 +129,7 @@
<testcontainer.version>1.17.6</testcontainer.version>
<spotless.version>2.29.0</spotless.version>
<jsqlparser.version>4.5</jsqlparser.version>
+ <json-path.version>2.7.0</json-path.version>
<!-- Option args -->
<skipUT>false</skipUT>
<skipIT>true</skipIT>
@@ -355,6 +356,11 @@
<artifactId>jackson-annotations</artifactId>
<version>${jackson.version}</version>
</dependency>
+ <dependency>
+ <groupId>com.fasterxml.jackson.datatype</groupId>
+ <artifactId>jackson-datatype-jsr310</artifactId>
+ <version>${jackson.version}</version>
+ </dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
diff --git a/release-note.md b/release-note.md
index 545039062b..1e352b80c7 100644
--- a/release-note.md
+++ b/release-note.md
@@ -12,6 +12,7 @@
### Transformer
- [Spark] Support transform-v2 for spark (#3409)
- [ALL]Add FieldMapper Transform #3781
+- [All]Add JsonPath Transform #5632
### Connectors
- [Elasticsearch] Support https protocol & compatible with opensearch
- [Hbase] Add hbase sink connector #4049
diff --git a/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/JsonUtils.java b/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/JsonUtils.java
index 84ac374b25..885d29a6b7 100644
--- a/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/JsonUtils.java
+++ b/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/JsonUtils.java
@@ -34,6 +34,7 @@ import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.node.JsonNodeTy
import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.node.TextNode;
import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.type.CollectionType;
+import org.apache.seatunnel.shade.com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import org.apache.commons.lang3.StringUtils;
@@ -59,7 +60,9 @@ public class JsonUtils {
.configure(ACCEPT_EMPTY_ARRAY_AS_NULL_OBJECT, true)
.configure(READ_UNKNOWN_ENUM_VALUES_AS_NULL, true)
.configure(REQUIRE_SETTERS_FOR_GETTERS, true)
- .setTimeZone(TimeZone.getDefault());
+ .setTimeZone(TimeZone.getDefault())
+ // support java8 time api
+ .registerModule(new JavaTimeModule());
private static final ObjectMapper DEFAULT_OBJECT_MAPPER = new ObjectMapper();
diff --git a/seatunnel-connectors-v2/connector-http/connector-http-base/pom.xml b/seatunnel-connectors-v2/connector-http/connector-http-base/pom.xml
index 2164e8f8df..f18d6f9fef 100644
--- a/seatunnel-connectors-v2/connector-http/connector-http-base/pom.xml
+++ b/seatunnel-connectors-v2/connector-http/connector-http-base/pom.xml
@@ -33,7 +33,6 @@
<httpclient.version>4.5.13</httpclient.version>
<httpcore.version>4.4.4</httpcore.version>
<guava-retrying.version>2.0.0</guava-retrying.version>
- <json-path.version>2.7.0</json-path.version>
</properties>
<dependencies>
diff --git a/seatunnel-dist/release-docs/LICENSE b/seatunnel-dist/release-docs/LICENSE
index 08f41da2a5..310d061833 100644
--- a/seatunnel-dist/release-docs/LICENSE
+++ b/seatunnel-dist/release-docs/LICENSE
@@ -242,6 +242,7 @@ The text of each license is the standard Apache 2.0 license.
(The Apache Software License, Version 2.0) jackson-databind (com.fasterxml.jackson.core:jackson-databind:2.13.3 - http://github.com/FasterXML/jackson)
(The Apache Software License, Version 2.0) Jackson-dataformat-properties (com.fasterxml.jackson.dataformat:jackson-dataformat-properties:2.13.3 - https://github.com/FasterXML/jackson-dataformats-text)
(The Apache Software License, Version 2.0) Data Mapper for Jackson (org.codehaus.jackson:jackson-mapper-asl:1.9.13 - http://jackson.codehaus.org)
+ (The Apache Software License, Version 2.0) jackson-datatype-jsr310 (com.fasterxml.jackson.datatype:jackson-datatype-jsr310:2.13.3 - https://mvnrepository.com/artifact/com.fasterxml.jackson.datatype/jackson-datatype-jsr310/2.13.3)
(Apache License, Version 2.0) jcommander (com.beust:jcommander:1.81 - https://jcommander.org)
(The Apache Software License, Version 2.0) FindBugs-jsr305 (com.google.code.findbugs:jsr305:1.3.9 - http://findbugs.sourceforge.net/)
(The Apache Software License, Version 2.0) FindBugs-jsr305 (com.google.code.findbugs:jsr305:3.0.0 - http://findbugs.sourceforge.net/)
@@ -273,8 +274,9 @@ The text of each license is the standard Apache 2.0 license.
(Apache-2.0) failureaccess (com.google.guava:failureaccess:1.0 https://mvnrepository.com/artifact/com.google.guava/failureaccess/1.0)
(Apache-2.0) j2objc-annotations (com.google.j2objc:j2objc-annotations:1.1 https://mvnrepository.com/artifact/com.google.j2objc/j2objc-annotations/1.1)
(Apache-2.0) listenablefuture (com.google.guava:listenablefuture:9999.0-empty-to-avoid-conflict-with-guava https://mvnrepository.com/artifact/com.google.guava/listenablefuture/9999.0-empty-to-avoid-conflict-with-guava)
- (Apache-2.0) accessors-smart (com.google.guava:accessors-smart:2.4.7 - https://mvnrepository.com/artifact/net.minidev/accessors-smart)
+ (Apache-2.0) accessors-smart (net.minidev:accessors-smart:2.4.7 - https://mvnrepository.com/artifact/net.minidev/accessors-smart)
(Apache-2.0) json-smart (net.minidev:json-smart:2.4.7 - https://mvnrepository.com/artifact/net.minidev/json-smart)
+ (Apache-2.0) json-path (com.jayway.jsonpath:json-path:2.7.0 - https://mvnrepository.com/artifact/com.jayway.jsonpath/json-path)
========================================================================
MOZILLA PUBLIC LICENSE License
diff --git a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/java/org/apache/seatunnel/e2e/transform/TestJsonPathTransformIT.java b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/java/org/apache/seatunnel/e2e/transform/TestJsonPathTransformIT.java
new file mode 100644
index 0000000000..9c3c7aa22e
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/java/org/apache/seatunnel/e2e/transform/TestJsonPathTransformIT.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seatunnel.e2e.transform;
+
+import org.apache.seatunnel.e2e.common.container.TestContainer;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.TestTemplate;
+import org.testcontainers.containers.Container;
+
+public class TestJsonPathTransformIT extends TestSuiteBase {
+
+ @TestTemplate
+ public void testBasicType(TestContainer container) throws Exception {
+ Container.ExecResult execResult =
+ container.executeJob("/json_path_transform/json_path_basic_type_test.conf");
+ Assertions.assertEquals(0, execResult.getExitCode());
+ }
+
+ @TestTemplate
+ public void testArray(TestContainer container) throws Exception {
+ Container.ExecResult execResult =
+ container.executeJob("/json_path_transform/array_test.conf");
+ Assertions.assertEquals(0, execResult.getExitCode());
+ }
+
+ @TestTemplate
+ public void testNestedRow(TestContainer container) throws Exception {
+ Container.ExecResult execResult =
+ container.executeJob("/json_path_transform/nested_row_test.conf");
+ Assertions.assertEquals(0, execResult.getExitCode());
+ }
+}
diff --git a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/json_path_transform/array_test.conf b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/json_path_transform/array_test.conf
new file mode 100644
index 0000000000..42dc48de27
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/json_path_transform/array_test.conf
@@ -0,0 +1,78 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in seatunnel config
+######
+
+env {
+ job.mode = "BATCH"
+}
+
+source {
+ FakeSource {
+ result_table_name = "fake"
+ row.num = 100
+ schema = {
+ fields {
+ c_array = "array<string>"
+ }
+ }
+ }
+}
+
+transform {
+ JsonPath {
+ source_table_name = "fake"
+ result_table_name = "fake1"
+ columns = [
+ {
+ "src_field" = "c_array"
+ "path" = "$[0]"
+ "dest_field" = "test_str"
+ }
+ ]
+ }
+}
+
+sink {
+ Console {
+ source_table_name = "fake1"
+ }
+ Assert {
+ source_table_name = "fake1"
+ rules =
+ {
+ row_rules = [
+ {
+ rule_type = MIN_ROW
+ rule_value = 100
+ }
+ ],
+ field_rules = [
+ {
+ field_name = test_str
+ field_type = string
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ }
+ ]
+ }
+ }
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/json_path_transform/json_path_basic_type_test.conf b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/json_path_transform/json_path_basic_type_test.conf
new file mode 100644
index 0000000000..97fa1ae036
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/json_path_transform/json_path_basic_type_test.conf
@@ -0,0 +1,193 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in seatunnel config
+######
+
+env {
+ job.mode = "BATCH"
+}
+
+source {
+ FakeSource {
+ result_table_name = "fake"
+ row.num = 100
+ string.fake.mode = "template"
+ string.template=["{"data":{"c_string": "this is a string","c_boolean": "true","c_integer": "42","c_float": "3.14","c_double": "3.14","c_decimal": "10.55","c_date":"'2023-10-29'","c_datetime":\"16:12:43.459\"}}"]
+ schema = {
+ fields {
+ data = "string"
+ }
+ }
+ }
+}
+
+transform {
+ JsonPath {
+ source_table_name = "fake"
+ result_table_name = "fake1"
+ columns = [
+ {
+ "src_field" = "data"
+ "path" = "$.data.c_string"
+ "dest_field" = "c1_string"
+ },
+ {
+ "src_field" = "data"
+ "path" = "$.data.c_boolean"
+ "dest_field" = "c1_boolean"
+ "dest_type" = "boolean"
+ },
+ {
+ "src_field" = "data"
+ "path" = "$.data.c_integer"
+ "dest_field" = "c1_integer"
+ "dest_type" = "int"
+ },
+ {
+ "src_field" = "data"
+ "path" = "$.data.c_float"
+ "dest_field" = "c1_float"
+ "dest_type" = "float"
+ },
+ {
+ "src_field" = "data"
+ "path" = "$.data.c_double"
+ "dest_field" = "c1_double"
+ "dest_type" = "double"
+ },
+ {
+ "src_field" = "data"
+ "path" = "$.data.c_decimal"
+ "dest_field" = "c1_decimal"
+ "dest_type" = "decimal(4,2)"
+ },
+ {
+ "src_field" = "data"
+ "path" = "$.data.c_date"
+ "dest_field" = "c1_date"
+ "dest_type" = "date"
+ },
+ {
+ "src_field" = "data"
+ "path" = "$.data.c_datetime"
+ "dest_field" = "c1_datetime"
+ "dest_type" = "time"
+ }
+ ]
+ }
+}
+
+sink {
+ Console {
+ source_table_name = "fake1"
+ }
+ Assert {
+ source_table_name = "fake1"
+ rules =
+ {
+ row_rules = [
+ {
+ rule_type = MIN_ROW
+ rule_value = 100
+ }
+ ],
+ field_rules = [
+ {
+ field_name = c1_string
+ field_type = string
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ equals_to = "this is a string"
+ }
+ ]
+ },
+ {
+ field_name = c1_boolean
+ field_type = boolean
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ equals_to = "true"
+ }
+ ]
+ },
+ {
+ field_name = c1_integer
+ field_type = int
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ equals_to = 42
+ }
+ ]
+ },
+ {
+ field_name = c1_float
+ field_type = float
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ equals_to = 3.14
+ }
+ ]
+ },
+ {
+ field_name = c1_double
+ field_type = double
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ equals_to = 3.14
+ }
+ ]
+ },
+ {
+ field_name = c1_decimal
+ field_type = decimal
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ equals_to = 10.55
+ }
+ ]
+ },
+ {
+ field_name = c1_date
+ field_type = date
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ equals_to = "2023-10-29"
+ }
+ ]
+ },
+ {
+ field_name = c1_datetime
+ field_type = time
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ equals_to = "16:12:43.459"
+ }
+ ]
+ }
+ ]
+ }
+ }
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/json_path_transform/nested_row_test.conf b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/json_path_transform/nested_row_test.conf
new file mode 100644
index 0000000000..7ed0bc05a1
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/json_path_transform/nested_row_test.conf
@@ -0,0 +1,83 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in seatunnel config
+######
+
+env {
+ job.mode = "BATCH"
+}
+
+source {
+FakeSource {
+ row.num = 10
+ schema = {
+ fields {
+ c_row = {
+ c_map = "map<string, map<string, string>>"
+ c_array = "array<int>"
+ c_string = string
+ }
+ }
+ }
+ result_table_name = "fake"
+}
+}
+
+transform {
+ JsonPath {
+ source_table_name = "fake"
+ result_table_name = "fake1"
+ columns = [
+ {
+ "src_field" = "c_row"
+ "path" = "$[2]"
+ "dest_field" = "test_str"
+ "dest_type" = "string"
+ }
+ ]
+ }
+}
+
+sink {
+ Console {
+ source_table_name = "fake1"
+ }
+ Assert {
+ source_table_name = "fake1"
+ rules =
+ {
+ row_rules = [
+ {
+ rule_type = MIN_ROW
+ rule_value = 10
+ }
+ ],
+ field_rules = [
+ {
+ field_name = test_str
+ field_type = string
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ }
+ ]
+ }
+ }
+}
\ No newline at end of file
diff --git a/seatunnel-shade/seatunnel-jackson/pom.xml b/seatunnel-shade/seatunnel-jackson/pom.xml
index f3bf3050b3..b328e29ed4 100644
--- a/seatunnel-shade/seatunnel-jackson/pom.xml
+++ b/seatunnel-shade/seatunnel-jackson/pom.xml
@@ -31,6 +31,23 @@
<artifactId>jackson-dataformat-properties</artifactId>
<version>${jackson.version}</version>
</dependency>
+ <dependency>
+ <groupId>com.fasterxml.jackson.datatype</groupId>
+ <artifactId>jackson-datatype-jsr310</artifactId>
+ <version>${jackson.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-core</artifactId>
+ <version>${jackson.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-databind</artifactId>
+ <version>${jackson.version}</version>
+ </dependency>
</dependencies>
<build>
diff --git a/seatunnel-transforms-v2/pom.xml b/seatunnel-transforms-v2/pom.xml
index 5a6e592330..833b14d372 100644
--- a/seatunnel-transforms-v2/pom.xml
+++ b/seatunnel-transforms-v2/pom.xml
@@ -50,6 +50,17 @@
<artifactId>jsqlparser</artifactId>
<version>${jsqlparser.version}</version>
</dependency>
+ <dependency>
+ <groupId>com.jayway.jsonpath</groupId>
+ <artifactId>json-path</artifactId>
+ <version>${json-path.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>seatunnel-format-json</artifactId>
+ <version>${revision}</version>
+ </dependency>
+
</dependencies>
<build>
diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/exception/JsonPathTransformErrorCode.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/exception/JsonPathTransformErrorCode.java
new file mode 100644
index 0000000000..52b75a1ed0
--- /dev/null
+++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/exception/JsonPathTransformErrorCode.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seatunnel.transform.exception;
+
+import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
+
+public enum JsonPathTransformErrorCode implements SeaTunnelErrorCode {
+ COLUMNS_MUST_NOT_EMPTY(
+ "JSONPATH_ERROR_CODE-01", "JsonPathTransform config columns must not empty"),
+ SRC_FIELD_MUST_NOT_EMPTY(
+ "JSONPATH_ERROR_CODE-02", "JsonPathTransform src_field must not empty"),
+ PATH_MUST_NOT_EMPTY(
+ "JSONPATH_ERROR_CODE-03", "JsonPathTransform config field path must not empty"),
+ DEST_FIELD_MUST_NOT_EMPTY(
+ "JSONPATH_ERROR_CODE-04", "JsonPathTransform dest_field must not empty"),
+
+ JSON_PATH_COMPILE_ERROR("JSONPATH_ERROR_CODE-05", "JsonPathTransform path is invalid"),
+ DEST_TYPE_MUST_NOT_EMPTY(
+ "JSONPATH_ERROR_CODE-06", "JsonPathTransform dest_type must not empty"),
+ SRC_FIELD_NOT_FOUND(
+ "JSONPATH_ERROR_CODE-07", "JsonPathTransform src_field not found in source"),
+ ;
+ private final String code;
+ private final String description;
+
+ JsonPathTransformErrorCode(String code, String description) {
+ this.code = code;
+ this.description = description;
+ }
+
+ @Override
+ public String getCode() {
+ return code;
+ }
+
+ @Override
+ public String getDescription() {
+ return description;
+ }
+}
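Because each constant in an error-code enum like the one above carries a hand-written code string, two constants can end up sharing a code without the compiler noticing. A minimal sketch of a uniqueness check follows. It is an assumption-laden illustration, not part of this commit: `ErrorCodeUniquenessSketch`, `DemoErrorCode`, and the `DEMO-xx` codes are hypothetical stand-ins, and the duplicate is planted deliberately.

```java
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.Set;

final class ErrorCodeUniquenessSketch {
    /** Hypothetical stand-in for an error-code enum; not the SeaTunnel class. */
    enum DemoErrorCode {
        SRC_FIELD_MUST_NOT_EMPTY("DEMO-02"),
        PATH_MUST_NOT_EMPTY("DEMO-03"),
        SRC_FIELD_NOT_FOUND("DEMO-02"); // deliberately reuses DEMO-02

        private final String code;

        DemoErrorCode(String code) {
            this.code = code;
        }

        String getCode() {
            return code;
        }
    }

    /** Returns every code string that is carried by more than one constant. */
    static Set<String> duplicateCodes() {
        Set<String> seen = new HashSet<>();
        Set<String> duplicates = new LinkedHashSet<>();
        for (DemoErrorCode c : DemoErrorCode.values()) {
            // Set.add returns false when the code was already recorded.
            if (!seen.add(c.getCode())) {
                duplicates.add(c.getCode());
            }
        }
        return duplicates;
    }

    public static void main(String[] args) {
        System.out.println(duplicateCodes()); // prints [DEMO-02]
    }
}
```

A check of this shape could live in a unit test that asserts the returned set is empty.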
diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/jsonpath/ColumnConfig.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/jsonpath/ColumnConfig.java
new file mode 100644
index 0000000000..1025a12651
--- /dev/null
+++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/jsonpath/ColumnConfig.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seatunnel.transform.jsonpath;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+
+import java.io.Serializable;
+
+public class ColumnConfig implements Serializable {
+ private final String path;
+
+ private final String srcField;
+
+ private final String destField;
+
+ private final SeaTunnelDataType<?> destType;
+
+ public ColumnConfig(
+ String path, String srcField, String destField, SeaTunnelDataType<?> destType) {
+ this.path = path;
+ this.srcField = srcField;
+ this.destField = destField;
+ this.destType = destType;
+ }
+
+ public String getPath() {
+ return path;
+ }
+
+ public String getSrcField() {
+ return srcField;
+ }
+
+ public String getDestField() {
+ return destField;
+ }
+
+ public SeaTunnelDataType<?> getDestType() {
+ return destType;
+ }
+}
diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/jsonpath/JsonPathTransform.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/jsonpath/JsonPathTransform.java
new file mode 100644
index 0000000000..874d29da8b
--- /dev/null
+++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/jsonpath/JsonPathTransform.java
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seatunnel.transform.jsonpath;
+
+import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.JsonNode;
+
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.Column;
+import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.utils.JsonUtils;
+import org.apache.seatunnel.format.json.JsonToRowConverters;
+import org.apache.seatunnel.transform.common.MultipleFieldOutputTransform;
+import org.apache.seatunnel.transform.common.SeaTunnelRowAccessor;
+import org.apache.seatunnel.transform.exception.TransformException;
+
+import com.jayway.jsonpath.JsonPath;
+import com.jayway.jsonpath.JsonPathException;
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+import static org.apache.seatunnel.transform.exception.JsonPathTransformErrorCode.JSON_PATH_COMPILE_ERROR;
+import static org.apache.seatunnel.transform.exception.JsonPathTransformErrorCode.SRC_FIELD_NOT_FOUND;
+
+@Slf4j
+public class JsonPathTransform extends MultipleFieldOutputTransform {
+
+ public static final String PLUGIN_NAME = "JsonPath";
+ private static final Map<String, JsonPath> JSON_PATH_CACHE = new ConcurrentHashMap<>();
+ private final JsonPathTransformConfig config;
+ private final SeaTunnelRowType seaTunnelRowType;
+
+ private JsonToRowConverters.JsonToRowConverter[] converters;
+ private SeaTunnelRowType outputSeaTunnelRowType;
+
+ private int[] srcFieldIndexArr;
+
+ public JsonPathTransform(JsonPathTransformConfig config, CatalogTable catalogTable) {
+ super(catalogTable);
+ this.config = config;
+ this.seaTunnelRowType = catalogTable.getSeaTunnelRowType();
+ init();
+ }
+
+ @Override
+ public String getPluginName() {
+ return PLUGIN_NAME;
+ }
+
+ private void init() {
+
+ initSrcFieldIndexArr();
+ initOutputSeaTunnelRowType();
+ initConverters();
+ }
+
+ private void initConverters() {
+ JsonToRowConverters jsonToRowConverters = new JsonToRowConverters(false, false);
+ this.converters =
+ this.config.getColumnConfigs().stream()
+ .map(ColumnConfig::getDestType)
+ .map(jsonToRowConverters::createConverter)
+ .toArray(JsonToRowConverters.JsonToRowConverter[]::new);
+ }
+
+ private void initOutputSeaTunnelRowType() {
+
+ SeaTunnelDataType<?>[] dataTypes =
+ this.config.getColumnConfigs().stream()
+ .map(ColumnConfig::getDestType)
+ .toArray(SeaTunnelDataType<?>[]::new);
+ this.outputSeaTunnelRowType =
+ new SeaTunnelRowType(
+ this.config.getColumnConfigs().stream()
+ .map(ColumnConfig::getDestField)
+ .toArray(String[]::new),
+ dataTypes);
+ }
+
+ private void initSrcFieldIndexArr() {
+ List<ColumnConfig> columnConfigs = this.config.getColumnConfigs();
+ Set<String> fieldNameSet = new HashSet<>(Arrays.asList(seaTunnelRowType.getFieldNames()));
+ this.srcFieldIndexArr = new int[columnConfigs.size()];
+
+ for (int i = 0; i < columnConfigs.size(); i++) {
+ ColumnConfig columnConfig = columnConfigs.get(i);
+ String srcField = columnConfig.getSrcField();
+ if (!fieldNameSet.contains(srcField)) {
+ throw new TransformException(
+ SRC_FIELD_NOT_FOUND,
+ String.format(
+ "JsonPathTransform config not found src_field:[%s]", srcField));
+ }
+ this.srcFieldIndexArr[i] = seaTunnelRowType.indexOf(srcField);
+ }
+ }
+
+ @Override
+ protected Object[] getOutputFieldValues(SeaTunnelRowAccessor inputRow) {
+ List<ColumnConfig> configs = this.config.getColumnConfigs();
+ int size = configs.size();
+ Object[] fieldValues = new Object[size];
+ for (int i = 0; i < size; i++) {
+ int pos = this.srcFieldIndexArr[i];
+ fieldValues[i] =
+ doTransform(
+ seaTunnelRowType.getFieldType(pos),
+ inputRow.getField(pos),
+ configs.get(i),
+ converters[i]);
+ }
+ return fieldValues;
+ }
+
+ private Object doTransform(
+ SeaTunnelDataType<?> inputDataType,
+ Object value,
+ ColumnConfig columnConfig,
+ JsonToRowConverters.JsonToRowConverter converter) {
+ if (value == null) {
+ return null;
+ }
+ JSON_PATH_CACHE.computeIfAbsent(columnConfig.getPath(), JsonPath::compile);
+ String jsonString = "";
+ try {
+ switch (inputDataType.getSqlType()) {
+ case STRING:
+ jsonString = value.toString();
+ break;
+ case BYTES:
+ jsonString = new String((byte[]) value);
+ break;
+ case ARRAY:
+ case MAP:
+ jsonString = JsonUtils.toJsonString(value);
+ break;
+ case ROW:
+ SeaTunnelRow row = (SeaTunnelRow) value;
+ jsonString = JsonUtils.toJsonString(row.getFields());
+ break;
+ default:
+ throw new UnsupportedOperationException(
+ "JsonPathTransform unsupported sourceDataType: "
+ + inputDataType.getSqlType());
+ }
+ Object result = JSON_PATH_CACHE.get(columnConfig.getPath()).read(jsonString);
+ JsonNode jsonNode = JsonUtils.toJsonNode(result);
+ return converter.convert(jsonNode);
+ } catch (JsonPathException e) {
+ throw new TransformException(JSON_PATH_COMPILE_ERROR, e.getMessage());
+ }
+ }
+
+ @Override
+ protected Column[] getOutputColumns() {
+ int len = this.outputSeaTunnelRowType.getTotalFields();
+ Column[] columns = new Column[len];
+ for (int i = 0; i < len; i++) {
+ String fieldName = this.outputSeaTunnelRowType.getFieldName(i);
+ SeaTunnelDataType<?> fieldType = this.outputSeaTunnelRowType.getFieldType(i);
+ columns[i] = PhysicalColumn.of(fieldName, fieldType, 200, true, "", "");
+ }
+ return columns;
+ }
+}
diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/jsonpath/JsonPathTransformConfig.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/jsonpath/JsonPathTransformConfig.java
new file mode 100644
index 0000000000..b50f0ed73d
--- /dev/null
+++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/jsonpath/JsonPathTransformConfig.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seatunnel.transform.jsonpath;
+
+import org.apache.seatunnel.shade.com.fasterxml.jackson.core.type.TypeReference;
+
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.table.catalog.SeaTunnelDataTypeConvertorUtil;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.transform.exception.TransformException;
+
+import org.apache.commons.lang3.StringUtils;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.seatunnel.transform.exception.JsonPathTransformErrorCode.COLUMNS_MUST_NOT_EMPTY;
+import static org.apache.seatunnel.transform.exception.JsonPathTransformErrorCode.DEST_FIELD_MUST_NOT_EMPTY;
+import static org.apache.seatunnel.transform.exception.JsonPathTransformErrorCode.PATH_MUST_NOT_EMPTY;
+import static org.apache.seatunnel.transform.exception.JsonPathTransformErrorCode.SRC_FIELD_MUST_NOT_EMPTY;
+
+public class JsonPathTransformConfig implements Serializable {
+
+ public static final Option<String> PATH =
+ Options.key("path")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("JSONPath for Selecting Field from JSON.");
+
+ public static final Option<String> SRC_FIELD =
+ Options.key("src_field")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("JSON source field.");
+
+ public static final Option<String> DEST_FIELD =
+ Options.key("dest_field")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("output field.");
+
+ public static final Option<String> DEST_TYPE =
+ Options.key("dest_type")
+ .stringType()
+ .defaultValue("string")
+ .withDescription("output field type,default string");
+
+ public static final Option<List<Map<String, String>>> COLUMNS =
+ Options.key("columns")
+ .type(new TypeReference<List<Map<String, String>>>() {})
+ .noDefaultValue()
+ .withDescription("columns");
+
+ private final List<ColumnConfig> columnConfigs;
+
+ public List<ColumnConfig> getColumnConfigs() {
+ return columnConfigs;
+ }
+
+ public JsonPathTransformConfig(List<ColumnConfig> columnConfigs) {
+ this.columnConfigs = columnConfigs;
+ }
+
+ public static JsonPathTransformConfig of(ReadonlyConfig config) {
+ if (!config.toConfig().hasPath(COLUMNS.key())) {
+ throw new TransformException(
+ COLUMNS_MUST_NOT_EMPTY, COLUMNS_MUST_NOT_EMPTY.getErrorMessage());
+ }
+ List<Map<String, String>> columns = config.get(COLUMNS);
+ List<ColumnConfig> configs = new ArrayList<>(columns.size());
+ for (Map<String, String> map : columns) {
+ checkColumnConfig(map);
+ String path = map.get(PATH.key());
+ String srcField = map.get(SRC_FIELD.key());
+ String destField = map.get(DEST_FIELD.key());
+ String type = map.getOrDefault(DEST_TYPE.key(), DEST_TYPE.defaultValue());
+ SeaTunnelDataType<?> dataType =
+ SeaTunnelDataTypeConvertorUtil.deserializeSeaTunnelDataType(type);
+ ColumnConfig columnConfig = new ColumnConfig(path, srcField, destField, dataType);
+ configs.add(columnConfig);
+ }
+ return new JsonPathTransformConfig(configs);
+ }
+
+ private static void checkColumnConfig(Map<String, String> map) {
+ String path = map.get(PATH.key());
+ if (StringUtils.isBlank(path)) {
+ throw new TransformException(
+ PATH_MUST_NOT_EMPTY, PATH_MUST_NOT_EMPTY.getErrorMessage());
+ }
+ String srcField = map.get(SRC_FIELD.key());
+ if (StringUtils.isBlank(srcField)) {
+ throw new TransformException(
+ SRC_FIELD_MUST_NOT_EMPTY, SRC_FIELD_MUST_NOT_EMPTY.getErrorMessage());
+ }
+ String destField = map.get(DEST_FIELD.key());
+ if (StringUtils.isBlank(destField)) {
+ throw new TransformException(
+ DEST_FIELD_MUST_NOT_EMPTY, DEST_FIELD_MUST_NOT_EMPTY.getErrorMessage());
+ }
+ }
+}
diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/jsonpath/JsonPathTransformFactory.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/jsonpath/JsonPathTransformFactory.java
new file mode 100644
index 0000000000..796f8dbe6d
--- /dev/null
+++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/jsonpath/JsonPathTransformFactory.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.transform.jsonpath;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.connector.TableTransform;
+import org.apache.seatunnel.api.table.factory.Factory;
+import org.apache.seatunnel.api.table.factory.TableTransformFactory;
+import org.apache.seatunnel.api.table.factory.TableTransformFactoryContext;
+
+import com.google.auto.service.AutoService;
+
+@AutoService(Factory.class)
+public class JsonPathTransformFactory implements TableTransformFactory {
+ @Override
+ public String factoryIdentifier() {
+ return "JsonPath";
+ }
+
+ @Override
+ public OptionRule optionRule() {
+ return OptionRule.builder().required(JsonPathTransformConfig.COLUMNS).build();
+ }
+
+ @Override
+ public TableTransform createTransform(TableTransformFactoryContext context) {
+ CatalogTable catalogTable = context.getCatalogTables().get(0);
+ ReadonlyConfig options = context.getOptions();
+ JsonPathTransformConfig jsonPathTransformConfig = JsonPathTransformConfig.of(options);
+ return () -> new JsonPathTransform(jsonPathTransformConfig, catalogTable);
+ }
+}
diff --git a/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/JsonPathTransformFactoryTest.java b/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/JsonPathTransformFactoryTest.java
new file mode 100644
index 0000000000..2f1314cace
--- /dev/null
+++ b/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/JsonPathTransformFactoryTest.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seatunnel.transform;
+
+import org.apache.seatunnel.transform.jsonpath.JsonPathTransformFactory;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+public class JsonPathTransformFactoryTest {
+ @Test
+ public void testOptionRule() {
+ JsonPathTransformFactory jsonPathTransformFactory = new JsonPathTransformFactory();
+ Assertions.assertNotNull(jsonPathTransformFactory.optionRule());
+ }
+}
diff --git a/tools/dependencies/known-dependencies.txt b/tools/dependencies/known-dependencies.txt
index 60aea7fa79..52205f52e4 100755
--- a/tools/dependencies/known-dependencies.txt
+++ b/tools/dependencies/known-dependencies.txt
@@ -11,6 +11,7 @@ jackson-annotations-2.13.3.jar
jackson-core-2.13.3.jar
jackson-databind-2.13.3.jar
jackson-dataformat-properties-2.13.3.jar
+jackson-datatype-jsr310-2.13.3.jar
jcl-over-slf4j-1.7.25.jar
jcommander-1.81.jar
log4j-api-2.17.1.jar
@@ -34,4 +35,8 @@ j2objc-annotations-1.1.jar
jsr305-1.3.9.jar
jsr305-3.0.0.jar
jsr305-3.0.2.jar
-listenablefuture-9999.0-empty-to-avoid-conflict-with-guava.jar
\ No newline at end of file
+listenablefuture-9999.0-empty-to-avoid-conflict-with-guava.jar
+json-path-2.7.0.jar
+json-smart-2.4.7.jar
+accessors-smart-2.4.7.jar
+asm-9.1.jar
\ No newline at end of file