This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new 74d87c9d4eb branch-2.1: [performance](load) fix broker load scan
ranges for unsplittable files #43161 (#43976)
74d87c9d4eb is described below
commit 74d87c9d4eb37853a7d9a7171a6e5444568b4ca8
Author: github-actions[bot]
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Fri Nov 15 00:57:38 2024 +0800
branch-2.1: [performance](load) fix broker load scan ranges for
unsplittable files #43161 (#43976)
Cherry-picked from #43161
Co-authored-by: Kaijie Chen <[email protected]>
---
.../org/apache/doris/datasource/FileGroupInfo.java | 98 ++++++++++++++++++----
.../apache/doris/datasource/FileGroupIntoTest.java | 66 +++++++++++++++
2 files changed, 147 insertions(+), 17 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileGroupInfo.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileGroupInfo.java
index 4cea2e7e883..b84a546e4f5 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileGroupInfo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileGroupInfo.java
@@ -24,6 +24,7 @@ import org.apache.doris.catalog.FsBroker;
import org.apache.doris.catalog.Table;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
+import org.apache.doris.common.Pair;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.BrokerUtil;
import org.apache.doris.common.util.Util;
@@ -52,7 +53,11 @@ import org.apache.logging.log4j.Logger;
import java.net.URI;
import java.util.ArrayList;
+import java.util.Comparator;
import java.util.List;
+import java.util.PriorityQueue;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
/**
* FileTable encapsulates a set of files to be scanned into a Table like
structure,
@@ -83,6 +88,7 @@ public class FileGroupInfo {
private boolean strictMode;
private int loadParallelism;
// set by getFileStatusAndCalcInstance
+ private int numInstances = 1;
private long bytesPerInstance = 0;
// used for stream load, FILE_LOCAL or FILE_STREAM
private TFileType fileType;
@@ -172,7 +178,6 @@ public class FileGroupInfo {
throw new UserException("No source file in this table(" +
targetTable.getName() + ").");
}
- int numInstances = 1;
if (jobType == JobType.BULK_LOAD) {
long totalBytes = 0;
for (TBrokerFileStatus fileStatus : fileStatuses) {
@@ -191,6 +196,7 @@ public class FileGroupInfo {
}
} else {
// stream load, not need to split
+ numInstances = 1;
bytesPerInstance = Long.MAX_VALUE;
}
LOG.info("number instance of file scan node is: {}, bytes per
instance: {}", numInstances, bytesPerInstance);
@@ -199,6 +205,75 @@ public class FileGroupInfo {
public void createScanRangeLocations(FileLoadScanNode.ParamCreateContext
context,
FederationBackendPolicy backendPolicy,
List<TScanRangeLocations>
scanRangeLocations) throws UserException {
+ // Currently, we do not support mixed file types (or compress types).
+ // If any of the file is unsplittable, all files will be treated as
unsplittable.
+ boolean isSplittable = true;
+ for (TBrokerFileStatus fileStatus : fileStatuses) {
+ TFileFormatType formatType =
formatType(context.fileGroup.getFileFormat(), fileStatus.path);
+ TFileCompressType compressType =
+
Util.getOrInferCompressType(context.fileGroup.getCompressType(),
fileStatus.path);
+ // Now only support split plain text
+ if (compressType == TFileCompressType.PLAIN
+ && ((formatType == TFileFormatType.FORMAT_CSV_PLAIN &&
fileStatus.isSplitable)
+ || formatType == TFileFormatType.FORMAT_JSON)) {
+ // is splittable
+ } else {
+ isSplittable = false;
+ break;
+ }
+ }
+
+ if (isSplittable) {
+ createScanRangeLocationsSplittable(context, backendPolicy,
scanRangeLocations);
+ } else {
+ createScanRangeLocationsUnsplittable(context, backendPolicy,
scanRangeLocations);
+ }
+ }
+
+ public void
createScanRangeLocationsUnsplittable(FileLoadScanNode.ParamCreateContext
context,
+ FederationBackendPolicy
backendPolicy,
+
List<TScanRangeLocations> scanRangeLocations)
+ throws UserException {
+ List<Long> fileSizes = fileStatuses.stream().map(x ->
x.size).collect(Collectors.toList());
+ List<List<Integer>> groups = assignFilesToInstances(fileSizes,
numInstances);
+ for (List<Integer> group : groups) {
+ TScanRangeLocations locations = newLocations(context.params,
brokerDesc, backendPolicy);
+ for (int i : group) {
+ TBrokerFileStatus fileStatus = fileStatuses.get(i);
+ TFileFormatType formatType =
formatType(context.fileGroup.getFileFormat(), fileStatus.path);
+ context.params.setFormatType(formatType);
+ TFileCompressType compressType =
+
Util.getOrInferCompressType(context.fileGroup.getCompressType(),
fileStatus.path);
+ context.params.setCompressType(compressType);
+ List<String> columnsFromPath =
BrokerUtil.parseColumnsFromPath(fileStatus.path,
+ context.fileGroup.getColumnNamesFromPath());
+ TFileRangeDesc rangeDesc = createFileRangeDesc(0, fileStatus,
fileStatus.size, columnsFromPath);
+
locations.getScanRange().getExtScanRange().getFileScanRange().addToRanges(rangeDesc);
+ }
+ scanRangeLocations.add(locations);
+ }
+ }
+
+ public static List<List<Integer>> assignFilesToInstances(List<Long>
fileSizes, int instances) {
+ int n = Math.min(fileSizes.size(), instances);
+ PriorityQueue<Pair<Long, List<Integer>>> pq = new PriorityQueue<>(n,
Comparator.comparingLong(Pair::key));
+ for (int i = 0; i < n; i++) {
+ pq.add(Pair.of(0L, new ArrayList<>()));
+ }
+ List<Integer> index = IntStream.range(0,
fileSizes.size()).boxed().collect(Collectors.toList());
+ index.sort((i, j) -> Long.compare(fileSizes.get(j), fileSizes.get(i)));
+ for (int i : index) {
+ Pair<Long, List<Integer>> p = pq.poll();
+ p.value().add(i);
+ pq.add(Pair.of(p.key() + fileSizes.get(i), p.value()));
+ }
+ return pq.stream().map(Pair::value).collect(Collectors.toList());
+ }
+
+ public void
createScanRangeLocationsSplittable(FileLoadScanNode.ParamCreateContext context,
+ FederationBackendPolicy
backendPolicy,
+ List<TScanRangeLocations>
scanRangeLocations) throws UserException {
+
TScanRangeLocations curLocations = newLocations(context.params,
brokerDesc, backendPolicy);
long curInstanceBytes = 0;
long curFileOffset = 0;
@@ -217,27 +292,16 @@ public class FileGroupInfo {
// Assign scan range locations only for broker load.
// stream load has only one file, and no need to set multi scan
ranges.
if (tmpBytes > bytesPerInstance && jobType != JobType.STREAM_LOAD)
{
- // Now only support split plain text
- if (compressType == TFileCompressType.PLAIN
- && ((formatType == TFileFormatType.FORMAT_CSV_PLAIN &&
fileStatus.isSplitable)
- || formatType == TFileFormatType.FORMAT_JSON)) {
- long rangeBytes = bytesPerInstance - curInstanceBytes;
- TFileRangeDesc rangeDesc =
createFileRangeDesc(curFileOffset, fileStatus, rangeBytes,
- columnsFromPath);
-
curLocations.getScanRange().getExtScanRange().getFileScanRange().addToRanges(rangeDesc);
- curFileOffset += rangeBytes;
- } else {
- TFileRangeDesc rangeDesc = createFileRangeDesc(0,
fileStatus, leftBytes,
- columnsFromPath);
-
curLocations.getScanRange().getExtScanRange().getFileScanRange().addToRanges(rangeDesc);
- i++;
- }
+ long rangeBytes = bytesPerInstance - curInstanceBytes;
+ TFileRangeDesc rangeDesc = createFileRangeDesc(curFileOffset,
fileStatus, rangeBytes,
+ columnsFromPath);
+
curLocations.getScanRange().getExtScanRange().getFileScanRange().addToRanges(rangeDesc);
+ curFileOffset += rangeBytes;
// New one scan
scanRangeLocations.add(curLocations);
curLocations = newLocations(context.params, brokerDesc,
backendPolicy);
curInstanceBytes = 0;
-
} else {
TFileRangeDesc rangeDesc = createFileRangeDesc(curFileOffset,
fileStatus, leftBytes, columnsFromPath);
curLocations.getScanRange().getExtScanRange().getFileScanRange().addToRanges(rangeDesc);
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/datasource/FileGroupIntoTest.java
b/fe/fe-core/src/test/java/org/apache/doris/datasource/FileGroupIntoTest.java
new file mode 100644
index 00000000000..b4470075717
--- /dev/null
+++
b/fe/fe-core/src/test/java/org/apache/doris/datasource/FileGroupIntoTest.java
@@ -0,0 +1,66 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Stream;
+
+
+public class FileGroupIntoTest {
+
+ private static Stream<Arguments> provideParameters() {
+ return Stream.of(
+ // 6, 5, 4+1, 3+2, max=6
+ Arguments.of(Arrays.asList(1L, 2L, 3L, 4L, 5L, 6L), 4, 6),
+
+ // 6+1, 5+2, 4+3, max=7
+ Arguments.of(Arrays.asList(1L, 2L, 3L, 4L, 5L, 6L), 3, 7),
+
+ // 6+3+1, 5+4+2, max=11
+ Arguments.of(Arrays.asList(1L, 2L, 3L, 4L, 5L, 6L), 2, 11),
+
+ // 1 group, sum = 21
+ Arguments.of(Arrays.asList(1L, 2L, 3L, 4L, 5L, 6L), 1, 21),
+
+ // current algorithm is not perfect,
+ // perfect partition: 5+4, 3+3+3, max=9
+ // current partition: 5+3, 4+3+3, max=10
+ Arguments.of(Arrays.asList(3L, 3L, 3L, 4L, 5L), 2, 10),
+
+ // current algorithm is not perfect,
+ // perfect partition: 3+3+6, 4+4+4, max=12
+ // current partition: 6+4+3, 4+4+3, max=13
+ Arguments.of(Arrays.asList(3L, 3L, 4L, 4L, 4L, 6L), 2, 13)
+ );
+ }
+
+ @ParameterizedTest
+ @MethodSource("provideParameters")
+ public void testAssignFilesToInstances(List<Long> fileSizes, int
numInstances, long expected) {
+ List<List<Integer>> groups =
FileGroupInfo.assignFilesToInstances(fileSizes, numInstances);
+ long max = groups.stream().map(group ->
group.stream().mapToLong(fileSizes::get).sum())
+ .max(Long::compare).orElse(0L);
+ Assertions.assertEquals(expected, max);
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]