This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/seatunnel-web.git
The following commit(s) were added to refs/heads/main by this push:
new 937b3d69 [Feature][Transform] add json path transform (#247)
937b3d69 is described below
commit 937b3d69051c5579b950efc9e1c213ce5c73fc81
Author: KawYang <[email protected]>
AuthorDate: Wed Dec 18 15:29:32 2024 +0800
[Feature][Transform] add json path transform (#247)
---
.../transform/{Transform.java => JsonPath.java} | 27 +++++++----
...ransform.java => JsonPathTransformOptions.java} | 14 +++---
.../domain/request/job/transform/Transform.java | 3 +-
.../app/service/impl/ConnectorServiceImpl.java | 4 +-
.../app/service/impl/JobTaskServiceImpl.java | 2 +
.../transfrom/impl/JsonPathTransformSwitcher.java | 51 +++++++++++++++++++++
.../seatunnel/app/utils/TaskOptionUtils.java | 4 ++
.../synchronization-definition/dag/canvas/node.tsx | 3 ++
.../dag/images/json-path.png | Bin 0 -> 2878 bytes
.../dag/sidebar/index.tsx | 3 ++
.../synchronization-definition/dag/use-model.ts | 26 ++++++++++-
.../dag/use-node-setting.ts | 12 +++++
12 files changed, 129 insertions(+), 20 deletions(-)
diff --git
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/domain/request/job/transform/Transform.java
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/domain/request/job/transform/JsonPath.java
similarity index 68%
copy from
seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/domain/request/job/transform/Transform.java
copy to
seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/domain/request/job/transform/JsonPath.java
index 3671a7f7..4989061a 100644
---
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/domain/request/job/transform/Transform.java
+++
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/domain/request/job/transform/JsonPath.java
@@ -17,12 +17,23 @@
package org.apache.seatunnel.app.domain.request.job.transform;
-public enum Transform {
- REPLACE,
- COPY,
- MULTIFIELDSPLIT,
- FIELDMAPPER,
- FILTERROWKIND,
- SPLIT,
- SQL
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+
+import java.util.List;
+
+@Data
+@EqualsAndHashCode(callSuper = true)
+public class JsonPath extends TransformOption {
+
+ private List<JsonPathColumn> columns;
+}
+
+@Data
+class JsonPathColumn {
+ private String src_field;
+ private String path;
+ private String destField;
+ private String destType;
+ private String columnErrorHandleWay;
}
diff --git
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/domain/request/job/transform/Transform.java
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/domain/request/job/transform/JsonPathTransformOptions.java
similarity index 87%
copy from
seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/domain/request/job/transform/Transform.java
copy to
seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/domain/request/job/transform/JsonPathTransformOptions.java
index 3671a7f7..cf9fa672 100644
---
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/domain/request/job/transform/Transform.java
+++
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/domain/request/job/transform/JsonPathTransformOptions.java
@@ -17,12 +17,10 @@
package org.apache.seatunnel.app.domain.request.job.transform;
-public enum Transform {
- REPLACE,
- COPY,
- MULTIFIELDSPLIT,
- FIELDMAPPER,
- FILTERROWKIND,
- SPLIT,
- SQL
+import lombok.Data;
+
+@Data
+public class JsonPathTransformOptions implements TransformOptions {
+
+ private JsonPath jsonPath;
}
diff --git
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/domain/request/job/transform/Transform.java
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/domain/request/job/transform/Transform.java
index 3671a7f7..a2b2cf3c 100644
---
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/domain/request/job/transform/Transform.java
+++
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/domain/request/job/transform/Transform.java
@@ -24,5 +24,6 @@ public enum Transform {
FIELDMAPPER,
FILTERROWKIND,
SPLIT,
- SQL
+ SQL,
+ JSONPATH
}
diff --git
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/ConnectorServiceImpl.java
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/ConnectorServiceImpl.java
index 307066a2..cc9f538c 100644
---
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/ConnectorServiceImpl.java
+++
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/ConnectorServiceImpl.java
@@ -123,6 +123,7 @@ public class ConnectorServiceImpl extends
SeatunnelBaseServiceImpl implements IC
.toUpperCase());
if (businessMode.equals(BusinessMode.DATA_INTEGRATION)) {
+
return connectorCache.getTransform().stream()
.filter(
connectorInfo -> {
@@ -133,7 +134,8 @@ public class ConnectorServiceImpl extends
SeatunnelBaseServiceImpl implements IC
|| pluginName.equals("Replace")
|| pluginName.equals("Copy")
|| pluginName.equals("MultiFieldSplit")
- || pluginName.equals("Sql");
+ || pluginName.equals("Sql")
+ || pluginName.equals("JsonPath");
})
.collect(Collectors.toList());
}
diff --git
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobTaskServiceImpl.java
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobTaskServiceImpl.java
index 107f7fb9..c5cef317 100644
---
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobTaskServiceImpl.java
+++
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobTaskServiceImpl.java
@@ -350,6 +350,8 @@ public class JobTaskServiceImpl extends
SeatunnelBaseServiceImpl implements IJob
Collections.singletonList(sqlTransformOptions.getSql()));
}
break;
+ case JSONPATH:
+ break;
case FILTERROWKIND:
case REPLACE:
default:
diff --git
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thirdparty/transfrom/impl/JsonPathTransformSwitcher.java
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thirdparty/transfrom/impl/JsonPathTransformSwitcher.java
new file mode 100644
index 00000000..09579b6c
--- /dev/null
+++
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thirdparty/transfrom/impl/JsonPathTransformSwitcher.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.app.thirdparty.transfrom.impl;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.app.domain.request.job.TableSchemaReq;
+import org.apache.seatunnel.app.domain.request.job.transform.Transform;
+import org.apache.seatunnel.app.domain.request.job.transform.TransformOptions;
+import org.apache.seatunnel.app.dynamicforms.FormStructure;
+import
org.apache.seatunnel.app.thirdparty.framework.SeaTunnelOptionRuleWrapper;
+import org.apache.seatunnel.app.thirdparty.transfrom.TransformConfigSwitcher;
+
+import com.google.auto.service.AutoService;
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+@AutoService(TransformConfigSwitcher.class)
+public class JsonPathTransformSwitcher implements TransformConfigSwitcher {
+ @Override
+ public Transform getTransform() {
+ return Transform.JSONPATH;
+ }
+
+ @Override
+ public FormStructure getFormStructure(OptionRule transformOptionRule) {
+ return SeaTunnelOptionRuleWrapper.wrapper(transformOptionRule,
this.getTransform().name());
+ }
+
+ @Override
+ public Config mergeTransformConfig(
+ Config transformConfig, TransformOptions transformOption,
TableSchemaReq inputSchema) {
+ return transformConfig;
+ }
+}
diff --git
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/utils/TaskOptionUtils.java
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/utils/TaskOptionUtils.java
index 02cc62ad..57906787 100644
---
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/utils/TaskOptionUtils.java
+++
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/utils/TaskOptionUtils.java
@@ -19,6 +19,7 @@ package org.apache.seatunnel.app.utils;
import
org.apache.seatunnel.app.domain.request.job.transform.CopyTransformOptions;
import
org.apache.seatunnel.app.domain.request.job.transform.FieldMapperTransformOptions;
+import
org.apache.seatunnel.app.domain.request.job.transform.JsonPathTransformOptions;
import
org.apache.seatunnel.app.domain.request.job.transform.SQLTransformOptions;
import
org.apache.seatunnel.app.domain.request.job.transform.SplitTransformOptions;
import org.apache.seatunnel.app.domain.request.job.transform.Transform;
@@ -47,6 +48,9 @@ public class TaskOptionUtils {
transformOptionsStr, CopyTransformOptions.class);
case SQL:
return convertTransformStrToOptions(transformOptionsStr,
SQLTransformOptions.class);
+ case JSONPATH:
+ return convertTransformStrToOptions(
+ transformOptionsStr, JsonPathTransformOptions.class);
case FILTERROWKIND:
case REPLACE:
default:
diff --git
a/seatunnel-ui/src/views/task/synchronization-definition/dag/canvas/node.tsx
b/seatunnel-ui/src/views/task/synchronization-definition/dag/canvas/node.tsx
index e875761d..998759ca 100644
--- a/seatunnel-ui/src/views/task/synchronization-definition/dag/canvas/node.tsx
+++ b/seatunnel-ui/src/views/task/synchronization-definition/dag/canvas/node.tsx
@@ -23,6 +23,7 @@ import SinkImg from '../images/sink.png'
import FieldMapperImg from '../images/field-mapper.png'
import FilterEventTypeImg from '../images/filter-event-type.png'
import ReplaceImg from '../images/replace.png'
+import JsonPathImg from '../images/json-path.png'
import SplitImg from '../images/spilt.png'
import CopyImg from '../images/copy.png'
import SqlImg from '../images/sql.png'
@@ -52,6 +53,8 @@ const Node = defineComponent({
icon.value = CopyImg
} else if (type === 'transform' && connectorType === 'Sql') {
icon.value = SqlImg
+ } else if (type === 'transform' && connectorType === 'JsonPath') {
+ icon.value = JsonPathImg
}
return () => (
diff --git
a/seatunnel-ui/src/views/task/synchronization-definition/dag/images/json-path.png
b/seatunnel-ui/src/views/task/synchronization-definition/dag/images/json-path.png
new file mode 100644
index 00000000..b42f69ff
Binary files /dev/null and
b/seatunnel-ui/src/views/task/synchronization-definition/dag/images/json-path.png
differ
diff --git
a/seatunnel-ui/src/views/task/synchronization-definition/dag/sidebar/index.tsx
b/seatunnel-ui/src/views/task/synchronization-definition/dag/sidebar/index.tsx
index 79aff2c3..9f843458 100644
---
a/seatunnel-ui/src/views/task/synchronization-definition/dag/sidebar/index.tsx
+++
b/seatunnel-ui/src/views/task/synchronization-definition/dag/sidebar/index.tsx
@@ -25,6 +25,7 @@ import SinkImg from '../images/sink.png'
import FieldMapperImg from '../images/field-mapper.png'
import FilterEventTypeImg from '../images/filter-event-type.png'
import ReplaceImg from '../images/replace.png'
+import JsonPathImg from '../images/json-path.png'
import SplitImg from '../images/spilt.png'
import CopyImg from '../images/copy.png'
import SqlImg from '../images/sql.png'
@@ -112,6 +113,8 @@ const DagSidebar = defineComponent({
item.icon = CopyImg
} else if (item.name === 'Sql') {
item.icon = SqlImg
+ } else if (item.name === 'JsonPath') {
+ item.icon = JsonPathImg
}
return (
diff --git
a/seatunnel-ui/src/views/task/synchronization-definition/dag/use-model.ts
b/seatunnel-ui/src/views/task/synchronization-definition/dag/use-model.ts
index 3d89eaa6..48497adf 100644
--- a/seatunnel-ui/src/views/task/synchronization-definition/dag/use-model.ts
+++ b/seatunnel-ui/src/views/task/synchronization-definition/dag/use-model.ts
@@ -263,8 +263,30 @@ export function useNodeModel(
} else if(transformType === 'Sql'){
let table = await getSqlTransformOutputData()
state.outputTableData = table
-
- } else {
+ } else if(transformType === 'JsonPath'){
+ // extract the dest_field in columns
+ const fixedInput = refForm.value.getValues().columns.replace(/=/g,
':');
+ const destFieldMatches =
fixedInput.match(/"dest_field"\s*:\s*"([^"]*)"/g);
+ const destFields = destFieldMatches.map(match =>
match.match(/"([^"]+)"$/)[1]);
+
+ state.outputTableData = state.inputTableData.map(item => ({
...item }));
+ destFields.map(item => {
+ state.outputTableData.push(
+ {
+ "format":"",
+ "type": "TEXT",
+ "name": item,
+ "comment": "",
+ "primaryKey": false,
+ "defaultValue": '',
+ "nullable": false,
+ "unSupport": false,
+ "outputDataType": "TEXT"
+ }
+ )
+ });
+ }
+ else {
state.outputTableData = t.fields
}
}
diff --git
a/seatunnel-ui/src/views/task/synchronization-definition/dag/use-node-setting.ts
b/seatunnel-ui/src/views/task/synchronization-definition/dag/use-node-setting.ts
index 62f1b7dd..e115758f 100644
---
a/seatunnel-ui/src/views/task/synchronization-definition/dag/use-node-setting.ts
+++
b/seatunnel-ui/src/views/task/synchronization-definition/dag/use-node-setting.ts
@@ -241,6 +241,18 @@ export function useNodeSettingModal(
tableName: tableInfo[0].tableName,
fields: resultSchema.outputTableData
}]
+ }else if(props.nodeInfo.connectorType === 'JsonPath') {
+ const resultSchema = modelRef.value.getOutputSchema()
+ const tableInfo = resultSchema.allTableData[0].tableInfos
+ transformOptions.columns = {
+ "sourceFieldName": resultSchema.outputTableData[0]?.name || null,
+ "columns": values.columns
+ }
+ modelOutputTableData = [{
+ database: tableInfo[0].database,
+ tableName: tableInfo[0].tableName,
+ fields: resultSchema.outputTableData
+ }]
}
await saveTaskDefinitionItem(