This is an automated email from the ASF dual-hosted git repository. dockerzhang pushed a commit to branch release-1.3.0 in repository https://gitbox.apache.org/repos/asf/inlong.git
commit 34cbdfab0db0894a0b53ebed0183ef4f3c526bfc Author: Xin Gong <[email protected]> AuthorDate: Thu Aug 25 15:06:46 2022 +0800 [INLONG-5680][Manager][Sort] Fix field relation object generate error (#5693) --- .../manager/pojo/sort/util/FieldRelationUtils.java | 24 ++++++++++------------ .../manager/pojo/sort/util/LoadNodeUtils.java | 11 +++++----- .../inlong/sort/parser/impl/FlinkSqlParser.java | 8 +++----- 3 files changed, 19 insertions(+), 24 deletions(-) diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/FieldRelationUtils.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/FieldRelationUtils.java index 870ad42ee..4f8df5f14 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/FieldRelationUtils.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/FieldRelationUtils.java @@ -20,9 +20,9 @@ package org.apache.inlong.manager.pojo.sort.util; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Sets; -import java.util.Objects; import org.apache.inlong.manager.common.enums.FieldType; import org.apache.inlong.manager.common.enums.TransformType; +import org.apache.inlong.manager.common.util.Preconditions; import org.apache.inlong.manager.pojo.stream.StreamField; import org.apache.inlong.manager.pojo.transform.TransformDefinition; import org.apache.inlong.manager.pojo.transform.TransformResponse; @@ -33,7 +33,6 @@ import org.apache.inlong.manager.pojo.transform.replacer.StringReplacerDefinitio import org.apache.inlong.manager.pojo.transform.replacer.StringReplacerDefinition.ReplaceRule; import org.apache.inlong.manager.pojo.transform.splitter.SplitterDefinition; import org.apache.inlong.manager.pojo.transform.splitter.SplitterDefinition.SplitRule; -import org.apache.inlong.manager.common.util.Preconditions; import org.apache.inlong.sort.formats.common.FormatInfo; import org.apache.inlong.sort.formats.common.StringTypeInfo; import org.apache.inlong.sort.protocol.FieldInfo; @@ -50,6 +49,7 @@ import org.apache.inlong.sort.protocol.transformation.function.SplitIndexFunctio import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Set; import java.util.stream.Collectors; @@ -84,9 +84,9 @@ public class FieldRelationUtils { preNodes, constantFieldMap); case DE_DUPLICATION: case FILTER: - return createFieldRelations(fieldList, transformName, constantFieldMap); + return createFieldRelations(fieldList, constantFieldMap); case JOINER: - return createJoinerFieldRelations(fieldList, transformName, constantFieldMap); + return createJoinerFieldRelations(fieldList, constantFieldMap); default: throw new UnsupportedOperationException( String.format("Unsupported transformType=%s", transformType)); @@ -96,7 +96,7 @@ public class FieldRelationUtils { /** * Create relation of fields. */ - private static List<FieldRelation> createFieldRelations(List<StreamField> fieldList, String transformName, + private static List<FieldRelation> createFieldRelations(List<StreamField> fieldList, Map<String, StreamField> constantFieldMap) { return fieldList.stream() .map(FieldInfoUtils::parseStreamField) @@ -115,8 +115,7 @@ public class FieldRelationUtils { inputField = new FieldInfo(fieldInfo.getName(), fieldInfo.getNodeId(), fieldInfo.getFormatInfo()); } - FieldInfo outputField = new FieldInfo(fieldInfo.getName(), transformName, - fieldInfo.getFormatInfo()); + FieldInfo outputField = new FieldInfo(fieldInfo.getName(), fieldInfo.getFormatInfo()); return new FieldRelation(inputField, outputField); }).collect(Collectors.toList()); } @@ -124,7 +123,7 @@ public class FieldRelationUtils { /** * Create relation of fields in join function. */ - private static List<FieldRelation> createJoinerFieldRelations(List<StreamField> fieldList, String transformName, + private static List<FieldRelation> createJoinerFieldRelations(List<StreamField> fieldList, Map<String, StreamField> constantFieldMap) { return fieldList.stream() .map(streamField -> { @@ -144,8 +143,7 @@ public class FieldRelationUtils { inputField = new FieldInfo(streamField.getOriginFieldName(), streamField.getOriginNodeName(), formatInfo); } - FieldInfo outputField = new FieldInfo(streamField.getFieldName(), - transformName, formatInfo); + FieldInfo outputField = new FieldInfo(streamField.getFieldName(), formatInfo); return new FieldRelation(inputField, outputField); }).collect(Collectors.toList()); } @@ -170,7 +168,7 @@ public class FieldRelationUtils { List<StreamField> filteredFieldList = fieldList.stream() .filter(streamFieldInfo -> !splitFields.contains(streamFieldInfo.getFieldName())) .collect(Collectors.toList()); - fieldRelations.addAll(createFieldRelations(filteredFieldList, transformName, constantFieldMap)); + fieldRelations.addAll(createFieldRelations(filteredFieldList, constantFieldMap)); return fieldRelations; } @@ -190,7 +188,7 @@ public class FieldRelationUtils { List<StreamField> filteredFieldList = fieldList.stream() .filter(streamFieldInfo -> !replaceFields.contains(streamFieldInfo.getFieldName())) .collect(Collectors.toList()); - fieldRelations.addAll(createFieldRelations(filteredFieldList, transformName, constantFieldMap)); + fieldRelations.addAll(createFieldRelations(filteredFieldList, constantFieldMap)); return fieldRelations; } @@ -209,7 +207,7 @@ public class FieldRelationUtils { List<StreamField> filteredFieldList = fieldList.stream() .filter(streamFieldInfo -> !encryptFields.contains(streamFieldInfo.getFieldName())) .collect(Collectors.toList()); - fieldRelations.addAll(createFieldRelations(filteredFieldList, transformName, constantFieldMap)); + fieldRelations.addAll(createFieldRelations(filteredFieldList, constantFieldMap)); return fieldRelations; } diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/LoadNodeUtils.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/LoadNodeUtils.java index 6e96d7bfd..bd1b83bfb 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/LoadNodeUtils.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/LoadNodeUtils.java @@ -21,8 +21,8 @@ import com.google.common.collect.Lists; import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang3.StringUtils; import org.apache.inlong.common.enums.DataTypeEnum; -import org.apache.inlong.manager.common.enums.ErrorCodeEnum; import org.apache.inlong.manager.common.consts.SinkType; +import org.apache.inlong.manager.common.enums.ErrorCodeEnum; import org.apache.inlong.manager.common.enums.FieldType; import org.apache.inlong.manager.common.exceptions.BusinessException; import org.apache.inlong.manager.pojo.sink.SinkField; @@ -71,12 +71,12 @@ import org.apache.inlong.sort.protocol.transformation.ConstantParam; import org.apache.inlong.sort.protocol.transformation.FieldRelation; import org.apache.inlong.sort.protocol.transformation.FunctionParam; import org.apache.inlong.sort.protocol.transformation.StringConstantParam; +import org.apache.inlong.sort.protocol.transformation.function.CustomFunction; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.stream.Collectors; -import org.apache.inlong.sort.protocol.transformation.function.CustomFunction; /** * Util for load node info. @@ -102,8 +102,7 @@ public class LoadNodeUtils { List<FieldInfo> fieldInfos = streamSink.getSinkFieldList().stream() .map(field -> FieldInfoUtils.parseSinkFieldInfo(field, streamSink.getSinkName())) .collect(Collectors.toList()); - List<FieldRelation> fieldRelations = parseSinkFields(streamSink.getSinkFieldList(), - streamSink.getSinkName(), constantFieldMap); + List<FieldRelation> fieldRelations = parseSinkFields(streamSink.getSinkFieldList(), constantFieldMap); Map<String, String> properties = streamSink.getProperties().entrySet().stream() .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().toString())); String sinkType = streamSink.getSinkType(); @@ -497,7 +496,7 @@ public class LoadNodeUtils { /** * Parse information field of data sink. */ - public static List<FieldRelation> parseSinkFields(List<SinkField> fieldList, String sinkName, + public static List<FieldRelation> parseSinkFields(List<SinkField> fieldList, Map<String, StreamField> constantFieldMap) { if (CollectionUtils.isEmpty(fieldList)) { return Lists.newArrayList(); @@ -505,7 +504,7 @@ public class LoadNodeUtils { return fieldList.stream() .filter(sinkField -> StringUtils.isNotEmpty(sinkField.getSourceFieldName())) .map(field -> { - FieldInfo outputField = new FieldInfo(field.getFieldName(), sinkName, + FieldInfo outputField = new FieldInfo(field.getFieldName(), FieldInfoUtils.convertFieldFormat(field.getFieldType(), field.getFieldFormat())); FunctionParam inputField; String fieldKey = String.format("%s-%s", field.getOriginNodeName(), field.getSourceFieldName()); diff --git a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/parser/impl/FlinkSqlParser.java b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/parser/impl/FlinkSqlParser.java index 02722a9c5..3cc166601 100644 --- a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/parser/impl/FlinkSqlParser.java +++ b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/parser/impl/FlinkSqlParser.java @@ -338,17 +338,15 @@ public class FlinkSqlParser implements Parser { // Generate mapping for output field to FieldRelation fieldRelations.forEach(s -> { // All field relations of input nodes will be the same if the node id of output field is blank. - // Currently, the node id in the output file is used to distinguish which field of the node in the upstream - // of the union the field comes from. A better way is through the upstream input field, + // Currently, the node id in the output field is used to distinguish which field of the node in the + // upstream of the union the field comes from. A better way is through the upstream input field, // but this abstraction does not yet have the ability to set node ids for all upstream input fields. // todo optimize the implementation of this block in the future String nodeId = s.getOutputField().getNodeId(); if (StringUtils.isBlank(nodeId)) { nodeId = unionRelation.getInputs().get(0); } - Map<String, FieldRelation> subRelationMap = fieldRelationMap - .computeIfAbsent(nodeId, k -> new HashMap<>()); - subRelationMap.put(s.getOutputField().getName(), s); + fieldRelationMap.computeIfAbsent(nodeId, k -> new HashMap<>()).put(s.getOutputField().getName(), s); }); StringBuilder sb = new StringBuilder(); sb.append(genUnionSingleSelectSql(unionRelation.getInputs().get(0),
