This is an automated email from the ASF dual-hosted git repository.

lgbo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 3e160bf008 [GLUTEN-7647][CH] Lazy expand for aggregation (#7649)
3e160bf008 is described below

commit 3e160bf0080c5ca66c5ff16792feba5e6366bbc9
Author: lgbo <[email protected]>
AuthorDate: Wed Nov 6 16:18:52 2024 +0800

    [GLUTEN-7647][CH] Lazy expand for aggregation (#7649)
    
    * add lazy expand rule
    
    * stage
    
    * stage
    
    * apply rule before query stage
    
    * stage
    
    * update
    
    * 1028
    
    * update
    
    * refactor
    
    * refactor 1101
    
    * fixed some bugs
---
 .../gluten/backendsapi/clickhouse/CHBackend.scala  |  12 +
 .../gluten/backendsapi/clickhouse/CHRuleApi.scala  |   1 +
 .../gluten/extension/LazyAggregateExpandRule.scala | 370 +++++++++++++++++++++
 .../GlutenClickHouseTPCHSaltNullParquetSuite.scala |  45 +++
 .../local-engine/Operator/AdvancedExpandStep.cpp   | 354 ++++++++++++++++++++
 cpp-ch/local-engine/Operator/AdvancedExpandStep.h  |  97 ++++++
 .../Parser/RelParsers/AggregateRelParser.cpp       |   3 +-
 .../Parser/RelParsers/ExpandRelParser.cpp          |  97 +++++-
 .../Parser/RelParsers/ExpandRelParser.h            |  12 +-
 9 files changed, 984 insertions(+), 7 deletions(-)

diff --git 
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala
 
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala
index ba17d12ffa..54ab38569b 100644
--- 
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala
+++ 
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala
@@ -353,6 +353,18 @@ object CHBackendSettings extends BackendSettingsApi with 
Logging {
     )
   }
 
+  // It tries to move the expand node after the pre-aggregate node. That is to 
make the plan from
+  //  expand -> pre-aggregate -> shuffle -> final-aggregate
+  // to
+  //  pre-aggregate -> expand -> shuffle -> final-aggregate
+  // It could reduce the overhead of pre-aggregate node.
+  def enableLazyAggregateExpand(): Boolean = {
+    SparkEnv.get.conf.getBoolean(
+      CHConf.runtimeConfig("enable_lazy_aggregate_expand"),
+      defaultValue = true
+    )
+  }
+
   override def enableNativeWriteFiles(): Boolean = {
     GlutenConfig.getConf.enableNativeWriter.getOrElse(false)
   }
diff --git 
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHRuleApi.scala
 
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHRuleApi.scala
index 470ece4037..4107844f32 100644
--- 
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHRuleApi.scala
+++ 
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHRuleApi.scala
@@ -85,6 +85,7 @@ private object CHRuleApi {
     injector.injectTransform(_ => CollapseProjectExecTransformer)
     injector.injectTransform(c => 
RewriteSortMergeJoinToHashJoinRule.apply(c.session))
     injector.injectTransform(c => 
PushdownAggregatePreProjectionAheadExpand.apply(c.session))
+    injector.injectTransform(c => LazyAggregateExpandRule.apply(c.session))
     injector.injectTransform(
       c =>
         intercept(
diff --git 
a/backends-clickhouse/src/main/scala/org/apache/gluten/extension/LazyAggregateExpandRule.scala
 
b/backends-clickhouse/src/main/scala/org/apache/gluten/extension/LazyAggregateExpandRule.scala
new file mode 100644
index 0000000000..e06503a5e1
--- /dev/null
+++ 
b/backends-clickhouse/src/main/scala/org/apache/gluten/extension/LazyAggregateExpandRule.scala
@@ -0,0 +1,370 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.extension
+
+import org.apache.gluten.backendsapi.clickhouse.CHBackendSettings
+import org.apache.gluten.execution._
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.aggregate._
+import org.apache.spark.sql.catalyst.plans.physical._
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution._
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.types._
+
+/*
+ * For aggregation with grouping sets, we need to expand the grouping sets
+ * to individual group by.
+ * 1. It needs to make copies of the original data.
+ * 2. It runs the aggregation on the multiple copies of the data.
+ * Both of these steps are expensive.
+ *
+ * We could do this as follows
+ * 1. Run the aggregation on full grouping keys.
+ * 2. Expand the aggregation result to the full grouping sets.
+ * 3. Run the aggregation on the expanded data.
+ *
+ * So the plan is transformed from
+ *   expand -> partial aggregating -> shuffle -> final merge aggregating
+ * to
+ *   partial aggregating -> expand -> shuffle -> final merge aggregating
+ *
+ * Notice:
+ * If the aggregation involves distinct, we can't do this optimization.
+ */
+
+case class LazyAggregateExpandRule(session: SparkSession) extends 
Rule[SparkPlan] with Logging {
+  /**
+   * Rewrites plans of the shape
+   *   expand [-> filter] -> partial aggregate -> hash shuffle
+   * into
+   *   partial aggregate -> expand [-> filter] -> hash shuffle
+   * whenever `doValidation` accepts the aggregate/expand/shuffle triple. Nodes of any other
+   * shape are left as they are, and the whole plan is returned untouched when the feature is
+   * disabled in the backend settings.
+   */
+  override def apply(plan: SparkPlan): SparkPlan = {
+    logDebug(s"xxx enable lazy aggregate expand: ${CHBackendSettings.enableLazyAggregateExpand}")
+    if (!CHBackendSettings.enableLazyAggregateExpand) {
+      plan
+    } else {
+      plan.transformUp {
+        case shuffle @ ColumnarShuffleExchangeExec(
+              HashPartitioning(_, _),
+              partialAggregate: CHHashAggregateExecTransformer,
+              _,
+              _,
+              _
+            ) =>
+          partialAggregate.child match {
+            case expand: ExpandExecTransformer =>
+              logDebug(s"xxx match plan:$shuffle")
+              moveExpandBehindAggregate(shuffle, partialAggregate, expand)(
+                newExpand => newExpand)
+            case filter @ FilterExecTransformer(_, expand: ExpandExecTransformer) =>
+              // The filter stays directly above the expand node in the new plan.
+              moveExpandBehindAggregate(shuffle, partialAggregate, expand)(
+                newExpand => filter.copy(child = newExpand))
+            case _ => shuffle
+          }
+      }
+    }
+  }
+
+  // Builds `partial aggregate -> expand` from `expand -> partial aggregate` and hangs it under
+  // the shuffle. `wrapExpand` lets the caller re-insert nodes (e.g. a filter) between the new
+  // expand and the shuffle. Returns the original shuffle when validation fails.
+  private def moveExpandBehindAggregate(
+      shuffle: ColumnarShuffleExchangeExec,
+      partialAggregate: CHHashAggregateExecTransformer,
+      expand: ExpandExecTransformer)(wrapExpand: SparkPlan => SparkPlan): SparkPlan = {
+    logDebug(
+      s"xxx partialAggregate: groupingExpressions:" +
+        s"${partialAggregate.groupingExpressions}\n" +
+        s"aggregateAttributes:${partialAggregate.aggregateAttributes}\n" +
+        s"aggregateExpressions:${partialAggregate.aggregateExpressions}\n" +
+        s"resultExpressions:${partialAggregate.resultExpressions}")
+    if (doValidation(partialAggregate, expand, shuffle)) {
+      val attributesToReplace = buildReplaceAttributeMap(expand)
+      logDebug(s"xxx attributesToReplace: $attributesToReplace")
+
+      val newPartialAggregate = buildAheadAggregateExec(
+        partialAggregate,
+        expand,
+        attributesToReplace
+      )
+
+      val newExpand = buildPostExpandExec(
+        expand,
+        partialAggregate,
+        newPartialAggregate,
+        attributesToReplace
+      )
+
+      val newShuffle = shuffle.copy(child = wrapExpand(newExpand))
+      logDebug(s"xxx new plan: $newShuffle")
+      newShuffle
+    } else {
+      shuffle
+    }
+  }
+
+  // Just enable for simple cases. Some of cases that are not supported:
+  // 1. select count(a),count(b), count(1), count(distinct(a)), 
count(distinct(b)) from values
+  //    (1, null), (2,2) as data(a,b);
+  // 2. select n_name, count(distinct n_regionkey) as col1,
+  //    count(distinct concat(n_regionkey, n_nationkey)) as col2 from
+  //    nation group by n_name;
+  def doValidation(
+      aggregate: CHHashAggregateExecTransformer,
+      expand: ExpandExecTransformer,
+      shuffle: ColumnarShuffleExchangeExec): Boolean = {
+    def isAttributeOrLiteral(e: Expression): Boolean = e match {
+      case _: Attribute | _: Literal => true
+      case _ => false
+    }
+
+    // Attributes produced by the expand's child, i.e. what is visible below the expand.
+    val expandInputAttributes = expand.child.output.toSet
+
+    if (!aggregate.groupingExpressions.forall(isAttributeOrLiteral)) {
+      // every grouping key must be an attribute reference or a literal
+      logDebug(s"xxx Not all grouping expression are attribute references")
+      false
+    } else if (
+      !shuffle.outputPartitioning
+        .asInstanceOf[HashPartitioning]
+        .expressions
+        .forall(isAttributeOrLiteral)
+    ) {
+      // every shuffle key must be an attribute reference or a literal
+      logDebug(s"xxx Not all shuffle hash expression are attribute references")
+      false
+    } else if (
+      !aggregate.aggregateExpressions.forall {
+        e =>
+          // 1. for safety, only an allow-list of aggregate functions is accepted.
+          // 2. an aggregate function must only read attributes coming from expand's child.
+          isValidAggregateFunction(e) &&
+          e.aggregateFunction.references.forall(expandInputAttributes.contains(_))
+      }
+    ) {
+      logDebug(s"xxx Some aggregate functions are not supported")
+      false
+    } else {
+      // the group id column must be identifiable in the expand's output
+      findGroupingIdIndex(expand) != -1
+    }
+  }
+
+  // group id column doesn't have a fixed position, so we need to find it.
+  /**
+   * Locates the grouping id column in the expand's output.
+   *
+   * A column qualifies when every projection holds a non-null int/long literal at that
+   * position and no two projections hold the same value.
+   *
+   * @return the column index when exactly one column qualifies, otherwise -1.
+   */
+  def findGroupingIdIndex(expand: ExpandExecTransformer): Int = {
+    // The integral value of a non-null int/long literal; None for anything else. A null
+    // literal does not match the typed value patterns, so it is never read as 0.
+    def groupIdValue(e: Expression): Option[Long] = e match {
+      case Literal(value: Long, LongType) => Some(value)
+      case Literal(value: Int, IntegerType) => Some(value.toLong)
+      case _ => None
+    }
+
+    def isGroupIdColumn(col: Int): Boolean = {
+      val gids = expand.projections.map(projection => groupIdValue(projection(col)))
+      // gids should all be present and unique
+      gids.nonEmpty && gids.forall(_.isDefined) && gids.distinct.length == gids.length
+    }
+
+    expand.output.indices.filter(isGroupIdColumn) match {
+      case Seq(gidIndex) =>
+        logDebug(s"xxx  gid is at pos $gidIndex")
+        gidIndex
+      case _ => -1
+    }
+  }
+
+  // Some of aggregate functions' output columns are not consistent with the 
output of gluten.
+  // - average: in partial aggregation, the outputs are sum and count, but 
gluten only generates one
+  //   column, avg.
+  // - sum: if the input's type is decimal, the output are sum and isEmpty, 
but gluten doesn't use
+  //   the isEmpty column.
+  def isValidAggregateFunction(aggregateExpression: AggregateExpression): Boolean = {
+    // Aggregates carrying a FILTER clause are never accepted; otherwise only count, max, min
+    // and non-decimal sum are.
+    aggregateExpression.filter.isEmpty && (aggregateExpression.aggregateFunction match {
+      case _: Count | _: Max | _: Min => true
+      case sum: Sum => !sum.dataType.isInstanceOf[DecimalType]
+      case _ => false
+    })
+  }
+
+  // Looks `toReplace` up in the replacement map; attributes without an entry are returned
+  // unchanged.
+  def getReplaceAttribute(
+      toReplace: Attribute,
+      attributesToReplace: Map[Attribute, Attribute]): Attribute =
+    attributesToReplace.get(toReplace) match {
+      case Some(replacement) => replacement
+      case None => toReplace
+    }
+
+  // Maps each expand output attribute to the attribute that feeds it: for output column i this
+  // is the i-th expression of the first projection in which that expression is an attribute.
+  // Columns that are never fed by an attribute get no entry.
+  def buildReplaceAttributeMap(expand: ExpandExecTransformer): Map[Attribute, Attribute] = {
+    val sourceAttributes: Seq[Option[Attribute]] =
+      expand.projections(0).indices.map {
+        i => expand.projections.iterator.map(_(i)).collectFirst { case a: Attribute => a }
+      }
+
+    expand.output.zipWithIndex.flatMap {
+      case (outputAttribute, i) => sourceAttributes(i).map(outputAttribute -> _)
+    }.toMap
+  }
+
+  // Re-targets the original expand projections onto `newExpandOutput`: a new output column that
+  // also existed in the original expand output takes the original projection's expression at
+  // that position; any other column is passed through as itself.
+  def buildPostExpandProjections(
+      originalExpandProjections: Seq[Seq[Expression]],
+      originalExpandOutput: Seq[Attribute],
+      newExpandOutput: Seq[Attribute]): Seq[Seq[Expression]] = {
+    // The position lookup only depends on the output attribute, so do it once per column.
+    val originalPositions = newExpandOutput.map {
+      attr => attr -> originalExpandOutput.indexWhere(_.semanticEquals(attr))
+    }
+    originalExpandProjections.map {
+      projection =>
+        originalPositions.map {
+          case (attr, position) => if (position == -1) attr else projection(position)
+        }
+    }
+  }
+
+  // 1. make expand's child be aggregate's child
+  // 2. replace the attributes in groupingExpressions and resultExpressions as 
needed
+  def buildAheadAggregateExec(
+      partialAggregate: CHHashAggregateExecTransformer,
+      expand: ExpandExecTransformer,
+      attributesToReplace: Map[Attribute, Attribute]): SparkPlan = {
+    val groupIdAttribute = expand.output(findGroupingIdIndex(expand))
+
+    // New grouping keys: drop the group id column and every key that has no replacement, then
+    // switch the remaining keys to the attributes that feed the expand, without duplicates.
+    val newGroupingExpressions = partialAggregate.groupingExpressions
+      .map(_.toAttribute)
+      .collect {
+        case attr if attr != groupIdAttribute && attributesToReplace.contains(attr) =>
+          attributesToReplace(attr)
+      }
+      .distinct
+    logDebug(
+      s"xxx newGroupingExpresion: $newGroupingExpressions,\n" +
+        s"groupingExpressions: ${partialAggregate.groupingExpressions}")
+
+    // New result expressions: drop the group id column and replace the rest where a replacement
+    // exists.
+    val newResultExpressions = partialAggregate.resultExpressions
+      .map(_.toAttribute)
+      .filterNot(_ == groupIdAttribute)
+      .map(attr => attributesToReplace.getOrElse(attr, attr))
+    logDebug(
+      s"xxx newResultExpressions: $newResultExpressions\n" +
+        s"resultExpressions:${partialAggregate.resultExpressions}")
+
+    partialAggregate.copy(
+      groupingExpressions = newGroupingExpressions,
+      resultExpressions = newResultExpressions,
+      child = expand.child)
+  }
+
+  // Builds the expand node that sits on top of `child` (the moved partial aggregate). Its
+  // output is the output of the original partial aggregate, and its projections are the
+  // original expand projections re-targeted onto that output.
+  def buildPostExpandExec(
+      expand: ExpandExecTransformer,
+      partialAggregate: CHHashAggregateExecTransformer,
+      child: SparkPlan,
+      attributesToReplace: Map[Attribute, Attribute]): SparkPlan = {
+    // The output of the native plan is not completely consistent with Spark.
+    logDebug(s"xxx aggregateResultAttributes: ${partialAggregate.aggregateResultAttributes}")
+    val postExpandOutput = partialAggregate.output
+    logDebug(s"xxx aggregateOutput: $postExpandOutput")
+
+    val postExpandProjections =
+      buildPostExpandProjections(expand.projections, expand.output, postExpandOutput)
+    logDebug(s"xxx expandProjections: $postExpandProjections\nprojections:${expand.projections}")
+
+    ExpandExecTransformer(postExpandProjections, postExpandOutput, child)
+  }
+
+}
diff --git 
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpch/GlutenClickHouseTPCHSaltNullParquetSuite.scala
 
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpch/GlutenClickHouseTPCHSaltNullParquetSuite.scala
index 739b040dba..40b704d2e8 100644
--- 
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpch/GlutenClickHouseTPCHSaltNullParquetSuite.scala
+++ 
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpch/GlutenClickHouseTPCHSaltNullParquetSuite.scala
@@ -3022,5 +3022,50 @@ class GlutenClickHouseTPCHSaltNullParquetSuite extends 
GlutenClickHouseTPCHAbstr
     compareResultsAgainstVanillaSpark(query_sql, true, { _ => })
     spark.sql("drop table test_tbl_7220")
   }
+
+  test("GLLUTEN-7647 lazy expand") {
+    def checkLazyExpand(df: DataFrame): Unit = {
+      val expands = collectWithSubqueries(df.queryExecution.executedPlan) {
+        case e: ExpandExecTransformer if 
(e.child.isInstanceOf[HashAggregateExecBaseTransformer]) =>
+          e
+      }
+      assert(expands.size == 1)
+    }
+    var sql =
+      """
+        |select n_regionkey, n_nationkey,
+        |sum(n_regionkey), count(n_name), max(n_regionkey), min(n_regionkey)
+        |from nation group by n_regionkey, n_nationkey with cube
+        |order by n_regionkey, n_nationkey
+        |""".stripMargin
+    compareResultsAgainstVanillaSpark(sql, true, checkLazyExpand)
+
+    sql = """
+            |select n_regionkey, n_nationkey, sum(n_regionkey), count(distinct 
n_name)
+            |from nation group by n_regionkey, n_nationkey with cube
+            |order by n_regionkey, n_nationkey
+            |""".stripMargin
+    compareResultsAgainstVanillaSpark(sql, true, checkLazyExpand)
+
+    sql = """
+            |select * from(
+            |select n_regionkey, n_nationkey,
+            |sum(n_regionkey), count(n_name), max(n_regionkey), 
min(n_regionkey)
+            |from nation group by n_regionkey, n_nationkey with cube
+            |) where n_regionkey != 0
+            |order by n_regionkey, n_nationkey
+            |""".stripMargin
+    compareResultsAgainstVanillaSpark(sql, true, checkLazyExpand)
+
+    sql = """
+            |select * from(
+            |select n_regionkey, n_nationkey,
+            |sum(n_regionkey), count(distinct n_name), max(n_regionkey), 
min(n_regionkey)
+            |from nation group by n_regionkey, n_nationkey with cube
+            |) where n_regionkey != 0
+            |order by n_regionkey, n_nationkey
+            |""".stripMargin
+    compareResultsAgainstVanillaSpark(sql, true, checkLazyExpand)
+  }
 }
 // scalastyle:on line.size.limit
diff --git a/cpp-ch/local-engine/Operator/AdvancedExpandStep.cpp 
b/cpp-ch/local-engine/Operator/AdvancedExpandStep.cpp
new file mode 100644
index 0000000000..6ac5f5fc8f
--- /dev/null
+++ b/cpp-ch/local-engine/Operator/AdvancedExpandStep.cpp
@@ -0,0 +1,354 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "AdvancedExpandStep.h"
+#include <iterator>
+#include <system_error>
+#include <unordered_set>
+#include <Columns/ColumnNullable.h>
+#include <Columns/ColumnsNumber.h>
+#include <Columns/IColumn.h>
+#include <Core/Settings.h>
+#include <DataTypes/DataTypeNullable.h>
+#include <DataTypes/DataTypesNumber.h>
+#include <Interpreters/Aggregator.h>
+#include <Interpreters/ExpressionActions.h>
+#include <Interpreters/castColumn.h>
+#include <Operator/GraceAggregatingTransform.h>
+#include <Processors/ResizeProcessor.h>
+#include <Processors/Transforms/ExpressionTransform.h>
+#include <QueryPipeline/Pipe.h>
+#include <QueryPipeline/QueryPipelineBuilder.h>
+#include <Common/CHUtil.h>
+#include <Common/WeakHash.h>
+
+#include <Poco/Logger.h>
+#include <Common/logger_useful.h>
+
+namespace DB
+{
+namespace Setting
+{
+extern const SettingsUInt64 max_bytes_before_external_group_by;
+extern const SettingsBool optimize_group_by_constant_keys;
+extern const SettingsUInt64 min_free_disk_space_for_temporary_data;
+extern const SettingsMaxThreads max_threads;
+extern const SettingsBool empty_result_for_aggregation_by_empty_set;
+extern const SettingsUInt64 group_by_two_level_threshold_bytes;
+extern const SettingsOverflowModeGroupBy group_by_overflow_mode;
+extern const SettingsUInt64 max_rows_to_group_by;
+extern const SettingsBool enable_memory_bound_merging_of_aggregation_results;
+extern const SettingsUInt64 aggregation_in_order_max_block_bytes;
+extern const SettingsUInt64 group_by_two_level_threshold;
+extern const SettingsFloat min_hit_rate_to_use_consecutive_keys_optimization;
+extern const SettingsMaxThreads max_threads;
+extern const SettingsUInt64 max_block_size;
+}
+}
+
+namespace local_engine
+{
+
+static DB::ITransformingStep::Traits getTraits()
+{
+    return DB::ITransformingStep::Traits{
+        {
+            .returns_single_stream = true,
+            .preserves_number_of_streams = false,
+            .preserves_sorting = false,
+        },
+        {
+            .preserves_number_of_rows = false,
+        }};
+}
+
+AdvancedExpandStep::AdvancedExpandStep(
+    DB::ContextPtr context_,
+    const DB::Block & input_header_,
+    size_t grouping_keys_,
+    const DB::AggregateDescriptions & aggregate_descriptions_,
+    const ExpandField & project_set_exprs_)
+    : DB::ITransformingStep(input_header_, buildOutputHeader(input_header_, 
project_set_exprs_), getTraits())
+    , context(context_)
+    , grouping_keys(grouping_keys_)
+    , aggregate_descriptions(aggregate_descriptions_)
+    , project_set_exprs(project_set_exprs_)
+{
+}
+
+DB::Block AdvancedExpandStep::buildOutputHeader(const DB::Block &, const 
ExpandField & project_set_exprs_)
+{
+    DB::ColumnsWithTypeAndName cols;
+    const auto & types = project_set_exprs_.getTypes();
+    const auto & names = project_set_exprs_.getNames();
+
+    chassert(names.size() == types.size());
+
+    for (size_t i = 0; i < project_set_exprs_.getExpandCols(); ++i)
+        cols.emplace_back(DB::ColumnWithTypeAndName(types[i], names[i]));
+
+    return DB::Block(std::move(cols));
+}
+
+void AdvancedExpandStep::transformPipeline(DB::QueryPipelineBuilder & 
pipeline, const DB::BuildQueryPipelineSettings & pipeline_settings)
+{
+    const auto & settings = context->getSettingsRef();
+    DB::Names aggregate_grouping_keys;
+    for (size_t i = 0; i < output_header->columns(); ++i)
+    {
+        const auto & col = output_header->getByPosition(i);
+        if (typeid_cast<const DB::ColumnAggregateFunction *>(col.column.get()))
+            break;
+        aggregate_grouping_keys.push_back(col.name);
+    }
+    DB::Aggregator::Params params(
+        aggregate_grouping_keys,
+        aggregate_descriptions,
+        false,
+        settings[DB::Setting::max_rows_to_group_by],
+        settings[DB::Setting::group_by_overflow_mode],
+        settings[DB::Setting::group_by_two_level_threshold],
+        settings[DB::Setting::group_by_two_level_threshold_bytes],
+        settings[DB::Setting::max_bytes_before_external_group_by],
+        settings[DB::Setting::empty_result_for_aggregation_by_empty_set],
+        context->getTempDataOnDisk(),
+        settings[DB::Setting::max_threads],
+        settings[DB::Setting::min_free_disk_space_for_temporary_data],
+        true,
+        3,
+        
PODArrayUtil::adjustMemoryEfficientSize(settings[DB::Setting::max_block_size]),
+        /*enable_prefetch*/ true,
+        /*only_merge*/ false,
+        settings[DB::Setting::optimize_group_by_constant_keys],
+        
settings[DB::Setting::min_hit_rate_to_use_consecutive_keys_optimization],
+        /*StatsCollectingParams*/ {});
+
+    auto input_header = input_headers.front();
+    auto build_transform = [&](DB::OutputPortRawPtrs outputs)
+    {
+        DB::Processors new_processors;
+        for (auto & output : outputs)
+        {
+            auto expand_processor
+                = std::make_shared<AdvancedExpandTransform>(input_header, 
*output_header, grouping_keys, project_set_exprs);
+            DB::connect(*output, expand_processor->getInputs().front());
+            new_processors.push_back(expand_processor);
+
+            auto expand_output_header = 
expand_processor->getOutputs().front().getHeader();
+            
+            auto transform_params = 
std::make_shared<DB::AggregatingTransformParams>(expand_output_header, params, 
false);
+            auto aggregate_processor
+                = 
std::make_shared<GraceAggregatingTransform>(expand_output_header, 
transform_params, context, false, false);
+            DB::connect(expand_processor->getOutputs().back(), 
aggregate_processor->getInputs().front());
+            new_processors.push_back(aggregate_processor);
+            auto aggregate_output_header = 
aggregate_processor->getOutputs().front().getHeader();
+
+            auto resize_processor = 
std::make_shared<DB::ResizeProcessor>(expand_output_header, 2, 1);
+            DB::connect(aggregate_processor->getOutputs().front(), 
resize_processor->getInputs().front());
+            DB::connect(expand_processor->getOutputs().front(), 
resize_processor->getInputs().back());
+            new_processors.push_back(resize_processor);
+        }
+        return new_processors;
+    };
+    pipeline.transform(build_transform);
+}
+
+void AdvancedExpandStep::describePipeline(DB::IQueryPlanStep::FormatSettings & 
settings) const
+{
+    if (!processors.empty())
+        DB::IQueryPlanStep::describePipeline(processors, settings);
+}
+
+void AdvancedExpandStep::updateOutputHeader()
+{
+    output_header = buildOutputHeader(input_headers.front(), 
project_set_exprs);
+}
+
+/// It has two output ports. The 1st output port is for high cardinality data, 
the 2nd output port is for
+/// low cardinality data.
+AdvancedExpandTransform::AdvancedExpandTransform(
+    const DB::Block & input_header_, const DB::Block & output_header_, size_t 
grouping_keys_, const ExpandField & project_set_exprs_)
+    : DB::IProcessor({input_header_}, {output_header_, output_header_})
+    , grouping_keys(grouping_keys_)
+    , project_set_exprs(project_set_exprs_)
+    , input_header(input_header_)
+{
+    for (size_t i = 0; i < project_set_exprs.getKinds().size(); ++i)
+    {
+        is_low_cardinality_expand.push_back(true);
+    }
+
+    for (auto & port : outputs)
+    {
+        output_ports.push_back(&port);
+    }
+}
+
+DB::IProcessor::Status AdvancedExpandTransform::prepare()
+{
+    auto & input = inputs.front();
+
+    if (isCancelled() || output_ports[0]->isFinished() || 
output_ports[1]->isFinished())
+    {
+        input.close();
+        output_ports[0]->finish();
+        output_ports[1]->finish();
+        return Status::Finished;
+    }
+
+    if (has_output)
+    {
+        auto & output_port = 
*output_ports[is_low_cardinality_expand[expand_expr_iterator - 1]];
+        if (output_port.canPush())
+        {
+            output_port.push(std::move(output_chunk));
+            has_output = false;
+            auto status = expand_expr_iterator >= 
project_set_exprs.getExpandRows() ? Status::NeedData : Status::Ready;
+            return status;
+        }
+        else
+        {
+            return Status::PortFull;
+        }
+    }
+
+    if (!has_input)
+    {
+        if (input.isFinished())
+        {
+            if (!cardinality_detect_blocks.empty())
+            {
+                input_finished = true;
+                return Status::Ready;
+            }
+            else
+            {
+                output_ports[0]->finish();
+                output_ports[1]->finish();
+                return Status::Finished;
+            }
+        }
+
+        input.setNeeded();
+        if (!input.hasData())
+        {
+            return Status::NeedData;
+        }
+        input_chunk = input.pull(true);
+        has_input = true;
+        expand_expr_iterator = 0;
+    }
+
+    return Status::Ready;
+}
+
+void AdvancedExpandTransform::work()
+{
+    if (!input_finished && cardinality_detect_rows < 
rows_for_detect_cardinality)
+    {
+        
cardinality_detect_blocks.push_back(input_header.cloneWithColumns(input_chunk.detachColumns()));
+        cardinality_detect_rows += cardinality_detect_blocks.back().rows();
+        has_input = false;
+    }
+    if ((input_finished || cardinality_detect_rows >= 
rows_for_detect_cardinality) && !cardinality_detect_blocks.empty())
+    {
+        detectCardinality();
+    }
+    else if (!input_finished && cardinality_detect_rows < 
rows_for_detect_cardinality)
+        return;
+
+    /// The phase of detecting grouping keys' cardinality is finished here.
+    expandInputChunk();
+}
+
+/// Decides, for every expand projection, which output port it goes to, based on the blocks
+/// buffered in `cardinality_detect_blocks`. A projection is marked as not low cardinality as
+/// soon as it selects a grouping key whose sampled values look high cardinality. Afterwards the
+/// buffered rows become the current `input_chunk` and the buffer is emptied.
+void AdvancedExpandTransform::detectCardinality()
+{
+    DB::Block block = BlockUtil::concatenateBlocksMemoryEfficiently(std::move(cardinality_detect_blocks));
+    std::vector<bool> is_col_low_cardinality;
+    for (size_t i = 0; i < grouping_keys; ++i)
+    {
+        /// Hash the real values of the i-th grouping key. A default-constructed WeakHash32 holds
+        /// the same value for every row, which would make every key look low cardinality.
+        /// NOTE(review): assumes the grouping keys are the first `grouping_keys` columns of the
+        /// input block - confirm against the caller.
+        const DB::WeakHash32 hash = block.getByPosition(i).column->getWeakHash32();
+        const auto & data = hash.getData();
+        /// Hash collisions can only under-count distinct values; good enough for a heuristic.
+        std::unordered_set<UInt32> distinct_ids(data.begin(), data.end());
+        is_col_low_cardinality.push_back(distinct_ids.size() < 1000);
+    }
+
+    for (size_t i = 0; i < project_set_exprs.getExpandRows(); ++i)
+    {
+        const auto & kinds = project_set_exprs.getKinds()[i];
+        for (size_t k = 0; k < grouping_keys; ++k)
+        {
+            const auto & kind = kinds[k];
+            if (kind == EXPAND_FIELD_KIND_SELECTION && !is_col_low_cardinality[k])
+            {
+                is_low_cardinality_expand[i] = false;
+                break;
+            }
+        }
+    }
+    LOG_DEBUG(getLogger("AdvancedExpandTransform"), "Low cardinality expand: {}", fmt::join(is_low_cardinality_expand, ","));
+
+    /// The buffered rows are expanded next, exactly like a freshly pulled chunk.
+    input_chunk = DB::Chunk(block.getColumns(), block.rows());
+    cardinality_detect_blocks.clear();
+}
+
+void AdvancedExpandTransform::expandInputChunk()
+{
+    const auto & input_columns = input_chunk.getColumns();
+    const auto & types = project_set_exprs.getTypes();
+    const auto & kinds = project_set_exprs.getKinds()[expand_expr_iterator];
+    const auto & fields = project_set_exprs.getFields()[expand_expr_iterator];
+    size_t rows = input_chunk.getNumRows();
+
+    DB::Columns columns(types.size());
+    for (size_t col_i = 0; col_i < types.size(); ++col_i)
+    {
+        const auto & type = types[col_i];
+        const auto & kind = kinds[col_i];
+        const auto & field = fields[col_i];
+
+        if (kind == EXPAND_FIELD_KIND_SELECTION)
+        {
+            auto index = field.safeGet<Int32>();
+            const auto & input_column = input_columns[index];
+
+            DB::ColumnWithTypeAndName input_arg;
+            input_arg.column = input_column;
+            input_arg.type = input_header.getByPosition(index).type;
+            /// input_column maybe non-Nullable
+            columns[col_i] = DB::castColumn(input_arg, type);
+        }
+        else if (kind == EXPAND_FIELD_KIND_LITERAL)
+        {
+            /// Add const column with field value
+            auto column = type->createColumnConst(rows, 
field)->convertToFullColumnIfConst();
+            columns[col_i] = std::move(column);
+        }
+        else
+            throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Unknown 
ExpandFieldKind {}", magic_enum::enum_name(kind));
+    }
+
+    output_chunk = DB::Chunk(std::move(columns), rows);
+    has_output = true;
+
+    ++expand_expr_iterator;
+    has_input = expand_expr_iterator < project_set_exprs.getExpandRows();
+}
+}
diff --git a/cpp-ch/local-engine/Operator/AdvancedExpandStep.h 
b/cpp-ch/local-engine/Operator/AdvancedExpandStep.h
new file mode 100644
index 0000000000..2950846585
--- /dev/null
+++ b/cpp-ch/local-engine/Operator/AdvancedExpandStep.h
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <Core/Block.h>
+#include <Core/Names.h>
+#include <Interpreters/AggregateDescription.h>
+#include <Parser/ExpandField.h>
+#include <Processors/Chunk.h>
+#include <Processors/IProcessor.h>
+#include <Processors/Port.h>
+#include <Processors/QueryPlan/IQueryPlanStep.h>
+#include <Processors/QueryPlan/ITransformingStep.h>
+
+namespace local_engine
+{
+// This step is used when we move the expand operator after the partial 
aggregator.
+// To avoid increasing the overhead of shuffle when some of the grouping keys 
are high cardinality, we add an extra aggregate operator after
+// this expand operator and aggregate the low cardinality grouping keys.
+/// Query plan step (a DB::ITransformingStep) that carries the expand projections together with
+/// the aggregate descriptions built for them; it reports its name as "AdvancedExpandStep".
+class AdvancedExpandStep : public DB::ITransformingStep
+{
+public:
+    /// @param context_ query context kept by the step.
+    /// @param input_header_ header of the step's input.
+    /// @param grouping_keys_ number of grouping keys; the visible caller passes the number of
+    ///        grouping expressions of the input aggregate's first grouping.
+    /// @param aggregate_descriptions_ aggregate descriptions kept by the step.
+    /// @param project_set_exprs_ the expand projections.
+    explicit AdvancedExpandStep(
+        DB::ContextPtr context_,
+        const DB::Block & input_header_,
+        size_t grouping_keys_,
+        const DB::AggregateDescriptions & aggregate_descriptions_,
+        const ExpandField & project_set_exprs_);
+    ~AdvancedExpandStep() override = default;
+
+    /// Name of this step.
+    String getName() const override { return "AdvancedExpandStep"; }
+
+    /// Overrides of the base step interface; the implementations live in the .cpp file.
+    void transformPipeline(DB::QueryPipelineBuilder & pipeline, const 
DB::BuildQueryPipelineSettings & settings) override;
+    void describePipeline(DB::IQueryPlanStep::FormatSettings & settings) const 
override;
+
+    /// Computes the header produced by expanding `header` with `project_set_exprs_`.
+    /// Static so that callers can obtain the header without constructing a step.
+    static DB::Block buildOutputHeader(const DB::Block & header, const 
ExpandField & project_set_exprs_);
+
+protected:
+    /// Values received through the constructor parameters of the same (underscore-suffixed) names.
+    DB::ContextPtr context;
+    size_t grouping_keys;
+    DB::AggregateDescriptions aggregate_descriptions;
+    ExpandField project_set_exprs;
+
+    void updateOutputHeader() override;
+};
+
+/// Processor that produces, for every input chunk, one output chunk per expand projection
+/// (see expandInputChunk()). It also keeps state for a cardinality-detection phase
+/// (see detectCardinality()).
+class AdvancedExpandTransform : public DB::IProcessor
+{
+public:
+    using Status = DB::IProcessor::Status;
+    /// Need to ensure that the input header is [grouping keys] ++ [aggregation columns]
+    ///
+    /// @param input_header_ header of the incoming chunks.
+    /// @param output_header_ header of the produced chunks.
+    /// @param grouping_keys_ number of leading grouping key columns in the input header.
+    /// @param project_set_exprs_ the expand projections to apply.
+    explicit AdvancedExpandTransform(
+        const DB::Block & input_header_, const DB::Block & output_header_, size_t grouping_keys_, const ExpandField & project_set_exprs_);
+    ~AdvancedExpandTransform() override = default;
+
+    /// DB::IProcessor overrides.
+    Status prepare() override;
+    void work() override;
+    String getName() const override { return "AdvancedExpandTransform"; }
+
+private:
+    size_t grouping_keys = 0;
+    ExpandField project_set_exprs;
+    DB::Block input_header;
+    /// True while input_chunk still has projections left to be applied to it.
+    bool has_input = false;
+    /// True when output_chunk holds a chunk that has been produced by expandInputChunk().
+    bool has_output = false;
+    /// Index of the projection that the next expandInputChunk() call applies.
+    size_t expand_expr_iterator = 0;
+    /// NOTE(review): presumably one flag per projection telling whether its grouping keys were
+    /// detected as low cardinality - confirm against detectCardinality().
+    std::vector<bool> is_low_cardinality_expand;
+    std::vector<size_t> approximate_grouping_keys;
+    /// NOTE(review): presumably the number of rows buffered so far in cardinality_detect_blocks,
+    /// compared against rows_for_detect_cardinality - confirm against the implementation.
+    size_t cardinality_detect_rows = 0;
+    std::vector<DB::Block> cardinality_detect_blocks;
+    static constexpr size_t rows_for_detect_cardinality = 10000;
+    bool input_finished = false;
+
+    std::vector<DB::OutputPort *> output_ports;
+
+    /// Chunk currently being expanded, and the most recently produced expanded chunk.
+    DB::Chunk input_chunk;
+    DB::Chunk output_chunk;
+
+    void detectCardinality();
+    /// Applies projection `expand_expr_iterator` to input_chunk, stores the result in
+    /// output_chunk and advances to the next projection.
+    void expandInputChunk();
+};
+
+}
diff --git a/cpp-ch/local-engine/Parser/RelParsers/AggregateRelParser.cpp 
b/cpp-ch/local-engine/Parser/RelParsers/AggregateRelParser.cpp
index 269d55e645..6bc8c7e6e1 100644
--- a/cpp-ch/local-engine/Parser/RelParsers/AggregateRelParser.cpp
+++ b/cpp-ch/local-engine/Parser/RelParsers/AggregateRelParser.cpp
@@ -525,8 +525,7 @@ void AggregateRelParser::addAggregatingStep()
             /// We cannot use streaming aggregating strategy in step3. 
Otherwise it will generate multiple blocks with same n_name in them. This
             /// will make the result for count(distinct(n_name)) wrong. step3 
must finish all inputs before it puts any block into step4.
             /// So we introduce GraceAggregatingStep here, it can handle mass 
data with high cardinality.
-            auto aggregating_step
-                = std::make_unique<GraceAggregatingStep>(getContext(), 
plan->getCurrentHeader(), params, has_first_stage);
+            auto aggregating_step = 
std::make_unique<GraceAggregatingStep>(getContext(), plan->getCurrentHeader(), 
params, has_first_stage);
             steps.emplace_back(aggregating_step.get());
             plan->addStep(std::move(aggregating_step));
         }
diff --git a/cpp-ch/local-engine/Parser/RelParsers/ExpandRelParser.cpp 
b/cpp-ch/local-engine/Parser/RelParsers/ExpandRelParser.cpp
index 8a64c445e7..f3d8b4ab11 100644
--- a/cpp-ch/local-engine/Parser/RelParsers/ExpandRelParser.cpp
+++ b/cpp-ch/local-engine/Parser/RelParsers/ExpandRelParser.cpp
@@ -15,11 +15,12 @@
  * limitations under the License.
  */
 #include "ExpandRelParser.h"
+#include <ratio>
 #include <vector>
 #include <Core/Block.h>
 #include <Core/ColumnWithTypeAndName.h>
+#include <Operator/AdvancedExpandStep.h>
 #include <Operator/ExpandStep.h>
-#include <Parser/ExpandField.h>
 #include <Parser/RelParsers/RelParser.h>
 #include <Processors/QueryPlan/QueryPlan.h>
 #include <Common/logger_useful.h>
@@ -45,11 +46,17 @@ void updateType(DB::DataTypePtr & type, const 
DB::DataTypePtr & new_type)
     }
 }
 
-DB::QueryPlanPtr ExpandRelParser::parse(DB::QueryPlanPtr query_plan, const 
substrait::Rel & rel, std::list<const substrait::Rel *> &)
+DB::QueryPlanPtr
+ExpandRelParser::parse(DB::QueryPlanPtr query_plan, const substrait::Rel & rel, std::list<const substrait::Rel *> & rel_stack)
 {
-    const auto & expand_rel = rel.expand();
-    const auto & header = query_plan->getCurrentHeader();
+    /// Dispatch on isLazyAggregateExpand(): the lazy path handles an expand over a partial aggregate, anything else is parsed normally.
+    if (isLazyAggregateExpand(rel.expand()))
+        return lazyAggregateExpandParse(std::move(query_plan), rel, rel_stack);
+    return normalParse(std::move(query_plan), rel, rel_stack);
+}
 
+ExpandField ExpandRelParser::buildExpandField(const DB::Block & header, const 
substrait::ExpandRel & expand_rel)
+{
     std::vector<std::vector<ExpandFieldKind>> expand_kinds;
     std::vector<std::vector<DB::Field>> expand_fields;
     std::vector<DB::DataTypePtr> types;
@@ -82,6 +89,10 @@ DB::QueryPlanPtr ExpandRelParser::parse(DB::QueryPlanPtr 
query_plan, const subst
                 auto field = 
project_expr.selection().direct_reference().struct_field().field();
                 kinds.push_back(ExpandFieldKind::EXPAND_FIELD_KIND_SELECTION);
                 fields.push_back(field);
+                if (field >= header.columns())
+                {
+                    throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Field 
index out of range: {}, header: {}", field, header.dumpStructure());
+                }
                 updateType(types[i], header.getByPosition(field).type);
                 const auto & name = header.getByPosition(field).name;
                 if (names[i].empty())
@@ -123,6 +134,28 @@ DB::QueryPlanPtr ExpandRelParser::parse(DB::QueryPlanPtr 
query_plan, const subst
     }
 
     ExpandField expand_field(names, types, expand_kinds, expand_fields);
+    return expand_field;
+}
+
+/// Returns true when the expand's input is an aggregate rel and every measure of that
+/// aggregate is in the INITIAL_TO_INTERMEDIATE phase (an aggregate without measures qualifies).
+bool ExpandRelParser::isLazyAggregateExpand(const substrait::ExpandRel & expand_rel)
+{
+    const auto & child_rel = expand_rel.input();
+    const bool input_is_aggregate = child_rel.rel_type_case() == substrait::Rel::RelTypeCase::kAggregate;
+    if (!input_is_aggregate)
+        return false;
+
+    const auto & measures = child_rel.aggregate().measures();
+    for (auto it = measures.begin(); it != measures.end(); ++it)
+    {
+        const auto phase = it->measure().phase();
+        if (phase != substrait::AggregationPhase::AGGREGATION_PHASE_INITIAL_TO_INTERMEDIATE)
+            return false;
+    }
+    return true;
+}
+
+DB::QueryPlanPtr ExpandRelParser::normalParse(DB::QueryPlanPtr query_plan, 
const substrait::Rel & rel, std::list<const substrait::Rel *> &)
+{
+    const auto & expand_rel = rel.expand();
+    const auto & header = query_plan->getCurrentHeader();
+    auto expand_field = buildExpandField(header, expand_rel);
     auto expand_step = 
std::make_unique<ExpandStep>(query_plan->getCurrentHeader(), 
std::move(expand_field));
     expand_step->setStepDescription("Expand Step");
     steps.emplace_back(expand_step.get());
@@ -130,6 +163,62 @@ DB::QueryPlanPtr ExpandRelParser::parse(DB::QueryPlanPtr 
query_plan, const subst
     return query_plan;
 }
 
+/// Parses an expand whose input is an aggregate rel into an AdvancedExpandStep appended to `query_plan`.
+///
+/// The step receives the current plan header, the number of grouping expressions of the input
+/// aggregate's first grouping, the aggregate descriptions from buildAggregations() and the
+/// expand projections from buildExpandField().
+///
+/// Throws DB::Exception (LOGICAL_ERROR) when the input aggregate has no grouping at all.
+DB::QueryPlanPtr ExpandRelParser::lazyAggregateExpandParse(
+    DB::QueryPlanPtr query_plan, const substrait::Rel & rel, std::list<const substrait::Rel *> & rel_stack)
+{
+    DB::Block input_header = query_plan->getCurrentHeader();
+    const auto & expand_rel = rel.expand();
+    /// Bind by reference: the protobuf message is only read, so copying it is unnecessary.
+    const auto & aggregate_rel = expand_rel.input().aggregate();
+
+    /// groupings(0) is only valid when the repeated field is non-empty.
+    if (aggregate_rel.groupings_size() < 1)
+        throw DB::Exception(
+            DB::ErrorCodes::LOGICAL_ERROR, "Lazy aggregate expand requires the input aggregate to have at least one grouping");
+
+    auto expand_field = buildExpandField(input_header, expand_rel);
+    auto aggregate_descriptions = buildAggregations(input_header, expand_field, aggregate_rel);
+
+    size_t grouping_keys = aggregate_rel.groupings(0).grouping_expressions_size();
+
+    auto expand_step
+        = std::make_unique<AdvancedExpandStep>(getContext(), input_header, grouping_keys, aggregate_descriptions, expand_field);
+    expand_step->setStepDescription("Advanced Expand Step");
+    steps.emplace_back(expand_step.get());
+    query_plan->addStep(std::move(expand_step));
+    return query_plan;
+}
+
+/// Builds one aggregate description per measure of `aggregate_rel`, each using the measure's
+/// aggregate function with the "PartialMerge" combinator applied.
+///
+/// The aggregate-state columns are taken, in order, from the header that
+/// AdvancedExpandStep::buildOutputHeader() produces for `input_header` and `expand_field`;
+/// the i-th measure is paired with the i-th such column.
+///
+/// Throws DB::Exception (LOGICAL_ERROR) when the header has fewer aggregate-state columns
+/// than `aggregate_rel` has measures.
+DB::AggregateDescriptions ExpandRelParser::buildAggregations(
+    const DB::Block & input_header, const ExpandField & expand_field, const substrait::AggregateRel & aggregate_rel)
+{
+    auto header = AdvancedExpandStep::buildOutputHeader(input_header, expand_field);
+    DB::AggregateDescriptions descriptions;
+    DB::ColumnsWithTypeAndName aggregate_columns;
+    for (const auto & col : header.getColumnsWithTypeAndName())
+    {
+        if (typeid_cast<const DB::ColumnAggregateFunction *>(col.column.get()))
+            aggregate_columns.push_back(col);
+    }
+
+    /// Every measure needs a matching aggregate-state column; indexing past the end of
+    /// aggregate_columns would be undefined behaviour, so fail loudly instead.
+    const size_t measures_count = static_cast<size_t>(aggregate_rel.measures_size());
+    if (aggregate_columns.size() < measures_count)
+        throw DB::Exception(
+            DB::ErrorCodes::LOGICAL_ERROR,
+            "Expected at least {} aggregate state columns but found {}, header: {}",
+            measures_count,
+            aggregate_columns.size(),
+            header.dumpStructure());
+
+    for (size_t i = 0; i < measures_count; ++i)
+    {
+        /// The output header of the aggregate is [grouping keys] ++ [aggregation columns]
+        const auto & col = aggregate_columns[i];
+        DB::AggregateDescription description;
+        /// Non-null: aggregate_columns only holds columns that passed this cast above.
+        const auto * aggregate_col = typeid_cast<const DB::ColumnAggregateFunction *>(col.column.get());
+
+        /// The state column is both the argument and the result column of the merge.
+        description.column_name = col.name;
+        description.argument_names = {col.name};
+
+        auto aggregate_function = aggregate_col->getAggregateFunction();
+        description.parameters = aggregate_function->getParameters();
+
+        // Need apply "PartialMerge" combinator for the aggregate function.
+        auto function_name_with_combinator = aggregate_function->getName() + "PartialMerge";
+        DB::AggregateFunctionProperties aggregate_function_properties;
+        description.function
+            = getAggregateFunction(function_name_with_combinator, {col.type}, aggregate_function_properties, description.parameters);
+
+        descriptions.emplace_back(std::move(description));
+    }
+    return descriptions;
+}
+
 void registerExpandRelParser(RelParserFactory & factory)
 {
     auto builder = [](ParserContextPtr parser_context) { return 
std::make_shared<ExpandRelParser>(parser_context); };
diff --git a/cpp-ch/local-engine/Parser/RelParsers/ExpandRelParser.h 
b/cpp-ch/local-engine/Parser/RelParsers/ExpandRelParser.h
index 7ba985a3f6..4a178ab08d 100644
--- a/cpp-ch/local-engine/Parser/RelParsers/ExpandRelParser.h
+++ b/cpp-ch/local-engine/Parser/RelParsers/ExpandRelParser.h
@@ -16,9 +16,9 @@
  */
 #pragma once
 #include <optional>
+#include <Parser/ExpandField.h>
 #include <Parser/RelParsers/RelParser.h>
 
-
 namespace local_engine
 {
 class SerializedPlanParser;
@@ -29,7 +29,17 @@ public:
     ~ExpandRelParser() override = default;
     DB::QueryPlanPtr
     parse(DB::QueryPlanPtr query_plan, const substrait::Rel & rel, 
std::list<const substrait::Rel *> & rel_stack_) override;
+    DB::QueryPlanPtr normalParse(DB::QueryPlanPtr query_plan, const 
substrait::Rel & rel, std::list<const substrait::Rel *> & rel_stack_);
+    DB::QueryPlanPtr
+    lazyAggregateExpandParse(DB::QueryPlanPtr query_plan, const substrait::Rel 
& rel, std::list<const substrait::Rel *> & rel_stack_);
 
     std::optional<const substrait::Rel *> getSingleInput(const substrait::Rel 
& rel) override { return &rel.expand().input(); }
+
+private:
+    bool isLazyAggregateExpand(const substrait::ExpandRel & expand_rel);
+    ExpandField buildExpandField(const DB::Block & header, const 
substrait::ExpandRel & expand_rel);
+
+    DB::AggregateDescriptions
+    buildAggregations(const DB::Block & input_header, const ExpandField & 
expand_field, const substrait::AggregateRel & aggregate_rel);
 };
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to