This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-4.0 by this push:
new 3dccd751b6c branch-4.0: [feature](multi-catalog) Add max_file_split_num session variable to prevent OOM in file scan #58759 (#60732)
3dccd751b6c is described below
commit 3dccd751b6c5146c81227135853f47bfb92dd453
Author: Socrates <[email protected]>
AuthorDate: Sat Feb 14 22:24:49 2026 +0800
branch-4.0: [feature](multi-catalog) Add max_file_split_num session variable to prevent OOM in file scan #58759 (#60732)
- Cherry-picked from #58759
---
.../apache/doris/datasource/FileQueryScanNode.java | 9 +++
.../doris/datasource/hive/source/HiveScanNode.java | 11 ++-
.../datasource/iceberg/source/IcebergScanNode.java | 10 ++-
.../datasource/paimon/source/PaimonScanNode.java | 8 +-
.../doris/datasource/tvf/source/TVFScanNode.java | 10 ++-
.../java/org/apache/doris/qe/SessionVariable.java | 17 ++++
.../doris/datasource/FileQueryScanNodeTest.java | 92 ++++++++++++++++++++++
.../datasource/hive/source/HiveScanNodeTest.java | 88 +++++++++++++++++++++
.../iceberg/source/IcebergScanNodeTest.java | 72 +++++++++++++++++
.../paimon/source/PaimonScanNodeTest.java | 25 ++++++
.../datasource/tvf/source/TVFScanNodeTest.java | 59 ++++++++++++++
11 files changed, 387 insertions(+), 14 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java
index 1ce074ecf1b..ab039e9e9b6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java
@@ -651,4 +651,13 @@ public abstract class FileQueryScanNode extends FileScanNode {
         }
         return this.scanParams;
     }
+
+    protected long applyMaxFileSplitNumLimit(long targetSplitSize, long totalFileSize) {
+        int maxFileSplitNum = sessionVariable.getMaxFileSplitNum();
+        if (maxFileSplitNum <= 0 || totalFileSize <= 0) {
+            return targetSplitSize;
+        }
+        long minSplitSizeForMaxNum = (totalFileSize + maxFileSplitNum - 1L) / (long) maxFileSplitNum;
+        return Math.max(targetSplitSize, minSplitSizeForMaxNum);
+    }
 }
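For intuition, a minimal standalone sketch of the capping arithmetic above. The method body is copied from this hunk, with the session-variable lookup replaced by a plain parameter; the mb constant and the sizes mirror the unit tests added below, and everything else is illustrative:

    public class SplitCapDemo {
        // Ceiling division: the smallest split size that keeps the number of
        // splits for totalFileSize at or below maxFileSplitNum.
        static long applyMaxFileSplitNumLimit(long targetSplitSize, long totalFileSize, int maxFileSplitNum) {
            if (maxFileSplitNum <= 0 || totalFileSize <= 0) {
                return targetSplitSize;
            }
            long minSplitSizeForMaxNum = (totalFileSize + maxFileSplitNum - 1L) / (long) maxFileSplitNum;
            return Math.max(targetSplitSize, minSplitSizeForMaxNum);
        }

        public static void main(String[] args) {
            long mb = 1024L * 1024L;
            // 10,000 MB capped at 100 splits needs at least 100 MB per split,
            // so a 32 MB target is raised to 100 MB.
            System.out.println(applyMaxFileSplitNumLimit(32 * mb, 10_000L * mb, 100) / mb); // 100
            // 500 MB over at most 100 splits needs only 5 MB per split, so 32 MB stands.
            System.out.println(applyMaxFileSplitNumLimit(32 * mb, 500L * mb, 100) / mb);    // 32
            // A non-positive limit disables the cap.
            System.out.println(applyMaxFileSplitNumLimit(32 * mb, 10_000L * mb, 0) / mb);   // 32
        }
    }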
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java
index 4c18d1ee505..41dc478c38c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java
@@ -358,18 +358,22 @@ public class HiveScanNode extends FileQueryScanNode {
         }
         long result = sessionVariable.getMaxInitialSplitSize();
         long totalFileSize = 0;
+        boolean exceedInitialThreshold = false;
         for (HiveMetaStoreCache.FileCacheValue fileCacheValue : fileCaches) {
             if (fileCacheValue.getFiles() == null) {
                 continue;
             }
             for (HiveMetaStoreCache.HiveFileStatus status : fileCacheValue.getFiles()) {
                 totalFileSize += status.getLength();
-                if (totalFileSize >= sessionVariable.getMaxSplitSize() * sessionVariable.getMaxInitialSplitNum()) {
-                    result = sessionVariable.getMaxSplitSize();
-                    break;
+                if (!exceedInitialThreshold
+                        && totalFileSize >= sessionVariable.getMaxSplitSize() * sessionVariable.getMaxInitialSplitNum()) {
+                    exceedInitialThreshold = true;
                 }
             }
         }
+        result = exceedInitialThreshold ? sessionVariable.getMaxSplitSize() : result;
+        result = applyMaxFileSplitNumLimit(result, totalFileSize);
         return result;
     }
@@ -635,4 +639,3 @@ public class HiveScanNode extends FileQueryScanNode {
     }
 }
-
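Why the early break becomes a flag here and in the following scan nodes: applyMaxFileSplitNumLimit needs the true totalFileSize, so the loop must keep accumulating after the initial-split threshold is crossed; the old break left the total understated whenever the threshold was hit early. A sketch of the shared pattern, with the session variables flattened into plain parameters (fileLengths and the parameter names are illustrative stand-ins, not repo identifiers):

    static long determineTargetSplitSize(long[] fileLengths, long maxInitialSplitSize,
            long maxSplitSize, int maxInitialSplitNum, int maxFileSplitNum) {
        long result = maxInitialSplitSize;
        long totalFileSize = 0;
        boolean exceedInitialThreshold = false;
        for (long len : fileLengths) {
            totalFileSize += len;              // no early break: every file is counted
            if (!exceedInitialThreshold && totalFileSize >= maxSplitSize * maxInitialSplitNum) {
                exceedInitialThreshold = true; // remember the threshold, keep summing
            }
        }
        result = exceedInitialThreshold ? maxSplitSize : result;
        // Standalone copy of applyMaxFileSplitNumLimit from the first hunk.
        if (maxFileSplitNum > 0 && totalFileSize > 0) {
            result = Math.max(result, (totalFileSize + maxFileSplitNum - 1L) / (long) maxFileSplitNum);
        }
        return result;
    }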
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java
index f639a46c092..982fe4a7e2c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java
@@ -476,14 +476,18 @@ public class IcebergScanNode extends FileQueryScanNode {
     private long determineTargetFileSplitSize(Iterable<FileScanTask> tasks) {
         long result = sessionVariable.getMaxInitialSplitSize();
         long accumulatedTotalFileSize = 0;
+        boolean exceedInitialThreshold = false;
         for (FileScanTask task : tasks) {
             accumulatedTotalFileSize += ScanTaskUtil.contentSizeInBytes(task.file());
-            if (accumulatedTotalFileSize
+            if (!exceedInitialThreshold && accumulatedTotalFileSize
                     >= sessionVariable.getMaxSplitSize() * sessionVariable.getMaxInitialSplitNum()) {
-                result = sessionVariable.getMaxSplitSize();
-                break;
+                exceedInitialThreshold = true;
             }
         }
+        result = exceedInitialThreshold ? sessionVariable.getMaxSplitSize() : result;
+        if (!isBatchMode()) {
+            result = applyMaxFileSplitNumLimit(result, accumulatedTotalFileSize);
+        }
         return result;
     }
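Note the isBatchMode() guard, which only Iceberg needs among the four scan nodes touched here: in batch mode the FE fetches splits incrementally rather than materializing them all during planning, so the cap is skipped there, presumably because the OOM risk it targets does not arise (the session variable's description likewise scopes it to non-batch mode). The TestIcebergScanNode in the new unit test below pins isBatchMode() to false to exercise the capped path.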
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
index d8db43c21fd..3004e9dc027 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
@@ -436,6 +436,7 @@ public class PaimonScanNode extends FileQueryScanNode {
         }
         long result = sessionVariable.getMaxInitialSplitSize();
         long totalFileSize = 0;
+        boolean exceedInitialThreshold = false;
         for (DataSplit dataSplit : dataSplits) {
             Optional<List<RawFile>> rawFiles = dataSplit.convertToRawFiles();
             if (!supportNativeReader(rawFiles)) {
@@ -443,13 +444,14 @@ public class PaimonScanNode extends FileQueryScanNode {
             }
             for (RawFile rawFile : rawFiles.get()) {
                 totalFileSize += rawFile.fileSize();
-                if (totalFileSize
+                if (!exceedInitialThreshold && totalFileSize
                         >= sessionVariable.getMaxSplitSize() * sessionVariable.getMaxInitialSplitNum()) {
-                    result = sessionVariable.getMaxSplitSize();
-                    break;
+                    exceedInitialThreshold = true;
                 }
             }
         }
+        result = exceedInitialThreshold ? sessionVariable.getMaxSplitSize() : result;
+        result = applyMaxFileSplitNumLimit(result, totalFileSize);
         return result;
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/tvf/source/TVFScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/tvf/source/TVFScanNode.java
index 543575689da..12db0b7c1bf 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/tvf/source/TVFScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/tvf/source/TVFScanNode.java
@@ -174,14 +174,16 @@ public class TVFScanNode extends FileQueryScanNode {
         }
         long result = sessionVariable.getMaxInitialSplitSize();
         long totalFileSize = 0;
+        boolean exceedInitialThreshold = false;
         for (TBrokerFileStatus fileStatus : fileStatuses) {
             totalFileSize += fileStatus.getSize();
-            if (totalFileSize
-                    >= sessionVariable.getMaxSplitSize() * sessionVariable.getMaxInitialSplitNum()) {
-                result = sessionVariable.getMaxSplitSize();
-                break;
+            if (!exceedInitialThreshold
+                    && totalFileSize >= sessionVariable.getMaxSplitSize() * sessionVariable.getMaxInitialSplitNum()) {
+                exceedInitialThreshold = true;
             }
         }
+        result = exceedInitialThreshold ? sessionVariable.getMaxSplitSize() : result;
+        result = applyMaxFileSplitNumLimit(result, totalFileSize);
         return result;
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index de1cf3b3ce3..f35551fac72 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -522,6 +522,8 @@ public class SessionVariable implements Serializable, Writable {
     public static final String MAX_INITIAL_FILE_SPLIT_NUM = "max_initial_file_split_num";
+    public static final String MAX_FILE_SPLIT_NUM = "max_file_split_num";
+
     // Target file size in bytes for Iceberg write operations
     public static final String ICEBERG_WRITE_TARGET_FILE_SIZE_BYTES = "iceberg_write_target_file_size_bytes";
@@ -2225,6 +2227,13 @@ public class SessionVariable implements Serializable, Writable {
             needForward = true)
     public int maxInitialSplitNum = 200;
+    @VariableMgr.VarAttr(
+            name = MAX_FILE_SPLIT_NUM,
+            description = {"在非 batch 模式下,每个 table scan 最大允许的 split 数量,防止产生过多 split 导致 OOM。",
+                    "In non-batch mode, the maximum number of splits allowed per table scan to avoid OOM."},
+            needForward = true)
+    public int maxFileSplitNum = 100000;
+
     // Target file size for Iceberg write operations
     // Default 0 means use config::iceberg_sink_max_file_size
     @VariableMgr.VarAttr(name = ICEBERG_WRITE_TARGET_FILE_SIZE_BYTES, needForward = true)
@@ -4308,6 +4317,14 @@ public class SessionVariable implements Serializable, Writable {
         this.maxInitialSplitNum = maxInitialSplitNum;
     }
+    public int getMaxFileSplitNum() {
+        return maxFileSplitNum;
+    }
+
+    public void setMaxFileSplitNum(int maxFileSplitNum) {
+        this.maxFileSplitNum = maxFileSplitNum;
+    }
+
     public long getIcebergWriteTargetFileSizeBytes() {
         return icebergWriteTargetFileSizeBytes;
     }
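Operationally, the new knob behaves like its neighbors: it defaults to 100000, is forwarded to the coordinating FE (needForward = true), a non-positive value disables the cap (see the guard in applyMaxFileSplitNumLimit), and it can be tuned per session, e.g. SET max_file_split_num = 50000;. Inside the FE the accessors above are driven directly, as the new unit tests do:

    SessionVariable sv = new SessionVariable();
    sv.setMaxFileSplitNum(100); // cap each file scan at 100 splits; 0 or below disables the cap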
diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/FileQueryScanNodeTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/FileQueryScanNodeTest.java
new file mode 100644
index 00000000000..8b1d98e509a
--- /dev/null
+++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/FileQueryScanNodeTest.java
@@ -0,0 +1,92 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource;
+
+import org.apache.doris.analysis.TupleDescriptor;
+import org.apache.doris.analysis.TupleId;
+import org.apache.doris.catalog.TableIf;
+import org.apache.doris.common.UserException;
+import org.apache.doris.planner.PlanNodeId;
+import org.apache.doris.qe.SessionVariable;
+import org.apache.doris.statistics.StatisticalType;
+import org.apache.doris.thrift.TFileFormatType;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+public class FileQueryScanNodeTest {
+ private static final long MB = 1024L * 1024L;
+
+ private static class TestFileQueryScanNode extends FileQueryScanNode {
+ TestFileQueryScanNode(SessionVariable sv) {
+            super(new PlanNodeId(0), new TupleDescriptor(new TupleId(0)), "test",
+                    StatisticalType.TEST_EXTERNAL_TABLE, false, sv);
+ }
+
+ @Override
+ protected TFileFormatType getFileFormatType() throws UserException {
+ return TFileFormatType.FORMAT_ORC;
+ }
+
+ @Override
+ protected List<String> getPathPartitionKeys() throws UserException {
+ return Collections.emptyList();
+ }
+
+ @Override
+ protected TableIf getTargetTable() throws UserException {
+ return null;
+ }
+
+ @Override
+        protected Map<String, String> getLocationProperties() throws UserException {
+ return Collections.emptyMap();
+ }
+ }
+
+ @Test
+ public void testApplyMaxFileSplitNumLimitRaisesTargetSize() {
+ SessionVariable sv = new SessionVariable();
+ sv.setMaxFileSplitNum(100);
+ TestFileQueryScanNode node = new TestFileQueryScanNode(sv);
+ long target = node.applyMaxFileSplitNumLimit(32 * MB, 10_000L * MB);
+ Assert.assertEquals(100 * MB, target);
+ }
+
+ @Test
+ public void testApplyMaxFileSplitNumLimitKeepsTargetSizeWhenSmall() {
+ SessionVariable sv = new SessionVariable();
+ sv.setMaxFileSplitNum(100);
+ TestFileQueryScanNode node = new TestFileQueryScanNode(sv);
+ long target = node.applyMaxFileSplitNumLimit(32 * MB, 500L * MB);
+ Assert.assertEquals(32 * MB, target);
+ }
+
+ @Test
+ public void testApplyMaxFileSplitNumLimitDisabled() {
+ SessionVariable sv = new SessionVariable();
+ sv.setMaxFileSplitNum(0);
+ TestFileQueryScanNode node = new TestFileQueryScanNode(sv);
+ long target = node.applyMaxFileSplitNumLimit(32 * MB, 10_000L * MB);
+ Assert.assertEquals(32 * MB, target);
+ }
+}
diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/hive/source/HiveScanNodeTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/hive/source/HiveScanNodeTest.java
new file mode 100644
index 00000000000..727ff939003
--- /dev/null
+++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/hive/source/HiveScanNodeTest.java
@@ -0,0 +1,88 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.hive.source;
+
+import org.apache.doris.analysis.TupleDescriptor;
+import org.apache.doris.analysis.TupleId;
+import org.apache.doris.datasource.hive.HMSExternalCatalog;
+import org.apache.doris.datasource.hive.HMSExternalTable;
+import org.apache.doris.datasource.hive.HiveMetaStoreCache;
+import org.apache.doris.planner.PlanNodeId;
+import org.apache.doris.qe.SessionVariable;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.lang.reflect.Method;
+import java.util.Collections;
+import java.util.List;
+
+public class HiveScanNodeTest {
+ private static final long MB = 1024L * 1024L;
+
+ @Test
+    public void testDetermineTargetFileSplitSizeHonorsMaxFileSplitNum() throws Exception {
+        SessionVariable sv = new SessionVariable();
+        sv.setMaxFileSplitNum(100);
+        TupleDescriptor desc = new TupleDescriptor(new TupleId(0));
+        HMSExternalTable table = Mockito.mock(HMSExternalTable.class);
+        HMSExternalCatalog catalog = Mockito.mock(HMSExternalCatalog.class);
+        Mockito.when(table.getCatalog()).thenReturn(catalog);
+        Mockito.when(catalog.bindBrokerName()).thenReturn("");
+        desc.setTable(table);
+        HiveScanNode node = new HiveScanNode(new PlanNodeId(0), desc, false, sv, null);
+
+        HiveMetaStoreCache.FileCacheValue fileCacheValue = new HiveMetaStoreCache.FileCacheValue();
+        HiveMetaStoreCache.HiveFileStatus status = new HiveMetaStoreCache.HiveFileStatus();
+        status.setLength(10_000L * MB);
+        fileCacheValue.getFiles().add(status);
+        List<HiveMetaStoreCache.FileCacheValue> caches = Collections.singletonList(fileCacheValue);
+
+        Method method = HiveScanNode.class.getDeclaredMethod(
+                "determineTargetFileSplitSize", List.class, boolean.class);
+        method.setAccessible(true);
+        long target = (long) method.invoke(node, caches, false);
+        Assert.assertEquals(100 * MB, target);
+ }
+
+ @Test
+    public void testDetermineTargetFileSplitSizeKeepsInitialSize() throws Exception {
+        SessionVariable sv = new SessionVariable();
+        sv.setMaxFileSplitNum(100);
+        TupleDescriptor desc = new TupleDescriptor(new TupleId(0));
+        HMSExternalTable table = Mockito.mock(HMSExternalTable.class);
+        HMSExternalCatalog catalog = Mockito.mock(HMSExternalCatalog.class);
+        Mockito.when(table.getCatalog()).thenReturn(catalog);
+        Mockito.when(catalog.bindBrokerName()).thenReturn("");
+        desc.setTable(table);
+        HiveScanNode node = new HiveScanNode(new PlanNodeId(0), desc, false, sv, null);
+
+        HiveMetaStoreCache.FileCacheValue fileCacheValue = new HiveMetaStoreCache.FileCacheValue();
+        HiveMetaStoreCache.HiveFileStatus status = new HiveMetaStoreCache.HiveFileStatus();
+        status.setLength(500L * MB);
+        fileCacheValue.getFiles().add(status);
+        List<HiveMetaStoreCache.FileCacheValue> caches = Collections.singletonList(fileCacheValue);
+
+        Method method = HiveScanNode.class.getDeclaredMethod(
+                "determineTargetFileSplitSize", List.class, boolean.class);
+        method.setAccessible(true);
+        long target = (long) method.invoke(node, caches, false);
+        Assert.assertEquals(32 * MB, target);
+ }
+}
diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/iceberg/source/IcebergScanNodeTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/iceberg/source/IcebergScanNodeTest.java
new file mode 100644
index 00000000000..48031a2303e
--- /dev/null
+++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/iceberg/source/IcebergScanNodeTest.java
@@ -0,0 +1,72 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.iceberg.source;
+
+import org.apache.doris.analysis.TupleDescriptor;
+import org.apache.doris.analysis.TupleId;
+import org.apache.doris.planner.PlanNodeId;
+import org.apache.doris.qe.SessionVariable;
+
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.util.ScanTaskUtil;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.lang.reflect.Method;
+import java.util.Collections;
+
+public class IcebergScanNodeTest {
+ private static final long MB = 1024L * 1024L;
+
+ private static class TestIcebergScanNode extends IcebergScanNode {
+ TestIcebergScanNode(SessionVariable sv) {
+ super(new PlanNodeId(0), new TupleDescriptor(new TupleId(0)), sv);
+ }
+
+ @Override
+ public boolean isBatchMode() {
+ return false;
+ }
+ }
+
+ @Test
+    public void testDetermineTargetFileSplitSizeHonorsMaxFileSplitNum() throws Exception {
+        SessionVariable sv = new SessionVariable();
+        sv.setMaxFileSplitNum(100);
+        TestIcebergScanNode node = new TestIcebergScanNode(sv);
+
+        DataFile dataFile = Mockito.mock(DataFile.class);
+        Mockito.when(dataFile.fileSizeInBytes()).thenReturn(10_000L * MB);
+        FileScanTask task = Mockito.mock(FileScanTask.class);
+        Mockito.when(task.file()).thenReturn(dataFile);
+        Mockito.when(task.length()).thenReturn(10_000L * MB);
+
+        try (org.mockito.MockedStatic<ScanTaskUtil> mockedScanTaskUtil = Mockito.mockStatic(ScanTaskUtil.class)) {
+            mockedScanTaskUtil.when(() -> ScanTaskUtil.contentSizeInBytes(dataFile))
+                    .thenReturn(10_000L * MB);
+
+            Method method = IcebergScanNode.class.getDeclaredMethod("determineTargetFileSplitSize", Iterable.class);
+            method.setAccessible(true);
+            long target = (long) method.invoke(node, Collections.singletonList(task));
+ Assert.assertEquals(100 * MB, target);
+ }
+ }
+}
diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/paimon/source/PaimonScanNodeTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/paimon/source/PaimonScanNodeTest.java
index 692a0db12ca..09795c53910 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/datasource/paimon/source/PaimonScanNodeTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/paimon/source/PaimonScanNodeTest.java
@@ -31,6 +31,7 @@ import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.stats.SimpleStats;
import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.table.source.RawFile;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -39,6 +40,7 @@ import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;
+import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -381,6 +383,29 @@ public class PaimonScanNodeTest {
}
}
+ @Test
+    public void testDetermineTargetFileSplitSizeHonorsMaxFileSplitNum() throws Exception {
+        SessionVariable sv = new SessionVariable();
+        sv.setMaxFileSplitNum(100);
+        PaimonScanNode node = new PaimonScanNode(new PlanNodeId(0), new TupleDescriptor(new TupleId(0)), false, sv);
+
+        PaimonSource source = Mockito.mock(PaimonSource.class);
+        Mockito.when(source.getFileFormatFromTableProperties()).thenReturn("parquet");
+        node.setSource(source);
+
+        RawFile rawFile = Mockito.mock(RawFile.class);
+        Mockito.when(rawFile.path()).thenReturn("file.parquet");
+        Mockito.when(rawFile.fileSize()).thenReturn(10_000L * 1024L * 1024L);
+
+        DataSplit dataSplit = Mockito.mock(DataSplit.class);
+        Mockito.when(dataSplit.convertToRawFiles()).thenReturn(Optional.of(Collections.singletonList(rawFile)));
+
+        Method method = PaimonScanNode.class.getDeclaredMethod("determineTargetFileSplitSize", List.class, boolean.class);
+        method.setAccessible(true);
+        long target = (long) method.invoke(node, Collections.singletonList(dataSplit), false);
+ Assert.assertEquals(100L * 1024L * 1024L, target);
+ }
+
private void mockJniReader(PaimonScanNode spyNode) {
Mockito.doReturn(false).when(spyNode).supportNativeReader(ArgumentMatchers.any(Optional.class));
}
diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/tvf/source/TVFScanNodeTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/tvf/source/TVFScanNodeTest.java
new file mode 100644
index 00000000000..8d591362376
--- /dev/null
+++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/tvf/source/TVFScanNodeTest.java
@@ -0,0 +1,59 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.tvf.source;
+
+import org.apache.doris.analysis.TupleDescriptor;
+import org.apache.doris.analysis.TupleId;
+import org.apache.doris.catalog.FunctionGenTable;
+import org.apache.doris.planner.PlanNodeId;
+import org.apache.doris.qe.SessionVariable;
+import org.apache.doris.tablefunction.ExternalFileTableValuedFunction;
+import org.apache.doris.thrift.TBrokerFileStatus;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.lang.reflect.Method;
+import java.util.Collections;
+import java.util.List;
+
+public class TVFScanNodeTest {
+ private static final long MB = 1024L * 1024L;
+
+ @Test
+    public void testDetermineTargetFileSplitSizeHonorsMaxFileSplitNum() throws Exception {
+ SessionVariable sv = new SessionVariable();
+ sv.setMaxFileSplitNum(100);
+ TupleDescriptor desc = new TupleDescriptor(new TupleId(0));
+ FunctionGenTable table = Mockito.mock(FunctionGenTable.class);
+        ExternalFileTableValuedFunction tvf = Mockito.mock(ExternalFileTableValuedFunction.class);
+ Mockito.when(table.getTvf()).thenReturn(tvf);
+ desc.setTable(table);
+ TVFScanNode node = new TVFScanNode(new PlanNodeId(0), desc, false, sv);
+
+ TBrokerFileStatus status = new TBrokerFileStatus();
+ status.setSize(10_000L * MB);
+ List<TBrokerFileStatus> statuses = Collections.singletonList(status);
+
+        Method method = TVFScanNode.class.getDeclaredMethod("determineTargetFileSplitSize", List.class);
+ method.setAccessible(true);
+ long target = (long) method.invoke(node, statuses);
+ Assert.assertEquals(100 * MB, target);
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]