This is an automated email from the ASF dual-hosted git repository.

healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new c352a37cd [INLONG-4046][Manager] Fix Filter Function of Join node (#4047)
c352a37cd is described below

commit c352a37cd6a9601ec64fd86f700be1e7919ee452
Author: kipshi <[email protected]>
AuthorDate: Sun May 1 11:44:58 2022 +0800

    [INLONG-4046][Manager] Fix Filter Function of Join node (#4047)
    
    * Fix Filter Function of Join node
    
    * change append mode of MysqlCdcConnector
    
    * optimize method FieldInfoUtils.getFieldInfo
    
    * Add constantParam when targetValue is null
    
    * Add TODO comment
    
    Co-authored-by: healchow <[email protected]>
---
 .../manager/service/sort/util/ExtractNodeUtils.java  | 20 +++++++++++---------
 .../manager/service/sort/util/FieldInfoUtils.java    | 16 +++++++++++++---
 .../service/sort/util/FilterFunctionUtils.java       |  3 +++
 .../service/sort/util/NodeRelationShipUtils.java     |  8 ++++----
 4 files changed, 31 insertions(+), 16 deletions(-)

diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/ExtractNodeUtils.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/ExtractNodeUtils.java
index 17cd5aecb..ab5d1a0e3 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/ExtractNodeUtils.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/ExtractNodeUtils.java
@@ -79,14 +79,14 @@ public class ExtractNodeUtils {
      * @return
      */
     public static MySqlExtractNode createExtractNode(BinlogSourceResponse 
binlogSourceResponse) {
-        String id = binlogSourceResponse.getSourceName();
-        String name = binlogSourceResponse.getSourceName();
-        String database = binlogSourceResponse.getDatabaseWhiteList();
-        String primaryKey = binlogSourceResponse.getPrimaryKey();
-        String hostName = binlogSourceResponse.getHostname();
-        String userName = binlogSourceResponse.getUser();
-        String password = binlogSourceResponse.getPassword();
-        Integer port = binlogSourceResponse.getPort();
+        final String id = binlogSourceResponse.getSourceName();
+        final String name = binlogSourceResponse.getSourceName();
+        final String database = binlogSourceResponse.getDatabaseWhiteList();
+        final String primaryKey = binlogSourceResponse.getPrimaryKey();
+        final String hostName = binlogSourceResponse.getHostname();
+        final String userName = binlogSourceResponse.getUser();
+        final String password = binlogSourceResponse.getPassword();
+        final Integer port = binlogSourceResponse.getPort();
         Integer serverId = null;
         if (binlogSourceResponse.getServerId() != null && 
binlogSourceResponse.getServerId() > 0) {
             serverId = binlogSourceResponse.getServerId();
@@ -99,13 +99,15 @@ public class ExtractNodeUtils {
                 .collect(Collectors.toList());
         String serverTimeZone = binlogSourceResponse.getServerTimezone();
         boolean incrementalSnapshotEnabled = true;
+        
+        // TODO Needs to be configurable for those parameters
         Map<String, String> properties = Maps.newHashMap();
         if (binlogSourceResponse.isAllMigration()) {
             // Unique properties when migrate all tables in database
             incrementalSnapshotEnabled = false;
-            properties.put("append-mode", "true");
             properties.put("migrate-all", "true");
         }
+        properties.put("append-mode", "true");
         return new MySqlExtractNode(id,
                 name,
                 fieldInfos,
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/FieldInfoUtils.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/FieldInfoUtils.java
index e6939d9e0..6538dd5c6 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/FieldInfoUtils.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/FieldInfoUtils.java
@@ -134,14 +134,24 @@ public class FieldInfoUtils {
         BuiltInField builtInField = BUILT_IN_FIELD_MAP.get(fieldName);
         FormatInfo formatInfo = convertFieldFormat(fieldType.toLowerCase(), 
format);
         if (isBuiltin && builtInField != null) {
-            fieldInfo = new BuiltInFieldInfo(fieldName, formatInfo, 
builtInField);
+            return new BuiltInFieldInfo(fieldName, formatInfo, builtInField);
         } else {
             if (isBuiltin) {
+                // Check if fieldName contains buildInFieldName, such as 
left_database
+                // TODO The buildin field needs to be selectable and cannot be 
filled in by the user
+                for (String buildInFieldName : BUILT_IN_FIELD_MAP.keySet()) {
+                    if (fieldName.contains(buildInFieldName)) {
+                        builtInField = 
BUILT_IN_FIELD_MAP.get(buildInFieldName);
+                        break;
+                    }
+                }
+                if (builtInField != null) {
+                    return new BuiltInFieldInfo(fieldName, formatInfo, 
builtInField);
+                }
                 log.warn("Unsupported metadata fieldName={} as the 
builtInField is null", fieldName);
             }
-            fieldInfo = new FieldInfo(fieldName, formatInfo);
+            return new FieldInfo(fieldName, formatInfo);
         }
-        return fieldInfo;
     }
 
     /**
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/FilterFunctionUtils.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/FilterFunctionUtils.java
index bb24eb87d..16f63bd69 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/FilterFunctionUtils.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/FilterFunctionUtils.java
@@ -122,6 +122,9 @@ public class FilterFunctionUtils {
     }
 
     private static FunctionParam parseTargetValue(TargetValue value, String 
transformName) {
+        if (value == null) {
+            return new ConstantParam("");
+        }
         boolean isConstant = value.isConstant();
         if (isConstant) {
             String constant = value.getTargetConstant();
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/NodeRelationShipUtils.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/NodeRelationShipUtils.java
index 177850240..c62c78bea 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/NodeRelationShipUtils.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/NodeRelationShipUtils.java
@@ -133,7 +133,7 @@ public class NodeRelationShipUtils {
                 operator = EmptyOperator.getInstance();
             }
             filterFunctions.add(
-                    createFilterFunction(leftField, rightField, leftNode, 
rightNode, operator));
+                    createFilterFunction(leftField, rightField, operator));
         }
         Map<String, List<FilterFunction>> joinConditions = Maps.newHashMap();
         joinConditions.put(rightNode, filterFunctions);
@@ -153,10 +153,10 @@ public class NodeRelationShipUtils {
     }
 
     private static SingleValueFilterFunction createFilterFunction(StreamField 
leftField, StreamField rightField,
-            String leftNode, String rightNode, LogicOperator operator) {
-        FieldInfo sourceField = new FieldInfo(leftField.getFieldName(), 
leftNode,
+            LogicOperator operator) {
+        FieldInfo sourceField = new FieldInfo(leftField.getOriginFieldName(), 
leftField.getOriginNodeName(),
                 
FieldInfoUtils.convertFieldFormat(leftField.getFieldType().name(), 
leftField.getFieldFormat()));
-        FieldInfo targetField = new FieldInfo(rightField.getFieldName(), 
rightNode,
+        FieldInfo targetField = new FieldInfo(rightField.getOriginFieldName(), 
rightField.getOriginNodeName(),
                 
FieldInfoUtils.convertFieldFormat(rightField.getFieldType().name(), 
rightField.getFieldFormat()));
         return new SingleValueFilterFunction(operator, sourceField, 
EqualOperator.getInstance(), targetField);
     }

Reply via email to