This is an automated email from the ASF dual-hosted git repository.

healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new f6db67cf7 [INLONG-4110][Manager] Create cascade function wrapper (#4111)
f6db67cf7 is described below

commit f6db67cf7f53ea3161d5756c296f3f89eb4ea678
Author: kipshi <[email protected]>
AuthorDate: Sat May 7 20:30:53 2022 +0800

    [INLONG-4110][Manager] Create cascade function wrapper (#4111)
---
 .../service/sort/util/FieldRelationShipUtils.java  | 24 ++++++++++++++++++++++
 .../service/sort/util/NodeRelationShipUtils.java   |  4 ++--
 .../transform/StreamTransformServiceImpl.java      |  2 +-
 3 files changed, 27 insertions(+), 3 deletions(-)

diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/FieldRelationShipUtils.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/FieldRelationShipUtils.java
index 08de0625e..59d498c54 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/FieldRelationShipUtils.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/FieldRelationShipUtils.java
@@ -18,6 +18,7 @@
 package org.apache.inlong.manager.service.sort.util;
 
 import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import org.apache.inlong.manager.common.enums.FieldType;
 import org.apache.inlong.manager.common.enums.TransformType;
@@ -33,14 +34,17 @@ import org.apache.inlong.manager.common.util.Preconditions;
 import org.apache.inlong.manager.common.util.StreamParseUtils;
 import org.apache.inlong.sort.formats.common.FormatInfo;
 import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.transformation.CascadeFunction;
 import org.apache.inlong.sort.protocol.transformation.ConstantParam;
 import org.apache.inlong.sort.protocol.transformation.FieldRelationShip;
 import org.apache.inlong.sort.protocol.transformation.StringConstantParam;
+import 
org.apache.inlong.sort.protocol.transformation.function.CascadeFunctionWrapper;
 import 
org.apache.inlong.sort.protocol.transformation.function.RegexpReplaceFirstFunction;
 import 
org.apache.inlong.sort.protocol.transformation.function.RegexpReplaceFunction;
 import 
org.apache.inlong.sort.protocol.transformation.function.SplitIndexFunction;
 
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
 
@@ -126,6 +130,7 @@ public class FieldRelationShipUtils {
         List<FieldRelationShip> fieldRelationShips = replaceRules.stream()
                 .map(replaceRule -> parseReplaceRule(replaceRule, 
replaceFields, transformName, preNode))
                 .collect(Collectors.toList());
+        fieldRelationShips = cascadeFunctionRelationShips(fieldRelationShips);
         List<StreamField> filteredFieldList = fieldList.stream()
                 .filter(streamFieldInfo -> 
!replaceFields.contains(streamFieldInfo.getFieldName()))
                 .collect(Collectors.toList());
@@ -133,6 +138,25 @@ public class FieldRelationShipUtils {
         return fieldRelationShips;
     }
 
+    private static List<FieldRelationShip> 
cascadeFunctionRelationShips(List<FieldRelationShip> fieldRelationShips) {
+        Map<String, List<CascadeFunction>> cascadeFunctions = 
Maps.newHashMap();
+        Map<String, FieldInfo> targetFields = Maps.newHashMap();
+        for (FieldRelationShip fieldRelationShip : fieldRelationShips) {
+            CascadeFunction cascadeFunction = (CascadeFunction) 
fieldRelationShip.getInputField();
+            String targetField = fieldRelationShip.getOutputField().getName();
+            cascadeFunctions.computeIfAbsent(targetField, k -> 
Lists.newArrayList()).add(cascadeFunction);
+            targetFields.put(targetField, fieldRelationShip.getOutputField());
+        }
+        List<FieldRelationShip> cascadeRelationShips = Lists.newArrayList();
+        for (Map.Entry<String, List<CascadeFunction>> entry : 
cascadeFunctions.entrySet()) {
+            String targetField = entry.getKey();
+            CascadeFunctionWrapper functionWrapper = new 
CascadeFunctionWrapper(entry.getValue());
+            FieldInfo targetFieldInfo = targetFields.get(targetField);
+            cascadeRelationShips.add(new FieldRelationShip(functionWrapper, 
targetFieldInfo));
+        }
+        return cascadeRelationShips;
+    }
+
     private static FieldRelationShip parseReplaceRule(ReplaceRule replaceRule, 
Set<String> replaceFields,
             String transformName, String preNode) {
         StreamField sourceField = replaceRule.getSourceField();
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/NodeRelationShipUtils.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/NodeRelationShipUtils.java
index c62c78bea..444680ab1 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/NodeRelationShipUtils.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/NodeRelationShipUtils.java
@@ -142,10 +142,10 @@ public class NodeRelationShipUtils {
                 return new LeftOuterJoinNodeRelationShip(preNodes, 
nodeRelationShip.getOutputs(),
                         joinConditions);
             case INNER_JOIN:
-                return new RightOuterJoinNodeRelationShip(preNodes, 
nodeRelationShip.getOutputs(),
+                return new InnerJoinNodeRelationShip(preNodes, 
nodeRelationShip.getOutputs(),
                         joinConditions);
             case RIGHT_JOIN:
-                return new InnerJoinNodeRelationShip(preNodes, 
nodeRelationShip.getOutputs(),
+                return new RightOuterJoinNodeRelationShip(preNodes, 
nodeRelationShip.getOutputs(),
                         joinConditions);
             default:
                 throw new IllegalArgumentException(String.format("Unsupported 
join mode=%s for inlong", joinMode));
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/transform/StreamTransformServiceImpl.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/transform/StreamTransformServiceImpl.java
index 24befb30d..c2ffb9781 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/transform/StreamTransformServiceImpl.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/transform/StreamTransformServiceImpl.java
@@ -151,7 +151,7 @@ public class StreamTransformServiceImpl implements 
StreamTransformService {
     @Override
     @Transactional(rollbackFor = Throwable.class, propagation = 
Propagation.REQUIRES_NEW)
     public boolean delete(String groupId, String streamId, String 
transformName, String operator) {
-        log.info("begin to delete source by groupId={} streamId={}, 
transformName={}", groupId, streamId,
+        log.info("begin to delete transform by groupId={} streamId={}, 
transformName={}", groupId, streamId,
                 transformName);
         Preconditions.checkNotNull(groupId, 
ErrorCodeEnum.GROUP_ID_IS_EMPTY.getMessage());
         Preconditions.checkNotNull(streamId, 
ErrorCodeEnum.STREAM_ID_IS_EMPTY.getMessage());

Reply via email to