This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new c9b1819864e [opt](multi-catalog) Optimize file split size. (#58858)
c9b1819864e is described below

commit c9b1819864ee614a098250709b8c43adfb5af649
Author: Qi Chen <[email protected]>
AuthorDate: Mon Jan 5 21:32:08 2026 +0800

    [opt](multi-catalog) Optimize file split size. (#58858)
    
    ### Release note
    
    This PR introduces a **dynamic and progressive file split size
    adjustment mechanism** to improve scan parallelism and resource
    utilization for external table scans, while avoiding excessive small
    splits or inefficiently large initial splits.
    
    #### 1. Split Size Adjustment Strategy
    
    ##### 1.1 Non-Batch Split Mode
    
    In non-batch split mode, a **two-phase split size selection strategy**
    is applied based on the total size of all input files:
    
    * The total size of all input files is computed in advance.
    * If the total size **exceeds `maxInitialSplitNum * maxSplitSize`**:
      * `split_size = maxSplitSize` (default **64MB**)
    * Otherwise:
      * `split_size = maxInitialSplitSize` (default **32MB**)
    
    This strategy keeps split sizes small for modest datasets, preserving scan
    parallelism, while switching to larger splits for large-scale scans to keep
    the total number of splits under control.
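    
    A condensed sketch of this selection (mirroring `determineTargetFileSplitSize`
    in `HiveScanNode` from the diff below; the helper name and parameters are
    illustrative only):
    
    ```java
    // Non-batch mode: pick the split size from the precomputed total file size.
    static long chooseNonBatchSplitSize(long totalFileSize, int maxInitialSplitNum,
                                        long maxInitialSplitSize, long maxSplitSize) {
        if (totalFileSize >= maxSplitSize * (long) maxInitialSplitNum) {
            return maxSplitSize;        // large scan: fewer, bigger splits
        }
        return maxInitialSplitSize;     // small scan: smaller splits keep parallelism up
    }
    ```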
    
    ---
    
    ##### 1.2 Batch Split Mode
    
    In batch split mode, a **progressive split size adjustment strategy** is
    introduced:
    
    * Each scan starts with `split_size = maxInitialSplitSize` (**32MB**).
    * Once the number of generated splits **exceeds `maxInitialSplitNum`**, the
    `split_size` is **raised to `maxSplitSize` (64MB)** for subsequent splits.
    
    The small initial splits let scan parallelism ramp up quickly, while the
    larger subsequent splits keep the total split count bounded as the workload
    grows, resulting in more stable scheduling and execution behavior.
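    
    The adjustment is driven by a per-scan counter; the snippet below is
    condensed from the new `FileSplitter` in this diff (the class name here is
    illustrative):
    
    ```java
    import java.util.concurrent.atomic.AtomicInteger;
    
    // Condensed from FileSplitter: the first maxInitialSplitNum splits are
    // produced with maxInitialSplitSize; once that budget is consumed, the
    // splitter falls back to maxSplitSize.
    class ProgressiveSplitSizer {
        private final long maxInitialSplitSize;   // e.g. 32MB
        private final long maxSplitSize;          // e.g. 64MB
        private final AtomicInteger remainingInitialSplitNum;
    
        ProgressiveSplitSizer(long maxInitialSplitSize, long maxSplitSize, int maxInitialSplitNum) {
            this.maxInitialSplitSize = maxInitialSplitSize;
            this.maxSplitSize = maxSplitSize;
            this.remainingInitialSplitNum = new AtomicInteger(maxInitialSplitNum);
        }
    
        // Size cap to use for the next split.
        long nextMaxSplitSize() {
            int cur = remainingInitialSplitNum.get();
            while (cur > 0) {
                if (remainingInitialSplitNum.compareAndSet(cur, cur - 1)) {
                    return maxInitialSplitSize;   // still within the initial budget
                }
                cur = remainingInitialSplitNum.get();
            }
            return maxSplitSize;                  // initial budget exhausted
        }
    }
    ```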
    
    ---
    
    ##### 1.3 User-Specified Split Size (Backward Compatibility)
    
    This PR **preserves the session variable `file_split_size`** for
    user-defined split size configuration:
    
    * If `file_split_size` is explicitly set by the user:
    
      * The user-defined value takes precedence.
      * The dynamic split size adjustment logic is bypassed.
    * This ensures full backward compatibility with existing configurations
    and tuning practices.
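    
    For illustration, the precedence can be summarized as follows (a minimal
    sketch; the helper name is hypothetical):
    
    ```java
    // An explicitly set file_split_size (> 0) bypasses the dynamic adjustment.
    static long resolveSplitSize(long userFileSplitSize, long dynamicSplitSize) {
        return userFileSplitSize > 0 ? userFileSplitSize : dynamicSplitSize;
    }
    ```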
    
    ---
    
    #### 2. Support Status by Data Source
    
    | Data Source | Non-Batch Split Mode | Batch Split Mode | Notes |
    | ----------- | -------------------- | ---------------- | ----- |
    | Hive | ✅ Supported | ✅ Supported | Uses the Doris-internal HDFS `FileSplitter` |
    | Iceberg | ✅ Supported | ❌ Not supported | File splitting is currently delegated to Iceberg APIs |
    | Paimon | ✅ Supported | ❌ Not supported | Only non-batch split mode is implemented |
    
    ---
    
    #### 3. New Hive HDFS FileSplitter Logic
    
    For Hive HDFS files, this PR introduces an enhanced file splitting
    strategy:
    
    1. **Splits never span multiple HDFS blocks**
    
       * Prevents cross-block reads and avoids unnecessary IO overhead.
    
    2. **Tail split optimization**
    
       * If the remaining size within a block is no larger than `split_size * 2`,
       the remainder is **evenly divided** into two splits.
       * This prevents the creation of very small tail splits and improves
       overall scan efficiency (see the sketch below).
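    
    A condensed view of the per-block sizing decision (taken from the new
    `FileSplitter` loop in this diff; the helper name is illustrative):
    
    ```java
    // Splits never cross the current HDFS block; a tail of at most
    // 2 * splitSize is divided into two evenly sized splits.
    static long computeSplitBytes(long remainingBlockBytes, long splitSize) {
        if (remainingBlockBytes <= splitSize) {
            return remainingBlockBytes;       // last split in this block
        }
        if (remainingBlockBytes <= splitSize * 2) {
            return remainingBlockBytes / 2;   // second-to-last: split the tail evenly
        }
        return splitSize;                     // regular full-size split
    }
    ```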
    
    ---
    
    #### Summary
    
    * Introduces dynamic and progressive split size adjustment
    * Supports both batch and non-batch split modes
    * Preserves user-defined split size configuration for backward
    compatibility
    * Optimizes Hive HDFS file splitting to reduce small tail splits and
    cross-block IO
---
 .../doris/datasource/FederationBackendPolicy.java  |  12 +-
 .../apache/doris/datasource/FileQueryScanNode.java |  19 +-
 .../org/apache/doris/datasource/FileScanNode.java  |  10 +-
 .../org/apache/doris/datasource/FileSplitter.java  | 230 ++++++++++++++++++---
 .../apache/doris/datasource/SplitGenerator.java    |   2 +-
 .../doris/datasource/hive/source/HiveScanNode.java |  78 +++++--
 .../datasource/iceberg/source/IcebergScanNode.java |  80 ++++++-
 .../datasource/paimon/source/PaimonScanNode.java   |  47 ++++-
 .../doris/datasource/tvf/source/TVFScanNode.java   |  31 ++-
 .../java/org/apache/doris/qe/SessionVariable.java  |  60 ++++++
 .../apache/doris/datasource/FileSplitterTest.java  | 216 +++++++++++++++++++
 .../paimon/source/PaimonScanNodeTest.java          |  17 ++
 .../doris/planner/FederationBackendPolicyTest.java |   4 +-
 .../hive/test_hive_compress_type.groovy            |   2 +-
 14 files changed, 712 insertions(+), 96 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/FederationBackendPolicy.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/FederationBackendPolicy.java
index 813d1892642..a8927d86a94 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/FederationBackendPolicy.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/FederationBackendPolicy.java
@@ -63,12 +63,16 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
+import java.util.Random;
 import java.util.Set;
 import java.util.concurrent.ExecutionException;
 import java.util.stream.Collectors;
 
 public class FederationBackendPolicy {
     private static final Logger LOG = 
LogManager.getLogger(FederationBackendPolicy.class);
+
+    private static final long FIXED_SHUFFLE_SEED = 123456789L;
+
     protected final List<Backend> backends = Lists.newArrayList();
     private final Map<String, List<Backend>> backendMap = Maps.newHashMap();
 
@@ -220,6 +224,7 @@ public class FederationBackendPolicy {
     public Multimap<Backend, Split> computeScanRangeAssignment(List<Split> 
splits) throws UserException {
         ListMultimap<Backend, Split> assignment = ArrayListMultimap.create();
 
+        Collections.shuffle(splits, new Random(FIXED_SHUFFLE_SEED));
         List<Split> remainingSplits;
 
         List<Backend> backends = new ArrayList<>();
@@ -228,8 +233,6 @@ public class FederationBackendPolicy {
         }
         ResettableRandomizedIterator<Backend> randomCandidates = new 
ResettableRandomizedIterator<>(backends);
 
-        boolean splitsToBeRedistributed = false;
-
         // optimizedLocalScheduling enables prioritized assignment of splits 
to local nodes when splits contain
         // locality information
         if (Config.split_assigner_optimized_local_scheduling) {
@@ -246,7 +249,6 @@ public class FederationBackendPolicy {
                         assignment.put(selectedBackend, split);
                         assignedWeightPerBackend.put(selectedBackend,
                                 assignedWeightPerBackend.get(selectedBackend) 
+ split.getSplitWeight().getRawValue());
-                        splitsToBeRedistributed = true;
                         continue;
                     }
                 }
@@ -276,7 +278,6 @@ public class FederationBackendPolicy {
                     case CONSISTENT_HASHING: {
                         candidateNodes = consistentHash.getNode(split,
                                 
Config.split_assigner_min_consistent_hash_candidate_num);
-                        splitsToBeRedistributed = true;
                         break;
                     }
                     default: {
@@ -302,7 +303,7 @@ public class FederationBackendPolicy {
                     assignedWeightPerBackend.get(selectedBackend) + 
split.getSplitWeight().getRawValue());
         }
 
-        if (enableSplitsRedistribution && splitsToBeRedistributed) {
+        if (enableSplitsRedistribution) {
             equateDistribution(assignment);
         }
         return assignment;
@@ -499,3 +500,4 @@ public class FederationBackendPolicy {
         }
     }
 }
+
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java
index 474e08922a7..2614006b6a1 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java
@@ -94,6 +94,8 @@ public abstract class FileQueryScanNode extends FileScanNode {
 
     protected TableScanParams scanParams;
 
+    protected FileSplitter fileSplitter;
+
     /**
      * External file scan node for Query hms table
      * needCheckColumnPriv: Some of ExternalFileScanNode do not need to check 
column priv
@@ -134,6 +136,8 @@ public abstract class FileQueryScanNode extends 
FileScanNode {
         }
         initBackendPolicy();
         initSchemaParams();
+        fileSplitter = new FileSplitter(sessionVariable.maxInitialSplitSize, 
sessionVariable.maxSplitSize,
+                sessionVariable.maxInitialSplitNum);
     }
 
     // Init schema (Tuple/Slot) related params.
@@ -618,19 +622,4 @@ public abstract class FileQueryScanNode extends 
FileScanNode {
         }
         return this.scanParams;
     }
-
-    /**
-     * The real file split size is determined by:
-     * 1. If user specify the split size in session variable 
`file_split_size`, use user specified value.
-     * 2. Otherwise, use the max value of DEFAULT_SPLIT_SIZE and block size.
-     * @param blockSize, got from file system, eg, hdfs
-     * @return the real file split size
-     */
-    protected long getRealFileSplitSize(long blockSize) {
-        long realSplitSize = sessionVariable.getFileSplitSize();
-        if (realSplitSize <= 0) {
-            realSplitSize = Math.max(DEFAULT_SPLIT_SIZE, blockSize);
-        }
-        return realSplitSize;
-    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileScanNode.java 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileScanNode.java
index c3e06999bba..a7aa0f607ac 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileScanNode.java
@@ -62,9 +62,6 @@ import java.util.stream.Collectors;
  * Base class for External File Scan, including external query and load.
  */
 public abstract class FileScanNode extends ExternalScanNode {
-
-    public static final long DEFAULT_SPLIT_SIZE = 64 * 1024 * 1024; // 64MB
-
     // For explain
     protected long totalFileSize = 0;
     protected long totalPartitionNum = 0;
@@ -115,12 +112,7 @@ public abstract class FileScanNode extends 
ExternalScanNode {
         }
 
         output.append(prefix);
-        boolean isBatch;
-        try {
-            isBatch = isBatchMode();
-        } catch (UserException e) {
-            throw new RuntimeException(e);
-        }
+        boolean isBatch = isBatchMode();
         if (isBatch) {
             output.append("(approximate)");
         }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileSplitter.java 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileSplitter.java
index 33b2d70bfb1..5fe84441971 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileSplitter.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileSplitter.java
@@ -22,13 +22,19 @@ import org.apache.doris.common.util.Util;
 import org.apache.doris.spi.Split;
 import org.apache.doris.thrift.TFileCompressType;
 
+import com.google.common.base.Preconditions;
+import com.google.common.base.Verify;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 import org.apache.hadoop.fs.BlockLocation;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.atomic.AtomicInteger;
 
 public class FileSplitter {
     private static final Logger LOG = LogManager.getLogger(FileSplitter.class);
@@ -40,18 +46,66 @@ public class FileSplitter {
         return totalFileNum < parallelism * numBackends;
     }
 
-    public static List<Split> splitFile(
-            LocationPath path,
-            long fileSplitSize,
-            BlockLocation[] blockLocations,
-            long length,
-            long modificationTime,
-            boolean splittable,
-            List<String> partitionValues,
-            SplitCreator splitCreator)
-            throws IOException {
+    private long maxInitialSplitSize;
+
+    private long maxSplitSize;
+
+    private int maxInitialSplitNum;
+    private final AtomicInteger remainingInitialSplitNum;
+
+    private long currentMaxSplitSize;
+
+    public long getMaxInitialSplitSize() {
+        return maxInitialSplitSize;
+    }
+
+    public void setMaxInitialSplitSize(long maxInitialSplitSize) {
+        this.maxInitialSplitSize = maxInitialSplitSize;
+    }
+
+    public long getMaxSplitSize() {
+        return maxSplitSize;
+    }
+
+    public void setMaxSplitSize(long maxSplitSize) {
+        this.maxSplitSize = maxSplitSize;
+    }
+
+    public int maxInitialSplitNum() {
+        return maxInitialSplitNum;
+    }
+
+    public void setMaxInitialSplits(int maxInitialSplitNum) {
+        this.maxInitialSplitNum = maxInitialSplitNum;
+    }
+
+    public long getRemainingInitialSplitNum() {
+        return remainingInitialSplitNum.get();
+    }
+
+    public FileSplitter(long maxInitialSplitSize, long maxSplitSize, int 
maxInitialSplitNum) {
+        this.maxInitialSplitSize = maxInitialSplitSize;
+        this.maxSplitSize = maxSplitSize;
+        this.maxInitialSplitNum = maxInitialSplitNum;
+        currentMaxSplitSize = maxInitialSplitSize;
+        remainingInitialSplitNum = new AtomicInteger(maxInitialSplitNum);
+    }
+
+    public List<Split> splitFile(
+                LocationPath path,
+                long specifiedFileSplitSize,
+                BlockLocation[] blockLocations,
+                long length,
+                long modificationTime,
+                boolean splittable,
+                List<String> partitionValues,
+                SplitCreator splitCreator)
+                throws IOException {
+        // Pass splitCreator.create() to set target file split size to 
calculate split weight.
+        long targetFileSplitSize = specifiedFileSplitSize > 0 ? 
specifiedFileSplitSize : maxSplitSize;
         if (blockLocations == null) {
-            blockLocations = new BlockLocation[0];
+            blockLocations = new BlockLocation[1];
+            blockLocations[0] = new BlockLocation(null, null, 0L, length);
         }
         List<Split> result = Lists.newArrayList();
         TFileCompressType compressType = 
Util.inferFileCompressTypeByPath(path.getNormalizedLocation());
@@ -60,23 +114,83 @@ public class FileSplitter {
                 LOG.debug("Path {} is not splittable.", path);
             }
             String[] hosts = blockLocations.length == 0 ? null : 
blockLocations[0].getHosts();
-            result.add(splitCreator.create(path, 0, length, length, 
fileSplitSize,
+            result.add(splitCreator.create(path, 0, length, length,
+                    targetFileSplitSize,
                     modificationTime, hosts, partitionValues));
+            updateCurrentMaxSplitSize();
+            return result;
+        }
+
+        // if specified split size is not zero, split file by specified size
+        if (specifiedFileSplitSize > 0) {
+            long bytesRemaining;
+            for (bytesRemaining = length; (double) bytesRemaining / (double) 
specifiedFileSplitSize > 1.1D;
+                    bytesRemaining -= specifiedFileSplitSize) {
+                int location = getBlockIndex(blockLocations, length - 
bytesRemaining);
+                String[] hosts = location == -1 ? null : 
blockLocations[location].getHosts();
+                result.add(splitCreator.create(path, length - bytesRemaining, 
specifiedFileSplitSize,
+                        length, specifiedFileSplitSize, modificationTime, 
hosts, partitionValues));
+            }
+            if (bytesRemaining != 0L) {
+                int location = getBlockIndex(blockLocations, length - 
bytesRemaining);
+                String[] hosts = location == -1 ? null : 
blockLocations[location].getHosts();
+                result.add(splitCreator.create(path, length - bytesRemaining, 
bytesRemaining,
+                        length, specifiedFileSplitSize, modificationTime, 
hosts, partitionValues));
+            }
             return result;
         }
-        long bytesRemaining;
-        for (bytesRemaining = length; (double) bytesRemaining / (double) 
fileSplitSize > 1.1D;
-                bytesRemaining -= fileSplitSize) {
-            int location = getBlockIndex(blockLocations, length - 
bytesRemaining);
-            String[] hosts = location == -1 ? null : 
blockLocations[location].getHosts();
-            result.add(splitCreator.create(path, length - bytesRemaining, 
fileSplitSize,
-                    length, fileSplitSize, modificationTime, hosts, 
partitionValues));
+
+        // split file by block
+        long start = 0;
+        ImmutableList.Builder<InternalBlock> blockBuilder = 
ImmutableList.builder();
+        for (BlockLocation blockLocation : blockLocations) {
+            // clamp the block range
+            long blockStart = Math.max(start, blockLocation.getOffset());
+            long blockEnd = Math.min(start + length, blockLocation.getOffset() 
+ blockLocation.getLength());
+            if (blockStart > blockEnd) {
+                // block is outside split range
+                continue;
+            }
+            if (blockStart == blockEnd && !(blockStart == start && blockEnd == 
start + length)) {
+                // skip zero-width block, except in the special circumstance:
+                // slice is empty, and the block covers the empty slice 
interval.
+                continue;
+            }
+            blockBuilder.add(new InternalBlock(blockStart, blockEnd, 
blockLocation.getHosts()));
+        }
+        List<InternalBlock> blocks = blockBuilder.build();
+        if (blocks.isEmpty()) {
+            result.add(splitCreator.create(path, 0, length, length,
+                    targetFileSplitSize, modificationTime, null,
+                    partitionValues));
+            updateCurrentMaxSplitSize();
+            return result;
         }
-        if (bytesRemaining != 0L) {
-            int location = getBlockIndex(blockLocations, length - 
bytesRemaining);
-            String[] hosts = location == -1 ? null : 
blockLocations[location].getHosts();
-            result.add(splitCreator.create(path, length - bytesRemaining, 
bytesRemaining,
-                    length, fileSplitSize, modificationTime, hosts, 
partitionValues));
+
+        long splitStart = start;
+        int currentBlockIdx = 0;
+        while (splitStart < start + length) {
+            updateCurrentMaxSplitSize();
+            long splitBytes;
+            long remainingBlockBytes = blocks.get(currentBlockIdx).getEnd() - 
splitStart;
+            if (remainingBlockBytes <= currentMaxSplitSize) {
+                splitBytes = remainingBlockBytes;
+            } else if (currentMaxSplitSize * 2 >= remainingBlockBytes) {
+                // Second to last split in this block, generate two evenly 
sized splits
+                splitBytes = remainingBlockBytes / 2;
+            } else {
+                splitBytes = currentMaxSplitSize;
+            }
+            result.add(splitCreator.create(path, splitStart, splitBytes,
+                    length, targetFileSplitSize, modificationTime, 
blocks.get(currentBlockIdx).getHosts(),
+                    partitionValues));
+            splitStart += splitBytes;
+            if (splitStart == blocks.get(currentBlockIdx).getEnd()) {
+                currentBlockIdx++;
+                if (currentBlockIdx != blocks.size()) {
+                    Verify.verify(splitStart == 
blocks.get(currentBlockIdx).getStart());
+                }
+            }
         }
 
         if (LOG.isDebugEnabled()) {
@@ -85,7 +199,19 @@ public class FileSplitter {
         return result;
     }
 
-    private static int getBlockIndex(BlockLocation[] blkLocations, long 
offset) {
+    private void updateCurrentMaxSplitSize() {
+        currentMaxSplitSize = maxSplitSize;
+        int cur = remainingInitialSplitNum.get();
+        while (cur > 0) {
+            if (remainingInitialSplitNum.compareAndSet(cur, cur - 1)) {
+                currentMaxSplitSize = maxInitialSplitSize;
+                break;
+            }
+            cur = remainingInitialSplitNum.get();
+        }
+    }
+
+    private int getBlockIndex(BlockLocation[] blkLocations, long offset) {
         if (blkLocations == null || blkLocations.length == 0) {
             return -1;
         }
@@ -100,5 +226,59 @@ public class FileSplitter {
         throw new IllegalArgumentException(String.format("Offset %d is outside 
of file (0..%d)", offset, fileLength));
     }
 
+    private static class InternalBlock {
+        private final long start;
+        private final long end;
+        private final String[] hosts;
+
+        public InternalBlock(long start, long end, String[] hosts) {
+            Preconditions.checkArgument(start <= end, "block end cannot be 
before block start");
+            this.start = start;
+            this.end = end;
+            this.hosts = hosts;
+        }
+
+        public long getStart() {
+            return start;
+        }
+
+        public long getEnd() {
+            return end;
+        }
+
+        public String[] getHosts() {
+            return hosts;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+            InternalBlock that = (InternalBlock) o;
+            return start == that.start && end == that.end && 
Arrays.equals(hosts, that.hosts);
+        }
+
+        @Override
+        public int hashCode() {
+            int result = Objects.hash(start, end);
+            result = 31 * result + Arrays.hashCode(hosts);
+            return result;
+        }
+
+        @Override
+        public String toString() {
+            final StringBuilder sb = new StringBuilder("InternalBlock{");
+            sb.append("start=").append(start);
+            sb.append(", end=").append(end);
+            sb.append(", hosts=").append(Arrays.toString(hosts));
+            sb.append('}');
+            return sb.toString();
+        }
+    }
+
 
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/SplitGenerator.java 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/SplitGenerator.java
index 0b8a1022d5a..391552a5106 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/SplitGenerator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/SplitGenerator.java
@@ -40,7 +40,7 @@ public interface SplitGenerator {
     /**
      * Whether the producer(e.g. ScanNode) support batch mode.
      */
-    default boolean isBatchMode() throws UserException {
+    default boolean isBatchMode() {
         return false;
     }
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java
index 744423f622c..5bcf2f5546a 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java
@@ -186,7 +186,7 @@ public class HiveScanNode extends FileQueryScanNode {
                     .getMetaStoreCache((HMSExternalCatalog) 
hmsTable.getCatalog());
             String bindBrokerName = hmsTable.getCatalog().bindBrokerName();
             List<Split> allFiles = Lists.newArrayList();
-            getFileSplitByPartitions(cache, prunedPartitions, allFiles, 
bindBrokerName, numBackends);
+            getFileSplitByPartitions(cache, prunedPartitions, allFiles, 
bindBrokerName, numBackends, false);
             if (ConnectContext.get().getExecutor() != null) {
                 
ConnectContext.get().getExecutor().getSummaryProfile().setGetPartitionFilesFinishTime();
             }
@@ -226,7 +226,8 @@ public class HiveScanNode extends FileQueryScanNode {
                         try {
                             List<Split> allFiles = Lists.newArrayList();
                             getFileSplitByPartitions(
-                                    cache, 
Collections.singletonList(partition), allFiles, bindBrokerName, numBackends);
+                                    cache, 
Collections.singletonList(partition), allFiles, bindBrokerName,
+                                    numBackends, true);
                             if (allFiles.size() > numSplitsPerPartition.get()) 
{
                                 numSplitsPerPartition.set(allFiles.size());
                             }
@@ -277,7 +278,8 @@ public class HiveScanNode extends FileQueryScanNode {
     }
 
     private void getFileSplitByPartitions(HiveMetaStoreCache cache, 
List<HivePartition> partitions,
-            List<Split> allFiles, String bindBrokerName, int numBackends) 
throws IOException, UserException {
+            List<Split> allFiles, String bindBrokerName, int numBackends,
+            boolean isBatchMode) throws IOException, UserException {
         List<FileCacheValue> fileCaches;
         if (hiveTransaction != null) {
             try {
@@ -293,9 +295,11 @@ public class HiveScanNode extends FileQueryScanNode {
             fileCaches = cache.getFilesByPartitions(partitions, withCache, 
partitions.size() > 1,
                     directoryLister, hmsTable);
         }
+
+        long targetFileSplitSize = determineTargetFileSplitSize(fileCaches, 
isBatchMode);
         if (tableSample != null) {
             List<HiveMetaStoreCache.HiveFileStatus> hiveFileStatuses = 
selectFiles(fileCaches);
-            splitAllFiles(allFiles, hiveFileStatuses);
+            splitAllFiles(allFiles, hiveFileStatuses, targetFileSplitSize);
             return;
         }
 
@@ -319,27 +323,67 @@ public class HiveScanNode extends FileQueryScanNode {
             int parallelNum = sessionVariable.getParallelExecInstanceNum();
             needSplit = FileSplitter.needSplitForCountPushdown(parallelNum, 
numBackends, totalFileNum);
         }
+
         for (HiveMetaStoreCache.FileCacheValue fileCacheValue : fileCaches) {
-            if (fileCacheValue.getFiles() != null) {
-                boolean isSplittable = fileCacheValue.isSplittable();
-                for (HiveMetaStoreCache.HiveFileStatus status : 
fileCacheValue.getFiles()) {
-                    allFiles.addAll(FileSplitter.splitFile(status.getPath(),
-                            // set block size to Long.MAX_VALUE to avoid 
splitting the file.
-                            getRealFileSplitSize(needSplit ? 
status.getBlockSize() : Long.MAX_VALUE),
-                            status.getBlockLocations(), status.getLength(), 
status.getModificationTime(),
-                            isSplittable, fileCacheValue.getPartitionValues(),
-                            new 
HiveSplitCreator(fileCacheValue.getAcidInfo())));
+            if (fileCacheValue.getFiles() == null) {
+                continue;
+            }
+            boolean isSplittable = fileCacheValue.isSplittable();
+
+            for (HiveMetaStoreCache.HiveFileStatus status : 
fileCacheValue.getFiles()) {
+                allFiles.addAll(fileSplitter.splitFile(
+                        status.getPath(),
+                        targetFileSplitSize,
+                        status.getBlockLocations(),
+                        status.getLength(),
+                        status.getModificationTime(),
+                        isSplittable && needSplit,
+                        fileCacheValue.getPartitionValues(),
+                        new HiveSplitCreator(fileCacheValue.getAcidInfo())));
+            }
+        }
+    }
+
+    private long determineTargetFileSplitSize(List<FileCacheValue> fileCaches,
+            boolean isBatchMode) {
+        if (sessionVariable.getFileSplitSize() > 0) {
+            return sessionVariable.getFileSplitSize();
+        }
+        /** Hive batch split mode will return 0. and <code>FileSplitter</code>
+         *  will determine file split size.
+         */
+        if (isBatchMode) {
+            return 0;
+        }
+        long result = sessionVariable.getMaxInitialSplitSize();
+        long totalFileSize = 0;
+        for (HiveMetaStoreCache.FileCacheValue fileCacheValue : fileCaches) {
+            if (fileCacheValue.getFiles() == null) {
+                continue;
+            }
+            for (HiveMetaStoreCache.HiveFileStatus status : 
fileCacheValue.getFiles()) {
+                totalFileSize += status.getLength();
+                if (totalFileSize >= sessionVariable.getMaxSplitSize() * 
sessionVariable.getMaxInitialSplitNum()) {
+                    result = sessionVariable.getMaxSplitSize();
+                    break;
                 }
             }
         }
+        return result;
     }
 
     private void splitAllFiles(List<Split> allFiles,
-                               List<HiveMetaStoreCache.HiveFileStatus> 
hiveFileStatuses) throws IOException {
+                               List<HiveMetaStoreCache.HiveFileStatus> 
hiveFileStatuses,
+            long realFileSplitSize) throws IOException {
         for (HiveMetaStoreCache.HiveFileStatus status : hiveFileStatuses) {
-            allFiles.addAll(FileSplitter.splitFile(status.getPath(), 
getRealFileSplitSize(status.getBlockSize()),
-                    status.getBlockLocations(), status.getLength(), 
status.getModificationTime(),
-                    status.isSplittable(), status.getPartitionValues(),
+            allFiles.addAll(fileSplitter.splitFile(
+                    status.getPath(),
+                    realFileSplitSize,
+                    status.getBlockLocations(),
+                    status.getLength(),
+                    status.getModificationTime(),
+                    status.isSplittable(),
+                    status.getPartitionValues(),
                     new HiveSplitCreator(status.getAcidInfo())));
         }
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java
index f7850040fca..030e30a13ed 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java
@@ -86,6 +86,7 @@ import org.apache.iceberg.expressions.ResidualEvaluator;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.CloseableIterator;
 import org.apache.iceberg.types.Conversions;
+import org.apache.iceberg.util.ScanTaskUtil;
 import org.apache.iceberg.util.TableScanUtil;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -120,7 +121,7 @@ public class IcebergScanNode extends FileQueryScanNode {
     private boolean tableLevelPushDownCount = false;
     private long countFromSnapshot;
     private static final long COUNT_WITH_PARALLEL_SPLITS = 10000;
-    private long targetSplitSize;
+    private long targetSplitSize = 0;
     // This is used to avoid repeatedly calculating partition info map for the 
same partition data.
     private Map<PartitionData, Map<String, String>> partitionMapInfos;
     private boolean isPartitionedTable;
@@ -147,6 +148,8 @@ public class IcebergScanNode extends FileQueryScanNode {
     private String cachedNormalizedPathPrefix;
     private String cachedFsIdentifier;
 
+    private Boolean isBatchMode = null;
+
     // for test
     @VisibleForTesting
     public IcebergScanNode(PlanNodeId id, TupleDescriptor desc, 
SessionVariable sv) {
@@ -187,7 +190,6 @@ public class IcebergScanNode extends FileQueryScanNode {
     @Override
     protected void doInitialize() throws UserException {
         icebergTable = source.getIcebergTable();
-        targetSplitSize = getRealFileSplitSize(0);
         partitionMapInfos = new HashMap<>();
         isPartitionedTable = icebergTable.spec().isPartitioned();
         formatVersion = ((BaseTable) 
icebergTable).operations().current().formatVersion();
@@ -392,19 +394,58 @@ public class IcebergScanNode extends FileQueryScanNode {
 
     private CloseableIterable<FileScanTask> planFileScanTask(TableScan scan) {
         if (!IcebergUtils.isManifestCacheEnabled(source.getCatalog())) {
-            long targetSplitSize = getRealFileSplitSize(0);
-            return TableScanUtil.splitFiles(scan.planFiles(), targetSplitSize);
+            return splitFiles(scan);
         }
         try {
             return planFileScanTaskWithManifestCache(scan);
         } catch (Exception e) {
             manifestCacheFailures++;
             LOG.warn("Plan with manifest cache failed, fallback to original 
scan: " + e.getMessage(), e);
-            long targetSplitSize = getRealFileSplitSize(0);
-            return TableScanUtil.splitFiles(scan.planFiles(), targetSplitSize);
+            return splitFiles(scan);
         }
     }
 
+    private CloseableIterable<FileScanTask> splitFiles(TableScan scan) {
+        if (sessionVariable.getFileSplitSize() > 0) {
+            return TableScanUtil.splitFiles(scan.planFiles(),
+                    sessionVariable.getFileSplitSize());
+        }
+        if (isBatchMode()) {
+            // Currently iceberg batch split mode will use max split size.
+            // TODO: dynamic split size in batch split mode need to customize 
iceberg splitter.
+            return TableScanUtil.splitFiles(scan.planFiles(), 
sessionVariable.getMaxSplitSize());
+        }
+
+        // Non Batch Mode
+        // Materialize planFiles() into a list to avoid iterating the 
CloseableIterable twice.
+        // RISK: It will cost memory if the table is large.
+        List<FileScanTask> fileScanTaskList = new ArrayList<>();
+        try (CloseableIterable<FileScanTask> scanTasksIter = scan.planFiles()) 
{
+            for (FileScanTask task : scanTasksIter) {
+                fileScanTaskList.add(task);
+            }
+        } catch (Exception e) {
+            throw new RuntimeException("Failed to materialize file scan 
tasks", e);
+        }
+
+        targetSplitSize = determineTargetFileSplitSize(fileScanTaskList);
+        return 
TableScanUtil.splitFiles(CloseableIterable.withNoopClose(fileScanTaskList), 
targetSplitSize);
+    }
+
+    private long determineTargetFileSplitSize(Iterable<FileScanTask> tasks) {
+        long result = sessionVariable.getMaxInitialSplitSize();
+        long accumulatedTotalFileSize = 0;
+        for (FileScanTask task : tasks) {
+            accumulatedTotalFileSize += 
ScanTaskUtil.contentSizeInBytes(task.file());
+            if (accumulatedTotalFileSize
+                    >= sessionVariable.getMaxSplitSize() * 
sessionVariable.getMaxInitialSplitNum()) {
+                result = sessionVariable.getMaxSplitSize();
+                break;
+            }
+        }
+        return result;
+    }
+
     private CloseableIterable<FileScanTask> 
planFileScanTaskWithManifestCache(TableScan scan) throws IOException {
         // Get the snapshot from the scan; return empty if no snapshot exists
         Snapshot snapshot = scan.snapshot();
@@ -519,7 +560,7 @@ public class IcebergScanNode extends FileQueryScanNode {
         }
 
         // Split tasks into smaller chunks based on target split size for 
parallel processing
-        long targetSplitSize = getRealFileSplitSize(0);
+        targetSplitSize = determineTargetFileSplitSize(tasks);
         return 
TableScanUtil.splitFiles(CloseableIterable.withNoopClose(tasks), 
targetSplitSize);
     }
 
@@ -677,21 +718,36 @@ public class IcebergScanNode extends FileQueryScanNode {
     }
 
     @Override
-    public boolean isBatchMode() throws UserException {
+    public boolean isBatchMode() {
+        Boolean cached = isBatchMode;
+        if (cached != null) {
+            return cached;
+        }
         TPushAggOp aggOp = getPushDownAggNoGroupingOp();
         if (aggOp.equals(TPushAggOp.COUNT)) {
-            countFromSnapshot = getCountFromSnapshot();
+            try {
+                countFromSnapshot = getCountFromSnapshot();
+            } catch (UserException e) {
+                throw new RuntimeException(e);
+            }
             if (countFromSnapshot >= 0) {
                 tableLevelPushDownCount = true;
+                isBatchMode = false;
                 return false;
             }
         }
 
-        if (createTableScan().snapshot() == null) {
-            return false;
+        try {
+            if (createTableScan().snapshot() == null) {
+                isBatchMode = false;
+                return false;
+            }
+        } catch (UserException e) {
+            throw new RuntimeException(e);
         }
 
         if (!sessionVariable.getEnableExternalTableBatchMode()) {
+            isBatchMode = false;
             return false;
         }
 
@@ -707,10 +763,12 @@ public class IcebergScanNode extends FileQueryScanNode {
                         ManifestFile next = matchingManifest.next();
                         cnt += next.addedFilesCount() + 
next.existingFilesCount();
                         if (cnt >= sessionVariable.getNumFilesInBatchMode()) {
+                            isBatchMode = true;
                             return true;
                         }
                     }
                 }
+                isBatchMode = false;
                 return false;
             });
         } catch (Exception e) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
index 92b0862d06e..4e28c924f33 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
@@ -28,7 +28,6 @@ import org.apache.doris.common.util.FileFormatUtils;
 import org.apache.doris.common.util.LocationPath;
 import org.apache.doris.datasource.ExternalUtil;
 import org.apache.doris.datasource.FileQueryScanNode;
-import org.apache.doris.datasource.FileSplitter;
 import org.apache.doris.datasource.credentials.CredentialUtils;
 import org.apache.doris.datasource.credentials.VendedCredentialsFactory;
 import org.apache.doris.datasource.paimon.PaimonExternalCatalog;
@@ -298,7 +297,8 @@ public class PaimonScanNode extends FileQueryScanNode {
         // And for counting the number of selected partitions for this paimon 
table.
         Map<BinaryRow, Map<String, String>> partitionInfoMaps = new 
HashMap<>();
         // if applyCountPushdown is true, we can't split the DataSplit
-        long realFileSplitSize = getRealFileSplitSize(applyCountPushdown ? 
Long.MAX_VALUE : 0);
+        boolean hasDeterminedTargetFileSplitSize = false;
+        long targetFileSplitSize = 0;
         for (DataSplit dataSplit : dataSplits) {
             SplitStat splitStat = new SplitStat();
             splitStat.setRowCount(dataSplit.rowCount());
@@ -330,6 +330,10 @@ public class PaimonScanNode extends FileQueryScanNode {
                 if (ignoreSplitType == 
SessionVariable.IgnoreSplitType.IGNORE_NATIVE) {
                     continue;
                 }
+                if (!hasDeterminedTargetFileSplitSize) {
+                    targetFileSplitSize = 
determineTargetFileSplitSize(dataSplits, isBatchMode());
+                    hasDeterminedTargetFileSplitSize = true;
+                }
                 splitStat.setType(SplitReadType.NATIVE);
                 splitStat.setRawFileConvertable(true);
                 List<RawFile> rawFiles = optRawFiles.get();
@@ -337,13 +341,13 @@ public class PaimonScanNode extends FileQueryScanNode {
                     RawFile file = rawFiles.get(i);
                     LocationPath locationPath = LocationPath.of(file.path(), 
storagePropertiesMap);
                     try {
-                        List<Split> dorisSplits = FileSplitter.splitFile(
+                        List<Split> dorisSplits = fileSplitter.splitFile(
                                 locationPath,
-                                realFileSplitSize,
+                                targetFileSplitSize,
                                 null,
                                 file.length(),
                                 -1,
-                                true,
+                                !applyCountPushdown,
                                 null,
                                 PaimonSplit.PaimonSplitCreator.DEFAULT);
                         for (Split dorisSplit : dorisSplits) {
@@ -388,12 +392,43 @@ public class PaimonScanNode extends FileQueryScanNode {
 
         // We need to set the target size for all splits so that we can 
calculate the
         // proportion of each split later.
-        splits.forEach(s -> s.setTargetSplitSize(realFileSplitSize));
+        splits.forEach(s -> 
s.setTargetSplitSize(sessionVariable.getFileSplitSize() > 0
+                ? sessionVariable.getFileSplitSize() : 
sessionVariable.getMaxSplitSize()));
 
         this.selectedPartitionNum = partitionInfoMaps.size();
         return splits;
     }
 
+    private long determineTargetFileSplitSize(List<DataSplit> dataSplits,
+            boolean isBatchMode) {
+        if (sessionVariable.getFileSplitSize() > 0) {
+            return sessionVariable.getFileSplitSize();
+        }
+        /** Paimon batch split mode will return 0. and 
<code>FileSplitter</code>
+         *  will determine file split size.
+         */
+        if (isBatchMode) {
+            return 0;
+        }
+        long result = sessionVariable.getMaxInitialSplitSize();
+        long totalFileSize = 0;
+        for (DataSplit dataSplit : dataSplits) {
+            Optional<List<RawFile>> rawFiles = dataSplit.convertToRawFiles();
+            if (!supportNativeReader(rawFiles)) {
+                continue;
+            }
+            for (RawFile rawFile : rawFiles.get()) {
+                totalFileSize += rawFile.fileSize();
+                if (totalFileSize
+                        >= sessionVariable.getMaxSplitSize() * 
sessionVariable.getMaxInitialSplitNum()) {
+                    result = sessionVariable.getMaxSplitSize();
+                    break;
+                }
+            }
+        }
+        return result;
+    }
+
     @VisibleForTesting
     public Map<String, String> getIncrReadParams() throws UserException {
         Map<String, String> paimonScanParams = new HashMap<>();
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/tvf/source/TVFScanNode.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/tvf/source/TVFScanNode.java
index e7567559762..c3b0e3e8b6d 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/tvf/source/TVFScanNode.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/tvf/source/TVFScanNode.java
@@ -146,12 +146,18 @@ public class TVFScanNode extends FileQueryScanNode {
             needSplit = FileSplitter.needSplitForCountPushdown(parallelNum, 
numBackends, totalFileNum);
         }
 
+        long targetFileSplitSize = determineTargetFileSplitSize(fileStatuses);
+
         for (TBrokerFileStatus fileStatus : fileStatuses) {
             try {
-                
splits.addAll(FileSplitter.splitFile(LocationPath.of(fileStatus.getPath()),
-                        getRealFileSplitSize(needSplit ? 
fileStatus.getBlockSize() : Long.MAX_VALUE),
-                        null, fileStatus.getSize(),
-                        fileStatus.getModificationTime(), 
fileStatus.isSplitable, null,
+                splits.addAll(fileSplitter.splitFile(
+                        LocationPath.of(fileStatus.getPath()),
+                        targetFileSplitSize,
+                        null,
+                        fileStatus.getSize(),
+                        fileStatus.getModificationTime(),
+                        fileStatus.isSplitable && needSplit,
+                        null,
                         FileSplitCreator.DEFAULT));
             } catch (IOException e) {
                 LOG.warn("get file split failed for TVF: {}", 
fileStatus.getPath(), e);
@@ -161,6 +167,23 @@ public class TVFScanNode extends FileQueryScanNode {
         return splits;
     }
 
+    private long determineTargetFileSplitSize(List<TBrokerFileStatus> 
fileStatuses) {
+        if (sessionVariable.getFileSplitSize() > 0) {
+            return sessionVariable.getFileSplitSize();
+        }
+        long result = sessionVariable.getMaxInitialSplitSize();
+        long totalFileSize = 0;
+        for (TBrokerFileStatus fileStatus : fileStatuses) {
+            totalFileSize += fileStatus.getSize();
+            if (totalFileSize
+                    >= sessionVariable.getMaxSplitSize() * 
sessionVariable.getMaxInitialSplitNum()) {
+                result = sessionVariable.getMaxSplitSize();
+                break;
+            }
+        }
+        return result;
+    }
+
     @Override
     protected void setScanParams(TFileRangeDesc rangeDesc, Split split) {
         if (split instanceof FileSplit) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index 11df0e53eb1..79768d17b4a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -513,6 +513,12 @@ public class SessionVariable implements Serializable, 
Writable {
     // Split size for ExternalFileScanNode. Default value 0 means use the 
block size of HDFS/S3.
     public static final String FILE_SPLIT_SIZE = "file_split_size";
 
+    public static final String MAX_INITIAL_FILE_SPLIT_SIZE = 
"max_initial_file_split_size";
+
+    public static final String MAX_FILE_SPLIT_SIZE = "max_file_split_size";
+
+    public static final String MAX_INITIAL_FILE_SPLIT_NUM = 
"max_initial_file_split_num";
+
     // Target file size in bytes for Iceberg write operations
     public static final String ICEBERG_WRITE_TARGET_FILE_SIZE_BYTES = 
"iceberg_write_target_file_size_bytes";
 
@@ -2182,6 +2188,36 @@ public class SessionVariable implements Serializable, 
Writable {
     @VariableMgr.VarAttr(name = FILE_SPLIT_SIZE, needForward = true)
     public long fileSplitSize = 0;
 
+    @VariableMgr.VarAttr(
+            name = MAX_INITIAL_FILE_SPLIT_SIZE,
+            description = {"对于每个 table scan,最大文件分片初始大小。"
+                    + "初始化使用 MAX_INITIAL_FILE_SPLIT_SIZE,一旦超过了 
MAX_INITIAL_FILE_SPLIT_NUM,则使用 MAX_FILE_SPLIT_SIZE。",
+                    "For each table scan, The maximum initial file split size. 
"
+                            + "Initialize using MAX_INITIAL_FILE_SPLIT_SIZE,"
+                            + " and once MAX_INITIAL_FILE_SPLIT_NUM is 
exceeded, use MAX_FILE_SPLIT_SIZE instead."},
+            needForward = true)
+    public long maxInitialSplitSize = 32L * 1024L * 1024L;
+
+    @VariableMgr.VarAttr(
+            name = MAX_FILE_SPLIT_SIZE,
+            description = {"对于每个 table scan,最大文件分片大小。"
+                    + "初始化使用 MAX_INITIAL_FILE_SPLIT_SIZE,一旦超过了 
MAX_INITIAL_FILE_SPLIT_NUM,则使用 MAX_FILE_SPLIT_SIZE。",
+                    "For each table scan, the maximum initial file split size. 
"
+                            + "Initialize using MAX_INITIAL_FILE_SPLIT_SIZE,"
+                            + " and once MAX_INITIAL_FILE_SPLIT_NUM is 
exceeded, use MAX_FILE_SPLIT_SIZE instead."},
+            needForward = true)
+    public long maxSplitSize = 64L * 1024L * 1024L;
+
+    @VariableMgr.VarAttr(
+            name = MAX_INITIAL_FILE_SPLIT_NUM,
+            description = {"对于每个 table scan,最大文件分片初始数目。"
+                    + "初始化使用 MAX_INITIAL_FILE_SPLIT_SIZE,一旦超过了 
MAX_INITIAL_FILE_SPLIT_NUM,则使用 MAX_FILE_SPLIT_SIZE。",
+                    "For each table scan, the maximum initial file split 
number. "
+                            + "Initialize using MAX_INITIAL_FILE_SPLIT_SIZE,"
+                            + " and once MAX_INITIAL_FILE_SPLIT_NUM is 
exceeded, use MAX_FILE_SPLIT_SIZE instead."},
+            needForward = true)
+    public int maxInitialSplitNum = 200;
+
     // Target file size for Iceberg write operations
     // Default 0 means use config::iceberg_sink_max_file_size
     @VariableMgr.VarAttr(name = ICEBERG_WRITE_TARGET_FILE_SIZE_BYTES, 
needForward = true)
@@ -4236,6 +4272,30 @@ public class SessionVariable implements Serializable, 
Writable {
         this.fileSplitSize = fileSplitSize;
     }
 
+    public long getMaxInitialSplitSize() {
+        return maxInitialSplitSize;
+    }
+
+    public void setMaxInitialSplitSize(long maxInitialSplitSize) {
+        this.maxInitialSplitSize = maxInitialSplitSize;
+    }
+
+    public long getMaxSplitSize() {
+        return maxSplitSize;
+    }
+
+    public void setMaxSplitSize(long maxSplitSize) {
+        this.maxSplitSize = maxSplitSize;
+    }
+
+    public int getMaxInitialSplitNum() {
+        return maxInitialSplitNum;
+    }
+
+    public void setMaxInitialSplitNum(int maxInitialSplitNum) {
+        this.maxInitialSplitNum = maxInitialSplitNum;
+    }
+
     public long getIcebergWriteTargetFileSizeBytes() {
         return icebergWriteTargetFileSizeBytes;
     }
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/datasource/FileSplitterTest.java 
b/fe/fe-core/src/test/java/org/apache/doris/datasource/FileSplitterTest.java
new file mode 100644
index 00000000000..a455923da4d
--- /dev/null
+++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/FileSplitterTest.java
@@ -0,0 +1,216 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource;
+
+import org.apache.doris.common.util.LocationPath;
+import org.apache.doris.spi.Split;
+
+import org.apache.hadoop.fs.BlockLocation;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.List;
+
+public class FileSplitterTest {
+
+    private static final long MB = 1024L * 1024L;
+
+    private static final int DEFAULT_INITIAL_SPLITS = 200;
+
+    @Test
+    public void testNonSplittableCompressedFileProducesSingleSplit() throws 
Exception {
+        LocationPath loc = LocationPath.of("hdfs://example.com/path/file.gz");
+        BlockLocation[] locations = new BlockLocation[]{new 
BlockLocation(null, new String[]{"h1"}, 0L, 10 * MB)};
+        FileSplitter fileSplitter = new FileSplitter(32 * MB, 64 * MB, 
DEFAULT_INITIAL_SPLITS);
+        List<Split> splits = fileSplitter.splitFile(
+                loc,
+                0L,
+                locations,
+                10 * MB,
+                0L,
+                true,
+                Collections.emptyList(),
+                FileSplit.FileSplitCreator.DEFAULT);
+        Assert.assertEquals(1, splits.size());
+        Split s = splits.get(0);
+        Assert.assertEquals(10 * MB, ((org.apache.doris.datasource.FileSplit) 
s).getLength());
+        // host should be preserved
+        Assert.assertArrayEquals(new String[]{"h1"}, 
((org.apache.doris.datasource.FileSplit) s).getHosts());
+        Assert.assertEquals(DEFAULT_INITIAL_SPLITS - 1, 
fileSplitter.getRemainingInitialSplitNum());
+    }
+
+    @Test
+    public void testEmptyBlockLocationsProducesSingleSplitAndNullHosts() 
throws Exception {
+        LocationPath loc = LocationPath.of("hdfs://example.com/path/file");
+        BlockLocation[] locations = new BlockLocation[0];
+        FileSplitter fileSplitter = new FileSplitter(32 * MB, 64 * MB, 
DEFAULT_INITIAL_SPLITS);
+        List<Split> splits = fileSplitter.splitFile(
+                loc,
+                0L,
+                locations,
+                5 * MB,
+                0L,
+                true,
+                Collections.emptyList(),
+                FileSplit.FileSplitCreator.DEFAULT);
+        Assert.assertEquals(1, splits.size());
+        org.apache.doris.datasource.FileSplit s = 
(org.apache.doris.datasource.FileSplit) splits.get(0);
+        Assert.assertEquals(5 * MB, s.getLength());
+        // hosts should be empty array when passing null
+        Assert.assertNotNull(s.getHosts());
+        Assert.assertEquals(0, s.getHosts().length);
+    }
+
+    @Test
+    public void 
testSplittableSingleBigBlockProducesExpectedSplitsWithInitialSmallChunks() 
throws Exception {
+        LocationPath loc = LocationPath.of("hdfs://example.com/path/bigfile");
+        long length = 200 * MB;
+        BlockLocation[] locations = new BlockLocation[]{new 
BlockLocation(null, new String[]{"h1"}, 0L, length)};
+        // set maxInitialSplits to 2 to force the first two splits to be small.
+        FileSplitter fileSplitter = new FileSplitter(32 * MB, 64 * MB, 2);
+        List<Split> splits = fileSplitter.splitFile(
+                loc,
+                0L,
+                locations,
+                length,
+                0L,
+                true,
+                Collections.emptyList(),
+                FileSplit.FileSplitCreator.DEFAULT);
+
+        // expect splits sizes: 32MB, 32MB, 64MB, 36MB, 36MB -> sum is 200MB
+        long[] expected = new long[]{32 * MB, 32 * MB, 64 * MB, 36 * MB, 36 * 
MB};
+        Assert.assertEquals(expected.length, splits.size());
+        long sum = 0L;
+        for (int i = 0; i < expected.length; i++) {
+            org.apache.doris.datasource.FileSplit s = 
(org.apache.doris.datasource.FileSplit) splits.get(i);
+            Assert.assertEquals(expected[i], s.getLength());
+            sum += s.getLength();
+            // ensure host preserved
+            Assert.assertArrayEquals(new String[]{"h1"}, s.getHosts());
+        }
+        Assert.assertEquals(length, sum);
+        // ensure the initial small-split counter is consumed for the two 
initial small splits
+        Assert.assertEquals(0, fileSplitter.getRemainingInitialSplitNum());
+    }
+
+    @Test
+    public void testMultiBlockSplitsAndHostPreservation() throws Exception {
+        LocationPath loc = 
LocationPath.of("hdfs://example.com/path/twoblocks");
+        long len = 96 * MB;
+        BlockLocation[] locations = new BlockLocation[]{
+                new BlockLocation(null, new String[]{"h1"}, 0L, 48 * MB),
+                new BlockLocation(null, new String[]{"h2"}, 48 * MB, 48 * MB)
+        };
+        FileSplitter fileSplitter = new FileSplitter(32 * MB, 64 * MB, 0);
+        List<Split> splits = fileSplitter.splitFile(
+                loc,
+                0L,
+                locations,
+                len,
+                0L,
+                true,
+                Collections.emptyList(),
+                FileSplit.FileSplitCreator.DEFAULT);
+        Assert.assertEquals(2, splits.size());
+        FileSplit s0 = (FileSplit) splits.get(0);
+        FileSplit s1 = (FileSplit) splits.get(1);
+        Assert.assertEquals(48 * MB, s0.getLength());
+        Assert.assertEquals(48 * MB, s1.getLength());
+        Assert.assertArrayEquals(new String[]{"h1"}, s0.getHosts());
+        Assert.assertArrayEquals(new String[]{"h2"}, s1.getHosts());
+    }
+
+    @Test
+    public void testZeroLengthBlockIsSkipped() throws Exception {
+        LocationPath loc = 
LocationPath.of("hdfs://example.com/path/zeroblock");
+        long length = 10 * MB;
+        BlockLocation[] locations = new BlockLocation[]{
+                new BlockLocation(null, new String[]{"h1"}, 0L, 0L),
+                new BlockLocation(null, new String[]{"h1"}, 0L, 10 * MB)
+        };
+        FileSplitter fileSplitter = new FileSplitter(32 * MB, 64 * MB, DEFAULT_INITIAL_SPLITS);
+        List<Split> splits = fileSplitter.splitFile(
+                loc,
+                0L,
+                locations,
+                length,
+                0L,
+                true,
+                Collections.emptyList(),
+                FileSplit.FileSplitCreator.DEFAULT);
+        Assert.assertEquals(1, splits.size());
+        FileSplit s = (FileSplit) splits.get(0);
+        Assert.assertEquals(10 * MB, s.getLength());
+        Assert.assertArrayEquals(new String[]{"h1"}, s.getHosts());
+    }
+
+    @Test
+    public void testNonSplittableFlagDecrementsCounter() throws Exception {
+        LocationPath loc = LocationPath.of("hdfs://example.com/path/file.gz");
+        BlockLocation[] locations = new BlockLocation[]{new BlockLocation(null, new String[]{"h1"}, 0L, 10 * MB)};
+        FileSplitter fileSplitter = new FileSplitter(32 * MB, 64 * MB, 2);
+        List<Split> splits = fileSplitter.splitFile(
+                loc,
+                0L,
+                locations,
+                10 * MB,
+                0L,
+                false,
+                Collections.emptyList(),
+                FileSplit.FileSplitCreator.DEFAULT);
+        Assert.assertEquals(1, splits.size());
+    }
+
+    @Test
+    public void testNullRemainingInitialSplitIsAllowed() throws Exception {
+        LocationPath loc = LocationPath.of("hdfs://example.com/path/somefile");
+        BlockLocation[] locations = new BlockLocation[]{new BlockLocation(null, new String[]{"h1"}, 0L, 10 * MB)};
+        FileSplitter fileSplitter = new FileSplitter(32 * MB, 64 * MB, DEFAULT_INITIAL_SPLITS);
+        List<Split> splits = fileSplitter.splitFile(
+                loc,
+                0L,
+                locations,
+                10 * MB,
+                0L,
+                true,
+                Collections.emptyList(),
+                FileSplit.FileSplitCreator.DEFAULT);
+        Assert.assertEquals(1, splits.size());
+    }
+
+    @Test
+    public void testSmallFileNoSplit() throws Exception {
+        LocationPath loc = LocationPath.of("hdfs://example.com/path/small");
+        BlockLocation[] locations = new BlockLocation[]{new BlockLocation(null, new String[]{"h1"}, 0L, 2 * MB)};
+        FileSplitter fileSplitter = new FileSplitter(32 * MB, 64 * MB, DEFAULT_INITIAL_SPLITS);
+        List<Split> splits = fileSplitter.splitFile(
+                loc,
+                0L,
+                locations,
+                2 * MB,
+                0L,
+                true,
+                Collections.emptyList(),
+                FileSplit.FileSplitCreator.DEFAULT);
+        Assert.assertEquals(1, splits.size());
+        FileSplit s = (FileSplit) splits.get(0);
+        Assert.assertEquals(2 * MB, s.getLength());
+    }
+}
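
For readers checking the arithmetic in testSplittableSingleBigBlockProducesExpectedSplitsWithInitialSmallChunks above: the 32MB/32MB/64MB/36MB/36MB progression for a 200MB single-block file can be reproduced with the short standalone sketch below. This is illustrative only. It assumes exactly what the tests assert (the initial small-split budget is spent first, then maxSplitSize is used, and a tail smaller than twice the current split size is divided evenly), it ignores block boundaries, and it is not the actual FileSplitter implementation; the class and method names here are made up.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch of the split-size progression asserted in the tests above.
// Not the real FileSplitter; single-block case only.
public class SplitSizeSketch {
    static final long MB = 1024L * 1024L;

    static List<Long> expectedSizes(long fileLength, long maxInitialSplitSize,
                                    long maxSplitSize, int maxInitialSplits) {
        List<Long> sizes = new ArrayList<>();
        long remaining = fileLength;
        int initialLeft = maxInitialSplits;
        while (remaining > 0) {
            long splitSize = initialLeft > 0 ? maxInitialSplitSize : maxSplitSize;
            if (remaining < splitSize * 2) {
                // Tail rule asserted by the tests: split the remainder evenly
                // into two splits if it is still larger than one split size.
                if (remaining > splitSize) {
                    long half = remaining / 2;
                    sizes.add(remaining - half);
                    sizes.add(half);
                } else {
                    sizes.add(remaining);
                }
                remaining = 0;
            } else {
                sizes.add(splitSize);
                remaining -= splitSize;
                if (initialLeft > 0) {
                    initialLeft--;
                }
            }
        }
        return sizes;
    }

    public static void main(String[] args) {
        // Matches the expected array in the test: 32MB 32MB 64MB 36MB 36MB
        for (long s : expectedSizes(200 * MB, 32 * MB, 64 * MB, 2)) {
            System.out.print(s / MB + "MB ");
        }
        System.out.println();
    }
}
```

The sketch has no Doris dependencies and can be compiled and run on its own.
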
diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/paimon/source/PaimonScanNodeTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/paimon/source/PaimonScanNodeTest.java
index 93afa390530..692a0db12ca 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/datasource/paimon/source/PaimonScanNodeTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/paimon/source/PaimonScanNodeTest.java
@@ -21,6 +21,8 @@ import org.apache.doris.analysis.TupleDescriptor;
 import org.apache.doris.analysis.TupleId;
 import org.apache.doris.common.ExceptionChecker;
 import org.apache.doris.common.UserException;
+import org.apache.doris.datasource.FileQueryScanNode;
+import org.apache.doris.datasource.FileSplitter;
 import org.apache.doris.datasource.paimon.PaimonFileExternalCatalog;
 import org.apache.doris.planner.PlanNodeId;
 import org.apache.doris.qe.SessionVariable;
@@ -92,11 +94,26 @@ public class PaimonScanNodeTest {
             }
         }).when(spyPaimonScanNode).getPaimonSplitFromAPI();
 
+        long maxInitialSplitSize = 32L * 1024L * 1024L;
+        long maxSplitSize = 64L * 1024L * 1024L;
+        // Ensure fileSplitter is initialized on the spy as doInitialize() is not called in this unit test
+        FileSplitter fileSplitter = new FileSplitter(maxInitialSplitSize, maxSplitSize,
+                0);
+        try {
+            java.lang.reflect.Field field = FileQueryScanNode.class.getDeclaredField("fileSplitter");
+            field.setAccessible(true);
+            field.set(spyPaimonScanNode, fileSplitter);
+        } catch (NoSuchFieldException | IllegalAccessException e) {
+            throw new RuntimeException("Failed to inject FileSplitter into PaimonScanNode test", e);
+        }
+
         // Note: The original PaimonSource is sufficient for this test
         // No need to mock catalog properties since doInitialize() is not called in this test
         // Mock SessionVariable behavior
         Mockito.when(sv.isForceJniScanner()).thenReturn(false);
         Mockito.when(sv.getIgnoreSplitType()).thenReturn("NONE");
+        Mockito.when(sv.getMaxInitialSplitSize()).thenReturn(maxInitialSplitSize);
+        Mockito.when(sv.getMaxSplitSize()).thenReturn(maxSplitSize);
 
         // native
         mockNativeReader(spyPaimonScanNode);
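
As a side note on the reflective injection above: the same java.lang.reflect calls could be pulled into a small shared test helper. The class below is a hypothetical sketch, not part of this patch or of the Doris test utilities; it simply restates the getDeclaredField/setAccessible/set sequence already used in the hunk.

```java
import java.lang.reflect.Field;

// Hypothetical helper (not in this patch): inject a value into a private field
// declared on a given class, e.g. for fields normally set up by doInitialize().
public final class TestFieldInjector {
    private TestFieldInjector() {
    }

    public static void setField(Object target, Class<?> declaringClass,
                                String fieldName, Object value) {
        try {
            Field field = declaringClass.getDeclaredField(fieldName);
            field.setAccessible(true);
            field.set(target, value);
        } catch (NoSuchFieldException | IllegalAccessException e) {
            throw new RuntimeException("Failed to set field " + fieldName, e);
        }
    }
}
```

With such a helper, the injection above would collapse to a single call: `TestFieldInjector.setField(spyPaimonScanNode, FileQueryScanNode.class, "fileSplitter", fileSplitter);`.
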
diff --git a/fe/fe-core/src/test/java/org/apache/doris/planner/FederationBackendPolicyTest.java b/fe/fe-core/src/test/java/org/apache/doris/planner/FederationBackendPolicyTest.java
index 30582224f76..f6e8efd5294 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/planner/FederationBackendPolicyTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/planner/FederationBackendPolicyTest.java
@@ -769,9 +769,9 @@ public class FederationBackendPolicyTest {
         Map<Backend, List<Split>> backendListMap2 = mergeAssignment(assignment2);
         backendListMap2.forEach((k, v) -> {
             if (k.getId() == 1) {
-                Assert.assertEquals(900000L, v.stream().mapToLong(Split::getLength).sum());
+                Assert.assertEquals(1000000L, v.stream().mapToLong(Split::getLength).sum());
             } else if (k.getId() == 2) {
-                Assert.assertEquals(500000L, v.stream().mapToLong(Split::getLength).sum());
+                Assert.assertEquals(400000L, v.stream().mapToLong(Split::getLength).sum());
             } else if (k.getId() == 3) {
                 Assert.assertEquals(1000000L, v.stream().mapToLong(Split::getLength).sum());
             }
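
A quick cross-check on the updated expectations above: backend 1 goes from 900,000 to 1,000,000 assigned bytes and backend 2 from 500,000 to 400,000, while backend 3 is unchanged, so the total assigned bytes stay at 2,400,000. This suggests the new splitter changes split boundaries and placement rather than the total amount of data scheduled. A trivial standalone check of that arithmetic:

```java
// Sanity arithmetic for the assertion changes above: totals before and after match.
public class AssignmentTotalsCheck {
    public static void main(String[] args) {
        long before = 900000L + 500000L + 1000000L;  // old per-backend expectations
        long after = 1000000L + 400000L + 1000000L;  // new per-backend expectations
        System.out.println(before + " == " + after + " : " + (before == after)); // 2400000 == 2400000 : true
    }
}
```
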
diff --git a/regression-test/suites/external_table_p0/hive/test_hive_compress_type.groovy b/regression-test/suites/external_table_p0/hive/test_hive_compress_type.groovy
index 4117501eff2..44dd9104411 100644
--- a/regression-test/suites/external_table_p0/hive/test_hive_compress_type.groovy
+++ b/regression-test/suites/external_table_p0/hive/test_hive_compress_type.groovy
@@ -48,7 +48,7 @@ suite("test_hive_compress_type", "p0,external,hive,external_docker,external_dock
         sql """set file_split_size=8388608"""
         explain {
             sql("select count(*) from test_compress_partitioned")
-            contains "inputSplitNum=82, totalFileSize=734675596, scanRanges=82"
+            contains "inputSplitNum=16, totalFileSize=734675596, scanRanges=16"
             contains "partition=8/8"
         }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
