This is an automated email from the ASF dual-hosted git repository.
lgbo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 3e160bf008 [GLUTEN-7647][CH] Lazy expand for aggregation (#7649)
3e160bf008 is described below
commit 3e160bf0080c5ca66c5ff16792feba5e6366bbc9
Author: lgbo <[email protected]>
AuthorDate: Wed Nov 6 16:18:52 2024 +0800
[GLUTEN-7647][CH] Lazy expand for aggregation (#7649)
* add lazy expand rule
* stage
* stage
* apply rule before query stage
* stage
* update
* 1028
* update
* refactor
* refactor 1101
* fixed some bugs
---
.../gluten/backendsapi/clickhouse/CHBackend.scala | 12 +
.../gluten/backendsapi/clickhouse/CHRuleApi.scala | 1 +
.../gluten/extension/LazyAggregateExpandRule.scala | 370 +++++++++++++++++++++
.../GlutenClickHouseTPCHSaltNullParquetSuite.scala | 45 +++
.../local-engine/Operator/AdvancedExpandStep.cpp | 354 ++++++++++++++++++++
cpp-ch/local-engine/Operator/AdvancedExpandStep.h | 97 ++++++
.../Parser/RelParsers/AggregateRelParser.cpp | 3 +-
.../Parser/RelParsers/ExpandRelParser.cpp | 97 +++++-
.../Parser/RelParsers/ExpandRelParser.h | 12 +-
9 files changed, 984 insertions(+), 7 deletions(-)
diff --git
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala
index ba17d12ffa..54ab38569b 100644
---
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala
+++
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala
@@ -353,6 +353,18 @@ object CHBackendSettings extends BackendSettingsApi with
Logging {
)
}
+ // It try to move the expand node after the pre-aggregate node. That is to
make the plan from
+ // expand -> pre-aggregate -> shuffle -> final-aggregate
+ // to
+ // pre-aggregate -> expand -> shuffle -> final-aggregate
+ // It could reduce the overhead of pre-aggregate node.
+ def enableLazyAggregateExpand(): Boolean = {
+ SparkEnv.get.conf.getBoolean(
+ CHConf.runtimeConfig("enable_lazy_aggregate_expand"),
+ defaultValue = true
+ )
+ }
+
override def enableNativeWriteFiles(): Boolean = {
GlutenConfig.getConf.enableNativeWriter.getOrElse(false)
}
diff --git
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHRuleApi.scala
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHRuleApi.scala
index 470ece4037..4107844f32 100644
---
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHRuleApi.scala
+++
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHRuleApi.scala
@@ -85,6 +85,7 @@ private object CHRuleApi {
injector.injectTransform(_ => CollapseProjectExecTransformer)
injector.injectTransform(c =>
RewriteSortMergeJoinToHashJoinRule.apply(c.session))
injector.injectTransform(c =>
PushdownAggregatePreProjectionAheadExpand.apply(c.session))
+ injector.injectTransform(c => LazyAggregateExpandRule.apply(c.session))
injector.injectTransform(
c =>
intercept(
diff --git
a/backends-clickhouse/src/main/scala/org/apache/gluten/extension/LazyAggregateExpandRule.scala
b/backends-clickhouse/src/main/scala/org/apache/gluten/extension/LazyAggregateExpandRule.scala
new file mode 100644
index 0000000000..e06503a5e1
--- /dev/null
+++
b/backends-clickhouse/src/main/scala/org/apache/gluten/extension/LazyAggregateExpandRule.scala
@@ -0,0 +1,370 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.extension
+
+import org.apache.gluten.backendsapi.clickhouse.CHBackendSettings
+import org.apache.gluten.execution._
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.aggregate._
+import org.apache.spark.sql.catalyst.plans.physical._
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution._
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.types._
+
/*
 * For aggregation with grouping sets, the grouping sets have to be expanded into
 * individual group-bys:
 * 1. The original data is copied once per grouping set.
 * 2. The aggregation then runs over all of the copied data.
 * Both of these steps are expensive.
 *
 * Instead, this rule does the following:
 * 1. Run the aggregation on the full set of grouping keys.
 * 2. Expand the aggregation result into the individual grouping sets.
 * 3. Run the aggregation on the expanded data.
 *
 * So the plan is transformed from
 *   expand -> partial aggregating -> shuffle -> final merge aggregating
 * to
 *   partial aggregating -> expand -> shuffle -> final merge aggregating
 *
 * A filter between the expand and the partial aggregation is also supported, as long as it only
 * references columns that are still produced after the rewrite (i.e. the partial aggregate's
 * output, which for expand-produced columns means the grouping keys).
 *
 * Notice:
 * If the aggregation involves distinct, we can't do this optimization.
 */
case class LazyAggregateExpandRule(session: SparkSession) extends Rule[SparkPlan] with Logging {
  override def apply(plan: SparkPlan): SparkPlan = {
    logDebug(s"xxx enable lazy aggregate expand: ${CHBackendSettings.enableLazyAggregateExpand}")
    if (!CHBackendSettings.enableLazyAggregateExpand) {
      plan
    } else {
      plan.transformUp {
        // shuffle -> partial aggregate -> expand
        case shuffle @ ColumnarShuffleExchangeExec(
              HashPartitioning(_, _),
              partialAggregate @ CHHashAggregateExecTransformer(
                _,
                _,
                _,
                _,
                _,
                _,
                expand @ ExpandExecTransformer(_, _, _)),
              _,
              _,
              _) =>
          logDebug(s"xxx match plan:$shuffle")
          logPartialAggregate(partialAggregate)
          if (doValidation(partialAggregate, expand, shuffle)) {
            val newShuffle = shuffle.copy(child = moveExpandAfterAggregate(partialAggregate, expand))
            logDebug(s"xxx new plan: $newShuffle")
            newShuffle
          } else {
            shuffle
          }

        // shuffle -> partial aggregate -> filter -> expand
        case shuffle @ ColumnarShuffleExchangeExec(
              HashPartitioning(_, _),
              partialAggregate @ CHHashAggregateExecTransformer(
                _,
                _,
                _,
                _,
                _,
                _,
                filter @ FilterExecTransformer(_, expand @ ExpandExecTransformer(_, _, _))),
              _,
              _,
              _) =>
          logPartialAggregate(partialAggregate)
          if (doValidation(partialAggregate, expand, shuffle, Some(filter))) {
            // The filter now runs on the expanded, pre-aggregated rows. doValidation guarantees
            // it only references columns that the new expand still produces.
            val newFilter = filter.copy(child = moveExpandAfterAggregate(partialAggregate, expand))
            val newShuffle = shuffle.copy(child = newFilter)
            logDebug(s"xxx new plan: $newShuffle")
            newShuffle
          } else {
            shuffle
          }
      }
    }
  }

  // Debug dump of the partial aggregate being considered for the rewrite.
  private def logPartialAggregate(partialAggregate: CHHashAggregateExecTransformer): Unit =
    logDebug(
      s"xxx partialAggregate: groupingExpressions:" +
        s"${partialAggregate.groupingExpressions}\n" +
        s"aggregateAttributes:${partialAggregate.aggregateAttributes}\n" +
        s"aggregateExpressions:${partialAggregate.aggregateExpressions}\n" +
        s"resultExpressions:${partialAggregate.resultExpressions}")

  // Replaces `expand -> partial aggregate` with `partial aggregate -> expand`.
  // Returns the new expand node, whose child is the new partial aggregate.
  private def moveExpandAfterAggregate(
      partialAggregate: CHHashAggregateExecTransformer,
      expand: ExpandExecTransformer): SparkPlan = {
    val attributesToReplace = buildReplaceAttributeMap(expand)
    logDebug(s"xxx attributesToReplace: $attributesToReplace")

    val newPartialAggregate = buildAheadAggregateExec(partialAggregate, expand, attributesToReplace)
    buildPostExpandExec(expand, partialAggregate, newPartialAggregate, attributesToReplace)
  }

  /**
   * Checks whether the `expand -> partial aggregate -> shuffle` pattern can be rewritten.
   *
   * Just enabled for simple cases. Some of the cases that are not supported:
   *   1. select count(a), count(b), count(1), count(distinct(a)), count(distinct(b)) from values
   *      (1, null), (2,2) as data(a,b);
   *   2. select n_name, count(distinct n_regionkey) as col1,
   *      count(distinct concat(n_regionkey, n_nationkey)) as col2 from nation group by n_name;
   *
   * @param aggregate the partial aggregate directly above the expand (or above the filter)
   * @param expand the expand node to move
   * @param shuffle the shuffle on top of the partial aggregate
   * @param filter an optional filter between the expand and the partial aggregate
   * @return true if the rewrite is safe
   */
  def doValidation(
      aggregate: CHHashAggregateExecTransformer,
      expand: ExpandExecTransformer,
      shuffle: ColumnarShuffleExchangeExec,
      filter: Option[FilterExecTransformer] = None): Boolean = {
    def isAttributeOrLiteral(e: Expression): Boolean =
      e.isInstanceOf[Attribute] || e.isInstanceOf[Literal]

    // All shuffle keys must be attribute references (or literals).
    def shuffleKeysValid: Boolean = shuffle.outputPartitioning match {
      case HashPartitioning(expressions, _) => expressions.forall(isAttributeOrLiteral)
      case _ => false
    }

    // 1. For safety, this optimization is not enabled for all aggregate functions.
    // 2. If any aggregate function uses attributes that don't come from the expand's child, it
    //    cannot be evaluated before the expand.
    def aggregatesValid: Boolean = {
      val expandInputAttributes = expand.child.output.toSet
      aggregate.aggregateExpressions.forall {
        e =>
          isValidAggregateFunction(e) &&
          e.aggregateFunction.references.forall(expandInputAttributes.contains(_))
      }
    }

    // After the rewrite, the filter runs on the new expand, whose output is the partial
    // aggregate's output. Any other referenced column (e.g. an aggregate input) would be missing.
    def filterValid: Boolean =
      filter.forall(_.condition.references.subsetOf(aggregate.outputSet))

    if (!aggregate.groupingExpressions.forall(isAttributeOrLiteral)) {
      logDebug(s"xxx Not all grouping expression are attribute references")
      false
    } else if (!shuffleKeysValid) {
      logDebug(s"xxx Not all shuffle hash expression are attribute references")
      false
    } else if (!aggregatesValid) {
      logDebug(s"xxx Some aggregate functions are not supported")
      false
    } else if (!filterValid) {
      logDebug(s"xxx The filter references columns that are not available after the rewrite")
      false
    } else {
      // The group id column must be identifiable in the expand's output.
      findGroupingIdIndex(expand) != -1
    }
  }

  /**
   * Finds the position of the grouping id column in the expand's output.
   *
   * The grouping id column has no fixed position. A column qualifies when, in every projection,
   * it is a non-null Long/Int literal and those literals are pairwise distinct (one per grouping
   * set).
   *
   * @return the column index, or -1 if there is no such column, more than one, or no projection
   */
  def findGroupingIdIndex(expand: ExpandExecTransformer): Int = {
    // Value of a non-null integral literal widened to Long; None for anything else.
    def groupIdValue(e: Expression): Option[Long] = e match {
      case Literal(v: Long, _: LongType) => Some(v)
      case Literal(v: Int, _: IntegerType) => Some(v.toLong)
      case _ => None
    }

    def isGroupIdColumn(col: Int): Boolean = {
      val values = expand.projections.map(projection => groupIdValue(projection(col)))
      // gids should be unique
      values.forall(_.isDefined) && values.flatten.distinct.size == values.size
    }

    val candidates =
      if (expand.projections.isEmpty) Seq.empty[Int]
      else expand.output.indices.filter(isGroupIdColumn)

    candidates match {
      case Seq(index) =>
        logDebug(s"xxx gid is at pos $index")
        index
      case _ => -1
    }
  }

  /**
   * Whether an aggregate expression can be evaluated before the expand.
   *
   * Only plain (no FILTER clause, not DISTINCT) aggregates in Partial mode are accepted. Their
   * buffers are merged again by the final aggregation after the shuffle.
   *
   * Some aggregate functions' output columns are not consistent with the output of gluten:
   *   - average: in partial aggregation, the outputs are sum and count, but gluten only generates
   *     one column, avg.
   *   - sum: if the input's type is decimal, the outputs are sum and isEmpty, but gluten doesn't
   *     use the isEmpty column.
   */
  def isValidAggregateFunction(aggregateExpression: AggregateExpression): Boolean = {
    if (
      aggregateExpression.filter.isDefined ||
      aggregateExpression.isDistinct ||
      aggregateExpression.mode != Partial
    ) {
      false
    } else {
      aggregateExpression.aggregateFunction match {
        case _: Count | _: Max | _: Min => true
        case sum: Sum => !sum.dataType.isInstanceOf[DecimalType]
        case _ => false
      }
    }
  }

  // Returns the replacement for `toReplace`, or `toReplace` itself when there is none.
  def getReplaceAttribute(
      toReplace: Attribute,
      attributesToReplace: Map[Attribute, Attribute]): Attribute = {
    attributesToReplace.getOrElse(toReplace, toReplace)
  }

  /**
   * Maps each expand output attribute to the input attribute it is projected from. For each
   * output column, the input attribute is the first attribute found at that position across the
   * projections. Columns that are literals in every projection (e.g. the group id) are not mapped.
   */
  def buildReplaceAttributeMap(expand: ExpandExecTransformer): Map[Attribute, Attribute] = {
    expand.output.zipWithIndex.flatMap {
      case (outputAttribute, i) =>
        expand.projections.iterator
          .map(_(i))
          .collectFirst { case inputAttribute: Attribute => outputAttribute -> inputAttribute }
    }.toMap
  }

  /**
   * Builds the projections of the new expand, whose output is `newExpandOutput`. A column that
   * also exists in the original expand output takes the original projection's expression.
   * Otherwise (e.g. aggregate buffer columns) the column is passed through unchanged.
   */
  def buildPostExpandProjections(
      originalExpandProjections: Seq[Seq[Expression]],
      originalExpandOutput: Seq[Attribute],
      newExpandOutput: Seq[Attribute]): Seq[Seq[Expression]] = {
    // Loop invariant: the source position of every new output column, or -1.
    val sourceIndexes =
      newExpandOutput.map(attr => originalExpandOutput.indexWhere(_.semanticEquals(attr)))
    originalExpandProjections.map {
      projection =>
        newExpandOutput.zip(sourceIndexes).map {
          case (_, index) if index != -1 => projection(index)
          case (attr, _) => attr
        }
    }
  }

  // 1. make expand's child be aggregate's child
  // 2. replace the attributes in groupingExpressions and resultExpressions as needed
  def buildAheadAggregateExec(
      partialAggregate: CHHashAggregateExecTransformer,
      expand: ExpandExecTransformer,
      attributesToReplace: Map[Attribute, Attribute]): SparkPlan = {
    val groupIdAttribute = expand.output(findGroupingIdIndex(expand))

    // The new grouping expressions are the full set of grouping keys, without the group id
    // column, rewritten in terms of the expand's input attributes.
    val groupingExpressions =
      partialAggregate.groupingExpressions
        .filter(
          e => e.toAttribute != groupIdAttribute && attributesToReplace.contains(e.toAttribute))
        .map(e => getReplaceAttribute(e.toAttribute, attributesToReplace))
        .distinct
    logDebug(
      s"xxx newGroupingExpresion: $groupingExpressions,\n" +
        s"groupingExpressions: ${partialAggregate.groupingExpressions}")

    // Remove group id column from result expressions
    val resultExpressions = partialAggregate.resultExpressions
      .filter(_.toAttribute != groupIdAttribute)
      .map(e => getReplaceAttribute(e.toAttribute, attributesToReplace))
    logDebug(
      s"xxx newResultExpressions: $resultExpressions\n" +
        s"resultExpressions:${partialAggregate.resultExpressions}")
    partialAggregate.copy(
      groupingExpressions = groupingExpressions,
      resultExpressions = resultExpressions,
      child = expand.child)
  }

  // Builds the expand that runs on top of the new partial aggregate (`child`). Its output is the
  // original partial aggregate's output, so the nodes above it remain unchanged.
  def buildPostExpandExec(
      expand: ExpandExecTransformer,
      partialAggregate: CHHashAggregateExecTransformer,
      child: SparkPlan,
      attributesToReplace: Map[Attribute, Attribute]): SparkPlan = {
    // The output of the native plan is not completely consistent with Spark.
    val aggregateOutput = partialAggregate.output
    logDebug(s"xxx aggregateResultAttributes: ${partialAggregate.aggregateResultAttributes}")
    logDebug(s"xxx aggregateOutput: $aggregateOutput")

    val expandProjections = buildPostExpandProjections(
      expand.projections,
      expand.output,
      aggregateOutput
    )
    logDebug(s"xxx expandProjections: $expandProjections\nprojections:${expand.projections}")
    ExpandExecTransformer(expandProjections, aggregateOutput, child)
  }

}
diff --git
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpch/GlutenClickHouseTPCHSaltNullParquetSuite.scala
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpch/GlutenClickHouseTPCHSaltNullParquetSuite.scala
index 739b040dba..40b704d2e8 100644
---
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpch/GlutenClickHouseTPCHSaltNullParquetSuite.scala
+++
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpch/GlutenClickHouseTPCHSaltNullParquetSuite.scala
@@ -3022,5 +3022,50 @@ class GlutenClickHouseTPCHSaltNullParquetSuite extends
GlutenClickHouseTPCHAbstr
compareResultsAgainstVanillaSpark(query_sql, true, { _ => })
spark.sql("drop table test_tbl_7220")
}
+
+ test("GLLUTEN-7647 lazy expand") {
+ def checkLazyExpand(df: DataFrame): Unit = {
+ val expands = collectWithSubqueries(df.queryExecution.executedPlan) {
+ case e: ExpandExecTransformer if
(e.child.isInstanceOf[HashAggregateExecBaseTransformer]) =>
+ e
+ }
+ assert(expands.size == 1)
+ }
+ var sql =
+ """
+ |select n_regionkey, n_nationkey,
+ |sum(n_regionkey), count(n_name), max(n_regionkey), min(n_regionkey)
+ |from nation group by n_regionkey, n_nationkey with cube
+ |order by n_regionkey, n_nationkey
+ |""".stripMargin
+ compareResultsAgainstVanillaSpark(sql, true, checkLazyExpand)
+
+ sql = """
+ |select n_regionkey, n_nationkey, sum(n_regionkey), count(distinct
n_name)
+ |from nation group by n_regionkey, n_nationkey with cube
+ |order by n_regionkey, n_nationkey
+ |""".stripMargin
+ compareResultsAgainstVanillaSpark(sql, true, checkLazyExpand)
+
+ sql = """
+ |select * from(
+ |select n_regionkey, n_nationkey,
+ |sum(n_regionkey), count(n_name), max(n_regionkey),
min(n_regionkey)
+ |from nation group by n_regionkey, n_nationkey with cube
+ |) where n_regionkey != 0
+ |order by n_regionkey, n_nationkey
+ |""".stripMargin
+ compareResultsAgainstVanillaSpark(sql, true, checkLazyExpand)
+
+ sql = """
+ |select * from(
+ |select n_regionkey, n_nationkey,
+ |sum(n_regionkey), count(distinct n_name), max(n_regionkey),
min(n_regionkey)
+ |from nation group by n_regionkey, n_nationkey with cube
+ |) where n_regionkey != 0
+ |order by n_regionkey, n_nationkey
+ |""".stripMargin
+ compareResultsAgainstVanillaSpark(sql, true, checkLazyExpand)
+ }
}
// scalastyle:on line.size.limit
diff --git a/cpp-ch/local-engine/Operator/AdvancedExpandStep.cpp
b/cpp-ch/local-engine/Operator/AdvancedExpandStep.cpp
new file mode 100644
index 0000000000..6ac5f5fc8f
--- /dev/null
+++ b/cpp-ch/local-engine/Operator/AdvancedExpandStep.cpp
@@ -0,0 +1,354 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "AdvancedExpandStep.h"
+#include <iterator>
+#include <system_error>
+#include <unordered_set>
+#include <Columns/ColumnNullable.h>
+#include <Columns/ColumnsNumber.h>
+#include <Columns/IColumn.h>
+#include <Core/Settings.h>
+#include <DataTypes/DataTypeNullable.h>
+#include <DataTypes/DataTypesNumber.h>
+#include <Interpreters/Aggregator.h>
+#include <Interpreters/ExpressionActions.h>
+#include <Interpreters/castColumn.h>
+#include <Operator/GraceAggregatingTransform.h>
+#include <Processors/ResizeProcessor.h>
+#include <Processors/Transforms/ExpressionTransform.h>
+#include <QueryPipeline/Pipe.h>
+#include <QueryPipeline/QueryPipelineBuilder.h>
+#include <Common/CHUtil.h>
+#include <Common/WeakHash.h>
+
+#include <Poco/Logger.h>
+#include <Common/logger_useful.h>
+
+namespace DB
+{
+namespace Setting
+{
+extern const SettingsUInt64 max_bytes_before_external_group_by;
+extern const SettingsBool optimize_group_by_constant_keys;
+extern const SettingsUInt64 min_free_disk_space_for_temporary_data;
+extern const SettingsMaxThreads max_threads;
+extern const SettingsBool empty_result_for_aggregation_by_empty_set;
+extern const SettingsUInt64 group_by_two_level_threshold_bytes;
+extern const SettingsOverflowModeGroupBy group_by_overflow_mode;
+extern const SettingsUInt64 max_rows_to_group_by;
+extern const SettingsBool enable_memory_bound_merging_of_aggregation_results;
+extern const SettingsUInt64 aggregation_in_order_max_block_bytes;
+extern const SettingsUInt64 group_by_two_level_threshold;
+extern const SettingsFloat min_hit_rate_to_use_consecutive_keys_optimization;
+extern const SettingsMaxThreads max_threads;
+extern const SettingsUInt64 max_block_size;
+}
+}
+
+namespace local_engine
+{
+
+static DB::ITransformingStep::Traits getTraits()
+{
+ return DB::ITransformingStep::Traits{
+ {
+ .returns_single_stream = true,
+ .preserves_number_of_streams = false,
+ .preserves_sorting = false,
+ },
+ {
+ .preserves_number_of_rows = false,
+ }};
+}
+
/// Creates the step.
/// @param context_                 query context; its settings configure the extra aggregation in transformPipeline().
/// @param input_header_            header of the step's input stream.
/// @param grouping_keys_           number of grouping key columns; forwarded to AdvancedExpandTransform
///                                 (presumably the leading columns of the input — TODO confirm).
/// @param aggregate_descriptions_  aggregate functions used by the extra aggregation built in transformPipeline().
/// @param project_set_exprs_       expand projections; they also define the output header (see buildOutputHeader()).
AdvancedExpandStep::AdvancedExpandStep(
    DB::ContextPtr context_,
    const DB::Block & input_header_,
    size_t grouping_keys_,
    const DB::AggregateDescriptions & aggregate_descriptions_,
    const ExpandField & project_set_exprs_)
    : DB::ITransformingStep(input_header_, buildOutputHeader(input_header_, project_set_exprs_), getTraits())
    , context(context_)
    , grouping_keys(grouping_keys_)
    , aggregate_descriptions(aggregate_descriptions_)
    , project_set_exprs(project_set_exprs_)
{
}
+
+DB::Block AdvancedExpandStep::buildOutputHeader(const DB::Block &, const
ExpandField & project_set_exprs_)
+{
+ DB::ColumnsWithTypeAndName cols;
+ const auto & types = project_set_exprs_.getTypes();
+ const auto & names = project_set_exprs_.getNames();
+
+ chassert(names.size() == types.size());
+
+ for (size_t i = 0; i < project_set_exprs_.getExpandCols(); ++i)
+ cols.emplace_back(DB::ColumnWithTypeAndName(types[i], names[i]));
+
+ return DB::Block(std::move(cols));
+}
+
+void AdvancedExpandStep::transformPipeline(DB::QueryPipelineBuilder &
pipeline, const DB::BuildQueryPipelineSettings & pipeline_settings)
+{
+ const auto & settings = context->getSettingsRef();
+ DB::Names aggregate_grouping_keys;
+ for (size_t i = 0; i < output_header->columns(); ++i)
+ {
+ const auto & col = output_header->getByPosition(i);
+ if (typeid_cast<const DB::ColumnAggregateFunction *>(col.column.get()))
+ break;
+ aggregate_grouping_keys.push_back(col.name);
+ }
+ DB::Aggregator::Params params(
+ aggregate_grouping_keys,
+ aggregate_descriptions,
+ false,
+ settings[DB::Setting::max_rows_to_group_by],
+ settings[DB::Setting::group_by_overflow_mode],
+ settings[DB::Setting::group_by_two_level_threshold],
+ settings[DB::Setting::group_by_two_level_threshold_bytes],
+ settings[DB::Setting::max_bytes_before_external_group_by],
+ settings[DB::Setting::empty_result_for_aggregation_by_empty_set],
+ context->getTempDataOnDisk(),
+ settings[DB::Setting::max_threads],
+ settings[DB::Setting::min_free_disk_space_for_temporary_data],
+ true,
+ 3,
+
PODArrayUtil::adjustMemoryEfficientSize(settings[DB::Setting::max_block_size]),
+ /*enable_prefetch*/ true,
+ /*only_merge*/ false,
+ settings[DB::Setting::optimize_group_by_constant_keys],
+
settings[DB::Setting::min_hit_rate_to_use_consecutive_keys_optimization],
+ /*StatsCollectingParams*/ {});
+
+ auto input_header = input_headers.front();
+ auto build_transform = [&](DB::OutputPortRawPtrs outputs)
+ {
+ DB::Processors new_processors;
+ for (auto & output : outputs)
+ {
+ auto expand_processor
+ = std::make_shared<AdvancedExpandTransform>(input_header,
*output_header, grouping_keys, project_set_exprs);
+ DB::connect(*output, expand_processor->getInputs().front());
+ new_processors.push_back(expand_processor);
+
+ auto expand_output_header =
expand_processor->getOutputs().front().getHeader();
+
+ auto transform_params =
std::make_shared<DB::AggregatingTransformParams>(expand_output_header, params,
false);
+ auto aggregate_processor
+ =
std::make_shared<GraceAggregatingTransform>(expand_output_header,
transform_params, context, false, false);
+ DB::connect(expand_processor->getOutputs().back(),
aggregate_processor->getInputs().front());
+ new_processors.push_back(aggregate_processor);
+ auto aggregate_output_header =
aggregate_processor->getOutputs().front().getHeader();
+
+ auto resize_processor =
std::make_shared<DB::ResizeProcessor>(expand_output_header, 2, 1);
+ DB::connect(aggregate_processor->getOutputs().front(),
resize_processor->getInputs().front());
+ DB::connect(expand_processor->getOutputs().front(),
resize_processor->getInputs().back());
+ new_processors.push_back(resize_processor);
+ }
+ return new_processors;
+ };
+ pipeline.transform(build_transform);
+}
+
+void AdvancedExpandStep::describePipeline(DB::IQueryPlanStep::FormatSettings &
settings) const
+{
+ if (!processors.empty())
+ DB::IQueryPlanStep::describePipeline(processors, settings);
+}
+
+void AdvancedExpandStep::updateOutputHeader()
+{
+ output_header = buildOutputHeader(input_headers.front(),
project_set_exprs);
+}
+
+/// It has two output ports. The 1st output port is for high cardinality data,
the 2nd output port is for
+/// low cardinality data.
+AdvancedExpandTransform::AdvancedExpandTransform(
+ const DB::Block & input_header_, const DB::Block & output_header_, size_t
grouping_keys_, const ExpandField & project_set_exprs_)
+ : DB::IProcessor({input_header_}, {output_header_, output_header_})
+ , grouping_keys(grouping_keys_)
+ , project_set_exprs(project_set_exprs_)
+ , input_header(input_header_)
+{
+ for (size_t i = 0; i < project_set_exprs.getKinds().size(); ++i)
+ {
+ is_low_cardinality_expand.push_back(true);
+ }
+
+ for (auto & port : outputs)
+ {
+ output_ports.push_back(&port);
+ }
+}
+
/// Port-handling state machine of the transform. In order:
///  1. On cancellation, or when either output port is finished, close the input and finish both outputs.
///  2. Push a pending expanded chunk to the port chosen by its expand projection's cardinality.
///  3. Otherwise pull the next input chunk, or finish when the input is exhausted and nothing is buffered.
DB::IProcessor::Status AdvancedExpandTransform::prepare()
{
    auto & input = inputs.front();

    if (isCancelled() || output_ports[0]->isFinished() || output_ports[1]->isFinished())
    {
        input.close();
        output_ports[0]->finish();
        output_ports[1]->finish();
        return Status::Finished;
    }

    if (has_output)
    {
        /// expandInputChunk() increments expand_expr_iterator after producing a chunk, so the
        /// projection that produced output_chunk is expand_expr_iterator - 1.
        /// A low cardinality projection (true) selects port 1, otherwise port 0.
        auto & output_port = *output_ports[is_low_cardinality_expand[expand_expr_iterator - 1]];
        if (output_port.canPush())
        {
            output_port.push(std::move(output_chunk));
            has_output = false;
            /// All projections for the current input chunk are emitted -> ask for more input,
            /// otherwise let work() produce the next projection.
            auto status = expand_expr_iterator >= project_set_exprs.getExpandRows() ? Status::NeedData : Status::Ready;
            return status;
        }
        else
        {
            return Status::PortFull;
        }
    }

    if (!has_input)
    {
        if (input.isFinished())
        {
            if (!cardinality_detect_blocks.empty())
            {
                /// Rows are still buffered for cardinality detection. Let work() run the
                /// detection on what was collected and expand those rows.
                input_finished = true;
                return Status::Ready;
            }
            else
            {
                output_ports[0]->finish();
                output_ports[1]->finish();
                return Status::Finished;
            }
        }

        input.setNeeded();
        if (!input.hasData())
        {
            return Status::NeedData;
        }
        /// A new input chunk restarts the expansion from the first projection.
        input_chunk = input.pull(true);
        has_input = true;
        expand_expr_iterator = 0;
    }

    return Status::Ready;
}
+
/// Two phases:
///  - Detection: buffer input chunks until rows_for_detect_cardinality rows are collected (or the
///    input ends), then classify the expand projections via detectCardinality().
///  - Expansion: each call produces the output of one projection for input_chunk.
void AdvancedExpandTransform::work()
{
    if (!input_finished && cardinality_detect_rows < rows_for_detect_cardinality)
    {
        /// Still sampling: move the current chunk into the detection buffer. Clearing has_input
        /// makes prepare() pull the next chunk.
        cardinality_detect_blocks.push_back(input_header.cloneWithColumns(input_chunk.detachColumns()));
        cardinality_detect_rows += cardinality_detect_blocks.back().rows();
        has_input = false;
    }
    if ((input_finished || cardinality_detect_rows >= rows_for_detect_cardinality) && !cardinality_detect_blocks.empty())
    {
        /// Enough rows (or no more input): classify the projections. detectCardinality() also
        /// puts the buffered rows back into input_chunk so they are expanded below.
        detectCardinality();
    }
    else if (!input_finished && cardinality_detect_rows < rows_for_detect_cardinality)
        return; /// Keep collecting; nothing to emit yet.

    /// The phase of detecting grouping keys' cardinality is finished here.
    expandInputChunk();
}
+
+void AdvancedExpandTransform::detectCardinality()
+{
+ DB::Block block =
BlockUtil::concatenateBlocksMemoryEfficiently(std::move(cardinality_detect_blocks));
+ std::vector<bool> is_col_low_cardinality;
+ for (size_t i = 0; i < grouping_keys; ++i)
+ {
+ DB::WeakHash32 hash(cardinality_detect_rows);
+ std::unordered_set<UInt32> distinct_ids;
+ const auto & data = hash.getData();
+ for (size_t j = 0; j < cardinality_detect_rows; ++j)
+ distinct_ids.insert(data[j]);
+ size_t distinct_ids_cnt = distinct_ids.size();
+ is_col_low_cardinality.push_back(distinct_ids.size() < 1000);
+ }
+
+ for (size_t i = 0; i < project_set_exprs.getExpandRows(); ++i)
+ {
+ const auto & kinds = project_set_exprs.getKinds()[i];
+ for (size_t k = 0; k < grouping_keys; ++k)
+ {
+ const auto & kind = kinds[k];
+ if (kind == EXPAND_FIELD_KIND_SELECTION &&
!is_col_low_cardinality[k])
+ {
+ is_low_cardinality_expand[i] = false;
+ break;
+ }
+ }
+ }
+ LOG_DEBUG(getLogger("AdvancedExpandTransform"), "Low cardinality expand:
{}", fmt::join(is_low_cardinality_expand, ","));
+
+ input_chunk = DB::Chunk(block.getColumns(), block.rows());
+ cardinality_detect_blocks.clear();
+}
+
+void AdvancedExpandTransform::expandInputChunk()
+{
+ const auto & input_columns = input_chunk.getColumns();
+ const auto & types = project_set_exprs.getTypes();
+ const auto & kinds = project_set_exprs.getKinds()[expand_expr_iterator];
+ const auto & fields = project_set_exprs.getFields()[expand_expr_iterator];
+ size_t rows = input_chunk.getNumRows();
+
+ DB::Columns columns(types.size());
+ for (size_t col_i = 0; col_i < types.size(); ++col_i)
+ {
+ const auto & type = types[col_i];
+ const auto & kind = kinds[col_i];
+ const auto & field = fields[col_i];
+
+ if (kind == EXPAND_FIELD_KIND_SELECTION)
+ {
+ auto index = field.safeGet<Int32>();
+ const auto & input_column = input_columns[index];
+
+ DB::ColumnWithTypeAndName input_arg;
+ input_arg.column = input_column;
+ input_arg.type = input_header.getByPosition(index).type;
+ /// input_column maybe non-Nullable
+ columns[col_i] = DB::castColumn(input_arg, type);
+ }
+ else if (kind == EXPAND_FIELD_KIND_LITERAL)
+ {
+ /// Add const column with field value
+ auto column = type->createColumnConst(rows,
field)->convertToFullColumnIfConst();
+ columns[col_i] = std::move(column);
+ }
+ else
+ throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Unknown
ExpandFieldKind {}", magic_enum::enum_name(kind));
+ }
+
+ output_chunk = DB::Chunk(std::move(columns), rows);
+ has_output = true;
+
+ ++expand_expr_iterator;
+ has_input = expand_expr_iterator < project_set_exprs.getExpandRows();
+}
+}
diff --git a/cpp-ch/local-engine/Operator/AdvancedExpandStep.h
b/cpp-ch/local-engine/Operator/AdvancedExpandStep.h
new file mode 100644
index 0000000000..2950846585
--- /dev/null
+++ b/cpp-ch/local-engine/Operator/AdvancedExpandStep.h
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <Core/Block.h>
+#include <Core/Names.h>
+#include <Interpreters/AggregateDescription.h>
+#include <Parser/ExpandField.h>
+#include <Processors/Chunk.h>
+#include <Processors/IProcessor.h>
+#include <Processors/Port.h>
+#include <Processors/QueryPlan/IQueryPlanStep.h>
+#include <Processors/QueryPlan/ITransformingStep.h>
+
+namespace local_engine
+{
+// This step is used when the expand operator is moved after the partial aggregation.
+// To avoid increasing the shuffle overhead when some of the grouping keys have high cardinality,
+// an extra aggregation is applied after this expand to aggregate the low-cardinality grouping keys.
+class AdvancedExpandStep : public DB::ITransformingStep
+{
+public:
+    /// @param context_                 query context
+    /// @param input_header_            header of the partial aggregation output:
+    ///                                 [grouping keys] ++ [aggregation columns]
+    /// @param grouping_keys_           number of leading grouping-key columns in input_header_
+    /// @param aggregate_descriptions_  descriptions used to merge the aggregation columns again
+    ///                                 after the expand
+    /// @param project_set_exprs_       the expand projections
+    explicit AdvancedExpandStep(
+        DB::ContextPtr context_,
+        const DB::Block & input_header_,
+        size_t grouping_keys_,
+        const DB::AggregateDescriptions & aggregate_descriptions_,
+        const ExpandField & project_set_exprs_);
+    ~AdvancedExpandStep() override = default;
+
+    String getName() const override { return "AdvancedExpandStep"; }
+
+    void transformPipeline(DB::QueryPipelineBuilder & pipeline, const DB::BuildQueryPipelineSettings & settings) override;
+    void describePipeline(DB::IQueryPlanStep::FormatSettings & settings) const override;
+
+    /// Computes the header produced by expanding `header` with the given projections.
+    static DB::Block buildOutputHeader(const DB::Block & header, const ExpandField & project_set_exprs_);
+
+protected:
+    DB::ContextPtr context;
+    size_t grouping_keys;
+    DB::AggregateDescriptions aggregate_descriptions;
+    ExpandField project_set_exprs;
+
+    void updateOutputHeader() override;
+};
+
+/// Processor behind AdvancedExpandStep. expandInputChunk() emits one output chunk per expand
+/// projection of the buffered input chunk. detectCardinality() works on buffered blocks to decide,
+/// per projection, whether it is a low-cardinality expand.
+class AdvancedExpandTransform : public DB::IProcessor
+{
+public:
+    using Status = DB::IProcessor::Status;
+    /// Need to ensure that the input header is [grouping keys] ++ [aggregation columns].
+    explicit AdvancedExpandTransform(
+        const DB::Block & input_header_, const DB::Block & output_header_, size_t grouping_keys_, const ExpandField & project_set_exprs_);
+    ~AdvancedExpandTransform() override = default;
+
+    Status prepare() override;
+    void work() override;
+    String getName() const override { return "AdvancedExpandTransform"; }
+
+private:
+    /// Number of leading grouping-key columns in the input header.
+    size_t grouping_keys = 0;
+    /// Expand projections: output types plus per-projection kinds and fields.
+    ExpandField project_set_exprs;
+    DB::Block input_header;
+    /// True while projections of input_chunk remain to be emitted.
+    bool has_input = false;
+    /// Set once output_chunk has been filled by expandInputChunk().
+    bool has_output = false;
+    /// Index of the next projection to emit.
+    size_t expand_expr_iterator = 0;
+    /// One flag per projection, filled by detectCardinality().
+    std::vector<bool> is_low_cardinality_expand;
+    std::vector<size_t> approximate_grouping_keys;
+    size_t cardinality_detect_rows = 0;
+    /// Blocks buffered for cardinality detection; cleared once detection has run.
+    std::vector<DB::Block> cardinality_detect_blocks;
+    /// NOTE(review): presumably the number of buffered rows that triggers detection; confirm in the .cpp.
+    static constexpr size_t rows_for_detect_cardinality = 10000;
+    bool input_finished = false;
+
+    std::vector<DB::OutputPort *> output_ports;
+
+    DB::Chunk input_chunk;
+    DB::Chunk output_chunk;
+
+    void detectCardinality();
+    void expandInputChunk();
+};
+
+}
diff --git a/cpp-ch/local-engine/Parser/RelParsers/AggregateRelParser.cpp b/cpp-ch/local-engine/Parser/RelParsers/AggregateRelParser.cpp
index 269d55e645..6bc8c7e6e1 100644
--- a/cpp-ch/local-engine/Parser/RelParsers/AggregateRelParser.cpp
+++ b/cpp-ch/local-engine/Parser/RelParsers/AggregateRelParser.cpp
@@ -525,8 +525,7 @@ void AggregateRelParser::addAggregatingStep()
/// We cannot use streaming aggregating strategy in step3.
Otherwise it will generate multiple blocks with same n_name in them. This
/// will make the result for count(distinct(n_name)) wrong. step3
must finish all inputs before it puts any block into step4.
/// So we introduce GraceAggregatingStep here, it can handle mass
data with high cardinality.
- auto aggregating_step
- = std::make_unique<GraceAggregatingStep>(getContext(),
plan->getCurrentHeader(), params, has_first_stage);
+ auto aggregating_step =
std::make_unique<GraceAggregatingStep>(getContext(), plan->getCurrentHeader(),
params, has_first_stage);
steps.emplace_back(aggregating_step.get());
plan->addStep(std::move(aggregating_step));
}
diff --git a/cpp-ch/local-engine/Parser/RelParsers/ExpandRelParser.cpp b/cpp-ch/local-engine/Parser/RelParsers/ExpandRelParser.cpp
index 8a64c445e7..f3d8b4ab11 100644
--- a/cpp-ch/local-engine/Parser/RelParsers/ExpandRelParser.cpp
+++ b/cpp-ch/local-engine/Parser/RelParsers/ExpandRelParser.cpp
@@ -15,11 +15,12 @@
* limitations under the License.
*/
#include "ExpandRelParser.h"
+#include <ratio>
#include <vector>
#include <Core/Block.h>
#include <Core/ColumnWithTypeAndName.h>
+#include <Operator/AdvancedExpandStep.h>
#include <Operator/ExpandStep.h>
-#include <Parser/ExpandField.h>
#include <Parser/RelParsers/RelParser.h>
#include <Processors/QueryPlan/QueryPlan.h>
#include <Common/logger_useful.h>
@@ -45,11 +46,17 @@ void updateType(DB::DataTypePtr & type, const
DB::DataTypePtr & new_type)
}
}
-DB::QueryPlanPtr ExpandRelParser::parse(DB::QueryPlanPtr query_plan, const
substrait::Rel & rel, std::list<const substrait::Rel *> &)
+DB::QueryPlanPtr
+ExpandRelParser::parse(DB::QueryPlanPtr query_plan, const substrait::Rel & rel, std::list<const substrait::Rel *> & rel_stack)
{
- const auto & expand_rel = rel.expand();
- const auto & header = query_plan->getCurrentHeader();
+    /// An expand that sits directly on top of a partial aggregation (see isLazyAggregateExpand) is
+    /// lowered to AdvancedExpandStep; every other expand keeps the plain ExpandStep lowering.
+    const bool lazy_expand = isLazyAggregateExpand(rel.expand());
+    if (lazy_expand)
+        return lazyAggregateExpandParse(std::move(query_plan), rel, rel_stack);
+    return normalParse(std::move(query_plan), rel, rel_stack);
+}
+ExpandField ExpandRelParser::buildExpandField(const DB::Block & header, const
substrait::ExpandRel & expand_rel)
+{
std::vector<std::vector<ExpandFieldKind>> expand_kinds;
std::vector<std::vector<DB::Field>> expand_fields;
std::vector<DB::DataTypePtr> types;
@@ -82,6 +89,10 @@ DB::QueryPlanPtr ExpandRelParser::parse(DB::QueryPlanPtr
query_plan, const subst
auto field =
project_expr.selection().direct_reference().struct_field().field();
kinds.push_back(ExpandFieldKind::EXPAND_FIELD_KIND_SELECTION);
fields.push_back(field);
+ if (field >= header.columns())
+ {
+ throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Field
index out of range: {}, header: {}", field, header.dumpStructure());
+ }
updateType(types[i], header.getByPosition(field).type);
const auto & name = header.getByPosition(field).name;
if (names[i].empty())
@@ -123,6 +134,28 @@ DB::QueryPlanPtr ExpandRelParser::parse(DB::QueryPlanPtr
query_plan, const subst
}
ExpandField expand_field(names, types, expand_kinds, expand_fields);
+ return expand_field;
+}
+
+/// Returns true when this expand reads directly from a partial (INITIAL_TO_INTERMEDIATE) aggregation
+/// that has at least one grouping set, i.e. the plan shape produced by moving the expand after the
+/// partial aggregate. Such expands are handled by lazyAggregateExpandParse; all others by normalParse.
+bool ExpandRelParser::isLazyAggregateExpand(const substrait::ExpandRel & expand_rel)
+{
+    const auto & input_rel = expand_rel.input();
+    if (input_rel.rel_type_case() != substrait::Rel::RelTypeCase::kAggregate)
+        return false;
+
+    const auto & aggregate_rel = input_rel.aggregate();
+    /// lazyAggregateExpandParse reads the grouping keys from groupings(0). Without a grouping set
+    /// that access would be out of range, so fall back to the normal expand instead.
+    if (aggregate_rel.groupings_size() == 0)
+        return false;
+
+    for (const auto & measure : aggregate_rel.measures())
+    {
+        /// Every measure must still be a partial state so it can be merged again after the expand.
+        if (measure.measure().phase() != substrait::AggregationPhase::AGGREGATION_PHASE_INITIAL_TO_INTERMEDIATE)
+            return false;
+    }
+    return true;
+}
+
+DB::QueryPlanPtr ExpandRelParser::normalParse(DB::QueryPlanPtr query_plan,
const substrait::Rel & rel, std::list<const substrait::Rel *> &)
+{
+ const auto & expand_rel = rel.expand();
+ const auto & header = query_plan->getCurrentHeader();
+ auto expand_field = buildExpandField(header, expand_rel);
auto expand_step =
std::make_unique<ExpandStep>(query_plan->getCurrentHeader(),
std::move(expand_field));
expand_step->setStepDescription("Expand Step");
steps.emplace_back(expand_step.get());
@@ -130,6 +163,62 @@ DB::QueryPlanPtr ExpandRelParser::parse(DB::QueryPlanPtr
query_plan, const subst
return query_plan;
}
+DB::QueryPlanPtr ExpandRelParser::lazyAggregateExpandParse(
+    DB::QueryPlanPtr query_plan, const substrait::Rel & rel, std::list<const substrait::Rel *> & rel_stack)
+{
+    /// Lowers an expand that sits on top of a partial aggregation into AdvancedExpandStep. That step
+    /// expands the partially aggregated rows and merges them again using the descriptions built by
+    /// buildAggregations.
+    DB::Block input_header = query_plan->getCurrentHeader();
+    const auto & expand_rel = rel.expand();
+    /// Bind by reference: copying the protobuf message would deep-copy the whole aggregate subtree.
+    const auto & aggregate_rel = expand_rel.input().aggregate();
+    if (aggregate_rel.groupings_size() == 0)
+        throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Lazy aggregate expand requires an aggregate with grouping keys");
+
+    auto expand_field = buildExpandField(input_header, expand_rel);
+    auto aggregate_descriptions = buildAggregations(input_header, expand_field, aggregate_rel);
+
+    /// The aggregate output header is [grouping keys] ++ [aggregation columns]; the step needs the key count.
+    const size_t grouping_keys = aggregate_rel.groupings(0).grouping_expressions_size();
+
+    auto expand_step
+        = std::make_unique<AdvancedExpandStep>(getContext(), input_header, grouping_keys, aggregate_descriptions, expand_field);
+    expand_step->setStepDescription("Advanced Expand Step");
+    steps.emplace_back(expand_step.get());
+    query_plan->addStep(std::move(expand_step));
+    return query_plan;
+}
+
+DB::AggregateDescriptions ExpandRelParser::buildAggregations(
+    const DB::Block & input_header, const ExpandField & expand_field, const substrait::AggregateRel & aggregate_rel)
+{
+    /// Builds one "<function>PartialMerge" description per measure of the partial aggregation, so the
+    /// aggregate states of the expanded rows can be merged again without being finalized.
+    /// The aggregate-state columns are located in the expand output header. Measure i is paired with
+    /// the i-th aggregate-state column, because the aggregate output is
+    /// [grouping keys] ++ [aggregation columns].
+    auto header = AdvancedExpandStep::buildOutputHeader(input_header, expand_field);
+    DB::ColumnsWithTypeAndName aggregate_columns;
+    for (const auto & col : header.getColumnsWithTypeAndName())
+    {
+        if (typeid_cast<const DB::ColumnAggregateFunction *>(col.column.get()))
+            aggregate_columns.push_back(col);
+    }
+
+    const auto measures = static_cast<size_t>(aggregate_rel.measures_size());
+    /// Guard the positional pairing below: indexing past aggregate_columns would be undefined behaviour.
+    if (aggregate_columns.size() < measures)
+        throw DB::Exception(
+            DB::ErrorCodes::LOGICAL_ERROR,
+            "Expected at least {} aggregate columns in expand output, found {}. header: {}",
+            measures,
+            aggregate_columns.size(),
+            header.dumpStructure());
+
+    DB::AggregateDescriptions descriptions;
+    descriptions.reserve(measures);
+    for (size_t i = 0; i < measures; ++i)
+    {
+        const auto & col = aggregate_columns[i];
+        /// Non-null: only ColumnAggregateFunction columns were collected above.
+        const auto * aggregate_col = typeid_cast<const DB::ColumnAggregateFunction *>(col.column.get());
+        auto aggregate_function = aggregate_col->getAggregateFunction();
+
+        DB::AggregateDescription description;
+        description.column_name = col.name;
+        description.argument_names = {col.name};
+        description.parameters = aggregate_function->getParameters();
+
+        /// Need to apply the "PartialMerge" combinator: the input is an intermediate state and the
+        /// result must stay an intermediate state for the final aggregation after the shuffle.
+        auto function_name_with_combinator = aggregate_function->getName() + "PartialMerge";
+        DB::AggregateFunctionProperties aggregate_function_properties;
+        description.function
+            = getAggregateFunction(function_name_with_combinator, {col.type}, aggregate_function_properties, description.parameters);
+
+        descriptions.emplace_back(std::move(description));
+    }
+    return descriptions;
+}
+
void registerExpandRelParser(RelParserFactory & factory)
{
auto builder = [](ParserContextPtr parser_context) { return
std::make_shared<ExpandRelParser>(parser_context); };
diff --git a/cpp-ch/local-engine/Parser/RelParsers/ExpandRelParser.h b/cpp-ch/local-engine/Parser/RelParsers/ExpandRelParser.h
index 7ba985a3f6..4a178ab08d 100644
--- a/cpp-ch/local-engine/Parser/RelParsers/ExpandRelParser.h
+++ b/cpp-ch/local-engine/Parser/RelParsers/ExpandRelParser.h
@@ -16,9 +16,9 @@
*/
#pragma once
#include <optional>
+#include <Parser/ExpandField.h>
#include <Parser/RelParsers/RelParser.h>
-
namespace local_engine
{
class SerializedPlanParser;
@@ -29,7 +29,17 @@ public:
~ExpandRelParser() override = default;
DB::QueryPlanPtr
parse(DB::QueryPlanPtr query_plan, const substrait::Rel & rel,
std::list<const substrait::Rel *> & rel_stack_) override;
+ DB::QueryPlanPtr normalParse(DB::QueryPlanPtr query_plan, const
substrait::Rel & rel, std::list<const substrait::Rel *> & rel_stack_);
+ DB::QueryPlanPtr
+ lazyAggregateExpandParse(DB::QueryPlanPtr query_plan, const substrait::Rel
& rel, std::list<const substrait::Rel *> & rel_stack_);
std::optional<const substrait::Rel *> getSingleInput(const substrait::Rel
& rel) override { return &rel.expand().input(); }
+
+private:
+ bool isLazyAggregateExpand(const substrait::ExpandRel & expand_rel);
+ ExpandField buildExpandField(const DB::Block & header, const
substrait::ExpandRel & expand_rel);
+
+ DB::AggregateDescriptions
+ buildAggregations(const DB::Block & input_header, const ExpandField &
expand_field, const substrait::AggregateRel & aggregate_rel);
};
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]