This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 49180fed4bb [opt](iceberg)Add a new appearance to display the pushDown
`count` (#37046)
49180fed4bb is described below
commit 49180fed4bbf291f0777c61c34e712608639f730
Author: wuwenchi <[email protected]>
AuthorDate: Wed Jul 10 23:40:55 2024 +0800
[opt](iceberg)Add a new appearance to display the pushDown `count` (#37046)
## Proposed changes
1. When the count can be pushed down, the specific count value is
displayed after the pushdown agg in the explain output.
```
| pushdown agg=COUNT (1) |
```
2. Add a session variable `enable_count_push_down_for_external_table`
to control whether a pushdown count operation is required. Default is
`true`.
```
mysql> show variables like 'enable_count_push_down_for_external_table';
+-------------------------------------------+-------+---------------+---------+
| Variable_name | Value | Default_Value |
Changed |
+-------------------------------------------+-------+---------------+---------+
| enable_count_push_down_for_external_table | false | true | 1
|
+-------------------------------------------+-------+---------------+---------+
1 row in set (0.02 sec)
```
---
.../org/apache/doris/datasource/FileScanNode.java | 14 ++-
.../datasource/iceberg/source/IcebergScanNode.java | 20 ++--
.../glue/translator/PhysicalPlanTranslator.java | 7 ++
.../java/org/apache/doris/qe/SessionVariable.java | 10 ++
.../iceberg/test_iceberg_optimize_count.out | 25 +++++
.../iceberg/test_iceberg_optimize_count.groovy | 106 +++++++++++++++++++++
6 files changed, 170 insertions(+), 12 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileScanNode.java
index bb6865582fc..7e5f6da97bc 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileScanNode.java
@@ -39,6 +39,7 @@ import org.apache.doris.thrift.TFileScanNode;
import org.apache.doris.thrift.TFileScanRangeParams;
import org.apache.doris.thrift.TPlanNode;
import org.apache.doris.thrift.TPlanNodeType;
+import org.apache.doris.thrift.TPushAggOp;
import org.apache.doris.thrift.TScanRangeLocations;
import com.google.common.base.Preconditions;
@@ -98,6 +99,10 @@ public abstract class FileScanNode extends ExternalScanNode {
super.toThrift(planNode);
}
+ public long getPushDownCount() {
+ return 0;
+ }
+
@Override
public String getNodeExplainString(String prefix, TExplainLevel
detailLevel) {
StringBuilder output = new StringBuilder();
@@ -173,7 +178,14 @@ public abstract class FileScanNode extends
ExternalScanNode {
output.append(String.format("avgRowSize=%s, ", avgRowSize));
}
output.append(String.format("numNodes=%s", numNodes)).append("\n");
- output.append(prefix).append(String.format("pushdown agg=%s",
pushDownAggNoGroupingOp)).append("\n");
+
+ // pushdown agg
+ output.append(prefix).append(String.format("pushdown agg=%s",
pushDownAggNoGroupingOp));
+ if (pushDownAggNoGroupingOp.equals(TPushAggOp.COUNT)) {
+ output.append(" (").append(getPushDownCount()).append(")");
+ }
+ output.append("\n");
+
if (useTopnFilter()) {
String topnFilterSources = String.join(",",
topnFilterSortNodes.stream()
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java
index d5d3360845c..bfb2a5aeb34 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java
@@ -78,7 +78,6 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.time.Instant;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -214,10 +213,11 @@ public class IcebergScanNode extends FileQueryScanNode {
boolean isPartitionedTable = icebergTable.spec().isPartitioned();
long rowCount = getCountFromSnapshot();
- if (getPushDownAggNoGroupingOp().equals(TPushAggOp.COUNT) && rowCount
> 0) {
+ if (getPushDownAggNoGroupingOp().equals(TPushAggOp.COUNT) && rowCount
>= 0) {
this.rowCount = rowCount;
return new ArrayList<>();
}
+
CloseableIterable<FileScanTask> fileScanTasks =
TableScanUtil.splitFiles(scan.planFiles(), splitSize);
try (CloseableIterable<CombinedScanTask> combinedScanTasks =
TableScanUtil.planTasks(fileScanTasks, splitSize, 1, 0)) {
@@ -271,13 +271,6 @@ public class IcebergScanNode extends FileQueryScanNode {
throw new UserException(e.getMessage(), e.getCause());
}
- // TODO: Need to delete this as we can handle count pushdown in fe side
- TPushAggOp aggOp = getPushDownAggNoGroupingOp();
- if (aggOp.equals(TPushAggOp.COUNT) && getCountFromSnapshot() > 0) {
- // we can create a special empty split and skip the plan process
- return Collections.singletonList(splits.get(0));
- }
-
readPartitionNum = partitionPathSet.size();
return splits;
@@ -431,7 +424,7 @@ public class IcebergScanNode extends FileQueryScanNode {
// empty table
if (snapshot == null) {
- return -1;
+ return 0;
}
Map<String, String> summary = snapshot.summary();
@@ -448,12 +441,17 @@ public class IcebergScanNode extends FileQueryScanNode {
super.toThrift(planNode);
if (getPushDownAggNoGroupingOp().equals(TPushAggOp.COUNT)) {
long countFromSnapshot = getCountFromSnapshot();
- if (countFromSnapshot > 0) {
+ if (countFromSnapshot >= 0) {
planNode.setPushDownCount(countFromSnapshot);
}
}
}
+ @Override
+ public long getPushDownCount() {
+ return getCountFromSnapshot();
+ }
+
@Override
public String getNodeExplainString(String prefix, TExplainLevel
detailLevel) {
if (pushdownIcebergPredicates.isEmpty()) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
index 5fbe354ffa0..62c823fdec4 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
@@ -193,6 +193,7 @@ import org.apache.doris.planner.SetOperationNode;
import org.apache.doris.planner.SortNode;
import org.apache.doris.planner.TableFunctionNode;
import org.apache.doris.planner.UnionNode;
+import org.apache.doris.qe.ConnectContext;
import org.apache.doris.statistics.StatisticConstants;
import org.apache.doris.tablefunction.TableValuedFunctionIf;
import org.apache.doris.thrift.TFetchOption;
@@ -1111,6 +1112,12 @@ public class PhysicalPlanTranslator extends
DefaultPlanVisitor<PlanFragment, Pla
+ storageLayerAggregate.getAggOp());
}
+ if (storageLayerAggregate.getRelation() instanceof PhysicalFileScan
+ && pushAggOp.equals(TPushAggOp.COUNT)
+ &&
!ConnectContext.get().getSessionVariable().isEnableCountPushDownForExternalTable())
{
+ pushAggOp = TPushAggOp.NONE;
+ }
+
context.setRelationPushAggOp(
storageLayerAggregate.getRelation().getRelationId(),
pushAggOp);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index 476c7a36533..eb12d86a6bd 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -597,6 +597,8 @@ public class SessionVariable implements Serializable,
Writable {
public static final String FORCE_JNI_SCANNER = "force_jni_scanner";
+ public static final String ENABLE_COUNT_PUSH_DOWN_FOR_EXTERNAL_TABLE =
"enable_count_push_down_for_external_table";
+
public static final String SHOW_ALL_FE_CONNECTION =
"show_all_fe_connection";
public static final String MAX_MSG_SIZE_OF_RESULT_RECEIVER =
"max_msg_size_of_result_receiver";
@@ -1849,6 +1851,10 @@ public class SessionVariable implements Serializable,
Writable {
description = {"强制使用jni方式读取外表", "Force the use of jni mode to read
external table"})
private boolean forceJniScanner = false;
+ @VariableMgr.VarAttr(name = ENABLE_COUNT_PUSH_DOWN_FOR_EXTERNAL_TABLE,
+ description = {"对外表启用 count(*) 下推优化", "enable count(*) pushdown
optimization for external table"})
+ private boolean enableCountPushDownForExternalTable = true;
+
public static final String IGNORE_RUNTIME_FILTER_IDS =
"ignore_runtime_filter_ids";
public Set<Integer> getIgnoredRuntimeFilterIds() {
@@ -4121,6 +4127,10 @@ public class SessionVariable implements Serializable,
Writable {
forceJniScanner = force;
}
+ public boolean isEnableCountPushDownForExternalTable() {
+ return enableCountPushDownForExternalTable;
+ }
+
public boolean isForceToLocalShuffle() {
return enableLocalShuffle && enableNereidsPlanner &&
forceToLocalShuffle;
}
diff --git
a/regression-test/data/external_table_p0/iceberg/test_iceberg_optimize_count.out
b/regression-test/data/external_table_p0/iceberg/test_iceberg_optimize_count.out
new file mode 100644
index 00000000000..f2e945f9cec
--- /dev/null
+++
b/regression-test/data/external_table_p0/iceberg/test_iceberg_optimize_count.out
@@ -0,0 +1,25 @@
+-- This file is automatically generated. You should know what you did if you
want to edit this
+-- !q01 --
+1000
+
+-- !q02 --
+1000
+
+-- !q03 --
+1000
+
+-- !q04 --
+1000
+
+-- !q05 --
+1000
+
+-- !q06 --
+1000
+
+-- !q07 --
+1000
+
+-- !q08 --
+1000
+
diff --git
a/regression-test/suites/external_table_p0/iceberg/test_iceberg_optimize_count.groovy
b/regression-test/suites/external_table_p0/iceberg/test_iceberg_optimize_count.groovy
new file mode 100644
index 00000000000..e55d0fefa8d
--- /dev/null
+++
b/regression-test/suites/external_table_p0/iceberg/test_iceberg_optimize_count.groovy
@@ -0,0 +1,106 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_iceberg_optimize_count",
"p0,external,doris,external_docker,external_docker_doris") {
+ String enabled = context.config.otherConfigs.get("enableIcebergTest")
+ if (enabled == null || !enabled.equalsIgnoreCase("true")) {
+ return
+ }
+
+ String rest_port = context.config.otherConfigs.get("iceberg_rest_uri_port")
+ String minio_port = context.config.otherConfigs.get("iceberg_minio_port")
+ String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+ String catalog_name = "test_iceberg_optimize_count"
+
+ try {
+
+ sql """drop catalog if exists ${catalog_name}"""
+ sql """CREATE CATALOG ${catalog_name} PROPERTIES (
+ 'type'='iceberg',
+ 'iceberg.catalog.type'='rest',
+ 'uri' = 'http://${externalEnvIp}:${rest_port}',
+ "s3.access_key" = "admin",
+ "s3.secret_key" = "password",
+ "s3.endpoint" = "http://${externalEnvIp}:${minio_port}",
+ "s3.region" = "us-east-1"
+ );"""
+
+ sql """ switch ${catalog_name} """
+ sql """ create database if not exists ${catalog_name} """
+ sql """ use format_v2 """
+
+ sqlstr1 = """ select count(*) from sample_cow_orc; """
+ sqlstr2 = """ select count(*) from sample_cow_parquet; """
+ sqlstr3 = """ select count(*) from sample_mor_orc; """
+ sqlstr4 = """ select count(*) from sample_mor_parquet; """
+
+ // use push down count
+ sql """ set enable_count_push_down_for_external_table=true; """
+
+ qt_q01 """${sqlstr1}"""
+ qt_q02 """${sqlstr2}"""
+ qt_q03 """${sqlstr3}"""
+ qt_q04 """${sqlstr4}"""
+
+ explain {
+ sql("""${sqlstr1}""")
+ contains """pushdown agg=COUNT (1000)"""
+ }
+ explain {
+ sql("""${sqlstr2}""")
+ contains """pushdown agg=COUNT (1000)"""
+ }
+ explain {
+ sql("""${sqlstr3}""")
+ contains """pushdown agg=COUNT (1000)"""
+ }
+ explain {
+ sql("""${sqlstr4}""")
+ contains """pushdown agg=COUNT (1000)"""
+ }
+
+ // don't use push down count
+ sql """ set enable_count_push_down_for_external_table=false; """
+
+ qt_q05 """${sqlstr1}"""
+ qt_q06 """${sqlstr2}"""
+ qt_q07 """${sqlstr3}"""
+ qt_q08 """${sqlstr4}"""
+
+ explain {
+ sql("""${sqlstr1}""")
+ contains """pushdown agg=NONE"""
+ }
+ explain {
+ sql("""${sqlstr2}""")
+ contains """pushdown agg=NONE"""
+ }
+ explain {
+ sql("""${sqlstr3}""")
+ contains """pushdown agg=NONE"""
+ }
+ explain {
+ sql("""${sqlstr4}""")
+ contains """pushdown agg=NONE"""
+ }
+
+ } finally {
+ sql """ set enable_count_push_down_for_external_table=true; """
+ sql """drop catalog if exists ${catalog_name}"""
+ }
+}
+
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]