This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 18c2a13e09c [fix](multi-catalog)fix maxcompute partition filter and
session creation (#24911)
18c2a13e09c is described below
commit 18c2a13e09c421152a7d34d4e51e3f69c2a21ac0
Author: slothever <[email protected]>
AuthorDate: Tue Oct 17 22:36:10 2023 +0800
[fix](multi-catalog)fix maxcompute partition filter and session creation
(#24911)
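Add MaxCompute partition column support.
Fix the MaxCompute partition filter.
Change MaxCompute download session creation: open a session on demand (per partition when a partition spec is given) instead of caching one.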
---
be/src/runtime/descriptors.cpp | 1 +
be/src/runtime/descriptors.h | 2 +
.../exec/format/table/max_compute_jni_reader.cpp | 1 +
.../doris/maxcompute/MaxComputeJniScanner.java | 39 +++++-
.../doris/maxcompute/MaxComputePartitionValue.java | 137 +++++++++++++++++++++
.../doris/maxcompute/MaxComputeTableScan.java | 30 +++--
.../catalog/external/MaxComputeExternalTable.java | 80 ++++++++++++
.../datasource/MaxComputeExternalCatalog.java | 12 +-
.../doris/planner/external/MaxComputeScanNode.java | 4 +-
gensrc/thrift/Descriptors.thrift | 1 +
regression-test/conf/regression-conf.groovy | 4 +
.../test_external_catalog_maxcompute.out | 24 ++++
.../test_external_catalog_maxcompute.groovy | 60 +++++++++
13 files changed, 379 insertions(+), 16 deletions(-)
diff --git a/be/src/runtime/descriptors.cpp b/be/src/runtime/descriptors.cpp
index 5b58ff2af56..52075ae555b 100644
--- a/be/src/runtime/descriptors.cpp
+++ b/be/src/runtime/descriptors.cpp
@@ -190,6 +190,7 @@ MaxComputeTableDescriptor::MaxComputeTableDescriptor(const
TTableDescriptor& tde
_table(tdesc.mcTable.table),
_access_key(tdesc.mcTable.access_key),
_secret_key(tdesc.mcTable.secret_key),
+ _partition_spec(tdesc.mcTable.partition_spec),
_public_access(tdesc.mcTable.public_access) {}
MaxComputeTableDescriptor::~MaxComputeTableDescriptor() = default;
diff --git a/be/src/runtime/descriptors.h b/be/src/runtime/descriptors.h
index 18d9bb62f3d..cba1737c3e6 100644
--- a/be/src/runtime/descriptors.h
+++ b/be/src/runtime/descriptors.h
@@ -235,6 +235,7 @@ public:
const std::string table() const { return _table; }
const std::string access_key() const { return _access_key; }
const std::string secret_key() const { return _secret_key; }
+ const std::string partition_spec() const { return _partition_spec; }
const std::string public_access() const { return _public_access; }
private:
@@ -243,6 +244,7 @@ private:
std::string _table;
std::string _access_key;
std::string _secret_key;
+ std::string _partition_spec;
std::string _public_access;
};
diff --git a/be/src/vec/exec/format/table/max_compute_jni_reader.cpp
b/be/src/vec/exec/format/table/max_compute_jni_reader.cpp
index 9edd8bfc514..34db6a1df4d 100644
--- a/be/src/vec/exec/format/table/max_compute_jni_reader.cpp
+++ b/be/src/vec/exec/format/table/max_compute_jni_reader.cpp
@@ -64,6 +64,7 @@ MaxComputeJniReader::MaxComputeJniReader(const
MaxComputeTableDescriptor* mc_des
{"access_key",
_table_desc->access_key()},
{"secret_key",
_table_desc->secret_key()},
{"project", _table_desc->project()},
+ {"partition_spec",
_table_desc->partition_spec()},
{"table", _table_desc->table()},
{"public_access",
_table_desc->public_access()},
{"start_offset",
std::to_string(_range.start_offset)},
diff --git
a/fe/be-java-extensions/max-compute-scanner/src/main/java/org/apache/doris/maxcompute/MaxComputeJniScanner.java
b/fe/be-java-extensions/max-compute-scanner/src/main/java/org/apache/doris/maxcompute/MaxComputeJniScanner.java
index 8f9b903afdc..5f4125ec4ed 100644
---
a/fe/be-java-extensions/max-compute-scanner/src/main/java/org/apache/doris/maxcompute/MaxComputeJniScanner.java
+++
b/fe/be-java-extensions/max-compute-scanner/src/main/java/org/apache/doris/maxcompute/MaxComputeJniScanner.java
@@ -23,6 +23,7 @@ import org.apache.doris.common.jni.vec.ScanPredicate;
import com.aliyun.odps.Column;
import com.aliyun.odps.OdpsType;
+import com.aliyun.odps.PartitionSpec;
import com.aliyun.odps.data.ArrowRecordReader;
import com.aliyun.odps.tunnel.TableTunnel;
import com.aliyun.odps.type.TypeInfo;
@@ -31,6 +32,7 @@ import com.google.common.base.Strings;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.commons.lang3.StringUtils;
import org.apache.log4j.Logger;
import java.io.IOException;
@@ -40,7 +42,9 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
/**
* MaxComputeJ JniScanner. BE will read data from the scanner object.
@@ -49,16 +53,19 @@ public class MaxComputeJniScanner extends JniScanner {
private static final Logger LOG =
Logger.getLogger(MaxComputeJniScanner.class);
private static final String REGION = "region";
private static final String PROJECT = "project";
+ private static final String PARTITION_SPEC = "partition_spec";
private static final String TABLE = "table";
private static final String ACCESS_KEY = "access_key";
private static final String SECRET_KEY = "secret_key";
private static final String START_OFFSET = "start_offset";
private static final String SPLIT_SIZE = "split_size";
private static final String PUBLIC_ACCESS = "public_access";
- private static final Map<String, MaxComputeTableScan> tableScans = new
ConcurrentHashMap<>();
+ private final Map<String, MaxComputeTableScan> tableScans = new
ConcurrentHashMap<>();
private final String region;
private final String project;
private final String table;
+ private PartitionSpec partitionSpec;
+ private Set<String> partitionColumns;
private final MaxComputeTableScan curTableScan;
private MaxComputeColumnValue columnValue;
private long remainBatchRows = 0;
@@ -76,7 +83,10 @@ public class MaxComputeJniScanner extends JniScanner {
table = Objects.requireNonNull(params.get(TABLE), "required property
'" + TABLE + "'.");
tableScans.putIfAbsent(tableUniqKey(), newTableScan(params));
curTableScan = tableScans.get(tableUniqKey());
-
+ String partitionSpec = params.get(PARTITION_SPEC);
+ if (StringUtils.isNotEmpty(partitionSpec)) {
+ this.partitionSpec = new PartitionSpec(partitionSpec);
+ }
String[] requiredFields = params.get("required_fields").split(",");
String[] types = params.get("columns_types").split("#");
ColumnType[] columnTypes = new ColumnType[types.length];
@@ -124,6 +134,7 @@ public class MaxComputeJniScanner extends JniScanner {
}
// reorder columns
List<Column> columnList = curTableScan.getSchema().getColumns();
+ columnList.addAll(curTableScan.getSchema().getPartitionColumns());
Map<String, Integer> columnRank = new HashMap<>();
for (int i = 0; i < columnList.size(); i++) {
columnRank.put(columnList.get(i).getName(), i);
@@ -139,13 +150,23 @@ public class MaxComputeJniScanner extends JniScanner {
return;
}
try {
- TableTunnel.DownloadSession session = curTableScan.getSession();
+ TableTunnel.DownloadSession session;
+ if (partitionSpec != null) {
+ session = curTableScan.openDownLoadSession(partitionSpec);
+ } else {
+ session = curTableScan.openDownLoadSession();
+ }
long start = startOffset == -1L ? 0 : startOffset;
long recordCount = session.getRecordCount();
totalRows = splitSize > 0 ? Math.min(splitSize, recordCount) :
recordCount;
arrowAllocator = new RootAllocator(Long.MAX_VALUE);
- curReader = session.openArrowRecordReader(start, totalRows,
readColumns, arrowAllocator);
+ partitionColumns =
session.getSchema().getPartitionColumns().stream()
+ .map(Column::getName)
+ .collect(Collectors.toSet());
+ List<Column> maxComputeColumns = new ArrayList<>(readColumns);
+ maxComputeColumns.removeIf(e ->
partitionColumns.contains(e.getName()));
+ curReader = session.openArrowRecordReader(start, totalRows,
maxComputeColumns, arrowAllocator);
} catch (Exception e) {
close();
throw new IOException(e);
@@ -252,6 +273,16 @@ public class MaxComputeJniScanner extends JniScanner {
appendData(readColumnsToId.get(column.getName()),
columnValue);
}
}
+ if (partitionSpec != null) {
+ for (String partitionColumn : partitionColumns) {
+ String partitionValue =
partitionSpec.get(partitionColumn);
+ Integer readColumnId =
readColumnsToId.get(partitionColumn);
+ if (readColumnId != null && partitionValue != null) {
+ MaxComputePartitionValue value = new
MaxComputePartitionValue(partitionValue);
+ appendData(readColumnId, value);
+ }
+ }
+ }
curReadRows += batchRows;
} finally {
batch.close();
diff --git
a/fe/be-java-extensions/max-compute-scanner/src/main/java/org/apache/doris/maxcompute/MaxComputePartitionValue.java
b/fe/be-java-extensions/max-compute-scanner/src/main/java/org/apache/doris/maxcompute/MaxComputePartitionValue.java
new file mode 100644
index 00000000000..cb76447e589
--- /dev/null
+++
b/fe/be-java-extensions/max-compute-scanner/src/main/java/org/apache/doris/maxcompute/MaxComputePartitionValue.java
@@ -0,0 +1,137 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.maxcompute;
+
+import org.apache.doris.common.jni.vec.ColumnValue;
+
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.nio.charset.StandardCharsets;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.util.List;
+
+/**
+ * MaxCompute partition column value, backed by the partition's string value
+ */
+public class MaxComputePartitionValue implements ColumnValue {
+ private String partitionValue;
+
+ public MaxComputePartitionValue(String partitionValue) {
+ reset(partitionValue);
+ }
+
+ public void reset(String partitionValue) {
+ this.partitionValue = partitionValue;
+ }
+
+ @Override
+ public boolean canGetStringAsBytes() {
+ return false;
+ }
+
+ @Override
+ public boolean isNull() {
+ return false;
+ }
+
+ @Override
+ public boolean getBoolean() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public byte getByte() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public short getShort() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public int getInt() {
+ return Integer.parseInt(partitionValue);
+ }
+
+ @Override
+ public float getFloat() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public long getLong() {
+ return Long.parseLong(partitionValue);
+ }
+
+ @Override
+ public double getDouble() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public BigInteger getBigInteger() {
+ return BigInteger.valueOf(getLong());
+ }
+
+ @Override
+ public BigDecimal getDecimal() {
+ return BigDecimal.valueOf(getDouble());
+ }
+
+ @Override
+ public String getString() {
+ return partitionValue;
+ }
+
+ @Override
+ public byte[] getStringAsBytes() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public LocalDate getDate() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public LocalDateTime getDateTime() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public byte[] getBytes() {
+ return partitionValue.getBytes(StandardCharsets.UTF_8);
+ }
+
+ @Override
+ public void unpackArray(List<ColumnValue> values) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void unpackMap(List<ColumnValue> keys, List<ColumnValue> values) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void unpackStruct(List<Integer> structFieldIndex, List<ColumnValue>
values) {
+ throw new UnsupportedOperationException();
+ }
+}
diff --git
a/fe/be-java-extensions/max-compute-scanner/src/main/java/org/apache/doris/maxcompute/MaxComputeTableScan.java
b/fe/be-java-extensions/max-compute-scanner/src/main/java/org/apache/doris/maxcompute/MaxComputeTableScan.java
index da67196a3a2..c0fa40dae46 100644
---
a/fe/be-java-extensions/max-compute-scanner/src/main/java/org/apache/doris/maxcompute/MaxComputeTableScan.java
+++
b/fe/be-java-extensions/max-compute-scanner/src/main/java/org/apache/doris/maxcompute/MaxComputeTableScan.java
@@ -18,6 +18,7 @@
package org.apache.doris.maxcompute;
import com.aliyun.odps.Odps;
+import com.aliyun.odps.PartitionSpec;
import com.aliyun.odps.TableSchema;
import com.aliyun.odps.account.AliyunAccount;
import com.aliyun.odps.tunnel.TableTunnel;
@@ -35,8 +36,8 @@ public class MaxComputeTableScan {
private final TableTunnel tunnel;
private final String project;
private final String table;
- private volatile TableTunnel.DownloadSession tableSession;
private volatile long readRows = 0;
+ private long totalRows = 0;
public MaxComputeTableScan(String region, String project, String table,
String accessKey, String secretKey, boolean
enablePublicAccess) {
@@ -59,13 +60,24 @@ public class MaxComputeTableScan {
return odps.tables().get(table).getSchema();
}
- public synchronized TableTunnel.DownloadSession getSession() throws
IOException {
- if (tableSession == null) {
- try {
- tableSession = tunnel.createDownloadSession(project, table);
- } catch (TunnelException e) {
- throw new IOException(e);
- }
+ public TableTunnel.DownloadSession openDownLoadSession() throws
IOException {
+ TableTunnel.DownloadSession tableSession;
+ try {
+ tableSession = tunnel.getDownloadSession(project, table, null);
+ totalRows = tableSession.getRecordCount();
+ } catch (TunnelException e) {
+ throw new IOException(e);
+ }
+ return tableSession;
+ }
+
+ public TableTunnel.DownloadSession openDownLoadSession(PartitionSpec
partitionSpec) throws IOException {
+ TableTunnel.DownloadSession tableSession;
+ try {
+ tableSession = tunnel.getDownloadSession(project, table,
partitionSpec, null);
+ totalRows = tableSession.getRecordCount();
+ } catch (TunnelException e) {
+ throw new IOException(e);
}
return tableSession;
}
@@ -76,6 +88,6 @@ public class MaxComputeTableScan {
}
public boolean endOfScan() {
- return readRows >= tableSession.getRecordCount();
+ return readRows >= totalRows;
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/MaxComputeExternalTable.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/MaxComputeExternalTable.java
index 012693bccd6..3c2f3bada03 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/MaxComputeExternalTable.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/MaxComputeExternalTable.java
@@ -17,6 +17,11 @@
package org.apache.doris.catalog.external;
+import org.apache.doris.analysis.BinaryPredicate;
+import org.apache.doris.analysis.Expr;
+import org.apache.doris.analysis.InPredicate;
+import org.apache.doris.analysis.Predicate;
+import org.apache.doris.analysis.SlotRef;
import org.apache.doris.catalog.ArrayType;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.MapType;
@@ -39,9 +44,16 @@ import com.aliyun.odps.type.StructTypeInfo;
import com.aliyun.odps.type.TypeInfo;
import com.aliyun.odps.type.VarcharTypeInfo;
import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.StringJoiner;
/**
* MaxCompute external table.
@@ -49,6 +61,8 @@ import java.util.List;
public class MaxComputeExternalTable extends ExternalTable {
private Table odpsTable;
+ private Set<String> partitionKeys;
+ private String partitionSpec;
public MaxComputeExternalTable(long id, String name, String dbName,
MaxComputeExternalCatalog catalog) {
super(id, name, catalog, dbName, TableType.MAX_COMPUTE_EXTERNAL_TABLE);
@@ -72,9 +86,74 @@ public class MaxComputeExternalTable extends ExternalTable {
result.add(new Column(field.getName(),
mcTypeToDorisType(field.getTypeInfo()), true, null,
true, field.getComment(), true, -1));
}
+ List<com.aliyun.odps.Column> partitionColumns =
odpsTable.getSchema().getPartitionColumns();
+ partitionKeys = new HashSet<>();
+ for (com.aliyun.odps.Column partColumn : partitionColumns) {
+ result.add(new Column(partColumn.getName(),
mcTypeToDorisType(partColumn.getTypeInfo()), true, null,
+ true, partColumn.getComment(), true, -1));
+ partitionKeys.add(partColumn.getName());
+ }
return result;
}
+ public Optional<String> getPartitionSpec(List<Expr> conjuncts) {
+ if (!partitionKeys.isEmpty()) {
+ if (conjuncts.isEmpty()) {
+ throw new IllegalArgumentException("Max Compute partition
table need partition predicate.");
+ }
+ // rebuild partitionSpec on every call, since the conjuncts may have changed.
+ List<String> partitionConjuncts =
parsePartitionConjuncts(conjuncts, partitionKeys);
+ StringJoiner partitionSpec = new StringJoiner(",");
+ partitionConjuncts.forEach(partitionSpec::add);
+ this.partitionSpec = partitionSpec.toString();
+ return Optional.of(this.partitionSpec);
+ }
+ return Optional.empty();
+ }
+
+ private static List<String> parsePartitionConjuncts(List<Expr> conjuncts,
Set<String> partitionKeys) {
+ List<String> partitionConjuncts = new ArrayList<>();
+ Set<Predicate> predicates = Sets.newHashSet();
+ for (Expr conjunct : conjuncts) {
+ // collect every binary and IN predicate nested in the conjunct
+ conjunct.collect(BinaryPredicate.class, predicates);
+ conjunct.collect(InPredicate.class, predicates);
+ }
+ Map<String, Predicate> slotToConjuncts = new HashMap<>();
+ for (Predicate predicate : predicates) {
+ List<SlotRef> slotRefs = new ArrayList<>();
+ if (predicate instanceof BinaryPredicate) {
+ if (((BinaryPredicate) predicate).getOp() !=
BinaryPredicate.Operator.EQ) {
+ // MaxCompute only supports the EQ operator: pt='pt-value'
+ continue;
+ }
+ // BinaryPredicate has one SlotRef on the left; the partition value is not a SlotRef
+ predicate.collect(SlotRef.class, slotRefs);
+ slotToConjuncts.put(slotRefs.get(0).getColumnName(),
predicate);
+ } else if (predicate instanceof InPredicate) {
+ predicate.collect(SlotRef.class, slotRefs);
+ slotToConjuncts.put(slotRefs.get(0).getColumnName(),
predicate);
+ }
+ }
+ for (String partitionKey : partitionKeys) {
+ Predicate partitionPredicate = slotToConjuncts.get(partitionKey);
+ if (partitionPredicate == null) {
+ continue;
+ }
+ if (partitionPredicate instanceof InPredicate) {
+ List<Expr> inList = ((InPredicate)
partitionPredicate).getListChildren();
+ for (Expr expr : inList) {
+ String partitionConjunct = partitionKey + "=" +
expr.toSql();
+ partitionConjuncts.add(partitionConjunct.replace("`", ""));
+ }
+ } else {
+ String partitionConjunct = partitionPredicate.toSql();
+ partitionConjuncts.add(partitionConjunct.replace("`", ""));
+ }
+ }
+ return partitionConjuncts;
+ }
+
private Type mcTypeToDorisType(TypeInfo typeInfo) {
OdpsType odpsType = typeInfo.getOdpsType();
switch (odpsType) {
@@ -166,6 +245,7 @@ public class MaxComputeExternalTable extends ExternalTable {
tMcTable.setRegion(mcCatalog.getRegion());
tMcTable.setAccessKey(mcCatalog.getAccessKey());
tMcTable.setSecretKey(mcCatalog.getSecretKey());
+ tMcTable.setPartitionSpec(this.partitionSpec);
tMcTable.setPublicAccess(String.valueOf(mcCatalog.enablePublicAccess()));
// use mc project as dbName
tMcTable.setProject(dbName);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/MaxComputeExternalCatalog.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/MaxComputeExternalCatalog.java
index 5c5e4ded0c1..0cd99678bad 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/MaxComputeExternalCatalog.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/MaxComputeExternalCatalog.java
@@ -24,6 +24,7 @@ import
org.apache.doris.datasource.property.constants.MCProperties;
import com.aliyun.odps.Odps;
import com.aliyun.odps.OdpsException;
+import com.aliyun.odps.PartitionSpec;
import com.aliyun.odps.account.Account;
import com.aliyun.odps.account.AliyunAccount;
import com.aliyun.odps.tunnel.TableTunnel;
@@ -35,6 +36,7 @@ import com.google.gson.annotations.SerializedName;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
public class MaxComputeExternalCatalog extends ExternalCatalog {
private Odps odps;
@@ -93,15 +95,21 @@ public class MaxComputeExternalCatalog extends
ExternalCatalog {
odps.setDefaultProject(defaultProject);
}
- public long getTotalRows(String project, String table) throws
TunnelException {
+ public long getTotalRows(String project, String table, Optional<String>
partitionSpec) throws TunnelException {
makeSureInitialized();
TableTunnel tunnel = new TableTunnel(odps);
String tunnelUrl = tunnelUrlTemplate.replace("{}", region);
if (enablePublicAccess) {
tunnelUrl = tunnelUrl.replace("-inc", "");
}
+ TableTunnel.DownloadSession downloadSession;
tunnel.setEndpoint(tunnelUrl);
- return tunnel.createDownloadSession(project, table).getRecordCount();
+ if (!partitionSpec.isPresent()) {
+ downloadSession = tunnel.getDownloadSession(project, table, null);
+ } else {
+ downloadSession = tunnel.getDownloadSession(project, table, new
PartitionSpec(partitionSpec.get()), null);
+ }
+ return downloadSession.getRecordCount();
}
public Odps getClient() {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/MaxComputeScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/MaxComputeScanNode.java
index 1030a67a30a..d7f8d599a61 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/MaxComputeScanNode.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/MaxComputeScanNode.java
@@ -37,6 +37,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
public class MaxComputeScanNode extends FileQueryScanNode {
@@ -96,7 +97,8 @@ public class MaxComputeScanNode extends FileQueryScanNode {
}
try {
List<Pair<Long, Long>> sliceRange = new ArrayList<>();
- long totalRows = catalog.getTotalRows(table.getDbName(),
table.getName());
+ Optional<String> partitionSpec = table.getPartitionSpec(conjuncts);
+ long totalRows = catalog.getTotalRows(table.getDbName(),
table.getName(), partitionSpec);
long fileNum = odpsTable.getFileNum();
long start = 0;
long splitSize = (long) Math.ceil((double) totalRows / fileNum);
diff --git a/gensrc/thrift/Descriptors.thrift b/gensrc/thrift/Descriptors.thrift
index a5ae774bd4c..51767200713 100644
--- a/gensrc/thrift/Descriptors.thrift
+++ b/gensrc/thrift/Descriptors.thrift
@@ -331,6 +331,7 @@ struct TMCTable {
4: optional string access_key
5: optional string secret_key
6: optional string public_access
+ 7: optional string partition_spec
}
// "Union" of all table types.
diff --git a/regression-test/conf/regression-conf.groovy
b/regression-test/conf/regression-conf.groovy
index 81792a08a14..ea3cab9355c 100644
--- a/regression-test/conf/regression-conf.groovy
+++ b/regression-test/conf/regression-conf.groovy
@@ -168,6 +168,10 @@ extEsPort = 9200
extEsUser = "*******"
extEsPassword = "***********"
+enableMaxComputeTest=false
+aliYunAk="***********"
+aliYunSk="***********"
+
s3Endpoint = "cos.ap-hongkong.myqcloud.com"
s3BucketName = "doris-build-hk-1308700295"
s3Region = "ap-hongkong"
diff --git
a/regression-test/data/external_table_p2/maxcompute/test_external_catalog_maxcompute.out
b/regression-test/data/external_table_p2/maxcompute/test_external_catalog_maxcompute.out
new file mode 100644
index 00000000000..5fc7ade4894
--- /dev/null
+++
b/regression-test/data/external_table_p2/maxcompute/test_external_catalog_maxcompute.out
@@ -0,0 +1,24 @@
+-- This file is automatically generated. You should know what you did if you
want to edit this
+-- !q1 --
+8639377
+
+-- !q2 --
+1 2 2000-08-15 2000-08-16 t 3
4 5 6 7 8 9 10 11 12
13 14 15
16 17 18 19 20 21 22.22 23.23
+
+-- !q3 --
+false 2 44 423432
+true 77 8920 182239402452
+
+-- !q4 --
+6223 maxam 2020-09-21
+9601 qewtoll 2020-09-21
+
+-- !q5 --
+1633 siwtow 2021-08-21
+
+-- !q6 --
+9601 qewtoll 2020-09-21
+
+-- !q7 --
+6223 maxam 2020-09-21
+9601 qewtoll 2020-09-21
diff --git
a/regression-test/suites/external_table_p2/maxcompute/test_external_catalog_maxcompute.groovy
b/regression-test/suites/external_table_p2/maxcompute/test_external_catalog_maxcompute.groovy
new file mode 100644
index 00000000000..6b050e277a8
--- /dev/null
+++
b/regression-test/suites/external_table_p2/maxcompute/test_external_catalog_maxcompute.groovy
@@ -0,0 +1,60 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_external_catalog_maxcompute",
"p2,external,maxcompute,external_remote,external_remote_maxcompute") {
+ String enabled = context.config.otherConfigs.get("enableMaxComputeTest")
+ if (enabled != null && enabled.equalsIgnoreCase("true")) {
+ String ak = context.config.otherConfigs.get("aliYunAk")
+ String sk = context.config.otherConfigs.get("aliYunSk");
+ String mc_db = "jz_datalake"
+ String mc_catalog_name = "test_external_mc_catalog"
+
+ sql """drop catalog if exists ${mc_catalog_name};"""
+ sql """
+ create catalog if not exists ${mc_catalog_name} properties (
+ "type" = "max_compute",
+ "mc.region" = "cn-beijing",
+ "mc.default.project" = "${mc_db}",
+ "mc.access_key" = "${ak}",
+ "mc.secret_key" = "${sk}",
+ "mc.public_access" = "true"
+ );
+ """
+
+ // query data test
+ def q01 = {
+ qt_q1 """ select count(*) from store_sales """
+ }
+ // data type test
+ def q02 = {
+ qt_q2 """ select * from web_site where web_site_id=2 order by
web_site_id """ // test char,date,varchar,double,decimal
+ qt_q3 """ select * from int_types order by mc_boolean limit 2 """
// test bool,tinyint,int,bigint
+ }
+ // test partition table filter
+ def q03 = {
+ qt_q4 """ select * from mc_parts where dt = '2020-09-21' """
+ qt_q5 """ select * from mc_parts where dt = '2021-08-21' """
+ qt_q6 """ select * from mc_parts where dt = '2020-09-21' and
mc_bigint > 6223 """
+ qt_q7 """ select * from mc_parts where dt = '2020-09-21' or
mc_bigint > 0 """
+ }
+ sql """ switch `${mc_catalog_name}`; """
+ sql """ use `${mc_db}`; """
+ q01()
+ q02()
+ q03()
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]