This is an automated email from the ASF dual-hosted git repository. kxiao pushed a commit to branch branch-2.0 in repository https://gitbox.apache.org/repos/asf/doris.git
commit 32b90a05142bc5d9d92d6b3046ad71af32e5dcb5 Author: DongLiang-0 <[email protected]> AuthorDate: Mon Oct 9 09:56:26 2023 +0800 [fix](load)fix use regex split partition may cause backtracking (#24903) --- .../java/org/apache/doris/qe/MultiLoadMgr.java | 41 ++++++++++++---------- .../java/org/apache/doris/task/StreamLoadTask.java | 10 +++--- 2 files changed, 27 insertions(+), 24 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/MultiLoadMgr.java b/fe/fe-core/src/main/java/org/apache/doris/qe/MultiLoadMgr.java index 27f947aaf5d..2d1f512e29e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/MultiLoadMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/MultiLoadMgr.java @@ -54,6 +54,7 @@ import org.apache.logging.log4j.Logger; import org.awaitility.Awaitility; import java.io.StringReader; +import java.util.Arrays; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -61,6 +62,7 @@ import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.stream.Collectors; // Class used to record state of multi-load operation public class MultiLoadMgr { @@ -104,11 +106,11 @@ public class MultiLoadMgr { // Add one load job private void load(String fullDbName, String label, - String subLabel, String table, - List<Pair<String, Long>> files, - TNetworkAddress fileAddr, - Map<String, String> properties, - long timestamp) throws DdlException { + String subLabel, String table, + List<Pair<String, Long>> files, + TNetworkAddress fileAddr, + Map<String, String> properties, + long timestamp) throws DdlException { LabelName multiLabel = new LabelName(fullDbName, label); lock.writeLock().lock(); try { @@ -250,9 +252,9 @@ public class MultiLoadMgr { } public void addFile(String subLabel, String table, List<Pair<String, Long>> files, - TNetworkAddress fileAddr, - Map<String, String> properties, - long timestamp) throws DdlException { + TNetworkAddress fileAddr, + Map<String, String> properties, + long timestamp) throws DdlException { if (isSubLabelUsed(subLabel, timestamp)) { // sub label is used and this is a retry request. @@ -334,7 +336,6 @@ public class MultiLoadMgr { return backendId; } - public LoadStmt toLoadStmt() throws DdlException { LabelName commitLabel = multiLabel; @@ -380,8 +381,8 @@ public class MultiLoadMgr { private Set<Long> timestamps = Sets.newHashSet(); public TableLoadDesc(String tbl, String label, List<Pair<String, Long>> files, - TNetworkAddress address, Map<String, String> properties, - long timestamp) { + TNetworkAddress address, Map<String, String> properties, + long timestamp) { this.tbl = tbl; this.filesByLabel = Maps.newLinkedHashMap(); @@ -415,7 +416,6 @@ public class MultiLoadMgr { timestamps.add(timestamp); } - public Long getBackendId() { return backendId; } @@ -463,13 +463,16 @@ public class MultiLoadMgr { } } if (properties.get(LoadStmt.KEY_IN_PARAM_PARTITIONS) != null) { - String[] partNames = properties.get(LoadStmt.KEY_IN_PARAM_PARTITIONS) - .trim().split("\\s*,\\s*"); - partitionNames = new PartitionNames(false, Lists.newArrayList(partNames)); + String[] splitPartNames = properties.get(LoadStmt.KEY_IN_PARAM_PARTITIONS).trim().split(","); + List<String> partNames = Arrays.stream(splitPartNames).map(String::trim) + .collect(Collectors.toList()); + partitionNames = new PartitionNames(false, partNames); } else if (properties.get(LoadStmt.KEY_IN_PARAM_TEMP_PARTITIONS) != null) { - String[] partNames = properties.get(LoadStmt.KEY_IN_PARAM_TEMP_PARTITIONS) - .trim().split("\\s*,\\s*"); - partitionNames = new PartitionNames(true, Lists.newArrayList(partNames)); + String[] splitTempPartNames = properties.get(LoadStmt.KEY_IN_PARAM_TEMP_PARTITIONS).trim() + .split(","); + List<String> tempPartNames = Arrays.stream(splitTempPartNames).map(String::trim) + .collect(Collectors.toList()); + partitionNames = new PartitionNames(true, tempPartNames); } if (properties.get(LoadStmt.KEY_IN_PARAM_MERGE_TYPE) != null) { mergeType = LoadTask.MergeType.valueOf(properties.get(LoadStmt.KEY_IN_PARAM_MERGE_TYPE)); @@ -486,7 +489,7 @@ public class MultiLoadMgr { jsonPaths = properties.getOrDefault(LoadStmt.KEY_IN_PARAM_JSONPATHS, ""); jsonRoot = properties.getOrDefault(LoadStmt.KEY_IN_PARAM_JSONROOT, ""); fuzzyParse = Boolean.valueOf( - properties.getOrDefault(LoadStmt.KEY_IN_PARAM_FUZZY_PARSE, "false")); + properties.getOrDefault(LoadStmt.KEY_IN_PARAM_FUZZY_PARSE, "false")); } } DataDescription dataDescription = new DataDescription(tbl, partitionNames, files, null, columnSeparator, diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/StreamLoadTask.java b/fe/fe-core/src/main/java/org/apache/doris/task/StreamLoadTask.java index 222ca2dd21b..5f1f9e82eba 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/task/StreamLoadTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/task/StreamLoadTask.java @@ -37,13 +37,13 @@ import org.apache.doris.thrift.TStreamLoadPutRequest; import org.apache.doris.thrift.TUniqueId; import com.google.common.base.Strings; -import com.google.common.collect.Lists; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import java.io.StringReader; import java.util.Arrays; import java.util.List; +import java.util.stream.Collectors; public class StreamLoadTask implements LoadTaskInfo { @@ -267,7 +267,6 @@ public class StreamLoadTask implements LoadTaskInfo { return !Strings.isNullOrEmpty(sequenceCol); } - @Override public String getSequenceCol() { return sequenceCol; @@ -347,11 +346,12 @@ public class StreamLoadTask implements LoadTaskInfo { headerType = request.getHeaderType(); } if (request.isSetPartitions()) { - String[] partNames = request.getPartitions().trim().split("\\s*,\\s*"); + String[] splitPartNames = request.getPartitions().trim().split(","); + List<String> partNames = Arrays.stream(splitPartNames).map(String::trim).collect(Collectors.toList()); if (request.isSetIsTempPartition()) { - partitions = new PartitionNames(request.isIsTempPartition(), Lists.newArrayList(partNames)); + partitions = new PartitionNames(request.isIsTempPartition(), partNames); } else { - partitions = new PartitionNames(false, Lists.newArrayList(partNames)); + partitions = new PartitionNames(false, partNames); } } switch (request.getFileType()) { --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
