This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 5210c04241 [Refactor](ScanNode) Split interface refactor (#19133)
5210c04241 is described below
commit 5210c04241260903b13de84b8885cfaceabe573e
Author: Jibing-Li <[email protected]>
AuthorDate: Fri May 5 23:20:29 2023 +0800
[Refactor](ScanNode) Split interface refactor (#19133)
Move getSplits function to ScanNode, remove Splitter interface.
For each kind of data source, create a specific ScanNode and implement the
getSplits interface. For example, HiveScanNode.
Remove FileScanProviderIf and move its code into each ScanNode.
---
.../apache/doris/datasource/ExternalCatalog.java | 10 +
.../doris/datasource/hive/HiveMetaStoreCache.java | 23 +-
.../glue/translator/PhysicalPlanTranslator.java | 38 ++-
.../org/apache/doris/planner/FileLoadScanNode.java | 9 +-
.../org/apache/doris/planner/OlapSplitter.java | 31 --
.../java/org/apache/doris/planner/ScanNode.java | 6 +
.../apache/doris/planner/SingleNodePlanner.java | 25 +-
.../java/org/apache/doris/planner/Splitter.java | 29 --
.../doris/planner/external/FileQueryScanNode.java | 328 +++++++++++++--------
.../doris/planner/external/FileScanNode.java | 70 ++++-
.../doris/planner/external/FileScanProviderIf.java | 61 ----
.../apache/doris/planner/external/FileSplit.java | 18 +-
.../planner/external/HMSTableScanProvider.java | 34 ---
.../{HiveSplitter.java => HiveScanNode.java} | 205 +++++++------
.../doris/planner/external/HiveScanProvider.java | 139 ---------
.../{HudiScanProvider.java => HudiScanNode.java} | 29 +-
.../doris/planner/external/IcebergSplitter.java | 155 ----------
.../doris/planner/external/LoadScanProvider.java | 18 +-
.../doris/planner/external/QueryScanProvider.java | 225 --------------
.../apache/doris/planner/external/TVFScanNode.java | 133 +++++++++
.../doris/planner/external/TVFScanProvider.java | 85 ------
.../apache/doris/planner/external/TVFSplitter.java | 79 -----
.../planner/external/iceberg/IcebergHMSSource.java | 23 +-
.../planner/external/iceberg/IcebergScanNode.java | 286 ++++++++++++++++++
.../external/iceberg/IcebergScanProvider.java | 137 ---------
.../planner/external/iceberg/IcebergSplit.java | 3 -
.../org/apache/doris/{planner => spi}/Split.java | 16 +-
.../apache/doris/statistics/StatisticalType.java | 2 +
.../ExternalFileTableValuedFunction.java | 4 +-
29 files changed, 937 insertions(+), 1284 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
index 1c5c4b4944..adcb109ace 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
@@ -473,4 +473,14 @@ public abstract class ExternalCatalog implements
CatalogIf<ExternalDatabase>, Wr
}
return specifiedDatabaseMap;
}
+
+ public boolean useSelfSplitter() {
+ Map<String, String> properties = catalogProperty.getProperties();
+ boolean ret = true;
+ if (properties.containsKey(HMSExternalCatalog.ENABLE_SELF_SPLITTER)
+ &&
properties.get(HMSExternalCatalog.ENABLE_SELF_SPLITTER).equalsIgnoreCase("false"))
{
+ ret = false;
+ }
+ return ret;
+ }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
index 2df1ed1225..435272782c 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
@@ -26,10 +26,14 @@ import org.apache.doris.catalog.Type;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.common.FeConstants;
+import org.apache.doris.common.UserException;
import org.apache.doris.datasource.CacheException;
import org.apache.doris.datasource.HMSExternalCatalog;
import org.apache.doris.external.hive.util.HiveUtil;
+import org.apache.doris.fs.FileSystemFactory;
+import org.apache.doris.fs.RemoteFiles;
import org.apache.doris.fs.remote.RemoteFile;
+import org.apache.doris.fs.remote.RemoteFileSystem;
import org.apache.doris.metric.GaugeMetric;
import org.apache.doris.metric.Metric;
import org.apache.doris.metric.MetricLabel;
@@ -37,9 +41,8 @@ import org.apache.doris.metric.MetricRepo;
import org.apache.doris.planner.ColumnBound;
import org.apache.doris.planner.ListPartitionPrunerV2;
import org.apache.doris.planner.PartitionPrunerV2Base.UniqueId;
-import org.apache.doris.planner.Split;
import org.apache.doris.planner.external.FileSplit;
-import org.apache.doris.planner.external.HiveSplitter;
+import org.apache.doris.spi.Split;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
@@ -263,6 +266,19 @@ public class HiveMetaStoreCache {
return new HivePartition(key.dbName, key.tblName, false,
sd.getInputFormat(), sd.getLocation(), key.values);
}
+ // Get File Status by using FileSystem API.
+ private FileCacheValue getFileCache(String location, InputFormat<?, ?>
inputFormat,
+ JobConf
jobConf,
+ List<String>
partitionValues) throws UserException {
+ FileCacheValue result = new FileCacheValue();
+ result.setSplittable(HiveUtil.isSplittable(inputFormat, new
Path(location), jobConf));
+ RemoteFileSystem fs = FileSystemFactory.getByLocation(location,
jobConf);
+ RemoteFiles locatedFiles = fs.listLocatedFiles(location, true, false);
+ locatedFiles.locations().forEach(result::addFile);
+ result.setPartitionValues(partitionValues);
+ return result;
+ }
+
private FileCacheValue loadFiles(FileCacheKey key) {
ClassLoader classLoader =
Thread.currentThread().getContextClassLoader();
try {
@@ -284,8 +300,7 @@ public class HiveMetaStoreCache {
InputFormat<?, ?> inputFormat =
HiveUtil.getInputFormat(jobConf, key.inputFormat, false);
// TODO: This is a temp config, will remove it after the
HiveSplitter is stable.
if (key.useSelfSplitter) {
- result = HiveSplitter.getFileCache(finalLocation,
inputFormat,
- jobConf, key.getPartitionValues());
+ result = getFileCache(finalLocation, inputFormat, jobConf,
key.getPartitionValues());
} else {
InputSplit[] splits;
String remoteUser =
jobConf.get(HdfsResource.HADOOP_USER_NAME);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
index f27abc0f69..acb3ebe87b 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
@@ -44,6 +44,8 @@ import org.apache.doris.catalog.Table;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.catalog.Type;
import org.apache.doris.catalog.external.ExternalTable;
+import org.apache.doris.catalog.external.HMSExternalTable;
+import org.apache.doris.catalog.external.IcebergExternalTable;
import org.apache.doris.common.Pair;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.Util;
@@ -139,7 +141,9 @@ import org.apache.doris.planner.SetOperationNode;
import org.apache.doris.planner.SortNode;
import org.apache.doris.planner.TableFunctionNode;
import org.apache.doris.planner.UnionNode;
-import org.apache.doris.planner.external.FileQueryScanNode;
+import org.apache.doris.planner.external.HiveScanNode;
+import org.apache.doris.planner.external.HudiScanNode;
+import org.apache.doris.planner.external.iceberg.IcebergScanNode;
import org.apache.doris.tablefunction.TableValuedFunctionIf;
import org.apache.doris.thrift.TPartitionType;
import org.apache.doris.thrift.TPushAggOp;
@@ -600,24 +604,44 @@ public class PhysicalPlanTranslator extends
DefaultPlanVisitor<PlanFragment, Pla
ExternalTable table = fileScan.getTable();
TupleDescriptor tupleDescriptor = generateTupleDesc(slotList, table,
context);
tupleDescriptor.setTable(table);
+
// TODO(cmy): determine the needCheckColumnPriv param
- FileQueryScanNode fileScanNode = new
FileQueryScanNode(context.nextPlanNodeId(), tupleDescriptor, false);
+ ScanNode scanNode = null;
+ if (table instanceof HMSExternalTable) {
+ switch (((HMSExternalTable) table).getDlaType()) {
+ case HUDI:
+ scanNode = new HudiScanNode(context.nextPlanNodeId(),
tupleDescriptor, false);
+ break;
+ case ICEBERG:
+ scanNode = new IcebergScanNode(context.nextPlanNodeId(),
tupleDescriptor, false);
+ break;
+ case HIVE:
+ scanNode = new HiveScanNode(context.nextPlanNodeId(),
tupleDescriptor, false);
+ break;
+ default:
+ break;
+ }
+ } else if (table instanceof IcebergExternalTable) {
+ scanNode = new IcebergScanNode(context.nextPlanNodeId(),
tupleDescriptor, false);
+ }
+ Preconditions.checkNotNull(scanNode);
TableName tableName = new TableName(null, "", "");
TableRef ref = new TableRef(tableName, null, null);
BaseTableRef tableRef = new BaseTableRef(ref, table, tableName);
tupleDescriptor.setRef(tableRef);
- Utils.execWithUncheckedException(fileScanNode::init);
- context.addScanNode(fileScanNode);
+ Utils.execWithUncheckedException(scanNode::init);
+ context.addScanNode(scanNode);
+ ScanNode finalScanNode = scanNode;
context.getRuntimeTranslator().ifPresent(
runtimeFilterGenerator ->
runtimeFilterGenerator.getTargetOnScanNode(fileScan.getId()).forEach(
- expr ->
runtimeFilterGenerator.translateRuntimeFilterTarget(expr, fileScanNode, context)
+ expr ->
runtimeFilterGenerator.translateRuntimeFilterTarget(expr, finalScanNode,
context)
)
);
- Utils.execWithUncheckedException(fileScanNode::finalizeForNereids);
+ Utils.execWithUncheckedException(scanNode::finalizeForNereids);
// Create PlanFragment
DataPartition dataPartition = DataPartition.RANDOM;
- PlanFragment planFragment = createPlanFragment(fileScanNode,
dataPartition, fileScan);
+ PlanFragment planFragment = createPlanFragment(scanNode,
dataPartition, fileScan);
context.addPlanFragment(planFragment);
updateLegacyPlanIdToPhysicalPlan(planFragment.getPlanRoot(), fileScan);
return planFragment;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/FileLoadScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/FileLoadScanNode.java
index 07d1ff445f..9215d409ae 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/FileLoadScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/FileLoadScanNode.java
@@ -42,7 +42,6 @@ import org.apache.doris.common.UserException;
import org.apache.doris.load.BrokerFileGroup;
import org.apache.doris.planner.external.FileGroupInfo;
import org.apache.doris.planner.external.FileScanNode;
-import org.apache.doris.planner.external.FileScanProviderIf;
import org.apache.doris.planner.external.LoadScanProvider;
import org.apache.doris.rewrite.ExprRewriter;
import org.apache.doris.statistics.StatisticalType;
@@ -82,7 +81,7 @@ public class FileLoadScanNode extends FileScanNode {
// Each DataDescription in a load stmt conreponding to a FileGroupInfo in
this list.
private final List<FileGroupInfo> fileGroupInfos = Lists.newArrayList();
// For load, the num of providers equals to the num of file group infos.
- private final List<FileScanProviderIf> scanProviders =
Lists.newArrayList();
+ private final List<LoadScanProvider> scanProviders = Lists.newArrayList();
// For load, the num of ParamCreateContext equals to the num of file group
infos.
private final List<ParamCreateContext> contexts = Lists.newArrayList();
@@ -128,13 +127,13 @@ public class FileLoadScanNode extends FileScanNode {
// For each scan provider, create a corresponding ParamCreateContext
private void initParamCreateContexts(Analyzer analyzer) throws
UserException {
- for (FileScanProviderIf scanProvider : scanProviders) {
+ for (LoadScanProvider scanProvider : scanProviders) {
ParamCreateContext context = scanProvider.createContext(analyzer);
// set where and preceding filter.
// FIXME(cmy): we should support set different expr for different
file group.
initAndSetPrecedingFilter(context.fileGroup.getPrecedingFilterExpr(),
context.srcTupleDescriptor, analyzer);
initAndSetWhereExpr(context.fileGroup.getWhereExpr(),
context.destTupleDescriptor, analyzer);
- setDefaultValueExprs(scanProvider, context.srcSlotDescByName,
context.params, true);
+ setDefaultValueExprs(scanProvider.getTargetTable(),
context.srcSlotDescByName, context.params, true);
this.contexts.add(context);
}
}
@@ -196,7 +195,7 @@ public class FileLoadScanNode extends FileScanNode {
contexts.size() + " vs. " + scanProviders.size());
for (int i = 0; i < contexts.size(); ++i) {
FileLoadScanNode.ParamCreateContext context = contexts.get(i);
- FileScanProviderIf scanProvider = scanProviders.get(i);
+ LoadScanProvider scanProvider = scanProviders.get(i);
finalizeParamsForLoad(context, analyzer);
createScanRangeLocations(context, scanProvider);
this.inputSplitsNum += scanProvider.getInputSplitNum();
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapSplitter.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapSplitter.java
deleted file mode 100644
index f2d06a3aef..0000000000
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapSplitter.java
+++ /dev/null
@@ -1,31 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.planner;
-
-import org.apache.doris.analysis.Expr;
-import org.apache.doris.common.UserException;
-
-import java.util.List;
-
-public class OlapSplitter implements Splitter {
-
- @Override
- public List<Split> getSplits(List<Expr> exprs) throws UserException {
- return null;
- }
-}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java
index 13dd8638ba..6c4997ea77 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java
@@ -37,8 +37,10 @@ import org.apache.doris.analysis.TupleDescriptor;
import org.apache.doris.analysis.TupleId;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.PrimitiveType;
+import org.apache.doris.common.NotImplementedException;
import org.apache.doris.common.UserException;
import org.apache.doris.nereids.glue.translator.PlanTranslatorContext;
+import org.apache.doris.spi.Split;
import org.apache.doris.statistics.StatisticalType;
import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.thrift.TScanRangeLocations;
@@ -104,6 +106,10 @@ public abstract class ScanNode extends PlanNode {
sortColumn = column;
}
+ protected List<Split> getSplits() throws UserException {
+ throw new NotImplementedException("Scan node sub class need to
implement getSplits interface.");
+ }
+
/**
* cast expr to SlotDescriptor type
*/
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java
index 55df5b6f54..92e2b94148 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java
@@ -60,6 +60,8 @@ import org.apache.doris.catalog.MysqlTable;
import org.apache.doris.catalog.OdbcTable;
import org.apache.doris.catalog.PrimitiveType;
import org.apache.doris.catalog.Table;
+import org.apache.doris.catalog.TableIf;
+import org.apache.doris.catalog.external.HMSExternalTable;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.Pair;
@@ -67,6 +69,9 @@ import org.apache.doris.common.Reference;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.VectorizedUtil;
import org.apache.doris.planner.external.FileQueryScanNode;
+import org.apache.doris.planner.external.HiveScanNode;
+import org.apache.doris.planner.external.HudiScanNode;
+import org.apache.doris.planner.external.iceberg.IcebergScanNode;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.rewrite.mvrewrite.MVSelectFailedException;
import org.apache.doris.thrift.TNullSide;
@@ -1986,8 +1991,7 @@ public class SingleNodePlanner {
case HIVE:
throw new RuntimeException("Hive external table is not
supported, try to use hive catalog please");
case ICEBERG:
- scanNode = new FileQueryScanNode(ctx.getNextNodeId(),
tblRef.getDesc(), true);
- break;
+ throw new RuntimeException("Iceberg external table is not
supported, use iceberg catalog please");
case HUDI:
throw new UserException(
"Hudi table is no longer supported. Use Multi Catalog
feature to connect to Hudi");
@@ -1998,8 +2002,23 @@ public class SingleNodePlanner {
scanNode = ((TableValuedFunctionRef)
tblRef).getScanNode(ctx.getNextNodeId());
break;
case HMS_EXTERNAL_TABLE:
+ TableIf table = tblRef.getDesc().getTable();
+ switch (((HMSExternalTable) table).getDlaType()) {
+ case HUDI:
+ scanNode = new HudiScanNode(ctx.getNextNodeId(),
tblRef.getDesc(), true);
+ break;
+ case ICEBERG:
+ scanNode = new IcebergScanNode(ctx.getNextNodeId(),
tblRef.getDesc(), true);
+ break;
+ case HIVE:
+ scanNode = new HiveScanNode(ctx.getNextNodeId(),
tblRef.getDesc(), true);
+ break;
+ default:
+ throw new UserException("Not supported table type" +
table.getType());
+ }
+ break;
case ICEBERG_EXTERNAL_TABLE:
- scanNode = new FileQueryScanNode(ctx.getNextNodeId(),
tblRef.getDesc(), true);
+ scanNode = new IcebergScanNode(ctx.getNextNodeId(),
tblRef.getDesc(), true);
break;
case ES_EXTERNAL_TABLE:
scanNode = new EsScanNode(ctx.getNextNodeId(),
tblRef.getDesc(), "EsScanNode", true);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/Splitter.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/Splitter.java
deleted file mode 100644
index 5ad1034e61..0000000000
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/Splitter.java
+++ /dev/null
@@ -1,29 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.planner;
-
-import org.apache.doris.analysis.Expr;
-import org.apache.doris.common.UserException;
-
-import java.util.List;
-
-public interface Splitter {
- static final long DEFAULT_SPLIT_SIZE = 128 * 1024 * 1024; // 128MB
-
- List<Split> getSplits(List<Expr> exprs) throws UserException;
-}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java
index 48f35eb363..d5927a1520 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java
@@ -22,24 +22,38 @@ import org.apache.doris.analysis.SlotDescriptor;
import org.apache.doris.analysis.SlotId;
import org.apache.doris.analysis.TupleDescriptor;
import org.apache.doris.catalog.Column;
-import org.apache.doris.catalog.FunctionGenTable;
+import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.FsBroker;
+import org.apache.doris.catalog.HdfsResource;
import org.apache.doris.catalog.TableIf;
-import org.apache.doris.catalog.external.HMSExternalTable;
-import org.apache.doris.catalog.external.IcebergExternalTable;
+import org.apache.doris.catalog.external.ExternalTable;
import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.FeConstants;
+import org.apache.doris.common.NotImplementedException;
import org.apache.doris.common.UserException;
-import org.apache.doris.datasource.iceberg.IcebergExternalCatalog;
+import org.apache.doris.common.util.BrokerUtil;
import org.apache.doris.nereids.glue.translator.PlanTranslatorContext;
import org.apache.doris.planner.PlanNodeId;
-import org.apache.doris.planner.external.iceberg.IcebergApiSource;
-import org.apache.doris.planner.external.iceberg.IcebergHMSSource;
-import org.apache.doris.planner.external.iceberg.IcebergScanProvider;
-import org.apache.doris.planner.external.iceberg.IcebergSource;
+import org.apache.doris.planner.external.iceberg.IcebergScanNode;
+import org.apache.doris.planner.external.iceberg.IcebergSplit;
+import org.apache.doris.spi.Split;
import org.apache.doris.statistics.StatisticalType;
-import org.apache.doris.tablefunction.ExternalFileTableValuedFunction;
+import org.apache.doris.system.Backend;
+import org.apache.doris.thrift.TExternalScanRange;
+import org.apache.doris.thrift.TFileAttributes;
+import org.apache.doris.thrift.TFileFormatType;
+import org.apache.doris.thrift.TFileRangeDesc;
+import org.apache.doris.thrift.TFileScanRange;
import org.apache.doris.thrift.TFileScanRangeParams;
import org.apache.doris.thrift.TFileScanSlotInfo;
+import org.apache.doris.thrift.TFileType;
+import org.apache.doris.thrift.THdfsParams;
+import org.apache.doris.thrift.TNetworkAddress;
+import org.apache.doris.thrift.TScanRange;
+import org.apache.doris.thrift.TScanRangeLocation;
+import org.apache.doris.thrift.TScanRangeLocations;
+import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
@@ -48,20 +62,21 @@ import org.apache.logging.log4j.Logger;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.Set;
/**
* FileQueryScanNode for querying the file access type of catalog, now only
support
- * hive,hudi and iceberg.
+ * hive,hudi, iceberg and TVF.
*/
-public class FileQueryScanNode extends FileScanNode {
+public abstract class FileQueryScanNode extends FileScanNode {
private static final Logger LOG =
LogManager.getLogger(FileQueryScanNode.class);
- // For query, there is only one FileScanProvider.
- private FileScanProviderIf scanProvider;
+ protected Map<String, SlotDescriptor> destSlotDescByName;
+ protected TFileScanRangeParams params;
- private Map<String, SlotDescriptor> destSlotDescByName;
- private TFileScanRangeParams params;
+ protected int inputSplitNum = 0;
+ protected long inputFileSize = 0;
/**
* External file scan node for Query hms table
@@ -69,8 +84,9 @@ public class FileQueryScanNode extends FileScanNode {
* eg: s3 tvf
* These scan nodes do not have corresponding catalog/database/table info,
so no need to do priv check
*/
- public FileQueryScanNode(PlanNodeId id, TupleDescriptor desc, boolean
needCheckColumnPriv) {
- super(id, desc, "FILE_QUERY_SCAN_NODE",
StatisticalType.FILE_SCAN_NODE, needCheckColumnPriv);
+ public FileQueryScanNode(PlanNodeId id, TupleDescriptor desc, String
planNodeName,
+ StatisticalType statisticalType, boolean
needCheckColumnPriv) {
+ super(id, desc, planNodeName, statisticalType, needCheckColumnPriv);
}
@Override
@@ -87,36 +103,27 @@ public class FileQueryScanNode extends FileScanNode {
}
// Init scan provider and schema related params.
- private void doInitialize() throws UserException {
+ protected void doInitialize() throws UserException {
Preconditions.checkNotNull(desc);
+ ExternalTable table = (ExternalTable) desc.getTable();
+ if (table.isView()) {
+ throw new AnalysisException(
+ String.format("Querying external view '%s.%s' is not
supported", table.getDbName(), table.getName()));
+ }
computeColumnFilter();
- initScanProvider();
initBackendPolicy();
initSchemaParams();
}
- private void initScanProvider() throws UserException {
- if (this.desc.getTable() instanceof HMSExternalTable) {
- HMSExternalTable hmsTable = (HMSExternalTable)
this.desc.getTable();
- initHMSTableScanProvider(hmsTable);
- } else if (this.desc.getTable() instanceof FunctionGenTable) {
- FunctionGenTable table = (FunctionGenTable) this.desc.getTable();
- initTVFScanProvider(table, (ExternalFileTableValuedFunction)
table.getTvf());
- } else if (this.desc.getTable() instanceof IcebergExternalTable) {
- IcebergExternalTable table = (IcebergExternalTable)
this.desc.getTable();
- initIcebergScanProvider(table);
- }
- }
-
// Init schema (Tuple/Slot) related params.
- private void initSchemaParams() throws UserException {
+ protected void initSchemaParams() throws UserException {
destSlotDescByName = Maps.newHashMap();
for (SlotDescriptor slot : desc.getSlots()) {
destSlotDescByName.put(slot.getColumn().getName(), slot);
}
params = new TFileScanRangeParams();
params.setDestTupleId(desc.getId().asInt());
- List<String> partitionKeys = scanProvider.getPathPartitionKeys();
+ List<String> partitionKeys = getPathPartitionKeys();
List<Column> columns = desc.getTable().getBaseSchema(false);
params.setNumOfColumnsFromFile(columns.size() - partitionKeys.size());
for (SlotDescriptor slot : desc.getSlots()) {
@@ -128,20 +135,13 @@ public class FileQueryScanNode extends FileScanNode {
slotInfo.setIsFileSlot(!partitionKeys.contains(slot.getColumn().getName()));
params.addToRequiredSlots(slotInfo);
}
- setDefaultValueExprs(scanProvider, destSlotDescByName, params, false);
+ setDefaultValueExprs(getTargetTable(), destSlotDescByName, params,
false);
setColumnPositionMappingForTextFile();
// For query, set src tuple id to -1.
params.setSrcTupleId(-1);
- TableIf table = desc.getTable();
- // Slot to schema id map is used for supporting hive 1.x orc internal
column name (col0, col1, col2...)
- if (table instanceof HMSExternalTable) {
- if (((HMSExternalTable)
table).getDlaType().equals(HMSExternalTable.DLAType.HIVE)) {
- genSlotToSchemaIdMap();
- }
- }
}
- private void initBackendPolicy() throws UserException {
+ protected void initBackendPolicy() throws UserException {
backendPolicy.init();
numNodes = backendPolicy.numBackends();
}
@@ -161,74 +161,11 @@ public class FileQueryScanNode extends FileScanNode {
TFileScanSlotInfo slotInfo = new TFileScanSlotInfo();
slotInfo.setSlotId(slot.getId().asInt());
-
slotInfo.setIsFileSlot(!scanProvider.getPathPartitionKeys().contains(slot.getColumn().getName()));
+
slotInfo.setIsFileSlot(!getPathPartitionKeys().contains(slot.getColumn().getName()));
params.addToRequiredSlots(slotInfo);
}
}
- private void initHMSTableScanProvider(HMSExternalTable hmsTable) throws
UserException {
- Preconditions.checkNotNull(hmsTable);
-
- if (hmsTable.isView()) {
- throw new AnalysisException(
- String.format("Querying external view '[%s].%s.%s' is not
supported", hmsTable.getDlaType(),
- hmsTable.getDbName(), hmsTable.getName()));
- }
-
- switch (hmsTable.getDlaType()) {
- case HUDI:
- scanProvider = new HudiScanProvider(hmsTable, desc,
columnNameToRange);
- break;
- case ICEBERG:
- IcebergSource hmsSource = new IcebergHMSSource(hmsTable, desc,
columnNameToRange);
- scanProvider = new IcebergScanProvider(hmsSource, analyzer);
- break;
- case HIVE:
- String inputFormat =
hmsTable.getRemoteTable().getSd().getInputFormat();
- if (inputFormat.contains("TextInputFormat")) {
- for (SlotDescriptor slot : desc.getSlots()) {
- if (!slot.getType().isScalarType()) {
- throw new UserException("For column `" +
slot.getColumn().getName()
- + "`, The column types ARRAY/MAP/STRUCT
are not supported yet"
- + " for text input format of Hive. ");
- }
- }
- }
- scanProvider = new HiveScanProvider(hmsTable, desc,
columnNameToRange);
- break;
- default:
- throw new UserException("Unknown table type: " +
hmsTable.getDlaType());
- }
- }
-
- private void initIcebergScanProvider(IcebergExternalTable icebergTable)
throws UserException {
- Preconditions.checkNotNull(icebergTable);
- if (icebergTable.isView()) {
- throw new AnalysisException(
- String.format("Querying external view '%s.%s' is not
supported", icebergTable.getDbName(),
- icebergTable.getName()));
- }
-
- String catalogType = icebergTable.getIcebergCatalogType();
- switch (catalogType) {
- case IcebergExternalCatalog.ICEBERG_HMS:
- case IcebergExternalCatalog.ICEBERG_REST:
- case IcebergExternalCatalog.ICEBERG_DLF:
- case IcebergExternalCatalog.ICEBERG_GLUE:
- IcebergSource icebergSource = new IcebergApiSource(
- icebergTable, desc, columnNameToRange);
- scanProvider = new IcebergScanProvider(icebergSource,
analyzer);
- break;
- default:
- throw new UserException("Unknown iceberg catalog type: " +
catalogType);
- }
- }
-
- private void initTVFScanProvider(FunctionGenTable table,
ExternalFileTableValuedFunction tvf) {
- Preconditions.checkNotNull(table);
- scanProvider = new TVFScanProvider(table, desc, tvf);
- }
-
@Override
public void finalize(Analyzer analyzer) throws UserException {
doFinalize();
@@ -240,19 +177,13 @@ public class FileQueryScanNode extends FileScanNode {
}
// Create scan range locations and the statistics.
- private void doFinalize() throws UserException {
- createScanRangeLocations(conjuncts, params, scanProvider);
- this.inputSplitsNum += scanProvider.getInputSplitNum();
- this.totalFileSize += scanProvider.getInputFileSize();
- if (scanProvider instanceof HiveScanProvider) {
- this.totalPartitionNum = ((HiveScanProvider)
scanProvider).getTotalPartitionNum();
- this.readPartitionNum = ((HiveScanProvider)
scanProvider).getReadPartitionNum();
- }
+ protected void doFinalize() throws UserException {
+ createScanRangeLocations();
}
private void setColumnPositionMappingForTextFile()
throws UserException {
- TableIf tbl = scanProvider.getTargetTable();
+ TableIf tbl = getTargetTable();
List<Integer> columnIdxs = Lists.newArrayList();
for (TFileScanSlotInfo slot : params.getRequiredSlots()) {
@@ -270,20 +201,161 @@ public class FileQueryScanNode extends FileScanNode {
params.setColumnIdxs(columnIdxs);
}
- // To Support Hive 1.x orc internal column name like (_col0, _col1,
_col2...)
- private void genSlotToSchemaIdMap() {
- List<Column> baseSchema = desc.getTable().getBaseSchema();
- Map<String, Integer> columnNameToPosition = Maps.newHashMap();
- for (SlotDescriptor slot : desc.getSlots()) {
- int idx = 0;
- for (Column col : baseSchema) {
- if (col.getName().equals(slot.getColumn().getName())) {
- columnNameToPosition.put(col.getName(), idx);
- break;
+ public void createScanRangeLocations() throws UserException {
+ long start = System.currentTimeMillis();
+ List<Split> inputSplits = getSplits();
+ this.inputSplitNum = inputSplits.size();
+ if (inputSplits.isEmpty()) {
+ return;
+ }
+ FileSplit inputSplit = (FileSplit) inputSplits.get(0);
+ TFileType locationType = getLocationType();
+ params.setFileType(locationType);
+ TFileFormatType fileFormatType = getFileFormatType();
+ params.setFormatType(getFileFormatType());
+ if (fileFormatType == TFileFormatType.FORMAT_CSV_PLAIN ||
fileFormatType == TFileFormatType.FORMAT_JSON) {
+ params.setFileAttributes(getFileAttributes());
+ }
+
+ // set hdfs params for hdfs file type.
+ Map<String, String> locationProperties = getLocationProperties();
+ if (locationType == TFileType.FILE_HDFS || locationType ==
TFileType.FILE_BROKER) {
+ String fsName = getFsName(inputSplit);
+ THdfsParams tHdfsParams =
HdfsResource.generateHdfsParam(locationProperties);
+ tHdfsParams.setFsName(fsName);
+ params.setHdfsParams(tHdfsParams);
+
+ if (locationType == TFileType.FILE_BROKER) {
+ FsBroker broker =
Env.getCurrentEnv().getBrokerMgr().getAnyAliveBroker();
+ if (broker == null) {
+ throw new UserException("No alive broker.");
}
- idx += 1;
+ params.addToBrokerAddresses(new TNetworkAddress(broker.ip,
broker.port));
+ }
+ } else if (locationType == TFileType.FILE_S3) {
+ params.setProperties(locationProperties);
+ }
+
+ List<String> pathPartitionKeys = getPathPartitionKeys();
+ for (Split split : inputSplits) {
+ TScanRangeLocations curLocations = newLocations(params,
backendPolicy);
+ FileSplit fileSplit = (FileSplit) split;
+
+ // If fileSplit has partition values, use the values collected
from hive partitions.
+ // Otherwise, use the values in file path.
+ List<String> partitionValuesFromPath =
fileSplit.getPartitionValues() == null
+ ?
BrokerUtil.parseColumnsFromPath(fileSplit.getPath().toString(),
pathPartitionKeys, false)
+ : fileSplit.getPartitionValues();
+
+ TFileRangeDesc rangeDesc = createFileRangeDesc(fileSplit,
partitionValuesFromPath, pathPartitionKeys);
+ // external data lake table
+ if (fileSplit instanceof IcebergSplit) {
+ IcebergScanNode.setIcebergParams(rangeDesc, (IcebergSplit)
fileSplit);
+ }
+
+
curLocations.getScanRange().getExtScanRange().getFileScanRange().addToRanges(rangeDesc);
+ LOG.debug("assign to backend {} with table split: {} ({}, {}),
location: {}",
+ curLocations.getLocations().get(0).getBackendId(),
fileSplit.getPath(), fileSplit.getStart(),
+ fileSplit.getLength(),
Joiner.on("|").join(fileSplit.getHosts()));
+ scanRangeLocations.add(curLocations);
+ this.inputFileSize += fileSplit.getLength();
+ }
+ LOG.debug("create #{} ScanRangeLocations cost: {} ms",
+ scanRangeLocations.size(), (System.currentTimeMillis() -
start));
+ }
+
+ private TScanRangeLocations newLocations(TFileScanRangeParams params,
FederationBackendPolicy backendPolicy) {
+ // Generate on file scan range
+ TFileScanRange fileScanRange = new TFileScanRange();
+ fileScanRange.setParams(params);
+
+ // Scan range
+ TExternalScanRange externalScanRange = new TExternalScanRange();
+ externalScanRange.setFileScanRange(fileScanRange);
+ TScanRange scanRange = new TScanRange();
+ scanRange.setExtScanRange(externalScanRange);
+
+ // Locations
+ TScanRangeLocations locations = new TScanRangeLocations();
+ locations.setScanRange(scanRange);
+
+ TScanRangeLocation location = new TScanRangeLocation();
+ Backend selectedBackend = backendPolicy.getNextBe();
+ location.setBackendId(selectedBackend.getId());
+ location.setServer(new TNetworkAddress(selectedBackend.getIp(),
selectedBackend.getBePort()));
+ locations.addToLocations(location);
+
+ return locations;
+ }
+
+ private TFileRangeDesc createFileRangeDesc(FileSplit fileSplit,
List<String> columnsFromPath,
+ List<String>
columnsFromPathKeys)
+ throws UserException {
+ TFileRangeDesc rangeDesc = new TFileRangeDesc();
+ rangeDesc.setStartOffset(fileSplit.getStart());
+ rangeDesc.setSize(fileSplit.getLength());
+        // fileSize is only used when the format is orc/parquet and the TFileType
is broker;
+        // for other TFileTypes it is not necessary.
+ rangeDesc.setFileSize(fileSplit.getFileLength());
+ rangeDesc.setColumnsFromPath(columnsFromPath);
+ rangeDesc.setColumnsFromPathKeys(columnsFromPathKeys);
+
+ if (getLocationType() == TFileType.FILE_HDFS) {
+ rangeDesc.setPath(fileSplit.getPath().toUri().getPath());
+ } else if (getLocationType() == TFileType.FILE_S3 || getLocationType()
== TFileType.FILE_BROKER) {
+ // need full path
+ rangeDesc.setPath(fileSplit.getPath().toString());
+ }
+ return rangeDesc;
+ }
+
+ protected TFileType getLocationType() throws UserException {
+ throw new NotImplementedException("");
+ }
+
+ protected TFileFormatType getFileFormatType() throws UserException {
+ throw new NotImplementedException("");
+ }
+
+ protected TFileAttributes getFileAttributes() throws UserException {
+ throw new NotImplementedException("");
+ }
+
+ protected List<String> getPathPartitionKeys() throws UserException {
+ throw new NotImplementedException("");
+ }
+
+ protected TableIf getTargetTable() throws UserException {
+ throw new NotImplementedException("");
+ }
+
+ protected Map<String, String> getLocationProperties() throws UserException
{
+ throw new NotImplementedException("");
+ }
+
+    // e.g. hdfs://namenode, s3://bucket
+ protected String getFsName(FileSplit split) {
+ String fullPath = split.getPath().toUri().toString();
+ String filePath = split.getPath().toUri().getPath();
+ return fullPath.replace(filePath, "");
+ }
+
+ protected static Optional<TFileType> getTFileType(String location) {
+ if (location != null && !location.isEmpty()) {
+ if (FeConstants.isObjStorage(location)) {
+ return Optional.of(TFileType.FILE_S3);
+ } else if (location.startsWith(FeConstants.FS_PREFIX_HDFS)) {
+ return Optional.of(TFileType.FILE_HDFS);
+ } else if (location.startsWith(FeConstants.FS_PREFIX_FILE)) {
+ return Optional.of(TFileType.FILE_LOCAL);
+ } else if (location.startsWith(FeConstants.FS_PREFIX_OFS)) {
+ return Optional.of(TFileType.FILE_BROKER);
+ } else if (location.startsWith(FeConstants.FS_PREFIX_GFS)) {
+ return Optional.of(TFileType.FILE_BROKER);
+ } else if (location.startsWith(FeConstants.FS_PREFIX_JFS)) {
+ return Optional.of(TFileType.FILE_BROKER);
}
}
- params.setSlotNameToSchemaPos(columnNameToPosition);
+ return Optional.empty();
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileScanNode.java
index 364844ef36..3a24ba3319 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileScanNode.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileScanNode.java
@@ -27,6 +27,8 @@ import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.UserException;
import org.apache.doris.planner.FileLoadScanNode;
import org.apache.doris.planner.PlanNodeId;
+import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.spi.Split;
import org.apache.doris.statistics.StatisticalType;
import org.apache.doris.thrift.TExplainLevel;
import org.apache.doris.thrift.TExpr;
@@ -41,9 +43,12 @@ import com.google.common.base.Preconditions;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Lists;
import com.google.common.collect.Multimap;
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.Path;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
+import java.io.IOException;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
@@ -55,6 +60,8 @@ import java.util.Map;
public class FileScanNode extends ExternalScanNode {
private static final Logger LOG = LogManager.getLogger(FileScanNode.class);
+ public static final long DEFAULT_SPLIT_SIZE = 128 * 1024 * 1024; // 128MB
+
// For explain
protected long inputSplitsNum = 0;
protected long totalFileSize = 0;
@@ -152,24 +159,17 @@ public class FileScanNode extends ExternalScanNode {
return output.toString();
}
- // TODO: Keep 2 versions of createScanRangeLocations, will fix this while
refactor split and assignment code.
+    // TODO: This API is for load jobs only. It will be removed later.
protected void
createScanRangeLocations(FileLoadScanNode.ParamCreateContext context,
- FileScanProviderIf scanProvider)
+ LoadScanProvider scanProvider)
throws UserException {
scanProvider.createScanRangeLocations(context, backendPolicy,
scanRangeLocations);
}
- protected void createScanRangeLocations(List<Expr> conjuncts,
TFileScanRangeParams params,
- FileScanProviderIf scanProvider)
- throws UserException {
- scanProvider.createScanRangeLocations(conjuncts, params,
backendPolicy, scanRangeLocations);
- }
-
- protected void setDefaultValueExprs(FileScanProviderIf scanProvider,
+ protected void setDefaultValueExprs(TableIf tbl,
Map<String, SlotDescriptor>
slotDescByName,
TFileScanRangeParams params,
boolean useVarcharAsNull) throws
UserException {
- TableIf tbl = scanProvider.getTargetTable();
Preconditions.checkNotNull(tbl);
TExpr tExpr = new TExpr();
tExpr.setNodes(Lists.newArrayList());
@@ -210,4 +210,54 @@ public class FileScanNode extends ExternalScanNode {
}
}
}
+
+ protected List<Split> splitFile(Path path, long blockSize, BlockLocation[]
blockLocations, long length,
+ boolean splittable, List<String>
partitionValues) throws IOException {
+ if (blockLocations == null) {
+ blockLocations = new BlockLocation[0];
+ }
+ long splitSize =
ConnectContext.get().getSessionVariable().getFileSplitSize();
+ if (splitSize <= 0) {
+ splitSize = blockSize;
+ }
+ // Min split size is DEFAULT_SPLIT_SIZE(128MB).
+ splitSize = Math.max(splitSize, DEFAULT_SPLIT_SIZE);
+ List<Split> result = Lists.newArrayList();
+ if (!splittable) {
+ LOG.debug("Path {} is not splittable.", path);
+ String[] hosts = blockLocations.length == 0 ? null :
blockLocations[0].getHosts();
+ result.add(new FileSplit(path, 0, length, length, hosts,
partitionValues));
+ return result;
+ }
+ long bytesRemaining;
+ for (bytesRemaining = length; (double) bytesRemaining / (double)
splitSize > 1.1D;
+ bytesRemaining -= splitSize) {
+ int location = getBlockIndex(blockLocations, length -
bytesRemaining);
+ String[] hosts = location == -1 ? null :
blockLocations[location].getHosts();
+ result.add(new FileSplit(path, length - bytesRemaining, splitSize,
length, hosts, partitionValues));
+ }
+ if (bytesRemaining != 0L) {
+ int location = getBlockIndex(blockLocations, length -
bytesRemaining);
+ String[] hosts = location == -1 ? null :
blockLocations[location].getHosts();
+ result.add(new FileSplit(path, length - bytesRemaining,
bytesRemaining, length, hosts, partitionValues));
+ }
+
+ LOG.debug("Path {} includes {} splits.", path, result.size());
+ return result;
+ }
+
+ private int getBlockIndex(BlockLocation[] blkLocations, long offset) {
+ if (blkLocations == null || blkLocations.length == 0) {
+ return -1;
+ }
+ for (int i = 0; i < blkLocations.length; ++i) {
+ if (blkLocations[i].getOffset() <= offset
+ && offset < blkLocations[i].getOffset() +
blkLocations[i].getLength()) {
+ return i;
+ }
+ }
+ BlockLocation last = blkLocations[blkLocations.length - 1];
+ long fileLength = last.getOffset() + last.getLength() - 1L;
+ throw new IllegalArgumentException(String.format("Offset %d is outside
of file (0..%d)", offset, fileLength));
+ }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileScanProviderIf.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileScanProviderIf.java
deleted file mode 100644
index 367e5d4e72..0000000000
---
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileScanProviderIf.java
+++ /dev/null
@@ -1,61 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.planner.external;
-
-import org.apache.doris.analysis.Analyzer;
-import org.apache.doris.analysis.Expr;
-import org.apache.doris.catalog.TableIf;
-import org.apache.doris.common.DdlException;
-import org.apache.doris.common.MetaNotFoundException;
-import org.apache.doris.common.UserException;
-import org.apache.doris.planner.FileLoadScanNode;
-import org.apache.doris.thrift.TFileFormatType;
-import org.apache.doris.thrift.TFileScanRangeParams;
-import org.apache.doris.thrift.TFileType;
-import org.apache.doris.thrift.TScanRangeLocations;
-
-import java.util.List;
-import java.util.Map;
-
-public interface FileScanProviderIf {
- // Return parquet/orc/text, etc.
- TFileFormatType getFileFormatType() throws DdlException,
MetaNotFoundException;
-
- // Return S3/HDSF, etc.
- TFileType getLocationType() throws DdlException, MetaNotFoundException;
-
- // return properties for S3/HDFS, etc.
- Map<String, String> getLocationProperties() throws MetaNotFoundException,
DdlException;
-
- List<String> getPathPartitionKeys() throws DdlException,
MetaNotFoundException;
-
- FileLoadScanNode.ParamCreateContext createContext(Analyzer analyzer)
throws UserException;
-
- void createScanRangeLocations(FileLoadScanNode.ParamCreateContext context,
FederationBackendPolicy backendPolicy,
- List<TScanRangeLocations>
scanRangeLocations) throws UserException;
-
- void createScanRangeLocations(List<Expr> conjuncts, TFileScanRangeParams
params,
- FederationBackendPolicy backendPolicy,
- List<TScanRangeLocations>
scanRangeLocations) throws UserException;
-
- int getInputSplitNum();
-
- long getInputFileSize();
-
- TableIf getTargetTable();
-}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileSplit.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileSplit.java
index bd0fa97cc4..03f1ab3b60 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileSplit.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileSplit.java
@@ -17,7 +17,7 @@
package org.apache.doris.planner.external;
-import org.apache.doris.planner.Split;
+import org.apache.doris.spi.Split;
import lombok.Data;
import org.apache.hadoop.fs.Path;
@@ -25,7 +25,7 @@ import org.apache.hadoop.fs.Path;
import java.util.List;
@Data
-public class FileSplit extends Split {
+public class FileSplit implements Split {
protected Path path;
protected long start;
// length of this split, in bytes
@@ -34,6 +34,7 @@ public class FileSplit extends Split {
// -1 means unset.
// If the file length is not set, the file length will be fetched from the
file system.
protected long fileLength;
+ protected String[] hosts;
protected TableFormatType tableFormatType;
// The values of partitions.
// e.g for file : hdfs://path/to/table/part1=a/part2=b/datafile
@@ -46,15 +47,16 @@ public class FileSplit extends Split {
this.start = start;
this.length = length;
this.fileLength = fileLength;
- this.hosts = hosts;
+ this.hosts = hosts == null ? new String[0] : hosts;
this.partitionValues = partitionValues;
}
public String[] getHosts() {
- if (this.hosts == null) {
- return new String[]{};
- } else {
- return this.hosts;
- }
+ return hosts;
+ }
+
+ @Override
+ public Object getInfo() {
+ return null;
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/HMSTableScanProvider.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/HMSTableScanProvider.java
deleted file mode 100644
index 283fc90f30..0000000000
---
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/HMSTableScanProvider.java
+++ /dev/null
@@ -1,34 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.planner.external;
-
-import org.apache.doris.common.DdlException;
-import org.apache.doris.common.MetaNotFoundException;
-
-import org.apache.hadoop.hive.metastore.api.Table;
-
-import java.util.Map;
-
-public abstract class HMSTableScanProvider extends QueryScanProvider {
-
- public abstract String getMetaStoreUrl();
-
- public abstract Table getRemoteHiveTable() throws DdlException,
MetaNotFoundException;
-
- public abstract Map<String, String> getTableProperties() throws
MetaNotFoundException;
-}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveSplitter.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanNode.java
similarity index 51%
rename from
fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveSplitter.java
rename to
fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanNode.java
index 9d8093e982..96c1872010 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveSplitter.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanNode.java
@@ -17,32 +17,34 @@
package org.apache.doris.planner.external;
-import org.apache.doris.analysis.Expr;
+import org.apache.doris.analysis.SlotDescriptor;
+import org.apache.doris.analysis.TupleDescriptor;
+import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.HiveMetaStoreClientHelper;
import org.apache.doris.catalog.ListPartitionItem;
import org.apache.doris.catalog.PartitionItem;
+import org.apache.doris.catalog.TableIf;
import org.apache.doris.catalog.Type;
import org.apache.doris.catalog.external.HMSExternalTable;
+import org.apache.doris.common.DdlException;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.Util;
import org.apache.doris.datasource.HMSExternalCatalog;
import org.apache.doris.datasource.hive.HiveMetaStoreCache;
import org.apache.doris.datasource.hive.HivePartition;
-import org.apache.doris.external.hive.util.HiveUtil;
-import org.apache.doris.fs.FileSystemFactory;
-import org.apache.doris.fs.RemoteFiles;
-import org.apache.doris.fs.remote.RemoteFileSystem;
-import org.apache.doris.planner.ColumnRange;
import org.apache.doris.planner.ListPartitionPrunerV2;
-import org.apache.doris.planner.Split;
-import org.apache.doris.planner.Splitter;
-import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.planner.PlanNodeId;
+import org.apache.doris.spi.Split;
+import org.apache.doris.statistics.StatisticalType;
+import org.apache.doris.thrift.TFileAttributes;
+import org.apache.doris.thrift.TFileFormatType;
+import org.apache.doris.thrift.TFileTextScanRangeParams;
+import org.apache.doris.thrift.TFileType;
import com.google.common.collect.Lists;
-import org.apache.hadoop.fs.BlockLocation;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.InputFormat;
-import org.apache.hadoop.mapred.JobConf;
+import com.google.common.collect.Maps;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -50,23 +52,52 @@ import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.Map;
+import java.util.stream.Collectors;
+
+public class HiveScanNode extends FileQueryScanNode {
+ private static final Logger LOG = LogManager.getLogger(HiveScanNode.class);
+
+ public static final String PROP_FIELD_DELIMITER = "field.delim";
+ public static final String DEFAULT_FIELD_DELIMITER = "\1"; // "\x01"
+ public static final String DEFAULT_LINE_DELIMITER = "\n";
+
+ private final HMSExternalTable hmsTable;
+
+    /**
+     * External file scan node for querying a Hive table.
+     * needCheckColumnPriv: some ExternalFileScanNodes do not need to check
column priv,
+     * e.g. s3 tvf. Such scan nodes have no corresponding
catalog/database/table info, so no priv check is needed.
+     */
+ public HiveScanNode(PlanNodeId id, TupleDescriptor desc, boolean
needCheckColumnPriv) {
+ super(id, desc, "HIVE_SCAN_NODE", StatisticalType.HIVE_SCAN_NODE,
needCheckColumnPriv);
+ hmsTable = (HMSExternalTable) desc.getTable();
+ }
-public class HiveSplitter implements Splitter {
-
- private static final Logger LOG = LogManager.getLogger(HiveSplitter.class);
-
- private HMSExternalTable hmsTable;
- private Map<String, ColumnRange> columnNameToRange;
- private int totalPartitionNum = 0;
- private int readPartitionNum = 0;
+ public HiveScanNode(PlanNodeId id, TupleDescriptor desc, String
planNodeName,
+ StatisticalType statisticalType, boolean
needCheckColumnPriv) {
+ super(id, desc, planNodeName, statisticalType, needCheckColumnPriv);
+ hmsTable = (HMSExternalTable) desc.getTable();
+ }
- public HiveSplitter(HMSExternalTable hmsTable, Map<String, ColumnRange>
columnNameToRange) {
- this.hmsTable = hmsTable;
- this.columnNameToRange = columnNameToRange;
+ @Override
+ protected void doInitialize() throws UserException {
+ super.doInitialize();
+ genSlotToSchemaIdMap();
+ String inputFormat =
hmsTable.getRemoteTable().getSd().getInputFormat();
+ if (inputFormat.contains("TextInputFormat")) {
+ for (SlotDescriptor slot : desc.getSlots()) {
+ if (!slot.getType().isScalarType()) {
+ throw new UserException("For column `" +
slot.getColumn().getName()
+ + "`, The column types ARRAY/MAP/STRUCT are not
supported yet"
+ + " for text input format of Hive. ");
+ }
+ }
+ }
}
@Override
- public List<Split> getSplits(List<Expr> exprs) throws UserException {
+ protected List<Split> getSplits() throws UserException {
long start = System.currentTimeMillis();
try {
HiveMetaStoreCache cache = Env.getCurrentEnv().getExtMetaCacheMgr()
@@ -78,14 +109,7 @@ public class HiveSplitter implements Splitter {
hivePartitionValues =
cache.getPartitionValues(hmsTable.getDbName(), hmsTable.getName(),
partitionColumnTypes);
}
- Map<String, String> properties =
hmsTable.getCatalog().getCatalogProperty().getProperties();
- boolean useSelfSplitter = true;
- if (properties.containsKey(HMSExternalCatalog.ENABLE_SELF_SPLITTER)
- &&
properties.get(HMSExternalCatalog.ENABLE_SELF_SPLITTER).equalsIgnoreCase("false"))
{
- LOG.debug("Using self splitter for hmsTable {}",
hmsTable.getName());
- useSelfSplitter = false;
- }
-
+ boolean useSelfSplitter = hmsTable.getCatalog().useSelfSplitter();
List<Split> allFiles = Lists.newArrayList();
if (hivePartitionValues != null) {
// 2. prune partitions by expr
@@ -135,85 +159,88 @@ public class HiveSplitter implements Splitter {
private void getFileSplitByPartitions(HiveMetaStoreCache cache,
List<HivePartition> partitions,
List<Split> allFiles, boolean
useSelfSplitter) throws IOException {
+
for (HiveMetaStoreCache.FileCacheValue fileCacheValue :
cache.getFilesByPartitions(partitions, useSelfSplitter)) {
+            // This branch supports the old splitter and will be removed later.
if (fileCacheValue.getSplits() != null) {
allFiles.addAll(fileCacheValue.getSplits());
}
if (fileCacheValue.getFiles() != null) {
boolean isSplittable = fileCacheValue.isSplittable();
for (HiveMetaStoreCache.HiveFileStatus status :
fileCacheValue.getFiles()) {
- allFiles.addAll(splitFile(status, isSplittable,
fileCacheValue.getPartitionValues()));
+ allFiles.addAll(splitFile(status.getPath(),
status.getBlockSize(),
+ status.getBlockLocations(), status.getLength(),
+ isSplittable,
fileCacheValue.getPartitionValues()));
}
}
}
}
- private List<Split> splitFile(HiveMetaStoreCache.HiveFileStatus status,
- boolean splittable, List<String>
partitionValues) throws IOException {
- List<Split> result = Lists.newArrayList();
- if (!splittable) {
- LOG.debug("Path {} is not splittable.", status.getPath());
- BlockLocation block = status.getBlockLocations()[0];
- result.add(new FileSplit(status.getPath(), 0, status.getLength(),
- status.getLength(), block.getHosts(), partitionValues));
- return result;
- }
- long splitSize =
ConnectContext.get().getSessionVariable().getFileSplitSize();
- if (splitSize <= 0) {
- splitSize = status.getBlockSize();
- }
- // Min split size is DEFAULT_SPLIT_SIZE(128MB).
- splitSize = splitSize > DEFAULT_SPLIT_SIZE ? splitSize :
DEFAULT_SPLIT_SIZE;
- BlockLocation[] blockLocations = status.getBlockLocations();
- long length = status.getLength();
- long bytesRemaining;
- for (bytesRemaining = length; (double) bytesRemaining / (double)
splitSize > 1.1D;
- bytesRemaining -= splitSize) {
- int location = getBlockIndex(blockLocations, length -
bytesRemaining);
- result.add(new FileSplit(status.getPath(), length - bytesRemaining,
- splitSize, length, blockLocations[location].getHosts(),
partitionValues));
- }
- if (bytesRemaining != 0L) {
- int location = getBlockIndex(blockLocations, length -
bytesRemaining);
- result.add(new FileSplit(status.getPath(), length - bytesRemaining,
- bytesRemaining, length,
blockLocations[location].getHosts(), partitionValues));
- }
+ @Override
+ public List<String> getPathPartitionKeys() {
+ return hmsTable.getRemoteTable().getPartitionKeys()
+ .stream().map(FieldSchema::getName).collect(Collectors.toList());
+ }
- LOG.debug("Path {} includes {} splits.", status.getPath(),
result.size());
- return result;
+ @Override
+ public TableIf getTargetTable() {
+ return hmsTable;
}
- public int getTotalPartitionNum() {
- return totalPartitionNum;
+ @Override
+ protected TFileType getLocationType() throws UserException {
+ String location = hmsTable.getRemoteTable().getSd().getLocation();
+ return getTFileType(location).orElseThrow(() ->
+ new DdlException("Unknown file location " + location + " for hms
table " + hmsTable.getName()));
}
- public int getReadPartitionNum() {
- return readPartitionNum;
+ @Override
+ public TFileFormatType getFileFormatType() throws UserException {
+ TFileFormatType type = null;
+ String inputFormatName =
hmsTable.getRemoteTable().getSd().getInputFormat();
+ String hiveFormat =
HiveMetaStoreClientHelper.HiveFileFormat.getFormat(inputFormatName);
+ if
(hiveFormat.equals(HiveMetaStoreClientHelper.HiveFileFormat.PARQUET.getDesc()))
{
+ type = TFileFormatType.FORMAT_PARQUET;
+ } else if
(hiveFormat.equals(HiveMetaStoreClientHelper.HiveFileFormat.ORC.getDesc())) {
+ type = TFileFormatType.FORMAT_ORC;
+ } else if
(hiveFormat.equals(HiveMetaStoreClientHelper.HiveFileFormat.TEXT_FILE.getDesc()))
{
+ type = TFileFormatType.FORMAT_CSV_PLAIN;
+ }
+ return type;
+ }
+
+ @Override
+ protected Map<String, String> getLocationProperties() throws UserException
{
+ return hmsTable.getCatalogProperties();
}
- // Get File Status by using FileSystem API.
- public static HiveMetaStoreCache.FileCacheValue getFileCache(String
location, InputFormat<?, ?> inputFormat,
- JobConf
jobConf,
- List<String>
partitionValues) throws UserException {
- HiveMetaStoreCache.FileCacheValue result = new
HiveMetaStoreCache.FileCacheValue();
- result.setSplittable(HiveUtil.isSplittable(inputFormat, new
Path(location), jobConf));
- RemoteFileSystem fs = FileSystemFactory.getByLocation(location,
jobConf);
- RemoteFiles locatedFiles = fs.listLocatedFiles(location, true, false);
- locatedFiles.locations().forEach(result::addFile);
- result.setPartitionValues(partitionValues);
- return result;
+ @Override
+ protected TFileAttributes getFileAttributes() throws UserException {
+ TFileTextScanRangeParams textParams = new TFileTextScanRangeParams();
+
textParams.setColumnSeparator(hmsTable.getRemoteTable().getSd().getSerdeInfo().getParameters()
+ .getOrDefault(PROP_FIELD_DELIMITER, DEFAULT_FIELD_DELIMITER));
+ textParams.setLineDelimiter(DEFAULT_LINE_DELIMITER);
+ TFileAttributes fileAttributes = new TFileAttributes();
+ fileAttributes.setTextParams(textParams);
+ fileAttributes.setHeaderType("");
+ return fileAttributes;
}
- private static int getBlockIndex(BlockLocation[] blkLocations, long
offset) {
- for (int i = 0; i < blkLocations.length; ++i) {
- if (blkLocations[i].getOffset() <= offset
- && offset < blkLocations[i].getOffset() +
blkLocations[i].getLength()) {
- return i;
+    // To support Hive 1.x ORC internal column names like (_col0, _col1,
_col2...)
+ private void genSlotToSchemaIdMap() {
+ List<Column> baseSchema = desc.getTable().getBaseSchema();
+ Map<String, Integer> columnNameToPosition = Maps.newHashMap();
+ for (SlotDescriptor slot : desc.getSlots()) {
+ int idx = 0;
+ for (Column col : baseSchema) {
+ if (col.getName().equals(slot.getColumn().getName())) {
+ columnNameToPosition.put(col.getName(), idx);
+ break;
+ }
+ idx += 1;
}
}
- BlockLocation last = blkLocations[blkLocations.length - 1];
- long fileLength = last.getOffset() + last.getLength() - 1L;
- throw new IllegalArgumentException(String.format("Offset %d is outside
of file (0..%d)", offset, fileLength));
+ params.setSlotNameToSchemaPos(columnNameToPosition);
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanProvider.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanProvider.java
deleted file mode 100644
index 4037f97a13..0000000000
---
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanProvider.java
+++ /dev/null
@@ -1,139 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.planner.external;
-
-import org.apache.doris.analysis.TupleDescriptor;
-import org.apache.doris.catalog.HiveMetaStoreClientHelper;
-import org.apache.doris.catalog.TableIf;
-import org.apache.doris.catalog.external.HMSExternalTable;
-import org.apache.doris.common.DdlException;
-import org.apache.doris.common.MetaNotFoundException;
-import org.apache.doris.common.UserException;
-import org.apache.doris.planner.ColumnRange;
-import org.apache.doris.thrift.TFileAttributes;
-import org.apache.doris.thrift.TFileFormatType;
-import org.apache.doris.thrift.TFileTextScanRangeParams;
-import org.apache.doris.thrift.TFileType;
-
-import com.google.common.collect.Maps;
-import org.apache.hadoop.hive.metastore.api.FieldSchema;
-import org.apache.hadoop.hive.metastore.api.Table;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
-import java.util.List;
-import java.util.Map;
-import java.util.stream.Collectors;
-
-/**
- * A HiveScanProvider to get information for scan node.
- */
-public class HiveScanProvider extends HMSTableScanProvider {
- private static final Logger LOG =
LogManager.getLogger(HiveScanProvider.class);
-
- private static final String PROP_FIELD_DELIMITER = "field.delim";
- private static final String DEFAULT_FIELD_DELIMITER = "\1"; // "\x01"
- private static final String DEFAULT_LINE_DELIMITER = "\n";
-
- protected HMSExternalTable hmsTable;
- protected final TupleDescriptor desc;
- protected Map<String, ColumnRange> columnNameToRange;
-
- public HiveScanProvider(HMSExternalTable hmsTable, TupleDescriptor desc,
- Map<String, ColumnRange> columnNameToRange) {
- this.hmsTable = hmsTable;
- this.desc = desc;
- this.columnNameToRange = columnNameToRange;
- this.splitter = new HiveSplitter(hmsTable, columnNameToRange);
- }
-
- @Override
- public TableIf getTargetTable() {
- return hmsTable;
- }
-
- @Override
- public TFileFormatType getFileFormatType() throws DdlException,
MetaNotFoundException {
- TFileFormatType type = null;
- String inputFormatName = getRemoteHiveTable().getSd().getInputFormat();
- String hiveFormat =
HiveMetaStoreClientHelper.HiveFileFormat.getFormat(inputFormatName);
- if
(hiveFormat.equals(HiveMetaStoreClientHelper.HiveFileFormat.PARQUET.getDesc()))
{
- type = TFileFormatType.FORMAT_PARQUET;
- } else if
(hiveFormat.equals(HiveMetaStoreClientHelper.HiveFileFormat.ORC.getDesc())) {
- type = TFileFormatType.FORMAT_ORC;
- } else if
(hiveFormat.equals(HiveMetaStoreClientHelper.HiveFileFormat.TEXT_FILE.getDesc()))
{
- type = TFileFormatType.FORMAT_CSV_PLAIN;
- }
- return type;
- }
-
- @Override
- public TFileType getLocationType() throws DdlException,
MetaNotFoundException {
- String location = hmsTable.getRemoteTable().getSd().getLocation();
- return getTFileType(location).orElseThrow(() ->
- new DdlException("Unknown file location " + location + "
for hms table " + hmsTable.getName()));
- }
-
- @Override
- public String getMetaStoreUrl() {
- return hmsTable.getMetastoreUri();
- }
-
- public int getTotalPartitionNum() {
- return ((HiveSplitter) splitter).getTotalPartitionNum();
- }
-
- public int getReadPartitionNum() {
- return ((HiveSplitter) splitter).getReadPartitionNum();
- }
-
- @Override
- public Table getRemoteHiveTable() throws DdlException,
MetaNotFoundException {
- return hmsTable.getRemoteTable();
- }
-
- @Override
- public Map<String, String> getTableProperties() throws
MetaNotFoundException {
- // TODO: implement it when we really properties from remote table.
- return Maps.newHashMap();
- }
-
- @Override
- public Map<String, String> getLocationProperties() throws
MetaNotFoundException, DdlException {
- return hmsTable.getCatalogProperties();
- }
-
- @Override
- public List<String> getPathPartitionKeys() throws DdlException,
MetaNotFoundException {
- return
getRemoteHiveTable().getPartitionKeys().stream().map(FieldSchema::getName).collect(Collectors.toList());
- }
-
- @Override
- public TFileAttributes getFileAttributes() throws UserException {
- TFileTextScanRangeParams textParams = new TFileTextScanRangeParams();
-
textParams.setColumnSeparator(hmsTable.getRemoteTable().getSd().getSerdeInfo().getParameters()
- .getOrDefault(PROP_FIELD_DELIMITER, DEFAULT_FIELD_DELIMITER));
- textParams.setLineDelimiter(DEFAULT_LINE_DELIMITER);
- TFileAttributes fileAttributes = new TFileAttributes();
- fileAttributes.setTextParams(textParams);
- fileAttributes.setHeaderType("");
- return fileAttributes;
- }
-}
-
-
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/HudiScanProvider.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/HudiScanNode.java
similarity index 59%
rename from
fe/fe-core/src/main/java/org/apache/doris/planner/external/HudiScanProvider.java
rename to
fe/fe-core/src/main/java/org/apache/doris/planner/external/HudiScanNode.java
index 59274ec521..34ff3b7457 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/HudiScanProvider.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/HudiScanNode.java
@@ -18,34 +18,31 @@
package org.apache.doris.planner.external;
import org.apache.doris.analysis.TupleDescriptor;
-import org.apache.doris.catalog.external.HMSExternalTable;
-import org.apache.doris.common.DdlException;
-import org.apache.doris.common.MetaNotFoundException;
-import org.apache.doris.planner.ColumnRange;
+import org.apache.doris.planner.PlanNodeId;
+import org.apache.doris.statistics.StatisticalType;
import org.apache.doris.thrift.TFileFormatType;
import java.util.Collections;
import java.util.List;
-import java.util.Map;
-/**
- * A file scan provider for hudi.
- * HudiProvier is extended with hive since they both use input format
interface to get the split.
- */
-public class HudiScanProvider extends HiveScanProvider {
-
- public HudiScanProvider(HMSExternalTable hmsTable, TupleDescriptor desc,
- Map<String, ColumnRange> columnNameToRange) {
- super(hmsTable, desc, columnNameToRange);
+public class HudiScanNode extends HiveScanNode {
+ /**
+     * External file scan node for querying a Hudi table.
+     * needCheckColumnPriv: some ExternalFileScanNode instances do not need to check
+     * column privileges, e.g. the S3 table-valued function. Such scan nodes have no
+     * corresponding catalog/database/table info, so no privilege check is needed.
+ */
+ public HudiScanNode(PlanNodeId id, TupleDescriptor desc, boolean
needCheckColumnPriv) {
+ super(id, desc, "HUDI_SCAN_NODE", StatisticalType.HUDI_SCAN_NODE,
needCheckColumnPriv);
}
@Override
- public TFileFormatType getFileFormatType() throws DdlException {
+ public TFileFormatType getFileFormatType() {
return TFileFormatType.FORMAT_PARQUET;
}
@Override
- public List<String> getPathPartitionKeys() throws DdlException,
MetaNotFoundException {
+ public List<String> getPathPartitionKeys() {
return Collections.emptyList();
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/IcebergSplitter.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/IcebergSplitter.java
deleted file mode 100644
index 15f19bf7a5..0000000000
---
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/IcebergSplitter.java
+++ /dev/null
@@ -1,155 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.planner.external;
-
-import org.apache.doris.analysis.Analyzer;
-import org.apache.doris.analysis.Expr;
-import org.apache.doris.analysis.TableSnapshot;
-import org.apache.doris.common.UserException;
-import org.apache.doris.common.util.TimeUtils;
-import org.apache.doris.external.iceberg.util.IcebergUtils;
-import org.apache.doris.planner.Split;
-import org.apache.doris.planner.Splitter;
-import org.apache.doris.planner.external.iceberg.IcebergDeleteFileFilter;
-import org.apache.doris.planner.external.iceberg.IcebergScanProvider;
-import org.apache.doris.planner.external.iceberg.IcebergSource;
-import org.apache.doris.planner.external.iceberg.IcebergSplit;
-
-import org.apache.hadoop.fs.Path;
-import org.apache.iceberg.BaseTable;
-import org.apache.iceberg.DeleteFile;
-import org.apache.iceberg.FileContent;
-import org.apache.iceberg.FileScanTask;
-import org.apache.iceberg.HistoryEntry;
-import org.apache.iceberg.MetadataColumns;
-import org.apache.iceberg.TableScan;
-import org.apache.iceberg.exceptions.NotFoundException;
-import org.apache.iceberg.expressions.Expression;
-import org.apache.iceberg.types.Conversions;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
-import java.nio.ByteBuffer;
-import java.time.Instant;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Optional;
-
-public class IcebergSplitter implements Splitter {
- private static final Logger LOG =
LogManager.getLogger(IcebergSplitter.class);
-
- private final IcebergSource icebergSource;
- private final Analyzer analyzer;
-
- public IcebergSplitter(IcebergSource icebergSource, Analyzer analyzer) {
- this.icebergSource = icebergSource;
- this.analyzer = analyzer;
- }
-
- @Override
- public List<Split> getSplits(List<Expr> exprs) throws UserException {
- List<Expression> expressions = new ArrayList<>();
- org.apache.iceberg.Table table = icebergSource.getIcebergTable();
- for (Expr conjunct : exprs) {
- Expression expression =
IcebergUtils.convertToIcebergExpr(conjunct, table.schema());
- if (expression != null) {
- expressions.add(expression);
- }
- }
- TableScan scan = table.newScan();
- TableSnapshot tableSnapshot =
icebergSource.getDesc().getRef().getTableSnapshot();
- if (tableSnapshot != null) {
- TableSnapshot.VersionType type = tableSnapshot.getType();
- try {
- if (type == TableSnapshot.VersionType.VERSION) {
- scan = scan.useSnapshot(tableSnapshot.getVersion());
- } else {
- long snapshotId =
TimeUtils.timeStringToLong(tableSnapshot.getTime(), TimeUtils.getTimeZone());
- scan =
scan.useSnapshot(getSnapshotIdAsOfTime(table.history(), snapshotId));
- }
- } catch (IllegalArgumentException e) {
- throw new UserException(e);
- }
- }
- for (Expression predicate : expressions) {
- scan = scan.filter(predicate);
- }
- List<Split> splits = new ArrayList<>();
- int formatVersion = ((BaseTable)
table).operations().current().formatVersion();
- for (FileScanTask task : scan.planFiles()) {
- long fileSize = task.file().fileSizeInBytes();
- for (FileScanTask splitTask : task.split(128 * 1024 * 1024)) {
- String dataFilePath = splitTask.file().path().toString();
- IcebergSplit split = new IcebergSplit(new Path(dataFilePath),
splitTask.start(),
- splitTask.length(), fileSize, new String[0]);
- split.setFormatVersion(formatVersion);
- if (formatVersion >=
IcebergScanProvider.MIN_DELETE_FILE_SUPPORT_VERSION) {
-
split.setDeleteFileFilters(getDeleteFileFilters(splitTask));
- }
- split.setTableFormatType(TableFormatType.ICEBERG);
- split.setAnalyzer(analyzer);
- splits.add(split);
- }
- }
- return splits;
- }
-
- public static long getSnapshotIdAsOfTime(List<HistoryEntry>
historyEntries, long asOfTimestamp) {
- // find history at or before asOfTimestamp
- HistoryEntry latestHistory = null;
- for (HistoryEntry entry : historyEntries) {
- if (entry.timestampMillis() <= asOfTimestamp) {
- if (latestHistory == null) {
- latestHistory = entry;
- continue;
- }
- if (entry.timestampMillis() > latestHistory.timestampMillis())
{
- latestHistory = entry;
- }
- }
- }
- if (latestHistory == null) {
- throw new NotFoundException("No version history at or before "
- + Instant.ofEpochMilli(asOfTimestamp));
- }
- return latestHistory.snapshotId();
- }
-
- private List<IcebergDeleteFileFilter> getDeleteFileFilters(FileScanTask
spitTask) {
- List<IcebergDeleteFileFilter> filters = new ArrayList<>();
- for (DeleteFile delete : spitTask.deletes()) {
- if (delete.content() == FileContent.POSITION_DELETES) {
- ByteBuffer lowerBoundBytes =
delete.lowerBounds().get(MetadataColumns.DELETE_FILE_POS.fieldId());
- Optional<Long> positionLowerBound =
Optional.ofNullable(lowerBoundBytes)
- .map(bytes ->
Conversions.fromByteBuffer(MetadataColumns.DELETE_FILE_POS.type(), bytes));
- ByteBuffer upperBoundBytes =
delete.upperBounds().get(MetadataColumns.DELETE_FILE_POS.fieldId());
- Optional<Long> positionUpperBound =
Optional.ofNullable(upperBoundBytes)
- .map(bytes ->
Conversions.fromByteBuffer(MetadataColumns.DELETE_FILE_POS.type(), bytes));
-
filters.add(IcebergDeleteFileFilter.createPositionDelete(delete.path().toString(),
- positionLowerBound.orElse(-1L),
positionUpperBound.orElse(-1L)));
- } else if (delete.content() == FileContent.EQUALITY_DELETES) {
- // todo:
filters.add(IcebergDeleteFileFilter.createEqualityDelete(delete.path().toString(),
- // delete.equalityFieldIds()));
- throw new IllegalStateException("Don't support equality delete
file");
- } else {
- throw new IllegalStateException("Unknown delete content: " +
delete.content());
- }
- }
- return filters;
- }
-}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/LoadScanProvider.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/LoadScanProvider.java
index ea079b644e..89189ca970 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/LoadScanProvider.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/LoadScanProvider.java
@@ -18,7 +18,6 @@
package org.apache.doris.planner.external;
import org.apache.doris.analysis.Analyzer;
-import org.apache.doris.analysis.Expr;
import org.apache.doris.analysis.ImportColumnDesc;
import org.apache.doris.analysis.IntLiteral;
import org.apache.doris.analysis.SlotRef;
@@ -55,7 +54,7 @@ import com.google.common.collect.Maps;
import java.util.List;
import java.util.Map;
-public class LoadScanProvider implements FileScanProviderIf {
+public class LoadScanProvider {
private FileGroupInfo fileGroupInfo;
private TupleDescriptor destTupleDesc;
@@ -65,27 +64,22 @@ public class LoadScanProvider implements FileScanProviderIf
{
this.destTupleDesc = destTupleDesc;
}
- @Override
public TFileFormatType getFileFormatType() throws DdlException,
MetaNotFoundException {
return null;
}
- @Override
public TFileType getLocationType() throws DdlException,
MetaNotFoundException {
return null;
}
- @Override
public Map<String, String> getLocationProperties() throws
MetaNotFoundException, DdlException {
return null;
}
- @Override
public List<String> getPathPartitionKeys() throws DdlException,
MetaNotFoundException {
return null;
}
- @Override
public FileLoadScanNode.ParamCreateContext createContext(Analyzer
analyzer) throws UserException {
FileLoadScanNode.ParamCreateContext ctx = new
FileLoadScanNode.ParamCreateContext();
ctx.destTupleDescriptor = destTupleDesc;
@@ -138,7 +132,6 @@ public class LoadScanProvider implements FileScanProviderIf
{
return "";
}
- @Override
public void createScanRangeLocations(FileLoadScanNode.ParamCreateContext
context,
FederationBackendPolicy backendPolicy,
List<TScanRangeLocations>
scanRangeLocations) throws UserException {
@@ -147,18 +140,10 @@ public class LoadScanProvider implements
FileScanProviderIf {
fileGroupInfo.createScanRangeLocations(context, backendPolicy,
scanRangeLocations);
}
- @Override
- public void createScanRangeLocations(List<Expr> conjuncts,
TFileScanRangeParams params,
- FederationBackendPolicy backendPolicy,
- List<TScanRangeLocations>
scanRangeLocations) throws UserException {
- }
-
- @Override
public int getInputSplitNum() {
return fileGroupInfo.getFileStatuses().size();
}
- @Override
public long getInputFileSize() {
long res = 0;
for (TBrokerFileStatus fileStatus : fileGroupInfo.getFileStatuses()) {
@@ -250,7 +235,6 @@ public class LoadScanProvider implements FileScanProviderIf
{
}
}
- @Override
public TableIf getTargetTable() {
return fileGroupInfo.getTargetTable();
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/QueryScanProvider.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/QueryScanProvider.java
deleted file mode 100644
index ebd1831432..0000000000
---
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/QueryScanProvider.java
+++ /dev/null
@@ -1,225 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.planner.external;
-
-import org.apache.doris.analysis.Analyzer;
-import org.apache.doris.analysis.Expr;
-import org.apache.doris.catalog.Env;
-import org.apache.doris.catalog.FsBroker;
-import org.apache.doris.catalog.HdfsResource;
-import org.apache.doris.common.DdlException;
-import org.apache.doris.common.FeConstants;
-import org.apache.doris.common.MetaNotFoundException;
-import org.apache.doris.common.UserException;
-import org.apache.doris.common.util.BrokerUtil;
-import org.apache.doris.planner.FileLoadScanNode;
-import org.apache.doris.planner.Split;
-import org.apache.doris.planner.Splitter;
-import org.apache.doris.planner.external.iceberg.IcebergScanProvider;
-import org.apache.doris.planner.external.iceberg.IcebergSplit;
-import org.apache.doris.system.Backend;
-import org.apache.doris.thrift.TExternalScanRange;
-import org.apache.doris.thrift.TFileAttributes;
-import org.apache.doris.thrift.TFileFormatType;
-import org.apache.doris.thrift.TFileRangeDesc;
-import org.apache.doris.thrift.TFileScanRange;
-import org.apache.doris.thrift.TFileScanRangeParams;
-import org.apache.doris.thrift.TFileType;
-import org.apache.doris.thrift.THdfsParams;
-import org.apache.doris.thrift.TNetworkAddress;
-import org.apache.doris.thrift.TScanRange;
-import org.apache.doris.thrift.TScanRangeLocation;
-import org.apache.doris.thrift.TScanRangeLocations;
-
-import com.google.common.base.Joiner;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-
-public abstract class QueryScanProvider implements FileScanProviderIf {
- public static final Logger LOG =
LogManager.getLogger(QueryScanProvider.class);
- private int inputSplitNum = 0;
- private long inputFileSize = 0;
- protected Splitter splitter;
-
- public abstract TFileAttributes getFileAttributes() throws UserException;
-
- @Override
- public void createScanRangeLocations(FileLoadScanNode.ParamCreateContext
context,
- FederationBackendPolicy backendPolicy,
- List<TScanRangeLocations>
scanRangeLocations) throws UserException {
- }
-
- @Override
- public FileLoadScanNode.ParamCreateContext createContext(Analyzer
analyzer) throws UserException {
- return null;
- }
-
- @Override
- public void createScanRangeLocations(List<Expr> conjuncts,
TFileScanRangeParams params,
- FederationBackendPolicy backendPolicy,
- List<TScanRangeLocations>
scanRangeLocations) throws UserException {
- long start = System.currentTimeMillis();
- List<Split> inputSplits = splitter.getSplits(conjuncts);
- this.inputSplitNum = inputSplits.size();
- if (inputSplits.isEmpty()) {
- return;
- }
- FileSplit inputSplit = (FileSplit) inputSplits.get(0);
- TFileType locationType = getLocationType();
- params.setFileType(locationType);
- TFileFormatType fileFormatType = getFileFormatType();
- params.setFormatType(getFileFormatType());
- if (fileFormatType == TFileFormatType.FORMAT_CSV_PLAIN ||
fileFormatType == TFileFormatType.FORMAT_JSON) {
- params.setFileAttributes(getFileAttributes());
- }
-
- // set hdfs params for hdfs file type.
- Map<String, String> locationProperties = getLocationProperties();
- if (locationType == TFileType.FILE_HDFS || locationType ==
TFileType.FILE_BROKER) {
- String fsName = "";
- if (this instanceof TVFScanProvider) {
- fsName = ((TVFScanProvider) this).getFsName();
- } else {
- String fullPath = inputSplit.getPath().toUri().toString();
- String filePath = inputSplit.getPath().toUri().getPath();
- // eg:
- // hdfs://namenode
- // s3://buckets
- fsName = fullPath.replace(filePath, "");
- }
- THdfsParams tHdfsParams =
HdfsResource.generateHdfsParam(locationProperties);
- tHdfsParams.setFsName(fsName);
- params.setHdfsParams(tHdfsParams);
-
- if (locationType == TFileType.FILE_BROKER) {
- FsBroker broker =
Env.getCurrentEnv().getBrokerMgr().getAnyAliveBroker();
- if (broker == null) {
- throw new UserException("No alive broker.");
- }
- params.addToBrokerAddresses(new TNetworkAddress(broker.ip,
broker.port));
- }
- } else if (locationType == TFileType.FILE_S3) {
- params.setProperties(locationProperties);
- }
-
- List<String> pathPartitionKeys = getPathPartitionKeys();
- for (Split split : inputSplits) {
- TScanRangeLocations curLocations = newLocations(params,
backendPolicy);
- FileSplit fileSplit = (FileSplit) split;
-
- // If fileSplit has partition values, use the values collected
from hive partitions.
- // Otherwise, use the values in file path.
- List<String> partitionValuesFromPath =
fileSplit.getPartitionValues() == null
- ?
BrokerUtil.parseColumnsFromPath(fileSplit.getPath().toString(),
pathPartitionKeys, false)
- : fileSplit.getPartitionValues();
-
- TFileRangeDesc rangeDesc = createFileRangeDesc(fileSplit,
partitionValuesFromPath, pathPartitionKeys);
- // external data lake table
- if (fileSplit instanceof IcebergSplit) {
- IcebergScanProvider.setIcebergParams(rangeDesc, (IcebergSplit)
fileSplit);
- }
-
-
curLocations.getScanRange().getExtScanRange().getFileScanRange().addToRanges(rangeDesc);
- LOG.debug("assign to backend {} with table split: {} ({}, {}),
location: {}",
- curLocations.getLocations().get(0).getBackendId(),
fileSplit.getPath(), fileSplit.getStart(),
- fileSplit.getLength(),
Joiner.on("|").join(fileSplit.getHosts()));
- scanRangeLocations.add(curLocations);
- this.inputFileSize += fileSplit.getLength();
- }
- LOG.debug("create #{} ScanRangeLocations cost: {} ms",
- scanRangeLocations.size(), (System.currentTimeMillis() -
start));
- }
-
- @Override
- public int getInputSplitNum() {
- return this.inputSplitNum;
- }
-
- @Override
- public long getInputFileSize() {
- return this.inputFileSize;
- }
-
- private TScanRangeLocations newLocations(TFileScanRangeParams params,
FederationBackendPolicy backendPolicy) {
- // Generate on file scan range
- TFileScanRange fileScanRange = new TFileScanRange();
- fileScanRange.setParams(params);
-
- // Scan range
- TExternalScanRange externalScanRange = new TExternalScanRange();
- externalScanRange.setFileScanRange(fileScanRange);
- TScanRange scanRange = new TScanRange();
- scanRange.setExtScanRange(externalScanRange);
-
- // Locations
- TScanRangeLocations locations = new TScanRangeLocations();
- locations.setScanRange(scanRange);
-
- TScanRangeLocation location = new TScanRangeLocation();
- Backend selectedBackend = backendPolicy.getNextBe();
- location.setBackendId(selectedBackend.getId());
- location.setServer(new TNetworkAddress(selectedBackend.getIp(),
selectedBackend.getBePort()));
- locations.addToLocations(location);
-
- return locations;
- }
-
- private TFileRangeDesc createFileRangeDesc(FileSplit fileSplit,
List<String> columnsFromPath,
- List<String> columnsFromPathKeys)
- throws DdlException, MetaNotFoundException {
- TFileRangeDesc rangeDesc = new TFileRangeDesc();
- rangeDesc.setStartOffset(fileSplit.getStart());
- rangeDesc.setSize(fileSplit.getLength());
- // fileSize only be used when format is orc or parquet and TFileType
is broker
- // When TFileType is other type, it is not necessary
- rangeDesc.setFileSize(fileSplit.getFileLength());
- rangeDesc.setColumnsFromPath(columnsFromPath);
- rangeDesc.setColumnsFromPathKeys(columnsFromPathKeys);
-
- if (getLocationType() == TFileType.FILE_HDFS) {
- rangeDesc.setPath(fileSplit.getPath().toUri().getPath());
- } else if (getLocationType() == TFileType.FILE_S3 || getLocationType()
== TFileType.FILE_BROKER) {
- // need full path
- rangeDesc.setPath(fileSplit.getPath().toString());
- }
- return rangeDesc;
- }
-
- protected static Optional<TFileType> getTFileType(String location) {
- if (location != null && !location.isEmpty()) {
- if (FeConstants.isObjStorage(location)) {
- return Optional.of(TFileType.FILE_S3);
- } else if (location.startsWith(FeConstants.FS_PREFIX_HDFS)) {
- return Optional.of(TFileType.FILE_HDFS);
- } else if (location.startsWith(FeConstants.FS_PREFIX_FILE)) {
- return Optional.of(TFileType.FILE_LOCAL);
- } else if (location.startsWith(FeConstants.FS_PREFIX_OFS)) {
- return Optional.of(TFileType.FILE_BROKER);
- } else if (location.startsWith(FeConstants.FS_PREFIX_GFS)) {
- return Optional.of(TFileType.FILE_BROKER);
- } else if (location.startsWith(FeConstants.FS_PREFIX_JFS)) {
- return Optional.of(TFileType.FILE_BROKER);
- }
- }
- return Optional.empty();
- }
-}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/TVFScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/TVFScanNode.java
new file mode 100644
index 0000000000..a8ede0d843
--- /dev/null
+++
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/TVFScanNode.java
@@ -0,0 +1,133 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.planner.external;
+
+import org.apache.doris.analysis.TupleDescriptor;
+import org.apache.doris.catalog.FunctionGenTable;
+import org.apache.doris.catalog.TableIf;
+import org.apache.doris.common.DdlException;
+import org.apache.doris.common.MetaNotFoundException;
+import org.apache.doris.common.UserException;
+import org.apache.doris.planner.PlanNodeId;
+import org.apache.doris.spi.Split;
+import org.apache.doris.statistics.StatisticalType;
+import org.apache.doris.tablefunction.ExternalFileTableValuedFunction;
+import org.apache.doris.thrift.TBrokerFileStatus;
+import org.apache.doris.thrift.TFileAttributes;
+import org.apache.doris.thrift.TFileFormatType;
+import org.apache.doris.thrift.TFileType;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import org.apache.hadoop.fs.Path;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+public class TVFScanNode extends FileQueryScanNode {
+ private static final Logger LOG = LogManager.getLogger(TVFScanNode.class);
+
+ private final ExternalFileTableValuedFunction tableValuedFunction;
+ private final FunctionGenTable table;
+
+ /**
+ * External file scan node for a table-valued function.
+ * needCheckColumnPriv: some ExternalFileScanNode instances do not need to check
+ * column privileges, e.g. the S3 table-valued function. Such scan nodes have no
+ * corresponding catalog/database/table info, so no privilege check is needed.
+ */
+ public TVFScanNode(PlanNodeId id, TupleDescriptor desc, boolean
needCheckColumnPriv) {
+ super(id, desc, "TVF_SCAN_NODE", StatisticalType.TVF_SCAN_NODE,
needCheckColumnPriv);
+ table = (FunctionGenTable) this.desc.getTable();
+ tableValuedFunction = (ExternalFileTableValuedFunction) table.getTvf();
+ }
+
+ @Override
+ protected void doInitialize() throws UserException {
+ Preconditions.checkNotNull(desc);
+ computeColumnFilter();
+ initBackendPolicy();
+ initSchemaParams();
+ }
+
+ @Override
+ protected String getFsName(FileSplit split) {
+ return tableValuedFunction.getFsName();
+ }
+
+ @Override
+ public TFileAttributes getFileAttributes() throws UserException {
+ return tableValuedFunction.getFileAttributes();
+ }
+
+ @Override
+ public TFileFormatType getFileFormatType() throws DdlException,
MetaNotFoundException {
+ return tableValuedFunction.getTFileFormatType();
+ }
+
+ @Override
+ public TFileType getLocationType() throws DdlException,
MetaNotFoundException {
+ return tableValuedFunction.getTFileType();
+ }
+
+ @Override
+ public Map<String, String> getLocationProperties() throws
MetaNotFoundException, DdlException {
+ return tableValuedFunction.getLocationProperties();
+ }
+
+ @Override
+ public List<String> getPathPartitionKeys() {
+ return Lists.newArrayList();
+ }
+
+ @Override
+ public TableIf getTargetTable() {
+ return table;
+ }
+
+ @Override
+ public List<Split> getSplits() throws UserException {
+ List<Split> splits = Lists.newArrayList();
+ List<TBrokerFileStatus> fileStatuses =
tableValuedFunction.getFileStatuses();
+ for (TBrokerFileStatus fileStatus : fileStatuses) {
+ Path path = new Path(fileStatus.getPath());
+ try {
+ splits.addAll(splitFile(path, fileStatus.getBlockSize(), null,
fileStatus.getSize(),
+ fileStatus.isSplitable, null));
+ } catch (IOException e) {
+ LOG.warn("get file split failed for TVF: {}", path, e);
+ throw new UserException(e);
+ }
+ }
+ return splits;
+ }
+
+ private void addFileSplits(Path path, long fileSize, long splitSize,
List<Split> splits) {
+ long bytesRemaining;
+ for (bytesRemaining = fileSize; (double) bytesRemaining / (double)
splitSize > 1.1D;
+ bytesRemaining -= splitSize) {
+ splits.add(new FileSplit(path, fileSize - bytesRemaining,
splitSize, fileSize, new String[0], null));
+ }
+ if (bytesRemaining != 0L) {
+ splits.add(new FileSplit(path, fileSize - bytesRemaining,
bytesRemaining, fileSize, new String[0], null));
+ }
+ }
+}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/TVFScanProvider.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/TVFScanProvider.java
deleted file mode 100644
index e4941ffa5a..0000000000
---
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/TVFScanProvider.java
+++ /dev/null
@@ -1,85 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.planner.external;
-
-import org.apache.doris.analysis.TupleDescriptor;
-import org.apache.doris.catalog.FunctionGenTable;
-import org.apache.doris.catalog.TableIf;
-import org.apache.doris.common.DdlException;
-import org.apache.doris.common.MetaNotFoundException;
-import org.apache.doris.common.UserException;
-import org.apache.doris.tablefunction.ExternalFileTableValuedFunction;
-import org.apache.doris.thrift.TFileAttributes;
-import org.apache.doris.thrift.TFileFormatType;
-import org.apache.doris.thrift.TFileType;
-
-import com.google.common.collect.Lists;
-
-import java.util.List;
-import java.util.Map;
-
-public class TVFScanProvider extends QueryScanProvider {
- private FunctionGenTable tvfTable;
- private final TupleDescriptor desc;
- private ExternalFileTableValuedFunction tableValuedFunction;
-
- public TVFScanProvider(FunctionGenTable tvfTable, TupleDescriptor desc,
- ExternalFileTableValuedFunction
tableValuedFunction) {
- this.tvfTable = tvfTable;
- this.desc = desc;
- this.tableValuedFunction = tableValuedFunction;
- this.splitter = new TVFSplitter(tableValuedFunction);
- }
-
- public String getFsName() {
- return tableValuedFunction.getFsName();
- }
-
- // =========== implement abstract methods of QueryScanProvider
=================
- @Override
- public TFileAttributes getFileAttributes() throws UserException {
- return tableValuedFunction.getFileAttributes();
- }
-
-
- // =========== implement interface methods of FileScanProviderIf
================
- @Override
- public TFileFormatType getFileFormatType() throws DdlException,
MetaNotFoundException {
- return tableValuedFunction.getTFileFormatType();
- }
-
- @Override
- public TFileType getLocationType() throws DdlException,
MetaNotFoundException {
- return tableValuedFunction.getTFileType();
- }
-
- @Override
- public Map<String, String> getLocationProperties() throws
MetaNotFoundException, DdlException {
- return tableValuedFunction.getLocationProperties();
- }
-
- @Override
- public List<String> getPathPartitionKeys() throws DdlException,
MetaNotFoundException {
- return Lists.newArrayList();
- }
-
- @Override
- public TableIf getTargetTable() {
- return tvfTable;
- }
-}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/TVFSplitter.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/TVFSplitter.java
deleted file mode 100644
index e2f5e556aa..0000000000
---
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/TVFSplitter.java
+++ /dev/null
@@ -1,79 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.planner.external;
-
-import org.apache.doris.analysis.Expr;
-import org.apache.doris.common.UserException;
-import org.apache.doris.planner.Split;
-import org.apache.doris.planner.Splitter;
-import org.apache.doris.qe.ConnectContext;
-import org.apache.doris.tablefunction.ExternalFileTableValuedFunction;
-import org.apache.doris.thrift.TBrokerFileStatus;
-
-import com.google.common.collect.Lists;
-import org.apache.hadoop.fs.Path;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
-import java.util.List;
-
-public class TVFSplitter implements Splitter {
-
- private static final Logger LOG = LogManager.getLogger(TVFSplitter.class);
-
- private ExternalFileTableValuedFunction tableValuedFunction;
-
- public TVFSplitter(ExternalFileTableValuedFunction tableValuedFunction) {
- this.tableValuedFunction = tableValuedFunction;
- }
-
- @Override
- public List<Split> getSplits(List<Expr> exprs) throws UserException {
- List<Split> splits = Lists.newArrayList();
- List<TBrokerFileStatus> fileStatuses =
tableValuedFunction.getFileStatuses();
- for (TBrokerFileStatus fileStatus : fileStatuses) {
- long fileLength = fileStatus.getSize();
- Path path = new Path(fileStatus.getPath());
- if (fileStatus.isSplitable) {
- long splitSize =
ConnectContext.get().getSessionVariable().getFileSplitSize();
- if (splitSize <= 0) {
- splitSize = fileStatus.getBlockSize();
- }
- // Min split size is DEFAULT_SPLIT_SIZE(128MB).
- splitSize = splitSize > DEFAULT_SPLIT_SIZE ? splitSize :
DEFAULT_SPLIT_SIZE;
- addFileSplits(path, fileLength, splitSize, splits);
- } else {
- Split split = new FileSplit(path, 0, fileLength, fileLength,
new String[0], null);
- splits.add(split);
- }
- }
- return splits;
- }
-
- private void addFileSplits(Path path, long fileSize, long splitSize,
List<Split> splits) {
- long bytesRemaining;
- for (bytesRemaining = fileSize; (double) bytesRemaining / (double)
splitSize > 1.1D;
- bytesRemaining -= splitSize) {
- splits.add(new FileSplit(path, fileSize - bytesRemaining,
splitSize, fileSize, new String[0], null));
- }
- if (bytesRemaining != 0L) {
- splits.add(new FileSplit(path, fileSize - bytesRemaining,
bytesRemaining, fileSize, new String[0], null));
- }
- }
-
-}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergHMSSource.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergHMSSource.java
index 87760f77e1..0fc0f39dff 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergHMSSource.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergHMSSource.java
@@ -26,8 +26,9 @@ import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.UserException;
import org.apache.doris.datasource.ExternalCatalog;
import org.apache.doris.planner.ColumnRange;
-import org.apache.doris.planner.external.HiveScanProvider;
+import org.apache.doris.planner.external.HiveScanNode;
import org.apache.doris.thrift.TFileAttributes;
+import org.apache.doris.thrift.TFileTextScanRangeParams;
import org.apache.iceberg.TableProperties;
@@ -36,15 +37,14 @@ import java.util.Map;
public class IcebergHMSSource implements IcebergSource {
private final HMSExternalTable hmsTable;
- private final HiveScanProvider hiveScanProvider;
-
private final TupleDescriptor desc;
+ private final Map<String, ColumnRange> columnNameToRange;
public IcebergHMSSource(HMSExternalTable hmsTable, TupleDescriptor desc,
Map<String, ColumnRange> columnNameToRange) {
- this.hiveScanProvider = new HiveScanProvider(hmsTable, desc,
columnNameToRange);
this.hmsTable = hmsTable;
this.desc = desc;
+ this.columnNameToRange = columnNameToRange;
}
@Override
@@ -54,8 +54,8 @@ public class IcebergHMSSource implements IcebergSource {
@Override
public String getFileFormat() throws DdlException, MetaNotFoundException {
- return hiveScanProvider.getRemoteHiveTable().getParameters()
- .getOrDefault(TableProperties.DEFAULT_FILE_FORMAT,
TableProperties.DEFAULT_FILE_FORMAT_DEFAULT);
+ return hmsTable.getRemoteTable().getParameters()
+ .getOrDefault(TableProperties.DEFAULT_FILE_FORMAT,
TableProperties.DEFAULT_FILE_FORMAT_DEFAULT);
}
public org.apache.iceberg.Table getIcebergTable() throws
MetaNotFoundException {
@@ -64,12 +64,19 @@ public class IcebergHMSSource implements IcebergSource {
@Override
public TableIf getTargetTable() {
- return hiveScanProvider.getTargetTable();
+ return hmsTable;
}
@Override
public TFileAttributes getFileAttributes() throws UserException {
- return hiveScanProvider.getFileAttributes();
+ TFileTextScanRangeParams textParams = new TFileTextScanRangeParams();
+
textParams.setColumnSeparator(hmsTable.getRemoteTable().getSd().getSerdeInfo().getParameters()
+ .getOrDefault(HiveScanNode.PROP_FIELD_DELIMITER,
HiveScanNode.DEFAULT_FIELD_DELIMITER));
+ textParams.setLineDelimiter(HiveScanNode.DEFAULT_LINE_DELIMITER);
+ TFileAttributes fileAttributes = new TFileAttributes();
+ fileAttributes.setTextParams(textParams);
+ fileAttributes.setHeaderType("");
+ return fileAttributes;
}
@Override
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergScanNode.java
new file mode 100644
index 0000000000..f19a7a43de
--- /dev/null
+++
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergScanNode.java
@@ -0,0 +1,286 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.planner.external.iceberg;
+
+import org.apache.doris.analysis.Expr;
+import org.apache.doris.analysis.TableSnapshot;
+import org.apache.doris.analysis.TupleDescriptor;
+import org.apache.doris.catalog.TableIf;
+import org.apache.doris.catalog.external.ExternalTable;
+import org.apache.doris.catalog.external.HMSExternalTable;
+import org.apache.doris.catalog.external.IcebergExternalTable;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.DdlException;
+import org.apache.doris.common.UserException;
+import org.apache.doris.common.util.TimeUtils;
+import org.apache.doris.datasource.iceberg.IcebergExternalCatalog;
+import org.apache.doris.external.iceberg.util.IcebergUtils;
+import org.apache.doris.planner.PlanNodeId;
+import org.apache.doris.planner.external.FileQueryScanNode;
+import org.apache.doris.planner.external.TableFormatType;
+import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.spi.Split;
+import org.apache.doris.statistics.StatisticalType;
+import org.apache.doris.thrift.TFileAttributes;
+import org.apache.doris.thrift.TFileFormatType;
+import org.apache.doris.thrift.TFileRangeDesc;
+import org.apache.doris.thrift.TFileType;
+import org.apache.doris.thrift.TIcebergDeleteFileDesc;
+import org.apache.doris.thrift.TIcebergFileDesc;
+import org.apache.doris.thrift.TTableFormatFileDesc;
+
+import avro.shaded.com.google.common.base.Preconditions;
+import org.apache.hadoop.fs.Path;
+import org.apache.iceberg.BaseTable;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileContent;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.HistoryEntry;
+import org.apache.iceberg.MetadataColumns;
+import org.apache.iceberg.PartitionField;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableScan;
+import org.apache.iceberg.exceptions.NotFoundException;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.types.Conversions;
+
+import java.nio.ByteBuffer;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.OptionalLong;
+import java.util.stream.Collectors;
+
+public class IcebergScanNode extends FileQueryScanNode {
+
+ public static final int MIN_DELETE_FILE_SUPPORT_VERSION = 2;
+
+ private IcebergSource source;
+
+ /**
+ * External file scan node for Query iceberg table
+ * needCheckColumnPriv: Some of ExternalFileScanNode do not need to check
column priv
+ * eg: s3 tvf
+ * These scan nodes do not have corresponding catalog/database/table info,
so no need to do priv check
+ */
+ public IcebergScanNode(PlanNodeId id, TupleDescriptor desc, boolean
needCheckColumnPriv) {
+ super(id, desc, "ICEBERG_SCAN_NODE",
StatisticalType.ICEBERG_SCAN_NODE, needCheckColumnPriv);
+ }
+
+ @Override
+ protected void doInitialize() throws UserException {
+ ExternalTable table = (ExternalTable) desc.getTable();
+ if (table.isView()) {
+ throw new AnalysisException(
+ String.format("Querying external view '%s.%s' is not
supported", table.getDbName(), table.getName()));
+ }
+ computeColumnFilter();
+ initBackendPolicy();
+ if (table instanceof HMSExternalTable) {
+ source = new IcebergHMSSource((HMSExternalTable) table, desc,
columnNameToRange);
+ } else if (table instanceof IcebergExternalTable) {
+ String catalogType = ((IcebergExternalTable)
table).getIcebergCatalogType();
+ switch (catalogType) {
+ case IcebergExternalCatalog.ICEBERG_HMS:
+ case IcebergExternalCatalog.ICEBERG_REST:
+ case IcebergExternalCatalog.ICEBERG_DLF:
+ case IcebergExternalCatalog.ICEBERG_GLUE:
+ source = new IcebergApiSource((IcebergExternalTable)
table, desc, columnNameToRange);
+ break;
+ default:
+ throw new UserException("Unknown iceberg catalog type: " +
catalogType);
+ }
+ }
+ Preconditions.checkNotNull(source);
+ initSchemaParams();
+ }
+
+ public static void setIcebergParams(TFileRangeDesc rangeDesc, IcebergSplit
icebergSplit) {
+ TTableFormatFileDesc tableFormatFileDesc = new TTableFormatFileDesc();
+
tableFormatFileDesc.setTableFormatType(icebergSplit.getTableFormatType().value());
+ TIcebergFileDesc fileDesc = new TIcebergFileDesc();
+ int formatVersion = icebergSplit.getFormatVersion();
+ fileDesc.setFormatVersion(formatVersion);
+ if (formatVersion < MIN_DELETE_FILE_SUPPORT_VERSION) {
+ fileDesc.setContent(FileContent.DATA.id());
+ } else {
+ for (IcebergDeleteFileFilter filter :
icebergSplit.getDeleteFileFilters()) {
+ TIcebergDeleteFileDesc deleteFileDesc = new
TIcebergDeleteFileDesc();
+ deleteFileDesc.setPath(filter.getDeleteFilePath());
+ if (filter instanceof IcebergDeleteFileFilter.PositionDelete) {
+ fileDesc.setContent(FileContent.POSITION_DELETES.id());
+ IcebergDeleteFileFilter.PositionDelete positionDelete =
+ (IcebergDeleteFileFilter.PositionDelete) filter;
+ OptionalLong lowerBound =
positionDelete.getPositionLowerBound();
+ OptionalLong upperBound =
positionDelete.getPositionUpperBound();
+ if (lowerBound.isPresent()) {
+
deleteFileDesc.setPositionLowerBound(lowerBound.getAsLong());
+ }
+ if (upperBound.isPresent()) {
+
deleteFileDesc.setPositionUpperBound(upperBound.getAsLong());
+ }
+ } else {
+ fileDesc.setContent(FileContent.EQUALITY_DELETES.id());
+ IcebergDeleteFileFilter.EqualityDelete equalityDelete =
+ (IcebergDeleteFileFilter.EqualityDelete) filter;
+ deleteFileDesc.setFieldIds(equalityDelete.getFieldIds());
+ }
+ fileDesc.addToDeleteFiles(deleteFileDesc);
+ }
+ }
+ tableFormatFileDesc.setIcebergParams(fileDesc);
+ rangeDesc.setTableFormatParams(tableFormatFileDesc);
+ }
+
+ @Override
+ public List<Split> getSplits() throws UserException {
+ List<Expression> expressions = new ArrayList<>();
+ org.apache.iceberg.Table table = source.getIcebergTable();
+ for (Expr conjunct : conjuncts) {
+ Expression expression =
IcebergUtils.convertToIcebergExpr(conjunct, table.schema());
+ if (expression != null) {
+ expressions.add(expression);
+ }
+ }
+ TableScan scan = table.newScan();
+ TableSnapshot tableSnapshot =
source.getDesc().getRef().getTableSnapshot();
+ if (tableSnapshot != null) {
+ TableSnapshot.VersionType type = tableSnapshot.getType();
+ try {
+ if (type == TableSnapshot.VersionType.VERSION) {
+ scan = scan.useSnapshot(tableSnapshot.getVersion());
+ } else {
+ long snapshotId =
TimeUtils.timeStringToLong(tableSnapshot.getTime(), TimeUtils.getTimeZone());
+ scan =
scan.useSnapshot(getSnapshotIdAsOfTime(table.history(), snapshotId));
+ }
+ } catch (IllegalArgumentException e) {
+ throw new UserException(e);
+ }
+ }
+ for (Expression predicate : expressions) {
+ scan = scan.filter(predicate);
+ }
+ List<Split> splits = new ArrayList<>();
+ int formatVersion = ((BaseTable)
table).operations().current().formatVersion();
+ // Min split size is DEFAULT_SPLIT_SIZE(128MB).
+ long splitSize =
Math.max(ConnectContext.get().getSessionVariable().getFileSplitSize(),
DEFAULT_SPLIT_SIZE);
+ for (FileScanTask task : scan.planFiles()) {
+ long fileSize = task.file().fileSizeInBytes();
+ for (FileScanTask splitTask : task.split(splitSize)) {
+ String dataFilePath = splitTask.file().path().toString();
+ IcebergSplit split = new IcebergSplit(new Path(dataFilePath),
splitTask.start(),
+ splitTask.length(), fileSize, new String[0]);
+ split.setFormatVersion(formatVersion);
+ if (formatVersion >= MIN_DELETE_FILE_SUPPORT_VERSION) {
+
split.setDeleteFileFilters(getDeleteFileFilters(splitTask));
+ }
+ split.setTableFormatType(TableFormatType.ICEBERG);
+ splits.add(split);
+ }
+ }
+ return splits;
+ }
+
+ private long getSnapshotIdAsOfTime(List<HistoryEntry> historyEntries, long
asOfTimestamp) {
+ // find history at or before asOfTimestamp
+ HistoryEntry latestHistory = null;
+ for (HistoryEntry entry : historyEntries) {
+ if (entry.timestampMillis() <= asOfTimestamp) {
+ if (latestHistory == null) {
+ latestHistory = entry;
+ continue;
+ }
+ if (entry.timestampMillis() > latestHistory.timestampMillis())
{
+ latestHistory = entry;
+ }
+ }
+ }
+ if (latestHistory == null) {
+ throw new NotFoundException("No version history at or before "
+ + Instant.ofEpochMilli(asOfTimestamp));
+ }
+ return latestHistory.snapshotId();
+ }
+
+ private List<IcebergDeleteFileFilter> getDeleteFileFilters(FileScanTask
spitTask) {
+ List<IcebergDeleteFileFilter> filters = new ArrayList<>();
+ for (DeleteFile delete : spitTask.deletes()) {
+ if (delete.content() == FileContent.POSITION_DELETES) {
+ ByteBuffer lowerBoundBytes =
delete.lowerBounds().get(MetadataColumns.DELETE_FILE_POS.fieldId());
+ Optional<Long> positionLowerBound =
Optional.ofNullable(lowerBoundBytes)
+ .map(bytes ->
Conversions.fromByteBuffer(MetadataColumns.DELETE_FILE_POS.type(), bytes));
+ ByteBuffer upperBoundBytes =
delete.upperBounds().get(MetadataColumns.DELETE_FILE_POS.fieldId());
+ Optional<Long> positionUpperBound =
Optional.ofNullable(upperBoundBytes)
+ .map(bytes ->
Conversions.fromByteBuffer(MetadataColumns.DELETE_FILE_POS.type(), bytes));
+
filters.add(IcebergDeleteFileFilter.createPositionDelete(delete.path().toString(),
+ positionLowerBound.orElse(-1L),
positionUpperBound.orElse(-1L)));
+ } else if (delete.content() == FileContent.EQUALITY_DELETES) {
+ // todo:
filters.add(IcebergDeleteFileFilter.createEqualityDelete(delete.path().toString(),
+ throw new IllegalStateException("Don't support equality delete
file");
+ } else {
+ throw new IllegalStateException("Unknown delete content: " +
delete.content());
+ }
+ }
+ return filters;
+ }
+
+ @Override
+ public TFileType getLocationType() throws UserException {
+ Table icebergTable = source.getIcebergTable();
+ String location = icebergTable.location();
+ return getTFileType(location).orElseThrow(() ->
+ new DdlException("Unknown file location " + location + " for
iceberg table " + icebergTable.name()));
+ }
+
+ @Override
+ public TFileFormatType getFileFormatType() throws UserException {
+ TFileFormatType type;
+ String icebergFormat = source.getFileFormat();
+ if (icebergFormat.equalsIgnoreCase("parquet")) {
+ type = TFileFormatType.FORMAT_PARQUET;
+ } else if (icebergFormat.equalsIgnoreCase("orc")) {
+ type = TFileFormatType.FORMAT_ORC;
+ } else {
+ throw new DdlException(String.format("Unsupported format name: %s
for iceberg table.", icebergFormat));
+ }
+ return type;
+ }
+
+ @Override
+ public TFileAttributes getFileAttributes() throws UserException {
+ return source.getFileAttributes();
+ }
+
+ @Override
+ public List<String> getPathPartitionKeys() throws UserException {
+ return
source.getIcebergTable().spec().fields().stream().map(PartitionField::name)
+ .collect(Collectors.toList());
+ }
+
+ @Override
+ public TableIf getTargetTable() {
+ return source.getTargetTable();
+ }
+
+ @Override
+ public Map<String, String> getLocationProperties() throws UserException {
+ return source.getCatalog().getProperties();
+ }
+}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergScanProvider.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergScanProvider.java
deleted file mode 100644
index d0bd72f5c7..0000000000
---
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergScanProvider.java
+++ /dev/null
@@ -1,137 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.planner.external.iceberg;
-
-import org.apache.doris.analysis.Analyzer;
-import org.apache.doris.catalog.TableIf;
-import org.apache.doris.common.DdlException;
-import org.apache.doris.common.MetaNotFoundException;
-import org.apache.doris.common.UserException;
-import org.apache.doris.planner.external.IcebergSplitter;
-import org.apache.doris.planner.external.QueryScanProvider;
-import org.apache.doris.thrift.TFileAttributes;
-import org.apache.doris.thrift.TFileFormatType;
-import org.apache.doris.thrift.TFileRangeDesc;
-import org.apache.doris.thrift.TFileType;
-import org.apache.doris.thrift.TIcebergDeleteFileDesc;
-import org.apache.doris.thrift.TIcebergFileDesc;
-import org.apache.doris.thrift.TTableFormatFileDesc;
-
-import org.apache.iceberg.FileContent;
-import org.apache.iceberg.PartitionField;
-
-import java.util.List;
-import java.util.Map;
-import java.util.OptionalLong;
-import java.util.stream.Collectors;
-
-/**
- * A file scan provider for iceberg.
- */
-public class IcebergScanProvider extends QueryScanProvider {
-
- public static final int MIN_DELETE_FILE_SUPPORT_VERSION = 2;
- private final Analyzer analyzer;
- private final IcebergSource icebergSource;
-
- public IcebergScanProvider(IcebergSource icebergSource, Analyzer analyzer)
{
- this.icebergSource = icebergSource;
- this.analyzer = analyzer;
- this.splitter = new IcebergSplitter(icebergSource, analyzer);
- }
-
- public static void setIcebergParams(TFileRangeDesc rangeDesc, IcebergSplit
icebergSplit) {
- TTableFormatFileDesc tableFormatFileDesc = new TTableFormatFileDesc();
-
tableFormatFileDesc.setTableFormatType(icebergSplit.getTableFormatType().value());
- TIcebergFileDesc fileDesc = new TIcebergFileDesc();
- int formatVersion = icebergSplit.getFormatVersion();
- fileDesc.setFormatVersion(formatVersion);
- if (formatVersion < MIN_DELETE_FILE_SUPPORT_VERSION) {
- fileDesc.setContent(FileContent.DATA.id());
- } else {
- for (IcebergDeleteFileFilter filter :
icebergSplit.getDeleteFileFilters()) {
- TIcebergDeleteFileDesc deleteFileDesc = new
TIcebergDeleteFileDesc();
- deleteFileDesc.setPath(filter.getDeleteFilePath());
- if (filter instanceof IcebergDeleteFileFilter.PositionDelete) {
- fileDesc.setContent(FileContent.POSITION_DELETES.id());
- IcebergDeleteFileFilter.PositionDelete positionDelete =
- (IcebergDeleteFileFilter.PositionDelete) filter;
- OptionalLong lowerBound =
positionDelete.getPositionLowerBound();
- OptionalLong upperBound =
positionDelete.getPositionUpperBound();
- if (lowerBound.isPresent()) {
-
deleteFileDesc.setPositionLowerBound(lowerBound.getAsLong());
- }
- if (upperBound.isPresent()) {
-
deleteFileDesc.setPositionUpperBound(upperBound.getAsLong());
- }
- } else {
- fileDesc.setContent(FileContent.EQUALITY_DELETES.id());
- IcebergDeleteFileFilter.EqualityDelete equalityDelete =
- (IcebergDeleteFileFilter.EqualityDelete) filter;
- deleteFileDesc.setFieldIds(equalityDelete.getFieldIds());
- }
- fileDesc.addToDeleteFiles(deleteFileDesc);
- }
- }
- tableFormatFileDesc.setIcebergParams(fileDesc);
- rangeDesc.setTableFormatParams(tableFormatFileDesc);
- }
-
- @Override
- public TFileType getLocationType() throws DdlException,
MetaNotFoundException {
- org.apache.iceberg.Table table = icebergSource.getIcebergTable();
- String location = table.location();
- return getTFileType(location).orElseThrow(() ->
- new DdlException("Unknown file location " + location + "
for iceberg table " + table.name()));
- }
-
- @Override
- public List<String> getPathPartitionKeys() throws DdlException,
MetaNotFoundException {
- return
icebergSource.getIcebergTable().spec().fields().stream().map(PartitionField::name)
- .collect(Collectors.toList());
- }
-
- @Override
- public TFileFormatType getFileFormatType() throws DdlException,
MetaNotFoundException {
- TFileFormatType type;
- String icebergFormat = icebergSource.getFileFormat();
- if (icebergFormat.equalsIgnoreCase("parquet")) {
- type = TFileFormatType.FORMAT_PARQUET;
- } else if (icebergFormat.equalsIgnoreCase("orc")) {
- type = TFileFormatType.FORMAT_ORC;
- } else {
- throw new DdlException(String.format("Unsupported format name: %s
for iceberg table.", icebergFormat));
- }
- return type;
- }
-
- @Override
- public Map<String, String> getLocationProperties() throws
MetaNotFoundException, DdlException {
- return icebergSource.getCatalog().getProperties();
- }
-
- @Override
- public TableIf getTargetTable() {
- return icebergSource.getTargetTable();
- }
-
- @Override
- public TFileAttributes getFileAttributes() throws UserException {
- return icebergSource.getFileAttributes();
- }
-}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergSplit.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergSplit.java
index e840c9a876..896b4968b4 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergSplit.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergSplit.java
@@ -17,7 +17,6 @@
package org.apache.doris.planner.external.iceberg;
-import org.apache.doris.analysis.Analyzer;
import org.apache.doris.planner.external.FileSplit;
import lombok.Data;
@@ -31,8 +30,6 @@ public class IcebergSplit extends FileSplit {
super(file, start, length, fileLength, hosts, null);
}
- private Analyzer analyzer;
- private String dataFilePath;
private Integer formatVersion;
private List<IcebergDeleteFileFilter> deleteFileFilters;
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/Split.java
b/fe/fe-core/src/main/java/org/apache/doris/spi/Split.java
similarity index 79%
rename from fe/fe-core/src/main/java/org/apache/doris/planner/Split.java
rename to fe/fe-core/src/main/java/org/apache/doris/spi/Split.java
index 63b837aacc..31b1e1515a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/Split.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/spi/Split.java
@@ -15,17 +15,15 @@
// specific language governing permissions and limitations
// under the License.
-package org.apache.doris.planner;
+package org.apache.doris.spi;
-import lombok.Data;
+/**
+ * Split interface. e.g. Tablet for Olap Table.
+ */
+public interface Split {
-@Data
-public abstract class Split {
- protected String[] hosts;
+ String[] getHosts();
- public Split() {}
+ Object getInfo();
- public Split(String[] hosts) {
- this.hosts = hosts;
- }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticalType.java
b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticalType.java
index 586c9139cc..c39be3cf2c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticalType.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticalType.java
@@ -31,6 +31,8 @@ public enum StatisticalType {
HASH_JOIN_NODE,
HIVE_SCAN_NODE,
ICEBERG_SCAN_NODE,
+ HUDI_SCAN_NODE,
+ TVF_SCAN_NODE,
INTERSECT_NODE,
LOAD_SCAN_NODE,
MYSQL_SCAN_NODE,
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java
index bde54ab4b0..48e8307799 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java
@@ -37,7 +37,7 @@ import org.apache.doris.common.util.BrokerUtil;
import org.apache.doris.datasource.property.constants.S3Properties;
import org.apache.doris.planner.PlanNodeId;
import org.apache.doris.planner.ScanNode;
-import org.apache.doris.planner.external.FileQueryScanNode;
+import org.apache.doris.planner.external.TVFScanNode;
import org.apache.doris.proto.InternalService;
import org.apache.doris.proto.InternalService.PFetchTableSchemaRequest;
import org.apache.doris.proto.Types.PScalarType;
@@ -317,7 +317,7 @@ public abstract class ExternalFileTableValuedFunction
extends TableValuedFunctio
@Override
public ScanNode getScanNode(PlanNodeId id, TupleDescriptor desc) {
- return new FileQueryScanNode(id, desc, false);
+ return new TVFScanNode(id, desc, false);
}
@Override
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]