This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new c9b1819864e [opt](multi-catalog) Optimize file split size. (#58858)
c9b1819864e is described below
commit c9b1819864ee614a098250709b8c43adfb5af649
Author: Qi Chen <[email protected]>
AuthorDate: Mon Jan 5 21:32:08 2026 +0800
[opt](multi-catalog) Optimize file split size. (#58858)
### What problem does this PR solve?
### Release note
This PR introduces a **dynamic and progressive file split size
adjustment mechanism** to improve scan parallelism and resource
utilization for external table scans, while avoiding excessive small
splits or inefficiently large initial splits.
#### 1. Split Size Adjustment Strategy
##### 1.1 Non-Batch Split Mode
In non-batch split mode, a **two-phase split size selection strategy**
is applied based on the total size of all input files:
* The total size of all input files is calculated in advance.
* If the total size **exceeds `maxInitialSplitNum * maxSplitSize`**:
  * `split_size = maxSplitSize` (default **64MB**)
* Otherwise:
  * `split_size = maxInitialSplitSize` (default **32MB**)
This strategy uses smaller splits to improve parallelism for small
datasets while avoiding an excessive number of splits for large-scale
scans.
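A minimal Java sketch of this two-phase selection, modeled on the
`determineTargetFileSplitSize` helpers added in the diff below; the class
and method names here are illustrative, not the exact Doris API:
```java
// Illustrative helper; mirrors the threshold check used by the scan nodes in the diff.
final class SplitSizeChooser {
    static long chooseSplitSize(long totalFileSize, long maxInitialSplitSize,
            long maxSplitSize, int maxInitialSplitNum, long userFileSplitSize) {
        if (userFileSplitSize > 0) {
            return userFileSplitSize;   // session variable file_split_size wins (see 1.3)
        }
        if (totalFileSize >= maxSplitSize * maxInitialSplitNum) {
            return maxSplitSize;        // large scan: 64MB splits by default
        }
        return maxInitialSplitSize;     // small scan: 32MB splits by default
    }
}
```
With the default session variables (32MB, 64MB, 200 initial splits), the
larger 64MB split size takes effect once a scan covers roughly 12.5GB.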
---
##### 1.2 Batch Split Mode
In batch split mode, a **progressive split size adjustment strategy** is
introduced:
* Splits are generated incrementally as the total file size grows.
* Once the number of generated splits **exceeds `maxInitialSplitNum`**,
* the `split_size` is **increased from `maxInitialSplitSize` (32MB) to
`maxSplitSize` (64MB)** (see the sketch below).
This approach uses smaller splits at the early stage to ramp up scan
parallelism quickly, while the larger split size keeps the total number
of splits under control as the workload grows, resulting in more stable
scheduling and execution behavior.
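The diff implements this with an atomic counter in
`FileSplitter.updateCurrentMaxSplitSize()`. A condensed sketch of the idea
(the class name is hypothetical): each generated split consumes one
'initial' slot, and once the slots are exhausted the splitter switches to
the larger size.
```java
import java.util.concurrent.atomic.AtomicInteger;

final class ProgressiveSplitSize {
    private final long maxInitialSplitSize;              // e.g. 32MB
    private final long maxSplitSize;                     // e.g. 64MB
    private final AtomicInteger remainingInitialSplits;  // e.g. 200

    ProgressiveSplitSize(long maxInitialSplitSize, long maxSplitSize, int maxInitialSplitNum) {
        this.maxInitialSplitSize = maxInitialSplitSize;
        this.maxSplitSize = maxSplitSize;
        this.remainingInitialSplits = new AtomicInteger(maxInitialSplitNum);
    }

    // Size cap to use for the next generated split.
    long nextMaxSplitSize() {
        int cur = remainingInitialSplits.get();
        while (cur > 0) {
            if (remainingInitialSplits.compareAndSet(cur, cur - 1)) {
                return maxInitialSplitSize;   // still within the initial quota
            }
            cur = remainingInitialSplits.get();
        }
        return maxSplitSize;                  // quota exhausted: use the larger size
    }
}
```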
---
##### 1.3 User-Specified Split Size (Backward Compatibility)
This PR **preserves the session variable `file_split_size`** for
user-defined split size configuration:
* If `file_split_size` is explicitly set by the user:
  * The user-defined value takes precedence.
  * The dynamic split size adjustment logic is bypassed.
* This ensures full backward compatibility with existing configurations
and tuning practices.
---
#### 2. Support Status by Data Source
| Data Source | Non-Batch Split Mode | Batch Split Mode | Notes |
| ----------- | -------------------- | ---------------- | ------------------------------------------------------ |
| Hive | ✅ Supported | ✅ Supported | Uses Doris internal HDFS FileSplitter |
| Iceberg | ✅ Supported | ❌ Not supported | File splitting is currently delegated to Iceberg APIs |
| Paimon | ✅ Supported | ❌ Not supported | Only non-batch split mode is implemented |
---
#### 3. New Hive HDFS FileSplitter Logic
For Hive HDFS files, this PR introduces an enhanced file splitting
strategy (see the sketch after this list):
1. **Splits never span multiple HDFS blocks**
   * Prevents cross-block reads and avoids unnecessary IO overhead.
2. **Tail split optimization**
   * If the remaining size within a block is smaller than `split_size * 2`,
   * the remainder is **evenly divided into two splits**,
   * preventing the creation of a very small tail split and improving
overall scan efficiency.
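A condensed sketch of the per-block sizing rule from the new
`FileSplitter.splitFile()` loop in the diff (the class and helper names
here are illustrative):
```java
final class HdfsSplitSizing {
    // Returns the byte length of the next split starting at splitStart; the split
    // never crosses blockEnd, the end offset of the current HDFS block.
    static long nextSplitBytes(long splitStart, long blockEnd, long currentMaxSplitSize) {
        long remainingBlockBytes = blockEnd - splitStart;
        if (remainingBlockBytes <= currentMaxSplitSize) {
            return remainingBlockBytes;       // final split in this block
        }
        if (currentMaxSplitSize * 2 >= remainingBlockBytes) {
            return remainingBlockBytes / 2;   // tail fits in two splits: divide it evenly
        }
        return currentMaxSplitSize;           // regular full-size split
    }
}
```
For example, with a 64MB split size and 100MB left in a block, the loop
emits two 50MB splits instead of a 64MB split followed by a 36MB tail.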
---
#### Summary
* Introduces dynamic and progressive split size adjustment
* Supports both batch and non-batch split modes
* Preserves user-defined split size configuration for backward
compatibility
* Optimizes Hive HDFS file splitting to reduce small tail splits and
cross-block IO
---
.../doris/datasource/FederationBackendPolicy.java | 12 +-
.../apache/doris/datasource/FileQueryScanNode.java | 19 +-
.../org/apache/doris/datasource/FileScanNode.java | 10 +-
.../org/apache/doris/datasource/FileSplitter.java | 230 ++++++++++++++++++---
.../apache/doris/datasource/SplitGenerator.java | 2 +-
.../doris/datasource/hive/source/HiveScanNode.java | 78 +++++--
.../datasource/iceberg/source/IcebergScanNode.java | 80 ++++++-
.../datasource/paimon/source/PaimonScanNode.java | 47 ++++-
.../doris/datasource/tvf/source/TVFScanNode.java | 31 ++-
.../java/org/apache/doris/qe/SessionVariable.java | 60 ++++++
.../apache/doris/datasource/FileSplitterTest.java | 216 +++++++++++++++++++
.../paimon/source/PaimonScanNodeTest.java | 17 ++
.../doris/planner/FederationBackendPolicyTest.java | 4 +-
.../hive/test_hive_compress_type.groovy | 2 +-
14 files changed, 712 insertions(+), 96 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/FederationBackendPolicy.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/FederationBackendPolicy.java
index 813d1892642..a8927d86a94 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/FederationBackendPolicy.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/FederationBackendPolicy.java
@@ -63,12 +63,16 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
+import java.util.Random;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
public class FederationBackendPolicy {
private static final Logger LOG =
LogManager.getLogger(FederationBackendPolicy.class);
+
+ private static final long FIXED_SHUFFLE_SEED = 123456789L;
+
protected final List<Backend> backends = Lists.newArrayList();
private final Map<String, List<Backend>> backendMap = Maps.newHashMap();
@@ -220,6 +224,7 @@ public class FederationBackendPolicy {
public Multimap<Backend, Split> computeScanRangeAssignment(List<Split>
splits) throws UserException {
ListMultimap<Backend, Split> assignment = ArrayListMultimap.create();
+ Collections.shuffle(splits, new Random(FIXED_SHUFFLE_SEED));
List<Split> remainingSplits;
List<Backend> backends = new ArrayList<>();
@@ -228,8 +233,6 @@ public class FederationBackendPolicy {
}
ResettableRandomizedIterator<Backend> randomCandidates = new
ResettableRandomizedIterator<>(backends);
- boolean splitsToBeRedistributed = false;
-
// optimizedLocalScheduling enables prioritized assignment of splits
to local nodes when splits contain
// locality information
if (Config.split_assigner_optimized_local_scheduling) {
@@ -246,7 +249,6 @@ public class FederationBackendPolicy {
assignment.put(selectedBackend, split);
assignedWeightPerBackend.put(selectedBackend,
assignedWeightPerBackend.get(selectedBackend)
+ split.getSplitWeight().getRawValue());
- splitsToBeRedistributed = true;
continue;
}
}
@@ -276,7 +278,6 @@ public class FederationBackendPolicy {
case CONSISTENT_HASHING: {
candidateNodes = consistentHash.getNode(split,
Config.split_assigner_min_consistent_hash_candidate_num);
- splitsToBeRedistributed = true;
break;
}
default: {
@@ -302,7 +303,7 @@ public class FederationBackendPolicy {
assignedWeightPerBackend.get(selectedBackend) +
split.getSplitWeight().getRawValue());
}
- if (enableSplitsRedistribution && splitsToBeRedistributed) {
+ if (enableSplitsRedistribution) {
equateDistribution(assignment);
}
return assignment;
@@ -499,3 +500,4 @@ public class FederationBackendPolicy {
}
}
}
+
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java
index 474e08922a7..2614006b6a1 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java
@@ -94,6 +94,8 @@ public abstract class FileQueryScanNode extends FileScanNode {
protected TableScanParams scanParams;
+ protected FileSplitter fileSplitter;
+
/**
* External file scan node for Query hms table
* needCheckColumnPriv: Some of ExternalFileScanNode do not need to check
column priv
@@ -134,6 +136,8 @@ public abstract class FileQueryScanNode extends
FileScanNode {
}
initBackendPolicy();
initSchemaParams();
+ fileSplitter = new FileSplitter(sessionVariable.maxInitialSplitSize,
sessionVariable.maxSplitSize,
+ sessionVariable.maxInitialSplitNum);
}
// Init schema (Tuple/Slot) related params.
@@ -618,19 +622,4 @@ public abstract class FileQueryScanNode extends
FileScanNode {
}
return this.scanParams;
}
-
- /**
- * The real file split size is determined by:
- * 1. If user specify the split size in session variable
`file_split_size`, use user specified value.
- * 2. Otherwise, use the max value of DEFAULT_SPLIT_SIZE and block size.
- * @param blockSize, got from file system, eg, hdfs
- * @return the real file split size
- */
- protected long getRealFileSplitSize(long blockSize) {
- long realSplitSize = sessionVariable.getFileSplitSize();
- if (realSplitSize <= 0) {
- realSplitSize = Math.max(DEFAULT_SPLIT_SIZE, blockSize);
- }
- return realSplitSize;
- }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileScanNode.java
index c3e06999bba..a7aa0f607ac 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileScanNode.java
@@ -62,9 +62,6 @@ import java.util.stream.Collectors;
* Base class for External File Scan, including external query and load.
*/
public abstract class FileScanNode extends ExternalScanNode {
-
- public static final long DEFAULT_SPLIT_SIZE = 64 * 1024 * 1024; // 64MB
-
// For explain
protected long totalFileSize = 0;
protected long totalPartitionNum = 0;
@@ -115,12 +112,7 @@ public abstract class FileScanNode extends
ExternalScanNode {
}
output.append(prefix);
- boolean isBatch;
- try {
- isBatch = isBatchMode();
- } catch (UserException e) {
- throw new RuntimeException(e);
- }
+ boolean isBatch = isBatchMode();
if (isBatch) {
output.append("(approximate)");
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileSplitter.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileSplitter.java
index 33b2d70bfb1..5fe84441971 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileSplitter.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileSplitter.java
@@ -22,13 +22,19 @@ import org.apache.doris.common.util.Util;
import org.apache.doris.spi.Split;
import org.apache.doris.thrift.TFileCompressType;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Verify;
+import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.io.IOException;
+import java.util.Arrays;
import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.atomic.AtomicInteger;
public class FileSplitter {
private static final Logger LOG = LogManager.getLogger(FileSplitter.class);
@@ -40,18 +46,66 @@ public class FileSplitter {
return totalFileNum < parallelism * numBackends;
}
- public static List<Split> splitFile(
- LocationPath path,
- long fileSplitSize,
- BlockLocation[] blockLocations,
- long length,
- long modificationTime,
- boolean splittable,
- List<String> partitionValues,
- SplitCreator splitCreator)
- throws IOException {
+ private long maxInitialSplitSize;
+
+ private long maxSplitSize;
+
+ private int maxInitialSplitNum;
+ private final AtomicInteger remainingInitialSplitNum;
+
+ private long currentMaxSplitSize;
+
+ public long getMaxInitialSplitSize() {
+ return maxInitialSplitSize;
+ }
+
+ public void setMaxInitialSplitSize(long maxInitialSplitSize) {
+ this.maxInitialSplitSize = maxInitialSplitSize;
+ }
+
+ public long getMaxSplitSize() {
+ return maxSplitSize;
+ }
+
+ public void setMaxSplitSize(long maxSplitSize) {
+ this.maxSplitSize = maxSplitSize;
+ }
+
+ public int maxInitialSplitNum() {
+ return maxInitialSplitNum;
+ }
+
+ public void setMaxInitialSplits(int maxInitialSplitNum) {
+ this.maxInitialSplitNum = maxInitialSplitNum;
+ }
+
+ public long getRemainingInitialSplitNum() {
+ return remainingInitialSplitNum.get();
+ }
+
+ public FileSplitter(long maxInitialSplitSize, long maxSplitSize, int
maxInitialSplitNum) {
+ this.maxInitialSplitSize = maxInitialSplitSize;
+ this.maxSplitSize = maxSplitSize;
+ this.maxInitialSplitNum = maxInitialSplitNum;
+ currentMaxSplitSize = maxInitialSplitSize;
+ remainingInitialSplitNum = new AtomicInteger(maxInitialSplitNum);
+ }
+
+ public List<Split> splitFile(
+ LocationPath path,
+ long specifiedFileSplitSize,
+ BlockLocation[] blockLocations,
+ long length,
+ long modificationTime,
+ boolean splittable,
+ List<String> partitionValues,
+ SplitCreator splitCreator)
+ throws IOException {
+ // Pass splitCreator.create() to set target file split size to
calculate split weight.
+ long targetFileSplitSize = specifiedFileSplitSize > 0 ?
specifiedFileSplitSize : maxSplitSize;
if (blockLocations == null) {
- blockLocations = new BlockLocation[0];
+ blockLocations = new BlockLocation[1];
+ blockLocations[0] = new BlockLocation(null, null, 0L, length);
}
List<Split> result = Lists.newArrayList();
TFileCompressType compressType =
Util.inferFileCompressTypeByPath(path.getNormalizedLocation());
@@ -60,23 +114,83 @@ public class FileSplitter {
LOG.debug("Path {} is not splittable.", path);
}
String[] hosts = blockLocations.length == 0 ? null :
blockLocations[0].getHosts();
- result.add(splitCreator.create(path, 0, length, length,
fileSplitSize,
+ result.add(splitCreator.create(path, 0, length, length,
+ targetFileSplitSize,
modificationTime, hosts, partitionValues));
+ updateCurrentMaxSplitSize();
+ return result;
+ }
+
+ // if specified split size is not zero, split file by specified size
+ if (specifiedFileSplitSize > 0) {
+ long bytesRemaining;
+ for (bytesRemaining = length; (double) bytesRemaining / (double)
specifiedFileSplitSize > 1.1D;
+ bytesRemaining -= specifiedFileSplitSize) {
+ int location = getBlockIndex(blockLocations, length -
bytesRemaining);
+ String[] hosts = location == -1 ? null :
blockLocations[location].getHosts();
+ result.add(splitCreator.create(path, length - bytesRemaining,
specifiedFileSplitSize,
+ length, specifiedFileSplitSize, modificationTime,
hosts, partitionValues));
+ }
+ if (bytesRemaining != 0L) {
+ int location = getBlockIndex(blockLocations, length -
bytesRemaining);
+ String[] hosts = location == -1 ? null :
blockLocations[location].getHosts();
+ result.add(splitCreator.create(path, length - bytesRemaining,
bytesRemaining,
+ length, specifiedFileSplitSize, modificationTime,
hosts, partitionValues));
+ }
return result;
}
- long bytesRemaining;
- for (bytesRemaining = length; (double) bytesRemaining / (double)
fileSplitSize > 1.1D;
- bytesRemaining -= fileSplitSize) {
- int location = getBlockIndex(blockLocations, length -
bytesRemaining);
- String[] hosts = location == -1 ? null :
blockLocations[location].getHosts();
- result.add(splitCreator.create(path, length - bytesRemaining,
fileSplitSize,
- length, fileSplitSize, modificationTime, hosts,
partitionValues));
+
+ // split file by block
+ long start = 0;
+ ImmutableList.Builder<InternalBlock> blockBuilder =
ImmutableList.builder();
+ for (BlockLocation blockLocation : blockLocations) {
+ // clamp the block range
+ long blockStart = Math.max(start, blockLocation.getOffset());
+ long blockEnd = Math.min(start + length, blockLocation.getOffset()
+ blockLocation.getLength());
+ if (blockStart > blockEnd) {
+ // block is outside split range
+ continue;
+ }
+ if (blockStart == blockEnd && !(blockStart == start && blockEnd ==
start + length)) {
+ // skip zero-width block, except in the special circumstance:
+ // slice is empty, and the block covers the empty slice
interval.
+ continue;
+ }
+ blockBuilder.add(new InternalBlock(blockStart, blockEnd,
blockLocation.getHosts()));
+ }
+ List<InternalBlock> blocks = blockBuilder.build();
+ if (blocks.isEmpty()) {
+ result.add(splitCreator.create(path, 0, length, length,
+ targetFileSplitSize, modificationTime, null,
+ partitionValues));
+ updateCurrentMaxSplitSize();
+ return result;
}
- if (bytesRemaining != 0L) {
- int location = getBlockIndex(blockLocations, length -
bytesRemaining);
- String[] hosts = location == -1 ? null :
blockLocations[location].getHosts();
- result.add(splitCreator.create(path, length - bytesRemaining,
bytesRemaining,
- length, fileSplitSize, modificationTime, hosts,
partitionValues));
+
+ long splitStart = start;
+ int currentBlockIdx = 0;
+ while (splitStart < start + length) {
+ updateCurrentMaxSplitSize();
+ long splitBytes;
+ long remainingBlockBytes = blocks.get(currentBlockIdx).getEnd() -
splitStart;
+ if (remainingBlockBytes <= currentMaxSplitSize) {
+ splitBytes = remainingBlockBytes;
+ } else if (currentMaxSplitSize * 2 >= remainingBlockBytes) {
+ // Second to last split in this block, generate two evenly
sized splits
+ splitBytes = remainingBlockBytes / 2;
+ } else {
+ splitBytes = currentMaxSplitSize;
+ }
+ result.add(splitCreator.create(path, splitStart, splitBytes,
+ length, targetFileSplitSize, modificationTime,
blocks.get(currentBlockIdx).getHosts(),
+ partitionValues));
+ splitStart += splitBytes;
+ if (splitStart == blocks.get(currentBlockIdx).getEnd()) {
+ currentBlockIdx++;
+ if (currentBlockIdx != blocks.size()) {
+ Verify.verify(splitStart ==
blocks.get(currentBlockIdx).getStart());
+ }
+ }
}
if (LOG.isDebugEnabled()) {
@@ -85,7 +199,19 @@ public class FileSplitter {
return result;
}
- private static int getBlockIndex(BlockLocation[] blkLocations, long
offset) {
+ private void updateCurrentMaxSplitSize() {
+ currentMaxSplitSize = maxSplitSize;
+ int cur = remainingInitialSplitNum.get();
+ while (cur > 0) {
+ if (remainingInitialSplitNum.compareAndSet(cur, cur - 1)) {
+ currentMaxSplitSize = maxInitialSplitSize;
+ break;
+ }
+ cur = remainingInitialSplitNum.get();
+ }
+ }
+
+ private int getBlockIndex(BlockLocation[] blkLocations, long offset) {
if (blkLocations == null || blkLocations.length == 0) {
return -1;
}
@@ -100,5 +226,59 @@ public class FileSplitter {
throw new IllegalArgumentException(String.format("Offset %d is outside
of file (0..%d)", offset, fileLength));
}
+ private static class InternalBlock {
+ private final long start;
+ private final long end;
+ private final String[] hosts;
+
+ public InternalBlock(long start, long end, String[] hosts) {
+ Preconditions.checkArgument(start <= end, "block end cannot be
before block start");
+ this.start = start;
+ this.end = end;
+ this.hosts = hosts;
+ }
+
+ public long getStart() {
+ return start;
+ }
+
+ public long getEnd() {
+ return end;
+ }
+
+ public String[] getHosts() {
+ return hosts;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ InternalBlock that = (InternalBlock) o;
+ return start == that.start && end == that.end &&
Arrays.equals(hosts, that.hosts);
+ }
+
+ @Override
+ public int hashCode() {
+ int result = Objects.hash(start, end);
+ result = 31 * result + Arrays.hashCode(hosts);
+ return result;
+ }
+
+ @Override
+ public String toString() {
+ final StringBuilder sb = new StringBuilder("InternalBlock{");
+ sb.append("start=").append(start);
+ sb.append(", end=").append(end);
+ sb.append(", hosts=").append(Arrays.toString(hosts));
+ sb.append('}');
+ return sb.toString();
+ }
+ }
+
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/SplitGenerator.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/SplitGenerator.java
index 0b8a1022d5a..391552a5106 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/SplitGenerator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/SplitGenerator.java
@@ -40,7 +40,7 @@ public interface SplitGenerator {
/**
* Whether the producer(e.g. ScanNode) support batch mode.
*/
- default boolean isBatchMode() throws UserException {
+ default boolean isBatchMode() {
return false;
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java
index 744423f622c..5bcf2f5546a 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java
@@ -186,7 +186,7 @@ public class HiveScanNode extends FileQueryScanNode {
.getMetaStoreCache((HMSExternalCatalog)
hmsTable.getCatalog());
String bindBrokerName = hmsTable.getCatalog().bindBrokerName();
List<Split> allFiles = Lists.newArrayList();
- getFileSplitByPartitions(cache, prunedPartitions, allFiles,
bindBrokerName, numBackends);
+ getFileSplitByPartitions(cache, prunedPartitions, allFiles,
bindBrokerName, numBackends, false);
if (ConnectContext.get().getExecutor() != null) {
ConnectContext.get().getExecutor().getSummaryProfile().setGetPartitionFilesFinishTime();
}
@@ -226,7 +226,8 @@ public class HiveScanNode extends FileQueryScanNode {
try {
List<Split> allFiles = Lists.newArrayList();
getFileSplitByPartitions(
- cache,
Collections.singletonList(partition), allFiles, bindBrokerName, numBackends);
+ cache,
Collections.singletonList(partition), allFiles, bindBrokerName,
+ numBackends, true);
if (allFiles.size() > numSplitsPerPartition.get())
{
numSplitsPerPartition.set(allFiles.size());
}
@@ -277,7 +278,8 @@ public class HiveScanNode extends FileQueryScanNode {
}
private void getFileSplitByPartitions(HiveMetaStoreCache cache,
List<HivePartition> partitions,
- List<Split> allFiles, String bindBrokerName, int numBackends)
throws IOException, UserException {
+ List<Split> allFiles, String bindBrokerName, int numBackends,
+ boolean isBatchMode) throws IOException, UserException {
List<FileCacheValue> fileCaches;
if (hiveTransaction != null) {
try {
@@ -293,9 +295,11 @@ public class HiveScanNode extends FileQueryScanNode {
fileCaches = cache.getFilesByPartitions(partitions, withCache,
partitions.size() > 1,
directoryLister, hmsTable);
}
+
+ long targetFileSplitSize = determineTargetFileSplitSize(fileCaches,
isBatchMode);
if (tableSample != null) {
List<HiveMetaStoreCache.HiveFileStatus> hiveFileStatuses =
selectFiles(fileCaches);
- splitAllFiles(allFiles, hiveFileStatuses);
+ splitAllFiles(allFiles, hiveFileStatuses, targetFileSplitSize);
return;
}
@@ -319,27 +323,67 @@ public class HiveScanNode extends FileQueryScanNode {
int parallelNum = sessionVariable.getParallelExecInstanceNum();
needSplit = FileSplitter.needSplitForCountPushdown(parallelNum,
numBackends, totalFileNum);
}
+
for (HiveMetaStoreCache.FileCacheValue fileCacheValue : fileCaches) {
- if (fileCacheValue.getFiles() != null) {
- boolean isSplittable = fileCacheValue.isSplittable();
- for (HiveMetaStoreCache.HiveFileStatus status :
fileCacheValue.getFiles()) {
- allFiles.addAll(FileSplitter.splitFile(status.getPath(),
- // set block size to Long.MAX_VALUE to avoid
splitting the file.
- getRealFileSplitSize(needSplit ?
status.getBlockSize() : Long.MAX_VALUE),
- status.getBlockLocations(), status.getLength(),
status.getModificationTime(),
- isSplittable, fileCacheValue.getPartitionValues(),
- new
HiveSplitCreator(fileCacheValue.getAcidInfo())));
+ if (fileCacheValue.getFiles() == null) {
+ continue;
+ }
+ boolean isSplittable = fileCacheValue.isSplittable();
+
+ for (HiveMetaStoreCache.HiveFileStatus status :
fileCacheValue.getFiles()) {
+ allFiles.addAll(fileSplitter.splitFile(
+ status.getPath(),
+ targetFileSplitSize,
+ status.getBlockLocations(),
+ status.getLength(),
+ status.getModificationTime(),
+ isSplittable && needSplit,
+ fileCacheValue.getPartitionValues(),
+ new HiveSplitCreator(fileCacheValue.getAcidInfo())));
+ }
+ }
+ }
+
+ private long determineTargetFileSplitSize(List<FileCacheValue> fileCaches,
+ boolean isBatchMode) {
+ if (sessionVariable.getFileSplitSize() > 0) {
+ return sessionVariable.getFileSplitSize();
+ }
+ /** Hive batch split mode will return 0. and <code>FileSplitter</code>
+ * will determine file split size.
+ */
+ if (isBatchMode) {
+ return 0;
+ }
+ long result = sessionVariable.getMaxInitialSplitSize();
+ long totalFileSize = 0;
+ for (HiveMetaStoreCache.FileCacheValue fileCacheValue : fileCaches) {
+ if (fileCacheValue.getFiles() == null) {
+ continue;
+ }
+ for (HiveMetaStoreCache.HiveFileStatus status :
fileCacheValue.getFiles()) {
+ totalFileSize += status.getLength();
+ if (totalFileSize >= sessionVariable.getMaxSplitSize() *
sessionVariable.getMaxInitialSplitNum()) {
+ result = sessionVariable.getMaxSplitSize();
+ break;
}
}
}
+ return result;
}
private void splitAllFiles(List<Split> allFiles,
- List<HiveMetaStoreCache.HiveFileStatus>
hiveFileStatuses) throws IOException {
+ List<HiveMetaStoreCache.HiveFileStatus>
hiveFileStatuses,
+ long realFileSplitSize) throws IOException {
for (HiveMetaStoreCache.HiveFileStatus status : hiveFileStatuses) {
- allFiles.addAll(FileSplitter.splitFile(status.getPath(),
getRealFileSplitSize(status.getBlockSize()),
- status.getBlockLocations(), status.getLength(),
status.getModificationTime(),
- status.isSplittable(), status.getPartitionValues(),
+ allFiles.addAll(fileSplitter.splitFile(
+ status.getPath(),
+ realFileSplitSize,
+ status.getBlockLocations(),
+ status.getLength(),
+ status.getModificationTime(),
+ status.isSplittable(),
+ status.getPartitionValues(),
new HiveSplitCreator(status.getAcidInfo())));
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java
index f7850040fca..030e30a13ed 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java
@@ -86,6 +86,7 @@ import org.apache.iceberg.expressions.ResidualEvaluator;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.CloseableIterator;
import org.apache.iceberg.types.Conversions;
+import org.apache.iceberg.util.ScanTaskUtil;
import org.apache.iceberg.util.TableScanUtil;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -120,7 +121,7 @@ public class IcebergScanNode extends FileQueryScanNode {
private boolean tableLevelPushDownCount = false;
private long countFromSnapshot;
private static final long COUNT_WITH_PARALLEL_SPLITS = 10000;
- private long targetSplitSize;
+ private long targetSplitSize = 0;
// This is used to avoid repeatedly calculating partition info map for the
same partition data.
private Map<PartitionData, Map<String, String>> partitionMapInfos;
private boolean isPartitionedTable;
@@ -147,6 +148,8 @@ public class IcebergScanNode extends FileQueryScanNode {
private String cachedNormalizedPathPrefix;
private String cachedFsIdentifier;
+ private Boolean isBatchMode = null;
+
// for test
@VisibleForTesting
public IcebergScanNode(PlanNodeId id, TupleDescriptor desc,
SessionVariable sv) {
@@ -187,7 +190,6 @@ public class IcebergScanNode extends FileQueryScanNode {
@Override
protected void doInitialize() throws UserException {
icebergTable = source.getIcebergTable();
- targetSplitSize = getRealFileSplitSize(0);
partitionMapInfos = new HashMap<>();
isPartitionedTable = icebergTable.spec().isPartitioned();
formatVersion = ((BaseTable)
icebergTable).operations().current().formatVersion();
@@ -392,19 +394,58 @@ public class IcebergScanNode extends FileQueryScanNode {
private CloseableIterable<FileScanTask> planFileScanTask(TableScan scan) {
if (!IcebergUtils.isManifestCacheEnabled(source.getCatalog())) {
- long targetSplitSize = getRealFileSplitSize(0);
- return TableScanUtil.splitFiles(scan.planFiles(), targetSplitSize);
+ return splitFiles(scan);
}
try {
return planFileScanTaskWithManifestCache(scan);
} catch (Exception e) {
manifestCacheFailures++;
LOG.warn("Plan with manifest cache failed, fallback to original
scan: " + e.getMessage(), e);
- long targetSplitSize = getRealFileSplitSize(0);
- return TableScanUtil.splitFiles(scan.planFiles(), targetSplitSize);
+ return splitFiles(scan);
}
}
+ private CloseableIterable<FileScanTask> splitFiles(TableScan scan) {
+ if (sessionVariable.getFileSplitSize() > 0) {
+ return TableScanUtil.splitFiles(scan.planFiles(),
+ sessionVariable.getFileSplitSize());
+ }
+ if (isBatchMode()) {
+ // Currently iceberg batch split mode will use max split size.
+ // TODO: dynamic split size in batch split mode need to customize
iceberg splitter.
+ return TableScanUtil.splitFiles(scan.planFiles(),
sessionVariable.getMaxSplitSize());
+ }
+
+ // Non Batch Mode
+ // Materialize planFiles() into a list to avoid iterating the
CloseableIterable twice.
+ // RISK: It will cost memory if the table is large.
+ List<FileScanTask> fileScanTaskList = new ArrayList<>();
+ try (CloseableIterable<FileScanTask> scanTasksIter = scan.planFiles())
{
+ for (FileScanTask task : scanTasksIter) {
+ fileScanTaskList.add(task);
+ }
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to materialize file scan
tasks", e);
+ }
+
+ targetSplitSize = determineTargetFileSplitSize(fileScanTaskList);
+ return
TableScanUtil.splitFiles(CloseableIterable.withNoopClose(fileScanTaskList),
targetSplitSize);
+ }
+
+ private long determineTargetFileSplitSize(Iterable<FileScanTask> tasks) {
+ long result = sessionVariable.getMaxInitialSplitSize();
+ long accumulatedTotalFileSize = 0;
+ for (FileScanTask task : tasks) {
+ accumulatedTotalFileSize +=
ScanTaskUtil.contentSizeInBytes(task.file());
+ if (accumulatedTotalFileSize
+ >= sessionVariable.getMaxSplitSize() *
sessionVariable.getMaxInitialSplitNum()) {
+ result = sessionVariable.getMaxSplitSize();
+ break;
+ }
+ }
+ return result;
+ }
+
private CloseableIterable<FileScanTask>
planFileScanTaskWithManifestCache(TableScan scan) throws IOException {
// Get the snapshot from the scan; return empty if no snapshot exists
Snapshot snapshot = scan.snapshot();
@@ -519,7 +560,7 @@ public class IcebergScanNode extends FileQueryScanNode {
}
// Split tasks into smaller chunks based on target split size for
parallel processing
- long targetSplitSize = getRealFileSplitSize(0);
+ targetSplitSize = determineTargetFileSplitSize(tasks);
return
TableScanUtil.splitFiles(CloseableIterable.withNoopClose(tasks),
targetSplitSize);
}
@@ -677,21 +718,36 @@ public class IcebergScanNode extends FileQueryScanNode {
}
@Override
- public boolean isBatchMode() throws UserException {
+ public boolean isBatchMode() {
+ Boolean cached = isBatchMode;
+ if (cached != null) {
+ return cached;
+ }
TPushAggOp aggOp = getPushDownAggNoGroupingOp();
if (aggOp.equals(TPushAggOp.COUNT)) {
- countFromSnapshot = getCountFromSnapshot();
+ try {
+ countFromSnapshot = getCountFromSnapshot();
+ } catch (UserException e) {
+ throw new RuntimeException(e);
+ }
if (countFromSnapshot >= 0) {
tableLevelPushDownCount = true;
+ isBatchMode = false;
return false;
}
}
- if (createTableScan().snapshot() == null) {
- return false;
+ try {
+ if (createTableScan().snapshot() == null) {
+ isBatchMode = false;
+ return false;
+ }
+ } catch (UserException e) {
+ throw new RuntimeException(e);
}
if (!sessionVariable.getEnableExternalTableBatchMode()) {
+ isBatchMode = false;
return false;
}
@@ -707,10 +763,12 @@ public class IcebergScanNode extends FileQueryScanNode {
ManifestFile next = matchingManifest.next();
cnt += next.addedFilesCount() +
next.existingFilesCount();
if (cnt >= sessionVariable.getNumFilesInBatchMode()) {
+ isBatchMode = true;
return true;
}
}
}
+ isBatchMode = false;
return false;
});
} catch (Exception e) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
index 92b0862d06e..4e28c924f33 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
@@ -28,7 +28,6 @@ import org.apache.doris.common.util.FileFormatUtils;
import org.apache.doris.common.util.LocationPath;
import org.apache.doris.datasource.ExternalUtil;
import org.apache.doris.datasource.FileQueryScanNode;
-import org.apache.doris.datasource.FileSplitter;
import org.apache.doris.datasource.credentials.CredentialUtils;
import org.apache.doris.datasource.credentials.VendedCredentialsFactory;
import org.apache.doris.datasource.paimon.PaimonExternalCatalog;
@@ -298,7 +297,8 @@ public class PaimonScanNode extends FileQueryScanNode {
// And for counting the number of selected partitions for this paimon
table.
Map<BinaryRow, Map<String, String>> partitionInfoMaps = new
HashMap<>();
// if applyCountPushdown is true, we can't split the DataSplit
- long realFileSplitSize = getRealFileSplitSize(applyCountPushdown ?
Long.MAX_VALUE : 0);
+ boolean hasDeterminedTargetFileSplitSize = false;
+ long targetFileSplitSize = 0;
for (DataSplit dataSplit : dataSplits) {
SplitStat splitStat = new SplitStat();
splitStat.setRowCount(dataSplit.rowCount());
@@ -330,6 +330,10 @@ public class PaimonScanNode extends FileQueryScanNode {
if (ignoreSplitType ==
SessionVariable.IgnoreSplitType.IGNORE_NATIVE) {
continue;
}
+ if (!hasDeterminedTargetFileSplitSize) {
+ targetFileSplitSize =
determineTargetFileSplitSize(dataSplits, isBatchMode());
+ hasDeterminedTargetFileSplitSize = true;
+ }
splitStat.setType(SplitReadType.NATIVE);
splitStat.setRawFileConvertable(true);
List<RawFile> rawFiles = optRawFiles.get();
@@ -337,13 +341,13 @@ public class PaimonScanNode extends FileQueryScanNode {
RawFile file = rawFiles.get(i);
LocationPath locationPath = LocationPath.of(file.path(),
storagePropertiesMap);
try {
- List<Split> dorisSplits = FileSplitter.splitFile(
+ List<Split> dorisSplits = fileSplitter.splitFile(
locationPath,
- realFileSplitSize,
+ targetFileSplitSize,
null,
file.length(),
-1,
- true,
+ !applyCountPushdown,
null,
PaimonSplit.PaimonSplitCreator.DEFAULT);
for (Split dorisSplit : dorisSplits) {
@@ -388,12 +392,43 @@ public class PaimonScanNode extends FileQueryScanNode {
// We need to set the target size for all splits so that we can
calculate the
// proportion of each split later.
- splits.forEach(s -> s.setTargetSplitSize(realFileSplitSize));
+ splits.forEach(s ->
s.setTargetSplitSize(sessionVariable.getFileSplitSize() > 0
+ ? sessionVariable.getFileSplitSize() :
sessionVariable.getMaxSplitSize()));
this.selectedPartitionNum = partitionInfoMaps.size();
return splits;
}
+ private long determineTargetFileSplitSize(List<DataSplit> dataSplits,
+ boolean isBatchMode) {
+ if (sessionVariable.getFileSplitSize() > 0) {
+ return sessionVariable.getFileSplitSize();
+ }
+ /** Paimon batch split mode will return 0. and
<code>FileSplitter</code>
+ * will determine file split size.
+ */
+ if (isBatchMode) {
+ return 0;
+ }
+ long result = sessionVariable.getMaxInitialSplitSize();
+ long totalFileSize = 0;
+ for (DataSplit dataSplit : dataSplits) {
+ Optional<List<RawFile>> rawFiles = dataSplit.convertToRawFiles();
+ if (!supportNativeReader(rawFiles)) {
+ continue;
+ }
+ for (RawFile rawFile : rawFiles.get()) {
+ totalFileSize += rawFile.fileSize();
+ if (totalFileSize
+ >= sessionVariable.getMaxSplitSize() *
sessionVariable.getMaxInitialSplitNum()) {
+ result = sessionVariable.getMaxSplitSize();
+ break;
+ }
+ }
+ }
+ return result;
+ }
+
@VisibleForTesting
public Map<String, String> getIncrReadParams() throws UserException {
Map<String, String> paimonScanParams = new HashMap<>();
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/tvf/source/TVFScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/tvf/source/TVFScanNode.java
index e7567559762..c3b0e3e8b6d 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/tvf/source/TVFScanNode.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/tvf/source/TVFScanNode.java
@@ -146,12 +146,18 @@ public class TVFScanNode extends FileQueryScanNode {
needSplit = FileSplitter.needSplitForCountPushdown(parallelNum,
numBackends, totalFileNum);
}
+ long targetFileSplitSize = determineTargetFileSplitSize(fileStatuses);
+
for (TBrokerFileStatus fileStatus : fileStatuses) {
try {
-
splits.addAll(FileSplitter.splitFile(LocationPath.of(fileStatus.getPath()),
- getRealFileSplitSize(needSplit ?
fileStatus.getBlockSize() : Long.MAX_VALUE),
- null, fileStatus.getSize(),
- fileStatus.getModificationTime(),
fileStatus.isSplitable, null,
+ splits.addAll(fileSplitter.splitFile(
+ LocationPath.of(fileStatus.getPath()),
+ targetFileSplitSize,
+ null,
+ fileStatus.getSize(),
+ fileStatus.getModificationTime(),
+ fileStatus.isSplitable && needSplit,
+ null,
FileSplitCreator.DEFAULT));
} catch (IOException e) {
LOG.warn("get file split failed for TVF: {}",
fileStatus.getPath(), e);
@@ -161,6 +167,23 @@ public class TVFScanNode extends FileQueryScanNode {
return splits;
}
+ private long determineTargetFileSplitSize(List<TBrokerFileStatus>
fileStatuses) {
+ if (sessionVariable.getFileSplitSize() > 0) {
+ return sessionVariable.getFileSplitSize();
+ }
+ long result = sessionVariable.getMaxInitialSplitSize();
+ long totalFileSize = 0;
+ for (TBrokerFileStatus fileStatus : fileStatuses) {
+ totalFileSize += fileStatus.getSize();
+ if (totalFileSize
+ >= sessionVariable.getMaxSplitSize() *
sessionVariable.getMaxInitialSplitNum()) {
+ result = sessionVariable.getMaxSplitSize();
+ break;
+ }
+ }
+ return result;
+ }
+
@Override
protected void setScanParams(TFileRangeDesc rangeDesc, Split split) {
if (split instanceof FileSplit) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index 11df0e53eb1..79768d17b4a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -513,6 +513,12 @@ public class SessionVariable implements Serializable,
Writable {
// Split size for ExternalFileScanNode. Default value 0 means use the
block size of HDFS/S3.
public static final String FILE_SPLIT_SIZE = "file_split_size";
+ public static final String MAX_INITIAL_FILE_SPLIT_SIZE =
"max_initial_file_split_size";
+
+ public static final String MAX_FILE_SPLIT_SIZE = "max_file_split_size";
+
+ public static final String MAX_INITIAL_FILE_SPLIT_NUM =
"max_initial_file_split_num";
+
// Target file size in bytes for Iceberg write operations
public static final String ICEBERG_WRITE_TARGET_FILE_SIZE_BYTES =
"iceberg_write_target_file_size_bytes";
@@ -2182,6 +2188,36 @@ public class SessionVariable implements Serializable,
Writable {
@VariableMgr.VarAttr(name = FILE_SPLIT_SIZE, needForward = true)
public long fileSplitSize = 0;
+ @VariableMgr.VarAttr(
+ name = MAX_INITIAL_FILE_SPLIT_SIZE,
+ description = {"对于每个 table scan,最大文件分片初始大小。"
+ + "初始化使用 MAX_INITIAL_FILE_SPLIT_SIZE,一旦超过了
MAX_INITIAL_FILE_SPLIT_NUM,则使用 MAX_FILE_SPLIT_SIZE。",
+ "For each table scan, The maximum initial file split size.
"
+ + "Initialize using MAX_INITIAL_FILE_SPLIT_SIZE,"
+ + " and once MAX_INITIAL_FILE_SPLIT_NUM is
exceeded, use MAX_FILE_SPLIT_SIZE instead."},
+ needForward = true)
+ public long maxInitialSplitSize = 32L * 1024L * 1024L;
+
+ @VariableMgr.VarAttr(
+ name = MAX_FILE_SPLIT_SIZE,
+ description = {"对于每个 table scan,最大文件分片大小。"
+ + "初始化使用 MAX_INITIAL_FILE_SPLIT_SIZE,一旦超过了
MAX_INITIAL_FILE_SPLIT_NUM,则使用 MAX_FILE_SPLIT_SIZE。",
+ "For each table scan, the maximum initial file split size.
"
+ + "Initialize using MAX_INITIAL_FILE_SPLIT_SIZE,"
+ + " and once MAX_INITIAL_FILE_SPLIT_NUM is
exceeded, use MAX_FILE_SPLIT_SIZE instead."},
+ needForward = true)
+ public long maxSplitSize = 64L * 1024L * 1024L;
+
+ @VariableMgr.VarAttr(
+ name = MAX_INITIAL_FILE_SPLIT_NUM,
+ description = {"对于每个 table scan,最大文件分片初始数目。"
+ + "初始化使用 MAX_INITIAL_FILE_SPLIT_SIZE,一旦超过了
MAX_INITIAL_FILE_SPLIT_NUM,则使用 MAX_FILE_SPLIT_SIZE。",
+ "For each table scan, the maximum initial file split
number. "
+ + "Initialize using MAX_INITIAL_FILE_SPLIT_SIZE,"
+ + " and once MAX_INITIAL_FILE_SPLIT_NUM is
exceeded, use MAX_FILE_SPLIT_SIZE instead."},
+ needForward = true)
+ public int maxInitialSplitNum = 200;
+
// Target file size for Iceberg write operations
// Default 0 means use config::iceberg_sink_max_file_size
@VariableMgr.VarAttr(name = ICEBERG_WRITE_TARGET_FILE_SIZE_BYTES,
needForward = true)
@@ -4236,6 +4272,30 @@ public class SessionVariable implements Serializable,
Writable {
this.fileSplitSize = fileSplitSize;
}
+ public long getMaxInitialSplitSize() {
+ return maxInitialSplitSize;
+ }
+
+ public void setMaxInitialSplitSize(long maxInitialSplitSize) {
+ this.maxInitialSplitSize = maxInitialSplitSize;
+ }
+
+ public long getMaxSplitSize() {
+ return maxSplitSize;
+ }
+
+ public void setMaxSplitSize(long maxSplitSize) {
+ this.maxSplitSize = maxSplitSize;
+ }
+
+ public int getMaxInitialSplitNum() {
+ return maxInitialSplitNum;
+ }
+
+ public void setMaxInitialSplitNum(int maxInitialSplitNum) {
+ this.maxInitialSplitNum = maxInitialSplitNum;
+ }
+
public long getIcebergWriteTargetFileSizeBytes() {
return icebergWriteTargetFileSizeBytes;
}
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/datasource/FileSplitterTest.java
b/fe/fe-core/src/test/java/org/apache/doris/datasource/FileSplitterTest.java
new file mode 100644
index 00000000000..a455923da4d
--- /dev/null
+++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/FileSplitterTest.java
@@ -0,0 +1,216 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource;
+
+import org.apache.doris.common.util.LocationPath;
+import org.apache.doris.spi.Split;
+
+import org.apache.hadoop.fs.BlockLocation;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.List;
+
+public class FileSplitterTest {
+
+ private static final long MB = 1024L * 1024L;
+
+ private static final int DEFAULT_INITIAL_SPLITS = 200;
+
+ @Test
+ public void testNonSplittableCompressedFileProducesSingleSplit() throws
Exception {
+ LocationPath loc = LocationPath.of("hdfs://example.com/path/file.gz");
+ BlockLocation[] locations = new BlockLocation[]{new
BlockLocation(null, new String[]{"h1"}, 0L, 10 * MB)};
+ FileSplitter fileSplitter = new FileSplitter(32 * MB, 64 * MB,
DEFAULT_INITIAL_SPLITS);
+ List<Split> splits = fileSplitter.splitFile(
+ loc,
+ 0L,
+ locations,
+ 10 * MB,
+ 0L,
+ true,
+ Collections.emptyList(),
+ FileSplit.FileSplitCreator.DEFAULT);
+ Assert.assertEquals(1, splits.size());
+ Split s = splits.get(0);
+ Assert.assertEquals(10 * MB, ((org.apache.doris.datasource.FileSplit)
s).getLength());
+ // host should be preserved
+ Assert.assertArrayEquals(new String[]{"h1"},
((org.apache.doris.datasource.FileSplit) s).getHosts());
+ Assert.assertEquals(DEFAULT_INITIAL_SPLITS - 1,
fileSplitter.getRemainingInitialSplitNum());
+ }
+
+ @Test
+ public void testEmptyBlockLocationsProducesSingleSplitAndNullHosts()
throws Exception {
+ LocationPath loc = LocationPath.of("hdfs://example.com/path/file");
+ BlockLocation[] locations = new BlockLocation[0];
+ FileSplitter fileSplitter = new FileSplitter(32 * MB, 64 * MB,
DEFAULT_INITIAL_SPLITS);
+ List<Split> splits = fileSplitter.splitFile(
+ loc,
+ 0L,
+ locations,
+ 5 * MB,
+ 0L,
+ true,
+ Collections.emptyList(),
+ FileSplit.FileSplitCreator.DEFAULT);
+ Assert.assertEquals(1, splits.size());
+ org.apache.doris.datasource.FileSplit s =
(org.apache.doris.datasource.FileSplit) splits.get(0);
+ Assert.assertEquals(5 * MB, s.getLength());
+ // hosts should be empty array when passing null
+ Assert.assertNotNull(s.getHosts());
+ Assert.assertEquals(0, s.getHosts().length);
+ }
+
+ @Test
+ public void
testSplittableSingleBigBlockProducesExpectedSplitsWithInitialSmallChunks()
throws Exception {
+ LocationPath loc = LocationPath.of("hdfs://example.com/path/bigfile");
+ long length = 200 * MB;
+ BlockLocation[] locations = new BlockLocation[]{new
BlockLocation(null, new String[]{"h1"}, 0L, length)};
+ // set maxInitialSplits to 2 to force the first two splits to be small.
+ FileSplitter fileSplitter = new FileSplitter(32 * MB, 64 * MB, 2);
+ List<Split> splits = fileSplitter.splitFile(
+ loc,
+ 0L,
+ locations,
+ length,
+ 0L,
+ true,
+ Collections.emptyList(),
+ FileSplit.FileSplitCreator.DEFAULT);
+
+ // expect splits sizes: 32MB, 32MB, 64MB, 36MB, 36MB -> sum is 200MB
+ long[] expected = new long[]{32 * MB, 32 * MB, 64 * MB, 36 * MB, 36 *
MB};
+ Assert.assertEquals(expected.length, splits.size());
+ long sum = 0L;
+ for (int i = 0; i < expected.length; i++) {
+ org.apache.doris.datasource.FileSplit s =
(org.apache.doris.datasource.FileSplit) splits.get(i);
+ Assert.assertEquals(expected[i], s.getLength());
+ sum += s.getLength();
+ // ensure host preserved
+ Assert.assertArrayEquals(new String[]{"h1"}, s.getHosts());
+ }
+ Assert.assertEquals(length, sum);
+ // ensure the initial small-split counter is consumed for the two
initial small splits
+ Assert.assertEquals(0, fileSplitter.getRemainingInitialSplitNum());
+ }
+
+ @Test
+ public void testMultiBlockSplitsAndHostPreservation() throws Exception {
+ LocationPath loc =
LocationPath.of("hdfs://example.com/path/twoblocks");
+ long len = 96 * MB;
+ BlockLocation[] locations = new BlockLocation[]{
+ new BlockLocation(null, new String[]{"h1"}, 0L, 48 * MB),
+ new BlockLocation(null, new String[]{"h2"}, 48 * MB, 48 * MB)
+ };
+ FileSplitter fileSplitter = new FileSplitter(32 * MB, 64 * MB, 0);
+ List<Split> splits = fileSplitter.splitFile(
+ loc,
+ 0L,
+ locations,
+ len,
+ 0L,
+ true,
+ Collections.emptyList(),
+ FileSplit.FileSplitCreator.DEFAULT);
+ Assert.assertEquals(2, splits.size());
+ FileSplit s0 = (FileSplit) splits.get(0);
+ FileSplit s1 = (FileSplit) splits.get(1);
+ Assert.assertEquals(48 * MB, s0.getLength());
+ Assert.assertEquals(48 * MB, s1.getLength());
+ Assert.assertArrayEquals(new String[]{"h1"}, s0.getHosts());
+ Assert.assertArrayEquals(new String[]{"h2"}, s1.getHosts());
+ }
+
+ @Test
+ public void testZeroLengthBlockIsSkipped() throws Exception {
+ LocationPath loc =
LocationPath.of("hdfs://example.com/path/zeroblock");
+ long length = 10 * MB;
+ BlockLocation[] locations = new BlockLocation[]{
+ new BlockLocation(null, new String[]{"h1"}, 0L, 0L),
+ new BlockLocation(null, new String[]{"h1"}, 0L, 10 * MB)
+ };
+ FileSplitter fileSplitter = new FileSplitter(32 * MB, 64 * MB,
DEFAULT_INITIAL_SPLITS);
+ List<Split> splits = fileSplitter.splitFile(
+ loc,
+ 0L,
+ locations,
+ length,
+ 0L,
+ true,
+ Collections.emptyList(),
+ FileSplit.FileSplitCreator.DEFAULT);
+ Assert.assertEquals(1, splits.size());
+ FileSplit s = (FileSplit) splits.get(0);
+ Assert.assertEquals(10 * MB, s.getLength());
+ Assert.assertArrayEquals(new String[]{"h1"}, s.getHosts());
+ }
+
+ @Test
+ public void testNonSplittableFlagDecrementsCounter() throws Exception {
+ LocationPath loc = LocationPath.of("hdfs://example.com/path/file.gz");
+ BlockLocation[] locations = new BlockLocation[]{new
BlockLocation(null, new String[]{"h1"}, 0L, 10 * MB)};
+ FileSplitter fileSplitter = new FileSplitter(32 * MB, 64 * MB, 2);
+ List<Split> splits = fileSplitter.splitFile(
+ loc,
+ 0L,
+ locations,
+ 10 * MB,
+ 0L,
+ false,
+ Collections.emptyList(),
+ FileSplit.FileSplitCreator.DEFAULT);
+ Assert.assertEquals(1, splits.size());
+ }
+
+ @Test
+ public void testNullRemainingInitialSplitIsAllowed() throws Exception {
+ LocationPath loc = LocationPath.of("hdfs://example.com/path/somefile");
+ BlockLocation[] locations = new BlockLocation[]{new
BlockLocation(null, new String[]{"h1"}, 0L, 10 * MB)};
+ FileSplitter fileSplitter = new FileSplitter(32 * MB, 64 * MB,
DEFAULT_INITIAL_SPLITS);
+ List<Split> splits = fileSplitter.splitFile(
+ loc,
+ 0L,
+ locations,
+ 10 * MB,
+ 0L,
+ true,
+ Collections.emptyList(),
+ FileSplit.FileSplitCreator.DEFAULT);
+ Assert.assertEquals(1, splits.size());
+ }
+
+ @Test
+ public void testSmallFileNoSplit() throws Exception {
+ LocationPath loc = LocationPath.of("hdfs://example.com/path/small");
+ BlockLocation[] locations = new BlockLocation[]{new
BlockLocation(null, new String[]{"h1"}, 0L, 2 * MB)};
+ FileSplitter fileSplitter = new FileSplitter(32 * MB, 64 * MB,
DEFAULT_INITIAL_SPLITS);
+ List<Split> splits = fileSplitter.splitFile(
+ loc,
+ 0L,
+ locations,
+ 2 * MB,
+ 0L,
+ true,
+ Collections.emptyList(),
+ FileSplit.FileSplitCreator.DEFAULT);
+ Assert.assertEquals(1, splits.size());
+ FileSplit s = (FileSplit) splits.get(0);
+ Assert.assertEquals(2 * MB, s.getLength());
+ }
+}
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/datasource/paimon/source/PaimonScanNodeTest.java
b/fe/fe-core/src/test/java/org/apache/doris/datasource/paimon/source/PaimonScanNodeTest.java
index 93afa390530..692a0db12ca 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/datasource/paimon/source/PaimonScanNodeTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/datasource/paimon/source/PaimonScanNodeTest.java
@@ -21,6 +21,8 @@ import org.apache.doris.analysis.TupleDescriptor;
import org.apache.doris.analysis.TupleId;
import org.apache.doris.common.ExceptionChecker;
import org.apache.doris.common.UserException;
+import org.apache.doris.datasource.FileQueryScanNode;
+import org.apache.doris.datasource.FileSplitter;
import org.apache.doris.datasource.paimon.PaimonFileExternalCatalog;
import org.apache.doris.planner.PlanNodeId;
import org.apache.doris.qe.SessionVariable;
@@ -92,11 +94,26 @@ public class PaimonScanNodeTest {
}
}).when(spyPaimonScanNode).getPaimonSplitFromAPI();
+ long maxInitialSplitSize = 32L * 1024L * 1024L;
+ long maxSplitSize = 64L * 1024L * 1024L;
+ // Ensure fileSplitter is initialized on the spy as doInitialize() is
not called in this unit test
+ FileSplitter fileSplitter = new FileSplitter(maxInitialSplitSize,
maxSplitSize,
+ 0);
+ try {
+ java.lang.reflect.Field field =
FileQueryScanNode.class.getDeclaredField("fileSplitter");
+ field.setAccessible(true);
+ field.set(spyPaimonScanNode, fileSplitter);
+ } catch (NoSuchFieldException | IllegalAccessException e) {
+ throw new RuntimeException("Failed to inject FileSplitter into
PaimonScanNode test", e);
+ }
+
// Note: The original PaimonSource is sufficient for this test
// No need to mock catalog properties since doInitialize() is not
called in this test
// Mock SessionVariable behavior
Mockito.when(sv.isForceJniScanner()).thenReturn(false);
Mockito.when(sv.getIgnoreSplitType()).thenReturn("NONE");
+
Mockito.when(sv.getMaxInitialSplitSize()).thenReturn(maxInitialSplitSize);
+ Mockito.when(sv.getMaxSplitSize()).thenReturn(maxSplitSize);
// native
mockNativeReader(spyPaimonScanNode);
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/planner/FederationBackendPolicyTest.java
b/fe/fe-core/src/test/java/org/apache/doris/planner/FederationBackendPolicyTest.java
index 30582224f76..f6e8efd5294 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/planner/FederationBackendPolicyTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/planner/FederationBackendPolicyTest.java
@@ -769,9 +769,9 @@ public class FederationBackendPolicyTest {
Map<Backend, List<Split>> backendListMap2 =
mergeAssignment(assignment2);
backendListMap2.forEach((k, v) -> {
if (k.getId() == 1) {
- Assert.assertEquals(900000L,
v.stream().mapToLong(Split::getLength).sum());
+ Assert.assertEquals(1000000L,
v.stream().mapToLong(Split::getLength).sum());
} else if (k.getId() == 2) {
- Assert.assertEquals(500000L,
v.stream().mapToLong(Split::getLength).sum());
+ Assert.assertEquals(400000L,
v.stream().mapToLong(Split::getLength).sum());
} else if (k.getId() == 3) {
Assert.assertEquals(1000000L,
v.stream().mapToLong(Split::getLength).sum());
}
diff --git
a/regression-test/suites/external_table_p0/hive/test_hive_compress_type.groovy
b/regression-test/suites/external_table_p0/hive/test_hive_compress_type.groovy
index 4117501eff2..44dd9104411 100644
---
a/regression-test/suites/external_table_p0/hive/test_hive_compress_type.groovy
+++
b/regression-test/suites/external_table_p0/hive/test_hive_compress_type.groovy
@@ -48,7 +48,7 @@ suite("test_hive_compress_type",
"p0,external,hive,external_docker,external_dock
sql """set file_split_size=8388608"""
explain {
sql("select count(*) from test_compress_partitioned")
- contains "inputSplitNum=82, totalFileSize=734675596, scanRanges=82"
+ contains "inputSplitNum=16, totalFileSize=734675596, scanRanges=16"
contains "partition=8/8"
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]