This is an automated email from the ASF dual-hosted git repository.

healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 3298b72f4  [INLONG-4026][Manager] Fix field type of StreamSourceFieldMapper (#4027)
3298b72f4 is described below

commit 3298b72f4357eefab94df7f602db66551e6de51a
Author: kipshi <[email protected]>
AuthorDate: Fri Apr 29 18:27:40 2022 +0800

     [INLONG-4026][Manager] Fix field type of StreamSourceFieldMapper (#4027)
    
    * Fix field type of StreamSourceFieldMapper
    
    * change preNode sequence of joinDefinition
    
    * change logic operator
---
 .../java/org/apache/inlong/manager/common/enums/FieldType.java   | 2 +-
 .../inlong/manager/dao/entity/StreamSourceFieldEntity.java       | 4 ++--
 .../inlong/manager/dao/entity/StreamTransformFieldEntity.java    | 4 ++--
 .../inlong/manager/service/sort/util/NodeRelationShipUtils.java  | 9 +++++----
 .../manager/service/transform/StreamTransformServiceImpl.java    | 7 ++++++-
 5 files changed, 16 insertions(+), 10 deletions(-)

diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/FieldType.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/FieldType.java
index c67df3f9a..735cf6b62 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/FieldType.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/FieldType.java
@@ -48,7 +48,7 @@ public enum FieldType {
     public static FieldType forName(String name) {
         Preconditions.checkNotNull(name, "FieldType should not be null");
         for (FieldType value : values()) {
-            if (value.toString().equals(name) || value.toString().equals(name.toLowerCase(Locale.ROOT))) {
+            if (value.toString().equals(name) || value.toString().equals(name.toUpperCase(Locale.ROOT))) {
                 return value;
             }
         }
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StreamSourceFieldEntity.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StreamSourceFieldEntity.java
index a9d3f9f60..bd845d5d2 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StreamSourceFieldEntity.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StreamSourceFieldEntity.java
@@ -43,11 +43,11 @@ public class StreamSourceFieldEntity implements Serializable {
 
     private String fieldComment;
 
-    private Short isMetaField;
+    private Integer isMetaField;
 
     private String fieldFormat;
 
-    private Short rankNum;
+    private Integer rankNum;
 
     private Integer isDeleted;
 }
\ No newline at end of file
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StreamTransformFieldEntity.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StreamTransformFieldEntity.java
index 9fe374b15..d88eea123 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StreamTransformFieldEntity.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StreamTransformFieldEntity.java
@@ -44,11 +44,11 @@ public class StreamTransformFieldEntity implements Serializable {
 
     private String fieldComment;
 
-    private Short isMetaField;
+    private Integer isMetaField;
 
     private String fieldFormat;
 
-    private Short rankNum;
+    private Integer rankNum;
 
     private Integer isDeleted;
 
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/NodeRelationShipUtils.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/NodeRelationShipUtils.java
index ea004bbf1..177850240 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/NodeRelationShipUtils.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/NodeRelationShipUtils.java
@@ -119,6 +119,7 @@ public class NodeRelationShipUtils {
         JoinMode joinMode = joinerDefinition.getJoinMode();
         String leftNode = getNodeName(joinerDefinition.getLeftNode());
         String rightNode = getNodeName(joinerDefinition.getRightNode());
+        List<String> preNodes = Lists.newArrayList(leftNode, rightNode);
         List<StreamField> leftJoinFields = joinerDefinition.getLeftJoinFields();
         List<StreamField> rightJoinFields = joinerDefinition.getRightJoinFields();
         List<FilterFunction> filterFunctions = Lists.newArrayList();
@@ -132,19 +133,19 @@ public class NodeRelationShipUtils {
                 operator = EmptyOperator.getInstance();
             }
             filterFunctions.add(
-                    createFilterFunction(leftField, rightField, leftNode, rightNode, AndOperator.getInstance()));
+                    createFilterFunction(leftField, rightField, leftNode, rightNode, operator));
         }
         Map<String, List<FilterFunction>> joinConditions = Maps.newHashMap();
         joinConditions.put(rightNode, filterFunctions);
         switch (joinMode) {
             case LEFT_JOIN:
-                return new LeftOuterJoinNodeRelationShip(nodeRelationShip.getInputs(), nodeRelationShip.getOutputs(),
+                return new LeftOuterJoinNodeRelationShip(preNodes, nodeRelationShip.getOutputs(),
                         joinConditions);
             case INNER_JOIN:
-                return new RightOuterJoinNodeRelationShip(nodeRelationShip.getInputs(), nodeRelationShip.getOutputs(),
+                return new RightOuterJoinNodeRelationShip(preNodes, nodeRelationShip.getOutputs(),
                         joinConditions);
             case RIGHT_JOIN:
-                return new InnerJoinNodeRelationShip(nodeRelationShip.getInputs(), nodeRelationShip.getOutputs(),
+                return new InnerJoinNodeRelationShip(preNodes, nodeRelationShip.getOutputs(),
                         joinConditions);
             default:
                 throw new IllegalArgumentException(String.format("Unsupported join mode=%s for inlong", joinMode));
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/transform/StreamTransformServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/transform/StreamTransformServiceImpl.java
index 9d3a31b2a..24befb30d 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/transform/StreamTransformServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/transform/StreamTransformServiceImpl.java
@@ -22,6 +22,7 @@ import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.enums.FieldType;
 import org.apache.inlong.manager.common.enums.GlobalConstants;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
 import org.apache.inlong.manager.common.pojo.stream.StreamField;
@@ -109,6 +110,8 @@ public class StreamTransformServiceImpl implements StreamTransformService {
                 .map(transformFieldEntity -> {
                     StreamField fieldInfo = CommonBeanUtils.copyProperties(transformFieldEntity,
                             StreamField::new);
+                    fieldInfo.setFieldType(FieldType.forName(transformFieldEntity.getFieldType()));
+                    fieldInfo.setId(Integer.valueOf(transformFieldEntity.getRankNum()));
                     return Pair.of(transformFieldEntity.getTransformId(), fieldInfo);
                 }).collect(Collectors.groupingBy(Pair::getLeft,
                         Collectors.mapping(Pair::getRight, Collectors.toList())));
@@ -216,12 +219,14 @@ public class StreamTransformServiceImpl implements StreamTransformService {
             if (StringUtils.isEmpty(fieldEntity.getFieldComment())) {
                 fieldEntity.setFieldComment(fieldEntity.getFieldName());
             }
+            fieldEntity.setId(null);
             fieldEntity.setInlongGroupId(groupId);
             fieldEntity.setInlongStreamId(streamId);
+            fieldEntity.setFieldType(fieldInfo.getFieldType().name());
+            fieldEntity.setRankNum(fieldInfo.getId());
             fieldEntity.setTransformId(transformId);
             fieldEntity.setTransformType(transformType);
             fieldEntity.setIsDeleted(GlobalConstants.UN_DELETED);
-            fieldEntity.setOriginNodeName(fieldInfo.getOriginNodeName());
             entityList.add(fieldEntity);
         }
 

Reply via email to