This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 5210c04241 [Refactor](ScanNode) Split interface refactor (#19133)
5210c04241 is described below

commit 5210c04241260903b13de84b8885cfaceabe573e
Author: Jibing-Li <[email protected]>
AuthorDate: Fri May 5 23:20:29 2023 +0800

    [Refactor](ScanNode) Split interface refactor (#19133)
    
    Move getSplits function to ScanNode, remove Splitter interface.
    For each kind of data source, create a specific ScanNode and implement the 
getSplits interface. For example, HiveScanNode.
    Remove FileScanProviderIf and move the code to each ScanNode.
---
 .../apache/doris/datasource/ExternalCatalog.java   |  10 +
 .../doris/datasource/hive/HiveMetaStoreCache.java  |  23 +-
 .../glue/translator/PhysicalPlanTranslator.java    |  38 ++-
 .../org/apache/doris/planner/FileLoadScanNode.java |   9 +-
 .../org/apache/doris/planner/OlapSplitter.java     |  31 --
 .../java/org/apache/doris/planner/ScanNode.java    |   6 +
 .../apache/doris/planner/SingleNodePlanner.java    |  25 +-
 .../java/org/apache/doris/planner/Splitter.java    |  29 --
 .../doris/planner/external/FileQueryScanNode.java  | 328 +++++++++++++--------
 .../doris/planner/external/FileScanNode.java       |  70 ++++-
 .../doris/planner/external/FileScanProviderIf.java |  61 ----
 .../apache/doris/planner/external/FileSplit.java   |  18 +-
 .../planner/external/HMSTableScanProvider.java     |  34 ---
 .../{HiveSplitter.java => HiveScanNode.java}       | 205 +++++++------
 .../doris/planner/external/HiveScanProvider.java   | 139 ---------
 .../{HudiScanProvider.java => HudiScanNode.java}   |  29 +-
 .../doris/planner/external/IcebergSplitter.java    | 155 ----------
 .../doris/planner/external/LoadScanProvider.java   |  18 +-
 .../doris/planner/external/QueryScanProvider.java  | 225 --------------
 .../apache/doris/planner/external/TVFScanNode.java | 133 +++++++++
 .../doris/planner/external/TVFScanProvider.java    |  85 ------
 .../apache/doris/planner/external/TVFSplitter.java |  79 -----
 .../planner/external/iceberg/IcebergHMSSource.java |  23 +-
 .../planner/external/iceberg/IcebergScanNode.java  | 286 ++++++++++++++++++
 .../external/iceberg/IcebergScanProvider.java      | 137 ---------
 .../planner/external/iceberg/IcebergSplit.java     |   3 -
 .../org/apache/doris/{planner => spi}/Split.java   |  16 +-
 .../apache/doris/statistics/StatisticalType.java   |   2 +
 .../ExternalFileTableValuedFunction.java           |   4 +-
 29 files changed, 937 insertions(+), 1284 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
index 1c5c4b4944..adcb109ace 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
@@ -473,4 +473,14 @@ public abstract class ExternalCatalog implements 
CatalogIf<ExternalDatabase>, Wr
         }
         return specifiedDatabaseMap;
     }
+
+    public boolean useSelfSplitter() {
+        Map<String, String> properties = catalogProperty.getProperties();
+        boolean ret = true;
+        if (properties.containsKey(HMSExternalCatalog.ENABLE_SELF_SPLITTER)
+                && 
properties.get(HMSExternalCatalog.ENABLE_SELF_SPLITTER).equalsIgnoreCase("false"))
 {
+            ret = false;
+        }
+        return ret;
+    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
index 2df1ed1225..435272782c 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
@@ -26,10 +26,14 @@ import org.apache.doris.catalog.Type;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.Config;
 import org.apache.doris.common.FeConstants;
+import org.apache.doris.common.UserException;
 import org.apache.doris.datasource.CacheException;
 import org.apache.doris.datasource.HMSExternalCatalog;
 import org.apache.doris.external.hive.util.HiveUtil;
+import org.apache.doris.fs.FileSystemFactory;
+import org.apache.doris.fs.RemoteFiles;
 import org.apache.doris.fs.remote.RemoteFile;
+import org.apache.doris.fs.remote.RemoteFileSystem;
 import org.apache.doris.metric.GaugeMetric;
 import org.apache.doris.metric.Metric;
 import org.apache.doris.metric.MetricLabel;
@@ -37,9 +41,8 @@ import org.apache.doris.metric.MetricRepo;
 import org.apache.doris.planner.ColumnBound;
 import org.apache.doris.planner.ListPartitionPrunerV2;
 import org.apache.doris.planner.PartitionPrunerV2Base.UniqueId;
-import org.apache.doris.planner.Split;
 import org.apache.doris.planner.external.FileSplit;
-import org.apache.doris.planner.external.HiveSplitter;
+import org.apache.doris.spi.Split;
 
 import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
@@ -263,6 +266,19 @@ public class HiveMetaStoreCache {
         return new HivePartition(key.dbName, key.tblName, false, 
sd.getInputFormat(), sd.getLocation(), key.values);
     }
 
+    // Get File Status by using FileSystem API.
+    private FileCacheValue getFileCache(String location, InputFormat<?, ?> 
inputFormat,
+                                                                 JobConf 
jobConf,
+                                                                 List<String> 
partitionValues) throws UserException {
+        FileCacheValue result = new FileCacheValue();
+        result.setSplittable(HiveUtil.isSplittable(inputFormat, new 
Path(location), jobConf));
+        RemoteFileSystem fs = FileSystemFactory.getByLocation(location, 
jobConf);
+        RemoteFiles locatedFiles = fs.listLocatedFiles(location, true, false);
+        locatedFiles.locations().forEach(result::addFile);
+        result.setPartitionValues(partitionValues);
+        return result;
+    }
+
     private FileCacheValue loadFiles(FileCacheKey key) {
         ClassLoader classLoader = 
Thread.currentThread().getContextClassLoader();
         try {
@@ -284,8 +300,7 @@ public class HiveMetaStoreCache {
                 InputFormat<?, ?> inputFormat = 
HiveUtil.getInputFormat(jobConf, key.inputFormat, false);
                 // TODO: This is a temp config, will remove it after the 
HiveSplitter is stable.
                 if (key.useSelfSplitter) {
-                    result = HiveSplitter.getFileCache(finalLocation, 
inputFormat,
-                        jobConf, key.getPartitionValues());
+                    result = getFileCache(finalLocation, inputFormat, jobConf, 
key.getPartitionValues());
                 } else {
                     InputSplit[] splits;
                     String remoteUser = 
jobConf.get(HdfsResource.HADOOP_USER_NAME);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
index f27abc0f69..acb3ebe87b 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
@@ -44,6 +44,8 @@ import org.apache.doris.catalog.Table;
 import org.apache.doris.catalog.TableIf;
 import org.apache.doris.catalog.Type;
 import org.apache.doris.catalog.external.ExternalTable;
+import org.apache.doris.catalog.external.HMSExternalTable;
+import org.apache.doris.catalog.external.IcebergExternalTable;
 import org.apache.doris.common.Pair;
 import org.apache.doris.common.UserException;
 import org.apache.doris.common.util.Util;
@@ -139,7 +141,9 @@ import org.apache.doris.planner.SetOperationNode;
 import org.apache.doris.planner.SortNode;
 import org.apache.doris.planner.TableFunctionNode;
 import org.apache.doris.planner.UnionNode;
-import org.apache.doris.planner.external.FileQueryScanNode;
+import org.apache.doris.planner.external.HiveScanNode;
+import org.apache.doris.planner.external.HudiScanNode;
+import org.apache.doris.planner.external.iceberg.IcebergScanNode;
 import org.apache.doris.tablefunction.TableValuedFunctionIf;
 import org.apache.doris.thrift.TPartitionType;
 import org.apache.doris.thrift.TPushAggOp;
@@ -600,24 +604,44 @@ public class PhysicalPlanTranslator extends 
DefaultPlanVisitor<PlanFragment, Pla
         ExternalTable table = fileScan.getTable();
         TupleDescriptor tupleDescriptor = generateTupleDesc(slotList, table, 
context);
         tupleDescriptor.setTable(table);
+
         // TODO(cmy): determine the needCheckColumnPriv param
-        FileQueryScanNode fileScanNode = new 
FileQueryScanNode(context.nextPlanNodeId(), tupleDescriptor, false);
+        ScanNode scanNode = null;
+        if (table instanceof HMSExternalTable) {
+            switch (((HMSExternalTable) table).getDlaType()) {
+                case HUDI:
+                    scanNode = new HudiScanNode(context.nextPlanNodeId(), 
tupleDescriptor, false);
+                    break;
+                case ICEBERG:
+                    scanNode = new IcebergScanNode(context.nextPlanNodeId(), 
tupleDescriptor, false);
+                    break;
+                case HIVE:
+                    scanNode = new HiveScanNode(context.nextPlanNodeId(), 
tupleDescriptor, false);
+                    break;
+                default:
+                    break;
+            }
+        } else if (table instanceof IcebergExternalTable) {
+            scanNode = new IcebergScanNode(context.nextPlanNodeId(), 
tupleDescriptor, false);
+        }
+        Preconditions.checkNotNull(scanNode);
         TableName tableName = new TableName(null, "", "");
         TableRef ref = new TableRef(tableName, null, null);
         BaseTableRef tableRef = new BaseTableRef(ref, table, tableName);
         tupleDescriptor.setRef(tableRef);
 
-        Utils.execWithUncheckedException(fileScanNode::init);
-        context.addScanNode(fileScanNode);
+        Utils.execWithUncheckedException(scanNode::init);
+        context.addScanNode(scanNode);
+        ScanNode finalScanNode = scanNode;
         context.getRuntimeTranslator().ifPresent(
                 runtimeFilterGenerator -> 
runtimeFilterGenerator.getTargetOnScanNode(fileScan.getId()).forEach(
-                    expr -> 
runtimeFilterGenerator.translateRuntimeFilterTarget(expr, fileScanNode, context)
+                    expr -> 
runtimeFilterGenerator.translateRuntimeFilterTarget(expr, finalScanNode, 
context)
             )
         );
-        Utils.execWithUncheckedException(fileScanNode::finalizeForNereids);
+        Utils.execWithUncheckedException(scanNode::finalizeForNereids);
         // Create PlanFragment
         DataPartition dataPartition = DataPartition.RANDOM;
-        PlanFragment planFragment = createPlanFragment(fileScanNode, 
dataPartition, fileScan);
+        PlanFragment planFragment = createPlanFragment(scanNode, 
dataPartition, fileScan);
         context.addPlanFragment(planFragment);
         updateLegacyPlanIdToPhysicalPlan(planFragment.getPlanRoot(), fileScan);
         return planFragment;
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/FileLoadScanNode.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/FileLoadScanNode.java
index 07d1ff445f..9215d409ae 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/FileLoadScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/FileLoadScanNode.java
@@ -42,7 +42,6 @@ import org.apache.doris.common.UserException;
 import org.apache.doris.load.BrokerFileGroup;
 import org.apache.doris.planner.external.FileGroupInfo;
 import org.apache.doris.planner.external.FileScanNode;
-import org.apache.doris.planner.external.FileScanProviderIf;
 import org.apache.doris.planner.external.LoadScanProvider;
 import org.apache.doris.rewrite.ExprRewriter;
 import org.apache.doris.statistics.StatisticalType;
@@ -82,7 +81,7 @@ public class FileLoadScanNode extends FileScanNode {
     // Each DataDescription in a load stmt conreponding to a FileGroupInfo in 
this list.
     private final List<FileGroupInfo> fileGroupInfos = Lists.newArrayList();
     // For load, the num of providers equals to the num of file group infos.
-    private final List<FileScanProviderIf> scanProviders = 
Lists.newArrayList();
+    private final List<LoadScanProvider> scanProviders = Lists.newArrayList();
     // For load, the num of ParamCreateContext equals to the num of file group 
infos.
     private final List<ParamCreateContext> contexts = Lists.newArrayList();
 
@@ -128,13 +127,13 @@ public class FileLoadScanNode extends FileScanNode {
 
     // For each scan provider, create a corresponding ParamCreateContext
     private void initParamCreateContexts(Analyzer analyzer) throws 
UserException {
-        for (FileScanProviderIf scanProvider : scanProviders) {
+        for (LoadScanProvider scanProvider : scanProviders) {
             ParamCreateContext context = scanProvider.createContext(analyzer);
             // set where and preceding filter.
             // FIXME(cmy): we should support set different expr for different 
file group.
             
initAndSetPrecedingFilter(context.fileGroup.getPrecedingFilterExpr(), 
context.srcTupleDescriptor, analyzer);
             initAndSetWhereExpr(context.fileGroup.getWhereExpr(), 
context.destTupleDescriptor, analyzer);
-            setDefaultValueExprs(scanProvider, context.srcSlotDescByName, 
context.params, true);
+            setDefaultValueExprs(scanProvider.getTargetTable(), 
context.srcSlotDescByName, context.params, true);
             this.contexts.add(context);
         }
     }
@@ -196,7 +195,7 @@ public class FileLoadScanNode extends FileScanNode {
                 contexts.size() + " vs. " + scanProviders.size());
         for (int i = 0; i < contexts.size(); ++i) {
             FileLoadScanNode.ParamCreateContext context = contexts.get(i);
-            FileScanProviderIf scanProvider = scanProviders.get(i);
+            LoadScanProvider scanProvider = scanProviders.get(i);
             finalizeParamsForLoad(context, analyzer);
             createScanRangeLocations(context, scanProvider);
             this.inputSplitsNum += scanProvider.getInputSplitNum();
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapSplitter.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapSplitter.java
deleted file mode 100644
index f2d06a3aef..0000000000
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapSplitter.java
+++ /dev/null
@@ -1,31 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.planner;
-
-import org.apache.doris.analysis.Expr;
-import org.apache.doris.common.UserException;
-
-import java.util.List;
-
-public class OlapSplitter implements Splitter {
-
-    @Override
-    public List<Split> getSplits(List<Expr> exprs) throws UserException {
-        return null;
-    }
-}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java
index 13dd8638ba..6c4997ea77 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java
@@ -37,8 +37,10 @@ import org.apache.doris.analysis.TupleDescriptor;
 import org.apache.doris.analysis.TupleId;
 import org.apache.doris.catalog.Column;
 import org.apache.doris.catalog.PrimitiveType;
+import org.apache.doris.common.NotImplementedException;
 import org.apache.doris.common.UserException;
 import org.apache.doris.nereids.glue.translator.PlanTranslatorContext;
+import org.apache.doris.spi.Split;
 import org.apache.doris.statistics.StatisticalType;
 import org.apache.doris.thrift.TNetworkAddress;
 import org.apache.doris.thrift.TScanRangeLocations;
@@ -104,6 +106,10 @@ public abstract class ScanNode extends PlanNode {
         sortColumn = column;
     }
 
+    protected List<Split> getSplits() throws UserException {
+        throw new NotImplementedException("Scan node sub class need to 
implement getSplits interface.");
+    }
+
     /**
      * cast expr to SlotDescriptor type
      */
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java
index 55df5b6f54..92e2b94148 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java
@@ -60,6 +60,8 @@ import org.apache.doris.catalog.MysqlTable;
 import org.apache.doris.catalog.OdbcTable;
 import org.apache.doris.catalog.PrimitiveType;
 import org.apache.doris.catalog.Table;
+import org.apache.doris.catalog.TableIf;
+import org.apache.doris.catalog.external.HMSExternalTable;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.FeConstants;
 import org.apache.doris.common.Pair;
@@ -67,6 +69,9 @@ import org.apache.doris.common.Reference;
 import org.apache.doris.common.UserException;
 import org.apache.doris.common.util.VectorizedUtil;
 import org.apache.doris.planner.external.FileQueryScanNode;
+import org.apache.doris.planner.external.HiveScanNode;
+import org.apache.doris.planner.external.HudiScanNode;
+import org.apache.doris.planner.external.iceberg.IcebergScanNode;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.rewrite.mvrewrite.MVSelectFailedException;
 import org.apache.doris.thrift.TNullSide;
@@ -1986,8 +1991,7 @@ public class SingleNodePlanner {
             case HIVE:
                 throw new RuntimeException("Hive external table is not 
supported, try to use hive catalog please");
             case ICEBERG:
-                scanNode = new FileQueryScanNode(ctx.getNextNodeId(), 
tblRef.getDesc(), true);
-                break;
+                throw new RuntimeException("Iceberg external table is not 
supported, use iceberg catalog please");
             case HUDI:
                 throw new UserException(
                         "Hudi table is no longer supported. Use Multi Catalog 
feature to connect to Hudi");
@@ -1998,8 +2002,23 @@ public class SingleNodePlanner {
                 scanNode = ((TableValuedFunctionRef) 
tblRef).getScanNode(ctx.getNextNodeId());
                 break;
             case HMS_EXTERNAL_TABLE:
+                TableIf table = tblRef.getDesc().getTable();
+                switch (((HMSExternalTable) table).getDlaType()) {
+                    case HUDI:
+                        scanNode = new HudiScanNode(ctx.getNextNodeId(), 
tblRef.getDesc(), true);
+                        break;
+                    case ICEBERG:
+                        scanNode = new IcebergScanNode(ctx.getNextNodeId(), 
tblRef.getDesc(), true);
+                        break;
+                    case HIVE:
+                        scanNode = new HiveScanNode(ctx.getNextNodeId(), 
tblRef.getDesc(), true);
+                        break;
+                    default:
+                        throw new UserException("Not supported table type" + 
table.getType());
+                }
+                break;
             case ICEBERG_EXTERNAL_TABLE:
-                scanNode = new FileQueryScanNode(ctx.getNextNodeId(), 
tblRef.getDesc(), true);
+                scanNode = new IcebergScanNode(ctx.getNextNodeId(), 
tblRef.getDesc(), true);
                 break;
             case ES_EXTERNAL_TABLE:
                 scanNode = new EsScanNode(ctx.getNextNodeId(), 
tblRef.getDesc(), "EsScanNode", true);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/Splitter.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/Splitter.java
deleted file mode 100644
index 5ad1034e61..0000000000
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/Splitter.java
+++ /dev/null
@@ -1,29 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.planner;
-
-import org.apache.doris.analysis.Expr;
-import org.apache.doris.common.UserException;
-
-import java.util.List;
-
-public interface Splitter {
-    static final long DEFAULT_SPLIT_SIZE = 128 * 1024 * 1024; // 128MB
-
-    List<Split> getSplits(List<Expr> exprs) throws UserException;
-}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java
 
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java
index 48f35eb363..d5927a1520 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java
@@ -22,24 +22,38 @@ import org.apache.doris.analysis.SlotDescriptor;
 import org.apache.doris.analysis.SlotId;
 import org.apache.doris.analysis.TupleDescriptor;
 import org.apache.doris.catalog.Column;
-import org.apache.doris.catalog.FunctionGenTable;
+import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.FsBroker;
+import org.apache.doris.catalog.HdfsResource;
 import org.apache.doris.catalog.TableIf;
-import org.apache.doris.catalog.external.HMSExternalTable;
-import org.apache.doris.catalog.external.IcebergExternalTable;
+import org.apache.doris.catalog.external.ExternalTable;
 import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.FeConstants;
+import org.apache.doris.common.NotImplementedException;
 import org.apache.doris.common.UserException;
-import org.apache.doris.datasource.iceberg.IcebergExternalCatalog;
+import org.apache.doris.common.util.BrokerUtil;
 import org.apache.doris.nereids.glue.translator.PlanTranslatorContext;
 import org.apache.doris.planner.PlanNodeId;
-import org.apache.doris.planner.external.iceberg.IcebergApiSource;
-import org.apache.doris.planner.external.iceberg.IcebergHMSSource;
-import org.apache.doris.planner.external.iceberg.IcebergScanProvider;
-import org.apache.doris.planner.external.iceberg.IcebergSource;
+import org.apache.doris.planner.external.iceberg.IcebergScanNode;
+import org.apache.doris.planner.external.iceberg.IcebergSplit;
+import org.apache.doris.spi.Split;
 import org.apache.doris.statistics.StatisticalType;
-import org.apache.doris.tablefunction.ExternalFileTableValuedFunction;
+import org.apache.doris.system.Backend;
+import org.apache.doris.thrift.TExternalScanRange;
+import org.apache.doris.thrift.TFileAttributes;
+import org.apache.doris.thrift.TFileFormatType;
+import org.apache.doris.thrift.TFileRangeDesc;
+import org.apache.doris.thrift.TFileScanRange;
 import org.apache.doris.thrift.TFileScanRangeParams;
 import org.apache.doris.thrift.TFileScanSlotInfo;
+import org.apache.doris.thrift.TFileType;
+import org.apache.doris.thrift.THdfsParams;
+import org.apache.doris.thrift.TNetworkAddress;
+import org.apache.doris.thrift.TScanRange;
+import org.apache.doris.thrift.TScanRangeLocation;
+import org.apache.doris.thrift.TScanRangeLocations;
 
+import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
@@ -48,20 +62,21 @@ import org.apache.logging.log4j.Logger;
 
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 
 /**
  * FileQueryScanNode for querying the file access type of catalog, now only 
support
- * hive,hudi and iceberg.
+ * hive,hudi, iceberg and TVF.
  */
-public class FileQueryScanNode extends FileScanNode {
+public abstract class FileQueryScanNode extends FileScanNode {
     private static final Logger LOG = 
LogManager.getLogger(FileQueryScanNode.class);
 
-    // For query, there is only one FileScanProvider.
-    private FileScanProviderIf scanProvider;
+    protected Map<String, SlotDescriptor> destSlotDescByName;
+    protected TFileScanRangeParams params;
 
-    private Map<String, SlotDescriptor> destSlotDescByName;
-    private TFileScanRangeParams params;
+    protected int inputSplitNum = 0;
+    protected long inputFileSize = 0;
 
     /**
      * External file scan node for Query hms table
@@ -69,8 +84,9 @@ public class FileQueryScanNode extends FileScanNode {
      * eg: s3 tvf
      * These scan nodes do not have corresponding catalog/database/table info, 
so no need to do priv check
      */
-    public FileQueryScanNode(PlanNodeId id, TupleDescriptor desc, boolean 
needCheckColumnPriv) {
-        super(id, desc, "FILE_QUERY_SCAN_NODE", 
StatisticalType.FILE_SCAN_NODE, needCheckColumnPriv);
+    public FileQueryScanNode(PlanNodeId id, TupleDescriptor desc, String 
planNodeName,
+                             StatisticalType statisticalType, boolean 
needCheckColumnPriv) {
+        super(id, desc, planNodeName, statisticalType, needCheckColumnPriv);
     }
 
     @Override
@@ -87,36 +103,27 @@ public class FileQueryScanNode extends FileScanNode {
     }
 
     // Init scan provider and schema related params.
-    private void doInitialize() throws UserException {
+    protected void doInitialize() throws UserException {
         Preconditions.checkNotNull(desc);
+        ExternalTable table = (ExternalTable) desc.getTable();
+        if (table.isView()) {
+            throw new AnalysisException(
+                String.format("Querying external view '%s.%s' is not 
supported", table.getDbName(), table.getName()));
+        }
         computeColumnFilter();
-        initScanProvider();
         initBackendPolicy();
         initSchemaParams();
     }
 
-    private void initScanProvider() throws UserException {
-        if (this.desc.getTable() instanceof HMSExternalTable) {
-            HMSExternalTable hmsTable = (HMSExternalTable) 
this.desc.getTable();
-            initHMSTableScanProvider(hmsTable);
-        } else if (this.desc.getTable() instanceof FunctionGenTable) {
-            FunctionGenTable table = (FunctionGenTable) this.desc.getTable();
-            initTVFScanProvider(table, (ExternalFileTableValuedFunction) 
table.getTvf());
-        } else if (this.desc.getTable() instanceof IcebergExternalTable) {
-            IcebergExternalTable table = (IcebergExternalTable) 
this.desc.getTable();
-            initIcebergScanProvider(table);
-        }
-    }
-
     // Init schema (Tuple/Slot) related params.
-    private void initSchemaParams() throws UserException {
+    protected void initSchemaParams() throws UserException {
         destSlotDescByName = Maps.newHashMap();
         for (SlotDescriptor slot : desc.getSlots()) {
             destSlotDescByName.put(slot.getColumn().getName(), slot);
         }
         params = new TFileScanRangeParams();
         params.setDestTupleId(desc.getId().asInt());
-        List<String> partitionKeys = scanProvider.getPathPartitionKeys();
+        List<String> partitionKeys = getPathPartitionKeys();
         List<Column> columns = desc.getTable().getBaseSchema(false);
         params.setNumOfColumnsFromFile(columns.size() - partitionKeys.size());
         for (SlotDescriptor slot : desc.getSlots()) {
@@ -128,20 +135,13 @@ public class FileQueryScanNode extends FileScanNode {
             
slotInfo.setIsFileSlot(!partitionKeys.contains(slot.getColumn().getName()));
             params.addToRequiredSlots(slotInfo);
         }
-        setDefaultValueExprs(scanProvider, destSlotDescByName, params, false);
+        setDefaultValueExprs(getTargetTable(), destSlotDescByName, params, 
false);
         setColumnPositionMappingForTextFile();
         // For query, set src tuple id to -1.
         params.setSrcTupleId(-1);
-        TableIf table = desc.getTable();
-        // Slot to schema id map is used for supporting hive 1.x orc internal 
column name (col0, col1, col2...)
-        if (table instanceof HMSExternalTable) {
-            if (((HMSExternalTable) 
table).getDlaType().equals(HMSExternalTable.DLAType.HIVE)) {
-                genSlotToSchemaIdMap();
-            }
-        }
     }
 
-    private void initBackendPolicy() throws UserException {
+    protected void initBackendPolicy() throws UserException {
         backendPolicy.init();
         numNodes = backendPolicy.numBackends();
     }
@@ -161,74 +161,11 @@ public class FileQueryScanNode extends FileScanNode {
 
             TFileScanSlotInfo slotInfo = new TFileScanSlotInfo();
             slotInfo.setSlotId(slot.getId().asInt());
-            
slotInfo.setIsFileSlot(!scanProvider.getPathPartitionKeys().contains(slot.getColumn().getName()));
+            
slotInfo.setIsFileSlot(!getPathPartitionKeys().contains(slot.getColumn().getName()));
             params.addToRequiredSlots(slotInfo);
         }
     }
 
-    private void initHMSTableScanProvider(HMSExternalTable hmsTable) throws 
UserException {
-        Preconditions.checkNotNull(hmsTable);
-
-        if (hmsTable.isView()) {
-            throw new AnalysisException(
-                    String.format("Querying external view '[%s].%s.%s' is not 
supported", hmsTable.getDlaType(),
-                            hmsTable.getDbName(), hmsTable.getName()));
-        }
-
-        switch (hmsTable.getDlaType()) {
-            case HUDI:
-                scanProvider = new HudiScanProvider(hmsTable, desc, 
columnNameToRange);
-                break;
-            case ICEBERG:
-                IcebergSource hmsSource = new IcebergHMSSource(hmsTable, desc, 
columnNameToRange);
-                scanProvider = new IcebergScanProvider(hmsSource, analyzer);
-                break;
-            case HIVE:
-                String inputFormat = 
hmsTable.getRemoteTable().getSd().getInputFormat();
-                if (inputFormat.contains("TextInputFormat")) {
-                    for (SlotDescriptor slot : desc.getSlots()) {
-                        if (!slot.getType().isScalarType()) {
-                            throw new UserException("For column `" + 
slot.getColumn().getName()
-                                    + "`, The column types ARRAY/MAP/STRUCT 
are not supported yet"
-                                    + " for text input format of Hive. ");
-                        }
-                    }
-                }
-                scanProvider = new HiveScanProvider(hmsTable, desc, 
columnNameToRange);
-                break;
-            default:
-                throw new UserException("Unknown table type: " + 
hmsTable.getDlaType());
-        }
-    }
-
-    private void initIcebergScanProvider(IcebergExternalTable icebergTable) 
throws UserException {
-        Preconditions.checkNotNull(icebergTable);
-        if (icebergTable.isView()) {
-            throw new AnalysisException(
-                String.format("Querying external view '%s.%s' is not 
supported", icebergTable.getDbName(),
-                        icebergTable.getName()));
-        }
-
-        String catalogType = icebergTable.getIcebergCatalogType();
-        switch (catalogType) {
-            case IcebergExternalCatalog.ICEBERG_HMS:
-            case IcebergExternalCatalog.ICEBERG_REST:
-            case IcebergExternalCatalog.ICEBERG_DLF:
-            case IcebergExternalCatalog.ICEBERG_GLUE:
-                IcebergSource icebergSource = new IcebergApiSource(
-                        icebergTable, desc, columnNameToRange);
-                scanProvider = new IcebergScanProvider(icebergSource, 
analyzer);
-                break;
-            default:
-                throw new UserException("Unknown iceberg catalog type: " + 
catalogType);
-        }
-    }
-
-    private void initTVFScanProvider(FunctionGenTable table, 
ExternalFileTableValuedFunction tvf) {
-        Preconditions.checkNotNull(table);
-        scanProvider = new TVFScanProvider(table, desc, tvf);
-    }
-
     @Override
     public void finalize(Analyzer analyzer) throws UserException {
         doFinalize();
@@ -240,19 +177,13 @@ public class FileQueryScanNode extends FileScanNode {
     }
 
     // Create scan range locations and the statistics.
-    private void doFinalize() throws UserException {
-        createScanRangeLocations(conjuncts, params, scanProvider);
-        this.inputSplitsNum += scanProvider.getInputSplitNum();
-        this.totalFileSize += scanProvider.getInputFileSize();
-        if (scanProvider instanceof HiveScanProvider) {
-            this.totalPartitionNum = ((HiveScanProvider) 
scanProvider).getTotalPartitionNum();
-            this.readPartitionNum = ((HiveScanProvider) 
scanProvider).getReadPartitionNum();
-        }
+    protected void doFinalize() throws UserException {
+        createScanRangeLocations();
     }
 
     private void setColumnPositionMappingForTextFile()
             throws UserException {
-        TableIf tbl = scanProvider.getTargetTable();
+        TableIf tbl = getTargetTable();
         List<Integer> columnIdxs = Lists.newArrayList();
 
         for (TFileScanSlotInfo slot : params.getRequiredSlots()) {
@@ -270,20 +201,161 @@ public class FileQueryScanNode extends FileScanNode {
         params.setColumnIdxs(columnIdxs);
     }
 
-    // To Support Hive 1.x orc internal column name like (_col0, _col1, 
_col2...)
-    private void genSlotToSchemaIdMap() {
-        List<Column> baseSchema = desc.getTable().getBaseSchema();
-        Map<String, Integer> columnNameToPosition = Maps.newHashMap();
-        for (SlotDescriptor slot : desc.getSlots()) {
-            int idx = 0;
-            for (Column col : baseSchema) {
-                if (col.getName().equals(slot.getColumn().getName())) {
-                    columnNameToPosition.put(col.getName(), idx);
-                    break;
+    public void createScanRangeLocations() throws UserException {
+        long start = System.currentTimeMillis();
+        List<Split> inputSplits = getSplits();
+        this.inputSplitNum = inputSplits.size();
+        if (inputSplits.isEmpty()) {
+            return;
+        }
+        FileSplit inputSplit = (FileSplit) inputSplits.get(0);
+        TFileType locationType = getLocationType();
+        params.setFileType(locationType);
+        TFileFormatType fileFormatType = getFileFormatType();
+        params.setFormatType(getFileFormatType());
+        if (fileFormatType == TFileFormatType.FORMAT_CSV_PLAIN || 
fileFormatType == TFileFormatType.FORMAT_JSON) {
+            params.setFileAttributes(getFileAttributes());
+        }
+
+        // set hdfs params for hdfs file type.
+        Map<String, String> locationProperties = getLocationProperties();
+        if (locationType == TFileType.FILE_HDFS || locationType == 
TFileType.FILE_BROKER) {
+            String fsName = getFsName(inputSplit);
+            THdfsParams tHdfsParams = 
HdfsResource.generateHdfsParam(locationProperties);
+            tHdfsParams.setFsName(fsName);
+            params.setHdfsParams(tHdfsParams);
+
+            if (locationType == TFileType.FILE_BROKER) {
+                FsBroker broker = 
Env.getCurrentEnv().getBrokerMgr().getAnyAliveBroker();
+                if (broker == null) {
+                    throw new UserException("No alive broker.");
                 }
-                idx += 1;
+                params.addToBrokerAddresses(new TNetworkAddress(broker.ip, 
broker.port));
+            }
+        } else if (locationType == TFileType.FILE_S3) {
+            params.setProperties(locationProperties);
+        }
+
+        List<String> pathPartitionKeys = getPathPartitionKeys();
+        for (Split split : inputSplits) {
+            TScanRangeLocations curLocations = newLocations(params, 
backendPolicy);
+            FileSplit fileSplit = (FileSplit) split;
+
+            // If fileSplit has partition values, use the values collected 
from hive partitions.
+            // Otherwise, use the values in file path.
+            List<String> partitionValuesFromPath = 
fileSplit.getPartitionValues() == null
+                    ? 
BrokerUtil.parseColumnsFromPath(fileSplit.getPath().toString(), 
pathPartitionKeys, false)
+                    : fileSplit.getPartitionValues();
+
+            TFileRangeDesc rangeDesc = createFileRangeDesc(fileSplit, 
partitionValuesFromPath, pathPartitionKeys);
+            // external data lake table
+            if (fileSplit instanceof IcebergSplit) {
+                IcebergScanNode.setIcebergParams(rangeDesc, (IcebergSplit) 
fileSplit);
+            }
+
+            
curLocations.getScanRange().getExtScanRange().getFileScanRange().addToRanges(rangeDesc);
+            LOG.debug("assign to backend {} with table split: {} ({}, {}), 
location: {}",
+                    curLocations.getLocations().get(0).getBackendId(), 
fileSplit.getPath(), fileSplit.getStart(),
+                    fileSplit.getLength(), 
Joiner.on("|").join(fileSplit.getHosts()));
+            scanRangeLocations.add(curLocations);
+            this.inputFileSize += fileSplit.getLength();
+        }
+        LOG.debug("create #{} ScanRangeLocations cost: {} ms",
+                scanRangeLocations.size(), (System.currentTimeMillis() - 
start));
+    }
+
+    private TScanRangeLocations newLocations(TFileScanRangeParams params, 
FederationBackendPolicy backendPolicy) {
+        // Generate on file scan range
+        TFileScanRange fileScanRange = new TFileScanRange();
+        fileScanRange.setParams(params);
+
+        // Scan range
+        TExternalScanRange externalScanRange = new TExternalScanRange();
+        externalScanRange.setFileScanRange(fileScanRange);
+        TScanRange scanRange = new TScanRange();
+        scanRange.setExtScanRange(externalScanRange);
+
+        // Locations
+        TScanRangeLocations locations = new TScanRangeLocations();
+        locations.setScanRange(scanRange);
+
+        TScanRangeLocation location = new TScanRangeLocation();
+        Backend selectedBackend = backendPolicy.getNextBe();
+        location.setBackendId(selectedBackend.getId());
+        location.setServer(new TNetworkAddress(selectedBackend.getIp(), 
selectedBackend.getBePort()));
+        locations.addToLocations(location);
+
+        return locations;
+    }
+
+    private TFileRangeDesc createFileRangeDesc(FileSplit fileSplit, 
List<String> columnsFromPath,
+                                               List<String> 
columnsFromPathKeys)
+            throws UserException {
+        TFileRangeDesc rangeDesc = new TFileRangeDesc();
+        rangeDesc.setStartOffset(fileSplit.getStart());
+        rangeDesc.setSize(fileSplit.getLength());
+        // fileSize only be used when format is orc or parquet and TFileType 
is broker
+        // When TFileType is other type, it is not necessary
+        rangeDesc.setFileSize(fileSplit.getFileLength());
+        rangeDesc.setColumnsFromPath(columnsFromPath);
+        rangeDesc.setColumnsFromPathKeys(columnsFromPathKeys);
+
+        if (getLocationType() == TFileType.FILE_HDFS) {
+            rangeDesc.setPath(fileSplit.getPath().toUri().getPath());
+        } else if (getLocationType() == TFileType.FILE_S3 || getLocationType() 
== TFileType.FILE_BROKER) {
+            // need full path
+            rangeDesc.setPath(fileSplit.getPath().toString());
+        }
+        return rangeDesc;
+    }
+
+    protected TFileType getLocationType() throws UserException {
+        throw new NotImplementedException("");
+    }
+
+    protected TFileFormatType getFileFormatType() throws UserException {
+        throw new NotImplementedException("");
+    }
+
+    protected TFileAttributes getFileAttributes() throws UserException {
+        throw new NotImplementedException("");
+    }
+
+    protected List<String> getPathPartitionKeys() throws UserException {
+        throw new NotImplementedException("");
+    }
+
+    protected TableIf getTargetTable() throws UserException {
+        throw new NotImplementedException("");
+    }
+
+    protected Map<String, String> getLocationProperties() throws UserException 
 {
+        throw new NotImplementedException("");
+    }
+
+    // eg: hdfs://namenode  s3://buckets
+    protected String getFsName(FileSplit split) {
+        String fullPath = split.getPath().toUri().toString();
+        String filePath = split.getPath().toUri().getPath();
+        return fullPath.replace(filePath, "");
+    }
+
+    protected static Optional<TFileType> getTFileType(String location) {
+        if (location != null && !location.isEmpty()) {
+            if (FeConstants.isObjStorage(location)) {
+                return Optional.of(TFileType.FILE_S3);
+            } else if (location.startsWith(FeConstants.FS_PREFIX_HDFS)) {
+                return Optional.of(TFileType.FILE_HDFS);
+            } else if (location.startsWith(FeConstants.FS_PREFIX_FILE)) {
+                return Optional.of(TFileType.FILE_LOCAL);
+            } else if (location.startsWith(FeConstants.FS_PREFIX_OFS)) {
+                return Optional.of(TFileType.FILE_BROKER);
+            } else if (location.startsWith(FeConstants.FS_PREFIX_GFS)) {
+                return Optional.of(TFileType.FILE_BROKER);
+            } else if (location.startsWith(FeConstants.FS_PREFIX_JFS)) {
+                return Optional.of(TFileType.FILE_BROKER);
             }
         }
-        params.setSlotNameToSchemaPos(columnNameToPosition);
+        return Optional.empty();
     }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileScanNode.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileScanNode.java
index 364844ef36..3a24ba3319 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileScanNode.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileScanNode.java
@@ -27,6 +27,8 @@ import org.apache.doris.catalog.TableIf;
 import org.apache.doris.common.UserException;
 import org.apache.doris.planner.FileLoadScanNode;
 import org.apache.doris.planner.PlanNodeId;
+import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.spi.Split;
 import org.apache.doris.statistics.StatisticalType;
 import org.apache.doris.thrift.TExplainLevel;
 import org.apache.doris.thrift.TExpr;
@@ -41,9 +43,12 @@ import com.google.common.base.Preconditions;
 import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Multimap;
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.Path;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
+import java.io.IOException;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
@@ -55,6 +60,8 @@ import java.util.Map;
 public class FileScanNode extends ExternalScanNode {
     private static final Logger LOG = LogManager.getLogger(FileScanNode.class);
 
+    public static final long DEFAULT_SPLIT_SIZE = 128 * 1024 * 1024; // 128MB
+
     // For explain
     protected long inputSplitsNum = 0;
     protected long totalFileSize = 0;
@@ -152,24 +159,17 @@ public class FileScanNode extends ExternalScanNode {
         return output.toString();
     }
 
-    // TODO: Keep 2 versions of createScanRangeLocations, will fix this while 
refactor split and assignment code.
+    // TODO: This api is for load job only. Will remove it later.
     protected void 
createScanRangeLocations(FileLoadScanNode.ParamCreateContext context,
-                                            FileScanProviderIf scanProvider)
+                                            LoadScanProvider scanProvider)
             throws UserException {
         scanProvider.createScanRangeLocations(context, backendPolicy, 
scanRangeLocations);
     }
 
-    protected void createScanRangeLocations(List<Expr> conjuncts, 
TFileScanRangeParams params,
-                                            FileScanProviderIf scanProvider)
-            throws UserException {
-        scanProvider.createScanRangeLocations(conjuncts, params, 
backendPolicy, scanRangeLocations);
-    }
-
-    protected void setDefaultValueExprs(FileScanProviderIf scanProvider,
+    protected void setDefaultValueExprs(TableIf tbl,
                                         Map<String, SlotDescriptor> 
slotDescByName,
                                         TFileScanRangeParams params,
                                         boolean useVarcharAsNull) throws 
UserException {
-        TableIf tbl = scanProvider.getTargetTable();
         Preconditions.checkNotNull(tbl);
         TExpr tExpr = new TExpr();
         tExpr.setNodes(Lists.newArrayList());
@@ -210,4 +210,54 @@ public class FileScanNode extends ExternalScanNode {
             }
         }
     }
+
+    protected List<Split> splitFile(Path path, long blockSize, BlockLocation[] 
blockLocations, long length,
+                                  boolean splittable, List<String> 
partitionValues) throws IOException {
+        if (blockLocations == null) {
+            blockLocations = new BlockLocation[0];
+        }
+        long splitSize = 
ConnectContext.get().getSessionVariable().getFileSplitSize();
+        if (splitSize <= 0) {
+            splitSize = blockSize;
+        }
+        // Min split size is DEFAULT_SPLIT_SIZE(128MB).
+        splitSize = Math.max(splitSize, DEFAULT_SPLIT_SIZE);
+        List<Split> result = Lists.newArrayList();
+        if (!splittable) {
+            LOG.debug("Path {} is not splittable.", path);
+            String[] hosts = blockLocations.length == 0 ? null : 
blockLocations[0].getHosts();
+            result.add(new FileSplit(path, 0, length, length, hosts, 
partitionValues));
+            return result;
+        }
+        long bytesRemaining;
+        for (bytesRemaining = length; (double) bytesRemaining / (double) 
splitSize > 1.1D;
+                bytesRemaining -= splitSize) {
+            int location = getBlockIndex(blockLocations, length - 
bytesRemaining);
+            String[] hosts = location == -1 ? null : 
blockLocations[location].getHosts();
+            result.add(new FileSplit(path, length - bytesRemaining, splitSize, 
length, hosts, partitionValues));
+        }
+        if (bytesRemaining != 0L) {
+            int location = getBlockIndex(blockLocations, length - 
bytesRemaining);
+            String[] hosts = location == -1 ? null : 
blockLocations[location].getHosts();
+            result.add(new FileSplit(path, length - bytesRemaining, 
bytesRemaining, length, hosts, partitionValues));
+        }
+
+        LOG.debug("Path {} includes {} splits.", path, result.size());
+        return result;
+    }
+
+    private int getBlockIndex(BlockLocation[] blkLocations, long offset) {
+        if (blkLocations == null || blkLocations.length == 0) {
+            return -1;
+        }
+        for (int i = 0; i < blkLocations.length; ++i) {
+            if (blkLocations[i].getOffset() <= offset
+                    && offset < blkLocations[i].getOffset() + 
blkLocations[i].getLength()) {
+                return i;
+            }
+        }
+        BlockLocation last = blkLocations[blkLocations.length - 1];
+        long fileLength = last.getOffset() + last.getLength() - 1L;
+        throw new IllegalArgumentException(String.format("Offset %d is outside 
of file (0..%d)", offset, fileLength));
+    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileScanProviderIf.java
 
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileScanProviderIf.java
deleted file mode 100644
index 367e5d4e72..0000000000
--- 
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileScanProviderIf.java
+++ /dev/null
@@ -1,61 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.planner.external;
-
-import org.apache.doris.analysis.Analyzer;
-import org.apache.doris.analysis.Expr;
-import org.apache.doris.catalog.TableIf;
-import org.apache.doris.common.DdlException;
-import org.apache.doris.common.MetaNotFoundException;
-import org.apache.doris.common.UserException;
-import org.apache.doris.planner.FileLoadScanNode;
-import org.apache.doris.thrift.TFileFormatType;
-import org.apache.doris.thrift.TFileScanRangeParams;
-import org.apache.doris.thrift.TFileType;
-import org.apache.doris.thrift.TScanRangeLocations;
-
-import java.util.List;
-import java.util.Map;
-
-public interface FileScanProviderIf {
-    // Return parquet/orc/text, etc.
-    TFileFormatType getFileFormatType() throws DdlException, 
MetaNotFoundException;
-
-    // Return S3/HDSF, etc.
-    TFileType getLocationType() throws DdlException, MetaNotFoundException;
-
-    // return properties for S3/HDFS, etc.
-    Map<String, String> getLocationProperties() throws MetaNotFoundException, 
DdlException;
-
-    List<String> getPathPartitionKeys() throws DdlException, 
MetaNotFoundException;
-
-    FileLoadScanNode.ParamCreateContext createContext(Analyzer analyzer) 
throws UserException;
-
-    void createScanRangeLocations(FileLoadScanNode.ParamCreateContext context, 
FederationBackendPolicy backendPolicy,
-                                  List<TScanRangeLocations> 
scanRangeLocations) throws UserException;
-
-    void createScanRangeLocations(List<Expr> conjuncts, TFileScanRangeParams 
params,
-                                  FederationBackendPolicy backendPolicy,
-                                  List<TScanRangeLocations> 
scanRangeLocations) throws UserException;
-
-    int getInputSplitNum();
-
-    long getInputFileSize();
-
-    TableIf getTargetTable();
-}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileSplit.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileSplit.java
index bd0fa97cc4..03f1ab3b60 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileSplit.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileSplit.java
@@ -17,7 +17,7 @@
 
 package org.apache.doris.planner.external;
 
-import org.apache.doris.planner.Split;
+import org.apache.doris.spi.Split;
 
 import lombok.Data;
 import org.apache.hadoop.fs.Path;
@@ -25,7 +25,7 @@ import org.apache.hadoop.fs.Path;
 import java.util.List;
 
 @Data
-public class FileSplit extends Split {
+public class FileSplit implements Split {
     protected Path path;
     protected long start;
     // length of this split, in bytes
@@ -34,6 +34,7 @@ public class FileSplit extends Split {
     // -1 means unset.
     // If the file length is not set, the file length will be fetched from the 
file system.
     protected long fileLength;
+    protected String[] hosts;
     protected TableFormatType tableFormatType;
     // The values of partitions.
     // e.g for file : hdfs://path/to/table/part1=a/part2=b/datafile
@@ -46,15 +47,16 @@ public class FileSplit extends Split {
         this.start = start;
         this.length = length;
         this.fileLength = fileLength;
-        this.hosts = hosts;
+        this.hosts = hosts == null ? new String[0] : hosts;
         this.partitionValues = partitionValues;
     }
 
     public String[] getHosts() {
-        if (this.hosts == null) {
-            return new String[]{};
-        } else {
-            return this.hosts;
-        }
+        return hosts;
+    }
+
+    @Override
+    public Object getInfo() {
+        return null;
     }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/HMSTableScanProvider.java
 
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/HMSTableScanProvider.java
deleted file mode 100644
index 283fc90f30..0000000000
--- 
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/HMSTableScanProvider.java
+++ /dev/null
@@ -1,34 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.planner.external;
-
-import org.apache.doris.common.DdlException;
-import org.apache.doris.common.MetaNotFoundException;
-
-import org.apache.hadoop.hive.metastore.api.Table;
-
-import java.util.Map;
-
-public abstract class HMSTableScanProvider extends QueryScanProvider {
-
-    public abstract String getMetaStoreUrl();
-
-    public abstract Table getRemoteHiveTable() throws DdlException, 
MetaNotFoundException;
-
-    public abstract Map<String, String> getTableProperties() throws 
MetaNotFoundException;
-}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveSplitter.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanNode.java
similarity index 51%
rename from 
fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveSplitter.java
rename to 
fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanNode.java
index 9d8093e982..96c1872010 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveSplitter.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanNode.java
@@ -17,32 +17,34 @@
 
 package org.apache.doris.planner.external;
 
-import org.apache.doris.analysis.Expr;
+import org.apache.doris.analysis.SlotDescriptor;
+import org.apache.doris.analysis.TupleDescriptor;
+import org.apache.doris.catalog.Column;
 import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.HiveMetaStoreClientHelper;
 import org.apache.doris.catalog.ListPartitionItem;
 import org.apache.doris.catalog.PartitionItem;
+import org.apache.doris.catalog.TableIf;
 import org.apache.doris.catalog.Type;
 import org.apache.doris.catalog.external.HMSExternalTable;
+import org.apache.doris.common.DdlException;
 import org.apache.doris.common.UserException;
 import org.apache.doris.common.util.Util;
 import org.apache.doris.datasource.HMSExternalCatalog;
 import org.apache.doris.datasource.hive.HiveMetaStoreCache;
 import org.apache.doris.datasource.hive.HivePartition;
-import org.apache.doris.external.hive.util.HiveUtil;
-import org.apache.doris.fs.FileSystemFactory;
-import org.apache.doris.fs.RemoteFiles;
-import org.apache.doris.fs.remote.RemoteFileSystem;
-import org.apache.doris.planner.ColumnRange;
 import org.apache.doris.planner.ListPartitionPrunerV2;
-import org.apache.doris.planner.Split;
-import org.apache.doris.planner.Splitter;
-import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.planner.PlanNodeId;
+import org.apache.doris.spi.Split;
+import org.apache.doris.statistics.StatisticalType;
+import org.apache.doris.thrift.TFileAttributes;
+import org.apache.doris.thrift.TFileFormatType;
+import org.apache.doris.thrift.TFileTextScanRangeParams;
+import org.apache.doris.thrift.TFileType;
 
 import com.google.common.collect.Lists;
-import org.apache.hadoop.fs.BlockLocation;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.InputFormat;
-import org.apache.hadoop.mapred.JobConf;
+import com.google.common.collect.Maps;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
@@ -50,23 +52,52 @@ import java.io.IOException;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
+
+public class HiveScanNode extends FileQueryScanNode {
+    private static final Logger LOG = LogManager.getLogger(HiveScanNode.class);
+
+    public static final String PROP_FIELD_DELIMITER = "field.delim";
+    public static final String DEFAULT_FIELD_DELIMITER = "\1"; // "\x01"
+    public static final String DEFAULT_LINE_DELIMITER = "\n";
+
+    private final HMSExternalTable hmsTable;
+
+    /**
+     * * External file scan node for Query Hive table
+     * needCheckColumnPriv: Some of ExternalFileScanNode do not need to check 
column priv
+     * eg: s3 tvf
+     * These scan nodes do not have corresponding catalog/database/table info, 
so no need to do priv check
+     */
+    public HiveScanNode(PlanNodeId id, TupleDescriptor desc, boolean 
needCheckColumnPriv) {
+        super(id, desc, "HIVE_SCAN_NODE", StatisticalType.HIVE_SCAN_NODE, 
needCheckColumnPriv);
+        hmsTable = (HMSExternalTable) desc.getTable();
+    }
 
-public class HiveSplitter implements Splitter {
-
-    private static final Logger LOG = LogManager.getLogger(HiveSplitter.class);
-
-    private HMSExternalTable hmsTable;
-    private Map<String, ColumnRange> columnNameToRange;
-    private int totalPartitionNum = 0;
-    private int readPartitionNum = 0;
+    public HiveScanNode(PlanNodeId id, TupleDescriptor desc, String 
planNodeName,
+                        StatisticalType statisticalType, boolean 
needCheckColumnPriv) {
+        super(id, desc, planNodeName, statisticalType, needCheckColumnPriv);
+        hmsTable = (HMSExternalTable) desc.getTable();
+    }
 
-    public HiveSplitter(HMSExternalTable hmsTable, Map<String, ColumnRange> 
columnNameToRange) {
-        this.hmsTable = hmsTable;
-        this.columnNameToRange = columnNameToRange;
+    @Override
+    protected void doInitialize() throws UserException {
+        super.doInitialize();
+        genSlotToSchemaIdMap();
+        String inputFormat = 
hmsTable.getRemoteTable().getSd().getInputFormat();
+        if (inputFormat.contains("TextInputFormat")) {
+            for (SlotDescriptor slot : desc.getSlots()) {
+                if (!slot.getType().isScalarType()) {
+                    throw new UserException("For column `" + 
slot.getColumn().getName()
+                        + "`, The column types ARRAY/MAP/STRUCT are not 
supported yet"
+                        + " for text input format of Hive. ");
+                }
+            }
+        }
     }
 
     @Override
-    public List<Split> getSplits(List<Expr> exprs) throws UserException {
+    protected List<Split> getSplits() throws UserException {
         long start = System.currentTimeMillis();
         try {
             HiveMetaStoreCache cache = Env.getCurrentEnv().getExtMetaCacheMgr()
@@ -78,14 +109,7 @@ public class HiveSplitter implements Splitter {
                 hivePartitionValues = 
cache.getPartitionValues(hmsTable.getDbName(), hmsTable.getName(),
                     partitionColumnTypes);
             }
-            Map<String, String> properties = 
hmsTable.getCatalog().getCatalogProperty().getProperties();
-            boolean useSelfSplitter = true;
-            if (properties.containsKey(HMSExternalCatalog.ENABLE_SELF_SPLITTER)
-                    && 
properties.get(HMSExternalCatalog.ENABLE_SELF_SPLITTER).equalsIgnoreCase("false"))
 {
-                LOG.debug("Using self splitter for hmsTable {}", 
hmsTable.getName());
-                useSelfSplitter = false;
-            }
-
+            boolean useSelfSplitter = hmsTable.getCatalog().useSelfSplitter();
             List<Split> allFiles = Lists.newArrayList();
             if (hivePartitionValues != null) {
                 // 2. prune partitions by expr
@@ -135,85 +159,88 @@ public class HiveSplitter implements Splitter {
 
     private void getFileSplitByPartitions(HiveMetaStoreCache cache, 
List<HivePartition> partitions,
                                           List<Split> allFiles, boolean 
useSelfSplitter) throws IOException {
+
         for (HiveMetaStoreCache.FileCacheValue fileCacheValue :
                 cache.getFilesByPartitions(partitions, useSelfSplitter)) {
+            // This if branch is to support old splitter, will remove later.
             if (fileCacheValue.getSplits() != null) {
                 allFiles.addAll(fileCacheValue.getSplits());
             }
             if (fileCacheValue.getFiles() != null) {
                 boolean isSplittable = fileCacheValue.isSplittable();
                 for (HiveMetaStoreCache.HiveFileStatus status : 
fileCacheValue.getFiles()) {
-                    allFiles.addAll(splitFile(status, isSplittable, 
fileCacheValue.getPartitionValues()));
+                    allFiles.addAll(splitFile(status.getPath(), 
status.getBlockSize(),
+                            status.getBlockLocations(), status.getLength(),
+                            isSplittable, 
fileCacheValue.getPartitionValues()));
                 }
             }
         }
     }
 
-    private List<Split> splitFile(HiveMetaStoreCache.HiveFileStatus status,
-                                  boolean splittable, List<String> 
partitionValues) throws IOException {
-        List<Split> result = Lists.newArrayList();
-        if (!splittable) {
-            LOG.debug("Path {} is not splittable.", status.getPath());
-            BlockLocation block = status.getBlockLocations()[0];
-            result.add(new FileSplit(status.getPath(), 0, status.getLength(),
-                    status.getLength(), block.getHosts(), partitionValues));
-            return result;
-        }
-        long splitSize = 
ConnectContext.get().getSessionVariable().getFileSplitSize();
-        if (splitSize <= 0) {
-            splitSize = status.getBlockSize();
-        }
-        // Min split size is DEFAULT_SPLIT_SIZE(128MB).
-        splitSize = splitSize > DEFAULT_SPLIT_SIZE ? splitSize : 
DEFAULT_SPLIT_SIZE;
-        BlockLocation[] blockLocations = status.getBlockLocations();
-        long length = status.getLength();
-        long bytesRemaining;
-        for (bytesRemaining = length; (double) bytesRemaining / (double) 
splitSize > 1.1D;
-                bytesRemaining -= splitSize) {
-            int location = getBlockIndex(blockLocations, length - 
bytesRemaining);
-            result.add(new FileSplit(status.getPath(), length - bytesRemaining,
-                    splitSize, length, blockLocations[location].getHosts(), 
partitionValues));
-        }
-        if (bytesRemaining != 0L) {
-            int location = getBlockIndex(blockLocations, length - 
bytesRemaining);
-            result.add(new FileSplit(status.getPath(), length - bytesRemaining,
-                    bytesRemaining, length, 
blockLocations[location].getHosts(), partitionValues));
-        }
+    @Override
+    public List<String> getPathPartitionKeys() {
+        return hmsTable.getRemoteTable().getPartitionKeys()
+            .stream().map(FieldSchema::getName).collect(Collectors.toList());
+    }
 
-        LOG.debug("Path {} includes {} splits.", status.getPath(), 
result.size());
-        return result;
+    @Override
+    public TableIf getTargetTable() {
+        return hmsTable;
     }
 
-    public int getTotalPartitionNum() {
-        return totalPartitionNum;
+    @Override
+    protected TFileType getLocationType() throws UserException {
+        String location = hmsTable.getRemoteTable().getSd().getLocation();
+        return getTFileType(location).orElseThrow(() ->
+            new DdlException("Unknown file location " + location + " for hms 
table " + hmsTable.getName()));
     }
 
-    public int getReadPartitionNum() {
-        return readPartitionNum;
+    @Override
+    public TFileFormatType getFileFormatType() throws UserException {
+        TFileFormatType type = null;
+        String inputFormatName = 
hmsTable.getRemoteTable().getSd().getInputFormat();
+        String hiveFormat = 
HiveMetaStoreClientHelper.HiveFileFormat.getFormat(inputFormatName);
+        if 
(hiveFormat.equals(HiveMetaStoreClientHelper.HiveFileFormat.PARQUET.getDesc())) 
{
+            type = TFileFormatType.FORMAT_PARQUET;
+        } else if 
(hiveFormat.equals(HiveMetaStoreClientHelper.HiveFileFormat.ORC.getDesc())) {
+            type = TFileFormatType.FORMAT_ORC;
+        } else if 
(hiveFormat.equals(HiveMetaStoreClientHelper.HiveFileFormat.TEXT_FILE.getDesc()))
 {
+            type = TFileFormatType.FORMAT_CSV_PLAIN;
+        }
+        return type;
+    }
+
+    @Override
+    protected Map<String, String> getLocationProperties() throws UserException 
 {
+        return hmsTable.getCatalogProperties();
     }
 
-    // Get File Status by using FileSystem API.
-    public static HiveMetaStoreCache.FileCacheValue getFileCache(String 
location, InputFormat<?, ?> inputFormat,
-                                                                 JobConf 
jobConf,
-                                                                 List<String> 
partitionValues) throws UserException {
-        HiveMetaStoreCache.FileCacheValue result = new 
HiveMetaStoreCache.FileCacheValue();
-        result.setSplittable(HiveUtil.isSplittable(inputFormat, new 
Path(location), jobConf));
-        RemoteFileSystem fs = FileSystemFactory.getByLocation(location, 
jobConf);
-        RemoteFiles locatedFiles = fs.listLocatedFiles(location, true, false);
-        locatedFiles.locations().forEach(result::addFile);
-        result.setPartitionValues(partitionValues);
-        return result;
+    @Override
+    protected TFileAttributes getFileAttributes() throws UserException {
+        TFileTextScanRangeParams textParams = new TFileTextScanRangeParams();
+        
textParams.setColumnSeparator(hmsTable.getRemoteTable().getSd().getSerdeInfo().getParameters()
+                .getOrDefault(PROP_FIELD_DELIMITER, DEFAULT_FIELD_DELIMITER));
+        textParams.setLineDelimiter(DEFAULT_LINE_DELIMITER);
+        TFileAttributes fileAttributes = new TFileAttributes();
+        fileAttributes.setTextParams(textParams);
+        fileAttributes.setHeaderType("");
+        return fileAttributes;
     }
 
-    private static int getBlockIndex(BlockLocation[] blkLocations, long 
offset) {
-        for (int i = 0; i < blkLocations.length; ++i) {
-            if (blkLocations[i].getOffset() <= offset
-                    && offset < blkLocations[i].getOffset() + 
blkLocations[i].getLength()) {
-                return i;
+    // To Support Hive 1.x orc internal column name like (_col0, _col1, 
_col2...)
+    private void genSlotToSchemaIdMap() {
+        List<Column> baseSchema = desc.getTable().getBaseSchema();
+        Map<String, Integer> columnNameToPosition = Maps.newHashMap();
+        for (SlotDescriptor slot : desc.getSlots()) {
+            int idx = 0;
+            for (Column col : baseSchema) {
+                if (col.getName().equals(slot.getColumn().getName())) {
+                    columnNameToPosition.put(col.getName(), idx);
+                    break;
+                }
+                idx += 1;
             }
         }
-        BlockLocation last = blkLocations[blkLocations.length - 1];
-        long fileLength = last.getOffset() + last.getLength() - 1L;
-        throw new IllegalArgumentException(String.format("Offset %d is outside 
of file (0..%d)", offset, fileLength));
+        params.setSlotNameToSchemaPos(columnNameToPosition);
     }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanProvider.java
 
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanProvider.java
deleted file mode 100644
index 4037f97a13..0000000000
--- 
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanProvider.java
+++ /dev/null
@@ -1,139 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.planner.external;
-
-import org.apache.doris.analysis.TupleDescriptor;
-import org.apache.doris.catalog.HiveMetaStoreClientHelper;
-import org.apache.doris.catalog.TableIf;
-import org.apache.doris.catalog.external.HMSExternalTable;
-import org.apache.doris.common.DdlException;
-import org.apache.doris.common.MetaNotFoundException;
-import org.apache.doris.common.UserException;
-import org.apache.doris.planner.ColumnRange;
-import org.apache.doris.thrift.TFileAttributes;
-import org.apache.doris.thrift.TFileFormatType;
-import org.apache.doris.thrift.TFileTextScanRangeParams;
-import org.apache.doris.thrift.TFileType;
-
-import com.google.common.collect.Maps;
-import org.apache.hadoop.hive.metastore.api.FieldSchema;
-import org.apache.hadoop.hive.metastore.api.Table;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
-import java.util.List;
-import java.util.Map;
-import java.util.stream.Collectors;
-
-/**
- * A HiveScanProvider to get information for scan node.
- */
-public class HiveScanProvider extends HMSTableScanProvider {
-    private static final Logger LOG = 
LogManager.getLogger(HiveScanProvider.class);
-
-    private static final String PROP_FIELD_DELIMITER = "field.delim";
-    private static final String DEFAULT_FIELD_DELIMITER = "\1"; // "\x01"
-    private static final String DEFAULT_LINE_DELIMITER = "\n";
-
-    protected HMSExternalTable hmsTable;
-    protected final TupleDescriptor desc;
-    protected Map<String, ColumnRange> columnNameToRange;
-
-    public HiveScanProvider(HMSExternalTable hmsTable, TupleDescriptor desc,
-            Map<String, ColumnRange> columnNameToRange) {
-        this.hmsTable = hmsTable;
-        this.desc = desc;
-        this.columnNameToRange = columnNameToRange;
-        this.splitter = new HiveSplitter(hmsTable, columnNameToRange);
-    }
-
-    @Override
-    public TableIf getTargetTable() {
-        return hmsTable;
-    }
-
-    @Override
-    public TFileFormatType getFileFormatType() throws DdlException, 
MetaNotFoundException {
-        TFileFormatType type = null;
-        String inputFormatName = getRemoteHiveTable().getSd().getInputFormat();
-        String hiveFormat = 
HiveMetaStoreClientHelper.HiveFileFormat.getFormat(inputFormatName);
-        if 
(hiveFormat.equals(HiveMetaStoreClientHelper.HiveFileFormat.PARQUET.getDesc())) 
{
-            type = TFileFormatType.FORMAT_PARQUET;
-        } else if 
(hiveFormat.equals(HiveMetaStoreClientHelper.HiveFileFormat.ORC.getDesc())) {
-            type = TFileFormatType.FORMAT_ORC;
-        } else if 
(hiveFormat.equals(HiveMetaStoreClientHelper.HiveFileFormat.TEXT_FILE.getDesc()))
 {
-            type = TFileFormatType.FORMAT_CSV_PLAIN;
-        }
-        return type;
-    }
-
-    @Override
-    public TFileType getLocationType() throws DdlException, 
MetaNotFoundException {
-        String location = hmsTable.getRemoteTable().getSd().getLocation();
-        return getTFileType(location).orElseThrow(() ->
-                    new DdlException("Unknown file location " + location + " 
for hms table " + hmsTable.getName()));
-    }
-
-    @Override
-    public String getMetaStoreUrl() {
-        return hmsTable.getMetastoreUri();
-    }
-
-    public int getTotalPartitionNum() {
-        return ((HiveSplitter) splitter).getTotalPartitionNum();
-    }
-
-    public int getReadPartitionNum() {
-        return ((HiveSplitter) splitter).getReadPartitionNum();
-    }
-
-    @Override
-    public Table getRemoteHiveTable() throws DdlException, 
MetaNotFoundException {
-        return hmsTable.getRemoteTable();
-    }
-
-    @Override
-    public Map<String, String> getTableProperties() throws 
MetaNotFoundException {
-        // TODO: implement it when we really properties from remote table.
-        return Maps.newHashMap();
-    }
-
-    @Override
-    public Map<String, String> getLocationProperties() throws 
MetaNotFoundException, DdlException {
-        return hmsTable.getCatalogProperties();
-    }
-
-    @Override
-    public List<String> getPathPartitionKeys() throws DdlException, 
MetaNotFoundException {
-        return 
getRemoteHiveTable().getPartitionKeys().stream().map(FieldSchema::getName).collect(Collectors.toList());
-    }
-
-    @Override
-    public TFileAttributes getFileAttributes() throws UserException {
-        TFileTextScanRangeParams textParams = new TFileTextScanRangeParams();
-        
textParams.setColumnSeparator(hmsTable.getRemoteTable().getSd().getSerdeInfo().getParameters()
-                .getOrDefault(PROP_FIELD_DELIMITER, DEFAULT_FIELD_DELIMITER));
-        textParams.setLineDelimiter(DEFAULT_LINE_DELIMITER);
-        TFileAttributes fileAttributes = new TFileAttributes();
-        fileAttributes.setTextParams(textParams);
-        fileAttributes.setHeaderType("");
-        return fileAttributes;
-    }
-}
-
-
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/HudiScanProvider.java
 b/fe/fe-core/src/main/java/org/apache/doris/planner/external/HudiScanNode.java
similarity index 59%
rename from 
fe/fe-core/src/main/java/org/apache/doris/planner/external/HudiScanProvider.java
rename to 
fe/fe-core/src/main/java/org/apache/doris/planner/external/HudiScanNode.java
index 59274ec521..34ff3b7457 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/HudiScanProvider.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/HudiScanNode.java
@@ -18,34 +18,31 @@
 package org.apache.doris.planner.external;
 
 import org.apache.doris.analysis.TupleDescriptor;
-import org.apache.doris.catalog.external.HMSExternalTable;
-import org.apache.doris.common.DdlException;
-import org.apache.doris.common.MetaNotFoundException;
-import org.apache.doris.planner.ColumnRange;
+import org.apache.doris.planner.PlanNodeId;
+import org.apache.doris.statistics.StatisticalType;
 import org.apache.doris.thrift.TFileFormatType;
 
 import java.util.Collections;
 import java.util.List;
-import java.util.Map;
 
-/**
- * A file scan provider for hudi.
- * HudiProvier is extended with hive since they both use input format 
interface to get the split.
- */
-public class HudiScanProvider extends HiveScanProvider {
-
-    public HudiScanProvider(HMSExternalTable hmsTable, TupleDescriptor desc,
-            Map<String, ColumnRange> columnNameToRange) {
-        super(hmsTable, desc, columnNameToRange);
+public class HudiScanNode extends HiveScanNode {
+    /**
+     * External file scan node for Query Hudi table
+     * needCheckColumnPriv: Some of ExternalFileScanNode do not need to check 
column priv
+     * eg: s3 tvf
+     * These scan nodes do not have corresponding catalog/database/table info, 
so no need to do priv check
+     */
+    public HudiScanNode(PlanNodeId id, TupleDescriptor desc, boolean 
needCheckColumnPriv) {
+        super(id, desc, "HUDI_SCAN_NODE", StatisticalType.HUDI_SCAN_NODE, 
needCheckColumnPriv);
     }
 
     @Override
-    public TFileFormatType getFileFormatType() throws DdlException {
+    public TFileFormatType getFileFormatType() {
         return TFileFormatType.FORMAT_PARQUET;
     }
 
     @Override
-    public List<String> getPathPartitionKeys() throws DdlException, 
MetaNotFoundException {
+    public List<String> getPathPartitionKeys() {
         return Collections.emptyList();
     }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/IcebergSplitter.java
 
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/IcebergSplitter.java
deleted file mode 100644
index 15f19bf7a5..0000000000
--- 
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/IcebergSplitter.java
+++ /dev/null
@@ -1,155 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.planner.external;
-
-import org.apache.doris.analysis.Analyzer;
-import org.apache.doris.analysis.Expr;
-import org.apache.doris.analysis.TableSnapshot;
-import org.apache.doris.common.UserException;
-import org.apache.doris.common.util.TimeUtils;
-import org.apache.doris.external.iceberg.util.IcebergUtils;
-import org.apache.doris.planner.Split;
-import org.apache.doris.planner.Splitter;
-import org.apache.doris.planner.external.iceberg.IcebergDeleteFileFilter;
-import org.apache.doris.planner.external.iceberg.IcebergScanProvider;
-import org.apache.doris.planner.external.iceberg.IcebergSource;
-import org.apache.doris.planner.external.iceberg.IcebergSplit;
-
-import org.apache.hadoop.fs.Path;
-import org.apache.iceberg.BaseTable;
-import org.apache.iceberg.DeleteFile;
-import org.apache.iceberg.FileContent;
-import org.apache.iceberg.FileScanTask;
-import org.apache.iceberg.HistoryEntry;
-import org.apache.iceberg.MetadataColumns;
-import org.apache.iceberg.TableScan;
-import org.apache.iceberg.exceptions.NotFoundException;
-import org.apache.iceberg.expressions.Expression;
-import org.apache.iceberg.types.Conversions;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
-import java.nio.ByteBuffer;
-import java.time.Instant;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Optional;
-
-public class IcebergSplitter implements Splitter {
-    private static final Logger LOG = 
LogManager.getLogger(IcebergSplitter.class);
-
-    private final IcebergSource icebergSource;
-    private final Analyzer analyzer;
-
-    public IcebergSplitter(IcebergSource icebergSource, Analyzer analyzer) {
-        this.icebergSource = icebergSource;
-        this.analyzer = analyzer;
-    }
-
-    @Override
-    public List<Split> getSplits(List<Expr> exprs) throws UserException {
-        List<Expression> expressions = new ArrayList<>();
-        org.apache.iceberg.Table table = icebergSource.getIcebergTable();
-        for (Expr conjunct : exprs) {
-            Expression expression = 
IcebergUtils.convertToIcebergExpr(conjunct, table.schema());
-            if (expression != null) {
-                expressions.add(expression);
-            }
-        }
-        TableScan scan = table.newScan();
-        TableSnapshot tableSnapshot = 
icebergSource.getDesc().getRef().getTableSnapshot();
-        if (tableSnapshot != null) {
-            TableSnapshot.VersionType type = tableSnapshot.getType();
-            try {
-                if (type == TableSnapshot.VersionType.VERSION) {
-                    scan = scan.useSnapshot(tableSnapshot.getVersion());
-                } else {
-                    long snapshotId = 
TimeUtils.timeStringToLong(tableSnapshot.getTime(), TimeUtils.getTimeZone());
-                    scan = 
scan.useSnapshot(getSnapshotIdAsOfTime(table.history(), snapshotId));
-                }
-            } catch (IllegalArgumentException e) {
-                throw new UserException(e);
-            }
-        }
-        for (Expression predicate : expressions) {
-            scan = scan.filter(predicate);
-        }
-        List<Split> splits = new ArrayList<>();
-        int formatVersion = ((BaseTable) 
table).operations().current().formatVersion();
-        for (FileScanTask task : scan.planFiles()) {
-            long fileSize = task.file().fileSizeInBytes();
-            for (FileScanTask splitTask : task.split(128 * 1024 * 1024)) {
-                String dataFilePath = splitTask.file().path().toString();
-                IcebergSplit split = new IcebergSplit(new Path(dataFilePath), 
splitTask.start(),
-                        splitTask.length(), fileSize, new String[0]);
-                split.setFormatVersion(formatVersion);
-                if (formatVersion >= 
IcebergScanProvider.MIN_DELETE_FILE_SUPPORT_VERSION) {
-                    
split.setDeleteFileFilters(getDeleteFileFilters(splitTask));
-                }
-                split.setTableFormatType(TableFormatType.ICEBERG);
-                split.setAnalyzer(analyzer);
-                splits.add(split);
-            }
-        }
-        return splits;
-    }
-
-    public static long getSnapshotIdAsOfTime(List<HistoryEntry> 
historyEntries, long asOfTimestamp) {
-        // find history at or before asOfTimestamp
-        HistoryEntry latestHistory = null;
-        for (HistoryEntry entry : historyEntries) {
-            if (entry.timestampMillis() <= asOfTimestamp) {
-                if (latestHistory == null) {
-                    latestHistory = entry;
-                    continue;
-                }
-                if (entry.timestampMillis() > latestHistory.timestampMillis()) 
{
-                    latestHistory = entry;
-                }
-            }
-        }
-        if (latestHistory == null) {
-            throw new NotFoundException("No version history at or before "
-                + Instant.ofEpochMilli(asOfTimestamp));
-        }
-        return latestHistory.snapshotId();
-    }
-
-    private List<IcebergDeleteFileFilter> getDeleteFileFilters(FileScanTask 
spitTask) {
-        List<IcebergDeleteFileFilter> filters = new ArrayList<>();
-        for (DeleteFile delete : spitTask.deletes()) {
-            if (delete.content() == FileContent.POSITION_DELETES) {
-                ByteBuffer lowerBoundBytes = 
delete.lowerBounds().get(MetadataColumns.DELETE_FILE_POS.fieldId());
-                Optional<Long> positionLowerBound = 
Optional.ofNullable(lowerBoundBytes)
-                        .map(bytes -> 
Conversions.fromByteBuffer(MetadataColumns.DELETE_FILE_POS.type(), bytes));
-                ByteBuffer upperBoundBytes = 
delete.upperBounds().get(MetadataColumns.DELETE_FILE_POS.fieldId());
-                Optional<Long> positionUpperBound = 
Optional.ofNullable(upperBoundBytes)
-                        .map(bytes -> 
Conversions.fromByteBuffer(MetadataColumns.DELETE_FILE_POS.type(), bytes));
-                
filters.add(IcebergDeleteFileFilter.createPositionDelete(delete.path().toString(),
-                        positionLowerBound.orElse(-1L), 
positionUpperBound.orElse(-1L)));
-            } else if (delete.content() == FileContent.EQUALITY_DELETES) {
-                // todo: 
filters.add(IcebergDeleteFileFilter.createEqualityDelete(delete.path().toString(),
-                // delete.equalityFieldIds()));
-                throw new IllegalStateException("Don't support equality delete 
file");
-            } else {
-                throw new IllegalStateException("Unknown delete content: " + 
delete.content());
-            }
-        }
-        return filters;
-    }
-}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/LoadScanProvider.java
 
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/LoadScanProvider.java
index ea079b644e..89189ca970 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/LoadScanProvider.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/LoadScanProvider.java
@@ -18,7 +18,6 @@
 package org.apache.doris.planner.external;
 
 import org.apache.doris.analysis.Analyzer;
-import org.apache.doris.analysis.Expr;
 import org.apache.doris.analysis.ImportColumnDesc;
 import org.apache.doris.analysis.IntLiteral;
 import org.apache.doris.analysis.SlotRef;
@@ -55,7 +54,7 @@ import com.google.common.collect.Maps;
 import java.util.List;
 import java.util.Map;
 
-public class LoadScanProvider implements FileScanProviderIf {
+public class LoadScanProvider {
 
     private FileGroupInfo fileGroupInfo;
     private TupleDescriptor destTupleDesc;
@@ -65,27 +64,22 @@ public class LoadScanProvider implements FileScanProviderIf 
{
         this.destTupleDesc = destTupleDesc;
     }
 
-    @Override
     public TFileFormatType getFileFormatType() throws DdlException, 
MetaNotFoundException {
         return null;
     }
 
-    @Override
     public TFileType getLocationType() throws DdlException, 
MetaNotFoundException {
         return null;
     }
 
-    @Override
     public Map<String, String> getLocationProperties() throws 
MetaNotFoundException, DdlException {
         return null;
     }
 
-    @Override
     public List<String> getPathPartitionKeys() throws DdlException, 
MetaNotFoundException {
         return null;
     }
 
-    @Override
     public FileLoadScanNode.ParamCreateContext createContext(Analyzer 
analyzer) throws UserException {
         FileLoadScanNode.ParamCreateContext ctx = new 
FileLoadScanNode.ParamCreateContext();
         ctx.destTupleDescriptor = destTupleDesc;
@@ -138,7 +132,6 @@ public class LoadScanProvider implements FileScanProviderIf 
{
         return "";
     }
 
-    @Override
     public void createScanRangeLocations(FileLoadScanNode.ParamCreateContext 
context,
                                          FederationBackendPolicy backendPolicy,
                                          List<TScanRangeLocations> 
scanRangeLocations) throws UserException {
@@ -147,18 +140,10 @@ public class LoadScanProvider implements 
FileScanProviderIf {
         fileGroupInfo.createScanRangeLocations(context, backendPolicy, 
scanRangeLocations);
     }
 
-    @Override
-    public void createScanRangeLocations(List<Expr> conjuncts, 
TFileScanRangeParams params,
-                                         FederationBackendPolicy backendPolicy,
-                                         List<TScanRangeLocations> 
scanRangeLocations) throws UserException {
-    }
-
-    @Override
     public int getInputSplitNum() {
         return fileGroupInfo.getFileStatuses().size();
     }
 
-    @Override
     public long getInputFileSize() {
         long res = 0;
         for (TBrokerFileStatus fileStatus : fileGroupInfo.getFileStatuses()) {
@@ -250,7 +235,6 @@ public class LoadScanProvider implements FileScanProviderIf 
{
         }
     }
 
-    @Override
     public TableIf getTargetTable() {
         return fileGroupInfo.getTargetTable();
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/QueryScanProvider.java
 
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/QueryScanProvider.java
deleted file mode 100644
index ebd1831432..0000000000
--- 
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/QueryScanProvider.java
+++ /dev/null
@@ -1,225 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.planner.external;
-
-import org.apache.doris.analysis.Analyzer;
-import org.apache.doris.analysis.Expr;
-import org.apache.doris.catalog.Env;
-import org.apache.doris.catalog.FsBroker;
-import org.apache.doris.catalog.HdfsResource;
-import org.apache.doris.common.DdlException;
-import org.apache.doris.common.FeConstants;
-import org.apache.doris.common.MetaNotFoundException;
-import org.apache.doris.common.UserException;
-import org.apache.doris.common.util.BrokerUtil;
-import org.apache.doris.planner.FileLoadScanNode;
-import org.apache.doris.planner.Split;
-import org.apache.doris.planner.Splitter;
-import org.apache.doris.planner.external.iceberg.IcebergScanProvider;
-import org.apache.doris.planner.external.iceberg.IcebergSplit;
-import org.apache.doris.system.Backend;
-import org.apache.doris.thrift.TExternalScanRange;
-import org.apache.doris.thrift.TFileAttributes;
-import org.apache.doris.thrift.TFileFormatType;
-import org.apache.doris.thrift.TFileRangeDesc;
-import org.apache.doris.thrift.TFileScanRange;
-import org.apache.doris.thrift.TFileScanRangeParams;
-import org.apache.doris.thrift.TFileType;
-import org.apache.doris.thrift.THdfsParams;
-import org.apache.doris.thrift.TNetworkAddress;
-import org.apache.doris.thrift.TScanRange;
-import org.apache.doris.thrift.TScanRangeLocation;
-import org.apache.doris.thrift.TScanRangeLocations;
-
-import com.google.common.base.Joiner;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-
-public abstract class QueryScanProvider implements FileScanProviderIf {
-    public static final Logger LOG = 
LogManager.getLogger(QueryScanProvider.class);
-    private int inputSplitNum = 0;
-    private long inputFileSize = 0;
-    protected Splitter splitter;
-
-    public abstract TFileAttributes getFileAttributes() throws UserException;
-
-    @Override
-    public void createScanRangeLocations(FileLoadScanNode.ParamCreateContext 
context,
-                                         FederationBackendPolicy backendPolicy,
-                                         List<TScanRangeLocations> 
scanRangeLocations) throws UserException {
-    }
-
-    @Override
-    public FileLoadScanNode.ParamCreateContext createContext(Analyzer 
analyzer) throws UserException {
-        return null;
-    }
-
-    @Override
-    public void createScanRangeLocations(List<Expr> conjuncts, 
TFileScanRangeParams params,
-                                         FederationBackendPolicy backendPolicy,
-                                         List<TScanRangeLocations> 
scanRangeLocations) throws UserException {
-        long start = System.currentTimeMillis();
-        List<Split> inputSplits = splitter.getSplits(conjuncts);
-        this.inputSplitNum = inputSplits.size();
-        if (inputSplits.isEmpty()) {
-            return;
-        }
-        FileSplit inputSplit = (FileSplit) inputSplits.get(0);
-        TFileType locationType = getLocationType();
-        params.setFileType(locationType);
-        TFileFormatType fileFormatType = getFileFormatType();
-        params.setFormatType(getFileFormatType());
-        if (fileFormatType == TFileFormatType.FORMAT_CSV_PLAIN || 
fileFormatType == TFileFormatType.FORMAT_JSON) {
-            params.setFileAttributes(getFileAttributes());
-        }
-
-        // set hdfs params for hdfs file type.
-        Map<String, String> locationProperties = getLocationProperties();
-        if (locationType == TFileType.FILE_HDFS || locationType == 
TFileType.FILE_BROKER) {
-            String fsName = "";
-            if (this instanceof TVFScanProvider) {
-                fsName = ((TVFScanProvider) this).getFsName();
-            } else {
-                String fullPath = inputSplit.getPath().toUri().toString();
-                String filePath = inputSplit.getPath().toUri().getPath();
-                // eg:
-                // hdfs://namenode
-                // s3://buckets
-                fsName = fullPath.replace(filePath, "");
-            }
-            THdfsParams tHdfsParams = 
HdfsResource.generateHdfsParam(locationProperties);
-            tHdfsParams.setFsName(fsName);
-            params.setHdfsParams(tHdfsParams);
-
-            if (locationType == TFileType.FILE_BROKER) {
-                FsBroker broker = 
Env.getCurrentEnv().getBrokerMgr().getAnyAliveBroker();
-                if (broker == null) {
-                    throw new UserException("No alive broker.");
-                }
-                params.addToBrokerAddresses(new TNetworkAddress(broker.ip, 
broker.port));
-            }
-        } else if (locationType == TFileType.FILE_S3) {
-            params.setProperties(locationProperties);
-        }
-
-        List<String> pathPartitionKeys = getPathPartitionKeys();
-        for (Split split : inputSplits) {
-            TScanRangeLocations curLocations = newLocations(params, 
backendPolicy);
-            FileSplit fileSplit = (FileSplit) split;
-
-            // If fileSplit has partition values, use the values collected 
from hive partitions.
-            // Otherwise, use the values in file path.
-            List<String> partitionValuesFromPath = 
fileSplit.getPartitionValues() == null
-                    ? 
BrokerUtil.parseColumnsFromPath(fileSplit.getPath().toString(), 
pathPartitionKeys, false)
-                    : fileSplit.getPartitionValues();
-
-            TFileRangeDesc rangeDesc = createFileRangeDesc(fileSplit, 
partitionValuesFromPath, pathPartitionKeys);
-            // external data lake table
-            if (fileSplit instanceof IcebergSplit) {
-                IcebergScanProvider.setIcebergParams(rangeDesc, (IcebergSplit) 
fileSplit);
-            }
-
-            
curLocations.getScanRange().getExtScanRange().getFileScanRange().addToRanges(rangeDesc);
-            LOG.debug("assign to backend {} with table split: {} ({}, {}), 
location: {}",
-                    curLocations.getLocations().get(0).getBackendId(), 
fileSplit.getPath(), fileSplit.getStart(),
-                    fileSplit.getLength(), 
Joiner.on("|").join(fileSplit.getHosts()));
-            scanRangeLocations.add(curLocations);
-            this.inputFileSize += fileSplit.getLength();
-        }
-        LOG.debug("create #{} ScanRangeLocations cost: {} ms",
-                scanRangeLocations.size(), (System.currentTimeMillis() - 
start));
-    }
-
-    @Override
-    public int getInputSplitNum() {
-        return this.inputSplitNum;
-    }
-
-    @Override
-    public long getInputFileSize() {
-        return this.inputFileSize;
-    }
-
-    private TScanRangeLocations newLocations(TFileScanRangeParams params, 
FederationBackendPolicy backendPolicy) {
-        // Generate on file scan range
-        TFileScanRange fileScanRange = new TFileScanRange();
-        fileScanRange.setParams(params);
-
-        // Scan range
-        TExternalScanRange externalScanRange = new TExternalScanRange();
-        externalScanRange.setFileScanRange(fileScanRange);
-        TScanRange scanRange = new TScanRange();
-        scanRange.setExtScanRange(externalScanRange);
-
-        // Locations
-        TScanRangeLocations locations = new TScanRangeLocations();
-        locations.setScanRange(scanRange);
-
-        TScanRangeLocation location = new TScanRangeLocation();
-        Backend selectedBackend = backendPolicy.getNextBe();
-        location.setBackendId(selectedBackend.getId());
-        location.setServer(new TNetworkAddress(selectedBackend.getIp(), 
selectedBackend.getBePort()));
-        locations.addToLocations(location);
-
-        return locations;
-    }
-
-    private TFileRangeDesc createFileRangeDesc(FileSplit fileSplit, 
List<String> columnsFromPath,
-            List<String> columnsFromPathKeys)
-            throws DdlException, MetaNotFoundException {
-        TFileRangeDesc rangeDesc = new TFileRangeDesc();
-        rangeDesc.setStartOffset(fileSplit.getStart());
-        rangeDesc.setSize(fileSplit.getLength());
-        // fileSize only be used when format is orc or parquet and TFileType 
is broker
-        // When TFileType is other type, it is not necessary
-        rangeDesc.setFileSize(fileSplit.getFileLength());
-        rangeDesc.setColumnsFromPath(columnsFromPath);
-        rangeDesc.setColumnsFromPathKeys(columnsFromPathKeys);
-
-        if (getLocationType() == TFileType.FILE_HDFS) {
-            rangeDesc.setPath(fileSplit.getPath().toUri().getPath());
-        } else if (getLocationType() == TFileType.FILE_S3 || getLocationType() 
== TFileType.FILE_BROKER) {
-            // need full path
-            rangeDesc.setPath(fileSplit.getPath().toString());
-        }
-        return rangeDesc;
-    }
-
-    protected static Optional<TFileType> getTFileType(String location) {
-        if (location != null && !location.isEmpty()) {
-            if (FeConstants.isObjStorage(location)) {
-                return Optional.of(TFileType.FILE_S3);
-            } else if (location.startsWith(FeConstants.FS_PREFIX_HDFS)) {
-                return Optional.of(TFileType.FILE_HDFS);
-            } else if (location.startsWith(FeConstants.FS_PREFIX_FILE)) {
-                return Optional.of(TFileType.FILE_LOCAL);
-            } else if (location.startsWith(FeConstants.FS_PREFIX_OFS)) {
-                return Optional.of(TFileType.FILE_BROKER);
-            } else if (location.startsWith(FeConstants.FS_PREFIX_GFS)) {
-                return Optional.of(TFileType.FILE_BROKER);
-            } else if (location.startsWith(FeConstants.FS_PREFIX_JFS)) {
-                return Optional.of(TFileType.FILE_BROKER);
-            }
-        }
-        return Optional.empty();
-    }
-}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/TVFScanNode.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/TVFScanNode.java
new file mode 100644
index 0000000000..a8ede0d843
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/TVFScanNode.java
@@ -0,0 +1,133 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.planner.external;
+
+import org.apache.doris.analysis.TupleDescriptor;
+import org.apache.doris.catalog.FunctionGenTable;
+import org.apache.doris.catalog.TableIf;
+import org.apache.doris.common.DdlException;
+import org.apache.doris.common.MetaNotFoundException;
+import org.apache.doris.common.UserException;
+import org.apache.doris.planner.PlanNodeId;
+import org.apache.doris.spi.Split;
+import org.apache.doris.statistics.StatisticalType;
+import org.apache.doris.tablefunction.ExternalFileTableValuedFunction;
+import org.apache.doris.thrift.TBrokerFileStatus;
+import org.apache.doris.thrift.TFileAttributes;
+import org.apache.doris.thrift.TFileFormatType;
+import org.apache.doris.thrift.TFileType;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import org.apache.hadoop.fs.Path;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+public class TVFScanNode extends FileQueryScanNode {
+    private static final Logger LOG = LogManager.getLogger(TVFScanNode.class);
+
+    private final ExternalFileTableValuedFunction tableValuedFunction;
+    private final FunctionGenTable table;
+
+    /**
+     * External file scan node for a table-valued function.
+     * needCheckColumnPriv: some ExternalFileScanNodes do not need to check column
+     * privileges, e.g. the s3 tvf. Such scan nodes have no corresponding
+     * catalog/database/table info, so no privilege check is needed.
+     */
+    public TVFScanNode(PlanNodeId id, TupleDescriptor desc, boolean 
needCheckColumnPriv) {
+        super(id, desc, "TVF_SCAN_NODE", StatisticalType.TVF_SCAN_NODE, 
needCheckColumnPriv);
+        table = (FunctionGenTable) this.desc.getTable();
+        tableValuedFunction = (ExternalFileTableValuedFunction) table.getTvf();
+    }
+
+    @Override
+    protected void doInitialize() throws UserException {
+        Preconditions.checkNotNull(desc);
+        computeColumnFilter();
+        initBackendPolicy();
+        initSchemaParams();
+    }
+
+    @Override
+    protected String getFsName(FileSplit split) {
+        return tableValuedFunction.getFsName();
+    }
+
+    @Override
+    public TFileAttributes getFileAttributes() throws UserException {
+        return tableValuedFunction.getFileAttributes();
+    }
+
+    @Override
+    public TFileFormatType getFileFormatType() throws DdlException, 
MetaNotFoundException {
+        return tableValuedFunction.getTFileFormatType();
+    }
+
+    @Override
+    public TFileType getLocationType() throws DdlException, 
MetaNotFoundException {
+        return tableValuedFunction.getTFileType();
+    }
+
+    @Override
+    public Map<String, String> getLocationProperties() throws 
MetaNotFoundException, DdlException {
+        return tableValuedFunction.getLocationProperties();
+    }
+
+    @Override
+    public List<String> getPathPartitionKeys() {
+        return Lists.newArrayList();
+    }
+
+    @Override
+    public TableIf getTargetTable() {
+        return table;
+    }
+
+    @Override
+    public List<Split> getSplits() throws UserException {
+        List<Split> splits = Lists.newArrayList();
+        List<TBrokerFileStatus> fileStatuses = 
tableValuedFunction.getFileStatuses();
+        for (TBrokerFileStatus fileStatus : fileStatuses) {
+            Path path = new Path(fileStatus.getPath());
+            try {
+                splits.addAll(splitFile(path, fileStatus.getBlockSize(), null, 
fileStatus.getSize(),
+                        fileStatus.isSplitable, null));
+            } catch (IOException e) {
+                LOG.warn("get file split failed for TVF: {}", path, e);
+                throw new UserException(e);
+            }
+        }
+        return splits;
+    }
+
+    private void addFileSplits(Path path, long fileSize, long splitSize, 
List<Split> splits) {
+        long bytesRemaining;
+        for (bytesRemaining = fileSize; (double) bytesRemaining / (double) 
splitSize > 1.1D;
+                bytesRemaining -= splitSize) {
+            splits.add(new FileSplit(path, fileSize - bytesRemaining, 
splitSize, fileSize, new String[0], null));
+        }
+        if (bytesRemaining != 0L) {
+            splits.add(new FileSplit(path, fileSize - bytesRemaining, 
bytesRemaining, fileSize, new String[0], null));
+        }
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/TVFScanProvider.java
 
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/TVFScanProvider.java
deleted file mode 100644
index e4941ffa5a..0000000000
--- 
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/TVFScanProvider.java
+++ /dev/null
@@ -1,85 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.planner.external;
-
-import org.apache.doris.analysis.TupleDescriptor;
-import org.apache.doris.catalog.FunctionGenTable;
-import org.apache.doris.catalog.TableIf;
-import org.apache.doris.common.DdlException;
-import org.apache.doris.common.MetaNotFoundException;
-import org.apache.doris.common.UserException;
-import org.apache.doris.tablefunction.ExternalFileTableValuedFunction;
-import org.apache.doris.thrift.TFileAttributes;
-import org.apache.doris.thrift.TFileFormatType;
-import org.apache.doris.thrift.TFileType;
-
-import com.google.common.collect.Lists;
-
-import java.util.List;
-import java.util.Map;
-
-public class TVFScanProvider extends QueryScanProvider {
-    private FunctionGenTable tvfTable;
-    private final TupleDescriptor desc;
-    private ExternalFileTableValuedFunction tableValuedFunction;
-
-    public TVFScanProvider(FunctionGenTable tvfTable, TupleDescriptor desc,
-                            ExternalFileTableValuedFunction 
tableValuedFunction) {
-        this.tvfTable = tvfTable;
-        this.desc = desc;
-        this.tableValuedFunction = tableValuedFunction;
-        this.splitter = new TVFSplitter(tableValuedFunction);
-    }
-
-    public String getFsName() {
-        return tableValuedFunction.getFsName();
-    }
-
-    // =========== implement abstract methods of QueryScanProvider 
=================
-    @Override
-    public TFileAttributes getFileAttributes() throws UserException {
-        return tableValuedFunction.getFileAttributes();
-    }
-
-
-    // =========== implement interface methods of FileScanProviderIf 
================
-    @Override
-    public TFileFormatType getFileFormatType() throws DdlException, 
MetaNotFoundException {
-        return tableValuedFunction.getTFileFormatType();
-    }
-
-    @Override
-    public TFileType getLocationType() throws DdlException, 
MetaNotFoundException {
-        return tableValuedFunction.getTFileType();
-    }
-
-    @Override
-    public Map<String, String> getLocationProperties() throws 
MetaNotFoundException, DdlException {
-        return tableValuedFunction.getLocationProperties();
-    }
-
-    @Override
-    public List<String> getPathPartitionKeys() throws DdlException, 
MetaNotFoundException {
-        return Lists.newArrayList();
-    }
-
-    @Override
-    public TableIf getTargetTable() {
-        return tvfTable;
-    }
-}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/TVFSplitter.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/TVFSplitter.java
deleted file mode 100644
index e2f5e556aa..0000000000
--- 
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/TVFSplitter.java
+++ /dev/null
@@ -1,79 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.planner.external;
-
-import org.apache.doris.analysis.Expr;
-import org.apache.doris.common.UserException;
-import org.apache.doris.planner.Split;
-import org.apache.doris.planner.Splitter;
-import org.apache.doris.qe.ConnectContext;
-import org.apache.doris.tablefunction.ExternalFileTableValuedFunction;
-import org.apache.doris.thrift.TBrokerFileStatus;
-
-import com.google.common.collect.Lists;
-import org.apache.hadoop.fs.Path;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
-import java.util.List;
-
-public class TVFSplitter implements Splitter {
-
-    private static final Logger LOG = LogManager.getLogger(TVFSplitter.class);
-
-    private ExternalFileTableValuedFunction tableValuedFunction;
-
-    public TVFSplitter(ExternalFileTableValuedFunction tableValuedFunction) {
-        this.tableValuedFunction = tableValuedFunction;
-    }
-
-    @Override
-    public List<Split> getSplits(List<Expr> exprs) throws UserException {
-        List<Split> splits = Lists.newArrayList();
-        List<TBrokerFileStatus> fileStatuses = 
tableValuedFunction.getFileStatuses();
-        for (TBrokerFileStatus fileStatus : fileStatuses) {
-            long fileLength = fileStatus.getSize();
-            Path path = new Path(fileStatus.getPath());
-            if (fileStatus.isSplitable) {
-                long splitSize = 
ConnectContext.get().getSessionVariable().getFileSplitSize();
-                if (splitSize <= 0) {
-                    splitSize = fileStatus.getBlockSize();
-                }
-                // Min split size is DEFAULT_SPLIT_SIZE(128MB).
-                splitSize = splitSize > DEFAULT_SPLIT_SIZE ? splitSize : 
DEFAULT_SPLIT_SIZE;
-                addFileSplits(path, fileLength, splitSize, splits);
-            } else {
-                Split split = new FileSplit(path, 0, fileLength, fileLength, 
new String[0], null);
-                splits.add(split);
-            }
-        }
-        return splits;
-    }
-
-    private void addFileSplits(Path path, long fileSize, long splitSize, 
List<Split> splits) {
-        long bytesRemaining;
-        for (bytesRemaining = fileSize; (double) bytesRemaining / (double) 
splitSize > 1.1D;
-                bytesRemaining -= splitSize) {
-            splits.add(new FileSplit(path, fileSize - bytesRemaining, 
splitSize, fileSize, new String[0], null));
-        }
-        if (bytesRemaining != 0L) {
-            splits.add(new FileSplit(path, fileSize - bytesRemaining, 
bytesRemaining, fileSize, new String[0], null));
-        }
-    }
-
-}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergHMSSource.java
 
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergHMSSource.java
index 87760f77e1..0fc0f39dff 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergHMSSource.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergHMSSource.java
@@ -26,8 +26,9 @@ import org.apache.doris.common.MetaNotFoundException;
 import org.apache.doris.common.UserException;
 import org.apache.doris.datasource.ExternalCatalog;
 import org.apache.doris.planner.ColumnRange;
-import org.apache.doris.planner.external.HiveScanProvider;
+import org.apache.doris.planner.external.HiveScanNode;
 import org.apache.doris.thrift.TFileAttributes;
+import org.apache.doris.thrift.TFileTextScanRangeParams;
 
 import org.apache.iceberg.TableProperties;
 
@@ -36,15 +37,14 @@ import java.util.Map;
 public class IcebergHMSSource implements IcebergSource {
 
     private final HMSExternalTable hmsTable;
-    private final HiveScanProvider hiveScanProvider;
-
     private final TupleDescriptor desc;
+    private final Map<String, ColumnRange> columnNameToRange;
 
     public IcebergHMSSource(HMSExternalTable hmsTable, TupleDescriptor desc,
                             Map<String, ColumnRange> columnNameToRange) {
-        this.hiveScanProvider = new HiveScanProvider(hmsTable, desc, 
columnNameToRange);
         this.hmsTable = hmsTable;
         this.desc = desc;
+        this.columnNameToRange = columnNameToRange;
     }
 
     @Override
@@ -54,8 +54,8 @@ public class IcebergHMSSource implements IcebergSource {
 
     @Override
     public String getFileFormat() throws DdlException, MetaNotFoundException {
-        return hiveScanProvider.getRemoteHiveTable().getParameters()
-                .getOrDefault(TableProperties.DEFAULT_FILE_FORMAT, 
TableProperties.DEFAULT_FILE_FORMAT_DEFAULT);
+        return hmsTable.getRemoteTable().getParameters()
+            .getOrDefault(TableProperties.DEFAULT_FILE_FORMAT, 
TableProperties.DEFAULT_FILE_FORMAT_DEFAULT);
     }
 
     public org.apache.iceberg.Table getIcebergTable() throws 
MetaNotFoundException {
@@ -64,12 +64,19 @@ public class IcebergHMSSource implements IcebergSource {
 
     @Override
     public TableIf getTargetTable() {
-        return hiveScanProvider.getTargetTable();
+        return hmsTable;
     }
 
     @Override
     public TFileAttributes getFileAttributes() throws UserException {
-        return hiveScanProvider.getFileAttributes();
+        TFileTextScanRangeParams textParams = new TFileTextScanRangeParams();
+        
textParams.setColumnSeparator(hmsTable.getRemoteTable().getSd().getSerdeInfo().getParameters()
+                .getOrDefault(HiveScanNode.PROP_FIELD_DELIMITER, 
HiveScanNode.DEFAULT_FIELD_DELIMITER));
+        textParams.setLineDelimiter(HiveScanNode.DEFAULT_LINE_DELIMITER);
+        TFileAttributes fileAttributes = new TFileAttributes();
+        fileAttributes.setTextParams(textParams);
+        fileAttributes.setHeaderType("");
+        return fileAttributes;
     }
 
     @Override
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergScanNode.java
 
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergScanNode.java
new file mode 100644
index 0000000000..f19a7a43de
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergScanNode.java
@@ -0,0 +1,286 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.planner.external.iceberg;
+
+import org.apache.doris.analysis.Expr;
+import org.apache.doris.analysis.TableSnapshot;
+import org.apache.doris.analysis.TupleDescriptor;
+import org.apache.doris.catalog.TableIf;
+import org.apache.doris.catalog.external.ExternalTable;
+import org.apache.doris.catalog.external.HMSExternalTable;
+import org.apache.doris.catalog.external.IcebergExternalTable;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.DdlException;
+import org.apache.doris.common.UserException;
+import org.apache.doris.common.util.TimeUtils;
+import org.apache.doris.datasource.iceberg.IcebergExternalCatalog;
+import org.apache.doris.external.iceberg.util.IcebergUtils;
+import org.apache.doris.planner.PlanNodeId;
+import org.apache.doris.planner.external.FileQueryScanNode;
+import org.apache.doris.planner.external.TableFormatType;
+import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.spi.Split;
+import org.apache.doris.statistics.StatisticalType;
+import org.apache.doris.thrift.TFileAttributes;
+import org.apache.doris.thrift.TFileFormatType;
+import org.apache.doris.thrift.TFileRangeDesc;
+import org.apache.doris.thrift.TFileType;
+import org.apache.doris.thrift.TIcebergDeleteFileDesc;
+import org.apache.doris.thrift.TIcebergFileDesc;
+import org.apache.doris.thrift.TTableFormatFileDesc;
+
+import avro.shaded.com.google.common.base.Preconditions;
+import org.apache.hadoop.fs.Path;
+import org.apache.iceberg.BaseTable;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileContent;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.HistoryEntry;
+import org.apache.iceberg.MetadataColumns;
+import org.apache.iceberg.PartitionField;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableScan;
+import org.apache.iceberg.exceptions.NotFoundException;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.types.Conversions;
+
+import java.nio.ByteBuffer;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.OptionalLong;
+import java.util.stream.Collectors;
+
+public class IcebergScanNode extends FileQueryScanNode {
+
+    public static final int MIN_DELETE_FILE_SUPPORT_VERSION = 2;
+
+    private IcebergSource source;
+
+    /**
+     * External file scan node for querying an Iceberg table.
+     * needCheckColumnPriv: some ExternalFileScanNodes do not need to check column
+     * privileges, e.g. the s3 tvf. Such scan nodes have no corresponding
+     * catalog/database/table info, so no privilege check is needed.
+     */
+    public IcebergScanNode(PlanNodeId id, TupleDescriptor desc, boolean 
needCheckColumnPriv) {
+        super(id, desc, "ICEBERG_SCAN_NODE", 
StatisticalType.ICEBERG_SCAN_NODE, needCheckColumnPriv);
+    }
+
+    @Override
+    protected void doInitialize() throws UserException {
+        ExternalTable table = (ExternalTable) desc.getTable();
+        if (table.isView()) {
+            throw new AnalysisException(
+                String.format("Querying external view '%s.%s' is not 
supported", table.getDbName(), table.getName()));
+        }
+        computeColumnFilter();
+        initBackendPolicy();
+        if (table instanceof HMSExternalTable) {
+            source = new IcebergHMSSource((HMSExternalTable) table, desc, 
columnNameToRange);
+        } else if (table instanceof IcebergExternalTable) {
+            String catalogType = ((IcebergExternalTable) 
table).getIcebergCatalogType();
+            switch (catalogType) {
+                case IcebergExternalCatalog.ICEBERG_HMS:
+                case IcebergExternalCatalog.ICEBERG_REST:
+                case IcebergExternalCatalog.ICEBERG_DLF:
+                case IcebergExternalCatalog.ICEBERG_GLUE:
+                    source = new IcebergApiSource((IcebergExternalTable) 
table, desc, columnNameToRange);
+                    break;
+                default:
+                    throw new UserException("Unknown iceberg catalog type: " + 
catalogType);
+            }
+        }
+        Preconditions.checkNotNull(source);
+        initSchemaParams();
+    }
+
+    public static void setIcebergParams(TFileRangeDesc rangeDesc, IcebergSplit 
icebergSplit) {
+        TTableFormatFileDesc tableFormatFileDesc = new TTableFormatFileDesc();
+        
tableFormatFileDesc.setTableFormatType(icebergSplit.getTableFormatType().value());
+        TIcebergFileDesc fileDesc = new TIcebergFileDesc();
+        int formatVersion = icebergSplit.getFormatVersion();
+        fileDesc.setFormatVersion(formatVersion);
+        if (formatVersion < MIN_DELETE_FILE_SUPPORT_VERSION) {
+            fileDesc.setContent(FileContent.DATA.id());
+        } else {
+            for (IcebergDeleteFileFilter filter : 
icebergSplit.getDeleteFileFilters()) {
+                TIcebergDeleteFileDesc deleteFileDesc = new 
TIcebergDeleteFileDesc();
+                deleteFileDesc.setPath(filter.getDeleteFilePath());
+                if (filter instanceof IcebergDeleteFileFilter.PositionDelete) {
+                    fileDesc.setContent(FileContent.POSITION_DELETES.id());
+                    IcebergDeleteFileFilter.PositionDelete positionDelete =
+                            (IcebergDeleteFileFilter.PositionDelete) filter;
+                    OptionalLong lowerBound = 
positionDelete.getPositionLowerBound();
+                    OptionalLong upperBound = 
positionDelete.getPositionUpperBound();
+                    if (lowerBound.isPresent()) {
+                        
deleteFileDesc.setPositionLowerBound(lowerBound.getAsLong());
+                    }
+                    if (upperBound.isPresent()) {
+                        
deleteFileDesc.setPositionUpperBound(upperBound.getAsLong());
+                    }
+                } else {
+                    fileDesc.setContent(FileContent.EQUALITY_DELETES.id());
+                    IcebergDeleteFileFilter.EqualityDelete equalityDelete =
+                            (IcebergDeleteFileFilter.EqualityDelete) filter;
+                    deleteFileDesc.setFieldIds(equalityDelete.getFieldIds());
+                }
+                fileDesc.addToDeleteFiles(deleteFileDesc);
+            }
+        }
+        tableFormatFileDesc.setIcebergParams(fileDesc);
+        rangeDesc.setTableFormatParams(tableFormatFileDesc);
+    }
+
+    @Override
+    public List<Split> getSplits() throws UserException {
+        List<Expression> expressions = new ArrayList<>();
+        org.apache.iceberg.Table table = source.getIcebergTable();
+        for (Expr conjunct : conjuncts) {
+            Expression expression = 
IcebergUtils.convertToIcebergExpr(conjunct, table.schema());
+            if (expression != null) {
+                expressions.add(expression);
+            }
+        }
+        TableScan scan = table.newScan();
+        TableSnapshot tableSnapshot = 
source.getDesc().getRef().getTableSnapshot();
+        if (tableSnapshot != null) {
+            TableSnapshot.VersionType type = tableSnapshot.getType();
+            try {
+                if (type == TableSnapshot.VersionType.VERSION) {
+                    scan = scan.useSnapshot(tableSnapshot.getVersion());
+                } else {
+                    long snapshotId = 
TimeUtils.timeStringToLong(tableSnapshot.getTime(), TimeUtils.getTimeZone());
+                    scan = 
scan.useSnapshot(getSnapshotIdAsOfTime(table.history(), snapshotId));
+                }
+            } catch (IllegalArgumentException e) {
+                throw new UserException(e);
+            }
+        }
+        for (Expression predicate : expressions) {
+            scan = scan.filter(predicate);
+        }
+        List<Split> splits = new ArrayList<>();
+        int formatVersion = ((BaseTable) 
table).operations().current().formatVersion();
+        // Min split size is DEFAULT_SPLIT_SIZE(128MB).
+        long splitSize = 
Math.max(ConnectContext.get().getSessionVariable().getFileSplitSize(), 
DEFAULT_SPLIT_SIZE);
+        for (FileScanTask task : scan.planFiles()) {
+            long fileSize = task.file().fileSizeInBytes();
+            for (FileScanTask splitTask : task.split(splitSize)) {
+                String dataFilePath = splitTask.file().path().toString();
+                IcebergSplit split = new IcebergSplit(new Path(dataFilePath), 
splitTask.start(),
+                        splitTask.length(), fileSize, new String[0]);
+                split.setFormatVersion(formatVersion);
+                if (formatVersion >= MIN_DELETE_FILE_SUPPORT_VERSION) {
+                    
split.setDeleteFileFilters(getDeleteFileFilters(splitTask));
+                }
+                split.setTableFormatType(TableFormatType.ICEBERG);
+                splits.add(split);
+            }
+        }
+        return splits;
+    }
+
+    private long getSnapshotIdAsOfTime(List<HistoryEntry> historyEntries, long 
asOfTimestamp) {
+        // find history at or before asOfTimestamp
+        HistoryEntry latestHistory = null;
+        for (HistoryEntry entry : historyEntries) {
+            if (entry.timestampMillis() <= asOfTimestamp) {
+                if (latestHistory == null) {
+                    latestHistory = entry;
+                    continue;
+                }
+                if (entry.timestampMillis() > latestHistory.timestampMillis()) 
{
+                    latestHistory = entry;
+                }
+            }
+        }
+        if (latestHistory == null) {
+            throw new NotFoundException("No version history at or before "
+                + Instant.ofEpochMilli(asOfTimestamp));
+        }
+        return latestHistory.snapshotId();
+    }
+
+    private List<IcebergDeleteFileFilter> getDeleteFileFilters(FileScanTask 
spitTask) {
+        List<IcebergDeleteFileFilter> filters = new ArrayList<>();
+        for (DeleteFile delete : spitTask.deletes()) {
+            if (delete.content() == FileContent.POSITION_DELETES) {
+                ByteBuffer lowerBoundBytes = 
delete.lowerBounds().get(MetadataColumns.DELETE_FILE_POS.fieldId());
+                Optional<Long> positionLowerBound = 
Optional.ofNullable(lowerBoundBytes)
+                        .map(bytes -> 
Conversions.fromByteBuffer(MetadataColumns.DELETE_FILE_POS.type(), bytes));
+                ByteBuffer upperBoundBytes = 
delete.upperBounds().get(MetadataColumns.DELETE_FILE_POS.fieldId());
+                Optional<Long> positionUpperBound = 
Optional.ofNullable(upperBoundBytes)
+                        .map(bytes -> 
Conversions.fromByteBuffer(MetadataColumns.DELETE_FILE_POS.type(), bytes));
+                
filters.add(IcebergDeleteFileFilter.createPositionDelete(delete.path().toString(),
+                        positionLowerBound.orElse(-1L), 
positionUpperBound.orElse(-1L)));
+            } else if (delete.content() == FileContent.EQUALITY_DELETES) {
+                // todo: 
filters.add(IcebergDeleteFileFilter.createEqualityDelete(delete.path().toString(),
+                throw new IllegalStateException("Don't support equality delete 
file");
+            } else {
+                throw new IllegalStateException("Unknown delete content: " + 
delete.content());
+            }
+        }
+        return filters;
+    }
+
+    @Override
+    public TFileType getLocationType() throws UserException {
+        Table icebergTable = source.getIcebergTable();
+        String location = icebergTable.location();
+        return getTFileType(location).orElseThrow(() ->
+            new DdlException("Unknown file location " + location + " for 
iceberg table " + icebergTable.name()));
+    }
+
+    @Override
+    public TFileFormatType getFileFormatType() throws UserException {
+        TFileFormatType type;
+        String icebergFormat = source.getFileFormat();
+        if (icebergFormat.equalsIgnoreCase("parquet")) {
+            type = TFileFormatType.FORMAT_PARQUET;
+        } else if (icebergFormat.equalsIgnoreCase("orc")) {
+            type = TFileFormatType.FORMAT_ORC;
+        } else {
+            throw new DdlException(String.format("Unsupported format name: %s 
for iceberg table.", icebergFormat));
+        }
+        return type;
+    }
+
+    @Override
+    public TFileAttributes getFileAttributes() throws UserException {
+        return source.getFileAttributes();
+    }
+
+    @Override
+    public List<String> getPathPartitionKeys() throws UserException {
+        return 
source.getIcebergTable().spec().fields().stream().map(PartitionField::name)
+            .collect(Collectors.toList());
+    }
+
+    @Override
+    public TableIf getTargetTable() {
+        return source.getTargetTable();
+    }
+
+    @Override
+    public Map<String, String> getLocationProperties() throws UserException {
+        return source.getCatalog().getProperties();
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergScanProvider.java
 
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergScanProvider.java
deleted file mode 100644
index d0bd72f5c7..0000000000
--- 
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergScanProvider.java
+++ /dev/null
@@ -1,137 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.planner.external.iceberg;
-
-import org.apache.doris.analysis.Analyzer;
-import org.apache.doris.catalog.TableIf;
-import org.apache.doris.common.DdlException;
-import org.apache.doris.common.MetaNotFoundException;
-import org.apache.doris.common.UserException;
-import org.apache.doris.planner.external.IcebergSplitter;
-import org.apache.doris.planner.external.QueryScanProvider;
-import org.apache.doris.thrift.TFileAttributes;
-import org.apache.doris.thrift.TFileFormatType;
-import org.apache.doris.thrift.TFileRangeDesc;
-import org.apache.doris.thrift.TFileType;
-import org.apache.doris.thrift.TIcebergDeleteFileDesc;
-import org.apache.doris.thrift.TIcebergFileDesc;
-import org.apache.doris.thrift.TTableFormatFileDesc;
-
-import org.apache.iceberg.FileContent;
-import org.apache.iceberg.PartitionField;
-
-import java.util.List;
-import java.util.Map;
-import java.util.OptionalLong;
-import java.util.stream.Collectors;
-
-/**
- * A file scan provider for iceberg.
- */
-public class IcebergScanProvider extends QueryScanProvider {
-
-    public static final int MIN_DELETE_FILE_SUPPORT_VERSION = 2;
-    private final Analyzer analyzer;
-    private final IcebergSource icebergSource;
-
-    public IcebergScanProvider(IcebergSource icebergSource, Analyzer analyzer) 
{
-        this.icebergSource = icebergSource;
-        this.analyzer = analyzer;
-        this.splitter = new IcebergSplitter(icebergSource, analyzer);
-    }
-
-    public static void setIcebergParams(TFileRangeDesc rangeDesc, IcebergSplit 
icebergSplit) {
-        TTableFormatFileDesc tableFormatFileDesc = new TTableFormatFileDesc();
-        
tableFormatFileDesc.setTableFormatType(icebergSplit.getTableFormatType().value());
-        TIcebergFileDesc fileDesc = new TIcebergFileDesc();
-        int formatVersion = icebergSplit.getFormatVersion();
-        fileDesc.setFormatVersion(formatVersion);
-        if (formatVersion < MIN_DELETE_FILE_SUPPORT_VERSION) {
-            fileDesc.setContent(FileContent.DATA.id());
-        } else {
-            for (IcebergDeleteFileFilter filter : 
icebergSplit.getDeleteFileFilters()) {
-                TIcebergDeleteFileDesc deleteFileDesc = new 
TIcebergDeleteFileDesc();
-                deleteFileDesc.setPath(filter.getDeleteFilePath());
-                if (filter instanceof IcebergDeleteFileFilter.PositionDelete) {
-                    fileDesc.setContent(FileContent.POSITION_DELETES.id());
-                    IcebergDeleteFileFilter.PositionDelete positionDelete =
-                            (IcebergDeleteFileFilter.PositionDelete) filter;
-                    OptionalLong lowerBound = 
positionDelete.getPositionLowerBound();
-                    OptionalLong upperBound = 
positionDelete.getPositionUpperBound();
-                    if (lowerBound.isPresent()) {
-                        
deleteFileDesc.setPositionLowerBound(lowerBound.getAsLong());
-                    }
-                    if (upperBound.isPresent()) {
-                        
deleteFileDesc.setPositionUpperBound(upperBound.getAsLong());
-                    }
-                } else {
-                    fileDesc.setContent(FileContent.EQUALITY_DELETES.id());
-                    IcebergDeleteFileFilter.EqualityDelete equalityDelete =
-                            (IcebergDeleteFileFilter.EqualityDelete) filter;
-                    deleteFileDesc.setFieldIds(equalityDelete.getFieldIds());
-                }
-                fileDesc.addToDeleteFiles(deleteFileDesc);
-            }
-        }
-        tableFormatFileDesc.setIcebergParams(fileDesc);
-        rangeDesc.setTableFormatParams(tableFormatFileDesc);
-    }
-
-    @Override
-    public TFileType getLocationType() throws DdlException, 
MetaNotFoundException {
-        org.apache.iceberg.Table table = icebergSource.getIcebergTable();
-        String location = table.location();
-        return getTFileType(location).orElseThrow(() ->
-                    new DdlException("Unknown file location " + location + " 
for iceberg table " + table.name()));
-    }
-
-    @Override
-    public List<String> getPathPartitionKeys() throws DdlException, 
MetaNotFoundException {
-        return 
icebergSource.getIcebergTable().spec().fields().stream().map(PartitionField::name)
-                .collect(Collectors.toList());
-    }
-
-    @Override
-    public TFileFormatType getFileFormatType() throws DdlException, 
MetaNotFoundException {
-        TFileFormatType type;
-        String icebergFormat = icebergSource.getFileFormat();
-        if (icebergFormat.equalsIgnoreCase("parquet")) {
-            type = TFileFormatType.FORMAT_PARQUET;
-        } else if (icebergFormat.equalsIgnoreCase("orc")) {
-            type = TFileFormatType.FORMAT_ORC;
-        } else {
-            throw new DdlException(String.format("Unsupported format name: %s 
for iceberg table.", icebergFormat));
-        }
-        return type;
-    }
-
-    @Override
-    public Map<String, String> getLocationProperties() throws 
MetaNotFoundException, DdlException {
-        return icebergSource.getCatalog().getProperties();
-    }
-
-    @Override
-    public TableIf getTargetTable() {
-        return icebergSource.getTargetTable();
-    }
-
-    @Override
-    public TFileAttributes getFileAttributes() throws UserException {
-        return icebergSource.getFileAttributes();
-    }
-}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergSplit.java
 
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergSplit.java
index e840c9a876..896b4968b4 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergSplit.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergSplit.java
@@ -17,7 +17,6 @@
 
 package org.apache.doris.planner.external.iceberg;
 
-import org.apache.doris.analysis.Analyzer;
 import org.apache.doris.planner.external.FileSplit;
 
 import lombok.Data;
@@ -31,8 +30,6 @@ public class IcebergSplit extends FileSplit {
         super(file, start, length, fileLength, hosts, null);
     }
 
-    private Analyzer analyzer;
-    private String dataFilePath;
     private Integer formatVersion;
     private List<IcebergDeleteFileFilter> deleteFileFilters;
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/Split.java 
b/fe/fe-core/src/main/java/org/apache/doris/spi/Split.java
similarity index 79%
rename from fe/fe-core/src/main/java/org/apache/doris/planner/Split.java
rename to fe/fe-core/src/main/java/org/apache/doris/spi/Split.java
index 63b837aacc..31b1e1515a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/Split.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/spi/Split.java
@@ -15,17 +15,15 @@
 // specific language governing permissions and limitations
 // under the License.
 
-package org.apache.doris.planner;
+package org.apache.doris.spi;
 
-import lombok.Data;
+/**
+ * Split interface. e.g. Tablet for Olap Table.
+ */
+public interface Split {
 
-@Data
-public abstract class Split {
-    protected String[] hosts;
+    String[] getHosts();
 
-    public Split() {}
+    Object getInfo();
 
-    public Split(String[] hosts) {
-        this.hosts = hosts;
-    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticalType.java 
b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticalType.java
index 586c9139cc..c39be3cf2c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticalType.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticalType.java
@@ -31,6 +31,8 @@ public enum StatisticalType {
     HASH_JOIN_NODE,
     HIVE_SCAN_NODE,
     ICEBERG_SCAN_NODE,
+    HUDI_SCAN_NODE,
+    TVF_SCAN_NODE,
     INTERSECT_NODE,
     LOAD_SCAN_NODE,
     MYSQL_SCAN_NODE,
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java
 
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java
index bde54ab4b0..48e8307799 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java
@@ -37,7 +37,7 @@ import org.apache.doris.common.util.BrokerUtil;
 import org.apache.doris.datasource.property.constants.S3Properties;
 import org.apache.doris.planner.PlanNodeId;
 import org.apache.doris.planner.ScanNode;
-import org.apache.doris.planner.external.FileQueryScanNode;
+import org.apache.doris.planner.external.TVFScanNode;
 import org.apache.doris.proto.InternalService;
 import org.apache.doris.proto.InternalService.PFetchTableSchemaRequest;
 import org.apache.doris.proto.Types.PScalarType;
@@ -317,7 +317,7 @@ public abstract class ExternalFileTableValuedFunction 
extends TableValuedFunctio
 
     @Override
     public ScanNode getScanNode(PlanNodeId id, TupleDescriptor desc) {
-        return new FileQueryScanNode(id, desc, false);
+        return new TVFScanNode(id, desc, false);
     }
 
     @Override


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to