This is an automated email from the ASF dual-hosted git repository.
healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new f6db67cf7 [INLONG-4110][Manager] Create cascade function wrapper (#4111)
f6db67cf7 is described below
commit f6db67cf7f53ea3161d5756c296f3f89eb4ea678
Author: kipshi <[email protected]>
AuthorDate: Sat May 7 20:30:53 2022 +0800
[INLONG-4110][Manager] Create cascade function wrapper (#4111)
---
.../service/sort/util/FieldRelationShipUtils.java | 24 ++++++++++++++++++++++
.../service/sort/util/NodeRelationShipUtils.java | 4 ++--
.../transform/StreamTransformServiceImpl.java | 2 +-
3 files changed, 27 insertions(+), 3 deletions(-)
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/FieldRelationShipUtils.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/FieldRelationShipUtils.java
index 08de0625e..59d498c54 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/FieldRelationShipUtils.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/FieldRelationShipUtils.java
@@ -18,6 +18,7 @@
package org.apache.inlong.manager.service.sort.util;
import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.apache.inlong.manager.common.enums.FieldType;
import org.apache.inlong.manager.common.enums.TransformType;
@@ -33,14 +34,17 @@ import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.common.util.StreamParseUtils;
import org.apache.inlong.sort.formats.common.FormatInfo;
import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.transformation.CascadeFunction;
import org.apache.inlong.sort.protocol.transformation.ConstantParam;
import org.apache.inlong.sort.protocol.transformation.FieldRelationShip;
import org.apache.inlong.sort.protocol.transformation.StringConstantParam;
+import
org.apache.inlong.sort.protocol.transformation.function.CascadeFunctionWrapper;
import
org.apache.inlong.sort.protocol.transformation.function.RegexpReplaceFirstFunction;
import
org.apache.inlong.sort.protocol.transformation.function.RegexpReplaceFunction;
import
org.apache.inlong.sort.protocol.transformation.function.SplitIndexFunction;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
@@ -126,6 +130,7 @@ public class FieldRelationShipUtils {
List<FieldRelationShip> fieldRelationShips = replaceRules.stream()
.map(replaceRule -> parseReplaceRule(replaceRule,
replaceFields, transformName, preNode))
.collect(Collectors.toList());
+ fieldRelationShips = cascadeFunctionRelationShips(fieldRelationShips);
List<StreamField> filteredFieldList = fieldList.stream()
.filter(streamFieldInfo ->
!replaceFields.contains(streamFieldInfo.getFieldName()))
.collect(Collectors.toList());
@@ -133,6 +138,25 @@ public class FieldRelationShipUtils {
return fieldRelationShips;
}
+ private static List<FieldRelationShip>
cascadeFunctionRelationShips(List<FieldRelationShip> fieldRelationShips) {
+ Map<String, List<CascadeFunction>> cascadeFunctions =
Maps.newHashMap();
+ Map<String, FieldInfo> targetFields = Maps.newHashMap();
+ for (FieldRelationShip fieldRelationShip : fieldRelationShips) {
+ CascadeFunction cascadeFunction = (CascadeFunction)
fieldRelationShip.getInputField();
+ String targetField = fieldRelationShip.getOutputField().getName();
+ cascadeFunctions.computeIfAbsent(targetField, k ->
Lists.newArrayList()).add(cascadeFunction);
+ targetFields.put(targetField, fieldRelationShip.getOutputField());
+ }
+ List<FieldRelationShip> cascadeRelationShips = Lists.newArrayList();
+ for (Map.Entry<String, List<CascadeFunction>> entry :
cascadeFunctions.entrySet()) {
+ String targetField = entry.getKey();
+ CascadeFunctionWrapper functionWrapper = new
CascadeFunctionWrapper(entry.getValue());
+ FieldInfo targetFieldInfo = targetFields.get(targetField);
+ cascadeRelationShips.add(new FieldRelationShip(functionWrapper,
targetFieldInfo));
+ }
+ return cascadeRelationShips;
+ }
+
private static FieldRelationShip parseReplaceRule(ReplaceRule replaceRule,
Set<String> replaceFields,
String transformName, String preNode) {
StreamField sourceField = replaceRule.getSourceField();
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/NodeRelationShipUtils.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/NodeRelationShipUtils.java
index c62c78bea..444680ab1 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/NodeRelationShipUtils.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/NodeRelationShipUtils.java
@@ -142,10 +142,10 @@ public class NodeRelationShipUtils {
return new LeftOuterJoinNodeRelationShip(preNodes,
nodeRelationShip.getOutputs(),
joinConditions);
case INNER_JOIN:
- return new RightOuterJoinNodeRelationShip(preNodes,
nodeRelationShip.getOutputs(),
+ return new InnerJoinNodeRelationShip(preNodes,
nodeRelationShip.getOutputs(),
joinConditions);
case RIGHT_JOIN:
- return new InnerJoinNodeRelationShip(preNodes,
nodeRelationShip.getOutputs(),
+ return new RightOuterJoinNodeRelationShip(preNodes,
nodeRelationShip.getOutputs(),
joinConditions);
default:
throw new IllegalArgumentException(String.format("Unsupported
join mode=%s for inlong", joinMode));
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/transform/StreamTransformServiceImpl.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/transform/StreamTransformServiceImpl.java
index 24befb30d..c2ffb9781 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/transform/StreamTransformServiceImpl.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/transform/StreamTransformServiceImpl.java
@@ -151,7 +151,7 @@ public class StreamTransformServiceImpl implements
StreamTransformService {
@Override
@Transactional(rollbackFor = Throwable.class, propagation =
Propagation.REQUIRES_NEW)
public boolean delete(String groupId, String streamId, String
transformName, String operator) {
- log.info("begin to delete source by groupId={} streamId={},
transformName={}", groupId, streamId,
+ log.info("begin to delete transform by groupId={} streamId={},
transformName={}", groupId, streamId,
transformName);
Preconditions.checkNotNull(groupId,
ErrorCodeEnum.GROUP_ID_IS_EMPTY.getMessage());
Preconditions.checkNotNull(streamId,
ErrorCodeEnum.STREAM_ID_IS_EMPTY.getMessage());