This is an automated email from the ASF dual-hosted git repository.
healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new c352a37cd [INLONG-4046][Manager] Fix Filter Function of Join node
(#4047)
c352a37cd is described below
commit c352a37cd6a9601ec64fd86f700be1e7919ee452
Author: kipshi <[email protected]>
AuthorDate: Sun May 1 11:44:58 2022 +0800
[INLONG-4046][Manager] Fix Filter Function of Join node (#4047)
* Fix Filter Function of Join node
* change append mode of MysqlCdcConnector
* optimize method FieldInfoUtils.getFieldInfo
* Add constantParam when targetValue is null
* Add TODO comment
Co-authored-by: healchow <[email protected]>
---
.../manager/service/sort/util/ExtractNodeUtils.java | 20 +++++++++++---------
.../manager/service/sort/util/FieldInfoUtils.java | 16 +++++++++++++---
.../service/sort/util/FilterFunctionUtils.java | 3 +++
.../service/sort/util/NodeRelationShipUtils.java | 8 ++++----
4 files changed, 31 insertions(+), 16 deletions(-)
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/ExtractNodeUtils.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/ExtractNodeUtils.java
index 17cd5aecb..ab5d1a0e3 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/ExtractNodeUtils.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/ExtractNodeUtils.java
@@ -79,14 +79,14 @@ public class ExtractNodeUtils {
* @return
*/
public static MySqlExtractNode createExtractNode(BinlogSourceResponse
binlogSourceResponse) {
- String id = binlogSourceResponse.getSourceName();
- String name = binlogSourceResponse.getSourceName();
- String database = binlogSourceResponse.getDatabaseWhiteList();
- String primaryKey = binlogSourceResponse.getPrimaryKey();
- String hostName = binlogSourceResponse.getHostname();
- String userName = binlogSourceResponse.getUser();
- String password = binlogSourceResponse.getPassword();
- Integer port = binlogSourceResponse.getPort();
+ final String id = binlogSourceResponse.getSourceName();
+ final String name = binlogSourceResponse.getSourceName();
+ final String database = binlogSourceResponse.getDatabaseWhiteList();
+ final String primaryKey = binlogSourceResponse.getPrimaryKey();
+ final String hostName = binlogSourceResponse.getHostname();
+ final String userName = binlogSourceResponse.getUser();
+ final String password = binlogSourceResponse.getPassword();
+ final Integer port = binlogSourceResponse.getPort();
Integer serverId = null;
if (binlogSourceResponse.getServerId() != null &&
binlogSourceResponse.getServerId() > 0) {
serverId = binlogSourceResponse.getServerId();
@@ -99,13 +99,15 @@ public class ExtractNodeUtils {
.collect(Collectors.toList());
String serverTimeZone = binlogSourceResponse.getServerTimezone();
boolean incrementalSnapshotEnabled = true;
+
+ // TODO Needs to be configurable for those parameters
Map<String, String> properties = Maps.newHashMap();
if (binlogSourceResponse.isAllMigration()) {
// Unique properties when migrate all tables in database
incrementalSnapshotEnabled = false;
- properties.put("append-mode", "true");
properties.put("migrate-all", "true");
}
+ properties.put("append-mode", "true");
return new MySqlExtractNode(id,
name,
fieldInfos,
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/FieldInfoUtils.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/FieldInfoUtils.java
index e6939d9e0..6538dd5c6 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/FieldInfoUtils.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/FieldInfoUtils.java
@@ -134,14 +134,24 @@ public class FieldInfoUtils {
BuiltInField builtInField = BUILT_IN_FIELD_MAP.get(fieldName);
FormatInfo formatInfo = convertFieldFormat(fieldType.toLowerCase(),
format);
if (isBuiltin && builtInField != null) {
- fieldInfo = new BuiltInFieldInfo(fieldName, formatInfo,
builtInField);
+ return new BuiltInFieldInfo(fieldName, formatInfo, builtInField);
} else {
if (isBuiltin) {
+ // Check if fieldName contains buildInFieldName, such as
left_database
+ // TODO The buildin field needs to be selectable and cannot be
filled in by the user
+ for (String buildInFieldName : BUILT_IN_FIELD_MAP.keySet()) {
+ if (fieldName.contains(buildInFieldName)) {
+ builtInField =
BUILT_IN_FIELD_MAP.get(buildInFieldName);
+ break;
+ }
+ }
+ if (builtInField != null) {
+ return new BuiltInFieldInfo(fieldName, formatInfo,
builtInField);
+ }
log.warn("Unsupported metadata fieldName={} as the
builtInField is null", fieldName);
}
- fieldInfo = new FieldInfo(fieldName, formatInfo);
+ return new FieldInfo(fieldName, formatInfo);
}
- return fieldInfo;
}
/**
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/FilterFunctionUtils.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/FilterFunctionUtils.java
index bb24eb87d..16f63bd69 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/FilterFunctionUtils.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/FilterFunctionUtils.java
@@ -122,6 +122,9 @@ public class FilterFunctionUtils {
}
private static FunctionParam parseTargetValue(TargetValue value, String
transformName) {
+ if (value == null) {
+ return new ConstantParam("");
+ }
boolean isConstant = value.isConstant();
if (isConstant) {
String constant = value.getTargetConstant();
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/NodeRelationShipUtils.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/NodeRelationShipUtils.java
index 177850240..c62c78bea 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/NodeRelationShipUtils.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/NodeRelationShipUtils.java
@@ -133,7 +133,7 @@ public class NodeRelationShipUtils {
operator = EmptyOperator.getInstance();
}
filterFunctions.add(
- createFilterFunction(leftField, rightField, leftNode,
rightNode, operator));
+ createFilterFunction(leftField, rightField, operator));
}
Map<String, List<FilterFunction>> joinConditions = Maps.newHashMap();
joinConditions.put(rightNode, filterFunctions);
@@ -153,10 +153,10 @@ public class NodeRelationShipUtils {
}
private static SingleValueFilterFunction createFilterFunction(StreamField
leftField, StreamField rightField,
- String leftNode, String rightNode, LogicOperator operator) {
- FieldInfo sourceField = new FieldInfo(leftField.getFieldName(),
leftNode,
+ LogicOperator operator) {
+ FieldInfo sourceField = new FieldInfo(leftField.getOriginFieldName(),
leftField.getOriginNodeName(),
FieldInfoUtils.convertFieldFormat(leftField.getFieldType().name(),
leftField.getFieldFormat()));
- FieldInfo targetField = new FieldInfo(rightField.getFieldName(),
rightNode,
+ FieldInfo targetField = new FieldInfo(rightField.getOriginFieldName(),
rightField.getOriginNodeName(),
FieldInfoUtils.convertFieldFormat(rightField.getFieldType().name(),
rightField.getFieldFormat()));
return new SingleValueFilterFunction(operator, sourceField,
EqualOperator.getInstance(), targetField);
}