This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch tpc_preview4-external
in repository https://gitbox.apache.org/repos/asf/doris.git
commit bdaf820278473d0e3a58faca1761822bfd376bb3
Author: Qi Chen <[email protected]>
AuthorDate: Fri Dec 19 11:45:28 2025 +0800

    [opt](multi-catalog) Optimize file split size. (#59175)
---
 .../doris/datasource/FederationBackendPolicy.java | 12 +-
 .../apache/doris/datasource/FileQueryScanNode.java | 19 +-
 .../org/apache/doris/datasource/FileScanNode.java | 10 +-
 .../org/apache/doris/datasource/FileSplitter.java | 230 ++++++++++++++++++---
 .../apache/doris/datasource/SplitGenerator.java | 2 +-
 .../doris/datasource/hive/source/HiveScanNode.java | 78 +++++--
 .../datasource/iceberg/source/IcebergScanNode.java | 80 ++++++-
 .../datasource/paimon/source/PaimonScanNode.java | 47 ++++-
 .../doris/datasource/tvf/source/TVFScanNode.java | 31 ++-
 .../java/org/apache/doris/qe/SessionVariable.java | 39 ++++
 .../apache/doris/datasource/FileSplitterTest.java | 216 +++++++++++++++++++
 .../paimon/source/PaimonScanNodeTest.java | 17 ++
 .../doris/planner/FederationBackendPolicyTest.java | 4 +-
 .../hive/test_hive_compress_type.groovy | 2 +-
 14 files changed, 691 insertions(+), 96 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/FederationBackendPolicy.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/FederationBackendPolicy.java
index 813d1892642..a8927d86a94 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/FederationBackendPolicy.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/FederationBackendPolicy.java
@@ -63,12 +63,16 @@ import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Optional; +import java.util.Random; import java.util.Set; import java.util.concurrent.ExecutionException; import java.util.stream.Collectors; public class FederationBackendPolicy { private static final Logger LOG = LogManager.getLogger(FederationBackendPolicy.class); + + private static final long FIXED_SHUFFLE_SEED = 123456789L; + protected final List<Backend> backends = Lists.newArrayList(); private final Map<String, List<Backend>> backendMap = Maps.newHashMap();
@@ -220,6 +224,7 @@ public class FederationBackendPolicy { public Multimap<Backend, Split> computeScanRangeAssignment(List<Split> splits) throws UserException { ListMultimap<Backend, Split> assignment = ArrayListMultimap.create(); + Collections.shuffle(splits, new Random(FIXED_SHUFFLE_SEED)); List<Split> remainingSplits; List<Backend> backends = new ArrayList<>();
@@ -228,8 +233,6 @@ public class FederationBackendPolicy { } ResettableRandomizedIterator<Backend> randomCandidates = new ResettableRandomizedIterator<>(backends); - boolean splitsToBeRedistributed = false; - // optimizedLocalScheduling enables prioritized assignment of splits to local nodes when splits contain // locality information if (Config.split_assigner_optimized_local_scheduling) {
@@ -246,7 +249,6 @@ public class FederationBackendPolicy { assignment.put(selectedBackend, split); assignedWeightPerBackend.put(selectedBackend, assignedWeightPerBackend.get(selectedBackend) + split.getSplitWeight().getRawValue()); - splitsToBeRedistributed = true; continue; } }
@@ -276,7 +278,6 @@ public class FederationBackendPolicy { case CONSISTENT_HASHING: { candidateNodes = consistentHash.getNode(split, Config.split_assigner_min_consistent_hash_candidate_num); - splitsToBeRedistributed = true; break; } default: {
@@ -302,7 +303,7 @@ public class FederationBackendPolicy { assignedWeightPerBackend.get(selectedBackend) + split.getSplitWeight().getRawValue()); } - if (enableSplitsRedistribution && splitsToBeRedistributed) { +
if (enableSplitsRedistribution) { equateDistribution(assignment); } return assignment; @@ -499,3 +500,4 @@ public class FederationBackendPolicy { } } } + diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java index 93bec2d1849..3ae32170e4b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java @@ -94,6 +94,8 @@ public abstract class FileQueryScanNode extends FileScanNode { protected TableScanParams scanParams; + protected FileSplitter fileSplitter; + /** * External file scan node for Query hms table * needCheckColumnPriv: Some of ExternalFileScanNode do not need to check column priv @@ -134,6 +136,8 @@ public abstract class FileQueryScanNode extends FileScanNode { } initBackendPolicy(); initSchemaParams(); + fileSplitter = new FileSplitter(sessionVariable.maxInitialSplitSize, sessionVariable.maxSplitSize, + sessionVariable.maxInitialSplitNum); } // Init schema (Tuple/Slot) related params. @@ -592,19 +596,4 @@ public abstract class FileQueryScanNode extends FileScanNode { } return this.scanParams; } - - /** - * The real file split size is determined by: - * 1. If user specify the split size in session variable `file_split_size`, use user specified value. - * 2. Otherwise, use the max value of DEFAULT_SPLIT_SIZE and block size. - * @param blockSize, got from file system, eg, hdfs - * @return the real file split size - */ - protected long getRealFileSplitSize(long blockSize) { - long realSplitSize = sessionVariable.getFileSplitSize(); - if (realSplitSize <= 0) { - realSplitSize = Math.max(DEFAULT_SPLIT_SIZE, blockSize); - } - return realSplitSize; - } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileScanNode.java index c3e06999bba..a7aa0f607ac 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileScanNode.java @@ -62,9 +62,6 @@ import java.util.stream.Collectors; * Base class for External File Scan, including external query and load. 
*/ public abstract class FileScanNode extends ExternalScanNode { - - public static final long DEFAULT_SPLIT_SIZE = 64 * 1024 * 1024; // 64MB - // For explain protected long totalFileSize = 0; protected long totalPartitionNum = 0; @@ -115,12 +112,7 @@ public abstract class FileScanNode extends ExternalScanNode { } output.append(prefix); - boolean isBatch; - try { - isBatch = isBatchMode(); - } catch (UserException e) { - throw new RuntimeException(e); - } + boolean isBatch = isBatchMode(); if (isBatch) { output.append("(approximate)"); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileSplitter.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileSplitter.java index 33b2d70bfb1..5fe84441971 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileSplitter.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileSplitter.java @@ -22,13 +22,19 @@ import org.apache.doris.common.util.Util; import org.apache.doris.spi.Split; import org.apache.doris.thrift.TFileCompressType; +import com.google.common.base.Preconditions; +import com.google.common.base.Verify; +import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; import org.apache.hadoop.fs.BlockLocation; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import java.io.IOException; +import java.util.Arrays; import java.util.List; +import java.util.Objects; +import java.util.concurrent.atomic.AtomicInteger; public class FileSplitter { private static final Logger LOG = LogManager.getLogger(FileSplitter.class); @@ -40,18 +46,66 @@ public class FileSplitter { return totalFileNum < parallelism * numBackends; } - public static List<Split> splitFile( - LocationPath path, - long fileSplitSize, - BlockLocation[] blockLocations, - long length, - long modificationTime, - boolean splittable, - List<String> partitionValues, - SplitCreator splitCreator) - throws IOException { + private long maxInitialSplitSize; + + private long maxSplitSize; + + private int maxInitialSplitNum; + private final AtomicInteger remainingInitialSplitNum; + + private long currentMaxSplitSize; + + public long getMaxInitialSplitSize() { + return maxInitialSplitSize; + } + + public void setMaxInitialSplitSize(long maxInitialSplitSize) { + this.maxInitialSplitSize = maxInitialSplitSize; + } + + public long getMaxSplitSize() { + return maxSplitSize; + } + + public void setMaxSplitSize(long maxSplitSize) { + this.maxSplitSize = maxSplitSize; + } + + public int maxInitialSplitNum() { + return maxInitialSplitNum; + } + + public void setMaxInitialSplits(int maxInitialSplitNum) { + this.maxInitialSplitNum = maxInitialSplitNum; + } + + public long getRemainingInitialSplitNum() { + return remainingInitialSplitNum.get(); + } + + public FileSplitter(long maxInitialSplitSize, long maxSplitSize, int maxInitialSplitNum) { + this.maxInitialSplitSize = maxInitialSplitSize; + this.maxSplitSize = maxSplitSize; + this.maxInitialSplitNum = maxInitialSplitNum; + currentMaxSplitSize = maxInitialSplitSize; + remainingInitialSplitNum = new AtomicInteger(maxInitialSplitNum); + } + + public List<Split> splitFile( + LocationPath path, + long specifiedFileSplitSize, + BlockLocation[] blockLocations, + long length, + long modificationTime, + boolean splittable, + List<String> partitionValues, + SplitCreator splitCreator) + throws IOException { + // Pass splitCreator.create() to set target file split size to calculate split weight. + long targetFileSplitSize = specifiedFileSplitSize > 0 ? 
specifiedFileSplitSize : maxSplitSize; if (blockLocations == null) { - blockLocations = new BlockLocation[0]; + blockLocations = new BlockLocation[1]; + blockLocations[0] = new BlockLocation(null, null, 0L, length); } List<Split> result = Lists.newArrayList(); TFileCompressType compressType = Util.inferFileCompressTypeByPath(path.getNormalizedLocation()); @@ -60,23 +114,83 @@ public class FileSplitter { LOG.debug("Path {} is not splittable.", path); } String[] hosts = blockLocations.length == 0 ? null : blockLocations[0].getHosts(); - result.add(splitCreator.create(path, 0, length, length, fileSplitSize, + result.add(splitCreator.create(path, 0, length, length, + targetFileSplitSize, modificationTime, hosts, partitionValues)); + updateCurrentMaxSplitSize(); + return result; + } + + // if specified split size is not zero, split file by specified size + if (specifiedFileSplitSize > 0) { + long bytesRemaining; + for (bytesRemaining = length; (double) bytesRemaining / (double) specifiedFileSplitSize > 1.1D; + bytesRemaining -= specifiedFileSplitSize) { + int location = getBlockIndex(blockLocations, length - bytesRemaining); + String[] hosts = location == -1 ? null : blockLocations[location].getHosts(); + result.add(splitCreator.create(path, length - bytesRemaining, specifiedFileSplitSize, + length, specifiedFileSplitSize, modificationTime, hosts, partitionValues)); + } + if (bytesRemaining != 0L) { + int location = getBlockIndex(blockLocations, length - bytesRemaining); + String[] hosts = location == -1 ? null : blockLocations[location].getHosts(); + result.add(splitCreator.create(path, length - bytesRemaining, bytesRemaining, + length, specifiedFileSplitSize, modificationTime, hosts, partitionValues)); + } return result; } - long bytesRemaining; - for (bytesRemaining = length; (double) bytesRemaining / (double) fileSplitSize > 1.1D; - bytesRemaining -= fileSplitSize) { - int location = getBlockIndex(blockLocations, length - bytesRemaining); - String[] hosts = location == -1 ? null : blockLocations[location].getHosts(); - result.add(splitCreator.create(path, length - bytesRemaining, fileSplitSize, - length, fileSplitSize, modificationTime, hosts, partitionValues)); + + // split file by block + long start = 0; + ImmutableList.Builder<InternalBlock> blockBuilder = ImmutableList.builder(); + for (BlockLocation blockLocation : blockLocations) { + // clamp the block range + long blockStart = Math.max(start, blockLocation.getOffset()); + long blockEnd = Math.min(start + length, blockLocation.getOffset() + blockLocation.getLength()); + if (blockStart > blockEnd) { + // block is outside split range + continue; + } + if (blockStart == blockEnd && !(blockStart == start && blockEnd == start + length)) { + // skip zero-width block, except in the special circumstance: + // slice is empty, and the block covers the empty slice interval. + continue; + } + blockBuilder.add(new InternalBlock(blockStart, blockEnd, blockLocation.getHosts())); + } + List<InternalBlock> blocks = blockBuilder.build(); + if (blocks.isEmpty()) { + result.add(splitCreator.create(path, 0, length, length, + targetFileSplitSize, modificationTime, null, + partitionValues)); + updateCurrentMaxSplitSize(); + return result; } - if (bytesRemaining != 0L) { - int location = getBlockIndex(blockLocations, length - bytesRemaining); - String[] hosts = location == -1 ? 
null : blockLocations[location].getHosts(); - result.add(splitCreator.create(path, length - bytesRemaining, bytesRemaining, - length, fileSplitSize, modificationTime, hosts, partitionValues)); + + long splitStart = start; + int currentBlockIdx = 0; + while (splitStart < start + length) { + updateCurrentMaxSplitSize(); + long splitBytes; + long remainingBlockBytes = blocks.get(currentBlockIdx).getEnd() - splitStart; + if (remainingBlockBytes <= currentMaxSplitSize) { + splitBytes = remainingBlockBytes; + } else if (currentMaxSplitSize * 2 >= remainingBlockBytes) { + // Second to last split in this block, generate two evenly sized splits + splitBytes = remainingBlockBytes / 2; + } else { + splitBytes = currentMaxSplitSize; + } + result.add(splitCreator.create(path, splitStart, splitBytes, + length, targetFileSplitSize, modificationTime, blocks.get(currentBlockIdx).getHosts(), + partitionValues)); + splitStart += splitBytes; + if (splitStart == blocks.get(currentBlockIdx).getEnd()) { + currentBlockIdx++; + if (currentBlockIdx != blocks.size()) { + Verify.verify(splitStart == blocks.get(currentBlockIdx).getStart()); + } + } } if (LOG.isDebugEnabled()) { @@ -85,7 +199,19 @@ public class FileSplitter { return result; } - private static int getBlockIndex(BlockLocation[] blkLocations, long offset) { + private void updateCurrentMaxSplitSize() { + currentMaxSplitSize = maxSplitSize; + int cur = remainingInitialSplitNum.get(); + while (cur > 0) { + if (remainingInitialSplitNum.compareAndSet(cur, cur - 1)) { + currentMaxSplitSize = maxInitialSplitSize; + break; + } + cur = remainingInitialSplitNum.get(); + } + } + + private int getBlockIndex(BlockLocation[] blkLocations, long offset) { if (blkLocations == null || blkLocations.length == 0) { return -1; } @@ -100,5 +226,59 @@ public class FileSplitter { throw new IllegalArgumentException(String.format("Offset %d is outside of file (0..%d)", offset, fileLength)); } + private static class InternalBlock { + private final long start; + private final long end; + private final String[] hosts; + + public InternalBlock(long start, long end, String[] hosts) { + Preconditions.checkArgument(start <= end, "block end cannot be before block start"); + this.start = start; + this.end = end; + this.hosts = hosts; + } + + public long getStart() { + return start; + } + + public long getEnd() { + return end; + } + + public String[] getHosts() { + return hosts; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + InternalBlock that = (InternalBlock) o; + return start == that.start && end == that.end && Arrays.equals(hosts, that.hosts); + } + + @Override + public int hashCode() { + int result = Objects.hash(start, end); + result = 31 * result + Arrays.hashCode(hosts); + return result; + } + + @Override + public String toString() { + final StringBuilder sb = new StringBuilder("InternalBlock{"); + sb.append("start=").append(start); + sb.append(", end=").append(end); + sb.append(", hosts=").append(Arrays.toString(hosts)); + sb.append('}'); + return sb.toString(); + } + } + } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/SplitGenerator.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/SplitGenerator.java index 0b8a1022d5a..391552a5106 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/SplitGenerator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/SplitGenerator.java @@ -40,7 +40,7 @@ public interface 
SplitGenerator { /** * Whether the producer(e.g. ScanNode) support batch mode. */ - default boolean isBatchMode() throws UserException { + default boolean isBatchMode() { return false; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java index 744423f622c..5bcf2f5546a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java @@ -186,7 +186,7 @@ public class HiveScanNode extends FileQueryScanNode { .getMetaStoreCache((HMSExternalCatalog) hmsTable.getCatalog()); String bindBrokerName = hmsTable.getCatalog().bindBrokerName(); List<Split> allFiles = Lists.newArrayList(); - getFileSplitByPartitions(cache, prunedPartitions, allFiles, bindBrokerName, numBackends); + getFileSplitByPartitions(cache, prunedPartitions, allFiles, bindBrokerName, numBackends, false); if (ConnectContext.get().getExecutor() != null) { ConnectContext.get().getExecutor().getSummaryProfile().setGetPartitionFilesFinishTime(); } @@ -226,7 +226,8 @@ public class HiveScanNode extends FileQueryScanNode { try { List<Split> allFiles = Lists.newArrayList(); getFileSplitByPartitions( - cache, Collections.singletonList(partition), allFiles, bindBrokerName, numBackends); + cache, Collections.singletonList(partition), allFiles, bindBrokerName, + numBackends, true); if (allFiles.size() > numSplitsPerPartition.get()) { numSplitsPerPartition.set(allFiles.size()); } @@ -277,7 +278,8 @@ public class HiveScanNode extends FileQueryScanNode { } private void getFileSplitByPartitions(HiveMetaStoreCache cache, List<HivePartition> partitions, - List<Split> allFiles, String bindBrokerName, int numBackends) throws IOException, UserException { + List<Split> allFiles, String bindBrokerName, int numBackends, + boolean isBatchMode) throws IOException, UserException { List<FileCacheValue> fileCaches; if (hiveTransaction != null) { try { @@ -293,9 +295,11 @@ public class HiveScanNode extends FileQueryScanNode { fileCaches = cache.getFilesByPartitions(partitions, withCache, partitions.size() > 1, directoryLister, hmsTable); } + + long targetFileSplitSize = determineTargetFileSplitSize(fileCaches, isBatchMode); if (tableSample != null) { List<HiveMetaStoreCache.HiveFileStatus> hiveFileStatuses = selectFiles(fileCaches); - splitAllFiles(allFiles, hiveFileStatuses); + splitAllFiles(allFiles, hiveFileStatuses, targetFileSplitSize); return; } @@ -319,27 +323,67 @@ public class HiveScanNode extends FileQueryScanNode { int parallelNum = sessionVariable.getParallelExecInstanceNum(); needSplit = FileSplitter.needSplitForCountPushdown(parallelNum, numBackends, totalFileNum); } + for (HiveMetaStoreCache.FileCacheValue fileCacheValue : fileCaches) { - if (fileCacheValue.getFiles() != null) { - boolean isSplittable = fileCacheValue.isSplittable(); - for (HiveMetaStoreCache.HiveFileStatus status : fileCacheValue.getFiles()) { - allFiles.addAll(FileSplitter.splitFile(status.getPath(), - // set block size to Long.MAX_VALUE to avoid splitting the file. - getRealFileSplitSize(needSplit ? 
status.getBlockSize() : Long.MAX_VALUE), - status.getBlockLocations(), status.getLength(), status.getModificationTime(), - isSplittable, fileCacheValue.getPartitionValues(), - new HiveSplitCreator(fileCacheValue.getAcidInfo()))); + if (fileCacheValue.getFiles() == null) { + continue; + } + boolean isSplittable = fileCacheValue.isSplittable(); + + for (HiveMetaStoreCache.HiveFileStatus status : fileCacheValue.getFiles()) { + allFiles.addAll(fileSplitter.splitFile( + status.getPath(), + targetFileSplitSize, + status.getBlockLocations(), + status.getLength(), + status.getModificationTime(), + isSplittable && needSplit, + fileCacheValue.getPartitionValues(), + new HiveSplitCreator(fileCacheValue.getAcidInfo()))); + } + } + } + + private long determineTargetFileSplitSize(List<FileCacheValue> fileCaches, + boolean isBatchMode) { + if (sessionVariable.getFileSplitSize() > 0) { + return sessionVariable.getFileSplitSize(); + } + /** Hive batch split mode will return 0. and <code>FileSplitter</code> + * will determine file split size. + */ + if (isBatchMode) { + return 0; + } + long result = sessionVariable.getMaxInitialSplitSize(); + long totalFileSize = 0; + for (HiveMetaStoreCache.FileCacheValue fileCacheValue : fileCaches) { + if (fileCacheValue.getFiles() == null) { + continue; + } + for (HiveMetaStoreCache.HiveFileStatus status : fileCacheValue.getFiles()) { + totalFileSize += status.getLength(); + if (totalFileSize >= sessionVariable.getMaxSplitSize() * sessionVariable.getMaxInitialSplitNum()) { + result = sessionVariable.getMaxSplitSize(); + break; } } } + return result; } private void splitAllFiles(List<Split> allFiles, - List<HiveMetaStoreCache.HiveFileStatus> hiveFileStatuses) throws IOException { + List<HiveMetaStoreCache.HiveFileStatus> hiveFileStatuses, + long realFileSplitSize) throws IOException { for (HiveMetaStoreCache.HiveFileStatus status : hiveFileStatuses) { - allFiles.addAll(FileSplitter.splitFile(status.getPath(), getRealFileSplitSize(status.getBlockSize()), - status.getBlockLocations(), status.getLength(), status.getModificationTime(), - status.isSplittable(), status.getPartitionValues(), + allFiles.addAll(fileSplitter.splitFile( + status.getPath(), + realFileSplitSize, + status.getBlockLocations(), + status.getLength(), + status.getModificationTime(), + status.isSplittable(), + status.getPartitionValues(), new HiveSplitCreator(status.getAcidInfo()))); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java index 0ffe86edb31..133ac067644 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java @@ -85,6 +85,7 @@ import org.apache.iceberg.expressions.ResidualEvaluator; import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.io.CloseableIterator; import org.apache.iceberg.types.Conversions; +import org.apache.iceberg.util.ScanTaskUtil; import org.apache.iceberg.util.TableScanUtil; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -119,7 +120,7 @@ public class IcebergScanNode extends FileQueryScanNode { private boolean tableLevelPushDownCount = false; private long countFromSnapshot; private static final long COUNT_WITH_PARALLEL_SPLITS = 10000; - private long targetSplitSize; + private long targetSplitSize = 0; // This is used to avoid repeatedly 
calculating partition info map for the same partition data. private Map<PartitionData, Map<String, String>> partitionMapInfos; private boolean isPartitionedTable; @@ -131,6 +132,8 @@ public class IcebergScanNode extends FileQueryScanNode { private Map<StorageProperties.Type, StorageProperties> storagePropertiesMap; private Map<String, String> backendStorageProperties; + private Boolean isBatchMode = null; + // for test @VisibleForTesting public IcebergScanNode(PlanNodeId id, TupleDescriptor desc, SessionVariable sv) { @@ -171,7 +174,6 @@ public class IcebergScanNode extends FileQueryScanNode { @Override protected void doInitialize() throws UserException { icebergTable = source.getIcebergTable(); - targetSplitSize = getRealFileSplitSize(0); partitionMapInfos = new HashMap<>(); isPartitionedTable = icebergTable.spec().isPartitioned(); formatVersion = ((BaseTable) icebergTable).operations().current().formatVersion(); @@ -375,18 +377,57 @@ public class IcebergScanNode extends FileQueryScanNode { private CloseableIterable<FileScanTask> planFileScanTask(TableScan scan) { if (!Config.iceberg_manifest_cache_enable) { - long targetSplitSize = getRealFileSplitSize(0); - return TableScanUtil.splitFiles(scan.planFiles(), targetSplitSize); + return splitFiles(scan); } try { return planFileScanTaskWithManifestCache(scan); } catch (Exception e) { LOG.warn("Plan with manifest cache failed, fallback to original scan: {}", e.getMessage()); - long targetSplitSize = getRealFileSplitSize(0); - return TableScanUtil.splitFiles(scan.planFiles(), targetSplitSize); + return splitFiles(scan); } } + private CloseableIterable<FileScanTask> splitFiles(TableScan scan) { + if (sessionVariable.getFileSplitSize() > 0) { + return TableScanUtil.splitFiles(scan.planFiles(), + sessionVariable.getFileSplitSize()); + } + if (isBatchMode()) { + // Currently iceberg batch split mode will use max split size. + // TODO: dynamic split size in batch split mode need to customize iceberg splitter. + return TableScanUtil.splitFiles(scan.planFiles(), sessionVariable.getMaxSplitSize()); + } + + // Non Batch Mode + // Materialize planFiles() into a list to avoid iterating the CloseableIterable twice. + // RISK: It will cost memory if the table is large. 
+ List<FileScanTask> fileScanTaskList = new ArrayList<>(); + try (CloseableIterable<FileScanTask> scanTasksIter = scan.planFiles()) { + for (FileScanTask task : scanTasksIter) { + fileScanTaskList.add(task); + } + } catch (Exception e) { + throw new RuntimeException("Failed to materialize file scan tasks", e); + } + + targetSplitSize = determineTargetFileSplitSize(fileScanTaskList); + return TableScanUtil.splitFiles(CloseableIterable.withNoopClose(fileScanTaskList), targetSplitSize); + } + + private long determineTargetFileSplitSize(Iterable<FileScanTask> tasks) { + long result = sessionVariable.getMaxInitialSplitSize(); + long accumulatedTotalFileSize = 0; + for (FileScanTask task : tasks) { + accumulatedTotalFileSize += ScanTaskUtil.contentSizeInBytes(task.file()); + if (accumulatedTotalFileSize + >= sessionVariable.getMaxSplitSize() * sessionVariable.getMaxInitialSplitNum()) { + result = sessionVariable.getMaxSplitSize(); + break; + } + } + return result; + } + private CloseableIterable<FileScanTask> planFileScanTaskWithManifestCache(TableScan scan) throws IOException { // Get the snapshot from the scan; return empty if no snapshot exists Snapshot snapshot = scan.snapshot(); @@ -502,7 +543,7 @@ public class IcebergScanNode extends FileQueryScanNode { } // Split tasks into smaller chunks based on target split size for parallel processing - long targetSplitSize = getRealFileSplitSize(0); + targetSplitSize = determineTargetFileSplitSize(tasks); return TableScanUtil.splitFiles(CloseableIterable.withNoopClose(tasks), targetSplitSize); } @@ -592,21 +633,36 @@ public class IcebergScanNode extends FileQueryScanNode { } @Override - public boolean isBatchMode() throws UserException { + public boolean isBatchMode() { + Boolean cached = isBatchMode; + if (cached != null) { + return cached; + } TPushAggOp aggOp = getPushDownAggNoGroupingOp(); if (aggOp.equals(TPushAggOp.COUNT)) { - countFromSnapshot = getCountFromSnapshot(); + try { + countFromSnapshot = getCountFromSnapshot(); + } catch (UserException e) { + throw new RuntimeException(e); + } if (countFromSnapshot >= 0) { tableLevelPushDownCount = true; + isBatchMode = false; return false; } } - if (createTableScan().snapshot() == null) { - return false; + try { + if (createTableScan().snapshot() == null) { + isBatchMode = false; + return false; + } + } catch (UserException e) { + throw new RuntimeException(e); } if (!sessionVariable.getEnableExternalTableBatchMode()) { + isBatchMode = false; return false; } @@ -622,10 +678,12 @@ public class IcebergScanNode extends FileQueryScanNode { ManifestFile next = matchingManifest.next(); cnt += next.addedFilesCount() + next.existingFilesCount(); if (cnt >= sessionVariable.getNumFilesInBatchMode()) { + isBatchMode = true; return true; } } } + isBatchMode = false; return false; }); } catch (Exception e) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java index 402bf3d0ef6..1671ce0f17a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java @@ -28,7 +28,6 @@ import org.apache.doris.common.util.FileFormatUtils; import org.apache.doris.common.util.LocationPath; import org.apache.doris.datasource.ExternalUtil; import org.apache.doris.datasource.FileQueryScanNode; -import org.apache.doris.datasource.FileSplitter; import 
org.apache.doris.datasource.credentials.CredentialUtils; import org.apache.doris.datasource.credentials.VendedCredentialsFactory; import org.apache.doris.datasource.paimon.PaimonExternalCatalog; @@ -293,7 +292,8 @@ public class PaimonScanNode extends FileQueryScanNode { // And for counting the number of selected partitions for this paimon table. Map<BinaryRow, Map<String, String>> partitionInfoMaps = new HashMap<>(); // if applyCountPushdown is true, we can't split the DataSplit - long realFileSplitSize = getRealFileSplitSize(applyCountPushdown ? Long.MAX_VALUE : 0); + boolean hasDeterminedTargetFileSplitSize = false; + long targetFileSplitSize = 0; for (DataSplit dataSplit : dataSplits) { SplitStat splitStat = new SplitStat(); splitStat.setRowCount(dataSplit.rowCount()); @@ -325,6 +325,10 @@ public class PaimonScanNode extends FileQueryScanNode { if (ignoreSplitType == SessionVariable.IgnoreSplitType.IGNORE_NATIVE) { continue; } + if (!hasDeterminedTargetFileSplitSize) { + targetFileSplitSize = determineTargetFileSplitSize(dataSplits, isBatchMode()); + hasDeterminedTargetFileSplitSize = true; + } splitStat.setType(SplitReadType.NATIVE); splitStat.setRawFileConvertable(true); List<RawFile> rawFiles = optRawFiles.get(); @@ -332,13 +336,13 @@ public class PaimonScanNode extends FileQueryScanNode { RawFile file = rawFiles.get(i); LocationPath locationPath = LocationPath.of(file.path(), storagePropertiesMap); try { - List<Split> dorisSplits = FileSplitter.splitFile( + List<Split> dorisSplits = fileSplitter.splitFile( locationPath, - realFileSplitSize, + targetFileSplitSize, null, file.length(), -1, - true, + !applyCountPushdown, null, PaimonSplit.PaimonSplitCreator.DEFAULT); for (Split dorisSplit : dorisSplits) { @@ -383,12 +387,43 @@ public class PaimonScanNode extends FileQueryScanNode { // We need to set the target size for all splits so that we can calculate the // proportion of each split later. - splits.forEach(s -> s.setTargetSplitSize(realFileSplitSize)); + splits.forEach(s -> s.setTargetSplitSize(sessionVariable.getFileSplitSize() > 0 + ? sessionVariable.getFileSplitSize() : sessionVariable.getMaxSplitSize())); this.selectedPartitionNum = partitionInfoMaps.size(); return splits; } + private long determineTargetFileSplitSize(List<DataSplit> dataSplits, + boolean isBatchMode) { + if (sessionVariable.getFileSplitSize() > 0) { + return sessionVariable.getFileSplitSize(); + } + /** Paimon batch split mode will return 0. and <code>FileSplitter</code> + * will determine file split size. 
+ */ + if (isBatchMode) { + return 0; + } + long result = sessionVariable.getMaxInitialSplitSize(); + long totalFileSize = 0; + for (DataSplit dataSplit : dataSplits) { + Optional<List<RawFile>> rawFiles = dataSplit.convertToRawFiles(); + if (!supportNativeReader(rawFiles)) { + continue; + } + for (RawFile rawFile : rawFiles.get()) { + totalFileSize += rawFile.fileSize(); + if (totalFileSize + >= sessionVariable.getMaxSplitSize() * sessionVariable.getMaxInitialSplitNum()) { + result = sessionVariable.getMaxSplitSize(); + break; + } + } + } + return result; + } + @VisibleForTesting public Map<String, String> getIncrReadParams() throws UserException { Map<String, String> paimonScanParams = new HashMap<>(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/tvf/source/TVFScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/tvf/source/TVFScanNode.java index e7567559762..c3b0e3e8b6d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/tvf/source/TVFScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/tvf/source/TVFScanNode.java @@ -146,12 +146,18 @@ public class TVFScanNode extends FileQueryScanNode { needSplit = FileSplitter.needSplitForCountPushdown(parallelNum, numBackends, totalFileNum); } + long targetFileSplitSize = determineTargetFileSplitSize(fileStatuses); + for (TBrokerFileStatus fileStatus : fileStatuses) { try { - splits.addAll(FileSplitter.splitFile(LocationPath.of(fileStatus.getPath()), - getRealFileSplitSize(needSplit ? fileStatus.getBlockSize() : Long.MAX_VALUE), - null, fileStatus.getSize(), - fileStatus.getModificationTime(), fileStatus.isSplitable, null, + splits.addAll(fileSplitter.splitFile( + LocationPath.of(fileStatus.getPath()), + targetFileSplitSize, + null, + fileStatus.getSize(), + fileStatus.getModificationTime(), + fileStatus.isSplitable && needSplit, + null, FileSplitCreator.DEFAULT)); } catch (IOException e) { LOG.warn("get file split failed for TVF: {}", fileStatus.getPath(), e); @@ -161,6 +167,23 @@ public class TVFScanNode extends FileQueryScanNode { return splits; } + private long determineTargetFileSplitSize(List<TBrokerFileStatus> fileStatuses) { + if (sessionVariable.getFileSplitSize() > 0) { + return sessionVariable.getFileSplitSize(); + } + long result = sessionVariable.getMaxInitialSplitSize(); + long totalFileSize = 0; + for (TBrokerFileStatus fileStatus : fileStatuses) { + totalFileSize += fileStatus.getSize(); + if (totalFileSize + >= sessionVariable.getMaxSplitSize() * sessionVariable.getMaxInitialSplitNum()) { + result = sessionVariable.getMaxSplitSize(); + break; + } + } + return result; + } + @Override protected void setScanParams(TFileRangeDesc rangeDesc, Split split) { if (split instanceof FileSplit) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java index 4fddfd0332d..14a25a992b8 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java @@ -513,6 +513,12 @@ public class SessionVariable implements Serializable, Writable { // Split size for ExternalFileScanNode. Default value 0 means use the block size of HDFS/S3. 
public static final String FILE_SPLIT_SIZE = "file_split_size"; + public static final String MAX_INITIAL_FILE_SPLIT_SIZE = "max_initial_file_split_size"; + + public static final String MAX_FILE_SPLIT_SIZE = "max_file_split_size"; + + public static final String MAX_INITIAL_FILE_SPLIT_NUM = "max_initial_file_split_num"; + // Target file size in bytes for Iceberg write operations public static final String ICEBERG_WRITE_TARGET_FILE_SIZE_BYTES = "iceberg_write_target_file_size_bytes"; @@ -2161,6 +2167,15 @@ public class SessionVariable implements Serializable, Writable { @VariableMgr.VarAttr(name = FILE_SPLIT_SIZE, needForward = true) public long fileSplitSize = 0; + @VariableMgr.VarAttr(name = MAX_INITIAL_FILE_SPLIT_SIZE, needForward = true) + public long maxInitialSplitSize = 32L * 1024L * 1024L; + + @VariableMgr.VarAttr(name = MAX_FILE_SPLIT_SIZE, needForward = true) + public long maxSplitSize = 64L * 1024L * 1024L; + + @VariableMgr.VarAttr(name = MAX_INITIAL_FILE_SPLIT_NUM, needForward = true) + public int maxInitialSplitNum = 200; + // Target file size for Iceberg write operations // Default 0 means use config::iceberg_sink_max_file_size @VariableMgr.VarAttr(name = ICEBERG_WRITE_TARGET_FILE_SIZE_BYTES, needForward = true) @@ -4181,6 +4196,30 @@ public class SessionVariable implements Serializable, Writable { this.fileSplitSize = fileSplitSize; } + public long getMaxInitialSplitSize() { + return maxInitialSplitSize; + } + + public void setMaxInitialSplitSize(long maxInitialSplitSize) { + this.maxInitialSplitSize = maxInitialSplitSize; + } + + public long getMaxSplitSize() { + return maxSplitSize; + } + + public void setMaxSplitSize(long maxSplitSize) { + this.maxSplitSize = maxSplitSize; + } + + public int getMaxInitialSplitNum() { + return maxInitialSplitNum; + } + + public void setMaxInitialSplitNum(int maxInitialSplitNum) { + this.maxInitialSplitNum = maxInitialSplitNum; + } + public long getIcebergWriteTargetFileSizeBytes() { return icebergWriteTargetFileSizeBytes; } diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/FileSplitterTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/FileSplitterTest.java new file mode 100644 index 00000000000..a455923da4d --- /dev/null +++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/FileSplitterTest.java @@ -0,0 +1,216 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.datasource; + +import org.apache.doris.common.util.LocationPath; +import org.apache.doris.spi.Split; + +import org.apache.hadoop.fs.BlockLocation; +import org.junit.Assert; +import org.junit.Test; + +import java.util.Collections; +import java.util.List; + +public class FileSplitterTest { + + private static final long MB = 1024L * 1024L; + + private static final int DEFAULT_INITIAL_SPLITS = 200; + + @Test + public void testNonSplittableCompressedFileProducesSingleSplit() throws Exception { + LocationPath loc = LocationPath.of("hdfs://example.com/path/file.gz"); + BlockLocation[] locations = new BlockLocation[]{new BlockLocation(null, new String[]{"h1"}, 0L, 10 * MB)}; + FileSplitter fileSplitter = new FileSplitter(32 * MB, 64 * MB, DEFAULT_INITIAL_SPLITS); + List<Split> splits = fileSplitter.splitFile( + loc, + 0L, + locations, + 10 * MB, + 0L, + true, + Collections.emptyList(), + FileSplit.FileSplitCreator.DEFAULT); + Assert.assertEquals(1, splits.size()); + Split s = splits.get(0); + Assert.assertEquals(10 * MB, ((org.apache.doris.datasource.FileSplit) s).getLength()); + // host should be preserved + Assert.assertArrayEquals(new String[]{"h1"}, ((org.apache.doris.datasource.FileSplit) s).getHosts()); + Assert.assertEquals(DEFAULT_INITIAL_SPLITS - 1, fileSplitter.getRemainingInitialSplitNum()); + } + + @Test + public void testEmptyBlockLocationsProducesSingleSplitAndNullHosts() throws Exception { + LocationPath loc = LocationPath.of("hdfs://example.com/path/file"); + BlockLocation[] locations = new BlockLocation[0]; + FileSplitter fileSplitter = new FileSplitter(32 * MB, 64 * MB, DEFAULT_INITIAL_SPLITS); + List<Split> splits = fileSplitter.splitFile( + loc, + 0L, + locations, + 5 * MB, + 0L, + true, + Collections.emptyList(), + FileSplit.FileSplitCreator.DEFAULT); + Assert.assertEquals(1, splits.size()); + org.apache.doris.datasource.FileSplit s = (org.apache.doris.datasource.FileSplit) splits.get(0); + Assert.assertEquals(5 * MB, s.getLength()); + // hosts should be empty array when passing null + Assert.assertNotNull(s.getHosts()); + Assert.assertEquals(0, s.getHosts().length); + } + + @Test + public void testSplittableSingleBigBlockProducesExpectedSplitsWithInitialSmallChunks() throws Exception { + LocationPath loc = LocationPath.of("hdfs://example.com/path/bigfile"); + long length = 200 * MB; + BlockLocation[] locations = new BlockLocation[]{new BlockLocation(null, new String[]{"h1"}, 0L, length)}; + // set maxInitialSplits to 2 to force the first two splits to be small. 
+ FileSplitter fileSplitter = new FileSplitter(32 * MB, 64 * MB, 2); + List<Split> splits = fileSplitter.splitFile( + loc, + 0L, + locations, + length, + 0L, + true, + Collections.emptyList(), + FileSplit.FileSplitCreator.DEFAULT); + + // expect splits sizes: 32MB, 32MB, 64MB, 36MB, 36MB -> sum is 200MB + long[] expected = new long[]{32 * MB, 32 * MB, 64 * MB, 36 * MB, 36 * MB}; + Assert.assertEquals(expected.length, splits.size()); + long sum = 0L; + for (int i = 0; i < expected.length; i++) { + org.apache.doris.datasource.FileSplit s = (org.apache.doris.datasource.FileSplit) splits.get(i); + Assert.assertEquals(expected[i], s.getLength()); + sum += s.getLength(); + // ensure host preserved + Assert.assertArrayEquals(new String[]{"h1"}, s.getHosts()); + } + Assert.assertEquals(length, sum); + // ensure the initial small-split counter is consumed for the two initial small splits + Assert.assertEquals(0, fileSplitter.getRemainingInitialSplitNum()); + } + + @Test + public void testMultiBlockSplitsAndHostPreservation() throws Exception { + LocationPath loc = LocationPath.of("hdfs://example.com/path/twoblocks"); + long len = 96 * MB; + BlockLocation[] locations = new BlockLocation[]{ + new BlockLocation(null, new String[]{"h1"}, 0L, 48 * MB), + new BlockLocation(null, new String[]{"h2"}, 48 * MB, 48 * MB) + }; + FileSplitter fileSplitter = new FileSplitter(32 * MB, 64 * MB, 0); + List<Split> splits = fileSplitter.splitFile( + loc, + 0L, + locations, + len, + 0L, + true, + Collections.emptyList(), + FileSplit.FileSplitCreator.DEFAULT); + Assert.assertEquals(2, splits.size()); + FileSplit s0 = (FileSplit) splits.get(0); + FileSplit s1 = (FileSplit) splits.get(1); + Assert.assertEquals(48 * MB, s0.getLength()); + Assert.assertEquals(48 * MB, s1.getLength()); + Assert.assertArrayEquals(new String[]{"h1"}, s0.getHosts()); + Assert.assertArrayEquals(new String[]{"h2"}, s1.getHosts()); + } + + @Test + public void testZeroLengthBlockIsSkipped() throws Exception { + LocationPath loc = LocationPath.of("hdfs://example.com/path/zeroblock"); + long length = 10 * MB; + BlockLocation[] locations = new BlockLocation[]{ + new BlockLocation(null, new String[]{"h1"}, 0L, 0L), + new BlockLocation(null, new String[]{"h1"}, 0L, 10 * MB) + }; + FileSplitter fileSplitter = new FileSplitter(32 * MB, 64 * MB, DEFAULT_INITIAL_SPLITS); + List<Split> splits = fileSplitter.splitFile( + loc, + 0L, + locations, + length, + 0L, + true, + Collections.emptyList(), + FileSplit.FileSplitCreator.DEFAULT); + Assert.assertEquals(1, splits.size()); + FileSplit s = (FileSplit) splits.get(0); + Assert.assertEquals(10 * MB, s.getLength()); + Assert.assertArrayEquals(new String[]{"h1"}, s.getHosts()); + } + + @Test + public void testNonSplittableFlagDecrementsCounter() throws Exception { + LocationPath loc = LocationPath.of("hdfs://example.com/path/file.gz"); + BlockLocation[] locations = new BlockLocation[]{new BlockLocation(null, new String[]{"h1"}, 0L, 10 * MB)}; + FileSplitter fileSplitter = new FileSplitter(32 * MB, 64 * MB, 2); + List<Split> splits = fileSplitter.splitFile( + loc, + 0L, + locations, + 10 * MB, + 0L, + false, + Collections.emptyList(), + FileSplit.FileSplitCreator.DEFAULT); + Assert.assertEquals(1, splits.size()); + } + + @Test + public void testNullRemainingInitialSplitIsAllowed() throws Exception { + LocationPath loc = LocationPath.of("hdfs://example.com/path/somefile"); + BlockLocation[] locations = new BlockLocation[]{new BlockLocation(null, new String[]{"h1"}, 0L, 10 * MB)}; + FileSplitter fileSplitter = new 
FileSplitter(32 * MB, 64 * MB, DEFAULT_INITIAL_SPLITS); + List<Split> splits = fileSplitter.splitFile( + loc, + 0L, + locations, + 10 * MB, + 0L, + true, + Collections.emptyList(), + FileSplit.FileSplitCreator.DEFAULT); + Assert.assertEquals(1, splits.size()); + } + + @Test + public void testSmallFileNoSplit() throws Exception { + LocationPath loc = LocationPath.of("hdfs://example.com/path/small"); + BlockLocation[] locations = new BlockLocation[]{new BlockLocation(null, new String[]{"h1"}, 0L, 2 * MB)}; + FileSplitter fileSplitter = new FileSplitter(32 * MB, 64 * MB, DEFAULT_INITIAL_SPLITS); + List<Split> splits = fileSplitter.splitFile( + loc, + 0L, + locations, + 2 * MB, + 0L, + true, + Collections.emptyList(), + FileSplit.FileSplitCreator.DEFAULT); + Assert.assertEquals(1, splits.size()); + FileSplit s = (FileSplit) splits.get(0); + Assert.assertEquals(2 * MB, s.getLength()); + } +} diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/paimon/source/PaimonScanNodeTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/paimon/source/PaimonScanNodeTest.java index 93afa390530..692a0db12ca 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/datasource/paimon/source/PaimonScanNodeTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/paimon/source/PaimonScanNodeTest.java @@ -21,6 +21,8 @@ import org.apache.doris.analysis.TupleDescriptor; import org.apache.doris.analysis.TupleId; import org.apache.doris.common.ExceptionChecker; import org.apache.doris.common.UserException; +import org.apache.doris.datasource.FileQueryScanNode; +import org.apache.doris.datasource.FileSplitter; import org.apache.doris.datasource.paimon.PaimonFileExternalCatalog; import org.apache.doris.planner.PlanNodeId; import org.apache.doris.qe.SessionVariable; @@ -92,11 +94,26 @@ public class PaimonScanNodeTest { } }).when(spyPaimonScanNode).getPaimonSplitFromAPI(); + long maxInitialSplitSize = 32L * 1024L * 1024L; + long maxSplitSize = 64L * 1024L * 1024L; + // Ensure fileSplitter is initialized on the spy as doInitialize() is not called in this unit test + FileSplitter fileSplitter = new FileSplitter(maxInitialSplitSize, maxSplitSize, + 0); + try { + java.lang.reflect.Field field = FileQueryScanNode.class.getDeclaredField("fileSplitter"); + field.setAccessible(true); + field.set(spyPaimonScanNode, fileSplitter); + } catch (NoSuchFieldException | IllegalAccessException e) { + throw new RuntimeException("Failed to inject FileSplitter into PaimonScanNode test", e); + } + // Note: The original PaimonSource is sufficient for this test // No need to mock catalog properties since doInitialize() is not called in this test // Mock SessionVariable behavior Mockito.when(sv.isForceJniScanner()).thenReturn(false); Mockito.when(sv.getIgnoreSplitType()).thenReturn("NONE"); + Mockito.when(sv.getMaxInitialSplitSize()).thenReturn(maxInitialSplitSize); + Mockito.when(sv.getMaxSplitSize()).thenReturn(maxSplitSize); // native mockNativeReader(spyPaimonScanNode); diff --git a/fe/fe-core/src/test/java/org/apache/doris/planner/FederationBackendPolicyTest.java b/fe/fe-core/src/test/java/org/apache/doris/planner/FederationBackendPolicyTest.java index 30582224f76..f6e8efd5294 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/planner/FederationBackendPolicyTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/planner/FederationBackendPolicyTest.java @@ -769,9 +769,9 @@ public class FederationBackendPolicyTest { Map<Backend, List<Split>> backendListMap2 = mergeAssignment(assignment2); 
backendListMap2.forEach((k, v) -> { if (k.getId() == 1) { - Assert.assertEquals(900000L, v.stream().mapToLong(Split::getLength).sum()); + Assert.assertEquals(1000000L, v.stream().mapToLong(Split::getLength).sum()); } else if (k.getId() == 2) { - Assert.assertEquals(500000L, v.stream().mapToLong(Split::getLength).sum()); + Assert.assertEquals(400000L, v.stream().mapToLong(Split::getLength).sum()); } else if (k.getId() == 3) { Assert.assertEquals(1000000L, v.stream().mapToLong(Split::getLength).sum()); } diff --git a/regression-test/suites/external_table_p0/hive/test_hive_compress_type.groovy b/regression-test/suites/external_table_p0/hive/test_hive_compress_type.groovy index 4117501eff2..44dd9104411 100644 --- a/regression-test/suites/external_table_p0/hive/test_hive_compress_type.groovy +++ b/regression-test/suites/external_table_p0/hive/test_hive_compress_type.groovy @@ -48,7 +48,7 @@ suite("test_hive_compress_type", "p0,external,hive,external_docker,external_dock sql """set file_split_size=8388608""" explain { sql("select count(*) from test_compress_partitioned") - contains "inputSplitNum=82, totalFileSize=734675596, scanRanges=82" + contains "inputSplitNum=16, totalFileSize=734675596, scanRanges=16" contains "partition=8/8" } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
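In short, the change replaces the old fixed 64 MB DEFAULT_SPLIT_SIZE with a ramp-up: each FileQueryScanNode now owns a FileSplitter whose first max_initial_file_split_num splits (200 by default) are capped at max_initial_file_split_size (32 MB by default), after which it falls back to max_file_split_size (64 MB by default); the second-to-last split of a block is cut into two roughly even pieces so no tiny tail split is produced, and an explicit set file_split_size still overrides everything. Below is a minimal, single-threaded sketch of that ramp-up for one file - it is not the Doris FileSplitter itself (it ignores block locations, compression, split weights and the AtomicInteger bookkeeping), and the class and method names are invented for the example.

import java.util.ArrayList;
import java.util.List;

// Minimal sketch (not the Doris FileSplitter) of the split-size ramp-up introduced by this commit.
// The thresholds mirror max_initial_file_split_size / max_file_split_size / max_initial_file_split_num.
public class SplitSizeRampUpSketch {
    private final long maxInitialSplitSize; // e.g. 32 MB, session variable max_initial_file_split_size
    private final long maxSplitSize;        // e.g. 64 MB, session variable max_file_split_size
    private int remainingInitialSplits;     // e.g. 200,   session variable max_initial_file_split_num

    public SplitSizeRampUpSketch(long maxInitialSplitSize, long maxSplitSize, int maxInitialSplitNum) {
        this.maxInitialSplitSize = maxInitialSplitSize;
        this.maxSplitSize = maxSplitSize;
        this.remainingInitialSplits = maxInitialSplitNum;
    }

    // Small while the initial budget lasts, then large (the committed code does this with an AtomicInteger).
    private long nextMaxSplitSize() {
        if (remainingInitialSplits > 0) {
            remainingInitialSplits--;
            return maxInitialSplitSize;
        }
        return maxSplitSize;
    }

    // Split one file of the given length into (offset, length) pairs.
    public List<long[]> splitFile(long fileLength) {
        List<long[]> splits = new ArrayList<>();
        long offset = 0;
        while (offset < fileLength) {
            long target = nextMaxSplitSize();
            long remaining = fileLength - offset;
            long splitBytes;
            if (remaining <= target) {
                splitBytes = remaining;        // last split of the file
            } else if (target * 2 >= remaining) {
                splitBytes = remaining / 2;    // second-to-last: two evenly sized splits, no tiny tail
            } else {
                splitBytes = target;
            }
            splits.add(new long[] {offset, splitBytes});
            offset += splitBytes;
        }
        return splits;
    }

    public static void main(String[] args) {
        long mb = 1024L * 1024L;
        // Two initial 32 MB splits, then 64 MB splits: a 200 MB file becomes 32, 32, 64, 36, 36 MB,
        // the same sizes FileSplitterTest expects for a single 200 MB block with maxInitialSplitNum = 2.
        SplitSizeRampUpSketch sketch = new SplitSizeRampUpSketch(32 * mb, 64 * mb, 2);
        for (long[] s : sketch.splitFile(200 * mb)) {
            System.out.println("offset=" + (s[0] / mb) + "MB, length=" + (s[1] / mb) + "MB");
        }
    }
}

Setting file_split_size explicitly still bypasses the ramp-up entirely, in both the old and the new code path.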
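The per-scan choice of the target size follows the same pattern in HiveScanNode, PaimonScanNode and TVFScanNode (IcebergScanNode does the equivalent with ScanTaskUtil.contentSizeInBytes over its FileScanTasks): an explicit file_split_size wins, Hive/Paimon batch split mode returns 0 so FileSplitter ramps up on its own, and otherwise the scan only moves up to the large split size once the total bytes to read reach max_file_split_size * max_initial_file_split_num. A rough standalone rendering of that determineTargetFileSplitSize() heuristic follows; the method and argument names are invented for the sketch, and a plain long[] of file lengths stands in for the real HiveFileStatus / RawFile / TBrokerFileStatus lists.

// Rough standalone rendering (not the committed code) of the determineTargetFileSplitSize() heuristic.
public final class TargetSplitSizeSketch {

    static long determineTargetFileSplitSize(long[] fileLengths, boolean isBatchMode,
            long fileSplitSize, long maxInitialSplitSize, long maxSplitSize, int maxInitialSplitNum) {
        if (fileSplitSize > 0) {
            return fileSplitSize;     // explicit "set file_split_size = ..." always wins
        }
        if (isBatchMode) {
            return 0;                 // batch split mode: let FileSplitter ramp the size up itself
        }
        long totalFileSize = 0;
        for (long length : fileLengths) {
            totalFileSize += length;
            if (totalFileSize >= maxSplitSize * (long) maxInitialSplitNum) {
                return maxSplitSize;  // large scan: go straight to the big steady-state split size
            }
        }
        return maxInitialSplitSize;   // small scan: keep splits small so they spread across backends
    }

    public static void main(String[] args) {
        long mb = 1024L * 1024L;
        long[] smallScan = {100 * mb, 200 * mb};                 // 300 MB in total
        long[] largeScan = {8_000 * mb, 8_000 * mb, 8_000 * mb}; // ~24 GB in total
        // With the defaults (32 MB / 64 MB / 200) the cut-over point is 64 MB * 200 = 12.5 GB per scan.
        System.out.println(determineTargetFileSplitSize(smallScan, false, 0, 32 * mb, 64 * mb, 200) / mb); // 32
        System.out.println(determineTargetFileSplitSize(largeScan, false, 0, 32 * mb, 64 * mb, 200) / mb); // 64
    }
}

All three knobs are ordinary session variables, so they can be tuned per session alongside the existing file_split_size override.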
