This is an automated email from the ASF dual-hosted git repository.
morrysnow pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.1 by this push:
new d6736b90e88 branch-3.1: [feat](iceberg) implement iceberg partition
batch mode (#52095)
d6736b90e88 is described below
commit d6736b90e88b4236876fc9568a54ba38e2972756
Author: Mingyu Chen (Rayner) <[email protected]>
AuthorDate: Sat Jun 21 11:44:31 2025 +0800
branch-3.1: [feat](iceberg) implement iceberg partition batch mode (#52095)
bp #46398 #49025 #49489 #50434 #51185 #51694
---------
Co-authored-by: wuwenchi <[email protected]>
Co-authored-by: daidai <[email protected]>
---
be/src/common/config.cpp | 2 +-
.../org/apache/doris/common/ThreadPoolManager.java | 69 ++++
.../apache/doris/datasource/ExternalCatalog.java | 6 +
.../doris/datasource/ExternalMetaCacheMgr.java | 1 +
.../apache/doris/datasource/FileQueryScanNode.java | 8 +-
.../apache/doris/datasource/SplitAssignment.java | 86 +++--
.../apache/doris/datasource/SplitGenerator.java | 2 +-
.../doris/datasource/hive/source/HiveScanNode.java | 4 +-
.../doris/datasource/hudi/source/HudiScanNode.java | 12 +-
.../datasource/iceberg/IcebergExternalCatalog.java | 8 +
.../iceberg/IcebergHMSExternalCatalog.java | 7 -
.../datasource/iceberg/IcebergMetadataOps.java | 28 +-
.../doris/datasource/iceberg/IcebergUtils.java | 35 ++
.../datasource/iceberg/source/IcebergScanNode.java | 270 +++++++++----
.../datasource/iceberg/source/IcebergSplit.java | 3 +-
.../maxcompute/source/MaxComputeScanNode.java | 8 +-
.../java/org/apache/doris/planner/ScanNode.java | 2 +-
.../java/org/apache/doris/qe/SessionVariable.java | 26 +-
.../doris/datasource/SplitAssignmentTest.java | 427 +++++++++++++++++++++
.../doris/datasource/iceberg/IcebergUtilsTest.java | 145 ++++++-
.../iceberg/source/IcebergScanNodeTest.java | 181 +++++++++
.../iceberg/test_iceberg_filter.groovy | 2 +
.../iceberg/test_iceberg_optimize_count.groovy | 56 +++
23 files changed, 1264 insertions(+), 124 deletions(-)
diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 4ce28d89f0e..b57865daef0 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -306,7 +306,7 @@ DEFINE_Validator(doris_scanner_thread_pool_thread_num,
[](const int config) -> b
return true;
});
DEFINE_Int32(doris_scanner_min_thread_pool_thread_num, "8");
-DEFINE_Int32(remote_split_source_batch_size, "10240");
+DEFINE_Int32(remote_split_source_batch_size, "1000");
DEFINE_Int32(doris_max_remote_scanner_thread_pool_thread_num, "-1");
// number of olap scanner thread pool queue size
DEFINE_Int32(doris_scanner_thread_pool_queue_size, "102400");
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/common/ThreadPoolManager.java
b/fe/fe-core/src/main/java/org/apache/doris/common/ThreadPoolManager.java
index dcb1f704271..0822a322dec 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/ThreadPoolManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/ThreadPoolManager.java
@@ -18,6 +18,7 @@
package org.apache.doris.common;
+import
org.apache.doris.common.security.authentication.PreExecutionAuthenticator;
import org.apache.doris.metric.Metric;
import org.apache.doris.metric.Metric.MetricUnit;
import org.apache.doris.metric.MetricLabel;
@@ -33,6 +34,7 @@ import java.util.Comparator;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.PriorityBlockingQueue;
@@ -69,6 +71,7 @@ import java.util.function.Supplier;
public class ThreadPoolManager {
+ private static final Logger LOG =
LogManager.getLogger(ThreadPoolManager.class);
private static Map<String, ThreadPoolExecutor> nameToThreadPoolMap =
Maps.newConcurrentMap();
private static String[] poolMetricTypes = {"pool_size",
"active_thread_num", "task_in_queue"};
@@ -140,6 +143,17 @@ public class ThreadPoolManager {
poolName, needRegisterMetric);
}
+ public static ThreadPoolExecutor newDaemonFixedThreadPoolWithPreAuth(
+ int numThread,
+ int queueSize,
+ String poolName,
+ boolean needRegisterMetric,
+ PreExecutionAuthenticator preAuth) {
+ return newDaemonThreadPoolWithPreAuth(numThread, numThread,
KEEP_ALIVE_TIME, TimeUnit.SECONDS,
+ new LinkedBlockingQueue<>(queueSize), new BlockedPolicy(poolName,
60),
+ poolName, needRegisterMetric, preAuth);
+ }
+
public static ThreadPoolExecutor newDaemonFixedThreadPool(int numThread,
int queueSize,
String poolName,
int timeoutSeconds,
boolean
needRegisterMetric) {
@@ -229,6 +243,40 @@ public class ThreadPoolManager {
return new
ThreadFactoryBuilder().setDaemon(true).setNameFormat(poolName + "-%d").build();
}
+
+ public static ThreadPoolExecutor newDaemonThreadPoolWithPreAuth(
+ int corePoolSize,
+ int maximumPoolSize,
+ long keepAliveTime,
+ TimeUnit unit,
+ BlockingQueue<Runnable> workQueue,
+ RejectedExecutionHandler handler,
+ String poolName,
+ boolean needRegisterMetric,
+ PreExecutionAuthenticator preAuth) {
+ ThreadFactory threadFactory = namedThreadFactoryWithPreAuth(poolName,
preAuth);
+ ThreadPoolExecutor threadPool = new ThreadPoolExecutor(corePoolSize,
maximumPoolSize,
+ keepAliveTime, unit, workQueue, threadFactory, handler);
+ if (needRegisterMetric) {
+ nameToThreadPoolMap.put(poolName, threadPool);
+ }
+ return threadPool;
+ }
+
+ private static ThreadFactory namedThreadFactoryWithPreAuth(String
poolName, PreExecutionAuthenticator preAuth) {
+ return new ThreadFactoryBuilder()
+ .setDaemon(true)
+ .setNameFormat(poolName + "-%d")
+ .setThreadFactory(runnable -> new Thread(() -> {
+ try {
+ preAuth.execute(runnable);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }))
+ .build();
+ }
+
private static class PriorityThreadPoolExecutor<T> extends
ThreadPoolExecutor {
private final Comparator<T> comparator;
@@ -384,4 +432,25 @@ public class ThreadPoolManager {
}
}
}
+
+ public static void shutdownExecutorService(ExecutorService
executorService) {
+ // Disable new tasks from being submitted
+ executorService.shutdown();
+ try {
+ // Wait a while for existing tasks to terminate
+ if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) {
+ // Cancel currently executing tasks
+ executorService.shutdownNow();
+ // Wait a while for tasks to respond to being cancelled
+ if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) {
+ LOG.warn("ExecutorService did not terminate");
+ }
+ }
+ } catch (InterruptedException e) {
+ // (Re-)Cancel if current thread also interrupted
+ executorService.shutdownNow();
+ // Preserve interrupt status
+ Thread.currentThread().interrupt();
+ }
+ }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
index d89d4582862..6aed86e9c25 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
@@ -35,6 +35,7 @@ import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.Pair;
+import org.apache.doris.common.ThreadPoolManager;
import org.apache.doris.common.UserException;
import org.apache.doris.common.Version;
import org.apache.doris.common.io.Text;
@@ -98,6 +99,7 @@ import java.util.Map;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.Set;
+import java.util.concurrent.ThreadPoolExecutor;
import java.util.stream.Collectors;
/**
@@ -167,6 +169,7 @@ public abstract class ExternalCatalog
protected Optional<Boolean> useMetaCache = Optional.empty();
protected MetaCache<ExternalDatabase<? extends ExternalTable>> metaCache;
protected PreExecutionAuthenticator preExecutionAuthenticator;
+ protected ThreadPoolExecutor threadPoolWithPreAuth;
private volatile Configuration cachedConf = null;
private byte[] confLock = new byte[0];
@@ -764,6 +767,9 @@ public abstract class ExternalCatalog
if (null != transactionManager) {
transactionManager = null;
}
+ if (threadPoolWithPreAuth != null) {
+ ThreadPoolManager.shutdownExecutorService(threadPoolWithPreAuth);
+ }
CatalogIf.super.onClose();
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalMetaCacheMgr.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalMetaCacheMgr.java
index c2f50f929f8..79dd86345ee 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalMetaCacheMgr.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalMetaCacheMgr.java
@@ -83,6 +83,7 @@ public class ExternalMetaCacheMgr {
private ExecutorService rowCountRefreshExecutor;
private ExecutorService commonRefreshExecutor;
private ExecutorService fileListingExecutor;
+ // This executor is used to schedule the getting split tasks
private ExecutorService scheduleExecutor;
// catalog id -> HiveMetaStoreCache
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java
index 107c3541577..dc85731d4d2 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java
@@ -335,17 +335,19 @@ public abstract class FileQueryScanNode extends
FileScanNode {
if (splitAssignment.getSampleSplit() == null &&
!isFileStreamType()) {
return;
}
- selectedSplitNum = numApproximateSplits();
-
FileSplit fileSplit = (FileSplit) splitAssignment.getSampleSplit();
TFileType locationType = fileSplit.getLocationType();
+ selectedSplitNum = numApproximateSplits();
+ if (selectedSplitNum < 0) {
+ throw new UserException("Approximate split number should not
be negative");
+ }
totalFileSize = fileSplit.getLength() * selectedSplitNum;
long maxWaitTime = sessionVariable.getFetchSplitsMaxWaitTime();
// Not accurate, only used to estimate concurrency.
// Here, we must take the max of 1, because
// in the case of multiple BEs, `numApproximateSplits() /
backendPolicy.numBackends()` may be 0,
// and finally numSplitsPerBE is 0, resulting in no data being
queried.
- int numSplitsPerBE = Math.max(numApproximateSplits() /
backendPolicy.numBackends(), 1);
+ int numSplitsPerBE = Math.max(selectedSplitNum /
backendPolicy.numBackends(), 1);
for (Backend backend : backendPolicy.getBackends()) {
SplitSource splitSource = new SplitSource(backend,
splitAssignment, maxWaitTime);
splitSources.add(splitSource);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/SplitAssignment.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/SplitAssignment.java
index a26abc7fc5e..cc17818d6b5 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/SplitAssignment.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/SplitAssignment.java
@@ -23,7 +23,10 @@ import org.apache.doris.system.Backend;
import org.apache.doris.thrift.TScanRangeLocations;
import com.google.common.collect.Multimap;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import java.io.Closeable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
@@ -33,6 +36,7 @@ import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
/**
@@ -40,6 +44,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
* `SplitGenerator` provides the file splits, and `FederationBackendPolicy`
assigns these splits to backends.
*/
public class SplitAssignment {
+ private static final Logger LOG =
LogManager.getLogger(SplitAssignment.class);
private final Set<Long> sources = new HashSet<>();
private final FederationBackendPolicy backendPolicy;
private final SplitGenerator splitGenerator;
@@ -50,10 +55,11 @@ public class SplitAssignment {
private final List<String> pathPartitionKeys;
private final Object assignLock = new Object();
private Split sampleSplit = null;
- private final AtomicBoolean isStop = new AtomicBoolean(false);
+ private final AtomicBoolean isStopped = new AtomicBoolean(false);
private final AtomicBoolean scheduleFinished = new AtomicBoolean(false);
private UserException exception = null;
+ private final List<Closeable> closeableResources = new ArrayList<>();
public SplitAssignment(
FederationBackendPolicy backendPolicy,
@@ -71,12 +77,20 @@ public class SplitAssignment {
public void init() throws UserException {
splitGenerator.startSplit(backendPolicy.numBackends());
synchronized (assignLock) {
- while (sampleSplit == null && waitFirstSplit()) {
+ final int waitIntervalTimeMillis = 100;
+ final int initTimeoutMillis = 30000; // 30s
+ int waitTotalTime = 0;
+ while (sampleSplit == null && needMoreSplit()) {
try {
- assignLock.wait(100);
+ assignLock.wait(waitIntervalTimeMillis);
} catch (InterruptedException e) {
throw new UserException(e.getMessage(), e);
}
+ waitTotalTime += waitIntervalTimeMillis;
+ if (waitTotalTime > initTimeoutMillis) {
+ throw new UserException("Failed to get first split after
waiting for "
+ + (waitTotalTime / 1000) + " seconds.");
+ }
}
}
if (exception != null) {
@@ -84,8 +98,8 @@ public class SplitAssignment {
}
}
- private boolean waitFirstSplit() {
- return !scheduleFinished.get() && !isStop.get() && exception == null;
+ public boolean needMoreSplit() {
+ return !scheduleFinished.get() && !isStopped.get() && exception ==
null;
}
private void appendBatch(Multimap<Backend, Split> batch) throws
UserException {
@@ -95,8 +109,16 @@ public class SplitAssignment {
for (Split split : splits) {
locations.add(splitToScanRange.getScanRange(backend,
locationProperties, split, pathPartitionKeys));
}
- if (!assignment.computeIfAbsent(backend, be -> new
LinkedBlockingQueue<>()).offer(locations)) {
- throw new UserException("Failed to offer batch split");
+ while (needMoreSplit()) {
+ BlockingQueue<Collection<TScanRangeLocations>> queue =
+ assignment.computeIfAbsent(backend, be -> new
LinkedBlockingQueue<>(10000));
+ try {
+ if (queue.offer(locations, 100, TimeUnit.MILLISECONDS)) {
+ break;
+ }
+ } catch (InterruptedException e) {
+ addUserException(new UserException("Failed to offer batch
split by interrupted", e));
+ }
}
}
}
@@ -113,7 +135,7 @@ public class SplitAssignment {
return sampleSplit;
}
- public void addToQueue(List<Split> splits) {
+ public void addToQueue(List<Split> splits) throws UserException {
if (splits.isEmpty()) {
return;
}
@@ -123,19 +145,9 @@ public class SplitAssignment {
sampleSplit = splits.get(0);
assignLock.notify();
}
- try {
- batch = backendPolicy.computeScanRangeAssignment(splits);
- } catch (UserException e) {
- exception = e;
- }
- }
- if (batch != null) {
- try {
- appendBatch(batch);
- } catch (UserException e) {
- exception = e;
- }
+ batch = backendPolicy.computeScanRangeAssignment(splits);
}
+ appendBatch(batch);
}
private void notifyAssignment() {
@@ -150,28 +162,54 @@ public class SplitAssignment {
}
BlockingQueue<Collection<TScanRangeLocations>> splits =
assignment.computeIfAbsent(backend,
be -> new LinkedBlockingQueue<>());
- if (scheduleFinished.get() && splits.isEmpty() || isStop.get()) {
+ if (scheduleFinished.get() && splits.isEmpty() || isStopped.get()) {
return null;
}
return splits;
}
public void setException(UserException e) {
- exception = e;
+ addUserException(e);
notifyAssignment();
}
+ private void addUserException(UserException e) {
+ if (exception != null) {
+ exception.addSuppressed(e);
+ } else {
+ exception = e;
+ }
+ }
+
public void finishSchedule() {
scheduleFinished.set(true);
notifyAssignment();
}
public void stop() {
- isStop.set(true);
+ if (isStop()) {
+ return;
+ }
+ isStopped.set(true);
+ closeableResources.forEach((closeable) -> {
+ try {
+ closeable.close();
+ } catch (Exception e) {
+ LOG.warn("close resource error:{}", e.getMessage(), e);
+ // ignore
+ }
+ });
notifyAssignment();
+ if (exception != null) {
+ throw new RuntimeException(exception);
+ }
}
public boolean isStop() {
- return isStop.get();
+ return isStopped.get();
+ }
+
+ public void addCloseable(Closeable resource) {
+ closeableResources.add(resource);
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/SplitGenerator.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/SplitGenerator.java
index 34ff3911445..391552a5106 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/SplitGenerator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/SplitGenerator.java
@@ -52,7 +52,7 @@ public interface SplitGenerator {
return -1;
}
- default void startSplit(int numBackends) {
+ default void startSplit(int numBackends) throws UserException {
}
/**
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java
index 9aff964631b..91002842b44 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java
@@ -223,7 +223,9 @@ public class HiveScanNode extends FileQueryScanNode {
if (allFiles.size() > numSplitsPerPartition.get())
{
numSplitsPerPartition.set(allFiles.size());
}
- splitAssignment.addToQueue(allFiles);
+ if (splitAssignment.needMoreSplit()) {
+ splitAssignment.addToQueue(allFiles);
+ }
} catch (Exception e) {
batchException.set(new
UserException(e.getMessage(), e));
} finally {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java
index 8e33dfb2469..f14e02f3e58 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java
@@ -74,6 +74,7 @@ import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
@@ -401,6 +402,7 @@ public class HudiScanNode extends HiveScanNode {
return;
}
AtomicInteger numFinishedPartitions = new AtomicInteger(0);
+ ExecutorService scheduleExecutor =
Env.getCurrentEnv().getExtMetaCacheMgr().getScheduleExecutor();
CompletableFuture.runAsync(() -> {
for (HivePartition partition : prunedPartitions) {
if (batchException.get() != null || splitAssignment.isStop()) {
@@ -419,8 +421,10 @@ public class HudiScanNode extends HiveScanNode {
if (allFiles.size() > numSplitsPerPartition.get()) {
numSplitsPerPartition.set(allFiles.size());
}
- splitAssignment.addToQueue(allFiles);
- } catch (IOException e) {
+ if (splitAssignment.needMoreSplit()) {
+ splitAssignment.addToQueue(allFiles);
+ }
+ } catch (Exception e) {
batchException.set(new UserException(e.getMessage(),
e));
} finally {
splittersOnFlight.release();
@@ -431,12 +435,12 @@ public class HudiScanNode extends HiveScanNode {
splitAssignment.finishSchedule();
}
}
- });
+ }, scheduleExecutor);
}
if (batchException.get() != null) {
splitAssignment.setException(batchException.get());
}
- });
+ }, scheduleExecutor);
}
@Override
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java
index 7282de52aa0..495b5779597 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java
@@ -17,6 +17,7 @@
package org.apache.doris.datasource.iceberg;
+import org.apache.doris.common.ThreadPoolManager;
import
org.apache.doris.common.security.authentication.PreExecutionAuthenticator;
import org.apache.doris.datasource.ExternalCatalog;
import org.apache.doris.datasource.InitCatalogLog;
@@ -44,6 +45,7 @@ public abstract class IcebergExternalCatalog extends
ExternalCatalog {
public static final String EXTERNAL_CATALOG_NAME = "external_catalog.name";
protected String icebergCatalogType;
protected Catalog catalog;
+ private static final int ICEBERG_CATALOG_EXECUTOR_THREAD_NUM = 16;
public IcebergExternalCatalog(long catalogId, String name, String comment)
{
super(catalogId, name, InitCatalogLog.Type.ICEBERG, comment);
@@ -65,6 +67,12 @@ public abstract class IcebergExternalCatalog extends
ExternalCatalog {
initCatalog();
IcebergMetadataOps ops =
ExternalMetadataOperations.newIcebergMetadataOps(this, catalog);
transactionManager =
TransactionManagerFactory.createIcebergTransactionManager(ops);
+ threadPoolWithPreAuth =
ThreadPoolManager.newDaemonFixedThreadPoolWithPreAuth(
+ ICEBERG_CATALOG_EXECUTOR_THREAD_NUM,
+ Integer.MAX_VALUE,
+ String.format("iceberg_catalog_%s_executor_pool", name),
+ true,
+ preExecutionAuthenticator);
metadataOps = ops;
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergHMSExternalCatalog.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergHMSExternalCatalog.java
index c5a99c157ce..51d39357b81 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergHMSExternalCatalog.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergHMSExternalCatalog.java
@@ -17,8 +17,6 @@
package org.apache.doris.datasource.iceberg;
-import org.apache.doris.common.security.authentication.AuthenticationConfig;
-import org.apache.doris.common.security.authentication.HadoopAuthenticator;
import org.apache.doris.datasource.CatalogProperty;
import org.apache.doris.datasource.property.PropertyConverter;
@@ -37,11 +35,6 @@ public class IcebergHMSExternalCatalog extends
IcebergExternalCatalog {
protected void initCatalog() {
icebergCatalogType = ICEBERG_HMS;
catalog = IcebergUtils.createIcebergHiveCatalog(this, getName());
- if (preExecutionAuthenticator.getHadoopAuthenticator() == null) {
- AuthenticationConfig config =
AuthenticationConfig.getKerberosConfig(getConfiguration());
- HadoopAuthenticator authenticator =
HadoopAuthenticator.getHadoopAuthenticator(config);
- preExecutionAuthenticator.setHadoopAuthenticator(authenticator);
- }
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataOps.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataOps.java
index 84c868afa6f..7bc95aa48dd 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataOps.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataOps.java
@@ -94,11 +94,19 @@ public class IcebergMetadataOps implements
ExternalMetadataOps {
@Override
public boolean tableExist(String dbName, String tblName) {
- return catalog.tableExists(getTableIdentifier(dbName, tblName));
+ try {
+ return preExecutionAuthenticator.execute(() ->
catalog.tableExists(getTableIdentifier(dbName, tblName)));
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to check table exist, error
message is:" + e.getMessage(), e);
+ }
}
public boolean databaseExist(String dbName) {
- return nsCatalog.namespaceExists(getNamespace(dbName));
+ try {
+ return preExecutionAuthenticator.execute(() ->
nsCatalog.namespaceExists(getNamespace(dbName)));
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to check database exist, error
message is:" + e.getMessage(), e);
+ }
}
public List<String> listDatabaseNames() {
@@ -115,8 +123,14 @@ public class IcebergMetadataOps implements
ExternalMetadataOps {
@Override
public List<String> listTableNames(String dbName) {
- List<TableIdentifier> tableIdentifiers =
catalog.listTables(getNamespace(dbName));
- return
tableIdentifiers.stream().map(TableIdentifier::name).collect(Collectors.toList());
+ try {
+ return preExecutionAuthenticator.execute(() -> {
+ List<TableIdentifier> tableIdentifiers =
catalog.listTables(getNamespace(dbName));
+ return
tableIdentifiers.stream().map(TableIdentifier::name).collect(Collectors.toList());
+ });
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to list table names, error
message is:" + e.getMessage(), e);
+ }
}
@Override
@@ -310,7 +324,11 @@ public class IcebergMetadataOps implements
ExternalMetadataOps {
@Override
public Table loadTable(String dbName, String tblName) {
- return catalog.loadTable(getTableIdentifier(dbName, tblName));
+ try {
+ return preExecutionAuthenticator.execute(() ->
catalog.loadTable(getTableIdentifier(dbName, tblName)));
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to load table, error message
is:" + e.getMessage(), e);
+ }
}
private TableIdentifier getTableIdentifier(String dbName, String tblName) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergUtils.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergUtils.java
index 5ef64153418..75b9b9e1812 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergUtils.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergUtils.java
@@ -51,10 +51,13 @@ import
org.apache.doris.datasource.property.constants.HMSProperties;
import org.apache.doris.nereids.exceptions.NotSupportedException;
import org.apache.doris.thrift.TExprOpcode;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.LoadingCache;
import com.google.common.collect.Lists;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Snapshot;
@@ -63,10 +66,13 @@ import org.apache.iceberg.TableProperties;
import org.apache.iceberg.expressions.And;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.expressions.ManifestEvaluator;
import org.apache.iceberg.expressions.Not;
import org.apache.iceberg.expressions.Or;
+import org.apache.iceberg.expressions.Projections;
import org.apache.iceberg.expressions.Unbound;
import org.apache.iceberg.hive.HiveCatalog;
+import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.types.Type.TypeID;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.LocationUtil;
@@ -695,4 +701,33 @@ public class IcebergUtils {
hiveCatalog.initialize(name, catalogProperties);
return hiveCatalog;
}
+
+ // Retrieve the manifest files that match the query based on partitions in
filter
+ public static CloseableIterable<ManifestFile> getMatchingManifest(
+ List<ManifestFile> dataManifests,
+ Map<Integer, PartitionSpec> specsById,
+ Expression dataFilter) {
+ LoadingCache<Integer, ManifestEvaluator> evalCache =
Caffeine.newBuilder()
+ .build(
+ specId -> {
+ PartitionSpec spec = specsById.get(specId);
+ return ManifestEvaluator.forPartitionFilter(
+ Expressions.and(
+ Expressions.alwaysTrue(),
+ Projections.inclusive(spec,
true).project(dataFilter)),
+ spec,
+ true);
+ });
+
+ CloseableIterable<ManifestFile> matchingManifests =
CloseableIterable.filter(
+ CloseableIterable.withNoopClose(dataManifests),
+ manifest ->
evalCache.get(manifest.partitionSpecId()).eval(manifest));
+
+ matchingManifests =
+ CloseableIterable.filter(
+ matchingManifests,
+ manifest -> manifest.hasAddedFiles() ||
manifest.hasExistingFiles());
+
+ return matchingManifests;
+ }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java
index 215a94f110f..9551ea20388 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java
@@ -22,9 +22,11 @@ import org.apache.doris.analysis.FunctionCallExpr;
import org.apache.doris.analysis.TableSnapshot;
import org.apache.doris.analysis.TupleDescriptor;
import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.UserException;
+import
org.apache.doris.common.security.authentication.PreExecutionAuthenticator;
import org.apache.doris.common.util.LocationPath;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.datasource.ExternalTable;
@@ -48,14 +50,15 @@ import org.apache.doris.thrift.TPlanNode;
import org.apache.doris.thrift.TPushAggOp;
import org.apache.doris.thrift.TTableFormatFileDesc;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.iceberg.BaseTable;
-import org.apache.iceberg.CombinedScanTask;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.FileContent;
import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.StructLike;
@@ -63,6 +66,7 @@ import org.apache.iceberg.Table;
import org.apache.iceberg.TableScan;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.CloseableIterator;
import org.apache.iceberg.types.Conversions;
import org.apache.iceberg.util.SnapshotUtil;
import org.apache.iceberg.util.TableScanUtil;
@@ -70,13 +74,15 @@ import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.io.IOException;
+import java.time.DateTimeException;
import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalLong;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicReference;
public class IcebergScanNode extends FileQueryScanNode {
@@ -93,7 +99,20 @@ public class IcebergScanNode extends FileQueryScanNode {
// But for part of splits which have no position/equality delete files, we
can still do count push down opt.
// And for split level count push down opt, the flag is set in each split.
private boolean tableLevelPushDownCount = false;
+ private long countFromSnapshot;
private static final long COUNT_WITH_PARALLEL_SPLITS = 10000;
+ private long targetSplitSize;
+ private ConcurrentHashMap.KeySetView<Object, Boolean> partitionPathSet;
+ private boolean isPartitionedTable;
+ private int formatVersion;
+ private PreExecutionAuthenticator preExecutionAuthenticator;
+ private TableScan icebergTableScan;
+
+ // for test
+ @VisibleForTesting
+ public IcebergScanNode(PlanNodeId id, TupleDescriptor desc,
SessionVariable sv) {
+ super(id, desc, "ICEBERG_SCAN_NODE",
StatisticalType.ICEBERG_SCAN_NODE, false, sv);
+ }
/**
* External file scan node for Query iceberg table
@@ -129,6 +148,11 @@ public class IcebergScanNode extends FileQueryScanNode {
@Override
protected void doInitialize() throws UserException {
icebergTable = source.getIcebergTable();
+ targetSplitSize = getRealFileSplitSize(0);
+ partitionPathSet = ConcurrentHashMap.newKeySet();
+ isPartitionedTable = icebergTable.spec().isPartitioned();
+ formatVersion = ((BaseTable)
icebergTable).operations().current().formatVersion();
+ preExecutionAuthenticator =
source.getCatalog().getPreExecutionAuthenticator();
super.doInitialize();
}
@@ -143,7 +167,6 @@ public class IcebergScanNode extends FileQueryScanNode {
TTableFormatFileDesc tableFormatFileDesc = new TTableFormatFileDesc();
tableFormatFileDesc.setTableFormatType(icebergSplit.getTableFormatType().value());
TIcebergFileDesc fileDesc = new TIcebergFileDesc();
- int formatVersion = icebergSplit.getFormatVersion();
fileDesc.setFormatVersion(formatVersion);
fileDesc.setOriginalFilePath(icebergSplit.getOriginalPath());
if (tableLevelPushDownCount) {
@@ -185,7 +208,7 @@ public class IcebergScanNode extends FileQueryScanNode {
@Override
public List<Split> getSplits(int numBackends) throws UserException {
try {
- return
source.getCatalog().getPreExecutionAuthenticator().execute(() ->
doGetSplits(numBackends));
+ return preExecutionAuthenticator.execute(() ->
doGetSplits(numBackends));
} catch (Exception e) {
Optional<NotSupportedException> opt =
checkNotSupportedException(e);
if (opt.isPresent()) {
@@ -194,10 +217,66 @@ public class IcebergScanNode extends FileQueryScanNode {
throw new
RuntimeException(ExceptionUtils.getRootCauseMessage(e), e);
}
}
+ }
+ @Override
+ public void startSplit(int numBackends) throws UserException {
+ try {
+ preExecutionAuthenticator.execute(() -> {
+ doStartSplit();
+ return null;
+ });
+ } catch (Exception e) {
+ throw new UserException(e.getMessage(), e);
+ }
}
- private List<Split> doGetSplits(int numBackends) throws UserException {
+ public void doStartSplit() {
+ TableScan scan = createTableScan();
+ CompletableFuture.runAsync(() -> {
+ AtomicReference<CloseableIterable<FileScanTask>> taskRef = new
AtomicReference<>();
+ try {
+ preExecutionAuthenticator.execute(
+ () -> {
+ CloseableIterable<FileScanTask> fileScanTasks =
planFileScanTask(scan);
+ taskRef.set(fileScanTasks);
+
+ CloseableIterator<FileScanTask> iterator =
fileScanTasks.iterator();
+ while (splitAssignment.needMoreSplit() &&
iterator.hasNext()) {
+ try {
+
splitAssignment.addToQueue(Lists.newArrayList(createIcebergSplit(iterator.next())));
+ } catch (UserException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+ );
+ splitAssignment.finishSchedule();
+ } catch (Exception e) {
+ Optional<NotSupportedException> opt =
checkNotSupportedException(e);
+ if (opt.isPresent()) {
+ splitAssignment.setException(new
UserException(opt.get().getMessage(), opt.get()));
+ } else {
+ splitAssignment.setException(new
UserException(e.getMessage(), e));
+ }
+ } finally {
+ if (taskRef.get() != null) {
+ try {
+ taskRef.get().close();
+ } catch (IOException e) {
+ // ignore
+ }
+ }
+ }
+ }, Env.getCurrentEnv().getExtMetaCacheMgr().getScheduleExecutor());
+ }
+
+ @VisibleForTesting
+ public TableScan createTableScan() {
+ if (icebergTableScan != null) {
+ return icebergTableScan;
+ }
+
TableScan scan = icebergTable.newScan();
// set snapshot
@@ -219,16 +298,16 @@ public class IcebergScanNode extends FileQueryScanNode {
this.pushdownIcebergPredicates.add(predicate.toString());
}
- // get splits
- List<Split> splits = new ArrayList<>();
- int formatVersion = ((BaseTable)
icebergTable).operations().current().formatVersion();
- HashSet<String> partitionPathSet = new HashSet<>();
- boolean isPartitionedTable = icebergTable.spec().isPartitioned();
+ icebergTableScan =
scan.planWith(source.getCatalog().getThreadPoolWithPreAuth());
- long realFileSplitSize = getRealFileSplitSize(0);
- CloseableIterable<FileScanTask> fileScanTasks = null;
+ return icebergTableScan;
+ }
+
+ private CloseableIterable<FileScanTask> planFileScanTask(TableScan scan) {
+ long targetSplitSize = getRealFileSplitSize(0);
+ CloseableIterable<FileScanTask> splitFiles;
try {
- fileScanTasks = TableScanUtil.splitFiles(scan.planFiles(),
realFileSplitSize);
+ splitFiles = TableScanUtil.splitFiles(scan.planFiles(),
targetSplitSize);
} catch (NullPointerException e) {
/*
Caused by: java.lang.NullPointerException: Type cannot be null
@@ -259,74 +338,122 @@ public class IcebergScanNode extends FileQueryScanNode {
LOG.warn("Iceberg TableScanUtil.splitFiles throw
NullPointerException. Cause : ", e);
throw new NotSupportedException("Unable to read Iceberg table with
dropped old partition column.");
}
- try (CloseableIterable<CombinedScanTask> combinedScanTasks =
- TableScanUtil.planTasks(fileScanTasks, realFileSplitSize,
1, 0)) {
- combinedScanTasks.forEach(taskGrp ->
taskGrp.files().forEach(splitTask -> {
- if (isPartitionedTable) {
- StructLike structLike = splitTask.file().partition();
- // Counts the number of partitions read
- partitionPathSet.add(structLike.toString());
- }
- String originalPath = splitTask.file().path().toString();
- LocationPath locationPath = new LocationPath(originalPath,
source.getCatalog().getProperties());
- IcebergSplit split = new IcebergSplit(
- locationPath,
- splitTask.start(),
- splitTask.length(),
- splitTask.file().fileSizeInBytes(),
- new String[0],
- formatVersion,
- source.getCatalog().getProperties(),
- new ArrayList<>(),
- originalPath);
- split.setTargetSplitSize(realFileSplitSize);
- if (formatVersion >= MIN_DELETE_FILE_SUPPORT_VERSION) {
-
split.setDeleteFileFilters(getDeleteFileFilters(splitTask));
+ return splitFiles;
+ }
+
+ private Split createIcebergSplit(FileScanTask fileScanTask) {
+ if (isPartitionedTable) {
+ StructLike structLike = fileScanTask.file().partition();
+ // Counts the number of partitions read
+ partitionPathSet.add(structLike.toString());
+ }
+ String originalPath = fileScanTask.file().path().toString();
+ LocationPath locationPath = new LocationPath(originalPath,
source.getCatalog().getProperties());
+ IcebergSplit split = new IcebergSplit(
+ locationPath,
+ fileScanTask.start(),
+ fileScanTask.length(),
+ fileScanTask.file().fileSizeInBytes(),
+ new String[0],
+ formatVersion,
+ source.getCatalog().getProperties(),
+ new ArrayList<>(),
+ originalPath);
+ if (!fileScanTask.deletes().isEmpty()) {
+ split.setDeleteFileFilters(getDeleteFileFilters(fileScanTask));
+ }
+ split.setTableFormatType(TableFormatType.ICEBERG);
+ split.setTargetSplitSize(targetSplitSize);
+ return split;
+ }
+
+ private List<Split> doGetSplits(int numBackends) throws UserException {
+
+ TableScan scan = createTableScan();
+ List<Split> splits = new ArrayList<>();
+
+ try (CloseableIterable<FileScanTask> fileScanTasks =
planFileScanTask(scan)) {
+ if (tableLevelPushDownCount) {
+ int needSplitCnt = countFromSnapshot <
COUNT_WITH_PARALLEL_SPLITS
+ ? 1 : sessionVariable.getParallelExecInstanceNum() *
numBackends;
+ for (FileScanTask next : fileScanTasks) {
+ splits.add(createIcebergSplit(next));
+ if (splits.size() >= needSplitCnt) {
+ break;
+ }
}
- split.setTableFormatType(TableFormatType.ICEBERG);
- splits.add(split);
- }));
+ assignCountToSplits(splits, countFromSnapshot);
+ return splits;
+ } else {
+ fileScanTasks.forEach(taskGrp ->
splits.add(createIcebergSplit(taskGrp)));
+ }
} catch (IOException e) {
throw new UserException(e.getMessage(), e.getCause());
}
+ selectedPartitionNum = partitionPathSet.size();
+ return splits;
+ }
+
+ @Override
+ public boolean isBatchMode() {
TPushAggOp aggOp = getPushDownAggNoGroupingOp();
if (aggOp.equals(TPushAggOp.COUNT)) {
- // we can create a special empty split and skip the plan process
- if (splits.isEmpty()) {
- return splits;
- }
- long countFromSnapshot = getCountFromSnapshot();
+ countFromSnapshot = getCountFromSnapshot();
if (countFromSnapshot >= 0) {
tableLevelPushDownCount = true;
- List<Split> pushDownCountSplits;
- if (countFromSnapshot > COUNT_WITH_PARALLEL_SPLITS) {
- int minSplits =
sessionVariable.getParallelExecInstanceNum() * numBackends;
- pushDownCountSplits = splits.subList(0,
Math.min(splits.size(), minSplits));
- } else {
- pushDownCountSplits =
Collections.singletonList(splits.get(0));
+ return false;
+ }
+ }
+
+ if (createTableScan().snapshot() == null) {
+ return false;
+ }
+
+ if (!sessionVariable.getEnableExternalTableBatchMode()) {
+ return false;
+ }
+
+ try {
+ return preExecutionAuthenticator.execute(() -> {
+ try (CloseableIterator<ManifestFile> matchingManifest =
+ IcebergUtils.getMatchingManifest(
+
createTableScan().snapshot().dataManifests(icebergTable.io()),
+ icebergTable.specs(),
+ createTableScan().filter()).iterator()) {
+ int cnt = 0;
+ while (matchingManifest.hasNext()) {
+ ManifestFile next = matchingManifest.next();
+ cnt += next.addedFilesCount() +
next.existingFilesCount();
+ if (cnt >= sessionVariable.getNumFilesInBatchMode()) {
+ return true;
+ }
+ }
}
- assignCountToSplits(pushDownCountSplits, countFromSnapshot);
- return pushDownCountSplits;
+ return false;
+ });
+ } catch (Exception e) {
+ Optional<NotSupportedException> opt =
checkNotSupportedException(e);
+ if (opt.isPresent()) {
+ throw opt.get();
+ } else {
+ throw new
RuntimeException(ExceptionUtils.getRootCauseMessage(e), e);
}
}
- selectedPartitionNum = partitionPathSet.size();
- return splits;
}
- public Long getSpecifiedSnapshot() throws UserException {
+ public Long getSpecifiedSnapshot() {
TableSnapshot tableSnapshot = getQueryTableSnapshot();
if (tableSnapshot != null) {
TableSnapshot.VersionType type = tableSnapshot.getType();
- try {
- if (type == TableSnapshot.VersionType.VERSION) {
- return tableSnapshot.getVersion();
- } else {
- long timestamp =
TimeUtils.timeStringToLong(tableSnapshot.getTime(), TimeUtils.getTimeZone());
- return SnapshotUtil.snapshotIdAsOfTime(icebergTable,
timestamp);
+ if (type == TableSnapshot.VersionType.VERSION) {
+ return tableSnapshot.getVersion();
+ } else {
+ long timestamp =
TimeUtils.timeStringToLong(tableSnapshot.getTime(), TimeUtils.getTimeZone());
+ if (timestamp < 0) {
+ throw new DateTimeException("can't parse time: " +
tableSnapshot.getTime());
}
- } catch (IllegalArgumentException e) {
- throw new UserException(e);
+ return SnapshotUtil.snapshotIdAsOfTime(icebergTable,
timestamp);
}
}
return null;
@@ -408,13 +535,9 @@ public class IcebergScanNode extends FileQueryScanNode {
return !col.isAllowNull();
}
- private long getCountFromSnapshot() {
- Long specifiedSnapshot;
- try {
- specifiedSnapshot = getSpecifiedSnapshot();
- } catch (UserException e) {
- return -1;
- }
+ @VisibleForTesting
+ public long getCountFromSnapshot() {
+ Long specifiedSnapshot = getSpecifiedSnapshot();
Snapshot snapshot = specifiedSnapshot == null
? icebergTable.currentSnapshot() :
icebergTable.snapshot(specifiedSnapshot);
@@ -467,6 +590,11 @@ public class IcebergScanNode extends FileQueryScanNode {
((IcebergSplit) splits.get(size -
1)).setTableLevelRowCount(countPerSplit + totalCount % size);
}
+ @Override
+ public int numApproximateSplits() {
+ return NUM_SPLITS_PER_PARTITION * partitionPathSet.size() > 0 ?
partitionPathSet.size() : 1;
+ }
+
private Optional<NotSupportedException>
checkNotSupportedException(Exception e) {
if (e instanceof NullPointerException) {
/*
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergSplit.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergSplit.java
index 0520612935a..e31ec5c3fad 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergSplit.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergSplit.java
@@ -22,6 +22,7 @@ import org.apache.doris.datasource.FileSplit;
import lombok.Data;
+import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@@ -35,7 +36,7 @@ public class IcebergSplit extends FileSplit {
// but the original datafile path must be used.
private final String originalPath;
private Integer formatVersion;
- private List<IcebergDeleteFileFilter> deleteFileFilters;
+ private List<IcebergDeleteFileFilter> deleteFileFilters = new
ArrayList<>();
private Map<String, String> config;
// tableLevelRowCount will be set only table-level count push down opt is
available.
private long tableLevelRowCount = -1;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/source/MaxComputeScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/source/MaxComputeScanNode.java
index 063aeea68eb..632d88428fe 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/source/MaxComputeScanNode.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/source/MaxComputeScanNode.java
@@ -267,8 +267,10 @@ public class MaxComputeScanNode extends FileQueryScanNode {
createTableBatchReadSession(requiredBatchPartitionSpecs);
List<Split> batchSplit =
getSplitByTableSession(tableBatchReadSession);
- splitAssignment.addToQueue(batchSplit);
- } catch (IOException e) {
+ if (splitAssignment.needMoreSplit()) {
+ splitAssignment.addToQueue(batchSplit);
+ }
+ } catch (Exception e) {
batchException.set(new
UserException(e.getMessage(), e));
} finally {
if (batchException.get() != null) {
@@ -288,7 +290,7 @@ public class MaxComputeScanNode extends FileQueryScanNode {
splitAssignment.setException(batchException.get());
}
}
- });
+ }, scheduleExecutor);
}
@Override
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java
index b4033a0535e..e68ab2476d9 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java
@@ -101,7 +101,7 @@ public abstract class ScanNode extends PlanNode implements
SplitGenerator {
protected SplitAssignment splitAssignment = null;
protected long selectedPartitionNum = 0;
- protected long selectedSplitNum = 0;
+ protected int selectedSplitNum = 0;
// create a mapping between output slot's id and project expr
Map<SlotId, Expr> outputSlotToProjectExpr = new HashMap<>();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index bb6f13618fa..1bbb5e51d29 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -464,6 +464,8 @@ public class SessionVariable implements Serializable,
Writable {
public static final String NUM_PARTITIONS_IN_BATCH_MODE =
"num_partitions_in_batch_mode";
+ public static final String NUM_FILES_IN_BATCH_MODE =
"num_files_in_batch_mode";
+
public static final String FETCH_SPLITS_MAX_WAIT_TIME =
"fetch_splits_max_wait_time_ms";
/**
@@ -1779,12 +1781,19 @@ public class SessionVariable implements Serializable,
Writable {
needForward = true)
public int numPartitionsInBatchMode = 1024;
+ @VariableMgr.VarAttr(
+ name = NUM_FILES_IN_BATCH_MODE,
+ description = {"如果文件数量超过阈值,BE将通过batch方式获取scan ranges",
+ "If the number of files exceeds the threshold, scan ranges
will be got through batch mode."},
+ needForward = true)
+ public int numFilesInBatchMode = 1024;
+
@VariableMgr.VarAttr(
name = FETCH_SPLITS_MAX_WAIT_TIME,
description = {"batch方式中BE获取splits的最大等待时间",
"The max wait time of getting splits in batch mode."},
needForward = true)
- public long fetchSplitsMaxWaitTime = 4000;
+ public long fetchSplitsMaxWaitTime = 1000;
@VariableMgr.VarAttr(
name = ENABLE_PARQUET_LAZY_MAT,
@@ -2165,6 +2174,13 @@ public class SessionVariable implements Serializable,
Writable {
public static final String IGNORE_RUNTIME_FILTER_IDS =
"ignore_runtime_filter_ids";
+ public static final String ENABLE_EXTERNAL_TABLE_BATCH_MODE =
"enable_external_table_batch_mode";
+ @VariableMgr.VarAttr(
+ name = ENABLE_EXTERNAL_TABLE_BATCH_MODE,
+ description = {"使能外表的batch mode功能", "Enable the batch mode
function of the external table."},
+ needForward = true)
+ public boolean enableExternalTableBatchMode = true;
+
public Set<Integer> getIgnoredRuntimeFilterIds() {
Set<Integer> ids = Sets.newLinkedHashSet();
if (ignoreRuntimeFilterIds.isEmpty()) {
@@ -3437,6 +3453,10 @@ public class SessionVariable implements Serializable,
Writable {
this.numPartitionsInBatchMode = numPartitionsInBatchMode;
}
+ public int getNumFilesInBatchMode() {
+ return numFilesInBatchMode;
+ }
+
public long getFetchSplitsMaxWaitTime() {
return fetchSplitsMaxWaitTime;
}
@@ -4789,5 +4809,9 @@ public class SessionVariable implements Serializable,
Writable {
public boolean showSplitProfileInfo() {
return enableProfile() && getProfileLevel() > 1;
}
+
+ public boolean getEnableExternalTableBatchMode() {
+ return enableExternalTableBatchMode;
+ }
}
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/datasource/SplitAssignmentTest.java
b/fe/fe-core/src/test/java/org/apache/doris/datasource/SplitAssignmentTest.java
new file mode 100644
index 00000000000..ab5205b47a7
--- /dev/null
+++
b/fe/fe-core/src/test/java/org/apache/doris/datasource/SplitAssignmentTest.java
@@ -0,0 +1,427 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource;
+
+import org.apache.doris.common.UserException;
+import org.apache.doris.spi.Split;
+import org.apache.doris.system.Backend;
+import org.apache.doris.thrift.TScanRangeLocations;
+
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.Multimap;
+import mockit.Expectations;
+import mockit.Injectable;
+import mockit.MockUp;
+import mockit.Mocked;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+public class SplitAssignmentTest {
+
+ @Injectable
+ private FederationBackendPolicy mockBackendPolicy;
+
+ @Injectable
+ private SplitGenerator mockSplitGenerator;
+
+ @Injectable
+ private SplitToScanRange mockSplitToScanRange;
+
+ @Mocked
+ private Split mockSplit;
+
+ @Mocked
+ private Backend mockBackend;
+
+ @Mocked
+ private TScanRangeLocations mockScanRangeLocations;
+
+ private SplitAssignment splitAssignment;
+ private Map<String, String> locationProperties;
+ private List<String> pathPartitionKeys;
+
+ @BeforeEach
+ void setUp() {
+ locationProperties = new HashMap<>();
+ pathPartitionKeys = new ArrayList<>();
+
+ splitAssignment = new SplitAssignment(
+ mockBackendPolicy,
+ mockSplitGenerator,
+ mockSplitToScanRange,
+ locationProperties,
+ pathPartitionKeys
+ );
+ }
+
+ // ==================== init() method tests ====================
+
+ @Test
+ void testInitSuccess() throws Exception {
+ Multimap<Backend, Split> batch = ArrayListMultimap.create();
+ batch.put(mockBackend, mockSplit);
+
+ new Expectations() {
+ {
+ mockBackendPolicy.computeScanRangeAssignment((List<Split>)
any);
+ result = batch;
+
+ mockSplitToScanRange.getScanRange(mockBackend,
locationProperties, mockSplit, pathPartitionKeys);
+ result = mockScanRangeLocations;
+ }
+ };
+
+ // Start a thread to simulate split generation after a short delay
+ Thread splitGeneratorThread = new Thread(() -> {
+ try {
+ Thread.sleep(50); // Short delay to simulate async split
generation
+ List<Split> splits = Collections.singletonList(mockSplit);
+ splitAssignment.addToQueue(splits);
+ } catch (Exception e) {
+ // Ignore for test
+ }
+ });
+
+ splitGeneratorThread.start();
+
+ // Test
+ Assertions.assertDoesNotThrow(() -> splitAssignment.init());
+
+ // Verify sample split is set
+ Assertions.assertNotNull(splitAssignment.getSampleSplit());
+
+ splitGeneratorThread.join(1000); // Wait for thread to complete
+ }
+
+ @Test
+ void testInitTimeout() throws Exception {
+ // Use MockUp to simulate timeout behavior quickly instead of waiting
30 seconds
+ SplitAssignment testAssignment = new SplitAssignment(
+ mockBackendPolicy,
+ mockSplitGenerator,
+ mockSplitToScanRange,
+ locationProperties,
+ pathPartitionKeys
+ );
+
+ new MockUp<SplitAssignment>() {
+ @mockit.Mock
+ public void init() throws UserException {
+ // Directly throw timeout exception to simulate the timeout
scenario quickly
+ throw new UserException("Failed to get first split after
waiting for 0 seconds.");
+ }
+ };
+
+ // Test & Verify - should timeout immediately now
+ UserException exception = Assertions.assertThrows(UserException.class,
() -> testAssignment.init());
+ Assertions.assertTrue(exception.getMessage().contains("Failed to get
first split after waiting for"));
+ }
+
+ @Test
+ void testInitInterrupted() throws Exception {
+ CountDownLatch initStarted = new CountDownLatch(1);
+ CountDownLatch shouldInterrupt = new CountDownLatch(1);
+
+ Thread initThread = new Thread(() -> {
+ try {
+ initStarted.countDown();
+ shouldInterrupt.await();
+ splitAssignment.init();
+ } catch (Exception e) {
+ // Expected interruption
+ }
+ });
+
+ initThread.start();
+ initStarted.await();
+
+ // Interrupt the init thread
+ initThread.interrupt();
+ shouldInterrupt.countDown();
+
+ initThread.join(1000);
+ }
+
+ @Test
+ void testInitWithPreExistingException() throws Exception {
+ UserException preException = new UserException("Pre-existing error");
+ splitAssignment.setException(preException);
+
+ // Test & Verify
+ UserException exception = Assertions.assertThrows(UserException.class,
() -> splitAssignment.init());
+ Assertions.assertTrue(exception.getMessage().contains(" Pre-existing
error"), exception.getMessage());
+ }
+
+ // ==================== addToQueue() method tests ====================
+
+ @Test
+ void testAddToQueueWithEmptyList() throws Exception {
+ // Test
+ Assertions.assertDoesNotThrow(() ->
splitAssignment.addToQueue(Collections.emptyList()));
+ }
+
+ @Test
+ void testAddToQueueSuccess() throws Exception {
+ Multimap<Backend, Split> batch = ArrayListMultimap.create();
+ batch.put(mockBackend, mockSplit);
+
+ new Expectations() {
+ {
+ mockBackendPolicy.computeScanRangeAssignment((List<Split>)
any);
+ result = batch;
+
+ mockSplitToScanRange.getScanRange(mockBackend,
locationProperties, mockSplit, pathPartitionKeys);
+ result = mockScanRangeLocations;
+ }
+ };
+
+ // Mock setup
+ List<Split> splits = Collections.singletonList(mockSplit);
+
+ // Test
+ Assertions.assertDoesNotThrow(() ->
splitAssignment.addToQueue(splits));
+
+ // Verify sample split is set
+ Assertions.assertEquals(mockSplit, splitAssignment.getSampleSplit());
+
+ // Verify assignment queue is created and contains data
+ BlockingQueue<Collection<TScanRangeLocations>> queue =
splitAssignment.getAssignedSplits(mockBackend);
+ Assertions.assertNotNull(queue);
+ }
+
+ @Test
+ void testAddToQueueSampleSplitAlreadySet() throws Exception {
+ Multimap<Backend, Split> batch = ArrayListMultimap.create();
+ batch.put(mockBackend, mockSplit);
+
+ new Expectations() {
+ {
+ mockBackendPolicy.computeScanRangeAssignment((List<Split>)
any);
+ result = batch;
+ minTimes = 0;
+
+ mockSplitToScanRange.getScanRange(mockBackend,
locationProperties, mockSplit, pathPartitionKeys);
+ result = mockScanRangeLocations;
+ minTimes = 0;
+ }
+ };
+
+ // Setup: First call to set sample split
+ List<Split> firstSplits = Collections.singletonList(mockSplit);
+
+ splitAssignment.addToQueue(firstSplits);
+ Split firstSampleSplit = splitAssignment.getSampleSplit();
+
+ // Test: Second call should not change sample split
+ List<Split> secondSplits = Collections.singletonList(mockSplit);
+
+ splitAssignment.addToQueue(secondSplits);
+
+ // Verify sample split unchanged
+ Assertions.assertEquals(firstSampleSplit,
splitAssignment.getSampleSplit());
+ }
+
+ @Test
+ void testAddToQueueWithQueueBlockingScenario() throws Exception {
+ Multimap<Backend, Split> batch = ArrayListMultimap.create();
+ batch.put(mockBackend, mockSplit);
+
+ new Expectations() {
+ {
+ mockBackendPolicy.computeScanRangeAssignment((List<Split>)
any);
+ result = batch;
+
+ mockSplitToScanRange.getScanRange(mockBackend,
locationProperties, mockSplit, pathPartitionKeys);
+ result = mockScanRangeLocations;
+ }
+ };
+
+ // This test simulates a scenario where appendBatch might experience
queue blocking
+ // by adding multiple batches rapidly
+
+ List<Split> splits = Collections.singletonList(mockSplit);
+
+ // First, fill up the queue by adding many batches
+ for (int i = 0; i < 10; i++) {
+ splitAssignment.addToQueue(splits);
+ }
+
+ // Verify the queue has data
+ BlockingQueue<Collection<TScanRangeLocations>> queue =
splitAssignment.getAssignedSplits(mockBackend);
+ Assertions.assertNotNull(queue);
+ }
+
+ @Test
+ void testAddToQueueConcurrentAccess() throws Exception {
+ Multimap<Backend, Split> batch = ArrayListMultimap.create();
+ batch.put(mockBackend, mockSplit);
+
+ new Expectations() {
+ {
+ mockBackendPolicy.computeScanRangeAssignment((List<Split>)
any);
+ result = batch;
+
+ mockSplitToScanRange.getScanRange(mockBackend,
locationProperties, mockSplit, pathPartitionKeys);
+ result = mockScanRangeLocations;
+ }
+ };
+
+ // Test concurrent access to addToQueue method
+ List<Split> splits = Collections.singletonList(mockSplit);
+
+ int threadCount = 5;
+ CountDownLatch startLatch = new CountDownLatch(1);
+ CountDownLatch doneLatch = new CountDownLatch(threadCount);
+
+ List<Thread> threads = new ArrayList<>();
+ for (int i = 0; i < threadCount; i++) {
+ Thread thread = new Thread(() -> {
+ try {
+ startLatch.await();
+ splitAssignment.addToQueue(splits);
+ } catch (Exception e) {
+ // Log but don't fail test for concurrency issues
+ } finally {
+ doneLatch.countDown();
+ }
+ });
+ threads.add(thread);
+ thread.start();
+ }
+
+ startLatch.countDown(); // Start all threads
+ Assertions.assertTrue(doneLatch.await(5, TimeUnit.SECONDS)); // Wait
for completion
+
+ // Verify sample split is set
+ Assertions.assertNotNull(splitAssignment.getSampleSplit());
+
+ // Cleanup
+ for (Thread thread : threads) {
+ thread.join(1000);
+ }
+ }
+
+ // ==================== Integration tests for init() and addToQueue()
====================
+
+ @Test
+ void testInitAndAddToQueueIntegration() throws Exception {
+ new Expectations() {
+ {
+ Multimap<Backend, Split> batch = ArrayListMultimap.create();
+ batch.put(mockBackend, mockSplit);
+
+ mockBackendPolicy.computeScanRangeAssignment((List<Split>)
any);
+ result = batch;
+
+ mockSplitToScanRange.getScanRange(mockBackend,
locationProperties, mockSplit, pathPartitionKeys);
+ result = mockScanRangeLocations;
+ }
+ };
+
+ List<Split> splits = Collections.singletonList(mockSplit);
+
+ // Start background thread to add splits after init starts
+ Thread splitsProvider = new Thread(() -> {
+ try {
+ Thread.sleep(100); // Small delay to ensure init is waiting
+ splitAssignment.addToQueue(splits);
+ } catch (Exception e) {
+ // Ignore
+ }
+ });
+
+ splitsProvider.start();
+
+ // Test init - should succeed once splits are added
+ Assertions.assertDoesNotThrow(() -> splitAssignment.init());
+
+ // Verify
+ Assertions.assertNotNull(splitAssignment.getSampleSplit());
+ Assertions.assertEquals(mockSplit, splitAssignment.getSampleSplit());
+
+ BlockingQueue<Collection<TScanRangeLocations>> queue =
splitAssignment.getAssignedSplits(mockBackend);
+ Assertions.assertNotNull(queue);
+
+ splitsProvider.join(1000);
+ }
+
+ // ==================== appendBatch() behavior tests ====================
+
+ @Test
+ void testAppendBatchTimeoutBehavior() throws Exception {
+ new Expectations() {
+ {
+ Multimap<Backend, Split> batch = ArrayListMultimap.create();
+ batch.put(mockBackend, mockSplit);
+
+ mockBackendPolicy.computeScanRangeAssignment((List<Split>)
any);
+ result = batch;
+
+ mockSplitToScanRange.getScanRange(mockBackend,
locationProperties, mockSplit, pathPartitionKeys);
+ result = mockScanRangeLocations;
+ }
+ };
+
+ // This test verifies that appendBatch properly handles queue offer
timeouts
+ // We'll simulate this by first filling the assignment and then trying
to add more
+
+ List<Split> splits = Collections.singletonList(mockSplit);
+
+ // Add multiple splits to potentially cause queue pressure
+ for (int i = 0; i < 50; i++) {
+ try {
+ splitAssignment.addToQueue(splits);
+ } catch (Exception e) {
+ // Expected if queue gets full and times out
+ break;
+ }
+ }
+
+ // Verify that splits were processed
+ Assertions.assertNotNull(splitAssignment.getSampleSplit());
+ }
+
+ @Test
+ void testInitWhenNeedMoreSplitReturnsFalse() throws Exception {
+ // Test init behavior when needMoreSplit() returns false
+ splitAssignment.stop(); // This should make needMoreSplit() return
false
+
+ // Init should complete immediately without waiting
+ Assertions.assertDoesNotThrow(() -> splitAssignment.init());
+ }
+
+ @Test
+ void testInitWithScheduleFinished() throws Exception {
+ // Test init behavior when schedule is already finished
+ splitAssignment.finishSchedule();
+
+ // Init should complete immediately without waiting
+ Assertions.assertDoesNotThrow(() -> splitAssignment.init());
+ }
+}
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/datasource/iceberg/IcebergUtilsTest.java
b/fe/fe-core/src/test/java/org/apache/doris/datasource/iceberg/IcebergUtilsTest.java
index 244f173acb7..b76f24667ad 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/datasource/iceberg/IcebergUtilsTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/datasource/iceberg/IcebergUtilsTest.java
@@ -17,12 +17,33 @@
package org.apache.doris.datasource.iceberg;
+import org.apache.iceberg.GenericManifestFile;
+import org.apache.iceberg.GenericPartitionFieldSummary;
+import org.apache.iceberg.ManifestContent;
+import org.apache.iceberg.ManifestFile;
+import org.apache.iceberg.ManifestFile.PartitionFieldSummary;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.expressions.UnboundPredicate;
import org.apache.iceberg.hive.HiveCatalog;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.types.Conversions;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.types.Types.LongType;
+import org.apache.iceberg.types.Types.StructType;
import org.junit.Assert;
import org.junit.Test;
import java.lang.reflect.Field;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
public class IcebergUtilsTest {
@Test
@@ -38,7 +59,7 @@ public class IcebergUtilsTest {
new HashMap<String, String>() {{
put("list-all-tables", "true");
}},
- "");
+ "");
HiveCatalog i2 = IcebergUtils.createIcebergHiveCatalog(c2, "i1");
Assert.assertTrue(getListAllTables(i2));
@@ -60,4 +81,126 @@ public class IcebergUtilsTest {
declaredField.setAccessible(true);
return declaredField.getBoolean(hiveCatalog);
}
+
+ @Test
+ public void testGetMatchingManifest() {
+
+ // partition : 100 - 200
+ GenericManifestFile f1 =
getGenericManifestFileForDataTypeWithPartitionSummary(
+ "manifest_f1.avro",
+ Collections.singletonList(new GenericPartitionFieldSummary(
+ false, false, getByteBufferForLong(100),
getByteBufferForLong(200))));
+
+ // partition : 300 - 400
+ GenericManifestFile f2 =
getGenericManifestFileForDataTypeWithPartitionSummary(
+ "manifest_f2.avro",
+ Collections.singletonList(new GenericPartitionFieldSummary(
+ false, false, getByteBufferForLong(300),
getByteBufferForLong(400))));
+
+ // partition : 500 - 600
+ GenericManifestFile f3 =
getGenericManifestFileForDataTypeWithPartitionSummary(
+ "manifest_f3.avro",
+ Collections.singletonList(new GenericPartitionFieldSummary(
+ false, false, getByteBufferForLong(500),
getByteBufferForLong(600))));
+
+ List<ManifestFile> manifestFiles = new ArrayList<ManifestFile>() {{
+ add(f1);
+ add(f2);
+ add(f3);
+ }};
+
+ Schema schema = new Schema(
+ StructType.of(
+ Types.NestedField.required(1, "id", LongType.get()),
+ Types.NestedField.required(2, "data", LongType.get()),
+ Types.NestedField.required(3, "par", LongType.get()))
+ .fields());
+
+ // test empty partition spec
+ HashMap<Integer, PartitionSpec> emptyPartitionSpecsById = new
HashMap<Integer, PartitionSpec>() {{
+ put(0, PartitionSpec.builderFor(schema).build());
+ }};
+ assertManifest(manifestFiles, emptyPartitionSpecsById,
Expressions.alwaysTrue(), manifestFiles);
+
+ // test long partition spec
+ HashMap<Integer, PartitionSpec> longPartitionSpecsById = new
HashMap<Integer, PartitionSpec>() {{
+ put(0,
PartitionSpec.builderFor(schema).identity("par").build());
+ }};
+ // 1. par > 10
+ UnboundPredicate<Long> e1 = Expressions.greaterThan("par", 10L);
+ assertManifest(manifestFiles, longPartitionSpecsById,
Expressions.and(Expressions.alwaysTrue(), e1), manifestFiles);
+
+        // 2. par > 90 (e1 AND e2 simplifies to par > 90)
+ UnboundPredicate<Long> e2 = Expressions.greaterThan("par", 90L);
+ assertManifest(manifestFiles, longPartitionSpecsById,
Expressions.and(e1, e2), manifestFiles);
+
+ // 3. 10 < par < 300
+ UnboundPredicate<Long> e3 = Expressions.lessThan("par", 300L);
+ assertManifest(manifestFiles, longPartitionSpecsById,
Expressions.and(e1, e3), Collections.singletonList(f1));
+
+ // 4. 10 < par < 400
+ UnboundPredicate<Long> e4 = Expressions.lessThan("par", 400L);
+ ArrayList<ManifestFile> expect1 = new ArrayList<ManifestFile>() {{
+ add(f1);
+ add(f2);
+ }};
+ assertManifest(manifestFiles, longPartitionSpecsById,
Expressions.and(e1, e4), expect1);
+
+ // 5. 10 < par < 501
+ UnboundPredicate<Long> e5 = Expressions.lessThan("par", 501L);
+ assertManifest(manifestFiles, longPartitionSpecsById,
Expressions.and(e1, e5), manifestFiles);
+
+ // 6. 200 < par < 501
+ UnboundPredicate<Long> e6 = Expressions.greaterThan("par", 200L);
+ ArrayList<ManifestFile> expect2 = new ArrayList<ManifestFile>() {{
+ add(f2);
+ add(f3);
+ }};
+ assertManifest(manifestFiles, longPartitionSpecsById,
Expressions.and(e6, e5), expect2);
+
+ // 7. par > 600
+ UnboundPredicate<Long> e7 = Expressions.greaterThan("par", 600L);
+ assertManifest(manifestFiles, longPartitionSpecsById,
Expressions.and(Expressions.alwaysTrue(), e7), Collections.emptyList());
+
+ // 8. par < 100
+ UnboundPredicate<Long> e8 = Expressions.lessThan("par", 100L);
+ assertManifest(manifestFiles, longPartitionSpecsById,
Expressions.and(Expressions.alwaysTrue(), e8), Collections.emptyList());
+ }
+
+ private void assertManifest(List<ManifestFile> dataManifests,
+ Map<Integer, PartitionSpec> specsById,
+ Expression dataFilter,
+ List<ManifestFile> expected) {
+ CloseableIterable<ManifestFile> matchingManifest =
+ IcebergUtils.getMatchingManifest(dataManifests, specsById,
dataFilter);
+ List<ManifestFile> ret = new ArrayList<>();
+ matchingManifest.forEach(ret::add);
+ ret.sort(Comparator.comparing(ManifestFile::path));
+ Assert.assertEquals(expected, ret);
+ }
+
+ private ByteBuffer getByteBufferForLong(long num) {
+ return Conversions.toByteBuffer(Types.LongType.get(), num);
+ }
+
+ private GenericManifestFile
getGenericManifestFileForDataTypeWithPartitionSummary(
+ String path,
+ List<PartitionFieldSummary> partitionFieldSummaries) {
+ return new GenericManifestFile(
+ path,
+ 1024L,
+ 0,
+ ManifestContent.DATA,
+ 1,
+ 1,
+ 123456789L,
+ 2,
+ 100,
+ 0,
+ 0,
+ 0,
+ 0,
+ partitionFieldSummaries,
+ null);
+ }
}
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/datasource/iceberg/source/IcebergScanNodeTest.java
b/fe/fe-core/src/test/java/org/apache/doris/datasource/iceberg/source/IcebergScanNodeTest.java
new file mode 100644
index 00000000000..8ae51a61f46
--- /dev/null
+++
b/fe/fe-core/src/test/java/org/apache/doris/datasource/iceberg/source/IcebergScanNodeTest.java
@@ -0,0 +1,181 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.iceberg.source;
+
+import org.apache.doris.analysis.TupleDescriptor;
+import org.apache.doris.analysis.TupleId;
+import org.apache.doris.common.jmockit.Deencapsulation;
+import
org.apache.doris.common.security.authentication.PreExecutionAuthenticator;
+import org.apache.doris.datasource.iceberg.IcebergUtils;
+import org.apache.doris.planner.PlanNodeId;
+import org.apache.doris.qe.SessionVariable;
+import org.apache.doris.thrift.TPushAggOp;
+
+import com.google.common.collect.Lists;
+import mockit.Expectations;
+import mockit.Mock;
+import mockit.MockUp;
+import mockit.Mocked;
+import org.apache.iceberg.BaseTable;
+import org.apache.iceberg.GenericManifestFile;
+import org.apache.iceberg.ManifestContent;
+import org.apache.iceberg.ManifestFile;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.TableScan;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.hadoop.HadoopTableOperations;
+import org.apache.iceberg.io.CloseableIterable;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+public class IcebergScanNodeTest {
+
+ @Mocked
+ HadoopTableOperations hadoopTableOperations;
+ @Mocked
+ Snapshot snapshot;
+
+ @Test
+ public void testIsBatchMode() {
+ SessionVariable sessionVariable = new SessionVariable();
+ IcebergScanNode icebergScanNode = new IcebergScanNode(new
PlanNodeId(1), new TupleDescriptor(new TupleId(1)), sessionVariable);
+
+ new Expectations(icebergScanNode) {{
+ icebergScanNode.getPushDownAggNoGroupingOp();
+ result = TPushAggOp.COUNT;
+ icebergScanNode.getCountFromSnapshot();
+ result = 1L;
+ }};
+ Assert.assertFalse(icebergScanNode.isBatchMode());
+
+ BaseTable mockTable = new BaseTable(hadoopTableOperations,
"mockTable");
+ new Expectations(icebergScanNode) {{
+ icebergScanNode.getPushDownAggNoGroupingOp();
+ result = TPushAggOp.NONE;
+ Deencapsulation.setField(icebergScanNode, "icebergTable",
mockTable);
+ }};
+ TableScan tableScan = mockTable.newScan();
+ new Expectations(mockTable) {{
+ mockTable.currentSnapshot();
+ result = null;
+ icebergScanNode.createTableScan();
+ result = tableScan;
+ }};
+ Assert.assertFalse(icebergScanNode.isBatchMode());
+
+ new Expectations(mockTable) {{
+ mockTable.currentSnapshot();
+ result = snapshot;
+ }};
+ new Expectations(sessionVariable) {{
+ sessionVariable.getEnableExternalTableBatchMode();
+ result = false;
+ }};
+ Assert.assertFalse(icebergScanNode.isBatchMode());
+
+
+ new Expectations(sessionVariable) {{
+ sessionVariable.getEnableExternalTableBatchMode();
+ result = true;
+ }};
+ new Expectations(icebergScanNode) {{
+ Deencapsulation.setField(icebergScanNode,
"preExecutionAuthenticator", new PreExecutionAuthenticator());
+ }};
+ new Expectations() {{
+ sessionVariable.getNumFilesInBatchMode();
+ result = 1024;
+ }};
+
+ mockManifestFile("p", 10, 0);
+ Assert.assertFalse(icebergScanNode.isBatchMode());
+
+ mockManifestFile("p", 0, 10);
+ Assert.assertFalse(icebergScanNode.isBatchMode());
+
+ mockManifestFile("p", 10, 10);
+ Assert.assertFalse(icebergScanNode.isBatchMode());
+
+ mockManifestFile("p", 1024, 0);
+ Assert.assertTrue(icebergScanNode.isBatchMode());
+
+ mockManifestFile("p", 0, 1024);
+ Assert.assertTrue(icebergScanNode.isBatchMode());
+
+ new Expectations() {{
+ sessionVariable.getNumFilesInBatchMode();
+ result = 100;
+ }};
+
+ mockManifestFile("p", 10, 0);
+ Assert.assertFalse(icebergScanNode.isBatchMode());
+
+ mockManifestFile("p", 0, 10);
+ Assert.assertFalse(icebergScanNode.isBatchMode());
+
+ mockManifestFile("p", 10, 10);
+ Assert.assertFalse(icebergScanNode.isBatchMode());
+
+ mockManifestFile("p", 0, 100);
+ Assert.assertTrue(icebergScanNode.isBatchMode());
+
+ mockManifestFile("p", 100, 0);
+ Assert.assertTrue(icebergScanNode.isBatchMode());
+
+ mockManifestFile("p", 10, 90);
+ Assert.assertTrue(icebergScanNode.isBatchMode());
+ }
+
+ private void mockManifestFile(String path, int addedFileCount, int
existingFileCount) {
+ new MockUp<IcebergUtils>() {
+ @Mock
+ CloseableIterable<ManifestFile>
getMatchingManifest(List<ManifestFile> dataManifests,
+ Map<Integer,
PartitionSpec> specsById,
+ Expression
dataFilter) {
+ return CloseableIterable.withNoopClose(new
ArrayList<ManifestFile>() {{
+ add(genManifestFile(path, addedFileCount,
existingFileCount));
+ }}
+ );
+ }
+ };
+ }
+
+ private ManifestFile genManifestFile(String path, int addedFileCount, int
existingFileCount) {
+ return new GenericManifestFile(
+ path,
+ 10, // length
+ 1, // specId
+ ManifestContent.DATA,
+ 1, // sequenceNumber
+ 1, // minSeqNumber
+                1L, // snapshotId
+ addedFileCount,
+ 1,
+ existingFileCount,
+ 1,
+ 0, // deleteFilesCount
+ 0,
+ Lists.newArrayList(),
+ null
+ );
+ }
+}
diff --git
a/regression-test/suites/external_table_p0/iceberg/test_iceberg_filter.groovy
b/regression-test/suites/external_table_p0/iceberg/test_iceberg_filter.groovy
index 7e654175f9c..94d753cc986 100644
---
a/regression-test/suites/external_table_p0/iceberg/test_iceberg_filter.groovy
+++
b/regression-test/suites/external_table_p0/iceberg/test_iceberg_filter.groovy
@@ -19,6 +19,7 @@ suite("test_iceberg_filter",
"p0,external,doris,external_docker,external_docker_
String enabled = context.config.otherConfigs.get("enableIcebergTest")
if (enabled != null && enabled.equalsIgnoreCase("true")) {
try {
+ sql """set enable_external_table_batch_mode=false"""
String rest_port =
context.config.otherConfigs.get("iceberg_rest_uri_port")
String minio_port =
context.config.otherConfigs.get("iceberg_minio_port")
String externalEnvIp =
context.config.otherConfigs.get("externalEnvIp")
@@ -91,6 +92,7 @@ suite("test_iceberg_filter",
"p0,external,doris,external_docker,external_docker_
}
} finally {
+ sql """set enable_external_table_batch_mode=true"""
}
}
}
diff --git
a/regression-test/suites/external_table_p0/iceberg/test_iceberg_optimize_count.groovy
b/regression-test/suites/external_table_p0/iceberg/test_iceberg_optimize_count.groovy
index 556314225c5..bc3b006fb93 100644
---
a/regression-test/suites/external_table_p0/iceberg/test_iceberg_optimize_count.groovy
+++
b/regression-test/suites/external_table_p0/iceberg/test_iceberg_optimize_count.groovy
@@ -55,21 +55,76 @@ suite("test_iceberg_optimize_count",
"p0,external,doris,external_docker,external
qt_q03 """${sqlstr3}"""
qt_q04 """${sqlstr4}"""
+ // traditional mode
+ sql """set num_files_in_batch_mode=100000"""
+ explain {
+ sql("""select * from sample_cow_orc""")
+ notContains "approximate"
+ }
explain {
sql("""${sqlstr1}""")
contains """pushdown agg=COUNT (1000)"""
}
+ explain {
+ sql("""select * from sample_cow_parquet""")
+ notContains "approximate"
+ }
explain {
sql("""${sqlstr2}""")
contains """pushdown agg=COUNT (1000)"""
}
+ explain {
+ sql("""select * from sample_mor_orc""")
+ notContains "approximate"
+ }
explain {
sql("""${sqlstr3}""")
contains """pushdown agg=COUNT (1000)"""
}
+ // because it has dangling delete
+ explain {
+ sql("""${sqlstr4}""")
+ contains """pushdown agg=COUNT (-1)"""
+ }
+
+ // batch mode
+ sql """set num_files_in_batch_mode=1"""
+ explain {
+ sql("""select * from sample_cow_orc""")
+ contains "approximate"
+ }
+ explain {
+ sql("""${sqlstr1}""")
+ contains """pushdown agg=COUNT (1000)"""
+ notContains "approximate"
+ }
+ explain {
+ sql("""select * from sample_cow_parquet""")
+ contains "approximate"
+ }
+ explain {
+ sql("""${sqlstr2}""")
+ contains """pushdown agg=COUNT (1000)"""
+ notContains "approximate"
+ }
+ explain {
+ sql("""select * from sample_mor_orc""")
+ contains "approximate"
+ }
+ explain {
+ sql("""${sqlstr3}""")
+ contains """pushdown agg=COUNT (1000)"""
+ notContains "approximate"
+ }
+ explain {
+ sql("""select * from sample_mor_parquet""")
+ contains "approximate"
+ }
+ // because it has dangling delete
explain {
sql("""${sqlstr4}""")
contains """pushdown agg=COUNT (-1)"""
+ contains "approximate"
}
// don't use push down count
@@ -110,6 +165,7 @@ suite("test_iceberg_optimize_count",
"p0,external,doris,external_docker,external
} finally {
sql """ set enable_count_push_down_for_external_table=true; """
+        sql """set num_files_in_batch_mode=1024"""
// sql """drop catalog if exists ${catalog_name}"""
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]