This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-4.0 by this push:
     new 3dccd751b6c branch-4.0: [feature](multi-catalog) Add max_file_split_num session variable to prevent OOM in file scan #58759 (#60732)
3dccd751b6c is described below

commit 3dccd751b6c5146c81227135853f47bfb92dd453
Author: Socrates <[email protected]>
AuthorDate: Sat Feb 14 22:24:49 2026 +0800

    branch-4.0: [feature](multi-catalog) Add max_file_split_num session variable to prevent OOM in file scan #58759 (#60732)
    
    - Cherry-picked from #58759
---
 .../apache/doris/datasource/FileQueryScanNode.java |  9 +++
 .../doris/datasource/hive/source/HiveScanNode.java | 11 ++-
 .../datasource/iceberg/source/IcebergScanNode.java | 10 ++-
 .../datasource/paimon/source/PaimonScanNode.java   |  8 +-
 .../doris/datasource/tvf/source/TVFScanNode.java   | 10 ++-
 .../java/org/apache/doris/qe/SessionVariable.java  | 17 ++++
 .../doris/datasource/FileQueryScanNodeTest.java    | 92 ++++++++++++++++++++++
 .../datasource/hive/source/HiveScanNodeTest.java   | 88 +++++++++++++++++++++
 .../iceberg/source/IcebergScanNodeTest.java        | 72 +++++++++++++++++
 .../paimon/source/PaimonScanNodeTest.java          | 25 ++++++
 .../datasource/tvf/source/TVFScanNodeTest.java     | 59 ++++++++++++++
 11 files changed, 387 insertions(+), 14 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java
index 1ce074ecf1b..ab039e9e9b6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java
@@ -651,4 +651,13 @@ public abstract class FileQueryScanNode extends FileScanNode {
         }
         return this.scanParams;
     }
+
+    protected long applyMaxFileSplitNumLimit(long targetSplitSize, long totalFileSize) {
+        int maxFileSplitNum = sessionVariable.getMaxFileSplitNum();
+        if (maxFileSplitNum <= 0 || totalFileSize <= 0) {
+            return targetSplitSize;
+        }
+        long minSplitSizeForMaxNum = (totalFileSize + maxFileSplitNum - 1L) / (long) maxFileSplitNum;
+        return Math.max(targetSplitSize, minSplitSizeForMaxNum);
+    }
 }
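
The new helper is a pure ceiling-division clamp: it never lowers the target split size, only raises it so that ceil(totalFileSize / result) stays within maxFileSplitNum. A minimal standalone sketch of the same arithmetic (class and method names here are illustrative, not part of the patch; the 32 MB target and 10,000 MB total mirror the new unit tests):

    // Sketch only: mirrors FileQueryScanNode.applyMaxFileSplitNumLimit.
    public class MaxFileSplitNumSketch {
        static long applyLimit(long targetSplitSize, long totalFileSize, int maxFileSplitNum) {
            if (maxFileSplitNum <= 0 || totalFileSize <= 0) {
                return targetSplitSize; // limit disabled, or nothing to scan
            }
            // Smallest split size that keeps the split count <= maxFileSplitNum (ceiling division).
            long minSplitSizeForMaxNum = (totalFileSize + maxFileSplitNum - 1L) / maxFileSplitNum;
            return Math.max(targetSplitSize, minSplitSizeForMaxNum);
        }

        public static void main(String[] args) {
            long mb = 1024L * 1024L;
            // 10,000 MB capped at 100 splits: target raised from 32 MB to 100 MB.
            System.out.println(applyLimit(32 * mb, 10_000L * mb, 100) / mb); // prints 100
            // 500 MB already fits in 100 splits of 32 MB each: target unchanged.
            System.out.println(applyLimit(32 * mb, 500L * mb, 100) / mb);    // prints 32
        }
    }
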
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java
index 4c18d1ee505..41dc478c38c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java
@@ -358,18 +358,22 @@ public class HiveScanNode extends FileQueryScanNode {
         }
         long result = sessionVariable.getMaxInitialSplitSize();
         long totalFileSize = 0;
+        boolean exceedInitialThreshold = false;
         for (HiveMetaStoreCache.FileCacheValue fileCacheValue : fileCaches) {
             if (fileCacheValue.getFiles() == null) {
                 continue;
             }
             for (HiveMetaStoreCache.HiveFileStatus status : fileCacheValue.getFiles()) {
                 totalFileSize += status.getLength();
-                if (totalFileSize >= sessionVariable.getMaxSplitSize() * sessionVariable.getMaxInitialSplitNum()) {
-                    result = sessionVariable.getMaxSplitSize();
-                    break;
+                if (!exceedInitialThreshold
+                        && totalFileSize >= sessionVariable.getMaxSplitSize()
+                                * sessionVariable.getMaxInitialSplitNum()) {
+                    exceedInitialThreshold = true;
                 }
             }
         }
+        result = exceedInitialThreshold ? sessionVariable.getMaxSplitSize() : result;
+        result = applyMaxFileSplitNumLimit(result, totalFileSize);
         return result;
     }
 
@@ -635,4 +639,3 @@ public class HiveScanNode extends FileQueryScanNode {
     }
 }
 
-
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java
index f639a46c092..982fe4a7e2c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java
@@ -476,14 +476,18 @@ public class IcebergScanNode extends FileQueryScanNode {
     private long determineTargetFileSplitSize(Iterable<FileScanTask> tasks) {
         long result = sessionVariable.getMaxInitialSplitSize();
         long accumulatedTotalFileSize = 0;
+        boolean exceedInitialThreshold = false;
         for (FileScanTask task : tasks) {
             accumulatedTotalFileSize += ScanTaskUtil.contentSizeInBytes(task.file());
-            if (accumulatedTotalFileSize
+            if (!exceedInitialThreshold && accumulatedTotalFileSize
                     >= sessionVariable.getMaxSplitSize() * sessionVariable.getMaxInitialSplitNum()) {
-                result = sessionVariable.getMaxSplitSize();
-                break;
+                exceedInitialThreshold = true;
             }
         }
+        result = exceedInitialThreshold ? sessionVariable.getMaxSplitSize() : result;
+        if (!isBatchMode()) {
+            result = applyMaxFileSplitNumLimit(result, accumulatedTotalFileSize);
+        }
         return result;
     }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
index d8db43c21fd..3004e9dc027 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
@@ -436,6 +436,7 @@ public class PaimonScanNode extends FileQueryScanNode {
         }
         long result = sessionVariable.getMaxInitialSplitSize();
         long totalFileSize = 0;
+        boolean exceedInitialThreshold = false;
         for (DataSplit dataSplit : dataSplits) {
             Optional<List<RawFile>> rawFiles = dataSplit.convertToRawFiles();
             if (!supportNativeReader(rawFiles)) {
@@ -443,13 +444,14 @@ public class PaimonScanNode extends FileQueryScanNode {
             }
             for (RawFile rawFile : rawFiles.get()) {
                 totalFileSize += rawFile.fileSize();
-                if (totalFileSize
+                if (!exceedInitialThreshold && totalFileSize
                         >= sessionVariable.getMaxSplitSize() * sessionVariable.getMaxInitialSplitNum()) {
-                    result = sessionVariable.getMaxSplitSize();
-                    break;
+                    exceedInitialThreshold = true;
                 }
             }
         }
+        result = exceedInitialThreshold ? sessionVariable.getMaxSplitSize() : result;
+        result = applyMaxFileSplitNumLimit(result, totalFileSize);
         return result;
     }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/tvf/source/TVFScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/tvf/source/TVFScanNode.java
index 543575689da..12db0b7c1bf 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/tvf/source/TVFScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/tvf/source/TVFScanNode.java
@@ -174,14 +174,16 @@ public class TVFScanNode extends FileQueryScanNode {
         }
         long result = sessionVariable.getMaxInitialSplitSize();
         long totalFileSize = 0;
+        boolean exceedInitialThreshold = false;
         for (TBrokerFileStatus fileStatus : fileStatuses) {
             totalFileSize += fileStatus.getSize();
-            if (totalFileSize
-                    >= sessionVariable.getMaxSplitSize() * sessionVariable.getMaxInitialSplitNum()) {
-                result = sessionVariable.getMaxSplitSize();
-                break;
+            if (!exceedInitialThreshold
+                    && totalFileSize >= sessionVariable.getMaxSplitSize() * sessionVariable.getMaxInitialSplitNum()) {
+                exceedInitialThreshold = true;
             }
         }
+        result = exceedInitialThreshold ? sessionVariable.getMaxSplitSize() : result;
+        result = applyMaxFileSplitNumLimit(result, totalFileSize);
         return result;
     }
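
All four scan nodes now follow the same two-stage sizing flow: accumulate the total file size over every file (the early break in the old loops was removed because the clamp needs the full total), promote the target from the initial split size to the max split size once the initial threshold is crossed, then clamp the split count with applyMaxFileSplitNumLimit. IcebergScanNode applies the clamp only in non-batch mode, where the complete file set has been enumerated. A condensed sketch of the combined flow, assuming illustrative 32 MB / 128 MB session defaults (not taken from this diff):

    // Sketch only: real values come from the SessionVariable getters.
    static long determineTargetFileSplitSizeSketch(long totalFileSize, int maxInitialSplitNum, int maxFileSplitNum) {
        long maxInitialSplitSize = 32L * 1024 * 1024;  // assumed default
        long maxSplitSize = 128L * 1024 * 1024;        // assumed default
        long result = maxInitialSplitSize;
        // Stage 1: enough total data to justify full-size splits?
        if (totalFileSize >= maxSplitSize * (long) maxInitialSplitNum) {
            result = maxSplitSize;
        }
        // Stage 2: raise the target further if it would still yield more than maxFileSplitNum splits.
        if (maxFileSplitNum > 0 && totalFileSize > 0) {
            result = Math.max(result, (totalFileSize + maxFileSplitNum - 1L) / maxFileSplitNum);
        }
        return result;
    }
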
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index de1cf3b3ce3..f35551fac72 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -522,6 +522,8 @@ public class SessionVariable implements Serializable, Writable {
 
     public static final String MAX_INITIAL_FILE_SPLIT_NUM = "max_initial_file_split_num";
 
+    public static final String MAX_FILE_SPLIT_NUM = "max_file_split_num";
+
     // Target file size in bytes for Iceberg write operations
     public static final String ICEBERG_WRITE_TARGET_FILE_SIZE_BYTES = "iceberg_write_target_file_size_bytes";
 
@@ -2225,6 +2227,13 @@ public class SessionVariable implements Serializable, Writable {
             needForward = true)
     public int maxInitialSplitNum = 200;
 
+    @VariableMgr.VarAttr(
+            name = MAX_FILE_SPLIT_NUM,
+            description = {"在非 batch 模式下,每个 table scan 最大允许的 split 数量,防止产生过多 split 导致 OOM。",
+                    "In non-batch mode, the maximum number of splits allowed per table scan to avoid OOM."},
+            needForward = true)
+    public int maxFileSplitNum = 100000;
+
     // Target file size for Iceberg write operations
     // Default 0 means use config::iceberg_sink_max_file_size
     @VariableMgr.VarAttr(name = ICEBERG_WRITE_TARGET_FILE_SIZE_BYTES, needForward = true)
@@ -4308,6 +4317,14 @@ public class SessionVariable implements Serializable, Writable {
         this.maxInitialSplitNum = maxInitialSplitNum;
     }
 
+    public int getMaxFileSplitNum() {
+        return maxFileSplitNum;
+    }
+
+    public void setMaxFileSplitNum(int maxFileSplitNum) {
+        this.maxFileSplitNum = maxFileSplitNum;
+    }
+
     public long getIcebergWriteTargetFileSizeBytes() {
         return icebergWriteTargetFileSizeBytes;
     }
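
The variable defaults to 100000, is forwarded with the query (needForward = true), and is exposed through the getter/setter pair above; in a live session it would normally be set via SQL, e.g. SET max_file_split_num = 100000. A minimal sketch of exercising it programmatically, as the new unit tests do (values are illustrative):

    SessionVariable sv = new SessionVariable();
    sv.setMaxFileSplitNum(100);    // cap each table scan at roughly 100 splits
    assert sv.getMaxFileSplitNum() == 100;
    sv.setMaxFileSplitNum(0);      // values <= 0 disable the clamp in applyMaxFileSplitNumLimit
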
diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/FileQueryScanNodeTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/FileQueryScanNodeTest.java
new file mode 100644
index 00000000000..8b1d98e509a
--- /dev/null
+++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/FileQueryScanNodeTest.java
@@ -0,0 +1,92 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource;
+
+import org.apache.doris.analysis.TupleDescriptor;
+import org.apache.doris.analysis.TupleId;
+import org.apache.doris.catalog.TableIf;
+import org.apache.doris.common.UserException;
+import org.apache.doris.planner.PlanNodeId;
+import org.apache.doris.qe.SessionVariable;
+import org.apache.doris.statistics.StatisticalType;
+import org.apache.doris.thrift.TFileFormatType;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+public class FileQueryScanNodeTest {
+    private static final long MB = 1024L * 1024L;
+
+    private static class TestFileQueryScanNode extends FileQueryScanNode {
+        TestFileQueryScanNode(SessionVariable sv) {
+            super(new PlanNodeId(0), new TupleDescriptor(new TupleId(0)), "test",
+                    StatisticalType.TEST_EXTERNAL_TABLE, false, sv);
+        }
+
+        @Override
+        protected TFileFormatType getFileFormatType() throws UserException {
+            return TFileFormatType.FORMAT_ORC;
+        }
+
+        @Override
+        protected List<String> getPathPartitionKeys() throws UserException {
+            return Collections.emptyList();
+        }
+
+        @Override
+        protected TableIf getTargetTable() throws UserException {
+            return null;
+        }
+
+        @Override
+        protected Map<String, String> getLocationProperties() throws UserException {
+            return Collections.emptyMap();
+        }
+    }
+
+    @Test
+    public void testApplyMaxFileSplitNumLimitRaisesTargetSize() {
+        SessionVariable sv = new SessionVariable();
+        sv.setMaxFileSplitNum(100);
+        TestFileQueryScanNode node = new TestFileQueryScanNode(sv);
+        long target = node.applyMaxFileSplitNumLimit(32 * MB, 10_000L * MB);
+        Assert.assertEquals(100 * MB, target);
+    }
+
+    @Test
+    public void testApplyMaxFileSplitNumLimitKeepsTargetSizeWhenSmall() {
+        SessionVariable sv = new SessionVariable();
+        sv.setMaxFileSplitNum(100);
+        TestFileQueryScanNode node = new TestFileQueryScanNode(sv);
+        long target = node.applyMaxFileSplitNumLimit(32 * MB, 500L * MB);
+        Assert.assertEquals(32 * MB, target);
+    }
+
+    @Test
+    public void testApplyMaxFileSplitNumLimitDisabled() {
+        SessionVariable sv = new SessionVariable();
+        sv.setMaxFileSplitNum(0);
+        TestFileQueryScanNode node = new TestFileQueryScanNode(sv);
+        long target = node.applyMaxFileSplitNumLimit(32 * MB, 10_000L * MB);
+        Assert.assertEquals(32 * MB, target);
+    }
+}
diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/hive/source/HiveScanNodeTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/hive/source/HiveScanNodeTest.java
new file mode 100644
index 00000000000..727ff939003
--- /dev/null
+++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/hive/source/HiveScanNodeTest.java
@@ -0,0 +1,88 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.hive.source;
+
+import org.apache.doris.analysis.TupleDescriptor;
+import org.apache.doris.analysis.TupleId;
+import org.apache.doris.datasource.hive.HMSExternalCatalog;
+import org.apache.doris.datasource.hive.HMSExternalTable;
+import org.apache.doris.datasource.hive.HiveMetaStoreCache;
+import org.apache.doris.planner.PlanNodeId;
+import org.apache.doris.qe.SessionVariable;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.lang.reflect.Method;
+import java.util.Collections;
+import java.util.List;
+
+public class HiveScanNodeTest {
+    private static final long MB = 1024L * 1024L;
+
+    @Test
+    public void testDetermineTargetFileSplitSizeHonorsMaxFileSplitNum() throws Exception {
+        SessionVariable sv = new SessionVariable();
+        sv.setMaxFileSplitNum(100);
+        TupleDescriptor desc = new TupleDescriptor(new TupleId(0));
+        HMSExternalTable table = Mockito.mock(HMSExternalTable.class);
+        HMSExternalCatalog catalog = Mockito.mock(HMSExternalCatalog.class);
+        Mockito.when(table.getCatalog()).thenReturn(catalog);
+        Mockito.when(catalog.bindBrokerName()).thenReturn("");
+        desc.setTable(table);
+        HiveScanNode node = new HiveScanNode(new PlanNodeId(0), desc, false, sv, null);
+
+        HiveMetaStoreCache.FileCacheValue fileCacheValue = new HiveMetaStoreCache.FileCacheValue();
+        HiveMetaStoreCache.HiveFileStatus status = new HiveMetaStoreCache.HiveFileStatus();
+        status.setLength(10_000L * MB);
+        fileCacheValue.getFiles().add(status);
+        List<HiveMetaStoreCache.FileCacheValue> caches = Collections.singletonList(fileCacheValue);
+
+        Method method = HiveScanNode.class.getDeclaredMethod(
+                "determineTargetFileSplitSize", List.class, boolean.class);
+        method.setAccessible(true);
+        long target = (long) method.invoke(node, caches, false);
+        Assert.assertEquals(100 * MB, target);
+    }
+
+    @Test
+    public void testDetermineTargetFileSplitSizeKeepsInitialSize() throws Exception {
+        SessionVariable sv = new SessionVariable();
+        sv.setMaxFileSplitNum(100);
+        TupleDescriptor desc = new TupleDescriptor(new TupleId(0));
+        HMSExternalTable table = Mockito.mock(HMSExternalTable.class);
+        HMSExternalCatalog catalog = Mockito.mock(HMSExternalCatalog.class);
+        Mockito.when(table.getCatalog()).thenReturn(catalog);
+        Mockito.when(catalog.bindBrokerName()).thenReturn("");
+        desc.setTable(table);
+        HiveScanNode node = new HiveScanNode(new PlanNodeId(0), desc, false, sv, null);
+
+        HiveMetaStoreCache.FileCacheValue fileCacheValue = new HiveMetaStoreCache.FileCacheValue();
+        HiveMetaStoreCache.HiveFileStatus status = new HiveMetaStoreCache.HiveFileStatus();
+        status.setLength(500L * MB);
+        fileCacheValue.getFiles().add(status);
+        List<HiveMetaStoreCache.FileCacheValue> caches = Collections.singletonList(fileCacheValue);
+
+        Method method = HiveScanNode.class.getDeclaredMethod(
+                "determineTargetFileSplitSize", List.class, boolean.class);
+        method.setAccessible(true);
+        long target = (long) method.invoke(node, caches, false);
+        Assert.assertEquals(32 * MB, target);
+    }
+}
diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/iceberg/source/IcebergScanNodeTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/iceberg/source/IcebergScanNodeTest.java
new file mode 100644
index 00000000000..48031a2303e
--- /dev/null
+++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/iceberg/source/IcebergScanNodeTest.java
@@ -0,0 +1,72 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.iceberg.source;
+
+import org.apache.doris.analysis.TupleDescriptor;
+import org.apache.doris.analysis.TupleId;
+import org.apache.doris.planner.PlanNodeId;
+import org.apache.doris.qe.SessionVariable;
+
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.util.ScanTaskUtil;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.lang.reflect.Method;
+import java.util.Collections;
+
+public class IcebergScanNodeTest {
+    private static final long MB = 1024L * 1024L;
+
+    private static class TestIcebergScanNode extends IcebergScanNode {
+        TestIcebergScanNode(SessionVariable sv) {
+            super(new PlanNodeId(0), new TupleDescriptor(new TupleId(0)), sv);
+        }
+
+        @Override
+        public boolean isBatchMode() {
+            return false;
+        }
+    }
+
+    @Test
+    public void testDetermineTargetFileSplitSizeHonorsMaxFileSplitNum() throws Exception {
+        SessionVariable sv = new SessionVariable();
+        sv.setMaxFileSplitNum(100);
+        TestIcebergScanNode node = new TestIcebergScanNode(sv);
+
+        DataFile dataFile = Mockito.mock(DataFile.class);
+        Mockito.when(dataFile.fileSizeInBytes()).thenReturn(10_000L * MB);
+        FileScanTask task = Mockito.mock(FileScanTask.class);
+        Mockito.when(task.file()).thenReturn(dataFile);
+        Mockito.when(task.length()).thenReturn(10_000L * MB);
+
+        try (org.mockito.MockedStatic<ScanTaskUtil> mockedScanTaskUtil =
+                Mockito.mockStatic(ScanTaskUtil.class)) {
+            mockedScanTaskUtil.when(() -> ScanTaskUtil.contentSizeInBytes(dataFile))
+                    .thenReturn(10_000L * MB);
+
+            Method method = IcebergScanNode.class.getDeclaredMethod("determineTargetFileSplitSize", Iterable.class);
+            method.setAccessible(true);
+            long target = (long) method.invoke(node, Collections.singletonList(task));
+            Assert.assertEquals(100 * MB, target);
+        }
+    }
+}
diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/paimon/source/PaimonScanNodeTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/paimon/source/PaimonScanNodeTest.java
index 692a0db12ca..09795c53910 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/datasource/paimon/source/PaimonScanNodeTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/paimon/source/PaimonScanNodeTest.java
@@ -31,6 +31,7 @@ import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.stats.SimpleStats;
 import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.table.source.RawFile;
 import org.junit.Assert;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -39,6 +40,7 @@ import org.mockito.Mock;
 import org.mockito.Mockito;
 import org.mockito.junit.MockitoJUnitRunner;
 
+import java.lang.reflect.Method;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -381,6 +383,29 @@ public class PaimonScanNodeTest {
         }
     }
 
+    @Test
+    public void testDetermineTargetFileSplitSizeHonorsMaxFileSplitNum() throws Exception {
+        SessionVariable sv = new SessionVariable();
+        sv.setMaxFileSplitNum(100);
+        PaimonScanNode node = new PaimonScanNode(new PlanNodeId(0), new TupleDescriptor(new TupleId(0)), false, sv);
+
+        PaimonSource source = Mockito.mock(PaimonSource.class);
+        Mockito.when(source.getFileFormatFromTableProperties()).thenReturn("parquet");
+        node.setSource(source);
+
+        RawFile rawFile = Mockito.mock(RawFile.class);
+        Mockito.when(rawFile.path()).thenReturn("file.parquet");
+        Mockito.when(rawFile.fileSize()).thenReturn(10_000L * 1024L * 1024L);
+
+        DataSplit dataSplit = Mockito.mock(DataSplit.class);
+        Mockito.when(dataSplit.convertToRawFiles()).thenReturn(Optional.of(Collections.singletonList(rawFile)));
+
+        Method method = PaimonScanNode.class.getDeclaredMethod("determineTargetFileSplitSize", List.class, boolean.class);
+        method.setAccessible(true);
+        long target = (long) method.invoke(node, Collections.singletonList(dataSplit), false);
+        Assert.assertEquals(100L * 1024L * 1024L, target);
+    }
+
     private void mockJniReader(PaimonScanNode spyNode) {
         Mockito.doReturn(false).when(spyNode).supportNativeReader(ArgumentMatchers.any(Optional.class));
     }
diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/tvf/source/TVFScanNodeTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/tvf/source/TVFScanNodeTest.java
new file mode 100644
index 00000000000..8d591362376
--- /dev/null
+++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/tvf/source/TVFScanNodeTest.java
@@ -0,0 +1,59 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.tvf.source;
+
+import org.apache.doris.analysis.TupleDescriptor;
+import org.apache.doris.analysis.TupleId;
+import org.apache.doris.catalog.FunctionGenTable;
+import org.apache.doris.planner.PlanNodeId;
+import org.apache.doris.qe.SessionVariable;
+import org.apache.doris.tablefunction.ExternalFileTableValuedFunction;
+import org.apache.doris.thrift.TBrokerFileStatus;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.lang.reflect.Method;
+import java.util.Collections;
+import java.util.List;
+
+public class TVFScanNodeTest {
+    private static final long MB = 1024L * 1024L;
+
+    @Test
+    public void testDetermineTargetFileSplitSizeHonorsMaxFileSplitNum() throws Exception {
+        SessionVariable sv = new SessionVariable();
+        sv.setMaxFileSplitNum(100);
+        TupleDescriptor desc = new TupleDescriptor(new TupleId(0));
+        FunctionGenTable table = Mockito.mock(FunctionGenTable.class);
+        ExternalFileTableValuedFunction tvf = Mockito.mock(ExternalFileTableValuedFunction.class);
+        Mockito.when(table.getTvf()).thenReturn(tvf);
+        desc.setTable(table);
+        TVFScanNode node = new TVFScanNode(new PlanNodeId(0), desc, false, sv);
+
+        TBrokerFileStatus status = new TBrokerFileStatus();
+        status.setSize(10_000L * MB);
+        List<TBrokerFileStatus> statuses = Collections.singletonList(status);
+
+        Method method = TVFScanNode.class.getDeclaredMethod("determineTargetFileSplitSize", List.class);
+        method.setAccessible(true);
+        long target = (long) method.invoke(node, statuses);
+        Assert.assertEquals(100 * MB, target);
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
