This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 0d05e4cce0 [Improvement](multi-catalog) The interface of external Splitter. WIP (#17390)
0d05e4cce0 is described below
commit 0d05e4cce019e88d5c5cdd99d5435643f9f8f71d
Author: Jibing-Li <[email protected]>
AuthorDate: Sun Mar 12 20:11:08 2023 +0800
[Improvement](multi-catalog) The interface of external Splitter. WIP (#17390)
This PR introduces a Splitter interface for external tables.
The Splitter interface contains one method, getSplits, which is used by
QueryScanProvider to get the external file splits.
For Hive/Iceberg/TVF, a split is a file block. For ES, it is a shard.
This PR also moves the getSplits logic from FileScanProviderIf to the new
Splitter interface.
In the future, we may unify internal tables as well.
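For reference, a minimal sketch of the new abstraction (the Split and
Splitter signatures follow the patch below, where each type lives in its
own file; the DummySplitter implementation is a hypothetical example and
not part of this commit):

    import org.apache.doris.analysis.Expr;
    import org.apache.doris.common.UserException;

    import java.util.ArrayList;
    import java.util.List;

    // A Split is one unit of scan work (a file block, or an ES shard).
    public abstract class Split {
        protected String[] hosts;   // preferred hosts for data locality
        public Split() {}
        public Split(String[] hosts) { this.hosts = hosts; }
    }

    // Each table type (Hive/Iceberg/TVF/ES/Olap) supplies its own Splitter.
    public interface Splitter {
        List<Split> getSplits(List<Expr> exprs) throws UserException;
    }

    // Hypothetical illustration only. Scan providers now just hold a
    // Splitter, and QueryScanProvider calls splitter.getSplits(context.conjuncts)
    // when building scan ranges.
    public class DummySplitter implements Splitter {
        @Override
        public List<Split> getSplits(List<Expr> exprs) throws UserException {
            return new ArrayList<>();   // real splitters return file/shard splits
        }
    }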
---
.../doris/datasource/hive/HiveMetaStoreCache.java | 4 +-
.../TableFormatType.java => OlapSplitter.java} | 18 ++-
.../{external/HiveSplit.java => Split.java} | 16 +--
.../TableFormatType.java => Splitter.java} | 18 +--
.../doris/planner/external/FileScanProviderIf.java | 7 -
.../external/{HiveSplit.java => FileSplit.java} | 27 +++-
.../doris/planner/external/FileSplitStrategy.java | 2 -
.../doris/planner/external/HiveScanProvider.java | 93 +------------
.../doris/planner/external/HiveSplitter.java | 155 +++++++++++++++++++++
.../doris/planner/external/IcebergSplitter.java | 154 ++++++++++++++++++++
.../doris/planner/external/LoadScanProvider.java | 8 --
.../doris/planner/external/QueryScanProvider.java | 149 +++++++++-----------
.../doris/planner/external/TVFScanProvider.java | 19 +--
.../apache/doris/planner/external/TVFSplitter.java | 56 ++++++++
.../doris/planner/external/TableFormatType.java | 1 +
.../external/iceberg/IcebergScanProvider.java | 118 +---------------
.../planner/external/iceberg/IcebergSplit.java | 4 +-
17 files changed, 490 insertions(+), 359 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
index 31b8d2f3b4..c2313eb885 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
@@ -38,6 +38,7 @@ import org.apache.doris.planner.ListPartitionPrunerV2;
import org.apache.doris.planner.PartitionPrunerV2Base.UniqueId;
import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
@@ -59,7 +60,6 @@ import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
-import org.apache.parquet.Strings;
import java.security.PrivilegedExceptionAction;
import java.util.HashMap;
@@ -250,6 +250,8 @@ public class HiveMetaStoreCache {
InputFormat<?, ?> inputFormat =
HiveUtil.getInputFormat(jobConf, key.inputFormat, false);
InputSplit[] splits;
String remoteUser = jobConf.get(HdfsResource.HADOOP_USER_NAME);
+
+ // TODO: Implement getSplits logic by ourselves, don't call
inputFormat.getSplits anymore.
if (!Strings.isNullOrEmpty(remoteUser)) {
UserGroupInformation ugi =
UserGroupInformation.createRemoteUser(remoteUser);
splits = ugi.doAs(
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/TableFormatType.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapSplitter.java
similarity index 71%
copy from
fe/fe-core/src/main/java/org/apache/doris/planner/external/TableFormatType.java
copy to fe/fe-core/src/main/java/org/apache/doris/planner/OlapSplitter.java
index 794283de80..f2d06a3aef 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/TableFormatType.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapSplitter.java
@@ -15,19 +15,17 @@
// specific language governing permissions and limitations
// under the License.
-package org.apache.doris.planner.external;
+package org.apache.doris.planner;
-public enum TableFormatType {
- ICEBERG("iceberg"),
- HUDI("hudi");
+import org.apache.doris.analysis.Expr;
+import org.apache.doris.common.UserException;
- private final String tableFormatType;
+import java.util.List;
- TableFormatType(String tableFormatType) {
- this.tableFormatType = tableFormatType;
- }
+public class OlapSplitter implements Splitter {
- public String value() {
- return tableFormatType;
+ @Override
+ public List<Split> getSplits(List<Expr> exprs) throws UserException {
+ return null;
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveSplit.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/Split.java
similarity index 70%
copy from
fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveSplit.java
copy to fe/fe-core/src/main/java/org/apache/doris/planner/Split.java
index 6c8f916a5e..63b837aacc 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveSplit.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/Split.java
@@ -15,19 +15,17 @@
// specific language governing permissions and limitations
// under the License.
-package org.apache.doris.planner.external;
+package org.apache.doris.planner;
import lombok.Data;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.FileSplit;
@Data
-public class HiveSplit extends FileSplit {
- public HiveSplit() {}
+public abstract class Split {
+ protected String[] hosts;
- public HiveSplit(Path file, long start, long length, String[] hosts) {
- super(file, start, length, hosts);
- }
+ public Split() {}
- protected TableFormatType tableFormatType;
+ public Split(String[] hosts) {
+ this.hosts = hosts;
+ }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/TableFormatType.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/Splitter.java
similarity index 71%
copy from
fe/fe-core/src/main/java/org/apache/doris/planner/external/TableFormatType.java
copy to fe/fe-core/src/main/java/org/apache/doris/planner/Splitter.java
index 794283de80..07952bff87 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/TableFormatType.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/Splitter.java
@@ -15,19 +15,13 @@
// specific language governing permissions and limitations
// under the License.
-package org.apache.doris.planner.external;
+package org.apache.doris.planner;
-public enum TableFormatType {
- ICEBERG("iceberg"),
- HUDI("hudi");
+import org.apache.doris.analysis.Expr;
+import org.apache.doris.common.UserException;
- private final String tableFormatType;
+import java.util.List;
- TableFormatType(String tableFormatType) {
- this.tableFormatType = tableFormatType;
- }
-
- public String value() {
- return tableFormatType;
- }
+public interface Splitter {
+ List<Split> getSplits(List<Expr> exprs) throws UserException;
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileScanProviderIf.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileScanProviderIf.java
index 8ae7952169..f962f4d827 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileScanProviderIf.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileScanProviderIf.java
@@ -18,7 +18,6 @@
package org.apache.doris.planner.external;
import org.apache.doris.analysis.Analyzer;
-import org.apache.doris.analysis.Expr;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.MetaNotFoundException;
@@ -28,9 +27,6 @@ import org.apache.doris.thrift.TFileFormatType;
import org.apache.doris.thrift.TFileType;
import org.apache.doris.thrift.TScanRangeLocations;
-import org.apache.hadoop.mapred.InputSplit;
-
-import java.io.IOException;
import java.util.List;
import java.util.Map;
@@ -41,9 +37,6 @@ public interface FileScanProviderIf {
// Return S3/HDSF, etc.
TFileType getLocationType() throws DdlException, MetaNotFoundException;
- // Return file list
- List<InputSplit> getSplits(List<Expr> exprs) throws IOException,
UserException;
-
// return properties for S3/HDFS, etc.
Map<String, String> getLocationProperties() throws MetaNotFoundException,
DdlException;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveSplit.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileSplit.java
similarity index 64%
rename from
fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveSplit.java
rename to
fe/fe-core/src/main/java/org/apache/doris/planner/external/FileSplit.java
index 6c8f916a5e..a4e7bfae2f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveSplit.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileSplit.java
@@ -17,17 +17,32 @@
package org.apache.doris.planner.external;
+import org.apache.doris.planner.Split;
+
import lombok.Data;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.FileSplit;
@Data
-public class HiveSplit extends FileSplit {
- public HiveSplit() {}
+public class FileSplit extends Split {
+ protected Path path;
+ protected long start;
+ protected long length;
+ protected TableFormatType tableFormatType;
+
+ public FileSplit() {}
- public HiveSplit(Path file, long start, long length, String[] hosts) {
- super(file, start, length, hosts);
+ public FileSplit(Path path, long start, long length, String[] hosts) {
+ this.path = path;
+ this.start = start;
+ this.length = length;
+ this.hosts = hosts;
}
- protected TableFormatType tableFormatType;
+ public String[] getHosts() {
+ if (this.hosts == null) {
+ return new String[]{};
+ } else {
+ return this.hosts;
+ }
+ }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileSplitStrategy.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileSplitStrategy.java
index 83a6d3d49c..e574aeb9d2 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileSplitStrategy.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileSplitStrategy.java
@@ -19,8 +19,6 @@ package org.apache.doris.planner.external;
import org.apache.doris.common.Config;
-import org.apache.hadoop.mapred.FileSplit;
-
public class FileSplitStrategy {
private long totalSplitSize;
private int splitNum;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanProvider.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanProvider.java
index f538bf3c3e..5b0baf93ed 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanProvider.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanProvider.java
@@ -18,29 +18,18 @@
package org.apache.doris.planner.external;
import org.apache.doris.analysis.Analyzer;
-import org.apache.doris.analysis.Expr;
import org.apache.doris.analysis.SlotDescriptor;
import org.apache.doris.analysis.TupleDescriptor;
import org.apache.doris.catalog.Column;
-import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.HiveMetaStoreClientHelper;
-import org.apache.doris.catalog.ListPartitionItem;
-import org.apache.doris.catalog.PartitionItem;
import org.apache.doris.catalog.TableIf;
-import org.apache.doris.catalog.Type;
import org.apache.doris.catalog.external.HMSExternalTable;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.UserException;
-import org.apache.doris.common.util.Util;
-import org.apache.doris.datasource.HMSExternalCatalog;
-import org.apache.doris.datasource.hive.HiveMetaStoreCache;
-import org.apache.doris.datasource.hive.HiveMetaStoreCache.HivePartitionValues;
-import org.apache.doris.datasource.hive.HivePartition;
import org.apache.doris.load.BrokerFileGroup;
import org.apache.doris.planner.ColumnRange;
-import org.apache.doris.planner.ListPartitionPrunerV2;
import
org.apache.doris.planner.external.ExternalFileScanNode.ParamCreateContext;
import org.apache.doris.thrift.TFileAttributes;
import org.apache.doris.thrift.TFileFormatType;
@@ -49,17 +38,12 @@ import org.apache.doris.thrift.TFileScanSlotInfo;
import org.apache.doris.thrift.TFileTextScanRangeParams;
import org.apache.doris.thrift.TFileType;
-import com.google.common.base.Joiner;
-import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.Table;
-import org.apache.hadoop.mapred.FileSplit;
-import org.apache.hadoop.mapred.InputSplit;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
-import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
@@ -86,6 +70,7 @@ public class HiveScanProvider extends HMSTableScanProvider {
this.hmsTable = hmsTable;
this.desc = desc;
this.columnNameToRange = columnNameToRange;
+ this.splitter = new HiveSplitter(hmsTable, columnNameToRange);
}
@Override
@@ -138,84 +123,12 @@ public class HiveScanProvider extends
HMSTableScanProvider {
return hmsTable.getMetastoreUri();
}
- @Override
- public List<InputSplit> getSplits(List<Expr> exprs) throws UserException {
- long start = System.currentTimeMillis();
- try {
- HiveMetaStoreCache cache = Env.getCurrentEnv().getExtMetaCacheMgr()
- .getMetaStoreCache((HMSExternalCatalog)
hmsTable.getCatalog());
- // 1. get ListPartitionItems from cache
- HivePartitionValues hivePartitionValues = null;
- List<Type> partitionColumnTypes =
hmsTable.getPartitionColumnTypes();
- if (!partitionColumnTypes.isEmpty()) {
- hivePartitionValues =
cache.getPartitionValues(hmsTable.getDbName(), hmsTable.getName(),
- partitionColumnTypes);
- }
-
- List<InputSplit> allFiles = Lists.newArrayList();
- if (hivePartitionValues != null) {
- // 2. prune partitions by expr
- Map<Long, PartitionItem> idToPartitionItem =
hivePartitionValues.getIdToPartitionItem();
- this.totalPartitionNum = idToPartitionItem.size();
- ListPartitionPrunerV2 pruner = new
ListPartitionPrunerV2(idToPartitionItem,
- hmsTable.getPartitionColumns(), columnNameToRange,
- hivePartitionValues.getUidToPartitionRange(),
- hivePartitionValues.getRangeToId(),
- hivePartitionValues.getSingleColumnRangeMap(),
- true);
- Collection<Long> filteredPartitionIds = pruner.prune();
- this.readPartitionNum = filteredPartitionIds.size();
- LOG.debug("hive partition fetch and prune for table {}.{}
cost: {} ms",
- hmsTable.getDbName(), hmsTable.getName(),
(System.currentTimeMillis() - start));
-
- // 3. get partitions from cache
- List<List<String>> partitionValuesList =
Lists.newArrayListWithCapacity(filteredPartitionIds.size());
- for (Long id : filteredPartitionIds) {
- ListPartitionItem listPartitionItem = (ListPartitionItem)
idToPartitionItem.get(id);
-
partitionValuesList.add(listPartitionItem.getItems().get(0).getPartitionValuesAsStringList());
- }
- List<HivePartition> partitions =
cache.getAllPartitions(hmsTable.getDbName(), hmsTable.getName(),
- partitionValuesList);
- // 4. get all files of partitions
- getFileSplitByPartitions(cache, partitions, allFiles);
- } else {
- // unpartitioned table, create a dummy partition to save
location and inputformat,
- // so that we can unify the interface.
- HivePartition dummyPartition = new
HivePartition(hmsTable.getRemoteTable().getSd().getInputFormat(),
- hmsTable.getRemoteTable().getSd().getLocation(), null);
- getFileSplitByPartitions(cache,
Lists.newArrayList(dummyPartition), allFiles);
- this.totalPartitionNum = 1;
- this.readPartitionNum = 1;
- }
- LOG.debug("get #{} files for table: {}.{}, cost: {} ms",
- allFiles.size(), hmsTable.getDbName(), hmsTable.getName(),
(System.currentTimeMillis() - start));
- return allFiles;
- } catch (Throwable t) {
- LOG.warn("get file split failed for table: {}",
hmsTable.getName(), t);
- throw new UserException(
- "get file split failed for table: " + hmsTable.getName() +
", err: " + Util.getRootCauseMessage(t),
- t);
- }
- }
-
- private void getFileSplitByPartitions(HiveMetaStoreCache cache,
List<HivePartition> partitions,
- List<InputSplit> allFiles) {
- List<InputSplit> files = cache.getFilesByPartitions(partitions);
- if (LOG.isDebugEnabled()) {
- LOG.debug("get #{} files from #{} partitions: {}", files.size(),
partitions.size(),
- Joiner.on(",")
- .join(files.stream().limit(10).map(f ->
((FileSplit) f).getPath())
- .collect(Collectors.toList())));
- }
- allFiles.addAll(files);
- }
-
public int getTotalPartitionNum() {
- return totalPartitionNum;
+ return ((HiveSplitter) splitter).getTotalPartitionNum();
}
public int getReadPartitionNum() {
- return readPartitionNum;
+ return ((HiveSplitter) splitter).getReadPartitionNum();
}
@Override
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveSplitter.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveSplitter.java
new file mode 100644
index 0000000000..a49935b9ee
--- /dev/null
+++
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveSplitter.java
@@ -0,0 +1,155 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.planner.external;
+
+import org.apache.doris.analysis.Expr;
+import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.ListPartitionItem;
+import org.apache.doris.catalog.PartitionItem;
+import org.apache.doris.catalog.Type;
+import org.apache.doris.catalog.external.HMSExternalTable;
+import org.apache.doris.common.UserException;
+import org.apache.doris.common.util.Util;
+import org.apache.doris.datasource.HMSExternalCatalog;
+import org.apache.doris.datasource.hive.HiveMetaStoreCache;
+import org.apache.doris.datasource.hive.HivePartition;
+import org.apache.doris.planner.ColumnRange;
+import org.apache.doris.planner.ListPartitionPrunerV2;
+import org.apache.doris.planner.Split;
+import org.apache.doris.planner.Splitter;
+
+import com.google.common.base.Joiner;
+import com.google.common.collect.Lists;
+import org.apache.hadoop.hive.ql.io.orc.OrcSplit;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public class HiveSplitter implements Splitter {
+
+ private static final Logger LOG = LogManager.getLogger(HiveSplitter.class);
+
+ private HMSExternalTable hmsTable;
+ private Map<String, ColumnRange> columnNameToRange;
+ private int totalPartitionNum = 0;
+ private int readPartitionNum = 0;
+
+ public HiveSplitter(HMSExternalTable hmsTable, Map<String, ColumnRange>
columnNameToRange) {
+ this.hmsTable = hmsTable;
+ this.columnNameToRange = columnNameToRange;
+ }
+
+ @Override
+ public List<Split> getSplits(List<Expr> exprs) throws UserException {
+ long start = System.currentTimeMillis();
+ try {
+ HiveMetaStoreCache cache = Env.getCurrentEnv().getExtMetaCacheMgr()
+ .getMetaStoreCache((HMSExternalCatalog)
hmsTable.getCatalog());
+ // 1. get ListPartitionItems from cache
+ HiveMetaStoreCache.HivePartitionValues hivePartitionValues = null;
+ List<Type> partitionColumnTypes =
hmsTable.getPartitionColumnTypes();
+ if (!partitionColumnTypes.isEmpty()) {
+ hivePartitionValues =
cache.getPartitionValues(hmsTable.getDbName(), hmsTable.getName(),
+ partitionColumnTypes);
+ }
+
+ List<Split> allFiles = Lists.newArrayList();
+ if (hivePartitionValues != null) {
+ // 2. prune partitions by expr
+ Map<Long, PartitionItem> idToPartitionItem =
hivePartitionValues.getIdToPartitionItem();
+ this.totalPartitionNum = idToPartitionItem.size();
+ ListPartitionPrunerV2 pruner = new
ListPartitionPrunerV2(idToPartitionItem,
+ hmsTable.getPartitionColumns(), columnNameToRange,
+ hivePartitionValues.getUidToPartitionRange(),
+ hivePartitionValues.getRangeToId(),
+ hivePartitionValues.getSingleColumnRangeMap(),
+ true);
+ Collection<Long> filteredPartitionIds = pruner.prune();
+ this.readPartitionNum = filteredPartitionIds.size();
+ LOG.debug("hive partition fetch and prune for table {}.{}
cost: {} ms",
+ hmsTable.getDbName(), hmsTable.getName(),
(System.currentTimeMillis() - start));
+
+ // 3. get partitions from cache
+ List<List<String>> partitionValuesList =
Lists.newArrayListWithCapacity(filteredPartitionIds.size());
+ for (Long id : filteredPartitionIds) {
+ ListPartitionItem listPartitionItem = (ListPartitionItem)
idToPartitionItem.get(id);
+
partitionValuesList.add(listPartitionItem.getItems().get(0).getPartitionValuesAsStringList());
+ }
+ List<HivePartition> partitions =
cache.getAllPartitions(hmsTable.getDbName(), hmsTable.getName(),
+ partitionValuesList);
+ // 4. get all files of partitions
+ getFileSplitByPartitions(cache, partitions, allFiles);
+ } else {
+ // unpartitioned table, create a dummy partition to save
location and inputformat,
+ // so that we can unify the interface.
+ HivePartition dummyPartition = new
HivePartition(hmsTable.getRemoteTable().getSd().getInputFormat(),
+ hmsTable.getRemoteTable().getSd().getLocation(), null);
+ getFileSplitByPartitions(cache,
Lists.newArrayList(dummyPartition), allFiles);
+ this.totalPartitionNum = 1;
+ this.readPartitionNum = 1;
+ }
+ LOG.debug("get #{} files for table: {}.{}, cost: {} ms",
+ allFiles.size(), hmsTable.getDbName(), hmsTable.getName(),
(System.currentTimeMillis() - start));
+ return allFiles;
+ } catch (Throwable t) {
+ LOG.warn("get file split failed for table: {}",
hmsTable.getName(), t);
+ throw new UserException(
+ "get file split failed for table: " + hmsTable.getName() + ",
err: " + Util.getRootCauseMessage(t),
+ t);
+ }
+ }
+
+ private void getFileSplitByPartitions(HiveMetaStoreCache cache,
List<HivePartition> partitions,
+ List<Split> allFiles) {
+ List<InputSplit> files = cache.getFilesByPartitions(partitions);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("get #{} files from #{} partitions: {}", files.size(),
partitions.size(),
+ Joiner.on(",")
+ .join(files.stream().limit(10).map(f -> ((FileSplit)
f).getPath())
+ .collect(Collectors.toList())));
+ }
+ allFiles.addAll(files.stream().map(file -> {
+ FileSplit fs = (FileSplit) file;
+ org.apache.doris.planner.external.FileSplit split = new
org.apache.doris.planner.external.FileSplit();
+ split.setPath(fs.getPath());
+ split.setStart(fs.getStart());
+ // file size of orc files is not correct get by
FileSplit.getLength(),
+ // broker reader needs correct file size
+ if (fs instanceof OrcSplit) {
+ split.setLength(((OrcSplit) fs).getFileLength());
+ } else {
+ split.setLength(fs.getLength());
+ }
+ return split;
+ }).collect(Collectors.toList()));
+ }
+
+ public int getTotalPartitionNum() {
+ return totalPartitionNum;
+ }
+
+ public int getReadPartitionNum() {
+ return readPartitionNum;
+ }
+}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/IcebergSplitter.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/IcebergSplitter.java
new file mode 100644
index 0000000000..b595e95cc3
--- /dev/null
+++
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/IcebergSplitter.java
@@ -0,0 +1,154 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.planner.external;
+
+import org.apache.doris.analysis.Analyzer;
+import org.apache.doris.analysis.Expr;
+import org.apache.doris.analysis.TableSnapshot;
+import org.apache.doris.common.UserException;
+import org.apache.doris.common.util.TimeUtils;
+import org.apache.doris.external.iceberg.util.IcebergUtils;
+import org.apache.doris.planner.Split;
+import org.apache.doris.planner.Splitter;
+import org.apache.doris.planner.external.iceberg.IcebergDeleteFileFilter;
+import org.apache.doris.planner.external.iceberg.IcebergScanProvider;
+import org.apache.doris.planner.external.iceberg.IcebergSource;
+import org.apache.doris.planner.external.iceberg.IcebergSplit;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.iceberg.BaseTable;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileContent;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.HistoryEntry;
+import org.apache.iceberg.MetadataColumns;
+import org.apache.iceberg.TableScan;
+import org.apache.iceberg.exceptions.NotFoundException;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.types.Conversions;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.nio.ByteBuffer;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+
+public class IcebergSplitter implements Splitter {
+ private static final Logger LOG =
LogManager.getLogger(IcebergSplitter.class);
+
+ private final IcebergSource icebergSource;
+ private final Analyzer analyzer;
+
+ public IcebergSplitter(IcebergSource icebergSource, Analyzer analyzer) {
+ this.icebergSource = icebergSource;
+ this.analyzer = analyzer;
+ }
+
+ @Override
+ public List<Split> getSplits(List<Expr> exprs) throws UserException {
+ List<Expression> expressions = new ArrayList<>();
+ org.apache.iceberg.Table table = icebergSource.getIcebergTable();
+ for (Expr conjunct : exprs) {
+ Expression expression =
IcebergUtils.convertToIcebergExpr(conjunct, table.schema());
+ if (expression != null) {
+ expressions.add(expression);
+ }
+ }
+ TableScan scan = table.newScan();
+ TableSnapshot tableSnapshot =
icebergSource.getDesc().getRef().getTableSnapshot();
+ if (tableSnapshot != null) {
+ TableSnapshot.VersionType type = tableSnapshot.getType();
+ try {
+ if (type == TableSnapshot.VersionType.VERSION) {
+ scan = scan.useSnapshot(tableSnapshot.getVersion());
+ } else {
+ long snapshotId =
TimeUtils.timeStringToLong(tableSnapshot.getTime(), TimeUtils.getTimeZone());
+ scan =
scan.useSnapshot(getSnapshotIdAsOfTime(table.history(), snapshotId));
+ }
+ } catch (IllegalArgumentException e) {
+ throw new UserException(e);
+ }
+ }
+ for (Expression predicate : expressions) {
+ scan = scan.filter(predicate);
+ }
+ List<Split> splits = new ArrayList<>();
+ int formatVersion = ((BaseTable)
table).operations().current().formatVersion();
+ for (FileScanTask task : scan.planFiles()) {
+ for (FileScanTask splitTask : task.split(128 * 1024 * 1024)) {
+ String dataFilePath = splitTask.file().path().toString();
+ IcebergSplit split = new IcebergSplit(new Path(dataFilePath),
splitTask.start(),
+ splitTask.length(), new String[0]);
+ split.setFormatVersion(formatVersion);
+ if (formatVersion >=
IcebergScanProvider.MIN_DELETE_FILE_SUPPORT_VERSION) {
+
split.setDeleteFileFilters(getDeleteFileFilters(splitTask));
+ }
+ split.setTableFormatType(TableFormatType.ICEBERG);
+ split.setAnalyzer(analyzer);
+ splits.add(split);
+ }
+ }
+ return splits;
+ }
+
+ public static long getSnapshotIdAsOfTime(List<HistoryEntry>
historyEntries, long asOfTimestamp) {
+ // find history at or before asOfTimestamp
+ HistoryEntry latestHistory = null;
+ for (HistoryEntry entry : historyEntries) {
+ if (entry.timestampMillis() <= asOfTimestamp) {
+ if (latestHistory == null) {
+ latestHistory = entry;
+ continue;
+ }
+ if (entry.timestampMillis() > latestHistory.timestampMillis())
{
+ latestHistory = entry;
+ }
+ }
+ }
+ if (latestHistory == null) {
+ throw new NotFoundException("No version history at or before "
+ + Instant.ofEpochMilli(asOfTimestamp));
+ }
+ return latestHistory.snapshotId();
+ }
+
+ private List<IcebergDeleteFileFilter> getDeleteFileFilters(FileScanTask
spitTask) {
+ List<IcebergDeleteFileFilter> filters = new ArrayList<>();
+ for (DeleteFile delete : spitTask.deletes()) {
+ if (delete.content() == FileContent.POSITION_DELETES) {
+ ByteBuffer lowerBoundBytes =
delete.lowerBounds().get(MetadataColumns.DELETE_FILE_POS.fieldId());
+ Optional<Long> positionLowerBound =
Optional.ofNullable(lowerBoundBytes)
+ .map(bytes ->
Conversions.fromByteBuffer(MetadataColumns.DELETE_FILE_POS.type(), bytes));
+ ByteBuffer upperBoundBytes =
delete.upperBounds().get(MetadataColumns.DELETE_FILE_POS.fieldId());
+ Optional<Long> positionUpperBound =
Optional.ofNullable(upperBoundBytes)
+ .map(bytes ->
Conversions.fromByteBuffer(MetadataColumns.DELETE_FILE_POS.type(), bytes));
+
filters.add(IcebergDeleteFileFilter.createPositionDelete(delete.path().toString(),
+ positionLowerBound.orElse(-1L),
positionUpperBound.orElse(-1L)));
+ } else if (delete.content() == FileContent.EQUALITY_DELETES) {
+ // todo:
filters.add(IcebergDeleteFileFilter.createEqualityDelete(delete.path().toString(),
+ // delete.equalityFieldIds()));
+ throw new IllegalStateException("Don't support equality delete
file");
+ } else {
+ throw new IllegalStateException("Unknown delete content: " +
delete.content());
+ }
+ }
+ return filters;
+ }
+}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/LoadScanProvider.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/LoadScanProvider.java
index d8e644e277..086191e94d 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/LoadScanProvider.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/LoadScanProvider.java
@@ -18,7 +18,6 @@
package org.apache.doris.planner.external;
import org.apache.doris.analysis.Analyzer;
-import org.apache.doris.analysis.Expr;
import org.apache.doris.analysis.ImportColumnDesc;
import org.apache.doris.analysis.IntLiteral;
import org.apache.doris.analysis.SlotRef;
@@ -51,9 +50,7 @@ import org.apache.doris.thrift.TScanRangeLocations;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
-import org.apache.hadoop.mapred.InputSplit;
-import java.io.IOException;
import java.util.List;
import java.util.Map;
@@ -77,11 +74,6 @@ public class LoadScanProvider implements FileScanProviderIf {
return null;
}
- @Override
- public List<InputSplit> getSplits(List<Expr> exprs) throws IOException,
UserException {
- return null;
- }
-
@Override
public Map<String, String> getLocationProperties() throws
MetaNotFoundException, DdlException {
return null;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/QueryScanProvider.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/QueryScanProvider.java
index 1d25a3cb31..45e48c4ff5 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/QueryScanProvider.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/QueryScanProvider.java
@@ -24,6 +24,8 @@ import org.apache.doris.common.DdlException;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.BrokerUtil;
+import org.apache.doris.planner.Split;
+import org.apache.doris.planner.Splitter;
import
org.apache.doris.planner.external.ExternalFileScanNode.ParamCreateContext;
import org.apache.doris.planner.external.iceberg.IcebergScanProvider;
import org.apache.doris.planner.external.iceberg.IcebergSplit;
@@ -42,13 +44,9 @@ import org.apache.doris.thrift.TScanRangeLocation;
import org.apache.doris.thrift.TScanRangeLocations;
import com.google.common.base.Joiner;
-import org.apache.hadoop.hive.ql.io.orc.OrcSplit;
-import org.apache.hadoop.mapred.FileSplit;
-import org.apache.hadoop.mapred.InputSplit;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
-import java.io.IOException;
import java.util.List;
import java.util.Map;
@@ -56,6 +54,7 @@ public abstract class QueryScanProvider implements
FileScanProviderIf {
public static final Logger LOG =
LogManager.getLogger(QueryScanProvider.class);
private int inputSplitNum = 0;
private long inputFileSize = 0;
+ protected Splitter splitter;
public abstract TFileAttributes getFileAttributes() throws UserException;
@@ -63,93 +62,83 @@ public abstract class QueryScanProvider implements
FileScanProviderIf {
public void createScanRangeLocations(ParamCreateContext context,
BackendPolicy backendPolicy,
List<TScanRangeLocations> scanRangeLocations) throws UserException
{
long start = System.currentTimeMillis();
- try {
- List<InputSplit> inputSplits = getSplits(context.conjuncts);
- this.inputSplitNum = inputSplits.size();
- if (inputSplits.isEmpty()) {
- return;
- }
- InputSplit inputSplit = inputSplits.get(0);
- TFileType locationType = getLocationType();
- context.params.setFileType(locationType);
- TFileFormatType fileFormatType = getFileFormatType();
- context.params.setFormatType(getFileFormatType());
- if (fileFormatType == TFileFormatType.FORMAT_CSV_PLAIN ||
fileFormatType == TFileFormatType.FORMAT_JSON) {
- context.params.setFileAttributes(getFileAttributes());
- }
+ List<Split> inputSplits = splitter.getSplits(context.conjuncts);
+ this.inputSplitNum = inputSplits.size();
+ if (inputSplits.isEmpty()) {
+ return;
+ }
+ FileSplit inputSplit = (FileSplit) inputSplits.get(0);
+ TFileType locationType = getLocationType();
+ context.params.setFileType(locationType);
+ TFileFormatType fileFormatType = getFileFormatType();
+ context.params.setFormatType(getFileFormatType());
+ if (fileFormatType == TFileFormatType.FORMAT_CSV_PLAIN ||
fileFormatType == TFileFormatType.FORMAT_JSON) {
+ context.params.setFileAttributes(getFileAttributes());
+ }
- // set hdfs params for hdfs file type.
- Map<String, String> locationProperties = getLocationProperties();
- if (locationType == TFileType.FILE_HDFS || locationType ==
TFileType.FILE_BROKER) {
- String fsName = "";
- if (this instanceof TVFScanProvider) {
- fsName = ((TVFScanProvider) this).getFsName();
- } else {
- String fullPath = ((FileSplit)
inputSplit).getPath().toUri().toString();
- String filePath = ((FileSplit)
inputSplit).getPath().toUri().getPath();
- // eg:
- // hdfs://namenode
- // s3://buckets
- fsName = fullPath.replace(filePath, "");
- }
- THdfsParams tHdfsParams =
HdfsResource.generateHdfsParam(locationProperties);
- tHdfsParams.setFsName(fsName);
- context.params.setHdfsParams(tHdfsParams);
-
- if (locationType == TFileType.FILE_BROKER) {
- FsBroker broker =
Env.getCurrentEnv().getBrokerMgr().getAnyAliveBroker();
- if (broker == null) {
- throw new UserException("No alive broker.");
- }
- context.params.addToBrokerAddresses(new
TNetworkAddress(broker.ip, broker.port));
+ // set hdfs params for hdfs file type.
+ Map<String, String> locationProperties = getLocationProperties();
+ if (locationType == TFileType.FILE_HDFS || locationType ==
TFileType.FILE_BROKER) {
+ String fsName = "";
+ if (this instanceof TVFScanProvider) {
+ fsName = ((TVFScanProvider) this).getFsName();
+ } else {
+ String fullPath = inputSplit.getPath().toUri().toString();
+ String filePath = inputSplit.getPath().toUri().getPath();
+ // eg:
+ // hdfs://namenode
+ // s3://buckets
+ fsName = fullPath.replace(filePath, "");
+ }
+ THdfsParams tHdfsParams =
HdfsResource.generateHdfsParam(locationProperties);
+ tHdfsParams.setFsName(fsName);
+ context.params.setHdfsParams(tHdfsParams);
+
+ if (locationType == TFileType.FILE_BROKER) {
+ FsBroker broker =
Env.getCurrentEnv().getBrokerMgr().getAnyAliveBroker();
+ if (broker == null) {
+ throw new UserException("No alive broker.");
}
- } else if (locationType == TFileType.FILE_S3) {
- context.params.setProperties(locationProperties);
+ context.params.addToBrokerAddresses(new
TNetworkAddress(broker.ip, broker.port));
}
- TScanRangeLocations curLocations = newLocations(context.params,
backendPolicy);
+ } else if (locationType == TFileType.FILE_S3) {
+ context.params.setProperties(locationProperties);
+ }
+ TScanRangeLocations curLocations = newLocations(context.params,
backendPolicy);
- FileSplitStrategy fileSplitStrategy = new FileSplitStrategy();
+ FileSplitStrategy fileSplitStrategy = new FileSplitStrategy();
- for (InputSplit split : inputSplits) {
- FileSplit fileSplit = (FileSplit) split;
- List<String> pathPartitionKeys = getPathPartitionKeys();
- List<String> partitionValuesFromPath =
BrokerUtil.parseColumnsFromPath(fileSplit.getPath().toString(),
- pathPartitionKeys, false);
+ for (Split split : inputSplits) {
+ FileSplit fileSplit = (FileSplit) split;
+ List<String> pathPartitionKeys = getPathPartitionKeys();
+ List<String> partitionValuesFromPath =
BrokerUtil.parseColumnsFromPath(fileSplit.getPath().toString(),
+ pathPartitionKeys, false);
- TFileRangeDesc rangeDesc = createFileRangeDesc(fileSplit,
partitionValuesFromPath, pathPartitionKeys);
- // external data lake table
- if (split instanceof IcebergSplit) {
- IcebergScanProvider.setIcebergParams(rangeDesc,
(IcebergSplit) split);
- }
+ TFileRangeDesc rangeDesc = createFileRangeDesc(fileSplit,
partitionValuesFromPath, pathPartitionKeys);
+ // external data lake table
+ if (fileSplit instanceof IcebergSplit) {
+ IcebergScanProvider.setIcebergParams(rangeDesc, (IcebergSplit)
fileSplit);
+ }
- // file size of orc files is not correct get by
FileSplit.getLength(),
- // broker reader needs correct file size
- if (locationType == TFileType.FILE_BROKER && fileFormatType ==
TFileFormatType.FORMAT_ORC) {
- rangeDesc.setFileSize(((OrcSplit)
fileSplit).getFileLength());
- }
+
curLocations.getScanRange().getExtScanRange().getFileScanRange().addToRanges(rangeDesc);
+ LOG.debug("assign to backend {} with table split: {} ({}, {}),
location: {}",
+ curLocations.getLocations().get(0).getBackendId(),
fileSplit.getPath(), fileSplit.getStart(),
+ fileSplit.getLength(),
Joiner.on("|").join(fileSplit.getHosts()));
-
curLocations.getScanRange().getExtScanRange().getFileScanRange().addToRanges(rangeDesc);
- LOG.debug("assign to backend {} with table split: {} ({}, {}),
location: {}",
- curLocations.getLocations().get(0).getBackendId(),
fileSplit.getPath(), fileSplit.getStart(),
- fileSplit.getLength(),
Joiner.on("|").join(split.getLocations()));
-
- fileSplitStrategy.update(fileSplit);
- // Add a new location when it's can be split
- if (fileSplitStrategy.hasNext()) {
- scanRangeLocations.add(curLocations);
- curLocations = newLocations(context.params, backendPolicy);
- fileSplitStrategy.next();
- }
- this.inputFileSize += fileSplit.getLength();
- }
- if
(curLocations.getScanRange().getExtScanRange().getFileScanRange().getRangesSize()
> 0) {
+ fileSplitStrategy.update(fileSplit);
+ // Add a new location when it's can be split
+ if (fileSplitStrategy.hasNext()) {
scanRangeLocations.add(curLocations);
+ curLocations = newLocations(context.params, backendPolicy);
+ fileSplitStrategy.next();
}
- LOG.debug("create #{} ScanRangeLocations cost: {} ms",
- scanRangeLocations.size(), (System.currentTimeMillis() -
start));
- } catch (IOException e) {
- throw new UserException(e);
+ this.inputFileSize += fileSplit.getLength();
+ }
+ if
(curLocations.getScanRange().getExtScanRange().getFileScanRange().getRangesSize()
> 0) {
+ scanRangeLocations.add(curLocations);
}
+ LOG.debug("create #{} ScanRangeLocations cost: {} ms",
+ scanRangeLocations.size(), (System.currentTimeMillis() -
start));
}
@Override
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/TVFScanProvider.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/TVFScanProvider.java
index 954d271a94..48365a7656 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/TVFScanProvider.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/TVFScanProvider.java
@@ -18,7 +18,6 @@
package org.apache.doris.planner.external;
import org.apache.doris.analysis.Analyzer;
-import org.apache.doris.analysis.Expr;
import org.apache.doris.analysis.SlotDescriptor;
import org.apache.doris.analysis.TupleDescriptor;
import org.apache.doris.catalog.Column;
@@ -30,7 +29,6 @@ import org.apache.doris.common.UserException;
import org.apache.doris.load.BrokerFileGroup;
import
org.apache.doris.planner.external.ExternalFileScanNode.ParamCreateContext;
import org.apache.doris.tablefunction.ExternalFileTableValuedFunction;
-import org.apache.doris.thrift.TBrokerFileStatus;
import org.apache.doris.thrift.TFileAttributes;
import org.apache.doris.thrift.TFileFormatType;
import org.apache.doris.thrift.TFileScanRangeParams;
@@ -38,11 +36,7 @@ import org.apache.doris.thrift.TFileScanSlotInfo;
import org.apache.doris.thrift.TFileType;
import com.google.common.collect.Lists;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.FileSplit;
-import org.apache.hadoop.mapred.InputSplit;
-import java.io.IOException;
import java.util.List;
import java.util.Map;
@@ -56,6 +50,7 @@ public class TVFScanProvider extends QueryScanProvider {
this.tvfTable = tvfTable;
this.desc = desc;
this.tableValuedFunction = tableValuedFunction;
+ this.splitter = new TVFSplitter(tableValuedFunction);
}
public String getFsName() {
@@ -80,18 +75,6 @@ public class TVFScanProvider extends QueryScanProvider {
return tableValuedFunction.getTFileType();
}
- @Override
- public List<InputSplit> getSplits(List<Expr> exprs) throws IOException,
UserException {
- List<InputSplit> splits = Lists.newArrayList();
- List<TBrokerFileStatus> fileStatuses =
tableValuedFunction.getFileStatuses();
- for (TBrokerFileStatus fileStatus : fileStatuses) {
- Path path = new Path(fileStatus.getPath());
- FileSplit fileSplit = new FileSplit(path, 0, fileStatus.getSize(),
new String[0]);
- splits.add(fileSplit);
- }
- return splits;
- }
-
@Override
public Map<String, String> getLocationProperties() throws
MetaNotFoundException, DdlException {
return tableValuedFunction.getLocationProperties();
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/TVFSplitter.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/TVFSplitter.java
new file mode 100644
index 0000000000..d3234c977f
--- /dev/null
+++
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/TVFSplitter.java
@@ -0,0 +1,56 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.planner.external;
+
+import org.apache.doris.analysis.Expr;
+import org.apache.doris.common.UserException;
+import org.apache.doris.planner.Split;
+import org.apache.doris.planner.Splitter;
+import org.apache.doris.tablefunction.ExternalFileTableValuedFunction;
+import org.apache.doris.thrift.TBrokerFileStatus;
+
+import com.google.common.collect.Lists;
+import org.apache.hadoop.fs.Path;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.List;
+
+public class TVFSplitter implements Splitter {
+
+ private static final Logger LOG = LogManager.getLogger(TVFSplitter.class);
+
+ private ExternalFileTableValuedFunction tableValuedFunction;
+
+ public TVFSplitter(ExternalFileTableValuedFunction tableValuedFunction) {
+ this.tableValuedFunction = tableValuedFunction;
+ }
+
+ @Override
+ public List<Split> getSplits(List<Expr> exprs) throws UserException {
+ List<Split> splits = Lists.newArrayList();
+ List<TBrokerFileStatus> fileStatuses =
tableValuedFunction.getFileStatuses();
+ for (TBrokerFileStatus fileStatus : fileStatuses) {
+ Path path = new Path(fileStatus.getPath());
+ Split split = new FileSplit(path, 0, fileStatus.getSize(), new
String[0]);
+ splits.add(split);
+ }
+ return splits;
+ }
+
+}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/TableFormatType.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/TableFormatType.java
index 794283de80..6fc5d69544 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/TableFormatType.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/TableFormatType.java
@@ -18,6 +18,7 @@
package org.apache.doris.planner.external;
public enum TableFormatType {
+ HIVE("hive"),
ICEBERG("iceberg"),
HUDI("hudi");
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergScanProvider.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergScanProvider.java
index 483432b798..eb565638e3 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergScanProvider.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergScanProvider.java
@@ -18,18 +18,14 @@
package org.apache.doris.planner.external.iceberg;
import org.apache.doris.analysis.Analyzer;
-import org.apache.doris.analysis.Expr;
-import org.apache.doris.analysis.TableSnapshot;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.UserException;
-import org.apache.doris.common.util.TimeUtils;
-import org.apache.doris.external.iceberg.util.IcebergUtils;
import org.apache.doris.planner.external.ExternalFileScanNode;
+import org.apache.doris.planner.external.IcebergSplitter;
import org.apache.doris.planner.external.QueryScanProvider;
-import org.apache.doris.planner.external.TableFormatType;
import org.apache.doris.thrift.TFileAttributes;
import org.apache.doris.thrift.TFileFormatType;
import org.apache.doris.thrift.TFileRangeDesc;
@@ -38,26 +34,11 @@ import org.apache.doris.thrift.TIcebergDeleteFileDesc;
import org.apache.doris.thrift.TIcebergFileDesc;
import org.apache.doris.thrift.TTableFormatFileDesc;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.InputSplit;
-import org.apache.iceberg.BaseTable;
-import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.FileContent;
-import org.apache.iceberg.FileScanTask;
-import org.apache.iceberg.HistoryEntry;
-import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.PartitionField;
-import org.apache.iceberg.TableScan;
-import org.apache.iceberg.exceptions.NotFoundException;
-import org.apache.iceberg.expressions.Expression;
-import org.apache.iceberg.types.Conversions;
-import java.nio.ByteBuffer;
-import java.time.Instant;
-import java.util.ArrayList;
import java.util.List;
import java.util.Map;
-import java.util.Optional;
import java.util.OptionalLong;
import java.util.stream.Collectors;
@@ -66,17 +47,17 @@ import java.util.stream.Collectors;
*/
public class IcebergScanProvider extends QueryScanProvider {
- private static final int MIN_DELETE_FILE_SUPPORT_VERSION = 2;
+ public static final int MIN_DELETE_FILE_SUPPORT_VERSION = 2;
private final Analyzer analyzer;
private final IcebergSource icebergSource;
public IcebergScanProvider(IcebergSource icebergSource, Analyzer analyzer)
{
this.icebergSource = icebergSource;
this.analyzer = analyzer;
+ this.splitter = new IcebergSplitter(icebergSource, analyzer);
}
- public static void setIcebergParams(TFileRangeDesc rangeDesc, IcebergSplit
icebergSplit)
- throws UserException {
+ public static void setIcebergParams(TFileRangeDesc rangeDesc, IcebergSplit
icebergSplit) {
TTableFormatFileDesc tableFormatFileDesc = new TTableFormatFileDesc();
tableFormatFileDesc.setTableFormatType(icebergSplit.getTableFormatType().value());
TIcebergFileDesc fileDesc = new TIcebergFileDesc();
@@ -139,97 +120,6 @@ public class IcebergScanProvider extends QueryScanProvider
{
+ " for hms table " + icebergSource.getIcebergTable().name());
}
- @Override
- public List<InputSplit> getSplits(List<Expr> exprs) throws UserException {
- List<Expression> expressions = new ArrayList<>();
- org.apache.iceberg.Table table = icebergSource.getIcebergTable();
- for (Expr conjunct : exprs) {
- Expression expression =
IcebergUtils.convertToIcebergExpr(conjunct, table.schema());
- if (expression != null) {
- expressions.add(expression);
- }
- }
- TableScan scan = table.newScan();
- TableSnapshot tableSnapshot =
icebergSource.getDesc().getRef().getTableSnapshot();
- if (tableSnapshot != null) {
- TableSnapshot.VersionType type = tableSnapshot.getType();
- try {
- if (type == TableSnapshot.VersionType.VERSION) {
- scan = scan.useSnapshot(tableSnapshot.getVersion());
- } else {
- long snapshotId =
TimeUtils.timeStringToLong(tableSnapshot.getTime(), TimeUtils.getTimeZone());
- scan =
scan.useSnapshot(getSnapshotIdAsOfTime(table.history(), snapshotId));
- }
- } catch (IllegalArgumentException e) {
- throw new UserException(e);
- }
- }
- for (Expression predicate : expressions) {
- scan = scan.filter(predicate);
- }
- List<InputSplit> splits = new ArrayList<>();
- int formatVersion = ((BaseTable)
table).operations().current().formatVersion();
- for (FileScanTask task : scan.planFiles()) {
- for (FileScanTask splitTask : task.split(128 * 1024 * 1024)) {
- String dataFilePath = splitTask.file().path().toString();
- IcebergSplit split = new IcebergSplit(new Path(dataFilePath),
splitTask.start(),
- splitTask.length(), new String[0]);
- split.setFormatVersion(formatVersion);
- if (formatVersion >= MIN_DELETE_FILE_SUPPORT_VERSION) {
-
split.setDeleteFileFilters(getDeleteFileFilters(splitTask));
- }
- split.setTableFormatType(TableFormatType.ICEBERG);
- split.setAnalyzer(analyzer);
- splits.add(split);
- }
- }
- return splits;
- }
-
- public static long getSnapshotIdAsOfTime(List<HistoryEntry>
historyEntries, long asOfTimestamp) {
- // find history at or before asOfTimestamp
- HistoryEntry latestHistory = null;
- for (HistoryEntry entry : historyEntries) {
- if (entry.timestampMillis() <= asOfTimestamp) {
- if (latestHistory == null) {
- latestHistory = entry;
- continue;
- }
- if (entry.timestampMillis() > latestHistory.timestampMillis())
{
- latestHistory = entry;
- }
- }
- }
- if (latestHistory == null) {
- throw new NotFoundException("No version history at or before "
- + Instant.ofEpochMilli(asOfTimestamp));
- }
- return latestHistory.snapshotId();
- }
-
- private List<IcebergDeleteFileFilter> getDeleteFileFilters(FileScanTask
spitTask) {
- List<IcebergDeleteFileFilter> filters = new ArrayList<>();
- for (DeleteFile delete : spitTask.deletes()) {
- if (delete.content() == FileContent.POSITION_DELETES) {
- ByteBuffer lowerBoundBytes =
delete.lowerBounds().get(MetadataColumns.DELETE_FILE_POS.fieldId());
- Optional<Long> positionLowerBound =
Optional.ofNullable(lowerBoundBytes)
- .map(bytes ->
Conversions.fromByteBuffer(MetadataColumns.DELETE_FILE_POS.type(), bytes));
- ByteBuffer upperBoundBytes =
delete.upperBounds().get(MetadataColumns.DELETE_FILE_POS.fieldId());
- Optional<Long> positionUpperBound =
Optional.ofNullable(upperBoundBytes)
- .map(bytes ->
Conversions.fromByteBuffer(MetadataColumns.DELETE_FILE_POS.type(), bytes));
-
filters.add(IcebergDeleteFileFilter.createPositionDelete(delete.path().toString(),
- positionLowerBound.orElse(-1L),
positionUpperBound.orElse(-1L)));
- } else if (delete.content() == FileContent.EQUALITY_DELETES) {
- // todo:
filters.add(IcebergDeleteFileFilter.createEqualityDelete(delete.path().toString(),
- // delete.equalityFieldIds()));
- throw new IllegalStateException("Don't support equality delete
file");
- } else {
- throw new IllegalStateException("Unknown delete content: " +
delete.content());
- }
- }
- return filters;
- }
-
@Override
public List<String> getPathPartitionKeys() throws DdlException,
MetaNotFoundException {
return
icebergSource.getIcebergTable().spec().fields().stream().map(PartitionField::name)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergSplit.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergSplit.java
index a82c99b04a..431652b894 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergSplit.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergSplit.java
@@ -18,7 +18,7 @@
package org.apache.doris.planner.external.iceberg;
import org.apache.doris.analysis.Analyzer;
-import org.apache.doris.planner.external.HiveSplit;
+import org.apache.doris.planner.external.FileSplit;
import lombok.Data;
import org.apache.hadoop.fs.Path;
@@ -26,7 +26,7 @@ import org.apache.hadoop.fs.Path;
import java.util.List;
@Data
-public class IcebergSplit extends HiveSplit {
+public class IcebergSplit extends FileSplit {
public IcebergSplit(Path file, long start, long length, String[] hosts) {
super(file, start, length, hosts);
}