This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new 74d87c9d4eb branch-2.1: [performance](load) fix broker load scan ranges for unsplittable files #43161 (#43976)
74d87c9d4eb is described below

commit 74d87c9d4eb37853a7d9a7171a6e5444568b4ca8
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Fri Nov 15 00:57:38 2024 +0800

    branch-2.1: [performance](load) fix broker load scan ranges for unsplittable files #43161 (#43976)
    
    Cherry-picked from #43161
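
    For unsplittable files, whole files are now assigned to load instances
    with a greedy strategy: files are sorted by size in descending order and
    each file goes to the instance with the smallest byte total so far,
    tracked in a min-heap. A rough usage sketch of the new helper follows;
    the driver class is hypothetical, and the sizes are taken from the first
    test case below:

        import java.util.Arrays;
        import java.util.List;

        import org.apache.doris.datasource.FileGroupInfo;

        // Hypothetical driver class, for illustration only.
        public class AssignSketch {
            public static void main(String[] args) {
                // Six files of 1..6 bytes spread over 4 instances.
                List<List<Integer>> groups =
                        FileGroupInfo.assignFilesToInstances(Arrays.asList(1L, 2L, 3L, 4L, 5L, 6L), 4);
                // Greedy result, grouped by size: {6}, {5}, {4, 1}, {3, 2},
                // so the busiest instance reads 6 bytes (see FileGroupIntoTest below).
                System.out.println(groups);
            }
        }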
    
    Co-authored-by: Kaijie Chen <[email protected]>
---
 .../org/apache/doris/datasource/FileGroupInfo.java | 98 ++++++++++++++++++----
 .../apache/doris/datasource/FileGroupIntoTest.java | 66 +++++++++++++++
 2 files changed, 147 insertions(+), 17 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileGroupInfo.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileGroupInfo.java
index 4cea2e7e883..b84a546e4f5 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileGroupInfo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileGroupInfo.java
@@ -24,6 +24,7 @@ import org.apache.doris.catalog.FsBroker;
 import org.apache.doris.catalog.Table;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.Config;
+import org.apache.doris.common.Pair;
 import org.apache.doris.common.UserException;
 import org.apache.doris.common.util.BrokerUtil;
 import org.apache.doris.common.util.Util;
@@ -52,7 +53,11 @@ import org.apache.logging.log4j.Logger;
 
 import java.net.URI;
 import java.util.ArrayList;
+import java.util.Comparator;
 import java.util.List;
+import java.util.PriorityQueue;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 
 /**
 * FileTable encapsulates a set of files to be scanned into a Table like structure,
@@ -83,6 +88,7 @@ public class FileGroupInfo {
     private boolean strictMode;
     private int loadParallelism;
     // set by getFileStatusAndCalcInstance
+    private int numInstances = 1;
     private long bytesPerInstance = 0;
     // used for stream load, FILE_LOCAL or FILE_STREAM
     private TFileType fileType;
@@ -172,7 +178,6 @@ public class FileGroupInfo {
             throw new UserException("No source file in this table(" + targetTable.getName() + ").");
         }
 
-        int numInstances = 1;
         if (jobType == JobType.BULK_LOAD) {
             long totalBytes = 0;
             for (TBrokerFileStatus fileStatus : fileStatuses) {
@@ -191,6 +196,7 @@ public class FileGroupInfo {
             }
         } else {
             // stream load, not need to split
+            numInstances = 1;
             bytesPerInstance = Long.MAX_VALUE;
         }
         LOG.info("number instance of file scan node is: {}, bytes per instance: {}", numInstances, bytesPerInstance);
@@ -199,6 +205,75 @@ public class FileGroupInfo {
     public void createScanRangeLocations(FileLoadScanNode.ParamCreateContext context,
                                          FederationBackendPolicy backendPolicy,
                                          List<TScanRangeLocations> scanRangeLocations) throws UserException {
+        // Currently, we do not support mixed file types (or compress types).
+        // If any of the file is unsplittable, all files will be treated as unsplittable.
+        boolean isSplittable = true;
+        for (TBrokerFileStatus fileStatus : fileStatuses) {
+            TFileFormatType formatType = formatType(context.fileGroup.getFileFormat(), fileStatus.path);
+            TFileCompressType compressType =
+                    Util.getOrInferCompressType(context.fileGroup.getCompressType(), fileStatus.path);
+            // Now only support split plain text
+            if (compressType == TFileCompressType.PLAIN
+                    && ((formatType == TFileFormatType.FORMAT_CSV_PLAIN && fileStatus.isSplitable)
+                    || formatType == TFileFormatType.FORMAT_JSON)) {
+                // is splittable
+            } else {
+                isSplittable = false;
+                break;
+            }
+        }
+
+        if (isSplittable) {
+            createScanRangeLocationsSplittable(context, backendPolicy, scanRangeLocations);
+        } else {
+            createScanRangeLocationsUnsplittable(context, backendPolicy, scanRangeLocations);
+        }
+    }
+
+    public void createScanRangeLocationsUnsplittable(FileLoadScanNode.ParamCreateContext context,
+                                                      FederationBackendPolicy backendPolicy,
+                                                      List<TScanRangeLocations> scanRangeLocations)
+                                                      throws UserException {
+        List<Long> fileSizes = fileStatuses.stream().map(x -> x.size).collect(Collectors.toList());
+        List<List<Integer>> groups = assignFilesToInstances(fileSizes, numInstances);
+        for (List<Integer> group : groups) {
+            TScanRangeLocations locations = newLocations(context.params, brokerDesc, backendPolicy);
+            for (int i : group) {
+                TBrokerFileStatus fileStatus = fileStatuses.get(i);
+                TFileFormatType formatType = formatType(context.fileGroup.getFileFormat(), fileStatus.path);
+                context.params.setFormatType(formatType);
+                TFileCompressType compressType =
+                        Util.getOrInferCompressType(context.fileGroup.getCompressType(), fileStatus.path);
+                context.params.setCompressType(compressType);
+                List<String> columnsFromPath = BrokerUtil.parseColumnsFromPath(fileStatus.path,
+                        context.fileGroup.getColumnNamesFromPath());
+                TFileRangeDesc rangeDesc = createFileRangeDesc(0, fileStatus, fileStatus.size, columnsFromPath);
+                locations.getScanRange().getExtScanRange().getFileScanRange().addToRanges(rangeDesc);
+            }
+            scanRangeLocations.add(locations);
+        }
+    }
+
+    public static List<List<Integer>> assignFilesToInstances(List<Long> fileSizes, int instances) {
+        int n = Math.min(fileSizes.size(), instances);
+        PriorityQueue<Pair<Long, List<Integer>>> pq = new PriorityQueue<>(n, Comparator.comparingLong(Pair::key));
+        for (int i = 0; i < n; i++) {
+            pq.add(Pair.of(0L, new ArrayList<>()));
+        }
+        List<Integer> index = IntStream.range(0, fileSizes.size()).boxed().collect(Collectors.toList());
+        index.sort((i, j) -> Long.compare(fileSizes.get(j), fileSizes.get(i)));
+        for (int i : index) {
+            Pair<Long, List<Integer>> p = pq.poll();
+            p.value().add(i);
+            pq.add(Pair.of(p.key() + fileSizes.get(i), p.value()));
+        }
+        return pq.stream().map(Pair::value).collect(Collectors.toList());
+    }
+
+    public void createScanRangeLocationsSplittable(FileLoadScanNode.ParamCreateContext context,
+                                                   FederationBackendPolicy backendPolicy,
+                                                   List<TScanRangeLocations> scanRangeLocations) throws UserException {
+
         TScanRangeLocations curLocations = newLocations(context.params, brokerDesc, backendPolicy);
         long curInstanceBytes = 0;
         long curFileOffset = 0;
@@ -217,27 +292,16 @@ public class FileGroupInfo {
             // Assign scan range locations only for broker load.
             // stream load has only one file, and no need to set multi scan ranges.
             if (tmpBytes > bytesPerInstance && jobType != JobType.STREAM_LOAD) {
-                // Now only support split plain text
-                if (compressType == TFileCompressType.PLAIN
-                        && ((formatType == TFileFormatType.FORMAT_CSV_PLAIN && fileStatus.isSplitable)
-                        || formatType == TFileFormatType.FORMAT_JSON)) {
-                    long rangeBytes = bytesPerInstance - curInstanceBytes;
-                    TFileRangeDesc rangeDesc = createFileRangeDesc(curFileOffset, fileStatus, rangeBytes,
-                            columnsFromPath);
-                    curLocations.getScanRange().getExtScanRange().getFileScanRange().addToRanges(rangeDesc);
-                    curFileOffset += rangeBytes;
-                } else {
-                    TFileRangeDesc rangeDesc = createFileRangeDesc(0, fileStatus, leftBytes,
-                            columnsFromPath);
-                    curLocations.getScanRange().getExtScanRange().getFileScanRange().addToRanges(rangeDesc);
-                    i++;
-                }
+                long rangeBytes = bytesPerInstance - curInstanceBytes;
+                TFileRangeDesc rangeDesc = createFileRangeDesc(curFileOffset, fileStatus, rangeBytes,
+                        columnsFromPath);
+                curLocations.getScanRange().getExtScanRange().getFileScanRange().addToRanges(rangeDesc);
+                curFileOffset += rangeBytes;
 
                 // New one scan
                 scanRangeLocations.add(curLocations);
                 curLocations = newLocations(context.params, brokerDesc, backendPolicy);
                 curInstanceBytes = 0;
-
             } else {
                 TFileRangeDesc rangeDesc = createFileRangeDesc(curFileOffset, fileStatus, leftBytes, columnsFromPath);
                 curLocations.getScanRange().getExtScanRange().getFileScanRange().addToRanges(rangeDesc);
diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/FileGroupIntoTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/FileGroupIntoTest.java
new file mode 100644
index 00000000000..b4470075717
--- /dev/null
+++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/FileGroupIntoTest.java
@@ -0,0 +1,66 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Stream;
+
+
+public class FileGroupIntoTest {
+
+    private static Stream<Arguments> provideParameters() {
+        return Stream.of(
+            // 6, 5, 4+1, 3+2, max=6
+            Arguments.of(Arrays.asList(1L, 2L, 3L, 4L, 5L, 6L), 4, 6),
+
+            // 6+1, 5+2, 4+3, max=7
+            Arguments.of(Arrays.asList(1L, 2L, 3L, 4L, 5L, 6L), 3, 7),
+
+            // 6+3+1, 5+4+2, max=11
+            Arguments.of(Arrays.asList(1L, 2L, 3L, 4L, 5L, 6L), 2, 11),
+
+            // 1 group, sum = 21
+            Arguments.of(Arrays.asList(1L, 2L, 3L, 4L, 5L, 6L), 1, 21),
+
+            // current algorithm is not perfect,
+            // perfect partition: 5+4, 3+3+3, max=9
+            // current partition: 5+3, 4+3+3, max=10
+            Arguments.of(Arrays.asList(3L, 3L, 3L, 4L, 5L), 2, 10),
+
+            // current algorithm is not perfect,
+            // perfect partition: 3+3+6, 4+4+4, max=12
+            // current partition: 6+4+3, 4+4+3, max=13
+            Arguments.of(Arrays.asList(3L, 3L, 4L, 4L, 4L, 6L), 2, 13)
+        );
+    }
+
+    @ParameterizedTest
+    @MethodSource("provideParameters")
+    public void testAssignFilesToInstances(List<Long> fileSizes, int numInstances, long expected) {
+        List<List<Integer>> groups = FileGroupInfo.assignFilesToInstances(fileSizes, numInstances);
+        long max = groups.stream().map(group -> group.stream().mapToLong(fileSizes::get).sum())
+                    .max(Long::compare).orElse(0L);
+        Assertions.assertEquals(expected, max);
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
