This is an automated email from the ASF dual-hosted git repository.

morrysnow pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new d6736b90e88 branch-3.1: [feat](iceberg) implement iceberg partition 
batch mode (#52095)
d6736b90e88 is described below

commit d6736b90e88b4236876fc9568a54ba38e2972756
Author: Mingyu Chen (Rayner) <[email protected]>
AuthorDate: Sat Jun 21 11:44:31 2025 +0800

    branch-3.1: [feat](iceberg) implement iceberg partition batch mode (#52095)
    
    bp #46398 #49025 #49489 #50434 #51185 #51694
    
    ---------
    
    Co-authored-by: wuwenchi <[email protected]>
    Co-authored-by: daidai <[email protected]>
---
 be/src/common/config.cpp                           |   2 +-
 .../org/apache/doris/common/ThreadPoolManager.java |  69 ++++
 .../apache/doris/datasource/ExternalCatalog.java   |   6 +
 .../doris/datasource/ExternalMetaCacheMgr.java     |   1 +
 .../apache/doris/datasource/FileQueryScanNode.java |   8 +-
 .../apache/doris/datasource/SplitAssignment.java   |  86 +++--
 .../apache/doris/datasource/SplitGenerator.java    |   2 +-
 .../doris/datasource/hive/source/HiveScanNode.java |   4 +-
 .../doris/datasource/hudi/source/HudiScanNode.java |  12 +-
 .../datasource/iceberg/IcebergExternalCatalog.java |   8 +
 .../iceberg/IcebergHMSExternalCatalog.java         |   7 -
 .../datasource/iceberg/IcebergMetadataOps.java     |  28 +-
 .../doris/datasource/iceberg/IcebergUtils.java     |  35 ++
 .../datasource/iceberg/source/IcebergScanNode.java | 270 +++++++++----
 .../datasource/iceberg/source/IcebergSplit.java    |   3 +-
 .../maxcompute/source/MaxComputeScanNode.java      |   8 +-
 .../java/org/apache/doris/planner/ScanNode.java    |   2 +-
 .../java/org/apache/doris/qe/SessionVariable.java  |  26 +-
 .../doris/datasource/SplitAssignmentTest.java      | 427 +++++++++++++++++++++
 .../doris/datasource/iceberg/IcebergUtilsTest.java | 145 ++++++-
 .../iceberg/source/IcebergScanNodeTest.java        | 181 +++++++++
 .../iceberg/test_iceberg_filter.groovy             |   2 +
 .../iceberg/test_iceberg_optimize_count.groovy     |  56 +++
 23 files changed, 1264 insertions(+), 124 deletions(-)

diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 4ce28d89f0e..b57865daef0 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -306,7 +306,7 @@ DEFINE_Validator(doris_scanner_thread_pool_thread_num, 
[](const int config) -> b
     return true;
 });
 DEFINE_Int32(doris_scanner_min_thread_pool_thread_num, "8");
-DEFINE_Int32(remote_split_source_batch_size, "10240");
+DEFINE_Int32(remote_split_source_batch_size, "1000");
 DEFINE_Int32(doris_max_remote_scanner_thread_pool_thread_num, "-1");
 // number of olap scanner thread pool queue size
 DEFINE_Int32(doris_scanner_thread_pool_queue_size, "102400");
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/common/ThreadPoolManager.java 
b/fe/fe-core/src/main/java/org/apache/doris/common/ThreadPoolManager.java
index dcb1f704271..0822a322dec 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/ThreadPoolManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/ThreadPoolManager.java
@@ -18,6 +18,7 @@
 package org.apache.doris.common;
 
 
+import 
org.apache.doris.common.security.authentication.PreExecutionAuthenticator;
 import org.apache.doris.metric.Metric;
 import org.apache.doris.metric.Metric.MetricUnit;
 import org.apache.doris.metric.MetricLabel;
@@ -33,6 +34,7 @@ import java.util.Comparator;
 import java.util.Map;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.FutureTask;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.PriorityBlockingQueue;
@@ -69,6 +71,7 @@ import java.util.function.Supplier;
 
 public class ThreadPoolManager {
 
+    private static final Logger LOG = 
LogManager.getLogger(ThreadPoolManager.class);
     private static Map<String, ThreadPoolExecutor> nameToThreadPoolMap = 
Maps.newConcurrentMap();
 
     private static String[] poolMetricTypes = {"pool_size", 
"active_thread_num", "task_in_queue"};
@@ -140,6 +143,17 @@ public class ThreadPoolManager {
                 poolName, needRegisterMetric);
     }
 
+    public static ThreadPoolExecutor newDaemonFixedThreadPoolWithPreAuth(
+            int numThread,
+            int queueSize,
+            String poolName,
+            boolean needRegisterMetric,
+            PreExecutionAuthenticator preAuth) {
+        return newDaemonThreadPoolWithPreAuth(numThread, numThread, 
KEEP_ALIVE_TIME, TimeUnit.SECONDS,
+            new LinkedBlockingQueue<>(queueSize), new BlockedPolicy(poolName, 
60),
+            poolName, needRegisterMetric, preAuth);
+    }
+
     public static ThreadPoolExecutor newDaemonFixedThreadPool(int numThread, 
int queueSize,
                                                               String poolName, 
int timeoutSeconds,
                                                               boolean 
needRegisterMetric) {
@@ -229,6 +243,40 @@ public class ThreadPoolManager {
         return new 
ThreadFactoryBuilder().setDaemon(true).setNameFormat(poolName + "-%d").build();
     }
 
+
+    public static ThreadPoolExecutor newDaemonThreadPoolWithPreAuth(
+            int corePoolSize,
+            int maximumPoolSize,
+            long keepAliveTime,
+            TimeUnit unit,
+            BlockingQueue<Runnable> workQueue,
+            RejectedExecutionHandler handler,
+            String poolName,
+            boolean needRegisterMetric,
+            PreExecutionAuthenticator preAuth) {
+        ThreadFactory threadFactory = namedThreadFactoryWithPreAuth(poolName, 
preAuth);
+        ThreadPoolExecutor threadPool = new ThreadPoolExecutor(corePoolSize, 
maximumPoolSize,
+                keepAliveTime, unit, workQueue, threadFactory, handler);
+        if (needRegisterMetric) {
+            nameToThreadPoolMap.put(poolName, threadPool);
+        }
+        return threadPool;
+    }
+
+    private static ThreadFactory namedThreadFactoryWithPreAuth(String 
poolName, PreExecutionAuthenticator preAuth) {
+        return new ThreadFactoryBuilder()
+            .setDaemon(true)
+            .setNameFormat(poolName + "-%d")
+            .setThreadFactory(runnable -> new Thread(() -> {
+                try {
+                    preAuth.execute(runnable);
+                } catch (Exception e) {
+                    throw new RuntimeException(e);
+                }
+            }))
+            .build();
+    }
+
     private static class PriorityThreadPoolExecutor<T> extends 
ThreadPoolExecutor {
 
         private final Comparator<T> comparator;
@@ -384,4 +432,25 @@ public class ThreadPoolManager {
             }
         }
     }
+
+    public static void shutdownExecutorService(ExecutorService 
executorService) {
+        // Disable new tasks from being submitted
+        executorService.shutdown();
+        try {
+            // Wait a while for existing tasks to terminate
+            if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) {
+                // Cancel currently executing tasks
+                executorService.shutdownNow();
+                // Wait a while for tasks to respond to being cancelled
+                if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) {
+                    LOG.warn("ExecutorService did not terminate");
+                }
+            }
+        } catch (InterruptedException e) {
+            // (Re-)Cancel if current thread also interrupted
+            executorService.shutdownNow();
+            // Preserve interrupt status
+            Thread.currentThread().interrupt();
+        }
+    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
index d89d4582862..6aed86e9c25 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
@@ -35,6 +35,7 @@ import org.apache.doris.common.Config;
 import org.apache.doris.common.DdlException;
 import org.apache.doris.common.FeConstants;
 import org.apache.doris.common.Pair;
+import org.apache.doris.common.ThreadPoolManager;
 import org.apache.doris.common.UserException;
 import org.apache.doris.common.Version;
 import org.apache.doris.common.io.Text;
@@ -98,6 +99,7 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.OptionalLong;
 import java.util.Set;
+import java.util.concurrent.ThreadPoolExecutor;
 import java.util.stream.Collectors;
 
 /**
@@ -167,6 +169,7 @@ public abstract class ExternalCatalog
     protected Optional<Boolean> useMetaCache = Optional.empty();
     protected MetaCache<ExternalDatabase<? extends ExternalTable>> metaCache;
     protected PreExecutionAuthenticator preExecutionAuthenticator;
+    protected ThreadPoolExecutor threadPoolWithPreAuth;
 
     private volatile Configuration cachedConf = null;
     private byte[] confLock = new byte[0];
@@ -764,6 +767,9 @@ public abstract class ExternalCatalog
         if (null != transactionManager) {
             transactionManager = null;
         }
+        if (threadPoolWithPreAuth != null) {
+            ThreadPoolManager.shutdownExecutorService(threadPoolWithPreAuth);
+        }
         CatalogIf.super.onClose();
     }
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalMetaCacheMgr.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalMetaCacheMgr.java
index c2f50f929f8..79dd86345ee 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalMetaCacheMgr.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalMetaCacheMgr.java
@@ -83,6 +83,7 @@ public class ExternalMetaCacheMgr {
     private ExecutorService rowCountRefreshExecutor;
     private ExecutorService commonRefreshExecutor;
     private ExecutorService fileListingExecutor;
+    // This executor is used to schedule the split-fetching tasks
     private ExecutorService scheduleExecutor;
 
     // catalog id -> HiveMetaStoreCache
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java
index 107c3541577..dc85731d4d2 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java
@@ -335,17 +335,19 @@ public abstract class FileQueryScanNode extends 
FileScanNode {
             if (splitAssignment.getSampleSplit() == null && 
!isFileStreamType()) {
                 return;
             }
-            selectedSplitNum = numApproximateSplits();
-
             FileSplit fileSplit = (FileSplit) splitAssignment.getSampleSplit();
             TFileType locationType = fileSplit.getLocationType();
+            selectedSplitNum = numApproximateSplits();
+            if (selectedSplitNum < 0) {
+                throw new UserException("Approximate split number should not 
be negative");
+            }
             totalFileSize = fileSplit.getLength() * selectedSplitNum;
             long maxWaitTime = sessionVariable.getFetchSplitsMaxWaitTime();
             // Not accurate, only used to estimate concurrency.
             // Here, we must take the max of 1, because
             // in the case of multiple BEs, `numApproximateSplits() / 
backendPolicy.numBackends()` may be 0,
             // and finally numSplitsPerBE is 0, resulting in no data being 
queried.
-            int numSplitsPerBE = Math.max(numApproximateSplits() / 
backendPolicy.numBackends(), 1);
+            int numSplitsPerBE = Math.max(selectedSplitNum / 
backendPolicy.numBackends(), 1);
             for (Backend backend : backendPolicy.getBackends()) {
                 SplitSource splitSource = new SplitSource(backend, 
splitAssignment, maxWaitTime);
                 splitSources.add(splitSource);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/SplitAssignment.java 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/SplitAssignment.java
index a26abc7fc5e..cc17818d6b5 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/SplitAssignment.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/SplitAssignment.java
@@ -23,7 +23,10 @@ import org.apache.doris.system.Backend;
 import org.apache.doris.thrift.TScanRangeLocations;
 
 import com.google.common.collect.Multimap;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
 
+import java.io.Closeable;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashSet;
@@ -33,6 +36,7 @@ import java.util.Set;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
@@ -40,6 +44,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
  * `SplitGenerator` provides the file splits, and `FederationBackendPolicy` 
assigns these splits to backends.
  */
 public class SplitAssignment {
+    private static final Logger LOG = 
LogManager.getLogger(SplitAssignment.class);
     private final Set<Long> sources = new HashSet<>();
     private final FederationBackendPolicy backendPolicy;
     private final SplitGenerator splitGenerator;
@@ -50,10 +55,11 @@ public class SplitAssignment {
     private final List<String> pathPartitionKeys;
     private final Object assignLock = new Object();
     private Split sampleSplit = null;
-    private final AtomicBoolean isStop = new AtomicBoolean(false);
+    private final AtomicBoolean isStopped = new AtomicBoolean(false);
     private final AtomicBoolean scheduleFinished = new AtomicBoolean(false);
 
     private UserException exception = null;
+    private final List<Closeable> closeableResources = new ArrayList<>();
 
     public SplitAssignment(
             FederationBackendPolicy backendPolicy,
@@ -71,12 +77,20 @@ public class SplitAssignment {
     public void init() throws UserException {
         splitGenerator.startSplit(backendPolicy.numBackends());
         synchronized (assignLock) {
-            while (sampleSplit == null && waitFirstSplit()) {
+            final int waitIntervalTimeMillis = 100;
+            final int initTimeoutMillis = 30000; // 30s
+            int waitTotalTime = 0;
+            while (sampleSplit == null && needMoreSplit()) {
                 try {
-                    assignLock.wait(100);
+                    assignLock.wait(waitIntervalTimeMillis);
                 } catch (InterruptedException e) {
                     throw new UserException(e.getMessage(), e);
                 }
+                waitTotalTime += waitIntervalTimeMillis;
+                if (waitTotalTime > initTimeoutMillis) {
+                    throw new UserException("Failed to get first split after 
waiting for "
+                            + (waitTotalTime / 1000) + " seconds.");
+                }
             }
         }
         if (exception != null) {
@@ -84,8 +98,8 @@ public class SplitAssignment {
         }
     }
 
-    private boolean waitFirstSplit() {
-        return !scheduleFinished.get() && !isStop.get() && exception == null;
+    public boolean needMoreSplit() {
+        return !scheduleFinished.get() && !isStopped.get() && exception == 
null;
     }
 
     private void appendBatch(Multimap<Backend, Split> batch) throws 
UserException {
@@ -95,8 +109,16 @@ public class SplitAssignment {
             for (Split split : splits) {
                 locations.add(splitToScanRange.getScanRange(backend, 
locationProperties, split, pathPartitionKeys));
             }
-            if (!assignment.computeIfAbsent(backend, be -> new 
LinkedBlockingQueue<>()).offer(locations)) {
-                throw new UserException("Failed to offer batch split");
+            while (needMoreSplit()) {
+                BlockingQueue<Collection<TScanRangeLocations>> queue =
+                        assignment.computeIfAbsent(backend, be -> new 
LinkedBlockingQueue<>(10000));
+                try {
+                    if (queue.offer(locations, 100, TimeUnit.MILLISECONDS)) {
+                        break;
+                    }
+                } catch (InterruptedException e) {
+                    addUserException(new UserException("Failed to offer batch 
split by interrupted", e));
+                }
             }
         }
     }
@@ -113,7 +135,7 @@ public class SplitAssignment {
         return sampleSplit;
     }
 
-    public void addToQueue(List<Split> splits) {
+    public void addToQueue(List<Split> splits) throws UserException {
         if (splits.isEmpty()) {
             return;
         }
@@ -123,19 +145,9 @@ public class SplitAssignment {
                 sampleSplit = splits.get(0);
                 assignLock.notify();
             }
-            try {
-                batch = backendPolicy.computeScanRangeAssignment(splits);
-            } catch (UserException e) {
-                exception = e;
-            }
-        }
-        if (batch != null) {
-            try {
-                appendBatch(batch);
-            } catch (UserException e) {
-                exception = e;
-            }
+            batch = backendPolicy.computeScanRangeAssignment(splits);
         }
+        appendBatch(batch);
     }
 
     private void notifyAssignment() {
@@ -150,28 +162,54 @@ public class SplitAssignment {
         }
         BlockingQueue<Collection<TScanRangeLocations>> splits = 
assignment.computeIfAbsent(backend,
                 be -> new LinkedBlockingQueue<>());
-        if (scheduleFinished.get() && splits.isEmpty() || isStop.get()) {
+        if (scheduleFinished.get() && splits.isEmpty() || isStopped.get()) {
             return null;
         }
         return splits;
     }
 
     public void setException(UserException e) {
-        exception = e;
+        addUserException(e);
         notifyAssignment();
     }
 
+    private void addUserException(UserException e) {
+        if (exception != null) {
+            exception.addSuppressed(e);
+        } else {
+            exception = e;
+        }
+    }
+
     public void finishSchedule() {
         scheduleFinished.set(true);
         notifyAssignment();
     }
 
     public void stop() {
-        isStop.set(true);
+        if (isStop()) {
+            return;
+        }
+        isStopped.set(true);
+        closeableResources.forEach((closeable) -> {
+            try {
+                closeable.close();
+            } catch (Exception e) {
+                LOG.warn("close resource error:{}", e.getMessage(), e);
+                // ignore
+            }
+        });
         notifyAssignment();
+        if (exception != null) {
+            throw new RuntimeException(exception);
+        }
     }
 
     public boolean isStop() {
-        return isStop.get();
+        return isStopped.get();
+    }
+
+    public void addCloseable(Closeable resource) {
+        closeableResources.add(resource);
     }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/SplitGenerator.java 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/SplitGenerator.java
index 34ff3911445..391552a5106 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/SplitGenerator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/SplitGenerator.java
@@ -52,7 +52,7 @@ public interface SplitGenerator {
         return -1;
     }
 
-    default void startSplit(int numBackends) {
+    default void startSplit(int numBackends) throws UserException {
     }
 
     /**
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java
index 9aff964631b..91002842b44 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java
@@ -223,7 +223,9 @@ public class HiveScanNode extends FileQueryScanNode {
                             if (allFiles.size() > numSplitsPerPartition.get()) 
{
                                 numSplitsPerPartition.set(allFiles.size());
                             }
-                            splitAssignment.addToQueue(allFiles);
+                            if (splitAssignment.needMoreSplit()) {
+                                splitAssignment.addToQueue(allFiles);
+                            }
                         } catch (Exception e) {
                             batchException.set(new 
UserException(e.getMessage(), e));
                         } finally {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java
index 8e33dfb2469..f14e02f3e58 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java
@@ -74,6 +74,7 @@ import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
@@ -401,6 +402,7 @@ public class HudiScanNode extends HiveScanNode {
             return;
         }
         AtomicInteger numFinishedPartitions = new AtomicInteger(0);
+        ExecutorService scheduleExecutor = 
Env.getCurrentEnv().getExtMetaCacheMgr().getScheduleExecutor();
         CompletableFuture.runAsync(() -> {
             for (HivePartition partition : prunedPartitions) {
                 if (batchException.get() != null || splitAssignment.isStop()) {
@@ -419,8 +421,10 @@ public class HudiScanNode extends HiveScanNode {
                         if (allFiles.size() > numSplitsPerPartition.get()) {
                             numSplitsPerPartition.set(allFiles.size());
                         }
-                        splitAssignment.addToQueue(allFiles);
-                    } catch (IOException e) {
+                        if (splitAssignment.needMoreSplit()) {
+                            splitAssignment.addToQueue(allFiles);
+                        }
+                    } catch (Exception e) {
                         batchException.set(new UserException(e.getMessage(), 
e));
                     } finally {
                         splittersOnFlight.release();
@@ -431,12 +435,12 @@ public class HudiScanNode extends HiveScanNode {
                             splitAssignment.finishSchedule();
                         }
                     }
-                });
+                }, scheduleExecutor);
             }
             if (batchException.get() != null) {
                 splitAssignment.setException(batchException.get());
             }
-        });
+        }, scheduleExecutor);
     }
 
     @Override
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java
index 7282de52aa0..495b5779597 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java
@@ -17,6 +17,7 @@
 
 package org.apache.doris.datasource.iceberg;
 
+import org.apache.doris.common.ThreadPoolManager;
 import 
org.apache.doris.common.security.authentication.PreExecutionAuthenticator;
 import org.apache.doris.datasource.ExternalCatalog;
 import org.apache.doris.datasource.InitCatalogLog;
@@ -44,6 +45,7 @@ public abstract class IcebergExternalCatalog extends 
ExternalCatalog {
     public static final String EXTERNAL_CATALOG_NAME = "external_catalog.name";
     protected String icebergCatalogType;
     protected Catalog catalog;
+    private static final int ICEBERG_CATALOG_EXECUTOR_THREAD_NUM = 16;
 
     public IcebergExternalCatalog(long catalogId, String name, String comment) 
{
         super(catalogId, name, InitCatalogLog.Type.ICEBERG, comment);
@@ -65,6 +67,12 @@ public abstract class IcebergExternalCatalog extends 
ExternalCatalog {
         initCatalog();
         IcebergMetadataOps ops = 
ExternalMetadataOperations.newIcebergMetadataOps(this, catalog);
         transactionManager = 
TransactionManagerFactory.createIcebergTransactionManager(ops);
+        threadPoolWithPreAuth = 
ThreadPoolManager.newDaemonFixedThreadPoolWithPreAuth(
+            ICEBERG_CATALOG_EXECUTOR_THREAD_NUM,
+            Integer.MAX_VALUE,
+            String.format("iceberg_catalog_%s_executor_pool", name),
+            true,
+            preExecutionAuthenticator);
         metadataOps = ops;
     }
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergHMSExternalCatalog.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergHMSExternalCatalog.java
index c5a99c157ce..51d39357b81 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergHMSExternalCatalog.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergHMSExternalCatalog.java
@@ -17,8 +17,6 @@
 
 package org.apache.doris.datasource.iceberg;
 
-import org.apache.doris.common.security.authentication.AuthenticationConfig;
-import org.apache.doris.common.security.authentication.HadoopAuthenticator;
 import org.apache.doris.datasource.CatalogProperty;
 import org.apache.doris.datasource.property.PropertyConverter;
 
@@ -37,11 +35,6 @@ public class IcebergHMSExternalCatalog extends 
IcebergExternalCatalog {
     protected void initCatalog() {
         icebergCatalogType = ICEBERG_HMS;
         catalog = IcebergUtils.createIcebergHiveCatalog(this, getName());
-        if (preExecutionAuthenticator.getHadoopAuthenticator() == null) {
-            AuthenticationConfig config = 
AuthenticationConfig.getKerberosConfig(getConfiguration());
-            HadoopAuthenticator authenticator = 
HadoopAuthenticator.getHadoopAuthenticator(config);
-            preExecutionAuthenticator.setHadoopAuthenticator(authenticator);
-        }
     }
 }
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataOps.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataOps.java
index 84c868afa6f..7bc95aa48dd 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataOps.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataOps.java
@@ -94,11 +94,19 @@ public class IcebergMetadataOps implements 
ExternalMetadataOps {
 
     @Override
     public boolean tableExist(String dbName, String tblName) {
-        return catalog.tableExists(getTableIdentifier(dbName, tblName));
+        try {
+            return preExecutionAuthenticator.execute(() -> 
catalog.tableExists(getTableIdentifier(dbName, tblName)));
+        } catch (Exception e) {
+            throw new RuntimeException("Failed to check table exist, error 
message is:" + e.getMessage(), e);
+        }
     }
 
     public boolean databaseExist(String dbName) {
-        return nsCatalog.namespaceExists(getNamespace(dbName));
+        try {
+            return preExecutionAuthenticator.execute(() -> 
nsCatalog.namespaceExists(getNamespace(dbName)));
+        } catch (Exception e) {
+            throw new RuntimeException("Failed to check database exist, error 
message is:" + e.getMessage(), e);
+        }
     }
 
     public List<String> listDatabaseNames() {
@@ -115,8 +123,14 @@ public class IcebergMetadataOps implements 
ExternalMetadataOps {
 
     @Override
     public List<String> listTableNames(String dbName) {
-        List<TableIdentifier> tableIdentifiers = 
catalog.listTables(getNamespace(dbName));
-        return 
tableIdentifiers.stream().map(TableIdentifier::name).collect(Collectors.toList());
+        try {
+            return preExecutionAuthenticator.execute(() -> {
+                List<TableIdentifier> tableIdentifiers = 
catalog.listTables(getNamespace(dbName));
+                return 
tableIdentifiers.stream().map(TableIdentifier::name).collect(Collectors.toList());
+            });
+        } catch (Exception e) {
+            throw new RuntimeException("Failed to list table names, error 
message is:" + e.getMessage(), e);
+        }
     }
 
     @Override
@@ -310,7 +324,11 @@ public class IcebergMetadataOps implements 
ExternalMetadataOps {
 
     @Override
     public Table loadTable(String dbName, String tblName) {
-        return catalog.loadTable(getTableIdentifier(dbName, tblName));
+        try {
+            return preExecutionAuthenticator.execute(() -> 
catalog.loadTable(getTableIdentifier(dbName, tblName)));
+        } catch (Exception e) {
+            throw new RuntimeException("Failed to load table, error message 
is:" + e.getMessage(), e);
+        }
     }
 
     private TableIdentifier getTableIdentifier(String dbName, String tblName) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergUtils.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergUtils.java
index 5ef64153418..75b9b9e1812 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergUtils.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergUtils.java
@@ -51,10 +51,13 @@ import 
org.apache.doris.datasource.property.constants.HMSProperties;
 import org.apache.doris.nereids.exceptions.NotSupportedException;
 import org.apache.doris.thrift.TExprOpcode;
 
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.LoadingCache;
 import com.google.common.collect.Lists;
 import org.apache.commons.lang3.exception.ExceptionUtils;
 import org.apache.iceberg.CatalogProperties;
 import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.ManifestFile;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Snapshot;
@@ -63,10 +66,13 @@ import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.expressions.And;
 import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.expressions.ManifestEvaluator;
 import org.apache.iceberg.expressions.Not;
 import org.apache.iceberg.expressions.Or;
+import org.apache.iceberg.expressions.Projections;
 import org.apache.iceberg.expressions.Unbound;
 import org.apache.iceberg.hive.HiveCatalog;
+import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.types.Type.TypeID;
 import org.apache.iceberg.types.Types;
 import org.apache.iceberg.util.LocationUtil;
@@ -695,4 +701,33 @@ public class IcebergUtils {
         hiveCatalog.initialize(name, catalogProperties);
         return hiveCatalog;
     }
+
+    // Retrieve the manifest files that match the query based on partitions in 
filter
+    public static CloseableIterable<ManifestFile> getMatchingManifest(
+                List<ManifestFile> dataManifests,
+                Map<Integer, PartitionSpec> specsById,
+                Expression dataFilter) {
+        LoadingCache<Integer, ManifestEvaluator> evalCache = 
Caffeine.newBuilder()
+                .build(
+                        specId -> {
+                            PartitionSpec spec = specsById.get(specId);
+                            return ManifestEvaluator.forPartitionFilter(
+                                    Expressions.and(
+                                            Expressions.alwaysTrue(),
+                                            Projections.inclusive(spec, 
true).project(dataFilter)),
+                                    spec,
+                                    true);
+                        });
+
+        CloseableIterable<ManifestFile> matchingManifests = 
CloseableIterable.filter(
+                CloseableIterable.withNoopClose(dataManifests),
+                manifest -> 
evalCache.get(manifest.partitionSpecId()).eval(manifest));
+
+        matchingManifests =
+                CloseableIterable.filter(
+                        matchingManifests,
+                        manifest -> manifest.hasAddedFiles() || 
manifest.hasExistingFiles());
+
+        return matchingManifests;
+    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java
index 215a94f110f..9551ea20388 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java
@@ -22,9 +22,11 @@ import org.apache.doris.analysis.FunctionCallExpr;
 import org.apache.doris.analysis.TableSnapshot;
 import org.apache.doris.analysis.TupleDescriptor;
 import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.TableIf;
 import org.apache.doris.common.DdlException;
 import org.apache.doris.common.UserException;
+import 
org.apache.doris.common.security.authentication.PreExecutionAuthenticator;
 import org.apache.doris.common.util.LocationPath;
 import org.apache.doris.common.util.TimeUtils;
 import org.apache.doris.datasource.ExternalTable;
@@ -48,14 +50,15 @@ import org.apache.doris.thrift.TPlanNode;
 import org.apache.doris.thrift.TPushAggOp;
 import org.apache.doris.thrift.TTableFormatFileDesc;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import org.apache.commons.lang3.exception.ExceptionUtils;
 import org.apache.iceberg.BaseTable;
-import org.apache.iceberg.CombinedScanTask;
 import org.apache.iceberg.DeleteFile;
 import org.apache.iceberg.FileContent;
 import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.ManifestFile;
 import org.apache.iceberg.MetadataColumns;
 import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.StructLike;
@@ -63,6 +66,7 @@ import org.apache.iceberg.Table;
 import org.apache.iceberg.TableScan;
 import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.CloseableIterator;
 import org.apache.iceberg.types.Conversions;
 import org.apache.iceberg.util.SnapshotUtil;
 import org.apache.iceberg.util.TableScanUtil;
@@ -70,13 +74,15 @@ import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
 import java.io.IOException;
+import java.time.DateTimeException;
 import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.OptionalLong;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicReference;
 
 public class IcebergScanNode extends FileQueryScanNode {
 
@@ -93,7 +99,20 @@ public class IcebergScanNode extends FileQueryScanNode {
     // But for part of splits which have no position/equality delete files, we 
can still do count push down opt.
     // And for split level count push down opt, the flag is set in each split.
     private boolean tableLevelPushDownCount = false;
+    private long countFromSnapshot;
     private static final long COUNT_WITH_PARALLEL_SPLITS = 10000;
+    private long targetSplitSize;
+    private ConcurrentHashMap.KeySetView<Object, Boolean> partitionPathSet;
+    private boolean isPartitionedTable;
+    private int formatVersion;
+    private PreExecutionAuthenticator preExecutionAuthenticator;
+    private TableScan icebergTableScan;
+
+    // for test
+    @VisibleForTesting
+    public IcebergScanNode(PlanNodeId id, TupleDescriptor desc, 
SessionVariable sv) {
+        super(id, desc, "ICEBERG_SCAN_NODE", 
StatisticalType.ICEBERG_SCAN_NODE, false, sv);
+    }
 
     /**
      * External file scan node for Query iceberg table
@@ -129,6 +148,11 @@ public class IcebergScanNode extends FileQueryScanNode {
     @Override
     protected void doInitialize() throws UserException {
         icebergTable = source.getIcebergTable();
+        targetSplitSize = getRealFileSplitSize(0);
+        partitionPathSet = ConcurrentHashMap.newKeySet();
+        isPartitionedTable = icebergTable.spec().isPartitioned();
+        formatVersion = ((BaseTable) 
icebergTable).operations().current().formatVersion();
+        preExecutionAuthenticator = 
source.getCatalog().getPreExecutionAuthenticator();
         super.doInitialize();
     }
 
@@ -143,7 +167,6 @@ public class IcebergScanNode extends FileQueryScanNode {
         TTableFormatFileDesc tableFormatFileDesc = new TTableFormatFileDesc();
         
tableFormatFileDesc.setTableFormatType(icebergSplit.getTableFormatType().value());
         TIcebergFileDesc fileDesc = new TIcebergFileDesc();
-        int formatVersion = icebergSplit.getFormatVersion();
         fileDesc.setFormatVersion(formatVersion);
         fileDesc.setOriginalFilePath(icebergSplit.getOriginalPath());
         if (tableLevelPushDownCount) {
@@ -185,7 +208,7 @@ public class IcebergScanNode extends FileQueryScanNode {
     @Override
     public List<Split> getSplits(int numBackends) throws UserException {
         try {
-            return 
source.getCatalog().getPreExecutionAuthenticator().execute(() -> 
doGetSplits(numBackends));
+            return preExecutionAuthenticator.execute(() -> 
doGetSplits(numBackends));
         } catch (Exception e) {
             Optional<NotSupportedException> opt = 
checkNotSupportedException(e);
             if (opt.isPresent()) {
@@ -194,10 +217,66 @@ public class IcebergScanNode extends FileQueryScanNode {
                 throw new 
RuntimeException(ExceptionUtils.getRootCauseMessage(e), e);
             }
         }
+    }
 
+    @Override
+    public void startSplit(int numBackends) throws UserException {
+        try {
+            preExecutionAuthenticator.execute(() -> {
+                doStartSplit();
+                return null;
+            });
+        } catch (Exception e) {
+            throw new UserException(e.getMessage(), e);
+        }
     }
 
-    private List<Split> doGetSplits(int numBackends) throws UserException {
+    public void doStartSplit() {
+        TableScan scan = createTableScan();
+        CompletableFuture.runAsync(() -> {
+            AtomicReference<CloseableIterable<FileScanTask>> taskRef = new 
AtomicReference<>();
+            try {
+                preExecutionAuthenticator.execute(
+                        () -> {
+                            CloseableIterable<FileScanTask> fileScanTasks = 
planFileScanTask(scan);
+                            taskRef.set(fileScanTasks);
+
+                            CloseableIterator<FileScanTask> iterator = 
fileScanTasks.iterator();
+                            while (splitAssignment.needMoreSplit() && 
iterator.hasNext()) {
+                                try {
+                                    
splitAssignment.addToQueue(Lists.newArrayList(createIcebergSplit(iterator.next())));
+                                } catch (UserException e) {
+                                    throw new RuntimeException(e);
+                                }
+                            }
+                        }
+                );
+                splitAssignment.finishSchedule();
+            } catch (Exception e) {
+                Optional<NotSupportedException> opt = 
checkNotSupportedException(e);
+                if (opt.isPresent()) {
+                    splitAssignment.setException(new 
UserException(opt.get().getMessage(), opt.get()));
+                } else {
+                    splitAssignment.setException(new 
UserException(e.getMessage(), e));
+                }
+            } finally {
+                if (taskRef.get() != null) {
+                    try {
+                        taskRef.get().close();
+                    } catch (IOException e) {
+                        // ignore
+                    }
+                }
+            }
+        }, Env.getCurrentEnv().getExtMetaCacheMgr().getScheduleExecutor());
+    }
+
+    @VisibleForTesting
+    public TableScan createTableScan() {
+        if (icebergTableScan != null) {
+            return icebergTableScan;
+        }
+
         TableScan scan = icebergTable.newScan();
 
         // set snapshot
@@ -219,16 +298,16 @@ public class IcebergScanNode extends FileQueryScanNode {
             this.pushdownIcebergPredicates.add(predicate.toString());
         }
 
-        // get splits
-        List<Split> splits = new ArrayList<>();
-        int formatVersion = ((BaseTable) 
icebergTable).operations().current().formatVersion();
-        HashSet<String> partitionPathSet = new HashSet<>();
-        boolean isPartitionedTable = icebergTable.spec().isPartitioned();
+        icebergTableScan = 
scan.planWith(source.getCatalog().getThreadPoolWithPreAuth());
 
-        long realFileSplitSize = getRealFileSplitSize(0);
-        CloseableIterable<FileScanTask> fileScanTasks = null;
+        return icebergTableScan;
+    }
+
+    private CloseableIterable<FileScanTask> planFileScanTask(TableScan scan) {
+        long targetSplitSize = getRealFileSplitSize(0);
+        CloseableIterable<FileScanTask> splitFiles;
         try {
-            fileScanTasks = TableScanUtil.splitFiles(scan.planFiles(), 
realFileSplitSize);
+            splitFiles = TableScanUtil.splitFiles(scan.planFiles(), 
targetSplitSize);
         } catch (NullPointerException e) {
             /*
         Caused by: java.lang.NullPointerException: Type cannot be null
@@ -259,74 +338,122 @@ public class IcebergScanNode extends FileQueryScanNode {
             LOG.warn("Iceberg TableScanUtil.splitFiles throw 
NullPointerException. Cause : ", e);
             throw new NotSupportedException("Unable to read Iceberg table with 
dropped old partition column.");
         }
-        try (CloseableIterable<CombinedScanTask> combinedScanTasks =
-                     TableScanUtil.planTasks(fileScanTasks, realFileSplitSize, 
1, 0)) {
-            combinedScanTasks.forEach(taskGrp -> 
taskGrp.files().forEach(splitTask -> {
-                if (isPartitionedTable) {
-                    StructLike structLike = splitTask.file().partition();
-                    // Counts the number of partitions read
-                    partitionPathSet.add(structLike.toString());
-                }
-                String originalPath = splitTask.file().path().toString();
-                LocationPath locationPath = new LocationPath(originalPath, 
source.getCatalog().getProperties());
-                IcebergSplit split = new IcebergSplit(
-                        locationPath,
-                        splitTask.start(),
-                        splitTask.length(),
-                        splitTask.file().fileSizeInBytes(),
-                        new String[0],
-                        formatVersion,
-                        source.getCatalog().getProperties(),
-                        new ArrayList<>(),
-                        originalPath);
-                split.setTargetSplitSize(realFileSplitSize);
-                if (formatVersion >= MIN_DELETE_FILE_SUPPORT_VERSION) {
-                    
split.setDeleteFileFilters(getDeleteFileFilters(splitTask));
+        return splitFiles;
+    }
+
+    private Split createIcebergSplit(FileScanTask fileScanTask) {
+        if (isPartitionedTable) {
+            StructLike structLike = fileScanTask.file().partition();
+            // Counts the number of partitions read
+            partitionPathSet.add(structLike.toString());
+        }
+        String originalPath = fileScanTask.file().path().toString();
+        LocationPath locationPath = new LocationPath(originalPath, 
source.getCatalog().getProperties());
+        IcebergSplit split = new IcebergSplit(
+                locationPath,
+                fileScanTask.start(),
+                fileScanTask.length(),
+                fileScanTask.file().fileSizeInBytes(),
+                new String[0],
+                formatVersion,
+                source.getCatalog().getProperties(),
+                new ArrayList<>(),
+                originalPath);
+        if (!fileScanTask.deletes().isEmpty()) {
+            split.setDeleteFileFilters(getDeleteFileFilters(fileScanTask));
+        }
+        split.setTableFormatType(TableFormatType.ICEBERG);
+        split.setTargetSplitSize(targetSplitSize);
+        return split;
+    }
+
+    private List<Split> doGetSplits(int numBackends) throws UserException {
+
+        TableScan scan = createTableScan();
+        List<Split> splits = new ArrayList<>();
+
+        try (CloseableIterable<FileScanTask> fileScanTasks = 
planFileScanTask(scan)) {
+            if (tableLevelPushDownCount) {
+                int needSplitCnt = countFromSnapshot < 
COUNT_WITH_PARALLEL_SPLITS
+                        ? 1 : sessionVariable.getParallelExecInstanceNum() * 
numBackends;
+                for (FileScanTask next : fileScanTasks) {
+                    splits.add(createIcebergSplit(next));
+                    if (splits.size() >= needSplitCnt) {
+                        break;
+                    }
                 }
-                split.setTableFormatType(TableFormatType.ICEBERG);
-                splits.add(split);
-            }));
+                assignCountToSplits(splits, countFromSnapshot);
+                return splits;
+            } else {
+                fileScanTasks.forEach(taskGrp -> 
splits.add(createIcebergSplit(taskGrp)));
+            }
         } catch (IOException e) {
             throw new UserException(e.getMessage(), e.getCause());
         }
 
+        selectedPartitionNum = partitionPathSet.size();
+        return splits;
+    }
+
+    @Override
+    public boolean isBatchMode() {
         TPushAggOp aggOp = getPushDownAggNoGroupingOp();
         if (aggOp.equals(TPushAggOp.COUNT)) {
-            // we can create a special empty split and skip the plan process
-            if (splits.isEmpty()) {
-                return splits;
-            }
-            long countFromSnapshot = getCountFromSnapshot();
+            countFromSnapshot = getCountFromSnapshot();
             if (countFromSnapshot >= 0) {
                 tableLevelPushDownCount = true;
-                List<Split> pushDownCountSplits;
-                if (countFromSnapshot > COUNT_WITH_PARALLEL_SPLITS) {
-                    int minSplits = 
sessionVariable.getParallelExecInstanceNum() * numBackends;
-                    pushDownCountSplits = splits.subList(0, 
Math.min(splits.size(), minSplits));
-                } else {
-                    pushDownCountSplits = 
Collections.singletonList(splits.get(0));
+                return false;
+            }
+        }
+
+        if (createTableScan().snapshot() == null) {
+            return false;
+        }
+
+        if (!sessionVariable.getEnableExternalTableBatchMode()) {
+            return false;
+        }
+
+        try {
+            return preExecutionAuthenticator.execute(() -> {
+                try (CloseableIterator<ManifestFile> matchingManifest =
+                        IcebergUtils.getMatchingManifest(
+                                
createTableScan().snapshot().dataManifests(icebergTable.io()),
+                                icebergTable.specs(),
+                                createTableScan().filter()).iterator()) {
+                    int cnt = 0;
+                    while (matchingManifest.hasNext()) {
+                        ManifestFile next = matchingManifest.next();
+                        cnt += next.addedFilesCount() + 
next.existingFilesCount();
+                        if (cnt >= sessionVariable.getNumFilesInBatchMode()) {
+                            return true;
+                        }
+                    }
                 }
-                assignCountToSplits(pushDownCountSplits, countFromSnapshot);
-                return pushDownCountSplits;
+                return false;
+            });
+        } catch (Exception e) {
+            Optional<NotSupportedException> opt = 
checkNotSupportedException(e);
+            if (opt.isPresent()) {
+                throw opt.get();
+            } else {
+                throw new 
RuntimeException(ExceptionUtils.getRootCauseMessage(e), e);
             }
         }
-        selectedPartitionNum = partitionPathSet.size();
-        return splits;
     }
 
-    public Long getSpecifiedSnapshot() throws UserException {
+    public Long getSpecifiedSnapshot() {
         TableSnapshot tableSnapshot = getQueryTableSnapshot();
         if (tableSnapshot != null) {
             TableSnapshot.VersionType type = tableSnapshot.getType();
-            try {
-                if (type == TableSnapshot.VersionType.VERSION) {
-                    return tableSnapshot.getVersion();
-                } else {
-                    long timestamp = 
TimeUtils.timeStringToLong(tableSnapshot.getTime(), TimeUtils.getTimeZone());
-                    return SnapshotUtil.snapshotIdAsOfTime(icebergTable, 
timestamp);
+            if (type == TableSnapshot.VersionType.VERSION) {
+                return tableSnapshot.getVersion();
+            } else {
+                long timestamp = 
TimeUtils.timeStringToLong(tableSnapshot.getTime(), TimeUtils.getTimeZone());
+                if (timestamp < 0) {
+                    throw new DateTimeException("can't parse time: " + 
tableSnapshot.getTime());
                 }
-            } catch (IllegalArgumentException e) {
-                throw new UserException(e);
+                return SnapshotUtil.snapshotIdAsOfTime(icebergTable, 
timestamp);
             }
         }
         return null;
@@ -408,13 +535,9 @@ public class IcebergScanNode extends FileQueryScanNode {
         return !col.isAllowNull();
     }
 
-    private long getCountFromSnapshot() {
-        Long specifiedSnapshot;
-        try {
-            specifiedSnapshot = getSpecifiedSnapshot();
-        } catch (UserException e) {
-            return -1;
-        }
+    @VisibleForTesting
+    public long getCountFromSnapshot() {
+        Long specifiedSnapshot = getSpecifiedSnapshot();
 
         Snapshot snapshot = specifiedSnapshot == null
                 ? icebergTable.currentSnapshot() : 
icebergTable.snapshot(specifiedSnapshot);
@@ -467,6 +590,11 @@ public class IcebergScanNode extends FileQueryScanNode {
         ((IcebergSplit) splits.get(size - 
1)).setTableLevelRowCount(countPerSplit + totalCount % size);
     }
 
+    @Override
+    public int numApproximateSplits() {
+        return NUM_SPLITS_PER_PARTITION * partitionPathSet.size() > 0 ? 
partitionPathSet.size() : 1;
+    }
+
     private Optional<NotSupportedException> 
checkNotSupportedException(Exception e) {
         if (e instanceof NullPointerException) {
             /*
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergSplit.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergSplit.java
index 0520612935a..e31ec5c3fad 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergSplit.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergSplit.java
@@ -22,6 +22,7 @@ import org.apache.doris.datasource.FileSplit;
 
 import lombok.Data;
 
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 
@@ -35,7 +36,7 @@ public class IcebergSplit extends FileSplit {
     // but the original datafile path must be used.
     private final String originalPath;
     private Integer formatVersion;
-    private List<IcebergDeleteFileFilter> deleteFileFilters;
+    private List<IcebergDeleteFileFilter> deleteFileFilters = new 
ArrayList<>();
     private Map<String, String> config;
     // tableLevelRowCount will be set only table-level count push down opt is 
available.
     private long tableLevelRowCount = -1;
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/source/MaxComputeScanNode.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/source/MaxComputeScanNode.java
index 063aeea68eb..632d88428fe 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/source/MaxComputeScanNode.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/source/MaxComputeScanNode.java
@@ -267,8 +267,10 @@ public class MaxComputeScanNode extends FileQueryScanNode {
                                     
createTableBatchReadSession(requiredBatchPartitionSpecs);
                             List<Split> batchSplit = 
getSplitByTableSession(tableBatchReadSession);
 
-                            splitAssignment.addToQueue(batchSplit);
-                        } catch (IOException e) {
+                            if (splitAssignment.needMoreSplit()) {
+                                splitAssignment.addToQueue(batchSplit);
+                            }
+                        } catch (Exception e) {
                             batchException.set(new 
UserException(e.getMessage(), e));
                         } finally {
                             if (batchException.get() != null) {
@@ -288,7 +290,7 @@ public class MaxComputeScanNode extends FileQueryScanNode {
                     splitAssignment.setException(batchException.get());
                 }
             }
-        });
+        }, scheduleExecutor);
     }
 
     @Override
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java
index b4033a0535e..e68ab2476d9 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java
@@ -101,7 +101,7 @@ public abstract class ScanNode extends PlanNode implements 
SplitGenerator {
     protected SplitAssignment splitAssignment = null;
 
     protected long selectedPartitionNum = 0;
-    protected long selectedSplitNum = 0;
+    protected int selectedSplitNum = 0;
 
     // create a mapping between output slot's id and project expr
     Map<SlotId, Expr> outputSlotToProjectExpr = new HashMap<>();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index bb6f13618fa..1bbb5e51d29 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -464,6 +464,8 @@ public class SessionVariable implements Serializable, 
Writable {
 
     public static final String NUM_PARTITIONS_IN_BATCH_MODE = 
"num_partitions_in_batch_mode";
 
+    public static final String NUM_FILES_IN_BATCH_MODE = 
"num_files_in_batch_mode";
+
     public static final String FETCH_SPLITS_MAX_WAIT_TIME = 
"fetch_splits_max_wait_time_ms";
 
     /**
@@ -1779,12 +1781,19 @@ public class SessionVariable implements Serializable, 
Writable {
             needForward = true)
     public int numPartitionsInBatchMode = 1024;
 
+    @VariableMgr.VarAttr(
+            name = NUM_FILES_IN_BATCH_MODE,
+            description = {"如果文件数量超过阈值,BE将通过batch方式获取scan ranges",
+                    "If the number of files exceeds the threshold, scan ranges 
will be got through batch mode."},
+            needForward = true)
+    public int numFilesInBatchMode = 1024;
+
     @VariableMgr.VarAttr(
             name = FETCH_SPLITS_MAX_WAIT_TIME,
             description = {"batch方式中BE获取splits的最大等待时间",
                     "The max wait time of getting splits in batch mode."},
             needForward = true)
-    public long fetchSplitsMaxWaitTime = 4000;
+    public long fetchSplitsMaxWaitTime = 1000;
 
     @VariableMgr.VarAttr(
             name = ENABLE_PARQUET_LAZY_MAT,
@@ -2165,6 +2174,13 @@ public class SessionVariable implements Serializable, 
Writable {
 
     public static final String IGNORE_RUNTIME_FILTER_IDS = 
"ignore_runtime_filter_ids";
 
+    public static final String ENABLE_EXTERNAL_TABLE_BATCH_MODE = 
"enable_external_table_batch_mode";
+    @VariableMgr.VarAttr(
+            name = ENABLE_EXTERNAL_TABLE_BATCH_MODE,
+            description = {"使能外表的batch mode功能", "Enable the batch mode 
function of the external table."},
+            needForward = true)
+    public boolean enableExternalTableBatchMode = true;
+
     public Set<Integer> getIgnoredRuntimeFilterIds() {
         Set<Integer> ids = Sets.newLinkedHashSet();
         if (ignoreRuntimeFilterIds.isEmpty()) {
@@ -3437,6 +3453,10 @@ public class SessionVariable implements Serializable, 
Writable {
         this.numPartitionsInBatchMode = numPartitionsInBatchMode;
     }
 
+    public int getNumFilesInBatchMode() {
+        return numFilesInBatchMode;
+    }
+
     public long getFetchSplitsMaxWaitTime() {
         return fetchSplitsMaxWaitTime;
     }
@@ -4789,5 +4809,9 @@ public class SessionVariable implements Serializable, 
Writable {
     public boolean showSplitProfileInfo() {
         return enableProfile() && getProfileLevel() > 1;
     }
+
+    public boolean getEnableExternalTableBatchMode() {
+        return enableExternalTableBatchMode;
+    }
 }
 
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/datasource/SplitAssignmentTest.java 
b/fe/fe-core/src/test/java/org/apache/doris/datasource/SplitAssignmentTest.java
new file mode 100644
index 00000000000..ab5205b47a7
--- /dev/null
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/datasource/SplitAssignmentTest.java
@@ -0,0 +1,427 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource;
+
+import org.apache.doris.common.UserException;
+import org.apache.doris.spi.Split;
+import org.apache.doris.system.Backend;
+import org.apache.doris.thrift.TScanRangeLocations;
+
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.Multimap;
+import mockit.Expectations;
+import mockit.Injectable;
+import mockit.MockUp;
+import mockit.Mocked;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+public class SplitAssignmentTest {
+
+    @Injectable
+    private FederationBackendPolicy mockBackendPolicy;
+
+    @Injectable
+    private SplitGenerator mockSplitGenerator;
+
+    @Injectable
+    private SplitToScanRange mockSplitToScanRange;
+
+    @Mocked
+    private Split mockSplit;
+
+    @Mocked
+    private Backend mockBackend;
+
+    @Mocked
+    private TScanRangeLocations mockScanRangeLocations;
+
+    private SplitAssignment splitAssignment;
+    private Map<String, String> locationProperties;
+    private List<String> pathPartitionKeys;
+
+    @BeforeEach
+    void setUp() {
+        locationProperties = new HashMap<>();
+        pathPartitionKeys = new ArrayList<>();
+
+        splitAssignment = new SplitAssignment(
+                mockBackendPolicy,
+                mockSplitGenerator,
+                mockSplitToScanRange,
+                locationProperties,
+                pathPartitionKeys
+        );
+    }
+
+    // ==================== init() method tests ====================
+
+    @Test
+    void testInitSuccess() throws Exception {
+        Multimap<Backend, Split> batch = ArrayListMultimap.create();
+        batch.put(mockBackend, mockSplit);
+
+        new Expectations() {
+            {
+                mockBackendPolicy.computeScanRangeAssignment((List<Split>) 
any);
+                result = batch;
+
+                mockSplitToScanRange.getScanRange(mockBackend, 
locationProperties, mockSplit, pathPartitionKeys);
+                result = mockScanRangeLocations;
+            }
+        };
+
+        // Start a thread to simulate split generation after a short delay
+        Thread splitGeneratorThread = new Thread(() -> {
+            try {
+                Thread.sleep(50); // Short delay to simulate async split 
generation
+                List<Split> splits = Collections.singletonList(mockSplit);
+                splitAssignment.addToQueue(splits);
+            } catch (Exception e) {
+                // Ignore for test
+            }
+        });
+
+        splitGeneratorThread.start();
+
+        // Test
+        Assertions.assertDoesNotThrow(() -> splitAssignment.init());
+
+        // Verify sample split is set
+        Assertions.assertNotNull(splitAssignment.getSampleSplit());
+
+        splitGeneratorThread.join(1000); // Wait for thread to complete
+    }
+
+    @Test
+    void testInitTimeout() throws Exception {
+        // Use MockUp to simulate timeout behavior quickly instead of waiting 
30 seconds
+        SplitAssignment testAssignment = new SplitAssignment(
+                mockBackendPolicy,
+                mockSplitGenerator,
+                mockSplitToScanRange,
+                locationProperties,
+                pathPartitionKeys
+        );
+
+        new MockUp<SplitAssignment>() {
+            @mockit.Mock
+            public void init() throws UserException {
+                // Directly throw timeout exception to simulate the timeout 
scenario quickly
+                throw new UserException("Failed to get first split after 
waiting for 0 seconds.");
+            }
+        };
+
+        // Test & Verify - should timeout immediately now
+        UserException exception = Assertions.assertThrows(UserException.class, 
() -> testAssignment.init());
+        Assertions.assertTrue(exception.getMessage().contains("Failed to get 
first split after waiting for"));
+    }
+
+    @Test
+    void testInitInterrupted() throws Exception {
+        CountDownLatch initStarted = new CountDownLatch(1);
+        CountDownLatch shouldInterrupt = new CountDownLatch(1);
+
+        Thread initThread = new Thread(() -> {
+            try {
+                initStarted.countDown();
+                shouldInterrupt.await();
+                splitAssignment.init();
+            } catch (Exception e) {
+                // Expected interruption
+            }
+        });
+
+        initThread.start();
+        initStarted.await();
+
+        // Interrupt the init thread
+        initThread.interrupt();
+        shouldInterrupt.countDown();
+
+        initThread.join(1000);
+    }
+
+    @Test
+    void testInitWithPreExistingException() throws Exception {
+        UserException preException = new UserException("Pre-existing error");
+        splitAssignment.setException(preException);
+
+        // Test & Verify
+        UserException exception = Assertions.assertThrows(UserException.class, 
() -> splitAssignment.init());
+        Assertions.assertTrue(exception.getMessage().contains(" Pre-existing 
error"), exception.getMessage());
+    }
+
+    // ==================== addToQueue() method tests ====================
+
+    @Test
+    void testAddToQueueWithEmptyList() throws Exception {
+        // Test
+        Assertions.assertDoesNotThrow(() -> 
splitAssignment.addToQueue(Collections.emptyList()));
+    }
+
+    @Test
+    void testAddToQueueSuccess() throws Exception {
+        Multimap<Backend, Split> batch = ArrayListMultimap.create();
+        batch.put(mockBackend, mockSplit);
+
+        new Expectations() {
+            {
+                mockBackendPolicy.computeScanRangeAssignment((List<Split>) 
any);
+                result = batch;
+
+                mockSplitToScanRange.getScanRange(mockBackend, 
locationProperties, mockSplit, pathPartitionKeys);
+                result = mockScanRangeLocations;
+            }
+        };
+
+        // Mock setup
+        List<Split> splits = Collections.singletonList(mockSplit);
+
+        // Test
+        Assertions.assertDoesNotThrow(() -> 
splitAssignment.addToQueue(splits));
+
+        // Verify sample split is set
+        Assertions.assertEquals(mockSplit, splitAssignment.getSampleSplit());
+
+        // Verify assignment queue is created and contains data
+        BlockingQueue<Collection<TScanRangeLocations>> queue = 
splitAssignment.getAssignedSplits(mockBackend);
+        Assertions.assertNotNull(queue);
+    }
+
+    @Test
+    void testAddToQueueSampleSplitAlreadySet() throws Exception {
+        Multimap<Backend, Split> batch = ArrayListMultimap.create();
+        batch.put(mockBackend, mockSplit);
+
+        new Expectations() {
+            {
+                mockBackendPolicy.computeScanRangeAssignment((List<Split>) 
any);
+                result = batch;
+                minTimes = 0;
+
+                mockSplitToScanRange.getScanRange(mockBackend, 
locationProperties, mockSplit, pathPartitionKeys);
+                result = mockScanRangeLocations;
+                minTimes = 0;
+            }
+        };
+
+        // Setup: First call to set sample split
+        List<Split> firstSplits = Collections.singletonList(mockSplit);
+
+        splitAssignment.addToQueue(firstSplits);
+        Split firstSampleSplit = splitAssignment.getSampleSplit();
+
+        // Test: Second call should not change sample split
+        List<Split> secondSplits = Collections.singletonList(mockSplit);
+
+        splitAssignment.addToQueue(secondSplits);
+
+        // Verify sample split unchanged
+        Assertions.assertEquals(firstSampleSplit, 
splitAssignment.getSampleSplit());
+    }
+
+    @Test
+    void testAddToQueueWithQueueBlockingScenario() throws Exception {
+        Multimap<Backend, Split> batch = ArrayListMultimap.create();
+        batch.put(mockBackend, mockSplit);
+
+        new Expectations() {
+            {
+                mockBackendPolicy.computeScanRangeAssignment((List<Split>) 
any);
+                result = batch;
+
+                mockSplitToScanRange.getScanRange(mockBackend, 
locationProperties, mockSplit, pathPartitionKeys);
+                result = mockScanRangeLocations;
+            }
+        };
+
+        // This test simulates a scenario where appendBatch might experience 
queue blocking
+        // by adding multiple batches rapidly
+
+        List<Split> splits = Collections.singletonList(mockSplit);
+
+        // First, fill up the queue by adding many batches
+        for (int i = 0; i < 10; i++) {
+            splitAssignment.addToQueue(splits);
+        }
+
+        // Verify the queue has data
+        BlockingQueue<Collection<TScanRangeLocations>> queue = 
splitAssignment.getAssignedSplits(mockBackend);
+        Assertions.assertNotNull(queue);
+    }
+
+    @Test
+    void testAddToQueueConcurrentAccess() throws Exception {
+        Multimap<Backend, Split> batch = ArrayListMultimap.create();
+        batch.put(mockBackend, mockSplit);
+
+        new Expectations() {
+            {
+                mockBackendPolicy.computeScanRangeAssignment((List<Split>) 
any);
+                result = batch;
+
+                mockSplitToScanRange.getScanRange(mockBackend, 
locationProperties, mockSplit, pathPartitionKeys);
+                result = mockScanRangeLocations;
+            }
+        };
+
+        // Test concurrent access to addToQueue method
+        List<Split> splits = Collections.singletonList(mockSplit);
+
+        int threadCount = 5;
+        CountDownLatch startLatch = new CountDownLatch(1);
+        CountDownLatch doneLatch = new CountDownLatch(threadCount);
+
+        List<Thread> threads = new ArrayList<>();
+        for (int i = 0; i < threadCount; i++) {
+            Thread thread = new Thread(() -> {
+                try {
+                    startLatch.await();
+                    splitAssignment.addToQueue(splits);
+                } catch (Exception e) {
+                    // Log but don't fail test for concurrency issues
+                } finally {
+                    doneLatch.countDown();
+                }
+            });
+            threads.add(thread);
+            thread.start();
+        }
+
+        startLatch.countDown(); // Start all threads
+        Assertions.assertTrue(doneLatch.await(5, TimeUnit.SECONDS)); // Wait 
for completion
+
+        // Verify sample split is set
+        Assertions.assertNotNull(splitAssignment.getSampleSplit());
+
+        // Cleanup
+        for (Thread thread : threads) {
+            thread.join(1000);
+        }
+    }
+
+    // ==================== Integration tests for init() and addToQueue() 
====================
+
+    @Test
+    void testInitAndAddToQueueIntegration() throws Exception {
+        new Expectations() {
+            {
+                Multimap<Backend, Split> batch = ArrayListMultimap.create();
+                batch.put(mockBackend, mockSplit);
+
+                mockBackendPolicy.computeScanRangeAssignment((List<Split>) 
any);
+                result = batch;
+
+                mockSplitToScanRange.getScanRange(mockBackend, 
locationProperties, mockSplit, pathPartitionKeys);
+                result = mockScanRangeLocations;
+            }
+        };
+
+        List<Split> splits = Collections.singletonList(mockSplit);
+
+        // Start background thread to add splits after init starts
+        Thread splitsProvider = new Thread(() -> {
+            try {
+                Thread.sleep(100); // Small delay to ensure init is waiting
+                splitAssignment.addToQueue(splits);
+            } catch (Exception e) {
+                // Ignore
+            }
+        });
+
+        splitsProvider.start();
+
+        // Test init - should succeed once splits are added
+        Assertions.assertDoesNotThrow(() -> splitAssignment.init());
+
+        // Verify
+        Assertions.assertNotNull(splitAssignment.getSampleSplit());
+        Assertions.assertEquals(mockSplit, splitAssignment.getSampleSplit());
+
+        BlockingQueue<Collection<TScanRangeLocations>> queue = 
splitAssignment.getAssignedSplits(mockBackend);
+        Assertions.assertNotNull(queue);
+
+        splitsProvider.join(1000);
+    }
+
+    // ==================== appendBatch() behavior tests ====================
+
+    @Test
+    void testAppendBatchTimeoutBehavior() throws Exception {
+        new Expectations() {
+            {
+                Multimap<Backend, Split> batch = ArrayListMultimap.create();
+                batch.put(mockBackend, mockSplit);
+
+                mockBackendPolicy.computeScanRangeAssignment((List<Split>) 
any);
+                result = batch;
+
+                mockSplitToScanRange.getScanRange(mockBackend, 
locationProperties, mockSplit, pathPartitionKeys);
+                result = mockScanRangeLocations;
+            }
+        };
+
+        // This test verifies that appendBatch properly handles queue offer 
timeouts
+        // We'll simulate this by first filling the assignment and then trying 
to add more
+
+        List<Split> splits = Collections.singletonList(mockSplit);
+
+        // Add multiple splits to potentially cause queue pressure
+        for (int i = 0; i < 50; i++) {
+            try {
+                splitAssignment.addToQueue(splits);
+            } catch (Exception e) {
+                // Expected if queue gets full and times out
+                break;
+            }
+        }
+
+        // Verify that splits were processed
+        Assertions.assertNotNull(splitAssignment.getSampleSplit());
+    }
+
+    @Test
+    void testInitWhenNeedMoreSplitReturnsFalse() throws Exception {
+        // Test init behavior when needMoreSplit() returns false
+        splitAssignment.stop(); // This should make needMoreSplit() return 
false
+
+        // Init should complete immediately without waiting
+        Assertions.assertDoesNotThrow(() -> splitAssignment.init());
+    }
+
+    @Test
+    void testInitWithScheduleFinished() throws Exception {
+        // Test init behavior when schedule is already finished
+        splitAssignment.finishSchedule();
+
+        // Init should complete immediately without waiting
+        Assertions.assertDoesNotThrow(() -> splitAssignment.init());
+    }
+}
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/datasource/iceberg/IcebergUtilsTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/datasource/iceberg/IcebergUtilsTest.java
index 244f173acb7..b76f24667ad 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/datasource/iceberg/IcebergUtilsTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/datasource/iceberg/IcebergUtilsTest.java
@@ -17,12 +17,33 @@
 
 package org.apache.doris.datasource.iceberg;
 
+import org.apache.iceberg.GenericManifestFile;
+import org.apache.iceberg.GenericPartitionFieldSummary;
+import org.apache.iceberg.ManifestContent;
+import org.apache.iceberg.ManifestFile;
+import org.apache.iceberg.ManifestFile.PartitionFieldSummary;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.expressions.UnboundPredicate;
 import org.apache.iceberg.hive.HiveCatalog;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.types.Conversions;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.types.Types.LongType;
+import org.apache.iceberg.types.Types.StructType;
 import org.junit.Assert;
 import org.junit.Test;
 
 import java.lang.reflect.Field;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
 import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 
 public class IcebergUtilsTest {
     @Test
@@ -38,7 +59,7 @@ public class IcebergUtilsTest {
                             new HashMap<String, String>() {{
                                     put("list-all-tables", "true");
                                 }},
-                        "");
+                            "");
             HiveCatalog i2 = IcebergUtils.createIcebergHiveCatalog(c2, "i1");
             Assert.assertTrue(getListAllTables(i2));
 
@@ -60,4 +81,126 @@ public class IcebergUtilsTest {
         declaredField.setAccessible(true);
         return declaredField.getBoolean(hiveCatalog);
     }
+
+    @Test
+    public void testGetMatchingManifest() {
+
+        // partition : 100 - 200
+        GenericManifestFile f1 = 
getGenericManifestFileForDataTypeWithPartitionSummary(
+                "manifest_f1.avro",
+                Collections.singletonList(new GenericPartitionFieldSummary(
+                    false, false, getByteBufferForLong(100), 
getByteBufferForLong(200))));
+
+        // partition : 300 - 400
+        GenericManifestFile f2 = 
getGenericManifestFileForDataTypeWithPartitionSummary(
+                "manifest_f2.avro",
+                Collections.singletonList(new GenericPartitionFieldSummary(
+                    false, false, getByteBufferForLong(300), 
getByteBufferForLong(400))));
+
+        // partition : 500 - 600
+        GenericManifestFile f3 = 
getGenericManifestFileForDataTypeWithPartitionSummary(
+                "manifest_f3.avro",
+                    Collections.singletonList(new GenericPartitionFieldSummary(
+                        false, false, getByteBufferForLong(500), 
getByteBufferForLong(600))));
+
+        List<ManifestFile> manifestFiles = new ArrayList<ManifestFile>() {{
+                add(f1);
+                add(f2);
+                add(f3);
+            }};
+
+        Schema schema = new Schema(
+                StructType.of(
+                        Types.NestedField.required(1, "id", LongType.get()),
+                        Types.NestedField.required(2, "data", LongType.get()),
+                        Types.NestedField.required(3, "par", LongType.get()))
+                    .fields());
+
+        // test empty partition spec
+        HashMap<Integer, PartitionSpec> emptyPartitionSpecsById = new 
HashMap<Integer, PartitionSpec>() {{
+                put(0, PartitionSpec.builderFor(schema).build());
+            }};
+        assertManifest(manifestFiles, emptyPartitionSpecsById, 
Expressions.alwaysTrue(), manifestFiles);
+
+        // test long partition spec
+        HashMap<Integer, PartitionSpec> longPartitionSpecsById = new 
HashMap<Integer, PartitionSpec>() {{
+                put(0, 
PartitionSpec.builderFor(schema).identity("par").build());
+            }};
+        // 1. par > 10
+        UnboundPredicate<Long> e1 = Expressions.greaterThan("par", 10L);
+        assertManifest(manifestFiles, longPartitionSpecsById, 
Expressions.and(Expressions.alwaysTrue(), e1), manifestFiles);
+
+        // 2. par > 90 (e1 AND e2 == par > 10 && par > 90); all manifests still match
+        UnboundPredicate<Long> e2 = Expressions.greaterThan("par", 90L);
+        assertManifest(manifestFiles, longPartitionSpecsById, 
Expressions.and(e1, e2), manifestFiles);
+
+        // 3. 10 < par < 300
+        UnboundPredicate<Long> e3 = Expressions.lessThan("par", 300L);
+        assertManifest(manifestFiles, longPartitionSpecsById, 
Expressions.and(e1, e3), Collections.singletonList(f1));
+
+        // 4. 10 < par < 400
+        UnboundPredicate<Long> e4 = Expressions.lessThan("par", 400L);
+        ArrayList<ManifestFile> expect1 = new ArrayList<ManifestFile>() {{
+                add(f1);
+                add(f2);
+            }};
+        assertManifest(manifestFiles, longPartitionSpecsById, 
Expressions.and(e1, e4), expect1);
+
+        // 5. 10 < par < 501
+        UnboundPredicate<Long> e5 = Expressions.lessThan("par", 501L);
+        assertManifest(manifestFiles, longPartitionSpecsById, 
Expressions.and(e1, e5), manifestFiles);
+
+        // 6. 200 < par < 501
+        UnboundPredicate<Long> e6 = Expressions.greaterThan("par", 200L);
+        ArrayList<ManifestFile> expect2 = new ArrayList<ManifestFile>() {{
+                add(f2);
+                add(f3);
+            }};
+        assertManifest(manifestFiles, longPartitionSpecsById, 
Expressions.and(e6, e5), expect2);
+
+        // 7. par > 600
+        UnboundPredicate<Long> e7 = Expressions.greaterThan("par", 600L);
+        assertManifest(manifestFiles, longPartitionSpecsById, 
Expressions.and(Expressions.alwaysTrue(), e7), Collections.emptyList());
+
+        // 8. par < 100
+        UnboundPredicate<Long> e8 = Expressions.lessThan("par", 100L);
+        assertManifest(manifestFiles, longPartitionSpecsById, 
Expressions.and(Expressions.alwaysTrue(), e8), Collections.emptyList());
+    }
+
+    private void assertManifest(List<ManifestFile> dataManifests,
+                                Map<Integer, PartitionSpec> specsById,
+                                Expression dataFilter,
+                                List<ManifestFile> expected) {
+        CloseableIterable<ManifestFile> matchingManifest =
+                IcebergUtils.getMatchingManifest(dataManifests, specsById, 
dataFilter);
+        List<ManifestFile> ret = new ArrayList<>();
+        matchingManifest.forEach(ret::add);
+        ret.sort(Comparator.comparing(ManifestFile::path));
+        Assert.assertEquals(expected, ret);
+    }
+
+    private ByteBuffer getByteBufferForLong(long num) {
+        return Conversions.toByteBuffer(Types.LongType.get(), num);
+    }
+
+    private GenericManifestFile 
getGenericManifestFileForDataTypeWithPartitionSummary(
+            String path,
+            List<PartitionFieldSummary> partitionFieldSummaries) {
+        return new GenericManifestFile(
+            path,
+            1024L,
+            0,
+            ManifestContent.DATA,
+            1,
+            1,
+            123456789L,
+            2,
+            100,
+            0,
+            0,
+            0,
+            0,
+            partitionFieldSummaries,
+            null);
+    }
 }
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/datasource/iceberg/source/IcebergScanNodeTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/datasource/iceberg/source/IcebergScanNodeTest.java
new file mode 100644
index 00000000000..8ae51a61f46
--- /dev/null
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/datasource/iceberg/source/IcebergScanNodeTest.java
@@ -0,0 +1,181 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.iceberg.source;
+
+import org.apache.doris.analysis.TupleDescriptor;
+import org.apache.doris.analysis.TupleId;
+import org.apache.doris.common.jmockit.Deencapsulation;
+import 
org.apache.doris.common.security.authentication.PreExecutionAuthenticator;
+import org.apache.doris.datasource.iceberg.IcebergUtils;
+import org.apache.doris.planner.PlanNodeId;
+import org.apache.doris.qe.SessionVariable;
+import org.apache.doris.thrift.TPushAggOp;
+
+import com.google.common.collect.Lists;
+import mockit.Expectations;
+import mockit.Mock;
+import mockit.MockUp;
+import mockit.Mocked;
+import org.apache.iceberg.BaseTable;
+import org.apache.iceberg.GenericManifestFile;
+import org.apache.iceberg.ManifestContent;
+import org.apache.iceberg.ManifestFile;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.TableScan;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.hadoop.HadoopTableOperations;
+import org.apache.iceberg.io.CloseableIterable;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+public class IcebergScanNodeTest {
+
+    @Mocked
+    HadoopTableOperations hadoopTableOperations;
+    @Mocked
+    Snapshot snapshot;
+
+    @Test
+    public void testIsBatchMode() {
+        SessionVariable sessionVariable = new SessionVariable();
+        IcebergScanNode icebergScanNode = new IcebergScanNode(new 
PlanNodeId(1), new TupleDescriptor(new TupleId(1)), sessionVariable);
+
+        new Expectations(icebergScanNode) {{
+                icebergScanNode.getPushDownAggNoGroupingOp();
+                result = TPushAggOp.COUNT;
+                icebergScanNode.getCountFromSnapshot();
+                result = 1L;
+            }};
+        Assert.assertFalse(icebergScanNode.isBatchMode());
+
+        BaseTable mockTable = new BaseTable(hadoopTableOperations, 
"mockTable");
+        new Expectations(icebergScanNode) {{
+                icebergScanNode.getPushDownAggNoGroupingOp();
+                result = TPushAggOp.NONE;
+                Deencapsulation.setField(icebergScanNode, "icebergTable", 
mockTable);
+            }};
+        TableScan tableScan = mockTable.newScan();
+        new Expectations(mockTable) {{
+                mockTable.currentSnapshot();
+                result = null;
+                icebergScanNode.createTableScan();
+                result = tableScan;
+            }};
+        Assert.assertFalse(icebergScanNode.isBatchMode());
+
+        new Expectations(mockTable) {{
+                mockTable.currentSnapshot();
+                result = snapshot;
+            }};
+        new Expectations(sessionVariable) {{
+                sessionVariable.getEnableExternalTableBatchMode();
+                result = false;
+            }};
+        Assert.assertFalse(icebergScanNode.isBatchMode());
+
+
+        new Expectations(sessionVariable) {{
+                sessionVariable.getEnableExternalTableBatchMode();
+                result = true;
+            }};
+        new Expectations(icebergScanNode) {{
+                Deencapsulation.setField(icebergScanNode, 
"preExecutionAuthenticator", new PreExecutionAuthenticator());
+            }};
+        new Expectations() {{
+                sessionVariable.getNumFilesInBatchMode();
+                result = 1024;
+            }};
+
+        mockManifestFile("p", 10, 0);
+        Assert.assertFalse(icebergScanNode.isBatchMode());
+
+        mockManifestFile("p", 0, 10);
+        Assert.assertFalse(icebergScanNode.isBatchMode());
+
+        mockManifestFile("p", 10, 10);
+        Assert.assertFalse(icebergScanNode.isBatchMode());
+
+        mockManifestFile("p", 1024, 0);
+        Assert.assertTrue(icebergScanNode.isBatchMode());
+
+        mockManifestFile("p", 0, 1024);
+        Assert.assertTrue(icebergScanNode.isBatchMode());
+
+        new Expectations() {{
+                sessionVariable.getNumFilesInBatchMode();
+                result = 100;
+            }};
+
+        mockManifestFile("p", 10, 0);
+        Assert.assertFalse(icebergScanNode.isBatchMode());
+
+        mockManifestFile("p", 0, 10);
+        Assert.assertFalse(icebergScanNode.isBatchMode());
+
+        mockManifestFile("p", 10, 10);
+        Assert.assertFalse(icebergScanNode.isBatchMode());
+
+        mockManifestFile("p", 0, 100);
+        Assert.assertTrue(icebergScanNode.isBatchMode());
+
+        mockManifestFile("p", 100, 0);
+        Assert.assertTrue(icebergScanNode.isBatchMode());
+
+        mockManifestFile("p", 10, 90);
+        Assert.assertTrue(icebergScanNode.isBatchMode());
+    }
+
+    private void mockManifestFile(String path, int addedFileCount, int 
existingFileCount) {
+        new MockUp<IcebergUtils>() {
+            @Mock
+            CloseableIterable<ManifestFile> 
getMatchingManifest(List<ManifestFile> dataManifests,
+                                                                Map<Integer, 
PartitionSpec> specsById,
+                                                                Expression 
dataFilte) {
+                return CloseableIterable.withNoopClose(new 
ArrayList<ManifestFile>() {{
+                        add(genManifestFile(path, addedFileCount, 
existingFileCount));
+                    }}
+                );
+            }
+        };
+    }
+
+    private ManifestFile genManifestFile(String path, int addedFileCount, int 
existingFileCount) {
+        return new GenericManifestFile(
+            path,
+            10, // length
+            1, // specId
+            ManifestContent.DATA,
+            1, // sequenceNumber
+            1, // minSeqNumber
+            1L, // snapshotId
+            addedFileCount,
+            1,
+            existingFileCount,
+            1,
+            0, // deleteFilesCount
+            0,
+            Lists.newArrayList(),
+            null
+        );
+    }
+}
diff --git 
a/regression-test/suites/external_table_p0/iceberg/test_iceberg_filter.groovy 
b/regression-test/suites/external_table_p0/iceberg/test_iceberg_filter.groovy
index 7e654175f9c..94d753cc986 100644
--- 
a/regression-test/suites/external_table_p0/iceberg/test_iceberg_filter.groovy
+++ 
b/regression-test/suites/external_table_p0/iceberg/test_iceberg_filter.groovy
@@ -19,6 +19,7 @@ suite("test_iceberg_filter", 
"p0,external,doris,external_docker,external_docker_
     String enabled = context.config.otherConfigs.get("enableIcebergTest")
     if (enabled != null && enabled.equalsIgnoreCase("true")) {
         try {
+            sql """set enable_external_table_batch_mode=false"""
             String rest_port = 
context.config.otherConfigs.get("iceberg_rest_uri_port")
             String minio_port = 
context.config.otherConfigs.get("iceberg_minio_port")
             String externalEnvIp = 
context.config.otherConfigs.get("externalEnvIp")
@@ -91,6 +92,7 @@ suite("test_iceberg_filter", 
"p0,external,doris,external_docker,external_docker_
             }
 
         } finally {
+            sql """set enable_external_table_batch_mode=true"""
         }
     }
 }
diff --git 
a/regression-test/suites/external_table_p0/iceberg/test_iceberg_optimize_count.groovy
 
b/regression-test/suites/external_table_p0/iceberg/test_iceberg_optimize_count.groovy
index 556314225c5..bc3b006fb93 100644
--- 
a/regression-test/suites/external_table_p0/iceberg/test_iceberg_optimize_count.groovy
+++ 
b/regression-test/suites/external_table_p0/iceberg/test_iceberg_optimize_count.groovy
@@ -55,21 +55,76 @@ suite("test_iceberg_optimize_count", 
"p0,external,doris,external_docker,external
         qt_q03 """${sqlstr3}""" 
         qt_q04 """${sqlstr4}""" 
 
+        // traditional mode
+        sql """set num_files_in_batch_mode=100000"""
+        explain {
+            sql("""select * from sample_cow_orc""")
+            notContains "approximate"
+        }
         explain {
             sql("""${sqlstr1}""")
             contains """pushdown agg=COUNT (1000)"""
         }
+        explain {
+            sql("""select * from sample_cow_parquet""")
+            notContains "approximate"
+        }
         explain {
             sql("""${sqlstr2}""")
             contains """pushdown agg=COUNT (1000)"""
         }
+        explain {
+            sql("""select * from sample_mor_orc""")
+            notContains "approximate"
+        }
         explain {
             sql("""${sqlstr3}""")
             contains """pushdown agg=COUNT (1000)"""
         }
+        // because it has dangling delete
+        explain {
+            sql("""${sqlstr4}""")
+            contains """pushdown agg=COUNT (-1)"""
+        }
+
+        // batch mode
+        sql """set num_files_in_batch_mode=1"""
+        explain {
+            sql("""select * from sample_cow_orc""")
+            contains "approximate"
+        }
+        explain {
+            sql("""${sqlstr1}""")
+            contains """pushdown agg=COUNT (1000)"""
+            notContains "approximate"
+        }
+        explain {
+            sql("""select * from sample_cow_parquet""")
+            contains "approximate"
+        }
+        explain {
+            sql("""${sqlstr2}""")
+            contains """pushdown agg=COUNT (1000)"""
+            notContains "approximate"
+        }
+        explain {
+            sql("""select * from sample_mor_orc""")
+            contains "approximate"
+        }
+        explain {
+            sql("""${sqlstr3}""")
+            contains """pushdown agg=COUNT (1000)"""
+            notContains "approximate"
+        }
+        explain {
+            sql("""select * from sample_mor_parquet""")
+            contains "approximate"
+        }
+        // because it has dangling delete
         explain {
             sql("""${sqlstr4}""")
             contains """pushdown agg=COUNT (-1)"""
+            contains "approximate"
         }
 
         // don't use push down count
@@ -110,6 +165,7 @@ suite("test_iceberg_optimize_count", 
"p0,external,doris,external_docker,external
 
     } finally {
         sql """ set enable_count_push_down_for_external_table=true; """
+        sql """set num_files_in_batch_mode=1024"""
         // sql """drop catalog if exists ${catalog_name}"""
     }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to