This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/seatunnel-web.git


The following commit(s) were added to refs/heads/main by this push:
     new 937b3d69 [Feature][Transform] add json path transform (#247)
937b3d69 is described below

commit 937b3d69051c5579b950efc9e1c213ce5c73fc81
Author: KawYang <[email protected]>
AuthorDate: Wed Dec 18 15:29:32 2024 +0800

    [Feature][Transform] add json path transform (#247)
---
 .../transform/{Transform.java => JsonPath.java}    |  27 +++++++----
 ...ransform.java => JsonPathTransformOptions.java} |  14 +++---
 .../domain/request/job/transform/Transform.java    |   3 +-
 .../app/service/impl/ConnectorServiceImpl.java     |   4 +-
 .../app/service/impl/JobTaskServiceImpl.java       |   2 +
 .../transfrom/impl/JsonPathTransformSwitcher.java  |  51 +++++++++++++++++++++
 .../seatunnel/app/utils/TaskOptionUtils.java       |   4 ++
 .../synchronization-definition/dag/canvas/node.tsx |   3 ++
 .../dag/images/json-path.png                       | Bin 0 -> 2878 bytes
 .../dag/sidebar/index.tsx                          |   3 ++
 .../synchronization-definition/dag/use-model.ts    |  26 ++++++++++-
 .../dag/use-node-setting.ts                        |  12 +++++
 12 files changed, 129 insertions(+), 20 deletions(-)

diff --git 
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/domain/request/job/transform/Transform.java
 
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/domain/request/job/transform/JsonPath.java
similarity index 68%
copy from 
seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/domain/request/job/transform/Transform.java
copy to 
seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/domain/request/job/transform/JsonPath.java
index 3671a7f7..4989061a 100644
--- 
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/domain/request/job/transform/Transform.java
+++ 
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/domain/request/job/transform/JsonPath.java
@@ -17,12 +17,23 @@
 
 package org.apache.seatunnel.app.domain.request.job.transform;
 
-public enum Transform {
-    REPLACE,
-    COPY,
-    MULTIFIELDSPLIT,
-    FIELDMAPPER,
-    FILTERROWKIND,
-    SPLIT,
-    SQL
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+
+import java.util.List;
+
+@Data
+@EqualsAndHashCode(callSuper = true)
+public class JsonPath extends TransformOption {
+
+    private List<JsonPathColumn> columns;
+}
+
+@Data
+class JsonPathColumn {
+    private String src_field;
+    private String path;
+    private String destField;
+    private String destType;
+    private String columnErrorHandleWay;
 }
diff --git 
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/domain/request/job/transform/Transform.java
 
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/domain/request/job/transform/JsonPathTransformOptions.java
similarity index 87%
copy from 
seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/domain/request/job/transform/Transform.java
copy to 
seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/domain/request/job/transform/JsonPathTransformOptions.java
index 3671a7f7..cf9fa672 100644
--- 
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/domain/request/job/transform/Transform.java
+++ 
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/domain/request/job/transform/JsonPathTransformOptions.java
@@ -17,12 +17,10 @@
 
 package org.apache.seatunnel.app.domain.request.job.transform;
 
-public enum Transform {
-    REPLACE,
-    COPY,
-    MULTIFIELDSPLIT,
-    FIELDMAPPER,
-    FILTERROWKIND,
-    SPLIT,
-    SQL
+import lombok.Data;
+
+@Data
+public class JsonPathTransformOptions implements TransformOptions {
+
+    private JsonPath jsonPath;
 }
diff --git 
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/domain/request/job/transform/Transform.java
 
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/domain/request/job/transform/Transform.java
index 3671a7f7..a2b2cf3c 100644
--- 
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/domain/request/job/transform/Transform.java
+++ 
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/domain/request/job/transform/Transform.java
@@ -24,5 +24,6 @@ public enum Transform {
     FIELDMAPPER,
     FILTERROWKIND,
     SPLIT,
-    SQL
+    SQL,
+    JSONPATH
 }
diff --git 
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/ConnectorServiceImpl.java
 
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/ConnectorServiceImpl.java
index 307066a2..cc9f538c 100644
--- 
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/ConnectorServiceImpl.java
+++ 
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/ConnectorServiceImpl.java
@@ -123,6 +123,7 @@ public class ConnectorServiceImpl extends 
SeatunnelBaseServiceImpl implements IC
                                 .toUpperCase());
 
         if (businessMode.equals(BusinessMode.DATA_INTEGRATION)) {
+
             return connectorCache.getTransform().stream()
                     .filter(
                             connectorInfo -> {
@@ -133,7 +134,8 @@ public class ConnectorServiceImpl extends 
SeatunnelBaseServiceImpl implements IC
                                         || pluginName.equals("Replace")
                                         || pluginName.equals("Copy")
                                         || pluginName.equals("MultiFieldSplit")
-                                        || pluginName.equals("Sql");
+                                        || pluginName.equals("Sql")
+                                        || pluginName.equals("JsonPath");
                             })
                     .collect(Collectors.toList());
         }
diff --git 
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobTaskServiceImpl.java
 
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobTaskServiceImpl.java
index 107f7fb9..c5cef317 100644
--- 
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobTaskServiceImpl.java
+++ 
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobTaskServiceImpl.java
@@ -350,6 +350,8 @@ public class JobTaskServiceImpl extends 
SeatunnelBaseServiceImpl implements IJob
                                 
Collections.singletonList(sqlTransformOptions.getSql()));
                     }
                     break;
+                case JSONPATH:
+                    break;
                 case FILTERROWKIND:
                 case REPLACE:
                 default:
diff --git 
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thirdparty/transfrom/impl/JsonPathTransformSwitcher.java
 
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thirdparty/transfrom/impl/JsonPathTransformSwitcher.java
new file mode 100644
index 00000000..09579b6c
--- /dev/null
+++ 
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thirdparty/transfrom/impl/JsonPathTransformSwitcher.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.app.thirdparty.transfrom.impl;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.app.domain.request.job.TableSchemaReq;
+import org.apache.seatunnel.app.domain.request.job.transform.Transform;
+import org.apache.seatunnel.app.domain.request.job.transform.TransformOptions;
+import org.apache.seatunnel.app.dynamicforms.FormStructure;
+import 
org.apache.seatunnel.app.thirdparty.framework.SeaTunnelOptionRuleWrapper;
+import org.apache.seatunnel.app.thirdparty.transfrom.TransformConfigSwitcher;
+
+import com.google.auto.service.AutoService;
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+@AutoService(TransformConfigSwitcher.class)
+public class JsonPathTransformSwitcher implements TransformConfigSwitcher {
+    @Override
+    public Transform getTransform() {
+        return Transform.JSONPATH;
+    }
+
+    @Override
+    public FormStructure getFormStructure(OptionRule transformOptionRule) {
+        return SeaTunnelOptionRuleWrapper.wrapper(transformOptionRule, 
this.getTransform().name());
+    }
+
+    @Override
+    public Config mergeTransformConfig(
+            Config transformConfig, TransformOptions transformOption, 
TableSchemaReq inputSchema) {
+        return transformConfig;
+    }
+}
diff --git 
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/utils/TaskOptionUtils.java
 
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/utils/TaskOptionUtils.java
index 02cc62ad..57906787 100644
--- 
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/utils/TaskOptionUtils.java
+++ 
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/utils/TaskOptionUtils.java
@@ -19,6 +19,7 @@ package org.apache.seatunnel.app.utils;
 
 import 
org.apache.seatunnel.app.domain.request.job.transform.CopyTransformOptions;
 import 
org.apache.seatunnel.app.domain.request.job.transform.FieldMapperTransformOptions;
+import 
org.apache.seatunnel.app.domain.request.job.transform.JsonPathTransformOptions;
 import 
org.apache.seatunnel.app.domain.request.job.transform.SQLTransformOptions;
 import 
org.apache.seatunnel.app.domain.request.job.transform.SplitTransformOptions;
 import org.apache.seatunnel.app.domain.request.job.transform.Transform;
@@ -47,6 +48,9 @@ public class TaskOptionUtils {
                         transformOptionsStr, CopyTransformOptions.class);
             case SQL:
                 return convertTransformStrToOptions(transformOptionsStr, 
SQLTransformOptions.class);
+            case JSONPATH:
+                return convertTransformStrToOptions(
+                        transformOptionsStr, JsonPathTransformOptions.class);
             case FILTERROWKIND:
             case REPLACE:
             default:
diff --git 
a/seatunnel-ui/src/views/task/synchronization-definition/dag/canvas/node.tsx 
b/seatunnel-ui/src/views/task/synchronization-definition/dag/canvas/node.tsx
index e875761d..998759ca 100644
--- a/seatunnel-ui/src/views/task/synchronization-definition/dag/canvas/node.tsx
+++ b/seatunnel-ui/src/views/task/synchronization-definition/dag/canvas/node.tsx
@@ -23,6 +23,7 @@ import SinkImg from '../images/sink.png'
 import FieldMapperImg from '../images/field-mapper.png'
 import FilterEventTypeImg from '../images/filter-event-type.png'
 import ReplaceImg from '../images/replace.png'
+import JsonPathImg from '../images/json-path.png'
 import SplitImg from '../images/spilt.png'
 import CopyImg from '../images/copy.png'
 import SqlImg from '../images/sql.png'
@@ -52,6 +53,8 @@ const Node = defineComponent({
       icon.value = CopyImg
     } else if (type === 'transform' && connectorType === 'Sql') {
       icon.value = SqlImg
+    } else if (type === 'transform' && connectorType === 'JsonPath') {
+      icon.value = JsonPathImg
     }
 
     return () => (
diff --git 
a/seatunnel-ui/src/views/task/synchronization-definition/dag/images/json-path.png
 
b/seatunnel-ui/src/views/task/synchronization-definition/dag/images/json-path.png
new file mode 100644
index 00000000..b42f69ff
Binary files /dev/null and 
b/seatunnel-ui/src/views/task/synchronization-definition/dag/images/json-path.png
 differ
diff --git 
a/seatunnel-ui/src/views/task/synchronization-definition/dag/sidebar/index.tsx 
b/seatunnel-ui/src/views/task/synchronization-definition/dag/sidebar/index.tsx
index 79aff2c3..9f843458 100644
--- 
a/seatunnel-ui/src/views/task/synchronization-definition/dag/sidebar/index.tsx
+++ 
b/seatunnel-ui/src/views/task/synchronization-definition/dag/sidebar/index.tsx
@@ -25,6 +25,7 @@ import SinkImg from '../images/sink.png'
 import FieldMapperImg from '../images/field-mapper.png'
 import FilterEventTypeImg from '../images/filter-event-type.png'
 import ReplaceImg from '../images/replace.png'
+import JsonPathImg from '../images/json-path.png'
 import SplitImg from '../images/spilt.png'
 import CopyImg from '../images/copy.png'
 import SqlImg from '../images/sql.png'
@@ -112,6 +113,8 @@ const DagSidebar = defineComponent({
                 item.icon = CopyImg
               } else if (item.name === 'Sql') {
                 item.icon = SqlImg
+              } else if (item.name === 'JsonPath') {
+                item.icon = JsonPathImg
               }
 
               return (
diff --git 
a/seatunnel-ui/src/views/task/synchronization-definition/dag/use-model.ts 
b/seatunnel-ui/src/views/task/synchronization-definition/dag/use-model.ts
index 3d89eaa6..48497adf 100644
--- a/seatunnel-ui/src/views/task/synchronization-definition/dag/use-model.ts
+++ b/seatunnel-ui/src/views/task/synchronization-definition/dag/use-model.ts
@@ -263,8 +263,30 @@ export function useNodeModel(
           } else if(transformType === 'Sql'){
             let table = await getSqlTransformOutputData()
             state.outputTableData = table
-           
-          } else {
+          } else if(transformType === 'JsonPath'){
+            // extract the dest_field in columns
+            const fixedInput = refForm.value.getValues().columns.replace(/=/g, 
':');
+            const destFieldMatches = 
fixedInput.match(/"dest_field"\s*:\s*"([^"]*)"/g);
+            const destFields = destFieldMatches.map(match => 
match.match(/"([^"]+)"$/)[1]);
+
+            state.outputTableData = state.inputTableData.map(item => ({ 
...item }));
+            destFields.map(item => {
+              state.outputTableData.push(
+                {
+                  "format":"",
+                  "type": "TEXT",
+                  "name": item,
+                  "comment": "",
+                  "primaryKey": false,
+                  "defaultValue": '',
+                  "nullable": false,
+                  "unSupport": false,
+                  "outputDataType": "TEXT"
+              }
+              )
+            });
+          }
+          else {
             state.outputTableData = t.fields
           }
         }
diff --git 
a/seatunnel-ui/src/views/task/synchronization-definition/dag/use-node-setting.ts
 
b/seatunnel-ui/src/views/task/synchronization-definition/dag/use-node-setting.ts
index 62f1b7dd..e115758f 100644
--- 
a/seatunnel-ui/src/views/task/synchronization-definition/dag/use-node-setting.ts
+++ 
b/seatunnel-ui/src/views/task/synchronization-definition/dag/use-node-setting.ts
@@ -241,6 +241,18 @@ export function useNodeSettingModal(
           tableName: tableInfo[0].tableName,
           fields: resultSchema.outputTableData
         }]
+      }else if(props.nodeInfo.connectorType === 'JsonPath') {
+        const resultSchema = modelRef.value.getOutputSchema()
+        const tableInfo = resultSchema.allTableData[0].tableInfos
+        transformOptions.columns = {
+          "sourceFieldName": resultSchema.outputTableData[0]?.name || null,
+          "columns": values.columns
+        }
+        modelOutputTableData = [{
+          database: tableInfo[0].database,
+          tableName: tableInfo[0].tableName,
+          fields: resultSchema.outputTableData
+        }]
       }
 
       await saveTaskDefinitionItem(

Reply via email to