This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push:
new 70311b45bf3 [fix](multi-catalog)support the max compute partition prune (#27154) (#27902)
70311b45bf3 is described below
commit 70311b45bf39000c2d19979008ca3371a6c4ccf8
Author: slothever <[email protected]>
AuthorDate: Sun Dec 3 10:18:54 2023 +0800
[fix](multi-catalog)support the max compute partition prune (#27154) (#27902)
backport #27154
---
be/src/runtime/descriptors.cpp | 1 -
be/src/runtime/descriptors.h | 2 -
.../exec/format/table/max_compute_jni_reader.cpp | 9 +-
.../vec/exec/format/table/max_compute_jni_reader.h | 12 +-
be/src/vec/exec/scan/vfile_scanner.cpp | 12 +-
.../doris/maxcompute/MaxComputeJniScanner.java | 32 +++-
.../apache/doris/analysis/ShowPartitionsStmt.java | 14 +-
.../catalog/external/MaxComputeExternalTable.java | 177 ++++++++++++---------
.../doris/datasource/ExternalMetaCacheMgr.java | 10 ++
.../doris/datasource/MaxComputeCacheKey.java | 65 ++++++++
.../datasource/MaxComputeExternalCatalog.java | 61 +++++--
.../doris/datasource/MaxComputeMetadataCache.java | 90 +++++++++++
.../datasource/MaxComputeMetadataCacheMgr.java | 64 ++++++++
.../datasource/hive/PooledHiveMetaStoreClient.java | 9 +-
.../doris/planner/external/FileQueryScanNode.java | 2 +
.../doris/planner/external/MaxComputeScanNode.java | 121 ++++++++++----
.../{TableFormatType.java => MaxComputeSplit.java} | 26 +--
.../doris/planner/external/TableFormatType.java | 1 +
.../planner/external/TablePartitionValues.java | 9 +-
.../hudi/HudiCachedPartitionProcessor.java | 8 +-
.../java/org/apache/doris/qe/ShowExecutor.java | 37 ++++-
gensrc/thrift/Descriptors.thrift | 1 -
gensrc/thrift/PlanNodes.thrift | 4 +
.../test_external_catalog_maxcompute.out | 46 +++++-
.../test_external_catalog_maxcompute.groovy | 17 +-
25 files changed, 662 insertions(+), 168 deletions(-)
diff --git a/be/src/runtime/descriptors.cpp b/be/src/runtime/descriptors.cpp
index 15a4b773264..f8125588ae5 100644
--- a/be/src/runtime/descriptors.cpp
+++ b/be/src/runtime/descriptors.cpp
@@ -192,7 +192,6 @@ MaxComputeTableDescriptor::MaxComputeTableDescriptor(const TTableDescriptor& tde
_table(tdesc.mcTable.table),
_access_key(tdesc.mcTable.access_key),
_secret_key(tdesc.mcTable.secret_key),
- _partition_spec(tdesc.mcTable.partition_spec),
_public_access(tdesc.mcTable.public_access) {}
MaxComputeTableDescriptor::~MaxComputeTableDescriptor() = default;
diff --git a/be/src/runtime/descriptors.h b/be/src/runtime/descriptors.h
index 0d765854ae6..6808523ab58 100644
--- a/be/src/runtime/descriptors.h
+++ b/be/src/runtime/descriptors.h
@@ -237,7 +237,6 @@ public:
const std::string table() const { return _table; }
const std::string access_key() const { return _access_key; }
const std::string secret_key() const { return _secret_key; }
- const std::string partition_spec() const { return _partition_spec; }
const std::string public_access() const { return _public_access; }
private:
@@ -246,7 +245,6 @@ private:
std::string _table;
std::string _access_key;
std::string _secret_key;
- std::string _partition_spec;
std::string _public_access;
};
diff --git a/be/src/vec/exec/format/table/max_compute_jni_reader.cpp b/be/src/vec/exec/format/table/max_compute_jni_reader.cpp
index 34db6a1df4d..7ba714eedd5 100644
--- a/be/src/vec/exec/format/table/max_compute_jni_reader.cpp
+++ b/be/src/vec/exec/format/table/max_compute_jni_reader.cpp
@@ -38,10 +38,15 @@ class Block;
namespace doris::vectorized {
MaxComputeJniReader::MaxComputeJniReader(const MaxComputeTableDescriptor* mc_desc,
+ const TMaxComputeFileDesc& max_compute_params,
const std::vector<SlotDescriptor*>& file_slot_descs,
const TFileRangeDesc& range,
RuntimeState* state,
RuntimeProfile* profile)
- : _file_slot_descs(file_slot_descs), _range(range), _state(state), _profile(profile) {
+ : _max_compute_params(max_compute_params),
+ _file_slot_descs(file_slot_descs),
+ _range(range),
+ _state(state),
+ _profile(profile) {
_table_desc = mc_desc;
std::ostringstream required_fields;
std::ostringstream columns_types;
@@ -64,7 +69,7 @@ MaxComputeJniReader::MaxComputeJniReader(const MaxComputeTableDescriptor* mc_des
{"access_key", _table_desc->access_key()},
{"secret_key", _table_desc->secret_key()},
{"project", _table_desc->project()},
- {"partition_spec", _table_desc->partition_spec()},
+ {"partition_spec", _max_compute_params.partition_spec},
{"table", _table_desc->table()},
{"public_access", _table_desc->public_access()},
{"start_offset", std::to_string(_range.start_offset)},
diff --git a/be/src/vec/exec/format/table/max_compute_jni_reader.h b/be/src/vec/exec/format/table/max_compute_jni_reader.h
index 0b3c809c502..e027678148f 100644
--- a/be/src/vec/exec/format/table/max_compute_jni_reader.h
+++ b/be/src/vec/exec/format/table/max_compute_jni_reader.h
@@ -54,6 +54,7 @@ class MaxComputeJniReader : public GenericReader {
public:
MaxComputeJniReader(const MaxComputeTableDescriptor* mc_desc,
+ const TMaxComputeFileDesc& max_compute_params,
const std::vector<SlotDescriptor*>& file_slot_descs,
const TFileRangeDesc& range, RuntimeState* state,
RuntimeProfile* profile);
@@ -68,13 +69,14 @@ public:
std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range);
private:
- const MaxComputeTableDescriptor* _table_desc;
+ const MaxComputeTableDescriptor* _table_desc = nullptr;
+ const TMaxComputeFileDesc& _max_compute_params;
const std::vector<SlotDescriptor*>& _file_slot_descs;
const TFileRangeDesc& _range;
- RuntimeState* _state;
- RuntimeProfile* _profile;
- std::unordered_map<std::string, ColumnValueRangeType>* _colname_to_value_range;
- std::unique_ptr<JniConnector> _jni_connector;
+ RuntimeState* _state = nullptr;
+ RuntimeProfile* _profile = nullptr;
+ std::unordered_map<std::string, ColumnValueRangeType>* _colname_to_value_range = nullptr;
+ std::unique_ptr<JniConnector> _jni_connector = nullptr;
};
} // namespace doris::vectorized
diff --git a/be/src/vec/exec/scan/vfile_scanner.cpp b/be/src/vec/exec/scan/vfile_scanner.cpp
index fd77b084bb9..5924aa3963f 100644
--- a/be/src/vec/exec/scan/vfile_scanner.cpp
+++ b/be/src/vec/exec/scan/vfile_scanner.cpp
@@ -686,13 +686,13 @@ Status VFileScanner::_get_next_reader() {
bool need_to_get_parsed_schema = false;
switch (format_type) {
case TFileFormatType::FORMAT_JNI: {
- if (_real_tuple_desc->table_desc()->table_type() ==
- ::doris::TTableType::type::MAX_COMPUTE_TABLE) {
- const MaxComputeTableDescriptor* mc_desc =
- static_cast<const MaxComputeTableDescriptor*>(
- _real_tuple_desc->table_desc());
+ if (range.__isset.table_format_params &&
+ range.table_format_params.table_format_type == "max_compute") {
+ const auto* mc_desc = static_cast<const MaxComputeTableDescriptor*>(
+ _real_tuple_desc->table_desc());
std::unique_ptr<MaxComputeJniReader> mc_reader = MaxComputeJniReader::create_unique(
- mc_desc, _file_slot_descs, range, _state, _profile);
+ mc_desc, range.table_format_params.max_compute_params, _file_slot_descs,
+ range, _state, _profile);
init_status = mc_reader->init_reader(_colname_to_value_range);
_cur_reader = std::move(mc_reader);
} else if (range.__isset.table_format_params &&
diff --git a/fe/be-java-extensions/max-compute-scanner/src/main/java/org/apache/doris/maxcompute/MaxComputeJniScanner.java b/fe/be-java-extensions/max-compute-scanner/src/main/java/org/apache/doris/maxcompute/MaxComputeJniScanner.java
index 0d80546cdfb..f4a8a9c8fc6 100644
--- a/fe/be-java-extensions/max-compute-scanner/src/main/java/org/apache/doris/maxcompute/MaxComputeJniScanner.java
+++ b/fe/be-java-extensions/max-compute-scanner/src/main/java/org/apache/doris/maxcompute/MaxComputeJniScanner.java
@@ -61,11 +61,11 @@ public class MaxComputeJniScanner extends JniScanner {
private static final String START_OFFSET = "start_offset";
private static final String SPLIT_SIZE = "split_size";
private static final String PUBLIC_ACCESS = "public_access";
- private final RootAllocator arrowAllocator = new RootAllocator(Integer.MAX_VALUE);
private final Map<String, MaxComputeTableScan> tableScans = new ConcurrentHashMap<>();
private final String region;
private final String project;
private final String table;
+ private RootAllocator arrowAllocator;
private PartitionSpec partitionSpec;
private Set<String> partitionColumns;
private MaxComputeTableScan curTableScan;
@@ -171,9 +171,14 @@ public class MaxComputeJniScanner extends JniScanner {
partitionColumns = session.getSchema().getPartitionColumns().stream()
.map(Column::getName)
.collect(Collectors.toSet());
- List<Column> maxComputeColumns = new ArrayList<>(readColumns);
- maxComputeColumns.removeIf(e -> partitionColumns.contains(e.getName()));
- curReader = session.openArrowRecordReader(start, totalRows, maxComputeColumns, arrowAllocator);
+ List<Column> pushDownColumns = new ArrayList<>(readColumns);
+ pushDownColumns.removeIf(e -> partitionColumns.contains(e.getName()));
+ if (pushDownColumns.isEmpty() && !partitionColumns.isEmpty()) {
+ // the pushed-down column list must be non-empty when querying a partitioned table
+ pushDownColumns.add(session.getSchema().getColumn(0));
+ }
+ arrowAllocator = new RootAllocator(Integer.MAX_VALUE);
+ curReader = session.openArrowRecordReader(start, totalRows, pushDownColumns, arrowAllocator);
remainBatchRows = totalRows;
} catch (TunnelException e) {
if (retryCount > 0 && e.getErrorMsg().contains("TableModified")) {
@@ -254,7 +259,8 @@ public class MaxComputeJniScanner extends JniScanner {
startOffset = -1;
splitSize = -1;
if (curReader != null) {
- arrowAllocator.releaseBytes(arrowAllocator.getAllocatedMemory());
+ arrowAllocator.close();
+ arrowAllocator = null;
curReader.close();
curReader = null;
}
@@ -279,15 +285,25 @@ public class MaxComputeJniScanner extends JniScanner {
private int readVectors(int expectedRows) throws IOException {
VectorSchemaRoot batch;
int curReadRows = 0;
- while (curReadRows < expectedRows && (batch = curReader.read()) != null) {
+ while (curReadRows < expectedRows) {
+ batch = curReader.read();
+ if (batch == null) {
+ break;
+ }
try {
List<FieldVector> fieldVectors = batch.getFieldVectors();
int batchRows = 0;
for (FieldVector column : fieldVectors) {
+ Integer readColumnId = readColumnsToId.get(column.getName());
+ if (readColumnId == null) {
+ // partition-only read: no data column is requested, so just record the row count.
+ batchRows = column.getValueCount();
+ continue;
+ }
+ }
columnValue.reset(column);
batchRows = column.getValueCount();
for (int j = 0; j < batchRows; j++) {
- appendData(readColumnsToId.get(column.getName()), columnValue);
+ appendData(readColumnId, columnValue);
}
}
if (partitionSpec != null) {
@@ -303,6 +319,8 @@ public class MaxComputeJniScanner extends JniScanner {
}
}
curReadRows += batchRows;
+ } catch (Exception e) {
+ throw new RuntimeException("Fail to read arrow data, reason: "
+ e.getMessage(), e);
} finally {
batch.close();
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowPartitionsStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowPartitionsStmt.java
index f6e9b06e0b1..dc1d360f290 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowPartitionsStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowPartitionsStmt.java
@@ -26,6 +26,7 @@ import org.apache.doris.catalog.TableIf;
import org.apache.doris.catalog.TableIf.TableType;
import org.apache.doris.catalog.Type;
import org.apache.doris.catalog.external.HMSExternalTable;
+import org.apache.doris.catalog.external.MaxComputeExternalTable;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
@@ -37,6 +38,7 @@ import org.apache.doris.common.proc.ProcService;
import org.apache.doris.common.util.OrderByPair;
import org.apache.doris.datasource.CatalogIf;
import org.apache.doris.datasource.HMSExternalCatalog;
+import org.apache.doris.datasource.MaxComputeExternalCatalog;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.ShowResultSetMetaData;
@@ -126,7 +128,7 @@ public class ShowPartitionsStmt extends ShowStmt {
DatabaseIf db = catalog.getDbOrAnalysisException(dbName);
TableIf table = db.getTableOrMetaException(tblName,
Table.TableType.OLAP, TableType.MATERIALIZED_VIEW,
- TableType.HMS_EXTERNAL_TABLE);
+ TableType.HMS_EXTERNAL_TABLE, TableType.MAX_COMPUTE_EXTERNAL_TABLE);
if (table instanceof HMSExternalTable) {
if (((HMSExternalTable) table).isView()) {
@@ -138,6 +140,13 @@ public class ShowPartitionsStmt extends ShowStmt {
return;
}
+ if (table instanceof MaxComputeExternalTable) {
+ if (((MaxComputeExternalTable) table).getOdpsTable().getPartitions().isEmpty()) {
+ throw new AnalysisException("Table " + tblName + " is not a partitioned table");
+ }
+ return;
+ }
+
table.readLock();
try {
// build proc path
@@ -170,7 +179,8 @@ public class ShowPartitionsStmt extends ShowStmt {
}
// disallow unsupported catalog
- if (!(catalog.isInternalCatalog() || catalog instanceof HMSExternalCatalog)) {
+ if (!(catalog.isInternalCatalog() || catalog instanceof HMSExternalCatalog
+ || catalog instanceof MaxComputeExternalCatalog)) {
throw new AnalysisException(String.format("Catalog of type '%s' is not allowed in ShowPartitionsStmt",
catalog.getType()));
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/MaxComputeExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/MaxComputeExternalTable.java
index 3c2f3bada03..5c25cf6cce0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/MaxComputeExternalTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/MaxComputeExternalTable.java
@@ -17,25 +17,25 @@
package org.apache.doris.catalog.external;
-import org.apache.doris.analysis.BinaryPredicate;
-import org.apache.doris.analysis.Expr;
-import org.apache.doris.analysis.InPredicate;
-import org.apache.doris.analysis.Predicate;
-import org.apache.doris.analysis.SlotRef;
import org.apache.doris.catalog.ArrayType;
import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.MapType;
import org.apache.doris.catalog.ScalarType;
import org.apache.doris.catalog.StructField;
import org.apache.doris.catalog.StructType;
import org.apache.doris.catalog.Type;
+import org.apache.doris.datasource.MaxComputeCacheKey;
import org.apache.doris.datasource.MaxComputeExternalCatalog;
+import org.apache.doris.datasource.MaxComputeMetadataCache;
+import org.apache.doris.planner.external.TablePartitionValues;
import org.apache.doris.thrift.TMCTable;
import org.apache.doris.thrift.TTableDescriptor;
import org.apache.doris.thrift.TTableType;
import com.aliyun.odps.OdpsType;
import com.aliyun.odps.Table;
+import com.aliyun.odps.tunnel.TunnelException;
import com.aliyun.odps.type.ArrayTypeInfo;
import com.aliyun.odps.type.CharTypeInfo;
import com.aliyun.odps.type.DecimalTypeInfo;
@@ -43,17 +43,15 @@ import com.aliyun.odps.type.MapTypeInfo;
import com.aliyun.odps.type.StructTypeInfo;
import com.aliyun.odps.type.TypeInfo;
import com.aliyun.odps.type.VarcharTypeInfo;
+import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
-import java.util.Optional;
import java.util.Set;
-import java.util.StringJoiner;
+import java.util.stream.Collectors;
/**
* MaxCompute external table.
@@ -61,8 +59,9 @@ import java.util.StringJoiner;
public class MaxComputeExternalTable extends ExternalTable {
private Table odpsTable;
- private Set<String> partitionKeys;
- private String partitionSpec;
+ private List<String> partitionSpecs;
+ private Map<String, Column> partitionNameToColumns;
+ private List<Type> partitionTypes;
public MaxComputeExternalTable(long id, String name, String dbName, MaxComputeExternalCatalog catalog) {
super(id, name, catalog, dbName, TableType.MAX_COMPUTE_EXTERNAL_TABLE);
@@ -73,12 +72,80 @@ public class MaxComputeExternalTable extends ExternalTable {
super.makeSureInitialized();
if (!objectCreated) {
odpsTable = ((MaxComputeExternalCatalog) catalog).getClient().tables().get(name);
+ initTablePartitions();
objectCreated = true;
}
}
+ public long getTotalRows() throws TunnelException {
+ // only used for non-partitioned tables; a partitioned table is read per
+ // partition on the FE, so its total row count is unnecessary.
+ makeSureInitialized();
+ MaxComputeMetadataCache metadataCache = Env.getCurrentEnv().getExtMetaCacheMgr()
+ .getMaxComputeMetadataCache(catalog.getId());
+ MaxComputeExternalCatalog mcCatalog = ((MaxComputeExternalCatalog) catalog);
+ return metadataCache.getCachedRowCount(dbName, name, null, () -> mcCatalog.getTableTunnel()
+ .getDownloadSession(dbName, name, null)
+ .getRecordCount());
+ }
+
+ @Override
+ public Set<String> getPartitionNames() {
+ makeSureInitialized();
+ return partitionNameToColumns.keySet();
+ }
+
+ public List<Column> getPartitionColumns() {
+ makeSureInitialized();
+ return new ArrayList<>(partitionNameToColumns.values());
+ }
+
+ public TablePartitionValues getPartitionValues() {
+ makeSureInitialized();
+ // Make sure to call it after initSchema() completes
+ String projectName = odpsTable.getProject();
+ String tableName = odpsTable.getName();
+ MaxComputeMetadataCache metadataCache = Env.getCurrentEnv().getExtMetaCacheMgr()
+ .getMaxComputeMetadataCache(catalog.getId());
+ return metadataCache.getCachedPartitionValues(
+ new MaxComputeCacheKey(projectName, tableName),
+ () -> {
+ TablePartitionValues partitionValues = new TablePartitionValues();
+ partitionValues.addPartitions(partitionSpecs,
+ partitionSpecs.stream()
+ .map(p -> parsePartitionValues(new ArrayList<>(getPartitionNames()), p))
+ .collect(Collectors.toList()),
+ partitionTypes);
+ return partitionValues;
+ });
+ }
+
+ /**
+ * Parses all values from a partition path into a single list.
+ * @param partitionColumns the partition column names, e.g. part1, part2, part3
+ * @param partitionPath a partition path in the form 'part1=123/part2=abc/part3=1bc'
+ * @return all values of partitionPath
+ */
+ private static List<String> parsePartitionValues(List<String> partitionColumns, String partitionPath) {
+ String[] partitionFragments = partitionPath.split("/");
+ if (partitionFragments.length != partitionColumns.size()) {
+ throw new RuntimeException("Failed to parse partition values of path: " + partitionPath);
+ }
+ List<String> partitionValues = new ArrayList<>(partitionFragments.length);
+ for (int i = 0; i < partitionFragments.length; i++) {
+ String prefix = partitionColumns.get(i) + "=";
+ if (partitionFragments[i].startsWith(prefix)) {
+ partitionValues.add(partitionFragments[i].substring(prefix.length()));
+ } else {
+ partitionValues.add(partitionFragments[i]);
+ }
+ }
+ return partitionValues;
+ }
+
@Override
public List<Column> initSchema() {
+ // this method is called during semantic analysis.
makeSureInitialized();
List<com.aliyun.odps.Column> columns = odpsTable.getSchema().getColumns();
List<Column> result = Lists.newArrayListWithCapacity(columns.size());
@@ -86,72 +153,31 @@ public class MaxComputeExternalTable {
result.add(new Column(field.getName(), mcTypeToDorisType(field.getTypeInfo()), true, null,
true, field.getComment(), true, -1));
}
- List<com.aliyun.odps.Column> partitionColumns = odpsTable.getSchema().getPartitionColumns();
- partitionKeys = new HashSet<>();
- for (com.aliyun.odps.Column partColumn : partitionColumns) {
- result.add(new Column(partColumn.getName(), mcTypeToDorisType(partColumn.getTypeInfo()), true, null,
- true, partColumn.getComment(), true, -1));
- partitionKeys.add(partColumn.getName());
- }
+ result.addAll(partitionNameToColumns.values());
return result;
}
- public Optional<String> getPartitionSpec(List<Expr> conjuncts) {
- if (!partitionKeys.isEmpty()) {
- if (conjuncts.isEmpty()) {
- throw new IllegalArgumentException("Max Compute partition
table need partition predicate.");
- }
- // recreate partitionSpec when conjuncts is changed.
- List<String> partitionConjuncts =
parsePartitionConjuncts(conjuncts, partitionKeys);
- StringJoiner partitionSpec = new StringJoiner(",");
- partitionConjuncts.forEach(partitionSpec::add);
- this.partitionSpec = partitionSpec.toString();
- return Optional.of(this.partitionSpec);
- }
- return Optional.empty();
- }
-
- private static List<String> parsePartitionConjuncts(List<Expr> conjuncts, Set<String> partitionKeys) {
- List<String> partitionConjuncts = new ArrayList<>();
- Set<Predicate> predicates = Sets.newHashSet();
- for (Expr conjunct : conjuncts) {
- // collect depart predicate
- conjunct.collect(BinaryPredicate.class, predicates);
- conjunct.collect(InPredicate.class, predicates);
- }
- Map<String, Predicate> slotToConjuncts = new HashMap<>();
- for (Predicate predicate : predicates) {
- List<SlotRef> slotRefs = new ArrayList<>();
- if (predicate instanceof BinaryPredicate) {
- if (((BinaryPredicate) predicate).getOp() != BinaryPredicate.Operator.EQ) {
- // max compute only support the EQ operator: pt='pt-value'
- continue;
- }
- // BinaryPredicate has one left slotRef, and partition value not slotRef
- predicate.collect(SlotRef.class, slotRefs);
- slotToConjuncts.put(slotRefs.get(0).getColumnName(), predicate);
- } else if (predicate instanceof InPredicate) {
- predicate.collect(SlotRef.class, slotRefs);
- slotToConjuncts.put(slotRefs.get(0).getColumnName(), predicate);
- }
+ private void initTablePartitions() {
+ List<com.aliyun.odps.Column> partitionColumns = odpsTable.getSchema().getPartitionColumns();
+ if (!partitionColumns.isEmpty()) {
+ partitionSpecs = odpsTable.getPartitions().stream()
+ .map(e -> e.getPartitionSpec().toString(false, true))
+ .collect(Collectors.toList());
+ } else {
+ partitionSpecs = ImmutableList.of();
}
- for (String partitionKey : partitionKeys) {
- Predicate partitionPredicate = slotToConjuncts.get(partitionKey);
- if (partitionPredicate == null) {
- continue;
- }
- if (partitionPredicate instanceof InPredicate) {
- List<Expr> inList = ((InPredicate) partitionPredicate).getListChildren();
- for (Expr expr : inList) {
- String partitionConjunct = partitionKey + "=" + expr.toSql();
- partitionConjuncts.add(partitionConjunct.replace("`", ""));
- }
- } else {
- String partitionConjunct = partitionPredicate.toSql();
- partitionConjuncts.add(partitionConjunct.replace("`", ""));
- }
+ // sort partition columns to align partitionTypes and partitionName.
+ partitionNameToColumns = new LinkedHashMap<>();
+ for (com.aliyun.odps.Column partColumn : partitionColumns) {
+ Column dorisCol = new Column(partColumn.getName(),
+ mcTypeToDorisType(partColumn.getTypeInfo()), true, null,
+ true, partColumn.getComment(), true, -1);
+ partitionNameToColumns.put(dorisCol.getName(), dorisCol);
}
- return partitionConjuncts;
+ partitionTypes = partitionNameToColumns.values()
+ .stream()
+ .map(Column::getType)
+ .collect(Collectors.toList());
}
private Type mcTypeToDorisType(TypeInfo typeInfo) {
@@ -241,11 +267,10 @@ public class MaxComputeExternalTable extends ExternalTable {
public TTableDescriptor toThrift() {
List<Column> schema = getFullSchema();
TMCTable tMcTable = new TMCTable();
- MaxComputeExternalCatalog mcCatalog = (MaxComputeExternalCatalog) catalog;
+ MaxComputeExternalCatalog mcCatalog = ((MaxComputeExternalCatalog) catalog);
tMcTable.setRegion(mcCatalog.getRegion());
tMcTable.setAccessKey(mcCatalog.getAccessKey());
tMcTable.setSecretKey(mcCatalog.getSecretKey());
- tMcTable.setPartitionSpec(this.partitionSpec);
tMcTable.setPublicAccess(String.valueOf(mcCatalog.enablePublicAccess()));
// use mc project as dbName
tMcTable.setProject(dbName);
@@ -257,6 +282,7 @@ public class MaxComputeExternalTable extends ExternalTable {
}
public Table getOdpsTable() {
+ makeSureInitialized();
return odpsTable;
}
@@ -264,6 +290,5 @@ public class MaxComputeExternalTable extends ExternalTable {
public String getMysqlType() {
return "BASE TABLE";
}
-
}
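For reference, a minimal, self-contained sketch of the partition-path parsing introduced above; the class and method names below are illustrative only, not part of this commit:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    public class PartitionPathSketch {
        // Mirrors parsePartitionValues: split 'k1=v1/k2=v2/...' into [v1, v2, ...].
        static List<String> parse(List<String> columns, String path) {
            String[] fragments = path.split("/");
            if (fragments.length != columns.size()) {
                throw new RuntimeException("Failed to parse partition values of path: " + path);
            }
            List<String> values = new ArrayList<>(fragments.length);
            for (int i = 0; i < fragments.length; i++) {
                String prefix = columns.get(i) + "=";
                // keep the raw fragment when it lacks the expected 'col=' prefix
                values.add(fragments[i].startsWith(prefix)
                        ? fragments[i].substring(prefix.length()) : fragments[i]);
            }
            return values;
        }

        public static void main(String[] args) {
            // prints [14, 2021, 12, 22]
            System.out.println(parse(Arrays.asList("pt", "yy", "mm", "dd"), "pt=14/yy=2021/mm=12/dd=22"));
        }
    }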
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalMetaCacheMgr.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalMetaCacheMgr.java
index 03a46c625e8..ef62f498695 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalMetaCacheMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalMetaCacheMgr.java
@@ -54,6 +54,7 @@ public class ExternalMetaCacheMgr {
// all catalogs could share the same fsCache.
private FileSystemCache fsCache;
private final IcebergMetadataCacheMgr icebergMetadataCacheMgr;
+ private final MaxComputeMetadataCacheMgr maxComputeMetadataCacheMgr;
public ExternalMetaCacheMgr() {
executor = ThreadPoolManager.newDaemonFixedThreadPool(
@@ -63,6 +64,7 @@ public class ExternalMetaCacheMgr {
hudiPartitionMgr = HudiPartitionMgr.get(executor);
fsCache = new FileSystemCache(executor);
icebergMetadataCacheMgr = new IcebergMetadataCacheMgr();
+ maxComputeMetadataCacheMgr = new MaxComputeMetadataCacheMgr();
}
public HiveMetaStoreCache getMetaStoreCache(HMSExternalCatalog catalog) {
@@ -99,6 +101,10 @@ public class ExternalMetaCacheMgr {
return icebergMetadataCacheMgr.getIcebergMetadataCache();
}
+ public MaxComputeMetadataCache getMaxComputeMetadataCache(long catalogId) {
+ return maxComputeMetadataCacheMgr.getMaxComputeMetadataCache(catalogId);
+ }
+
public FileSystemCache getFsCache() {
return fsCache;
}
@@ -112,6 +118,7 @@ public class ExternalMetaCacheMgr {
}
hudiPartitionMgr.removePartitionProcessor(catalogId);
icebergMetadataCacheMgr.removeCache(catalogId);
+ maxComputeMetadataCacheMgr.removeCache(catalogId);
}
public void invalidateTableCache(long catalogId, String dbName, String tblName) {
@@ -126,6 +133,7 @@ public class ExternalMetaCacheMgr {
}
hudiPartitionMgr.cleanTablePartitions(catalogId, dbName, tblName);
icebergMetadataCacheMgr.invalidateTableCache(catalogId, dbName, tblName);
+ maxComputeMetadataCacheMgr.invalidateTableCache(catalogId, dbName, tblName);
LOG.debug("invalid table cache for {}.{} in catalog {}", dbName, tblName, catalogId);
}
@@ -141,6 +149,7 @@ public class ExternalMetaCacheMgr {
}
hudiPartitionMgr.cleanDatabasePartitions(catalogId, dbName);
icebergMetadataCacheMgr.invalidateDbCache(catalogId, dbName);
+ maxComputeMetadataCacheMgr.invalidateDbCache(catalogId, dbName);
LOG.debug("invalid db cache for {} in catalog {}", dbName, catalogId);
}
@@ -155,6 +164,7 @@ public class ExternalMetaCacheMgr {
}
hudiPartitionMgr.cleanPartitionProcess(catalogId);
icebergMetadataCacheMgr.invalidateCatalogCache(catalogId);
+ maxComputeMetadataCacheMgr.invalidateCatalogCache(catalogId);
LOG.debug("invalid catalog cache for {}", catalogId);
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/MaxComputeCacheKey.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/MaxComputeCacheKey.java
new file mode 100644
index 00000000000..441c2e84474
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/MaxComputeCacheKey.java
@@ -0,0 +1,65 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource;
+
+import lombok.Data;
+
+import java.util.Objects;
+
+@Data
+public class MaxComputeCacheKey {
+ private final String dbName;
+ private final String tblName;
+ private String partitionSpec; // optional
+
+ public MaxComputeCacheKey(String dbName, String tblName) {
+ this(dbName, tblName, null);
+ }
+
+ public MaxComputeCacheKey(String dbName, String tblName, String partitionSpec) {
+ this.dbName = dbName;
+ this.tblName = tblName;
+ this.partitionSpec = partitionSpec;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (!(obj instanceof MaxComputeCacheKey)) {
+ return false;
+ }
+ boolean partitionEquals = true;
+ if (partitionSpec != null) {
+ partitionEquals = partitionSpec.equals(((MaxComputeCacheKey) obj).partitionSpec);
+ }
+ return partitionEquals && dbName.equals(((MaxComputeCacheKey) obj).dbName)
+ && tblName.equals(((MaxComputeCacheKey) obj).tblName);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(dbName, tblName);
+ }
+
+ @Override
+ public String toString() {
+ return "TablePartitionKey{" + "dbName='" + dbName + '\'' + ",
tblName='" + tblName + '\'' + '}';
+ }
+}
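Note how the key's equality is set up: hashCode covers only (dbName, tblName), and a key whose partitionSpec is null compares equal to any spec for the same table, so a spec-less key can locate and invalidate spec-qualified entries. A hypothetical usage sketch, assuming the class exactly as defined above:

    import org.apache.doris.datasource.MaxComputeCacheKey;

    public class CacheKeySketch {
        public static void main(String[] args) {
            MaxComputeCacheKey bySpec = new MaxComputeCacheKey("proj", "tbl", "pt=14/yy=2021");
            MaxComputeCacheKey byTable = new MaxComputeCacheKey("proj", "tbl");
            // Same hash bucket: hashCode hashes only (dbName, tblName).
            System.out.println(bySpec.hashCode() == byTable.hashCode()); // true
            // A spec-less key matches any spec for the same table, which is what
            // cleanTableCache relies on to invalidate entries by table.
            System.out.println(byTable.equals(bySpec)); // true
            System.out.println(bySpec.equals(byTable)); // false: the comparison is one-way
        }
    }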
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/MaxComputeExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/MaxComputeExternalCatalog.java
index 0cd99678bad..b361d0c8144 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/MaxComputeExternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/MaxComputeExternalCatalog.java
@@ -24,22 +24,23 @@ import org.apache.doris.datasource.property.constants.MCProperties;
import com.aliyun.odps.Odps;
import com.aliyun.odps.OdpsException;
-import com.aliyun.odps.PartitionSpec;
+import com.aliyun.odps.Partition;
import com.aliyun.odps.account.Account;
import com.aliyun.odps.account.AliyunAccount;
import com.aliyun.odps.tunnel.TableTunnel;
-import com.aliyun.odps.tunnel.TunnelException;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableList;
import com.google.gson.annotations.SerializedName;
import java.util.ArrayList;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
-import java.util.Optional;
+import java.util.stream.Collectors;
public class MaxComputeExternalCatalog extends ExternalCatalog {
private Odps odps;
+ private TableTunnel tunnel;
@SerializedName(value = "region")
private String region;
@SerializedName(value = "accessKey")
@@ -93,23 +94,17 @@ public class MaxComputeExternalCatalog extends ExternalCatalog {
}
odps.setEndpoint(odpsUrl);
odps.setDefaultProject(defaultProject);
- }
-
- public long getTotalRows(String project, String table, Optional<String> partitionSpec) throws TunnelException {
- makeSureInitialized();
- TableTunnel tunnel = new TableTunnel(odps);
+ tunnel = new TableTunnel(odps);
String tunnelUrl = tunnelUrlTemplate.replace("{}", region);
if (enablePublicAccess) {
tunnelUrl = tunnelUrl.replace("-inc", "");
}
- TableTunnel.DownloadSession downloadSession;
tunnel.setEndpoint(tunnelUrl);
- if (!partitionSpec.isPresent()) {
- downloadSession = tunnel.getDownloadSession(project, table, null);
- } else {
- downloadSession = tunnel.getDownloadSession(project, table, new PartitionSpec(partitionSpec.get()), null);
- }
- return downloadSession.getRecordCount();
+ }
+
+ public TableTunnel getTableTunnel() {
+ makeSureInitialized();
+ return tunnel;
}
public Odps getClient() {
@@ -139,6 +134,42 @@ public class MaxComputeExternalCatalog extends ExternalCatalog {
}
}
+ public List<String> listPartitionNames(String dbName, String tbl) {
+ return listPartitionNames(dbName, tbl, 0, -1);
+ }
+
+ public List<String> listPartitionNames(String dbName, String tbl, long skip, long limit) {
+ try {
+ if (getClient().projects().exists(dbName)) {
+ List<Partition> parts;
+ if (limit < 0) {
+ parts = getClient().tables().get(tbl).getPartitions();
+ } else {
+ skip = skip < 0 ? 0 : skip;
+ parts = new ArrayList<>();
+ Iterator<Partition> it = getClient().tables().get(tbl).getPartitionIterator();
+ int count = 0;
+ while (it.hasNext()) {
+ if (count < skip) {
+ count++;
+ it.next();
+ } else if (parts.size() >= limit) {
+ break;
+ } else {
+ parts.add(it.next());
+ }
+ }
+ }
+ return parts.stream().map(p -> p.getPartitionSpec().toString(false, true))
+ .collect(Collectors.toList());
+ } else {
+ throw new OdpsException("Max compute project: " + dbName + " does not exist.");
+ }
+ } catch (OdpsException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
@Override
public List<String> listTableNames(SessionContext ctx, String dbName) {
makeSureInitialized();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/MaxComputeMetadataCache.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/MaxComputeMetadataCache.java
new file mode 100644
index 00000000000..98b835813d9
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/MaxComputeMetadataCache.java
@@ -0,0 +1,90 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource;
+
+import org.apache.doris.common.Config;
+import org.apache.doris.planner.external.TablePartitionValues;
+
+import com.aliyun.odps.tunnel.TunnelException;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+public class MaxComputeMetadataCache {
+ private final Cache<MaxComputeCacheKey, TablePartitionValues> partitionValuesCache;
+ private final Cache<MaxComputeCacheKey, Long> tableRowCountCache;
+
+ public MaxComputeMetadataCache() {
+ partitionValuesCache = CacheBuilder.newBuilder().maximumSize(Config.max_hive_partition_cache_num)
+ .expireAfterAccess(Config.external_cache_expire_time_minutes_after_access, TimeUnit.MINUTES)
+ .build();
+ tableRowCountCache = CacheBuilder.newBuilder().maximumSize(10000)
+ .expireAfterAccess(Config.external_cache_expire_time_minutes_after_access, TimeUnit.MINUTES)
+ .build();
+ }
+
+ public Long getCachedRowCount(String dbName, String tblName, String partitionSpec,
+ Callable<? extends Long> loader) throws TunnelException {
+ try {
+ MaxComputeCacheKey tablePartitionKey = new MaxComputeCacheKey(dbName, tblName, partitionSpec);
+ return tableRowCountCache.get(tablePartitionKey, loader);
+ } catch (ExecutionException e) {
+ throw new TunnelException(e.getMessage(), e);
+ }
+ }
+
+ public TablePartitionValues getCachedPartitionValues(MaxComputeCacheKey tablePartitionKey,
+ Callable<? extends TablePartitionValues> loader) {
+ try {
+ return partitionValuesCache.get(tablePartitionKey, loader);
+ } catch (ExecutionException e) {
+ throw new RuntimeException("Fail to load partition values for
table:"
+ + " '" + tablePartitionKey.getDbName() + "." +
tablePartitionKey.getTblName() + "'");
+ }
+ }
+
+ public void cleanUp() {
+ partitionValuesCache.invalidateAll();
+ tableRowCountCache.invalidateAll();
+ }
+
+ public void cleanDatabaseCache(String dbName) {
+ List<MaxComputeCacheKey> removeCacheList = partitionValuesCache.asMap().keySet()
+ .stream()
+ .filter(k -> k.getDbName().equalsIgnoreCase(dbName))
+ .collect(Collectors.toList());
+ partitionValuesCache.invalidateAll(removeCacheList);
+
+ List<MaxComputeCacheKey> removeCacheRowCountList = tableRowCountCache.asMap().keySet()
+ .stream()
+ .filter(k -> k.getDbName().equalsIgnoreCase(dbName))
+ .collect(Collectors.toList());
+ tableRowCountCache.invalidateAll(removeCacheRowCountList);
+ }
+
+ public void cleanTableCache(String dbName, String tblName) {
+ MaxComputeCacheKey cacheKey = new MaxComputeCacheKey(dbName, tblName);
+ partitionValuesCache.invalidate(cacheKey);
+ tableRowCountCache.invalidate(cacheKey);
+ }
+}
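The cache wrapper above is a thin layer over Guava's Cache.get(key, loader). A self-contained sketch of the same pattern with plain String keys (constants here are illustrative, not the values the commit uses):

    import com.google.common.cache.Cache;
    import com.google.common.cache.CacheBuilder;

    import java.util.concurrent.TimeUnit;

    public class LoaderCacheSketch {
        public static void main(String[] args) throws Exception {
            // Same shape as tableRowCountCache: bounded size, expire-after-access,
            // value computed by a Callable only on a cache miss.
            Cache<String, Long> rowCounts = CacheBuilder.newBuilder()
                    .maximumSize(10_000)
                    .expireAfterAccess(60, TimeUnit.MINUTES)
                    .build();
            long first = rowCounts.get("proj.tbl", () -> 42L);  // loader runs on the miss
            long second = rowCounts.get("proj.tbl", () -> -1L); // served from cache
            System.out.println(first + " " + second);           // 42 42
        }
    }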
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/MaxComputeMetadataCacheMgr.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/MaxComputeMetadataCacheMgr.java
new file mode 100644
index 00000000000..72449b61949
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/MaxComputeMetadataCacheMgr.java
@@ -0,0 +1,64 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource;
+
+import com.google.common.collect.Maps;
+
+import java.util.Map;
+
+public class MaxComputeMetadataCacheMgr {
+
+ private static final Map<Long, MaxComputeMetadataCache> maxComputeMetadataCaches = Maps.newConcurrentMap();
+
+ public MaxComputeMetadataCache getMaxComputeMetadataCache(long catalogId) {
+ MaxComputeMetadataCache cache = maxComputeMetadataCaches.get(catalogId);
+ if (cache == null) {
+ cache = new MaxComputeMetadataCache();
+ maxComputeMetadataCaches.put(catalogId, cache);
+ }
+ return cache;
+ }
+
+ public void removeCache(long catalogId) {
+ MaxComputeMetadataCache cache = maxComputeMetadataCaches.remove(catalogId);
+ if (cache != null) {
+ cache.cleanUp();
+ }
+ }
+
+ public void invalidateCatalogCache(long catalogId) {
+ MaxComputeMetadataCache cache = maxComputeMetadataCaches.get(catalogId);
+ if (cache != null) {
+ cache.cleanUp();
+ }
+ }
+
+ public void invalidateDbCache(long catalogId, String dbName) {
+ MaxComputeMetadataCache cache = maxComputeMetadataCaches.get(catalogId);
+ if (cache != null) {
+ cache.cleanDatabaseCache(dbName);
+ }
+ }
+
+ public void invalidateTableCache(long catalogId, String dbName, String tblName) {
+ MaxComputeMetadataCache cache = maxComputeMetadataCaches.get(catalogId);
+ if (cache != null) {
+ cache.cleanTableCache(dbName, tblName);
+ }
+ }
+}
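getMaxComputeMetadataCache above uses a separate get()/put() pair on the concurrent map, so two racing threads could each briefly build a cache; Map.computeIfAbsent is the atomic form of the same get-or-create idiom. A self-contained sketch with simplified types (all names illustrative):

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    public class GetOrCreateSketch {
        private static final Map<Long, StringBuilder> caches = new ConcurrentHashMap<>();

        // Atomic get-or-create: computeIfAbsent guarantees a single instance per
        // catalogId even under contention, unlike a separate get()/put() pair.
        static StringBuilder cacheFor(long catalogId) {
            return caches.computeIfAbsent(catalogId, id -> new StringBuilder());
        }

        public static void main(String[] args) {
            System.out.println(cacheFor(1L) == cacheFor(1L)); // true: same instance
        }
    }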
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/PooledHiveMetaStoreClient.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/PooledHiveMetaStoreClient.java
index f3c2557a1de..c699be330a1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/PooledHiveMetaStoreClient.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/PooledHiveMetaStoreClient.java
@@ -27,7 +27,6 @@ import com.aliyun.datalake.metastore.hive2.ProxyMetaStoreClient;
import com.amazonaws.glue.catalog.metastore.AWSCatalogMetastoreClient;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
-import org.apache.hadoop.hive.common.ValidReadTxnList;
import org.apache.hadoop.hive.common.ValidReaderWriteIdList;
import org.apache.hadoop.hive.common.ValidTxnList;
import org.apache.hadoop.hive.common.ValidTxnWriteIdList;
@@ -126,9 +125,15 @@ public class PooledHiveMetaStoreClient {
}
public List<String> listPartitionNames(String dbName, String tblName) {
+ return listPartitionNames(dbName, tblName, MAX_LIST_PARTITION_NUM);
+ }
+
+ public List<String> listPartitionNames(String dbName, String tblName, long max) {
+ // list all partitions when the limit is greater than the short maximum
+ short limited = max <= Short.MAX_VALUE ? (short) max : MAX_LIST_PARTITION_NUM;
try (CachedClient client = getClient()) {
try {
- return client.client.listPartitionNames(dbName, tblName, MAX_LIST_PARTITION_NUM);
+ return client.client.listPartitionNames(dbName, tblName, limited);
} catch (Exception e) {
client.setThrowable(e);
throw e;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java
index 9468cc42881..85245de1020 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java
@@ -339,6 +339,8 @@ public abstract class FileQueryScanNode extends FileScanNode {
PaimonScanNode.setPaimonParams(rangeDesc, (PaimonSplit) fileSplit);
} else if (fileSplit instanceof HudiSplit) {
HudiScanNode.setHudiParams(rangeDesc, (HudiSplit) fileSplit);
+ } else if (fileSplit instanceof MaxComputeSplit) {
+ MaxComputeScanNode.setScanParams(rangeDesc, (MaxComputeSplit) fileSplit);
}
curLocations.getScanRange().getExtScanRange().getFileScanRange().addToRanges(rangeDesc);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/MaxComputeScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/MaxComputeScanNode.java
index d7f8d599a61..ae0b424ad81 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/MaxComputeScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/MaxComputeScanNode.java
@@ -18,26 +18,33 @@
package org.apache.doris.planner.external;
import org.apache.doris.analysis.TupleDescriptor;
+import org.apache.doris.catalog.PartitionItem;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.catalog.external.MaxComputeExternalTable;
+import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Pair;
import org.apache.doris.common.UserException;
import org.apache.doris.datasource.MaxComputeExternalCatalog;
+import org.apache.doris.planner.ListPartitionPrunerV2;
import org.apache.doris.planner.PlanNodeId;
import org.apache.doris.spi.Split;
import org.apache.doris.statistics.StatisticalType;
import org.apache.doris.thrift.TFileFormatType;
+import org.apache.doris.thrift.TFileRangeDesc;
import org.apache.doris.thrift.TFileType;
+import org.apache.doris.thrift.TMaxComputeFileDesc;
+import org.apache.doris.thrift.TTableFormatFileDesc;
+import com.aliyun.odps.Table;
import com.aliyun.odps.tunnel.TunnelException;
import org.apache.hadoop.fs.Path;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Optional;
public class MaxComputeScanNode extends FileQueryScanNode {
@@ -56,6 +63,17 @@ public class MaxComputeScanNode extends FileQueryScanNode {
catalog = (MaxComputeExternalCatalog) table.getCatalog();
}
+ public static void setScanParams(TFileRangeDesc rangeDesc, MaxComputeSplit maxComputeSplit) {
+ TTableFormatFileDesc tableFormatFileDesc = new TTableFormatFileDesc();
+ tableFormatFileDesc.setTableFormatType(TableFormatType.MAX_COMPUTE.value());
+ TMaxComputeFileDesc fileDesc = new TMaxComputeFileDesc();
+ if (maxComputeSplit.getPartitionSpec().isPresent()) {
+ fileDesc.setPartitionSpec(maxComputeSplit.getPartitionSpec().get());
+ }
+ tableFormatFileDesc.setMaxComputeParams(fileDesc);
+ rangeDesc.setTableFormatParams(tableFormatFileDesc);
+ }
+
@Override
protected TFileType getLocationType() throws UserException {
return getLocationType(null);
@@ -89,43 +107,92 @@ public class MaxComputeScanNode extends FileQueryScanNode {
@Override
protected List<Split> getSplits() throws UserException {
List<Split> result = new ArrayList<>();
- // String splitPath = catalog.getTunnelUrl();
- // TODO: use single max compute scan node rather than file scan node
com.aliyun.odps.Table odpsTable = table.getOdpsTable();
if (desc.getSlots().isEmpty() || odpsTable.getFileNum() <= 0) {
return result;
}
try {
- List<Pair<Long, Long>> sliceRange = new ArrayList<>();
- Optional<String> partitionSpec = table.getPartitionSpec(conjuncts);
- long totalRows = catalog.getTotalRows(table.getDbName(), table.getName(), partitionSpec);
- long fileNum = odpsTable.getFileNum();
- long start = 0;
- long splitSize = (long) Math.ceil((double) totalRows / fileNum);
- if (splitSize <= 0 || totalRows < MIN_SPLIT_SIZE) {
- // use whole split
- sliceRange.add(Pair.of(start, totalRows));
- } else {
- for (int i = 0; i < fileNum; i++) {
- if (start > totalRows) {
- break;
- }
- sliceRange.add(Pair.of(start, splitSize));
- start += splitSize;
+ if (!table.getPartitionNames().isEmpty()) {
+ if (conjuncts.isEmpty()) {
+ throw new IllegalArgumentException("Max Compute partition
table need partition predicate.");
}
- }
- long modificationTime = odpsTable.getLastDataModifiedTime().getTime();
- if (!sliceRange.isEmpty()) {
- for (int i = 0; i < sliceRange.size(); i++) {
- Pair<Long, Long> range = sliceRange.get(i);
- result.add(new FileSplit(new Path("/virtual_slice_" + i), range.first, range.second,
- totalRows, modificationTime, null, Collections.emptyList()));
+ List<String> partitionSpecs = getPartitionSpecs();
+ for (String partitionSpec : partitionSpecs) {
+ addPartitionSplits(result, odpsTable, partitionSpec);
}
+ } else {
+ addBatchSplits(result, odpsTable, table.getTotalRows());
}
} catch (TunnelException e) {
- throw new UserException("Max Compute tunnel SDK exception.", e);
+ throw new UserException("Max Compute tunnel SDK exception: " +
e.getMessage(), e);
}
return result;
}
+
+ private static void addPartitionSplits(List<Split> result, Table odpsTable, String partitionSpec) {
+ long modificationTime = odpsTable.getLastDataModifiedTime().getTime();
+ // use '-1' to read the whole partition, avoiding a costly table.getTotalRows() call
+ Pair<Long, Long> range = Pair.of(0L, -1L);
+ FileSplit rangeSplit = new FileSplit(new Path("/virtual_slice_part"),
+ range.first, range.second, -1, modificationTime, null, Collections.emptyList());
+ result.add(new MaxComputeSplit(partitionSpec, rangeSplit));
+ }
+
+ private static void addBatchSplits(List<Split> result, Table odpsTable, long totalRows) {
+ List<Pair<Long, Long>> sliceRange = new ArrayList<>();
+ long fileNum = odpsTable.getFileNum();
+ long start = 0;
+ long splitSize = (long) Math.ceil((double) totalRows / fileNum);
+ if (splitSize <= 0 || totalRows < MIN_SPLIT_SIZE) {
+ // use whole split
+ sliceRange.add(Pair.of(start, totalRows));
+ } else {
+ for (int i = 0; i < fileNum; i++) {
+ if (start > totalRows) {
+ break;
+ }
+ sliceRange.add(Pair.of(start, splitSize));
+ start += splitSize;
+ }
+ }
+ long modificationTime = odpsTable.getLastDataModifiedTime().getTime();
+ if (!sliceRange.isEmpty()) {
+ for (int i = 0; i < sliceRange.size(); i++) {
+ Pair<Long, Long> range = sliceRange.get(i);
+ FileSplit rangeSplit = new FileSplit(new Path("/virtual_slice_" + i),
+ range.first, range.second, totalRows, modificationTime, null, Collections.emptyList());
+ result.add(new MaxComputeSplit(rangeSplit));
+ }
+ }
+ }
+
+ private List<String> getPartitionSpecs() throws AnalysisException {
+ return getPrunedPartitionSpecs();
+ }
+
+ private List<String> getPrunedPartitionSpecs() throws AnalysisException {
+ List<String> result = new ArrayList<>();
+ TablePartitionValues partitionValues = table.getPartitionValues();
+ // prune partitions by expr
+ partitionValues.readLock().lock();
+ try {
+ Map<Long, PartitionItem> idToPartitionItem = partitionValues.getIdToPartitionItem();
+ this.totalPartitionNum = idToPartitionItem.size();
+ ListPartitionPrunerV2 pruner = new ListPartitionPrunerV2(idToPartitionItem,
+ table.getPartitionColumns(), columnNameToRange,
+ partitionValues.getUidToPartitionRange(),
+ partitionValues.getRangeToId(),
+ partitionValues.getSingleColumnRangeMap(),
+ false);
+ Collection<Long> filteredPartitionIds = pruner.prune();
+ this.readPartitionNum = filteredPartitionIds.size();
+ // get partitions from cache
+ Map<Long, String> partitionIdToNameMap = partitionValues.getPartitionIdToNameMap();
+ filteredPartitionIds.forEach(id -> result.add(partitionIdToNameMap.get(id)));
+ return result;
+ } finally {
+ partitionValues.readLock().unlock();
+ }
+ }
}
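addBatchSplits above slices a non-partitioned read into fileNum ranges of ceil(totalRows / fileNum) rows each. A self-contained sketch of that arithmetic with hypothetical counts, assuming the tunnel reader simply stops when the final range runs past the end of the data:

    import java.util.ArrayList;
    import java.util.List;

    public class SplitRangeSketch {
        public static void main(String[] args) {
            long totalRows = 10, fileNum = 3; // hypothetical table stats
            long splitSize = (long) Math.ceil((double) totalRows / fileNum); // 4
            List<long[]> ranges = new ArrayList<>();
            // Same loop shape as addBatchSplits: at most fileNum ranges, stop
            // once the next start offset passes the last row.
            for (long start = 0; ranges.size() < fileNum && start <= totalRows; start += splitSize) {
                ranges.add(new long[] {start, splitSize}); // (start offset, row count)
            }
            // prints 0,4 / 4,4 / 8,4
            ranges.forEach(r -> System.out.println(r[0] + "," + r[1]));
        }
    }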
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/TableFormatType.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/MaxComputeSplit.java
similarity index 53%
copy from fe/fe-core/src/main/java/org/apache/doris/planner/external/TableFormatType.java
copy to fe/fe-core/src/main/java/org/apache/doris/planner/external/MaxComputeSplit.java
index 891e138db6b..a14e5fe22a6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/TableFormatType.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/MaxComputeSplit.java
@@ -17,20 +17,24 @@
package org.apache.doris.planner.external;
-public enum TableFormatType {
- HIVE("hive"),
- ICEBERG("iceberg"),
- HUDI("hudi"),
- PAIMON("paimon"),
- TRANSACTIONAL_HIVE("transactional_hive");
+import java.util.Optional;
- private final String tableFormatType;
+public class MaxComputeSplit extends FileSplit {
+ private final Optional<String> partitionSpec;
- TableFormatType(String tableFormatType) {
- this.tableFormatType = tableFormatType;
+ public MaxComputeSplit(FileSplit rangeSplit) {
+ super(rangeSplit.path, rangeSplit.start, rangeSplit.length, rangeSplit.fileLength,
+ rangeSplit.hosts, rangeSplit.partitionValues);
+ this.partitionSpec = Optional.empty();
}
- public String value() {
- return tableFormatType;
+ public MaxComputeSplit(String partitionSpec, FileSplit rangeSplit) {
+ super(rangeSplit.path, rangeSplit.start, rangeSplit.length, rangeSplit.fileLength,
+ rangeSplit.hosts, rangeSplit.partitionValues);
+ this.partitionSpec = Optional.of(partitionSpec);
+ }
+
+ public Optional<String> getPartitionSpec() {
+ return partitionSpec;
}
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/TableFormatType.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/TableFormatType.java
index 891e138db6b..b5f41f97ba4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/TableFormatType.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/TableFormatType.java
@@ -22,6 +22,7 @@ public enum TableFormatType {
ICEBERG("iceberg"),
HUDI("hudi"),
PAIMON("paimon"),
+ MAX_COMPUTE("max_compute"),
TRANSACTIONAL_HIVE("transactional_hive");
private final String tableFormatType;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/TablePartitionValues.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/TablePartitionValues.java
index a207f5f082a..acd44a50900 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/TablePartitionValues.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/TablePartitionValues.java
@@ -218,11 +218,16 @@ public class TablePartitionValues {
@Data
public static class TablePartitionKey {
- private String dbName;
- private String tblName;
+ private final String dbName;
+ private final String tblName;
// not in key
private List<Type> types;
+ public TablePartitionKey(String dbName, String tblName) {
+ this.dbName = dbName;
+ this.tblName = tblName;
+ }
+
public TablePartitionKey(String dbName, String tblName, List<Type> types) {
this.dbName = dbName;
this.tblName = tblName;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/hudi/HudiCachedPartitionProcessor.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/hudi/HudiCachedPartitionProcessor.java
index 37225c2339c..ba793ecf407 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/hudi/HudiCachedPartitionProcessor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/hudi/HudiCachedPartitionProcessor.java
@@ -96,11 +96,11 @@ public class HudiCachedPartitionProcessor extends HudiPartitionProcessor {
if (Long.parseLong(timestamp) == lastTimestamp) {
return getPartitionValues(table, tableMetaClient);
}
- List<String> partitionNames = getPartitionNamesBeforeOrEquals(timeline, timestamp);
- List<String> partitionColumnsList = Arrays.asList(partitionColumns.get());
+ List<String> partitionNameAndValues = getPartitionNamesBeforeOrEquals(timeline, timestamp);
+ List<String> partitionNames = Arrays.asList(partitionColumns.get());
TablePartitionValues partitionValues = new TablePartitionValues();
- partitionValues.addPartitions(partitionNames,
- partitionNames.stream().map(p -> parsePartitionValues(partitionColumnsList, p))
+ partitionValues.addPartitions(partitionNameAndValues,
+ partitionNameAndValues.stream().map(p -> parsePartitionValues(partitionNames, p))
.collect(Collectors.toList()),
table.getPartitionColumnTypes());
return partitionValues;
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java
index 45f3fc8030d..adb344cd897 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java
@@ -25,6 +25,7 @@ import org.apache.doris.analysis.AdminShowReplicaStatusStmt;
import org.apache.doris.analysis.AdminShowTabletStorageFormatStmt;
import org.apache.doris.analysis.DescribeStmt;
import org.apache.doris.analysis.HelpStmt;
+import org.apache.doris.analysis.LimitElement;
import org.apache.doris.analysis.PartitionNames;
import org.apache.doris.analysis.ShowAlterStmt;
import org.apache.doris.analysis.ShowAnalyzeStmt;
@@ -179,6 +180,7 @@ import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.common.util.Util;
import org.apache.doris.datasource.CatalogIf;
import org.apache.doris.datasource.HMSExternalCatalog;
+import org.apache.doris.datasource.MaxComputeExternalCatalog;
import org.apache.doris.external.iceberg.IcebergTableCreationRecord;
import org.apache.doris.load.DeleteHandler;
import org.apache.doris.load.ExportJob;
@@ -1616,18 +1618,49 @@ public class ShowExecutor {
List<List<String>> rows = ((PartitionsProcDir) procNodeI).fetchResultByFilter(showStmt.getFilterMap(),
showStmt.getOrderByPairs(), showStmt.getLimitElement()).getRows();
resultSet = new ShowResultSet(showStmt.getMetaData(), rows);
+ } else if (showStmt.getCatalog() instanceof MaxComputeExternalCatalog) {
+ handleShowMaxComputeTablePartitions(showStmt);
} else {
handleShowHMSTablePartitions(showStmt);
}
}
+ private void handleShowMaxComputeTablePartitions(ShowPartitionsStmt showStmt) {
+ MaxComputeExternalCatalog catalog = (MaxComputeExternalCatalog) (showStmt.getCatalog());
+ List<List<String>> rows = new ArrayList<>();
+ String dbName = ClusterNamespace.getNameFromFullName(showStmt.getTableName().getDb());
+ List<String> partitionNames;
+ LimitElement limit = showStmt.getLimitElement();
+ if (limit != null && limit.hasLimit()) {
+ partitionNames = catalog.listPartitionNames(dbName,
+ showStmt.getTableName().getTbl(), limit.getOffset(), limit.getLimit());
+ } else {
+ partitionNames = catalog.listPartitionNames(dbName, showStmt.getTableName().getTbl());
+ }
+ for (String partition : partitionNames) {
+ List<String> list = new ArrayList<>();
+ list.add(partition);
+ rows.add(list);
+ }
+ // sort by partition name
+ rows.sort(Comparator.comparing(x -> x.get(0)));
+ resultSet = new ShowResultSet(showStmt.getMetaData(), rows);
+ }
+
private void handleShowHMSTablePartitions(ShowPartitionsStmt showStmt) {
HMSExternalCatalog catalog = (HMSExternalCatalog) (showStmt.getCatalog());
List<List<String>> rows = new ArrayList<>();
String dbName = ClusterNamespace.getNameFromFullName(showStmt.getTableName().getDb());
- List<String> partitionNames = catalog.getClient().listPartitionNames(dbName,
- showStmt.getTableName().getTbl());
+ List<String> partitionNames;
+ LimitElement limit = showStmt.getLimitElement();
+ if (limit != null && limit.hasLimit()) {
+ // only the limit is honored for Hive; the offset is not supported
+ partitionNames = catalog.getClient()
+ .listPartitionNames(dbName, showStmt.getTableName().getTbl(), limit.getLimit());
+ } else {
+ partitionNames = catalog.getClient().listPartitionNames(dbName, showStmt.getTableName().getTbl());
+ }
for (String partition : partitionNames) {
List<String> list = new ArrayList<>();
list.add(partition);
diff --git a/gensrc/thrift/Descriptors.thrift b/gensrc/thrift/Descriptors.thrift
index 95f23690d88..37adc3d4274 100644
--- a/gensrc/thrift/Descriptors.thrift
+++ b/gensrc/thrift/Descriptors.thrift
@@ -328,7 +328,6 @@ struct TMCTable {
4: optional string access_key
5: optional string secret_key
6: optional string public_access
- 7: optional string partition_spec
}
// "Union" of all table types.
diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift
index e32092994ef..a57745e78d3 100644
--- a/gensrc/thrift/PlanNodes.thrift
+++ b/gensrc/thrift/PlanNodes.thrift
@@ -310,6 +310,9 @@ struct TPaimonFileDesc {
10: optional i64 last_update_time
}
+struct TMaxComputeFileDesc {
+ 1: optional string partition_spec
+}
struct THudiFileDesc {
1: optional string instant_time;
@@ -340,6 +343,7 @@ struct TTableFormatFileDesc {
3: optional THudiFileDesc hudi_params
4: optional TPaimonFileDesc paimon_params
5: optional TTransactionalHiveDesc transactional_hive_params
+ 6: optional TMaxComputeFileDesc max_compute_params
}
enum TTextSerdeType {
diff --git a/regression-test/data/external_table_p2/maxcompute/test_external_catalog_maxcompute.out b/regression-test/data/external_table_p2/maxcompute/test_external_catalog_maxcompute.out
index 6cd91cf2ee3..e75e12c137b 100644
--- a/regression-test/data/external_table_p2/maxcompute/test_external_catalog_maxcompute.out
+++ b/regression-test/data/external_table_p2/maxcompute/test_external_catalog_maxcompute.out
@@ -20,8 +20,50 @@ true 77 8920 182239402452
9601 qewtoll 2020-09-21
-- !q7 --
-6223 maxam 2020-09-21
-9601 qewtoll 2020-09-21
+1633 siwtow 2021-08-21
+1633 siwtow 20210821
+1633 siwtow 20210921
-- !replay_q6 --
9601 qewtoll 2020-09-21
+
+-- !multi_partition_q1 --
+pt=13/yy=2021/mm=12/dd=22
+pt=14/yy=2021/mm=12/dd=21
+pt=14/yy=2021/mm=12/dd=22
+
+-- !multi_partition_q2 --
+17 2022-04-23T11:12:30 2021 12 22
+17 2022-04-23T11:12:30 2021 12 21
+16 2022-04-23T11:12:30 2021 12 22
+
+-- !multi_partition_q3 --
+14 2022-04-23T11:12:30 2022 01 01
+14 2022-04-23T11:12:30 2022 01 02
+98 2022-04-23T11:12:30 2021 12 21
+
+-- !multi_partition_q4 --
+22
+
+-- !multi_partition_q5 --
+2022-04-23T11:12:30 2021 12 21
+2022-04-23T11:12:30 2021 12 21
+2022-04-23T11:12:30 2021 12 21
+
+-- !multi_partition_q6 --
+17 2021 12
+
+-- !multi_partition_q7 --
+20
+
+-- !multi_partition_q8 --
+11
+
+-- !multi_partition_q9 --
+lweu 8920 true 2023-11-23T12:03:54.952 0.123 2022-04-23 2022-04-23T11:12:30 12 2021 12 22
+wert 8920 true 2023-11-23T12:05:01.693 0.123 2022-04-23 2022-04-23T11:12:30 12 2021 12 22
+
+-- !multi_partition_q10 --
+12 2021 12 21
+12 2021 12 22
+12 2021 12 22
diff --git a/regression-test/suites/external_table_p2/maxcompute/test_external_catalog_maxcompute.groovy b/regression-test/suites/external_table_p2/maxcompute/test_external_catalog_maxcompute.groovy
index 23d0c0b252d..c016f8b91f2 100644
--- a/regression-test/suites/external_table_p2/maxcompute/test_external_catalog_maxcompute.groovy
+++ b/regression-test/suites/external_table_p2/maxcompute/test_external_catalog_maxcompute.groovy
@@ -49,7 +49,7 @@ suite("test_external_catalog_maxcompute", "p2,external,maxcompute,external_remot
qt_q4 """ select * from mc_parts where dt = '2020-09-21' """
qt_q5 """ select * from mc_parts where dt = '2021-08-21' """
qt_q6 """ select * from mc_parts where dt = '2020-09-21' and
mc_bigint > 6223 """
- qt_q7 """ select * from mc_parts where dt = '2020-09-21' or
mc_bigint > 0 """
+ qt_q7 """ select * from mc_parts where dt = '2020-09-21' or
(mc_bigint > 0 and dt > '2020-09-20') order by mc_bigint, dt limit 3; """
}
sql """ switch `${mc_catalog_name}`; """
@@ -73,5 +73,20 @@ suite("test_external_catalog_maxcompute",
"p2,external,maxcompute,external_remot
sql """ switch `${mc_catalog_name}`; """
sql """ use `${mc_db}`; """
qt_replay_q6 """ select * from mc_parts where dt = '2020-09-21' and
mc_bigint > 6223 """
+
+ // test multi partitions prune
+ sql """ refresh catalog ${mc_catalog_name} """
+ sql """ switch `${mc_catalog_name}`; """
+ sql """ use `${mc_db}`; """
+ qt_multi_partition_q1 """ show partitions from multi_partitions limit
5,3; """
+ qt_multi_partition_q2 """ select pt, create_time, yy, mm, dd from
multi_partitions where pt>-1 and yy > '' and mm > '' and dd >'' order by pt
desc, dd desc limit 3; """
+ qt_multi_partition_q3 """ select sum(pt), create_time, yy, mm, dd from
multi_partitions where yy > '' and mm > '' and dd >'' group by create_time, yy,
mm, dd order by dd limit 3; """
+ qt_multi_partition_q4 """ select count(*) from multi_partitions where
pt>-1 and yy > '' and mm > '' and dd <= '30'; """
+ qt_multi_partition_q5 """ select create_time, yy, mm, dd from
multi_partitions where yy = '2021' and mm='12' and dd='21' order by pt limit 3;
"""
+ qt_multi_partition_q6 """ select max(pt), yy, mm from multi_partitions
where yy = '2021' and mm='12' group by yy, mm order by yy, mm; """
+ qt_multi_partition_q7 """ select count(*) from multi_partitions where
yy < '2022'; """
+ qt_multi_partition_q8 """ select count(*) from multi_partitions where
pt>=14; """
+ qt_multi_partition_q9 """ select
city,mnt,gender,finished_time,order_rate,cut_date,create_time,pt, yy, mm, dd
from multi_partitions where pt >= 12 and pt < 14 and finished_time is not null;
"""
+ qt_multi_partition_q10 """ select pt, yy, mm, dd from multi_partitions
where pt >= 12 and create_time > '2022-04-23 11:11:00' order by pt, yy, mm, dd
limit 3; """
}
}