This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 0d05e4cce0 [Improvement](multi-catalog) The interface of external Splitter. WIP (#17390)
0d05e4cce0 is described below

commit 0d05e4cce019e88d5c5cdd99d5435643f9f8f71d
Author: Jibing-Li <[email protected]>
AuthorDate: Sun Mar 12 20:11:08 2023 +0800

    [Improvement](multi-catalog) The interface of external Splitter. WIP (#17390)
    
    This PR introduces a Splitter interface for external tables.
    The Splitter interface contains one method, getSplits, which is used by
    QueryScanProvider to get the external file splits.
    For Hive/Iceberg/TVF, a split is a file block. For ES, it is a shard.
    This PR also moves the getSplits logic from FileScanProviderIf to the new
    Splitter interface.
    In the future, we may unify internal tables as well.
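
    For illustration only, a minimal sketch of an implementation of the new
    Splitter interface. The class name DemoSplitter, the file path, and the
    file size below are hypothetical; only the Splitter and FileSplit shapes
    come from this commit:

        import java.util.List;

        import org.apache.doris.analysis.Expr;
        import org.apache.doris.common.UserException;
        import org.apache.doris.planner.Split;
        import org.apache.doris.planner.Splitter;
        import org.apache.doris.planner.external.FileSplit;

        import com.google.common.collect.Lists;
        import org.apache.hadoop.fs.Path;

        public class DemoSplitter implements Splitter {
            @Override
            public List<Split> getSplits(List<Expr> exprs) throws UserException {
                // Each Split describes one readable unit; for file-based sources
                // that is a FileSplit(path, start, length, hosts).
                List<Split> splits = Lists.newArrayList();
                splits.add(new FileSplit(new Path("hdfs://nn/demo/file.orc"), 0, 1024L, new String[0]));
                return splits;
            }
        }

    A scan provider assigns its splitter in the constructor (as HiveScanProvider
    and TVFScanProvider do in this commit), and QueryScanProvider then calls
    splitter.getSplits(context.conjuncts) when creating scan range locations.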
---
 .../doris/datasource/hive/HiveMetaStoreCache.java  |   4 +-
 .../TableFormatType.java => OlapSplitter.java}     |  18 ++-
 .../{external/HiveSplit.java => Split.java}        |  16 +--
 .../TableFormatType.java => Splitter.java}         |  18 +--
 .../doris/planner/external/FileScanProviderIf.java |   7 -
 .../external/{HiveSplit.java => FileSplit.java}    |  27 +++-
 .../doris/planner/external/FileSplitStrategy.java  |   2 -
 .../doris/planner/external/HiveScanProvider.java   |  93 +------------
 .../doris/planner/external/HiveSplitter.java       | 155 +++++++++++++++++++++
 .../doris/planner/external/IcebergSplitter.java    | 154 ++++++++++++++++++++
 .../doris/planner/external/LoadScanProvider.java   |   8 --
 .../doris/planner/external/QueryScanProvider.java  | 149 +++++++++-----------
 .../doris/planner/external/TVFScanProvider.java    |  19 +--
 .../apache/doris/planner/external/TVFSplitter.java |  56 ++++++++
 .../doris/planner/external/TableFormatType.java    |   1 +
 .../external/iceberg/IcebergScanProvider.java      | 118 +---------------
 .../planner/external/iceberg/IcebergSplit.java     |   4 +-
 17 files changed, 490 insertions(+), 359 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
index 31b8d2f3b4..c2313eb885 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
@@ -38,6 +38,7 @@ import org.apache.doris.planner.ListPartitionPrunerV2;
 import org.apache.doris.planner.PartitionPrunerV2Base.UniqueId;
 
 import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
 import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.CacheLoader;
 import com.google.common.cache.LoadingCache;
@@ -59,7 +60,6 @@ import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
-import org.apache.parquet.Strings;
 
 import java.security.PrivilegedExceptionAction;
 import java.util.HashMap;
@@ -250,6 +250,8 @@ public class HiveMetaStoreCache {
                 InputFormat<?, ?> inputFormat = 
HiveUtil.getInputFormat(jobConf, key.inputFormat, false);
                 InputSplit[] splits;
                 String remoteUser = jobConf.get(HdfsResource.HADOOP_USER_NAME);
+
+                // TODO: Implement getSplits logic by ourselves, don't call 
inputFormat.getSplits anymore.
                 if (!Strings.isNullOrEmpty(remoteUser)) {
                     UserGroupInformation ugi = 
UserGroupInformation.createRemoteUser(remoteUser);
                     splits = ugi.doAs(
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/TableFormatType.java
 b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapSplitter.java
similarity index 71%
copy from 
fe/fe-core/src/main/java/org/apache/doris/planner/external/TableFormatType.java
copy to fe/fe-core/src/main/java/org/apache/doris/planner/OlapSplitter.java
index 794283de80..f2d06a3aef 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/TableFormatType.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapSplitter.java
@@ -15,19 +15,17 @@
 // specific language governing permissions and limitations
 // under the License.
 
-package org.apache.doris.planner.external;
+package org.apache.doris.planner;
 
-public enum TableFormatType {
-    ICEBERG("iceberg"),
-    HUDI("hudi");
+import org.apache.doris.analysis.Expr;
+import org.apache.doris.common.UserException;
 
-    private final String tableFormatType;
+import java.util.List;
 
-    TableFormatType(String tableFormatType) {
-        this.tableFormatType = tableFormatType;
-    }
+public class OlapSplitter implements Splitter {
 
-    public String value() {
-        return tableFormatType;
+    @Override
+    public List<Split> getSplits(List<Expr> exprs) throws UserException {
+        return null;
     }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveSplit.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/Split.java
similarity index 70%
copy from 
fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveSplit.java
copy to fe/fe-core/src/main/java/org/apache/doris/planner/Split.java
index 6c8f916a5e..63b837aacc 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveSplit.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/Split.java
@@ -15,19 +15,17 @@
 // specific language governing permissions and limitations
 // under the License.
 
-package org.apache.doris.planner.external;
+package org.apache.doris.planner;
 
 import lombok.Data;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.FileSplit;
 
 @Data
-public class HiveSplit extends FileSplit {
-    public HiveSplit() {}
+public abstract class Split {
+    protected String[] hosts;
 
-    public HiveSplit(Path file, long start, long length, String[] hosts) {
-        super(file, start, length, hosts);
-    }
+    public Split() {}
 
-    protected TableFormatType tableFormatType;
+    public Split(String[] hosts) {
+        this.hosts = hosts;
+    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/TableFormatType.java
 b/fe/fe-core/src/main/java/org/apache/doris/planner/Splitter.java
similarity index 71%
copy from 
fe/fe-core/src/main/java/org/apache/doris/planner/external/TableFormatType.java
copy to fe/fe-core/src/main/java/org/apache/doris/planner/Splitter.java
index 794283de80..07952bff87 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/TableFormatType.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/Splitter.java
@@ -15,19 +15,13 @@
 // specific language governing permissions and limitations
 // under the License.
 
-package org.apache.doris.planner.external;
+package org.apache.doris.planner;
 
-public enum TableFormatType {
-    ICEBERG("iceberg"),
-    HUDI("hudi");
+import org.apache.doris.analysis.Expr;
+import org.apache.doris.common.UserException;
 
-    private final String tableFormatType;
+import java.util.List;
 
-    TableFormatType(String tableFormatType) {
-        this.tableFormatType = tableFormatType;
-    }
-
-    public String value() {
-        return tableFormatType;
-    }
+public interface Splitter {
+    List<Split> getSplits(List<Expr> exprs) throws UserException;
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileScanProviderIf.java
 
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileScanProviderIf.java
index 8ae7952169..f962f4d827 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileScanProviderIf.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileScanProviderIf.java
@@ -18,7 +18,6 @@
 package org.apache.doris.planner.external;
 
 import org.apache.doris.analysis.Analyzer;
-import org.apache.doris.analysis.Expr;
 import org.apache.doris.catalog.TableIf;
 import org.apache.doris.common.DdlException;
 import org.apache.doris.common.MetaNotFoundException;
@@ -28,9 +27,6 @@ import org.apache.doris.thrift.TFileFormatType;
 import org.apache.doris.thrift.TFileType;
 import org.apache.doris.thrift.TScanRangeLocations;
 
-import org.apache.hadoop.mapred.InputSplit;
-
-import java.io.IOException;
 import java.util.List;
 import java.util.Map;
 
@@ -41,9 +37,6 @@ public interface FileScanProviderIf {
     // Return S3/HDSF, etc.
     TFileType getLocationType() throws DdlException, MetaNotFoundException;
 
-    // Return file list
-    List<InputSplit> getSplits(List<Expr> exprs) throws IOException, 
UserException;
-
     // return properties for S3/HDFS, etc.
     Map<String, String> getLocationProperties() throws MetaNotFoundException, 
DdlException;
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveSplit.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileSplit.java
similarity index 64%
rename from 
fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveSplit.java
rename to 
fe/fe-core/src/main/java/org/apache/doris/planner/external/FileSplit.java
index 6c8f916a5e..a4e7bfae2f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveSplit.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileSplit.java
@@ -17,17 +17,32 @@
 
 package org.apache.doris.planner.external;
 
+import org.apache.doris.planner.Split;
+
 import lombok.Data;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.FileSplit;
 
 @Data
-public class HiveSplit extends FileSplit {
-    public HiveSplit() {}
+public class FileSplit extends Split {
+    protected Path path;
+    protected long start;
+    protected long length;
+    protected TableFormatType tableFormatType;
+
+    public FileSplit() {}
 
-    public HiveSplit(Path file, long start, long length, String[] hosts) {
-        super(file, start, length, hosts);
+    public FileSplit(Path path, long start, long length, String[] hosts) {
+        this.path = path;
+        this.start = start;
+        this.length = length;
+        this.hosts = hosts;
     }
 
-    protected TableFormatType tableFormatType;
+    public String[] getHosts() {
+        if (this.hosts == null) {
+            return new String[]{};
+        } else {
+            return this.hosts;
+        }
+    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileSplitStrategy.java
 
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileSplitStrategy.java
index 83a6d3d49c..e574aeb9d2 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileSplitStrategy.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileSplitStrategy.java
@@ -19,8 +19,6 @@ package org.apache.doris.planner.external;
 
 import org.apache.doris.common.Config;
 
-import org.apache.hadoop.mapred.FileSplit;
-
 public class FileSplitStrategy {
     private long totalSplitSize;
     private int splitNum;
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanProvider.java
 
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanProvider.java
index f538bf3c3e..5b0baf93ed 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanProvider.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanProvider.java
@@ -18,29 +18,18 @@
 package org.apache.doris.planner.external;
 
 import org.apache.doris.analysis.Analyzer;
-import org.apache.doris.analysis.Expr;
 import org.apache.doris.analysis.SlotDescriptor;
 import org.apache.doris.analysis.TupleDescriptor;
 import org.apache.doris.catalog.Column;
-import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.HiveMetaStoreClientHelper;
-import org.apache.doris.catalog.ListPartitionItem;
-import org.apache.doris.catalog.PartitionItem;
 import org.apache.doris.catalog.TableIf;
-import org.apache.doris.catalog.Type;
 import org.apache.doris.catalog.external.HMSExternalTable;
 import org.apache.doris.common.DdlException;
 import org.apache.doris.common.FeConstants;
 import org.apache.doris.common.MetaNotFoundException;
 import org.apache.doris.common.UserException;
-import org.apache.doris.common.util.Util;
-import org.apache.doris.datasource.HMSExternalCatalog;
-import org.apache.doris.datasource.hive.HiveMetaStoreCache;
-import org.apache.doris.datasource.hive.HiveMetaStoreCache.HivePartitionValues;
-import org.apache.doris.datasource.hive.HivePartition;
 import org.apache.doris.load.BrokerFileGroup;
 import org.apache.doris.planner.ColumnRange;
-import org.apache.doris.planner.ListPartitionPrunerV2;
 import 
org.apache.doris.planner.external.ExternalFileScanNode.ParamCreateContext;
 import org.apache.doris.thrift.TFileAttributes;
 import org.apache.doris.thrift.TFileFormatType;
@@ -49,17 +38,12 @@ import org.apache.doris.thrift.TFileScanSlotInfo;
 import org.apache.doris.thrift.TFileTextScanRangeParams;
 import org.apache.doris.thrift.TFileType;
 
-import com.google.common.base.Joiner;
-import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.Table;
-import org.apache.hadoop.mapred.FileSplit;
-import org.apache.hadoop.mapred.InputSplit;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
-import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
@@ -86,6 +70,7 @@ public class HiveScanProvider extends HMSTableScanProvider {
         this.hmsTable = hmsTable;
         this.desc = desc;
         this.columnNameToRange = columnNameToRange;
+        this.splitter = new HiveSplitter(hmsTable, columnNameToRange);
     }
 
     @Override
@@ -138,84 +123,12 @@ public class HiveScanProvider extends 
HMSTableScanProvider {
         return hmsTable.getMetastoreUri();
     }
 
-    @Override
-    public List<InputSplit> getSplits(List<Expr> exprs) throws UserException {
-        long start = System.currentTimeMillis();
-        try {
-            HiveMetaStoreCache cache = Env.getCurrentEnv().getExtMetaCacheMgr()
-                    .getMetaStoreCache((HMSExternalCatalog) 
hmsTable.getCatalog());
-            // 1. get ListPartitionItems from cache
-            HivePartitionValues hivePartitionValues = null;
-            List<Type> partitionColumnTypes = 
hmsTable.getPartitionColumnTypes();
-            if (!partitionColumnTypes.isEmpty()) {
-                hivePartitionValues = 
cache.getPartitionValues(hmsTable.getDbName(), hmsTable.getName(),
-                        partitionColumnTypes);
-            }
-
-            List<InputSplit> allFiles = Lists.newArrayList();
-            if (hivePartitionValues != null) {
-                // 2. prune partitions by expr
-                Map<Long, PartitionItem> idToPartitionItem = 
hivePartitionValues.getIdToPartitionItem();
-                this.totalPartitionNum = idToPartitionItem.size();
-                ListPartitionPrunerV2 pruner = new 
ListPartitionPrunerV2(idToPartitionItem,
-                        hmsTable.getPartitionColumns(), columnNameToRange,
-                        hivePartitionValues.getUidToPartitionRange(),
-                        hivePartitionValues.getRangeToId(),
-                        hivePartitionValues.getSingleColumnRangeMap(),
-                        true);
-                Collection<Long> filteredPartitionIds = pruner.prune();
-                this.readPartitionNum = filteredPartitionIds.size();
-                LOG.debug("hive partition fetch and prune for table {}.{} 
cost: {} ms",
-                        hmsTable.getDbName(), hmsTable.getName(), 
(System.currentTimeMillis() - start));
-
-                // 3. get partitions from cache
-                List<List<String>> partitionValuesList = 
Lists.newArrayListWithCapacity(filteredPartitionIds.size());
-                for (Long id : filteredPartitionIds) {
-                    ListPartitionItem listPartitionItem = (ListPartitionItem) 
idToPartitionItem.get(id);
-                    
partitionValuesList.add(listPartitionItem.getItems().get(0).getPartitionValuesAsStringList());
-                }
-                List<HivePartition> partitions = 
cache.getAllPartitions(hmsTable.getDbName(), hmsTable.getName(),
-                        partitionValuesList);
-                // 4. get all files of partitions
-                getFileSplitByPartitions(cache, partitions, allFiles);
-            } else {
-                // unpartitioned table, create a dummy partition to save 
location and inputformat,
-                // so that we can unify the interface.
-                HivePartition dummyPartition = new 
HivePartition(hmsTable.getRemoteTable().getSd().getInputFormat(),
-                        hmsTable.getRemoteTable().getSd().getLocation(), null);
-                getFileSplitByPartitions(cache, 
Lists.newArrayList(dummyPartition), allFiles);
-                this.totalPartitionNum = 1;
-                this.readPartitionNum = 1;
-            }
-            LOG.debug("get #{} files for table: {}.{}, cost: {} ms",
-                    allFiles.size(), hmsTable.getDbName(), hmsTable.getName(), 
(System.currentTimeMillis() - start));
-            return allFiles;
-        } catch (Throwable t) {
-            LOG.warn("get file split failed for table: {}", 
hmsTable.getName(), t);
-            throw new UserException(
-                    "get file split failed for table: " + hmsTable.getName() + 
", err: " + Util.getRootCauseMessage(t),
-                    t);
-        }
-    }
-
-    private void getFileSplitByPartitions(HiveMetaStoreCache cache, 
List<HivePartition> partitions,
-            List<InputSplit> allFiles) {
-        List<InputSplit> files = cache.getFilesByPartitions(partitions);
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("get #{} files from #{} partitions: {}", files.size(), 
partitions.size(),
-                    Joiner.on(",")
-                            .join(files.stream().limit(10).map(f -> 
((FileSplit) f).getPath())
-                                    .collect(Collectors.toList())));
-        }
-        allFiles.addAll(files);
-    }
-
     public int getTotalPartitionNum() {
-        return totalPartitionNum;
+        return ((HiveSplitter) splitter).getTotalPartitionNum();
     }
 
     public int getReadPartitionNum() {
-        return readPartitionNum;
+        return ((HiveSplitter) splitter).getReadPartitionNum();
     }
 
     @Override
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveSplitter.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveSplitter.java
new file mode 100644
index 0000000000..a49935b9ee
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveSplitter.java
@@ -0,0 +1,155 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.planner.external;
+
+import org.apache.doris.analysis.Expr;
+import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.ListPartitionItem;
+import org.apache.doris.catalog.PartitionItem;
+import org.apache.doris.catalog.Type;
+import org.apache.doris.catalog.external.HMSExternalTable;
+import org.apache.doris.common.UserException;
+import org.apache.doris.common.util.Util;
+import org.apache.doris.datasource.HMSExternalCatalog;
+import org.apache.doris.datasource.hive.HiveMetaStoreCache;
+import org.apache.doris.datasource.hive.HivePartition;
+import org.apache.doris.planner.ColumnRange;
+import org.apache.doris.planner.ListPartitionPrunerV2;
+import org.apache.doris.planner.Split;
+import org.apache.doris.planner.Splitter;
+
+import com.google.common.base.Joiner;
+import com.google.common.collect.Lists;
+import org.apache.hadoop.hive.ql.io.orc.OrcSplit;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public class HiveSplitter implements Splitter {
+
+    private static final Logger LOG = LogManager.getLogger(HiveSplitter.class);
+
+    private HMSExternalTable hmsTable;
+    private Map<String, ColumnRange> columnNameToRange;
+    private int totalPartitionNum = 0;
+    private int readPartitionNum = 0;
+
+    public HiveSplitter(HMSExternalTable hmsTable, Map<String, ColumnRange> 
columnNameToRange) {
+        this.hmsTable = hmsTable;
+        this.columnNameToRange = columnNameToRange;
+    }
+
+    @Override
+    public List<Split> getSplits(List<Expr> exprs) throws UserException {
+        long start = System.currentTimeMillis();
+        try {
+            HiveMetaStoreCache cache = Env.getCurrentEnv().getExtMetaCacheMgr()
+                    .getMetaStoreCache((HMSExternalCatalog) 
hmsTable.getCatalog());
+            // 1. get ListPartitionItems from cache
+            HiveMetaStoreCache.HivePartitionValues hivePartitionValues = null;
+            List<Type> partitionColumnTypes = 
hmsTable.getPartitionColumnTypes();
+            if (!partitionColumnTypes.isEmpty()) {
+                hivePartitionValues = 
cache.getPartitionValues(hmsTable.getDbName(), hmsTable.getName(),
+                    partitionColumnTypes);
+            }
+
+            List<Split> allFiles = Lists.newArrayList();
+            if (hivePartitionValues != null) {
+                // 2. prune partitions by expr
+                Map<Long, PartitionItem> idToPartitionItem = 
hivePartitionValues.getIdToPartitionItem();
+                this.totalPartitionNum = idToPartitionItem.size();
+                ListPartitionPrunerV2 pruner = new 
ListPartitionPrunerV2(idToPartitionItem,
+                        hmsTable.getPartitionColumns(), columnNameToRange,
+                        hivePartitionValues.getUidToPartitionRange(),
+                        hivePartitionValues.getRangeToId(),
+                        hivePartitionValues.getSingleColumnRangeMap(),
+                        true);
+                Collection<Long> filteredPartitionIds = pruner.prune();
+                this.readPartitionNum = filteredPartitionIds.size();
+                LOG.debug("hive partition fetch and prune for table {}.{} 
cost: {} ms",
+                        hmsTable.getDbName(), hmsTable.getName(), 
(System.currentTimeMillis() - start));
+
+                // 3. get partitions from cache
+                List<List<String>> partitionValuesList = 
Lists.newArrayListWithCapacity(filteredPartitionIds.size());
+                for (Long id : filteredPartitionIds) {
+                    ListPartitionItem listPartitionItem = (ListPartitionItem) 
idToPartitionItem.get(id);
+                    
partitionValuesList.add(listPartitionItem.getItems().get(0).getPartitionValuesAsStringList());
+                }
+                List<HivePartition> partitions = 
cache.getAllPartitions(hmsTable.getDbName(), hmsTable.getName(),
+                        partitionValuesList);
+                // 4. get all files of partitions
+                getFileSplitByPartitions(cache, partitions, allFiles);
+            } else {
+                // unpartitioned table, create a dummy partition to save 
location and inputformat,
+                // so that we can unify the interface.
+                HivePartition dummyPartition = new 
HivePartition(hmsTable.getRemoteTable().getSd().getInputFormat(),
+                        hmsTable.getRemoteTable().getSd().getLocation(), null);
+                getFileSplitByPartitions(cache, 
Lists.newArrayList(dummyPartition), allFiles);
+                this.totalPartitionNum = 1;
+                this.readPartitionNum = 1;
+            }
+            LOG.debug("get #{} files for table: {}.{}, cost: {} ms",
+                    allFiles.size(), hmsTable.getDbName(), hmsTable.getName(), 
(System.currentTimeMillis() - start));
+            return allFiles;
+        } catch (Throwable t) {
+            LOG.warn("get file split failed for table: {}", 
hmsTable.getName(), t);
+            throw new UserException(
+                "get file split failed for table: " + hmsTable.getName() + ", 
err: " + Util.getRootCauseMessage(t),
+                t);
+        }
+    }
+
+    private void getFileSplitByPartitions(HiveMetaStoreCache cache, 
List<HivePartition> partitions,
+                                          List<Split> allFiles) {
+        List<InputSplit> files = cache.getFilesByPartitions(partitions);
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("get #{} files from #{} partitions: {}", files.size(), 
partitions.size(),
+                    Joiner.on(",")
+                    .join(files.stream().limit(10).map(f -> ((FileSplit) 
f).getPath())
+                        .collect(Collectors.toList())));
+        }
+        allFiles.addAll(files.stream().map(file -> {
+            FileSplit fs = (FileSplit) file;
+            org.apache.doris.planner.external.FileSplit split = new 
org.apache.doris.planner.external.FileSplit();
+            split.setPath(fs.getPath());
+            split.setStart(fs.getStart());
+            // file size of orc files is not correct get by 
FileSplit.getLength(),
+            // broker reader needs correct file size
+            if (fs instanceof OrcSplit) {
+                split.setLength(((OrcSplit) fs).getFileLength());
+            } else {
+                split.setLength(fs.getLength());
+            }
+            return split;
+        }).collect(Collectors.toList()));
+    }
+
+    public int getTotalPartitionNum() {
+        return totalPartitionNum;
+    }
+
+    public int getReadPartitionNum() {
+        return readPartitionNum;
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/IcebergSplitter.java
 
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/IcebergSplitter.java
new file mode 100644
index 0000000000..b595e95cc3
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/IcebergSplitter.java
@@ -0,0 +1,154 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.planner.external;
+
+import org.apache.doris.analysis.Analyzer;
+import org.apache.doris.analysis.Expr;
+import org.apache.doris.analysis.TableSnapshot;
+import org.apache.doris.common.UserException;
+import org.apache.doris.common.util.TimeUtils;
+import org.apache.doris.external.iceberg.util.IcebergUtils;
+import org.apache.doris.planner.Split;
+import org.apache.doris.planner.Splitter;
+import org.apache.doris.planner.external.iceberg.IcebergDeleteFileFilter;
+import org.apache.doris.planner.external.iceberg.IcebergScanProvider;
+import org.apache.doris.planner.external.iceberg.IcebergSource;
+import org.apache.doris.planner.external.iceberg.IcebergSplit;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.iceberg.BaseTable;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileContent;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.HistoryEntry;
+import org.apache.iceberg.MetadataColumns;
+import org.apache.iceberg.TableScan;
+import org.apache.iceberg.exceptions.NotFoundException;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.types.Conversions;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.nio.ByteBuffer;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+
+public class IcebergSplitter implements Splitter {
+    private static final Logger LOG = 
LogManager.getLogger(IcebergSplitter.class);
+
+    private final IcebergSource icebergSource;
+    private final Analyzer analyzer;
+
+    public IcebergSplitter(IcebergSource icebergSource, Analyzer analyzer) {
+        this.icebergSource = icebergSource;
+        this.analyzer = analyzer;
+    }
+
+    @Override
+    public List<Split> getSplits(List<Expr> exprs) throws UserException {
+        List<Expression> expressions = new ArrayList<>();
+        org.apache.iceberg.Table table = icebergSource.getIcebergTable();
+        for (Expr conjunct : exprs) {
+            Expression expression = 
IcebergUtils.convertToIcebergExpr(conjunct, table.schema());
+            if (expression != null) {
+                expressions.add(expression);
+            }
+        }
+        TableScan scan = table.newScan();
+        TableSnapshot tableSnapshot = 
icebergSource.getDesc().getRef().getTableSnapshot();
+        if (tableSnapshot != null) {
+            TableSnapshot.VersionType type = tableSnapshot.getType();
+            try {
+                if (type == TableSnapshot.VersionType.VERSION) {
+                    scan = scan.useSnapshot(tableSnapshot.getVersion());
+                } else {
+                    long snapshotId = 
TimeUtils.timeStringToLong(tableSnapshot.getTime(), TimeUtils.getTimeZone());
+                    scan = 
scan.useSnapshot(getSnapshotIdAsOfTime(table.history(), snapshotId));
+                }
+            } catch (IllegalArgumentException e) {
+                throw new UserException(e);
+            }
+        }
+        for (Expression predicate : expressions) {
+            scan = scan.filter(predicate);
+        }
+        List<Split> splits = new ArrayList<>();
+        int formatVersion = ((BaseTable) 
table).operations().current().formatVersion();
+        for (FileScanTask task : scan.planFiles()) {
+            for (FileScanTask splitTask : task.split(128 * 1024 * 1024)) {
+                String dataFilePath = splitTask.file().path().toString();
+                IcebergSplit split = new IcebergSplit(new Path(dataFilePath), 
splitTask.start(),
+                        splitTask.length(), new String[0]);
+                split.setFormatVersion(formatVersion);
+                if (formatVersion >= 
IcebergScanProvider.MIN_DELETE_FILE_SUPPORT_VERSION) {
+                    
split.setDeleteFileFilters(getDeleteFileFilters(splitTask));
+                }
+                split.setTableFormatType(TableFormatType.ICEBERG);
+                split.setAnalyzer(analyzer);
+                splits.add(split);
+            }
+        }
+        return splits;
+    }
+
+    public static long getSnapshotIdAsOfTime(List<HistoryEntry> 
historyEntries, long asOfTimestamp) {
+        // find history at or before asOfTimestamp
+        HistoryEntry latestHistory = null;
+        for (HistoryEntry entry : historyEntries) {
+            if (entry.timestampMillis() <= asOfTimestamp) {
+                if (latestHistory == null) {
+                    latestHistory = entry;
+                    continue;
+                }
+                if (entry.timestampMillis() > latestHistory.timestampMillis()) 
{
+                    latestHistory = entry;
+                }
+            }
+        }
+        if (latestHistory == null) {
+            throw new NotFoundException("No version history at or before "
+                + Instant.ofEpochMilli(asOfTimestamp));
+        }
+        return latestHistory.snapshotId();
+    }
+
+    private List<IcebergDeleteFileFilter> getDeleteFileFilters(FileScanTask 
spitTask) {
+        List<IcebergDeleteFileFilter> filters = new ArrayList<>();
+        for (DeleteFile delete : spitTask.deletes()) {
+            if (delete.content() == FileContent.POSITION_DELETES) {
+                ByteBuffer lowerBoundBytes = 
delete.lowerBounds().get(MetadataColumns.DELETE_FILE_POS.fieldId());
+                Optional<Long> positionLowerBound = 
Optional.ofNullable(lowerBoundBytes)
+                        .map(bytes -> 
Conversions.fromByteBuffer(MetadataColumns.DELETE_FILE_POS.type(), bytes));
+                ByteBuffer upperBoundBytes = 
delete.upperBounds().get(MetadataColumns.DELETE_FILE_POS.fieldId());
+                Optional<Long> positionUpperBound = 
Optional.ofNullable(upperBoundBytes)
+                        .map(bytes -> 
Conversions.fromByteBuffer(MetadataColumns.DELETE_FILE_POS.type(), bytes));
+                
filters.add(IcebergDeleteFileFilter.createPositionDelete(delete.path().toString(),
+                        positionLowerBound.orElse(-1L), 
positionUpperBound.orElse(-1L)));
+            } else if (delete.content() == FileContent.EQUALITY_DELETES) {
+                // todo: 
filters.add(IcebergDeleteFileFilter.createEqualityDelete(delete.path().toString(),
+                // delete.equalityFieldIds()));
+                throw new IllegalStateException("Don't support equality delete 
file");
+            } else {
+                throw new IllegalStateException("Unknown delete content: " + 
delete.content());
+            }
+        }
+        return filters;
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/LoadScanProvider.java
 
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/LoadScanProvider.java
index d8e644e277..086191e94d 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/LoadScanProvider.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/LoadScanProvider.java
@@ -18,7 +18,6 @@
 package org.apache.doris.planner.external;
 
 import org.apache.doris.analysis.Analyzer;
-import org.apache.doris.analysis.Expr;
 import org.apache.doris.analysis.ImportColumnDesc;
 import org.apache.doris.analysis.IntLiteral;
 import org.apache.doris.analysis.SlotRef;
@@ -51,9 +50,7 @@ import org.apache.doris.thrift.TScanRangeLocations;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
-import org.apache.hadoop.mapred.InputSplit;
 
-import java.io.IOException;
 import java.util.List;
 import java.util.Map;
 
@@ -77,11 +74,6 @@ public class LoadScanProvider implements FileScanProviderIf {
         return null;
     }
 
-    @Override
-    public List<InputSplit> getSplits(List<Expr> exprs) throws IOException, 
UserException {
-        return null;
-    }
-
     @Override
     public Map<String, String> getLocationProperties() throws 
MetaNotFoundException, DdlException {
         return null;
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/QueryScanProvider.java
 
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/QueryScanProvider.java
index 1d25a3cb31..45e48c4ff5 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/QueryScanProvider.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/QueryScanProvider.java
@@ -24,6 +24,8 @@ import org.apache.doris.common.DdlException;
 import org.apache.doris.common.MetaNotFoundException;
 import org.apache.doris.common.UserException;
 import org.apache.doris.common.util.BrokerUtil;
+import org.apache.doris.planner.Split;
+import org.apache.doris.planner.Splitter;
 import 
org.apache.doris.planner.external.ExternalFileScanNode.ParamCreateContext;
 import org.apache.doris.planner.external.iceberg.IcebergScanProvider;
 import org.apache.doris.planner.external.iceberg.IcebergSplit;
@@ -42,13 +44,9 @@ import org.apache.doris.thrift.TScanRangeLocation;
 import org.apache.doris.thrift.TScanRangeLocations;
 
 import com.google.common.base.Joiner;
-import org.apache.hadoop.hive.ql.io.orc.OrcSplit;
-import org.apache.hadoop.mapred.FileSplit;
-import org.apache.hadoop.mapred.InputSplit;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
-import java.io.IOException;
 import java.util.List;
 import java.util.Map;
 
@@ -56,6 +54,7 @@ public abstract class QueryScanProvider implements 
FileScanProviderIf {
     public static final Logger LOG = 
LogManager.getLogger(QueryScanProvider.class);
     private int inputSplitNum = 0;
     private long inputFileSize = 0;
+    protected Splitter splitter;
 
     public abstract TFileAttributes getFileAttributes() throws UserException;
 
@@ -63,93 +62,83 @@ public abstract class QueryScanProvider implements 
FileScanProviderIf {
     public void createScanRangeLocations(ParamCreateContext context, 
BackendPolicy backendPolicy,
             List<TScanRangeLocations> scanRangeLocations) throws UserException 
{
         long start = System.currentTimeMillis();
-        try {
-            List<InputSplit> inputSplits = getSplits(context.conjuncts);
-            this.inputSplitNum = inputSplits.size();
-            if (inputSplits.isEmpty()) {
-                return;
-            }
-            InputSplit inputSplit = inputSplits.get(0);
-            TFileType locationType = getLocationType();
-            context.params.setFileType(locationType);
-            TFileFormatType fileFormatType = getFileFormatType();
-            context.params.setFormatType(getFileFormatType());
-            if (fileFormatType == TFileFormatType.FORMAT_CSV_PLAIN || 
fileFormatType == TFileFormatType.FORMAT_JSON) {
-                context.params.setFileAttributes(getFileAttributes());
-            }
+        List<Split> inputSplits = splitter.getSplits(context.conjuncts);
+        this.inputSplitNum = inputSplits.size();
+        if (inputSplits.isEmpty()) {
+            return;
+        }
+        FileSplit inputSplit = (FileSplit) inputSplits.get(0);
+        TFileType locationType = getLocationType();
+        context.params.setFileType(locationType);
+        TFileFormatType fileFormatType = getFileFormatType();
+        context.params.setFormatType(getFileFormatType());
+        if (fileFormatType == TFileFormatType.FORMAT_CSV_PLAIN || 
fileFormatType == TFileFormatType.FORMAT_JSON) {
+            context.params.setFileAttributes(getFileAttributes());
+        }
 
-            // set hdfs params for hdfs file type.
-            Map<String, String> locationProperties = getLocationProperties();
-            if (locationType == TFileType.FILE_HDFS || locationType == 
TFileType.FILE_BROKER) {
-                String fsName = "";
-                if (this instanceof TVFScanProvider) {
-                    fsName = ((TVFScanProvider) this).getFsName();
-                } else {
-                    String fullPath = ((FileSplit) 
inputSplit).getPath().toUri().toString();
-                    String filePath = ((FileSplit) 
inputSplit).getPath().toUri().getPath();
-                    // eg:
-                    // hdfs://namenode
-                    // s3://buckets
-                    fsName = fullPath.replace(filePath, "");
-                }
-                THdfsParams tHdfsParams = 
HdfsResource.generateHdfsParam(locationProperties);
-                tHdfsParams.setFsName(fsName);
-                context.params.setHdfsParams(tHdfsParams);
-
-                if (locationType == TFileType.FILE_BROKER) {
-                    FsBroker broker = 
Env.getCurrentEnv().getBrokerMgr().getAnyAliveBroker();
-                    if (broker == null) {
-                        throw new UserException("No alive broker.");
-                    }
-                    context.params.addToBrokerAddresses(new 
TNetworkAddress(broker.ip, broker.port));
+        // set hdfs params for hdfs file type.
+        Map<String, String> locationProperties = getLocationProperties();
+        if (locationType == TFileType.FILE_HDFS || locationType == 
TFileType.FILE_BROKER) {
+            String fsName = "";
+            if (this instanceof TVFScanProvider) {
+                fsName = ((TVFScanProvider) this).getFsName();
+            } else {
+                String fullPath = inputSplit.getPath().toUri().toString();
+                String filePath = inputSplit.getPath().toUri().getPath();
+                // eg:
+                // hdfs://namenode
+                // s3://buckets
+                fsName = fullPath.replace(filePath, "");
+            }
+            THdfsParams tHdfsParams = 
HdfsResource.generateHdfsParam(locationProperties);
+            tHdfsParams.setFsName(fsName);
+            context.params.setHdfsParams(tHdfsParams);
+
+            if (locationType == TFileType.FILE_BROKER) {
+                FsBroker broker = 
Env.getCurrentEnv().getBrokerMgr().getAnyAliveBroker();
+                if (broker == null) {
+                    throw new UserException("No alive broker.");
                 }
-            } else if (locationType == TFileType.FILE_S3) {
-                context.params.setProperties(locationProperties);
+                context.params.addToBrokerAddresses(new 
TNetworkAddress(broker.ip, broker.port));
             }
-            TScanRangeLocations curLocations = newLocations(context.params, 
backendPolicy);
+        } else if (locationType == TFileType.FILE_S3) {
+            context.params.setProperties(locationProperties);
+        }
+        TScanRangeLocations curLocations = newLocations(context.params, 
backendPolicy);
 
-            FileSplitStrategy fileSplitStrategy = new FileSplitStrategy();
+        FileSplitStrategy fileSplitStrategy = new FileSplitStrategy();
 
-            for (InputSplit split : inputSplits) {
-                FileSplit fileSplit = (FileSplit) split;
-                List<String> pathPartitionKeys = getPathPartitionKeys();
-                List<String> partitionValuesFromPath = 
BrokerUtil.parseColumnsFromPath(fileSplit.getPath().toString(),
-                        pathPartitionKeys, false);
+        for (Split split : inputSplits) {
+            FileSplit fileSplit = (FileSplit) split;
+            List<String> pathPartitionKeys = getPathPartitionKeys();
+            List<String> partitionValuesFromPath = 
BrokerUtil.parseColumnsFromPath(fileSplit.getPath().toString(),
+                    pathPartitionKeys, false);
 
-                TFileRangeDesc rangeDesc = createFileRangeDesc(fileSplit, 
partitionValuesFromPath, pathPartitionKeys);
-                // external data lake table
-                if (split instanceof IcebergSplit) {
-                    IcebergScanProvider.setIcebergParams(rangeDesc, 
(IcebergSplit) split);
-                }
+            TFileRangeDesc rangeDesc = createFileRangeDesc(fileSplit, 
partitionValuesFromPath, pathPartitionKeys);
+            // external data lake table
+            if (fileSplit instanceof IcebergSplit) {
+                IcebergScanProvider.setIcebergParams(rangeDesc, (IcebergSplit) 
fileSplit);
+            }
 
-                // file size of orc files is not correct get by 
FileSplit.getLength(),
-                // broker reader needs correct file size
-                if (locationType == TFileType.FILE_BROKER && fileFormatType == 
TFileFormatType.FORMAT_ORC) {
-                    rangeDesc.setFileSize(((OrcSplit) 
fileSplit).getFileLength());
-                }
+            
curLocations.getScanRange().getExtScanRange().getFileScanRange().addToRanges(rangeDesc);
+            LOG.debug("assign to backend {} with table split: {} ({}, {}), 
location: {}",
+                    curLocations.getLocations().get(0).getBackendId(), 
fileSplit.getPath(), fileSplit.getStart(),
+                    fileSplit.getLength(), 
Joiner.on("|").join(fileSplit.getHosts()));
 
-                
curLocations.getScanRange().getExtScanRange().getFileScanRange().addToRanges(rangeDesc);
-                LOG.debug("assign to backend {} with table split: {} ({}, {}), 
location: {}",
-                        curLocations.getLocations().get(0).getBackendId(), 
fileSplit.getPath(), fileSplit.getStart(),
-                        fileSplit.getLength(), 
Joiner.on("|").join(split.getLocations()));
-
-                fileSplitStrategy.update(fileSplit);
-                // Add a new location when it's can be split
-                if (fileSplitStrategy.hasNext()) {
-                    scanRangeLocations.add(curLocations);
-                    curLocations = newLocations(context.params, backendPolicy);
-                    fileSplitStrategy.next();
-                }
-                this.inputFileSize += fileSplit.getLength();
-            }
-            if 
(curLocations.getScanRange().getExtScanRange().getFileScanRange().getRangesSize()
 > 0) {
+            fileSplitStrategy.update(fileSplit);
+            // Add a new location when it's can be split
+            if (fileSplitStrategy.hasNext()) {
                 scanRangeLocations.add(curLocations);
+                curLocations = newLocations(context.params, backendPolicy);
+                fileSplitStrategy.next();
             }
-            LOG.debug("create #{} ScanRangeLocations cost: {} ms",
-                    scanRangeLocations.size(), (System.currentTimeMillis() - 
start));
-        } catch (IOException e) {
-            throw new UserException(e);
+            this.inputFileSize += fileSplit.getLength();
+        }
+        if 
(curLocations.getScanRange().getExtScanRange().getFileScanRange().getRangesSize()
 > 0) {
+            scanRangeLocations.add(curLocations);
         }
+        LOG.debug("create #{} ScanRangeLocations cost: {} ms",
+                scanRangeLocations.size(), (System.currentTimeMillis() - 
start));
     }
 
     @Override
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/TVFScanProvider.java
 
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/TVFScanProvider.java
index 954d271a94..48365a7656 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/TVFScanProvider.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/TVFScanProvider.java
@@ -18,7 +18,6 @@
 package org.apache.doris.planner.external;
 
 import org.apache.doris.analysis.Analyzer;
-import org.apache.doris.analysis.Expr;
 import org.apache.doris.analysis.SlotDescriptor;
 import org.apache.doris.analysis.TupleDescriptor;
 import org.apache.doris.catalog.Column;
@@ -30,7 +29,6 @@ import org.apache.doris.common.UserException;
 import org.apache.doris.load.BrokerFileGroup;
 import 
org.apache.doris.planner.external.ExternalFileScanNode.ParamCreateContext;
 import org.apache.doris.tablefunction.ExternalFileTableValuedFunction;
-import org.apache.doris.thrift.TBrokerFileStatus;
 import org.apache.doris.thrift.TFileAttributes;
 import org.apache.doris.thrift.TFileFormatType;
 import org.apache.doris.thrift.TFileScanRangeParams;
@@ -38,11 +36,7 @@ import org.apache.doris.thrift.TFileScanSlotInfo;
 import org.apache.doris.thrift.TFileType;
 
 import com.google.common.collect.Lists;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.FileSplit;
-import org.apache.hadoop.mapred.InputSplit;
 
-import java.io.IOException;
 import java.util.List;
 import java.util.Map;
 
@@ -56,6 +50,7 @@ public class TVFScanProvider extends QueryScanProvider {
         this.tvfTable = tvfTable;
         this.desc = desc;
         this.tableValuedFunction = tableValuedFunction;
+        this.splitter = new TVFSplitter(tableValuedFunction);
     }
 
     public String getFsName() {
@@ -80,18 +75,6 @@ public class TVFScanProvider extends QueryScanProvider {
         return tableValuedFunction.getTFileType();
     }
 
-    @Override
-    public List<InputSplit> getSplits(List<Expr> exprs) throws IOException, 
UserException {
-        List<InputSplit> splits = Lists.newArrayList();
-        List<TBrokerFileStatus> fileStatuses = 
tableValuedFunction.getFileStatuses();
-        for (TBrokerFileStatus fileStatus : fileStatuses) {
-            Path path = new Path(fileStatus.getPath());
-            FileSplit fileSplit = new FileSplit(path, 0, fileStatus.getSize(), 
new String[0]);
-            splits.add(fileSplit);
-        }
-        return splits;
-    }
-
     @Override
     public Map<String, String> getLocationProperties() throws 
MetaNotFoundException, DdlException {
         return tableValuedFunction.getLocationProperties();
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/TVFSplitter.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/TVFSplitter.java
new file mode 100644
index 0000000000..d3234c977f
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/TVFSplitter.java
@@ -0,0 +1,56 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.planner.external;
+
+import org.apache.doris.analysis.Expr;
+import org.apache.doris.common.UserException;
+import org.apache.doris.planner.Split;
+import org.apache.doris.planner.Splitter;
+import org.apache.doris.tablefunction.ExternalFileTableValuedFunction;
+import org.apache.doris.thrift.TBrokerFileStatus;
+
+import com.google.common.collect.Lists;
+import org.apache.hadoop.fs.Path;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.List;
+
+public class TVFSplitter implements Splitter {
+
+    private static final Logger LOG = LogManager.getLogger(TVFSplitter.class);
+
+    private ExternalFileTableValuedFunction tableValuedFunction;
+
+    public TVFSplitter(ExternalFileTableValuedFunction tableValuedFunction) {
+        this.tableValuedFunction = tableValuedFunction;
+    }
+
+    @Override
+    public List<Split> getSplits(List<Expr> exprs) throws UserException {
+        List<Split> splits = Lists.newArrayList();
+        List<TBrokerFileStatus> fileStatuses = 
tableValuedFunction.getFileStatuses();
+        for (TBrokerFileStatus fileStatus : fileStatuses) {
+            Path path = new Path(fileStatus.getPath());
+            Split split = new FileSplit(path, 0, fileStatus.getSize(), new 
String[0]);
+            splits.add(split);
+        }
+        return splits;
+    }
+
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/TableFormatType.java
 
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/TableFormatType.java
index 794283de80..6fc5d69544 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/TableFormatType.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/TableFormatType.java
@@ -18,6 +18,7 @@
 package org.apache.doris.planner.external;
 
 public enum TableFormatType {
+    HIVE("hive"),
     ICEBERG("iceberg"),
     HUDI("hudi");
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergScanProvider.java
 
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergScanProvider.java
index 483432b798..eb565638e3 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergScanProvider.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergScanProvider.java
@@ -18,18 +18,14 @@
 package org.apache.doris.planner.external.iceberg;
 
 import org.apache.doris.analysis.Analyzer;
-import org.apache.doris.analysis.Expr;
-import org.apache.doris.analysis.TableSnapshot;
 import org.apache.doris.catalog.TableIf;
 import org.apache.doris.common.DdlException;
 import org.apache.doris.common.FeConstants;
 import org.apache.doris.common.MetaNotFoundException;
 import org.apache.doris.common.UserException;
-import org.apache.doris.common.util.TimeUtils;
-import org.apache.doris.external.iceberg.util.IcebergUtils;
 import org.apache.doris.planner.external.ExternalFileScanNode;
+import org.apache.doris.planner.external.IcebergSplitter;
 import org.apache.doris.planner.external.QueryScanProvider;
-import org.apache.doris.planner.external.TableFormatType;
 import org.apache.doris.thrift.TFileAttributes;
 import org.apache.doris.thrift.TFileFormatType;
 import org.apache.doris.thrift.TFileRangeDesc;
@@ -38,26 +34,11 @@ import org.apache.doris.thrift.TIcebergDeleteFileDesc;
 import org.apache.doris.thrift.TIcebergFileDesc;
 import org.apache.doris.thrift.TTableFormatFileDesc;
 
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.InputSplit;
-import org.apache.iceberg.BaseTable;
-import org.apache.iceberg.DeleteFile;
 import org.apache.iceberg.FileContent;
-import org.apache.iceberg.FileScanTask;
-import org.apache.iceberg.HistoryEntry;
-import org.apache.iceberg.MetadataColumns;
 import org.apache.iceberg.PartitionField;
-import org.apache.iceberg.TableScan;
-import org.apache.iceberg.exceptions.NotFoundException;
-import org.apache.iceberg.expressions.Expression;
-import org.apache.iceberg.types.Conversions;
 
-import java.nio.ByteBuffer;
-import java.time.Instant;
-import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
-import java.util.Optional;
 import java.util.OptionalLong;
 import java.util.stream.Collectors;
 
@@ -66,17 +47,17 @@ import java.util.stream.Collectors;
  */
 public class IcebergScanProvider extends QueryScanProvider {
 
-    private static final int MIN_DELETE_FILE_SUPPORT_VERSION = 2;
+    public static final int MIN_DELETE_FILE_SUPPORT_VERSION = 2;
     private final Analyzer analyzer;
     private final IcebergSource icebergSource;
 
     public IcebergScanProvider(IcebergSource icebergSource, Analyzer analyzer) 
{
         this.icebergSource = icebergSource;
         this.analyzer = analyzer;
+        this.splitter = new IcebergSplitter(icebergSource, analyzer);
     }
 
-    public static void setIcebergParams(TFileRangeDesc rangeDesc, IcebergSplit 
icebergSplit)
-            throws UserException {
+    public static void setIcebergParams(TFileRangeDesc rangeDesc, IcebergSplit 
icebergSplit) {
         TTableFormatFileDesc tableFormatFileDesc = new TTableFormatFileDesc();
         
tableFormatFileDesc.setTableFormatType(icebergSplit.getTableFormatType().value());
         TIcebergFileDesc fileDesc = new TIcebergFileDesc();
@@ -139,97 +120,6 @@ public class IcebergScanProvider extends QueryScanProvider 
{
             + " for hms table " + icebergSource.getIcebergTable().name());
     }
 
-    @Override
-    public List<InputSplit> getSplits(List<Expr> exprs) throws UserException {
-        List<Expression> expressions = new ArrayList<>();
-        org.apache.iceberg.Table table = icebergSource.getIcebergTable();
-        for (Expr conjunct : exprs) {
-            Expression expression = 
IcebergUtils.convertToIcebergExpr(conjunct, table.schema());
-            if (expression != null) {
-                expressions.add(expression);
-            }
-        }
-        TableScan scan = table.newScan();
-        TableSnapshot tableSnapshot = 
icebergSource.getDesc().getRef().getTableSnapshot();
-        if (tableSnapshot != null) {
-            TableSnapshot.VersionType type = tableSnapshot.getType();
-            try {
-                if (type == TableSnapshot.VersionType.VERSION) {
-                    scan = scan.useSnapshot(tableSnapshot.getVersion());
-                } else {
-                    long snapshotId = 
TimeUtils.timeStringToLong(tableSnapshot.getTime(), TimeUtils.getTimeZone());
-                    scan = 
scan.useSnapshot(getSnapshotIdAsOfTime(table.history(), snapshotId));
-                }
-            } catch (IllegalArgumentException e) {
-                throw new UserException(e);
-            }
-        }
-        for (Expression predicate : expressions) {
-            scan = scan.filter(predicate);
-        }
-        List<InputSplit> splits = new ArrayList<>();
-        int formatVersion = ((BaseTable) 
table).operations().current().formatVersion();
-        for (FileScanTask task : scan.planFiles()) {
-            for (FileScanTask splitTask : task.split(128 * 1024 * 1024)) {
-                String dataFilePath = splitTask.file().path().toString();
-                IcebergSplit split = new IcebergSplit(new Path(dataFilePath), 
splitTask.start(),
-                        splitTask.length(), new String[0]);
-                split.setFormatVersion(formatVersion);
-                if (formatVersion >= MIN_DELETE_FILE_SUPPORT_VERSION) {
-                    
split.setDeleteFileFilters(getDeleteFileFilters(splitTask));
-                }
-                split.setTableFormatType(TableFormatType.ICEBERG);
-                split.setAnalyzer(analyzer);
-                splits.add(split);
-            }
-        }
-        return splits;
-    }
-
-    public static long getSnapshotIdAsOfTime(List<HistoryEntry> 
historyEntries, long asOfTimestamp) {
-        // find history at or before asOfTimestamp
-        HistoryEntry latestHistory = null;
-        for (HistoryEntry entry : historyEntries) {
-            if (entry.timestampMillis() <= asOfTimestamp) {
-                if (latestHistory == null) {
-                    latestHistory = entry;
-                    continue;
-                }
-                if (entry.timestampMillis() > latestHistory.timestampMillis()) 
{
-                    latestHistory = entry;
-                }
-            }
-        }
-        if (latestHistory == null) {
-            throw new NotFoundException("No version history at or before "
-                + Instant.ofEpochMilli(asOfTimestamp));
-        }
-        return latestHistory.snapshotId();
-    }
-
-    private List<IcebergDeleteFileFilter> getDeleteFileFilters(FileScanTask 
spitTask) {
-        List<IcebergDeleteFileFilter> filters = new ArrayList<>();
-        for (DeleteFile delete : spitTask.deletes()) {
-            if (delete.content() == FileContent.POSITION_DELETES) {
-                ByteBuffer lowerBoundBytes = 
delete.lowerBounds().get(MetadataColumns.DELETE_FILE_POS.fieldId());
-                Optional<Long> positionLowerBound = 
Optional.ofNullable(lowerBoundBytes)
-                        .map(bytes -> 
Conversions.fromByteBuffer(MetadataColumns.DELETE_FILE_POS.type(), bytes));
-                ByteBuffer upperBoundBytes = 
delete.upperBounds().get(MetadataColumns.DELETE_FILE_POS.fieldId());
-                Optional<Long> positionUpperBound = 
Optional.ofNullable(upperBoundBytes)
-                        .map(bytes -> 
Conversions.fromByteBuffer(MetadataColumns.DELETE_FILE_POS.type(), bytes));
-                
filters.add(IcebergDeleteFileFilter.createPositionDelete(delete.path().toString(),
-                        positionLowerBound.orElse(-1L), 
positionUpperBound.orElse(-1L)));
-            } else if (delete.content() == FileContent.EQUALITY_DELETES) {
-                // todo: 
filters.add(IcebergDeleteFileFilter.createEqualityDelete(delete.path().toString(),
-                // delete.equalityFieldIds()));
-                throw new IllegalStateException("Don't support equality delete 
file");
-            } else {
-                throw new IllegalStateException("Unknown delete content: " + 
delete.content());
-            }
-        }
-        return filters;
-    }
-
     @Override
     public List<String> getPathPartitionKeys() throws DdlException, 
MetaNotFoundException {
         return 
icebergSource.getIcebergTable().spec().fields().stream().map(PartitionField::name)
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergSplit.java
 
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergSplit.java
index a82c99b04a..431652b894 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergSplit.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergSplit.java
@@ -18,7 +18,7 @@
 package org.apache.doris.planner.external.iceberg;
 
 import org.apache.doris.analysis.Analyzer;
-import org.apache.doris.planner.external.HiveSplit;
+import org.apache.doris.planner.external.FileSplit;
 
 import lombok.Data;
 import org.apache.hadoop.fs.Path;
@@ -26,7 +26,7 @@ import org.apache.hadoop.fs.Path;
 import java.util.List;
 
 @Data
-public class IcebergSplit extends HiveSplit {
+public class IcebergSplit extends FileSplit {
     public IcebergSplit(Path file, long start, long length, String[] hosts) {
         super(file, start, length, hosts);
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
