This is an automated email from the ASF dual-hosted git repository.

liuneng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 883d02641c [GLUTEN-7905][CH] Implete window's `topk` by aggregation 
(#7976)
883d02641c is described below

commit 883d02641c82631491325f38e9484347b939a6a3
Author: lgbo <[email protected]>
AuthorDate: Thu Nov 28 11:50:12 2024 +0800

    [GLUTEN-7905][CH] Implete window's `topk` by aggregation (#7976)
    
    What changes were proposed in this pull request?
    (Please fill in changes proposed in this fix)
    
    Fixes: #7905
    
    This PR automatically uses aggregation to compute a window's top-k when 
the partition keys have low cardinality.
    
    How was this patch tested?
    (Please explain how this patch was tested. E.g. unit tests, integration 
tests, manual tests)
    
    unit tests
    
    (If this patch involves UI changes, please attach a screenshot; otherwise, 
remove this)
---
 .../gluten/backendsapi/clickhouse/CHBackend.scala  |   8 +
 .../gluten/backendsapi/clickhouse/CHRuleApi.scala  |   1 +
 .../CHAggregateGroupLimitExecTransformer.scala     | 174 +++++++
 .../extension/ConvertWindowToAggregate.scala       | 138 ++++++
 .../GlutenClickHouseTPCHSaltNullParquetSuite.scala | 151 ++++++
 .../AggregateFunctions/GroupLimitFunctions.cpp     | 303 ++++++++++++
 cpp-ch/local-engine/Common/AggregateUtil.cpp       |   1 -
 cpp-ch/local-engine/Common/ArrayJoinHelper.cpp     | 175 +++++++
 cpp-ch/local-engine/Common/ArrayJoinHelper.h       |  39 ++
 cpp-ch/local-engine/Common/CHUtil.cpp              |   2 +
 cpp-ch/local-engine/Common/GlutenConfig.cpp        |  11 +-
 cpp-ch/local-engine/Common/GlutenConfig.h          |  22 +-
 cpp-ch/local-engine/Common/SortUtils.cpp           | 105 +++++
 cpp-ch/local-engine/Common/SortUtils.h             |  33 ++
 cpp-ch/local-engine/Operator/BranchStep.cpp        | 265 +++++++++++
 cpp-ch/local-engine/Operator/BranchStep.h          |  86 ++++
 cpp-ch/local-engine/Operator/BranchTransform.cpp   | 155 +++++++
 cpp-ch/local-engine/Operator/BranchTransform.h     |  56 +++
 .../local-engine/Operator/WindowGroupLimitStep.cpp |  23 +-
 .../Parser/AdvancedParametersParseUtil.cpp         |   1 +
 .../Parser/AdvancedParametersParseUtil.h           |   1 +
 .../Parser/RelParsers/GroupLimitRelParser.cpp      | 512 +++++++++++++++++++++
 .../Parser/RelParsers/GroupLimitRelParser.h        |  88 ++++
 .../Parser/RelParsers/ProjectRelParser.cpp         |  92 +---
 .../Parser/RelParsers/ProjectRelParser.h           |  12 -
 .../RelParsers/WindowGroupLimitRelParser.cpp       |  92 ----
 .../Parser/RelParsers/WindowGroupLimitRelParser.h  |  46 --
 27 files changed, 2328 insertions(+), 264 deletions(-)

diff --git 
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala
 
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala
index f6cacba42b..c6c8acf705 100644
--- 
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala
+++ 
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala
@@ -359,6 +359,14 @@ object CHBackendSettings extends BackendSettingsApi with 
Logging {
     )
   }
 
+  // Whether a window group limit may be rewritten to an aggregation (true unless configured).
+  // If the partition keys are high cardinality, the aggregation method is slower.
+  def enableConvertWindowGroupLimitToAggregate(): Boolean = {
+    val sparkConf = SparkEnv.get.conf
+    val configKey = CHConf.runtimeConfig("enable_window_group_limit_to_aggregate")
+    sparkConf.getBoolean(configKey, defaultValue = true)
+  }
+
   override def enableNativeWriteFiles(): Boolean = {
     GlutenConfig.getConf.enableNativeWriter.getOrElse(false)
   }
diff --git 
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHRuleApi.scala
 
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHRuleApi.scala
index 02ec91496e..edf7a48025 100644
--- 
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHRuleApi.scala
+++ 
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHRuleApi.scala
@@ -110,6 +110,7 @@ object CHRuleApi {
     injector.injectPostTransform(c => 
RewriteSortMergeJoinToHashJoinRule.apply(c.session))
     injector.injectPostTransform(c => 
PushdownAggregatePreProjectionAheadExpand.apply(c.session))
     injector.injectPostTransform(c => LazyAggregateExpandRule.apply(c.session))
+    injector.injectPostTransform(c => 
ConverRowNumbertWindowToAggregateRule(c.session))
     injector.injectPostTransform(
       c =>
         intercept(
diff --git 
a/backends-clickhouse/src/main/scala/org/apache/gluten/execution/CHAggregateGroupLimitExecTransformer.scala
 
b/backends-clickhouse/src/main/scala/org/apache/gluten/execution/CHAggregateGroupLimitExecTransformer.scala
new file mode 100644
index 0000000000..83bb33bfa2
--- /dev/null
+++ 
b/backends-clickhouse/src/main/scala/org/apache/gluten/execution/CHAggregateGroupLimitExecTransformer.scala
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.execution
+
+import org.apache.gluten.backendsapi.BackendsApiManager
+import org.apache.gluten.exception.GlutenNotSupportException
+import org.apache.gluten.expression._
+import org.apache.gluten.expression.{ConverterUtils, ExpressionConverter}
+import org.apache.gluten.extension.ValidationResult
+import org.apache.gluten.metrics.MetricsUpdater
+import org.apache.gluten.substrait.`type`.TypeBuilder
+import org.apache.gluten.substrait.SubstraitContext
+import org.apache.gluten.substrait.extensions.ExtensionBuilder
+import org.apache.gluten.substrait.rel.{RelBuilder, RelNode}
+
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.physical.{AllTuples, 
ClusteredDistribution, Distribution, Partitioning}
+import org.apache.spark.sql.execution.SparkPlan
+
+import com.google.protobuf.StringValue
+import io.substrait.proto.SortField
+
+import scala.collection.JavaConverters._
+
+/**
+ * Physical operator for a per-partition limit ("group limit"). It is handed to the ClickHouse
+ * backend as a Substrait window-group-limit rel tagged with `is_aggregate_group_limit=true`.
+ *
+ * @param partitionSpec
+ *   expressions the rows are partitioned by; when empty all rows go to a single partition
+ * @param orderSpec
+ *   ordering of the rows inside a partition
+ * @param rankLikeFunction
+ *   ranking function of the original window (RowNumber, Rank or DenseRank)
+ * @param resultAttributes
+ *   attributes produced by this operator
+ * @param limit
+ *   limit value forwarded to the rel
+ * @param child
+ *   input plan
+ */
+case class CHAggregateGroupLimitExecTransformer(
+    partitionSpec: Seq[Expression],
+    orderSpec: Seq[SortOrder],
+    rankLikeFunction: Expression,
+    resultAttributes: Seq[Attribute],
+    limit: Int,
+    child: SparkPlan)
+  extends UnaryTransformSupport {
+
+  @transient override lazy val metrics =
+    BackendsApiManager.getMetricsApiInstance.genWindowTransformerMetrics(sparkContext)
+
+  override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan =
+    copy(child = newChild)
+
+  override def metricsUpdater(): MetricsUpdater =
+    BackendsApiManager.getMetricsApiInstance.genWindowTransformerMetricsUpdater(metrics)
+
+  override def output: Seq[Attribute] = resultAttributes
+
+  override def requiredChildDistribution: Seq[Distribution] = {
+    if (partitionSpec.isEmpty) {
+      // Only show warning when the number of bytes is larger than 100 MiB?
+      logWarning(
+        "No Partition Defined for Window operation! Moving all data to a single "
+          + "partition, this can cause serious performance degradation.")
+      AllTuples :: Nil
+    } else ClusteredDistribution(partitionSpec) :: Nil
+  }
+
+  // No ordering is requested from the child.
+  override def requiredChildOrdering: Seq[Seq[SortOrder]] = {
+    Seq(Nil)
+  }
+
+  override def outputOrdering: Seq[SortOrder] = {
+    if (requiredChildOrdering.forall(_.isEmpty)) {
+      Nil
+    } else {
+      child.outputOrdering
+    }
+  }
+
+  override def outputPartitioning: Partitioning = child.outputPartitioning
+
+  /**
+   * Builds the Substrait window-group-limit rel for this operator.
+   *
+   * @param context
+   *   Substrait context supplying the registered functions
+   * @param originalInputAttributes
+   *   input attributes; their types are shipped in the extension node when validating
+   * @param operatorId
+   *   id of this operator inside the context
+   * @param input
+   *   rel of the child (null during validation)
+   * @param validation
+   *   true to build the rel used for native validation, false for execution
+   * @throws GlutenNotSupportException
+   *   in execution mode when `rankLikeFunction` is not RowNumber, Rank or DenseRank
+   */
+  def getWindowGroupLimitRel(
+      context: SubstraitContext,
+      originalInputAttributes: Seq[Attribute],
+      operatorId: Long,
+      input: RelNode,
+      validation: Boolean): RelNode = {
+    val args = context.registeredFunction
+    // Partition By Expressions
+    val partitionsExpressions = partitionSpec
+      .map(
+        ExpressionConverter
+          .replaceWithExpressionTransformer(_, attributeSeq = child.output)
+          .doTransform(args))
+      .asJava
+
+    // Sort By Expressions
+    val sortFieldList =
+      orderSpec.map {
+        order =>
+          val builder = SortField.newBuilder()
+          val exprNode = ExpressionConverter
+            .replaceWithExpressionTransformer(order.child, attributeSeq = child.output)
+            .doTransform(args)
+          builder.setExpr(exprNode.toProtobuf)
+          builder.setDirectionValue(SortExecTransformer.transformSortDirection(order))
+          builder.build()
+      }.asJava
+
+    // Only the extension node differs between validation and execution.
+    val extensionNode =
+      if (validation) validationExtensionNode(originalInputAttributes)
+      else executionExtensionNode()
+
+    RelBuilder.makeWindowGroupLimitRel(
+      input,
+      partitionsExpressions,
+      sortFieldList,
+      limit,
+      extensionNode,
+      context,
+      operatorId)
+  }
+
+  // Extension node carrying the window function name and the aggregate-group-limit marker.
+  private def executionExtensionNode() = {
+    val windowFunction = rankLikeFunction match {
+      case _: RowNumber => ExpressionNames.ROW_NUMBER
+      case _: Rank => ExpressionNames.RANK
+      case _: DenseRank => ExpressionNames.DENSE_RANK
+      case _ =>
+        throw new GlutenNotSupportException(s"Unknown window function $rankLikeFunction")
+    }
+    val parameters =
+      s"WindowGroupLimitParameters:window_function=$windowFunction\n" +
+        "is_aggregate_group_limit=true\n"
+    val message = StringValue.newBuilder().setValue(parameters).build()
+    ExtensionBuilder.makeAdvancedExtension(
+      BackendsApiManager.getTransformerApiInstance.packPBMessage(message),
+      null)
+  }
+
+  // Use a extension node to send the input types through Substrait plan for validation.
+  private def validationExtensionNode(inputAttributes: Seq[Attribute]) = {
+    val inputTypeNodeList = inputAttributes
+      .map(attr => ConverterUtils.getTypeNode(attr.dataType, attr.nullable))
+      .asJava
+    ExtensionBuilder.makeAdvancedExtension(
+      BackendsApiManager.getTransformerApiInstance.packPBMessage(
+        TypeBuilder.makeStruct(false, inputTypeNodeList).toProtobuf))
+  }
+
+  override protected def doValidateInternal(): ValidationResult = {
+    val substraitContext = new SubstraitContext
+    val operatorId = substraitContext.nextOperatorId(this.nodeName)
+
+    val relNode =
+      getWindowGroupLimitRel(substraitContext, child.output, operatorId, null, validation = true)
+
+    doNativeValidation(substraitContext, relNode)
+  }
+
+  override protected def doTransform(context: SubstraitContext): TransformContext = {
+    val childCtx = child.asInstanceOf[TransformSupport].transform(context)
+    val operatorId = context.nextOperatorId(this.nodeName)
+
+    val currRel =
+      getWindowGroupLimitRel(context, child.output, operatorId, childCtx.root, validation = false)
+    assert(currRel != null, "Window Group Limit Rel should be valid")
+    TransformContext(output, currRel)
+  }
+}
diff --git 
a/backends-clickhouse/src/main/scala/org/apache/gluten/extension/ConvertWindowToAggregate.scala
 
b/backends-clickhouse/src/main/scala/org/apache/gluten/extension/ConvertWindowToAggregate.scala
new file mode 100644
index 0000000000..ad2b22ba6c
--- /dev/null
+++ 
b/backends-clickhouse/src/main/scala/org/apache/gluten/extension/ConvertWindowToAggregate.scala
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.extension
+
+import org.apache.gluten.backendsapi.clickhouse.CHBackendSettings
+import org.apache.gluten.exception.GlutenException
+import org.apache.gluten.execution._
+import org.apache.gluten.expression.WindowFunctionsBuilder
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.rules.Rule
+// import org.apache.spark.sql.execution._
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.types._
+
+// Rewrites the "first rows of each partition" pattern
+//   Filter(row_number <op> k) <- Window(row_number()) <- Sort
+// into a single CHAggregateGroupLimitExecTransformer placed on top of the sort's child.
+case class ConverRowNumbertWindowToAggregateRule(spark: SparkSession)
+  extends Rule[SparkPlan]
+  with Logging {
+
+  // NOTE(review): upper bound kept from the original rule; the reason for 100 is not visible
+  // here - confirm before changing it.
+  private val maxSupportedLimit = 100
+
+  override def apply(plan: SparkPlan): SparkPlan = {
+    if (!CHBackendSettings.enableConvertWindowGroupLimitToAggregate) {
+      return plan
+    }
+    plan.transformUp {
+      case filter @ FilterExecTransformer(
+            condition,
+            WindowExecTransformer(
+              windowExpressions,
+              partitionSpec,
+              orderSpec,
+              sort @ SortExecTransformer(_, _, _, _))) =>
+        // Both checks are evaluated once; the emptiness guard keeps the debug message from
+        // indexing into an empty expression list.
+        val supportedFunction = isSupportedWindowFunction(windowExpressions)
+        val topKFilter =
+          windowExpressions.nonEmpty && isTopKLimitFilter(condition, windowExpressions.head)
+        if (!supportedFunction || !topKFilter) {
+          logDebug(
+            s"xxx Not Supported case for converting window to aggregate. is topk limit: " +
+              s"$topKFilter. is supported window " +
+              s"function: $supportedFunction")
+          filter
+        } else {
+          val limit = getLimit(condition.asInstanceOf[BinaryComparison])
+          if (limit < 1 || limit > maxSupportedLimit) {
+            filter
+          } else {
+            // The filter, the window and the sort are all replaced by the group limit.
+            CHAggregateGroupLimitExecTransformer(
+              partitionSpec,
+              orderSpec,
+              extractWindowFunction(windowExpressions.head),
+              sort.child.output ++ Seq(windowExpressions.head.toAttribute),
+              limit,
+              sort.child
+            )
+          }
+        }
+    }
+  }
+
+  // Number of rows per partition selected by the comparison: `= k` and `<= k` give k,
+  // `< k` gives k - 1. The right operand must be an integer literal accepted by
+  // evalIntLiteral (isTopKLimitFilter checks this); other comparisons are rejected.
+  def getLimit(e: BinaryComparison): Int = {
+    e match {
+      case _: EqualTo => evalIntLiteral(e.right).get
+      case _: LessThanOrEqual => evalIntLiteral(e.right).get
+      case _: LessThan => evalIntLiteral(e.right).get - 1
+      case _ => throw new GlutenException(s"Unsupported comparison: $e")
+    }
+  }
+
+  // Value of an integer literal, or None for anything else. A long literal is only accepted
+  // when it fits into an Int, so an oversized limit is never truncated to a small one.
+  def evalIntLiteral(expr: Expression): Option[Int] = {
+    expr match {
+      case Literal(value: Int, IntegerType) => Some(value)
+      case Literal(value: Long, LongType) if value.isValidInt => Some(value.toInt)
+      case _ => None
+    }
+  }
+
+  // Unwraps the window function from an aliased window expression.
+  def extractWindowFunction(windowExpression: NamedExpression): Expression = {
+    val aliasExpr = windowExpression.asInstanceOf[Alias]
+    WindowFunctionsBuilder.extractWindowExpression(aliasExpr.child).windowFunction
+  }
+
+  // Only a single row_number() window expression is supported.
+  def isSupportedWindowFunction(windowExpressions: Seq[NamedExpression]): Boolean = {
+    if (windowExpressions.length != 1) {
+      return false
+    }
+    extractWindowFunction(windowExpressions.head) match {
+      case _: RowNumber => true
+      case _ => false
+    }
+  }
+
+  // Returns true if the filter condition compares the window function result (left operand)
+  // with a constant integer (right operand) using `= 1`, `<=` or `<`.
+  def isTopKLimitFilter(condition: Expression, windowExpression: NamedExpression): Boolean = {
+    def isWindowFunctionResult(candidate: Expression): Boolean = {
+      candidate match {
+        case attr: Attribute =>
+          attr.semanticEquals(windowExpression.toAttribute)
+        case _ => false
+      }
+    }
+    condition match {
+      case comparison: BinaryComparison =>
+        evalIntLiteral(comparison.right) match {
+          case Some(constLimit) if isWindowFunctionResult(comparison.left) =>
+            comparison match {
+              case _: EqualTo => constLimit == 1
+              case _: LessThanOrEqual | _: LessThan => true
+              case _ => false
+            }
+          case _ => false
+        }
+      case _ => false
+    }
+  }
+
+}
diff --git 
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpch/GlutenClickHouseTPCHSaltNullParquetSuite.scala
 
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpch/GlutenClickHouseTPCHSaltNullParquetSuite.scala
index fbefc054fa..f82acdc415 100644
--- 
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpch/GlutenClickHouseTPCHSaltNullParquetSuite.scala
+++ 
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpch/GlutenClickHouseTPCHSaltNullParquetSuite.scala
@@ -41,6 +41,7 @@ class GlutenClickHouseTPCHSaltNullParquetSuite extends 
GlutenClickHouseTPCHAbstr
   override protected val tpchQueries: String =
     rootPath + 
"../../../../tools/gluten-it/common/src/main/resources/tpch-queries"
   override protected val queriesResults: String = rootPath + "queries-output"
+  val runtimeConfigPrefix = 
"spark.gluten.sql.columnar.backend.ch.runtime_config."
 
   override protected def sparkConf: SparkConf = {
     super.sparkConf
@@ -3175,6 +3176,156 @@ class GlutenClickHouseTPCHSaltNullParquetSuite extends 
GlutenClickHouseTPCHAbstr
     spark.sql("drop table if exists test_7647")
   }
 
+  test("GLUTEN-7905 get topk of window by aggregate") {
+    withSQLConf(
+      (runtimeConfigPrefix + "enable_window_group_limit_to_aggregate", "true"),
+      (runtimeConfigPrefix + "window.aggregate_topk_high_cardinality_threshold", "2.0")) {
+      // The executed plan must contain one of the two group-limit transformers.
+      def checkWindowGroupLimit(df: DataFrame): Unit = {
+        val groupLimits = collectWithSubqueries(df.queryExecution.executedPlan) {
+          case e: CHAggregateGroupLimitExecTransformer => e
+          case wgl: CHWindowGroupLimitExecTransformer => wgl
+        }
+        assert(groupLimits.size >= 1)
+      }
+      // Remove leftovers of an aborted run so that `create table` cannot fail.
+      spark.sql("drop table if exists test_win_top")
+      spark.sql("create table test_win_top (a string, b int, c int) using parquet")
+      try {
+        spark.sql("""
+                    |insert into test_win_top values
+                    |('a', 3, 3), ('a', 1, 5), ('a', 2, 2), ('a', null, null), ('a', null, 1),
+                    |('b', 1, 1), ('b', 2, 1),
+                    |('c', 2, 3)
+                    |""".stripMargin)
+        val queries = Seq(
+          """
+            |select * from(
+            |select a, b, c,
+            |row_number() over (partition by a order by b desc nulls first, c nulls last) as r
+            |from test_win_top)
+            |where r <= 1
+            |""".stripMargin,
+          """
+            |select * from(
+            |select a, b, c, row_number() over (partition by a order by b desc, c nulls last) as r
+            |from test_win_top
+            |)where r <= 1
+            |""".stripMargin,
+          """
+            |select * from(
+            |select a, b, c, row_number() over (partition by a order by b asc nulls first, c) as r
+            |from test_win_top)
+            |where r <= 1
+            |""".stripMargin,
+          """
+            |select * from(
+            |select a, b, c, row_number() over (partition by a order by b asc nulls last) as r
+            |from test_win_top)
+            |where r <= 1
+            |""".stripMargin,
+          """
+            |select * from(
+            |select a, b, c, row_number() over (partition by a order by b , c) as r
+            |from test_win_top)
+            |where r <= 1
+            |""".stripMargin
+        )
+        queries.foreach(
+          query => compareResultsAgainstVanillaSpark(query, true, checkWindowGroupLimit))
+      } finally {
+        // Clean up even when a comparison fails.
+        spark.sql("drop table if exists test_win_top")
+      }
+    }
+
+  }
+
+  test("GLUTEN-7905 get topk of window by window") {
+    withSQLConf(
+      (runtimeConfigPrefix + "enable_window_group_limit_to_aggregate", "true"),
+      (runtimeConfigPrefix + "window.aggregate_topk_high_cardinality_threshold", "0.0")) {
+      // The executed plan must contain one of the two group-limit transformers.
+      def checkWindowGroupLimit(df: DataFrame): Unit = {
+        // for spark 3.5, CHWindowGroupLimitExecTransformer is in used
+        val groupLimits = collectWithSubqueries(df.queryExecution.executedPlan) {
+          case e: CHAggregateGroupLimitExecTransformer => e
+          case wgl: CHWindowGroupLimitExecTransformer => wgl
+        }
+        assert(groupLimits.size >= 1)
+      }
+      spark.sql("drop table if exists test_win_top")
+      spark.sql("create table test_win_top (a string, b int, c int) using parquet")
+      spark.sql("""
+                  |insert into test_win_top values
+                  |('a', 3, 3), ('a', 1, 5), ('a', 2, 2), ('a', null, null), ('a', null, 1),
+                  |('b', 1, 1), ('b', 2, 1),
+                  |('c', 2, 3)
+                  |""".stripMargin)
+      // Same checks for every ordering variant, executed in the listed order.
+      val queries = Seq(
+        """
+          |select * from(
+          |select a, b, c,
+          |row_number() over (partition by a order by b desc nulls first, c nulls last) as r
+          |from test_win_top
+          |)where r <= 1
+          |""".stripMargin,
+        """
+          |select * from(
+          |select a, b, c, row_number() over (partition by a order by b desc, c nulls last) as r
+          |from test_win_top)
+          |where r <= 1
+          |""".stripMargin,
+        """
+          | select * from(
+          |select a, b, c, row_number() over (partition by a order by b asc nulls first, c) as r
+          |from test_win_top)
+          |where r <= 1
+          |""".stripMargin,
+        """
+          |select * from(
+          |select a, b, c, row_number() over (partition by a order by b asc nulls last) as r
+          |from test_win_top)
+          |where r <= 1
+          |""".stripMargin,
+        """
+          |select * from(
+          |select a, b, c, row_number() over (partition by a order by b , c) as r
+          |from test_win_top)
+          |where r <= 1
+          |""".stripMargin
+      )
+      queries.foreach(
+        query => compareResultsAgainstVanillaSpark(query, true, checkWindowGroupLimit))
+      spark.sql("drop table if exists test_win_top")
+    }
+
+  }
+
   test("GLUTEN-7759: Fix bug of agg pre-project push down") {
     val table_create_sql =
       "create table test_tbl_7759(id bigint, name string, day string) using 
parquet"
diff --git a/cpp-ch/local-engine/AggregateFunctions/GroupLimitFunctions.cpp 
b/cpp-ch/local-engine/AggregateFunctions/GroupLimitFunctions.cpp
new file mode 100644
index 0000000000..137ae8a544
--- /dev/null
+++ b/cpp-ch/local-engine/AggregateFunctions/GroupLimitFunctions.cpp
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <new>
+#include <vector>
+#include <AggregateFunctions/AggregateFunctionFactory.h>
+#include <AggregateFunctions/FactoryHelpers.h>
+#include <AggregateFunctions/Helpers.h>
+#include <AggregateFunctions/IAggregateFunction.h>
+#include <AggregateFunctions/IAggregateFunction_fwd.h>
+#include <Columns/ColumnArray.h>
+#include <Core/Field.h>
+#include <Core/Settings.h>
+#include <DataTypes/DataTypeArray.h>
+#include <DataTypes/DataTypeTuple.h>
+#include <DataTypes/DataTypesNumber.h>
+#include <DataTypes/IDataType.h>
+#include <DataTypes/Serializations/ISerialization.h>
+#include <Interpreters/Context.h>
+#include <Parsers/ASTExpressionList.h>
+#include <Parsers/ASTIdentifier.h>
+#include <Parsers/ASTOrderByElement.h>
+#include <Parsers/ExpressionElementParsers.h>
+#include <Parsers/ExpressionListParsers.h>
+#include <Parsers/parseQuery.h>
+#include <Parsers/queryToString.h>
+#include <Common/Exception.h>
+
+#include <Poco/Logger.h>
+#include <Common/logger_useful.h>
+#include "base/defines.h"
+
+namespace DB::ErrorCodes
+{
+extern const int LOGICAL_ERROR;
+extern const int BAD_ARGUMENTS;
+}
+
+namespace local_engine
+{
+
+/// One sort key used when two tuples are compared.
+struct SortOrderField
+{
+    /// Index of the key inside the tuple.
+    size_t pos{0};
+    /// Returned (converted to bool) by the comparator when the left key is smaller.
+    Int8 direction{0};
+    /// Returned (converted to bool) by the comparator when only the left key is NULL.
+    Int8 nulls_direction{0};
+};
+using SortOrderFields = std::vector<SortOrderField>;
+
+/// Aggregation state: a bounded heap holding at most `max_elements` tuples, namely the ones
+/// that come first according to the sort orders passed to each call.
+struct RowNumGroupArraySortedData
+{
+public:
+    using Data = DB::Tuple;
+    std::vector<Data> values;
+
+    /// Comparator used for the heap and for the final sort. Sort keys are examined in order
+    /// and the first key on which the tuples differ decides the result:
+    ///  - only lhs is NULL: `nulls_direction` converted to bool;
+    ///  - only rhs is NULL: the negation of that;
+    ///  - lhs key < rhs key: `direction` converted to bool, lhs key > rhs key: its negation.
+    /// Tuples that are equal on every key yield false.
+    static bool compare(const Data & lhs, const Data & rhs, const SortOrderFields & sort_orders)
+    {
+        for (const auto & sort_order : sort_orders)
+        {
+            const auto & pos = sort_order.pos;
+            const auto & asc = sort_order.direction;
+            const auto & nulls_first = sort_order.nulls_direction;
+            bool l_is_null = lhs[pos].isNull();
+            bool r_is_null = rhs[pos].isNull();
+            if (l_is_null && r_is_null)
+                continue;
+            else if (l_is_null)
+                return nulls_first;
+            else if (r_is_null)
+                return !nulls_first;
+            else if (lhs[pos] < rhs[pos])
+                return asc;
+            else if (lhs[pos] > rhs[pos])
+                return !asc;
+        }
+        return false;
+    }
+
+    /// Restores the heap order after values[0] has been overwritten by sifting the new top
+    /// element down.
+    ALWAYS_INLINE void heapReplaceTop(const SortOrderFields & sort_orders)
+    {
+        size_t size = values.size();
+        if (size < 2)
+            return;
+        size_t child_index = 1;
+        if (size > 2 && compare(values[1], values[2], sort_orders))
+            ++child_index;
+
+        if (compare(values[child_index], values[0], sort_orders))
+            return;
+
+        size_t current_index = 0;
+        /// Elements are moved while sifting; every slot that was moved from is assigned
+        /// again before it is read.
+        auto current = std::move(values[current_index]);
+        do
+        {
+            values[current_index] = std::move(values[child_index]);
+            current_index = child_index;
+
+            child_index = 2 * child_index + 1;
+
+            if (child_index >= size)
+                break;
+
+            if ((child_index + 1) < size && compare(values[child_index], values[child_index + 1], sort_orders))
+                ++child_index;
+        } while (!compare(values[child_index], current, sort_orders));
+
+        values[current_index] = std::move(current);
+    }
+
+    /// Offers a tuple to the heap. While fewer than `max_elements` tuples are stored the tuple
+    /// is always kept; afterwards it replaces the current top only if it compares before it.
+    /// This overload takes ownership of `data`, so a kept tuple is moved and not copied.
+    ALWAYS_INLINE void addElement(Data && data, const SortOrderFields & sort_orders, size_t max_elements)
+    {
+        if (accepts(data, sort_orders, max_elements))
+            insertAccepted(std::move(data), sort_orders, max_elements);
+    }
+
+    /// Same as above for tuples that cannot be moved from (lvalues and const rvalues); the
+    /// copy is made only when the tuple is actually kept.
+    ALWAYS_INLINE void addElement(const Data & data, const SortOrderFields & sort_orders, size_t max_elements)
+    {
+        if (accepts(data, sort_orders, max_elements))
+            insertAccepted(Data(data), sort_orders, max_elements);
+    }
+
+    /// Sorts the stored tuples with `compare` and truncates them to `max_elements`.
+    ALWAYS_INLINE void sortAndLimit(size_t max_elements, const SortOrderFields & sort_orders)
+    {
+        ::sort(values.begin(), values.end(), [&sort_orders](const Data & a, const Data & b) { return compare(a, b, sort_orders); });
+        if (values.size() > max_elements)
+            values.resize(max_elements);
+    }
+
+    /// Appends one array to `to` (expected to be a ColumnArray): the sorted, limited tuples,
+    /// each extended with its 1-based position. The stored tuples are modified in place
+    /// (sorted, and the position is pushed onto each of them).
+    ALWAYS_INLINE void insertResultInto(DB::IColumn & to, size_t max_elements, const SortOrderFields & sort_orders)
+    {
+        auto & result_array = assert_cast<DB::ColumnArray &>(to);
+        auto & result_array_offsets = result_array.getOffsets();
+
+        sortAndLimit(max_elements, sort_orders);
+
+        result_array_offsets.push_back(result_array_offsets.back() + values.size());
+
+        if (values.empty())
+            return;
+        auto & result_array_data = result_array.getData();
+        for (int i = 0, sz = static_cast<int>(values.size()); i < sz; ++i)
+        {
+            auto & value = values[i];
+            value.push_back(i + 1);
+            result_array_data.insert(value);
+        }
+    }
+
+private:
+    /// True when `data` has to be stored: there is still room, or it compares before the
+    /// current top. A capacity of zero accepts nothing (and never touches values[0]).
+    ALWAYS_INLINE bool accepts(const Data & data, const SortOrderFields & sort_orders, size_t max_elements) const
+    {
+        if (max_elements == 0)
+            return false;
+        return values.size() < max_elements || compare(data, values[0], sort_orders);
+    }
+
+    /// Stores a tuple that `accepts` approved: replaces the top when the heap is full,
+    /// otherwise appends it and restores the heap order.
+    ALWAYS_INLINE void insertAccepted(Data && data, const SortOrderFields & sort_orders, size_t max_elements)
+    {
+        if (values.size() >= max_elements)
+        {
+            values[0] = std::move(data);
+            heapReplaceTop(sort_orders);
+            return;
+        }
+        values.emplace_back(std::move(data));
+        auto cmp = [&sort_orders](const Data & a, const Data & b) { return compare(a, b, sort_orders); };
+        std::push_heap(values.begin(), values.end(), cmp);
+    }
+};
+
+/// Derives the result type of the aggregate function from its tuple argument type:
+/// an array of tuples that have the same elements plus a trailing Int32 element named "row_num".
+/// Throws LOGICAL_ERROR if `data_type` is not a tuple type.
+static DB::DataTypePtr getRowNumReultDataType(DB::DataTypePtr data_type)
+{
+    const auto * input_tuple = typeid_cast<const DB::DataTypeTuple *>(data_type.get());
+    if (input_tuple == nullptr)
+        throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Tuple type is expected, but got: {}", data_type->getName());
+
+    std::vector<String> names = input_tuple->getElementNames();
+    names.push_back("row_num");
+
+    DB::DataTypes types = input_tuple->getElements();
+    types.push_back(std::make_shared<DB::DataTypeInt32>());
+
+    return std::make_shared<DB::DataTypeArray>(std::make_shared<DB::DataTypeTuple>(types, names));
+}
+
+/// Aggregate function that keeps up to `limit` rows of a group under a given ordering and returns
+/// them as an array of tuples, each extended with its 1-based `row_num`.
+///
+/// usage: rowNumGroupArraySorted(1, "a asc nulls first, b desc nulls last")(tuple(a,b))
+///
+/// Parameters: (limit, order by clause). The single argument must be a tuple, and every
+/// identifier in the order by clause must be one of the tuple's element names.
+class RowNumGroupArraySorted final : public DB::IAggregateFunctionDataHelper<RowNumGroupArraySortedData, RowNumGroupArraySorted>
+{
+public:
+    /// Throws BAD_ARGUMENTS if `parameters_` does not hold exactly two values or the order by
+    /// clause is invalid, and LOGICAL_ERROR if `data_type` is not a tuple type.
+    explicit RowNumGroupArraySorted(DB::DataTypePtr data_type, const DB::Array & parameters_)
+        : DB::IAggregateFunctionDataHelper<RowNumGroupArraySortedData, RowNumGroupArraySorted>(
+            {data_type}, parameters_, getRowNumReultDataType(data_type))
+    {
+        if (parameters_.size() != 2)
+            throw DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "{} needs two parameters: limit and order clause", getName());
+        const auto * tuple_type = typeid_cast<const DB::DataTypeTuple *>(data_type.get());
+        if (!tuple_type)
+            throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Tuple type is expected, but got: {}", data_type->getName());
+
+        limit = parameters_[0].safeGet<UInt64>();
+
+        String order_by_clause = parameters_[1].safeGet<String>();
+        sort_order_fields = parseSortOrderFields(order_by_clause);
+
+        /// States are (de)serialized with the serialization of the argument tuple type.
+        serialization = data_type->getDefaultSerialization();
+    }
+
+    String getName() const override { return "rowNumGroupArraySorted"; }
+
+    /// Reads the tuple at `row_num` of the first argument column and offers it to the state.
+    void add(DB::AggregateDataPtr __restrict place, const DB::IColumn ** columns, size_t row_num, DB::Arena * /*arena*/) const override
+    {
+        DB::Tuple data_tuple = (*columns[0])[row_num].safeGet<DB::Tuple>();
+        this->data(place).addElement(std::move(data_tuple), sort_order_fields, limit);
+    }
+
+    /// Offers every row kept by `rhs` to the state at `place`.
+    void merge(DB::AggregateDataPtr __restrict place, DB::ConstAggregateDataPtr rhs, DB::Arena * /*arena*/) const override
+    {
+        auto & rhs_values = this->data(rhs).values;
+        /// `rhs` is const, so std::move() only produces the const rvalue that addElement() expects.
+        for (auto & rhs_element : rhs_values)
+            this->data(place).addElement(std::move(rhs_element), sort_order_fields, limit);
+    }
+
+    /// Format: the number of rows as a var-uint, followed by each row in binary form.
+    void serialize(DB::ConstAggregateDataPtr __restrict place, DB::WriteBuffer & buf, std::optional<size_t> /* version */) const override
+    {
+        auto & values = this->data(place).values;
+        size_t size = values.size();
+        DB::writeVarUInt(size, buf);
+
+        for (const auto & value : values)
+            serialization->serializeBinary(value, buf, {});
+    }
+
+    /// Counterpart of serialize(): appends the rows to `values` in the order they were written.
+    void deserialize(
+        DB::AggregateDataPtr __restrict place, DB::ReadBuffer & buf, std::optional<size_t> /* version */, DB::Arena *) const override
+    {
+        size_t size = 0;
+        DB::readVarUInt(size, buf);
+
+        auto & values = this->data(place).values;
+        values.reserve(size);
+        for (size_t i = 0; i < size; ++i)
+        {
+            DB::Field data;
+            serialization->deserializeBinary(data, buf, {});
+            values.emplace_back(data.safeGet<DB::Tuple>());
+        }
+    }
+
+    void insertResultInto(DB::AggregateDataPtr __restrict place, DB::IColumn & to, DB::Arena * /*arena*/) const override
+    {
+        this->data(place).insertResultInto(to, limit, sort_order_fields);
+    }
+
+    /// NOTE(review): none of the methods above uses the arena argument; confirm whether `true` is required.
+    bool allocatesMemoryInArena() const override { return true; }
+
+private:
+    size_t limit = 0;
+    SortOrderFields sort_order_fields;
+    DB::SerializationPtr serialization;
+
+    /// Parses `order_by_clause` and resolves each order by element to a position inside the
+    /// argument tuple.
+    /// Throws BAD_ARGUMENTS if an element is not a plain column name or the name is not an
+    /// element of the tuple. The clause is supplied by the caller, so unexpected AST nodes are
+    /// reported as errors instead of being cast unchecked.
+    SortOrderFields parseSortOrderFields(const String & order_by_clause) const
+    {
+        DB::ParserOrderByExpressionList order_by_parser;
+        auto order_by_ast = DB::parseQuery(order_by_parser, order_by_clause, 1000, 1000, 1000);
+        SortOrderFields fields;
+        const auto expression_list_ast = assert_cast<const DB::ASTExpressionList *>(order_by_ast.get());
+        const auto & tuple_element_names = assert_cast<const DB::DataTypeTuple *>(argument_types[0].get())->getElementNames();
+        for (const auto & child : expression_list_ast->children)
+        {
+            const auto * order_by_element_ast = typeid_cast<const DB::ASTOrderByElement *>(child.get());
+            if (!order_by_element_ast || order_by_element_ast->children.empty())
+                throw DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "Invalid order by element in clause: {}", order_by_clause);
+
+            const auto * ident_ast = typeid_cast<const DB::ASTIdentifier *>(order_by_element_ast->children[0].get());
+            if (!ident_ast)
+                throw DB::Exception(
+                    DB::ErrorCodes::BAD_ARGUMENTS, "Only column names are supported in order by clause, but got: {}", order_by_clause);
+            const auto & ident_name = ident_ast->shortName();
+
+            SortOrderField field;
+            /// direction: true when the element's direction is 1.
+            field.direction = order_by_element_ast->direction == 1;
+            /// nulls_direction: true when the AST nulls direction is the opposite of the AST direction.
+            field.nulls_direction
+                = field.direction ? order_by_element_ast->nulls_direction == -1 : order_by_element_ast->nulls_direction == 1;
+
+            auto name_pos = std::find(tuple_element_names.begin(), tuple_element_names.end(), ident_name);
+            if (name_pos == tuple_element_names.end())
+            {
+                throw DB::Exception(
+                    DB::ErrorCodes::BAD_ARGUMENTS, "Not found column {} in tuple {}", ident_name, argument_types[0]->getName());
+            }
+            field.pos = std::distance(tuple_element_names.begin(), name_pos);
+
+            fields.push_back(field);
+        }
+        return fields;
+    }
+};
+
+
+/// Factory callback for `rowNumGroupArraySorted`.
+/// Accepts exactly one argument, which must be a tuple; throws BAD_ARGUMENTS otherwise.
+DB::AggregateFunctionPtr createAggregateFunctionRowNumGroupArray(
+    const std::string & name, const DB::DataTypes & argument_types, const DB::Array & parameters, const DB::Settings *)
+{
+    const bool has_single_tuple_argument
+        = argument_types.size() == 1 && typeid_cast<const DB::DataTypeTuple *>(argument_types[0].get());
+    if (!has_single_tuple_argument)
+        throw DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "{} needs exactly one tuple argument", name);
+    return std::make_shared<RowNumGroupArraySorted>(argument_types[0], parameters);
+}
+
+/// Registers `rowNumGroupArraySorted` in `factory` together with its properties.
+void registerAggregateFunctionRowNumGroup(DB::AggregateFunctionFactory & factory)
+{
+    const DB::AggregateFunctionProperties function_properties{.returns_default_when_only_null = false, .is_order_dependent = false};
+    factory.registerFunction("rowNumGroupArraySorted", {createAggregateFunctionRowNumGroupArray, function_properties});
+}
+}
diff --git a/cpp-ch/local-engine/Common/AggregateUtil.cpp 
b/cpp-ch/local-engine/Common/AggregateUtil.cpp
index 2290747fa1..0707d18aa0 100644
--- a/cpp-ch/local-engine/Common/AggregateUtil.cpp
+++ b/cpp-ch/local-engine/Common/AggregateUtil.cpp
@@ -47,7 +47,6 @@ extern const SettingsBool 
enable_memory_bound_merging_of_aggregation_results;
 extern const SettingsUInt64 aggregation_in_order_max_block_bytes;
 extern const SettingsUInt64 group_by_two_level_threshold;
 extern const SettingsFloat min_hit_rate_to_use_consecutive_keys_optimization;
-extern const SettingsMaxThreads max_threads;
 extern const SettingsUInt64 max_block_size;
 }
 
diff --git a/cpp-ch/local-engine/Common/ArrayJoinHelper.cpp 
b/cpp-ch/local-engine/Common/ArrayJoinHelper.cpp
new file mode 100644
index 0000000000..acefad0aea
--- /dev/null
+++ b/cpp-ch/local-engine/Common/ArrayJoinHelper.cpp
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "ArrayJoinHelper.h"
+#include <Core/Settings.h>
+#include <DataTypes/DataTypeArray.h>
+#include <Interpreters/ActionsDAG.h>
+#include <Interpreters/ArrayJoin.h>
+#include <Interpreters/Context.h>
+#include <Processors/QueryPlan/ArrayJoinStep.h>
+#include <Processors/QueryPlan/ExpressionStep.h>
+#include <Processors/QueryPlan/IQueryPlanStep.h>
+#include <Processors/QueryPlan/QueryPlan.h>
+#include <Poco/Logger.h>
+#include <Common/logger_useful.h>
+
+namespace DB
+{
+/// The code below refers to DB::ErrorCodes::LOGICAL_ERROR, so the declaration has to live in
+/// DB::ErrorCodes. Inside an unnamed namespace it would declare an unrelated variable.
+namespace ErrorCodes
+{
+extern const int LOGICAL_ERROR;
+}
+
+namespace Setting
+{
+extern const SettingsUInt64 max_block_size;
+}
+}
+
+namespace local_engine
+{
+namespace ArrayJoinHelper
+{
+/// Returns the only ARRAY_JOIN node of `actions_dag`, or nullptr when the dag has none.
+/// Throws LOGICAL_ERROR if more than one ARRAY_JOIN node is present.
+const DB::ActionsDAG::Node * findArrayJoinNode(const DB::ActionsDAG & actions_dag)
+{
+    const DB::ActionsDAG::Node * found = nullptr;
+    for (const auto & candidate : actions_dag.getNodes())
+    {
+        if (candidate.type != DB::ActionsDAG::ActionType::ARRAY_JOIN)
+            continue;
+
+        if (found != nullptr)
+            throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Expect single ARRAY JOIN node in generate rel");
+        found = &candidate;
+    }
+    return found;
+}
+
+struct SplittedActionsDAGs /// The three parts produced by splitActionsDAGInGenerate().
+{
+    DB::ActionsDAG before_array_join; /// Optional. The part split off before the ARRAY JOIN node (its children).
+    DB::ActionsDAG array_join; /// The part that ends with the ARRAY JOIN node.
+    DB::ActionsDAG after_array_join; /// Optional. The remainder after the ARRAY JOIN node, with unused actions removed.
+};
+
+/// Split actions_dag of generate rel into 3 parts: before array join + during array join + after array join.
+/// Throws LOGICAL_ERROR if no ARRAY JOIN node can be found, instead of dereferencing a null node.
+static SplittedActionsDAGs splitActionsDAGInGenerate(const DB::ActionsDAG & actions_dag)
+{
+    SplittedActionsDAGs res;
+
+    const auto * array_join_node = findArrayJoinNode(actions_dag);
+    if (!array_join_node)
+        throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Expect array join node in actions_dag");
+
+    /// First cut: split at the children of the ARRAY JOIN node.
+    std::unordered_set<const DB::ActionsDAG::Node *> first_split_nodes(array_join_node->children.begin(), array_join_node->children.end());
+    auto first_split_result = actions_dag.split(first_split_nodes);
+    res.before_array_join = std::move(first_split_result.first);
+
+    /// Second cut: locate the ARRAY JOIN node again, this time inside the remaining dag, and split at it.
+    array_join_node = findArrayJoinNode(first_split_result.second);
+    if (!array_join_node)
+        throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Expect array join node in actions_dag");
+
+    std::unordered_set<const DB::ActionsDAG::Node *> second_split_nodes = {array_join_node};
+    auto second_split_result = first_split_result.second.split(second_split_nodes);
+    res.array_join = std::move(second_split_result.first);
+    second_split_result.second.removeUnusedActions();
+    res.after_array_join = std::move(second_split_result.second);
+    return res;
+}
+
+/// Builds an actions dag over the columns of `header` that applies ARRAY JOIN to the column at
+/// `column_index`. The array-joined node reuses the name of the input column and is registered
+/// in the outputs via addOrReplaceInOutputs().
+/// Throws LOGICAL_ERROR if the selected column is not of array type.
+DB::ActionsDAG applyArrayJoinOnOneColumn(const DB::Block & header, size_t column_index)
+{
+    /// Only the type is inspected, so bind by reference instead of copying the column.
+    const auto & array_column = header.getByPosition(column_index);
+    if (!typeid_cast<const DB::DataTypeArray *>(array_column.type.get()))
+        throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Expect array column in array join");
+    DB::ActionsDAG actions_dag(header.getColumnsWithTypeAndName());
+    const auto * array_column_node = actions_dag.getInputs()[column_index];
+    auto array_join_name = array_column_node->result_name;
+    const auto * array_join_node = &actions_dag.addArrayJoin(*array_column_node, array_join_name);
+    actions_dag.addOrReplaceInOutputs(*array_join_node);
+    /// Returned by name: a local is moved (or elided) implicitly, while std::move() would prevent copy elision.
+    return actions_dag;
+}
+
+
+/// Splits `actions_dag` around its ARRAY JOIN node and appends the resulting steps to `plan`:
+/// an optional pre-projection, the ARRAY JOIN step and an optional post-projection.
+/// Returns raw pointers to the added steps in the order they were added.
+/// Throws LOGICAL_ERROR if `actions_dag` contains no ARRAY JOIN node.
+std::vector<DB::IQueryPlanStep *>
+addArrayJoinStep(DB::ContextPtr context, DB::QueryPlan & plan, const DB::ActionsDAG & actions_dag, bool is_left)
+{
+    auto logger = getLogger("ArrayJoinHelper");
+    if (!findArrayJoinNode(actions_dag))
+        throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Expect array join node in actions_dag");
+
+    std::vector<DB::IQueryPlanStep *> added_steps;
+
+    /// If generator in generate rel is explode/posexplode, transform arrayJoin function to ARRAY JOIN STEP to apply max_block_size
+    /// which avoids OOM when several lateral view explode/posexplode is used in spark sqls
+    LOG_DEBUG(logger, "original actions_dag:{}", actions_dag.dumpDAG());
+    auto parts = splitActionsDAGInGenerate(actions_dag);
+    LOG_DEBUG(logger, "actions_dag before arrayJoin:{}", parts.before_array_join.dumpDAG());
+    LOG_DEBUG(logger, "actions_dag during arrayJoin:{}", parts.array_join.dumpDAG());
+    LOG_DEBUG(logger, "actions_dag after arrayJoin:{}", parts.after_array_join.dumpDAG());
+
+    /// A dag whose nodes are all inputs and all outputs does no work and needs no step, e.g.
+    ///   0 : INPUT () (no column) String a
+    ///   1 : INPUT () (no column) String b
+    ///   Output nodes: 0, 1
+    auto is_pass_through = [](const DB::ActionsDAG & dag) -> bool
+    {
+        const auto node_count = dag.getNodes().size();
+        return dag.getOutputs().size() == node_count && dag.getInputs().size() == node_count;
+    };
+
+    /// Appends an expression step that runs `dag` on the current header of the plan.
+    auto add_projection = [&plan, &added_steps](DB::ActionsDAG & dag, const char * description)
+    {
+        auto projection_step = std::make_unique<DB::ExpressionStep>(plan.getCurrentHeader(), std::move(dag));
+        projection_step->setStepDescription(description);
+        added_steps.emplace_back(projection_step.get());
+        plan.addStep(std::move(projection_step));
+    };
+
+    /// Pre-projection before array join (optional)
+    if (!is_pass_through(parts.before_array_join))
+        add_projection(parts.before_array_join, "Pre-projection In Generate");
+
+    /// ARRAY JOIN
+    DB::Names array_joined_columns{findArrayJoinNode(parts.array_join)->result_name};
+    DB::ArrayJoin array_join;
+    array_join.columns = std::move(array_joined_columns);
+    array_join.is_left = is_left;
+    auto array_join_step = std::make_unique<DB::ArrayJoinStep>(
+        plan.getCurrentHeader(), std::move(array_join), false, context->getSettingsRef()[DB::Setting::max_block_size]);
+    array_join_step->setStepDescription("ARRAY JOIN In Generate");
+    added_steps.emplace_back(array_join_step.get());
+    plan.addStep(std::move(array_join_step));
+
+    /// Post-projection after array join (optional)
+    if (!is_pass_through(parts.after_array_join))
+        add_projection(parts.after_array_join, "Post-projection In Generate");
+
+    return added_steps;
+}
+
+
+}
+} // namespace local_engine
\ No newline at end of file
diff --git a/cpp-ch/local-engine/Common/ArrayJoinHelper.h 
b/cpp-ch/local-engine/Common/ArrayJoinHelper.h
new file mode 100644
index 0000000000..a4b582b2f0
--- /dev/null
+++ b/cpp-ch/local-engine/Common/ArrayJoinHelper.h
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+#include <Interpreters/ActionsDAG.h>
+
+namespace DB
+{
+class IQueryPlanStep;
+class QueryPlan;
+}
+
+namespace local_engine
+{
+namespace ArrayJoinHelper
+{
+// apply array join on one column to flatten the array column
+DB::ActionsDAG applyArrayJoinOnOneColumn(const DB::Block & header, size_t 
column_index);
+
+const DB::ActionsDAG::Node * findArrayJoinNode(const DB::ActionsDAG & 
actions_dag);
+
+// actions_dag must contain exactly one array join node; otherwise a LOGICAL_ERROR exception is thrown.
+// Returns the steps that are added to the plan.
+std::vector<DB::IQueryPlanStep *> addArrayJoinStep(DB::ContextPtr context, 
DB::QueryPlan & plan, const DB::ActionsDAG & actions_dag, bool is_left);
+} // namespace ArrayJoinHelper
+}
\ No newline at end of file
diff --git a/cpp-ch/local-engine/Common/CHUtil.cpp 
b/cpp-ch/local-engine/Common/CHUtil.cpp
index 8fef52e50a..de1a263218 100644
--- a/cpp-ch/local-engine/Common/CHUtil.cpp
+++ b/cpp-ch/local-engine/Common/CHUtil.cpp
@@ -887,6 +887,7 @@ void 
BackendInitializerUtil::applyGlobalConfigAndSettings(const DB::Context::Con
 extern void 
registerAggregateFunctionCombinatorPartialMerge(AggregateFunctionCombinatorFactory
 &);
 extern void registerAggregateFunctionsBloomFilter(AggregateFunctionFactory &);
 extern void registerAggregateFunctionSparkAvg(AggregateFunctionFactory &);
+extern void registerAggregateFunctionRowNumGroup(AggregateFunctionFactory &);
 extern void registerFunctions(FunctionFactory &);
 
 void registerAllFunctions()
@@ -897,6 +898,7 @@ void registerAllFunctions()
     auto & agg_factory = AggregateFunctionFactory::instance();
     registerAggregateFunctionsBloomFilter(agg_factory);
     registerAggregateFunctionSparkAvg(agg_factory);
+    registerAggregateFunctionRowNumGroup(agg_factory);
     {
         /// register aggregate function combinators from local_engine
         auto & factory = AggregateFunctionCombinatorFactory::instance();
diff --git a/cpp-ch/local-engine/Common/GlutenConfig.cpp 
b/cpp-ch/local-engine/Common/GlutenConfig.cpp
index 0cefbc3839..ce15a12f92 100644
--- a/cpp-ch/local-engine/Common/GlutenConfig.cpp
+++ b/cpp-ch/local-engine/Common/GlutenConfig.cpp
@@ -141,4 +141,13 @@ MergeTreeCacheConfig 
MergeTreeCacheConfig::loadFromContext(const DB::ContextPtr
     config.enable_data_prefetch = 
context->getConfigRef().getBool(ENABLE_DATA_PREFETCH, 
config.enable_data_prefetch);
     return config;
 }
-}
\ No newline at end of file
+
+/// Loads the window settings from the config of `context`.
+/// A key that is missing from the config keeps the default declared on the WindowConfig member,
+/// so the defaults are defined in one place only.
+WindowConfig WindowConfig::loadFromContext(const DB::ContextPtr & context)
+{
+    WindowConfig config;
+    const auto & config_ref = context->getConfigRef();
+    config.aggregate_topk_sample_rows = config_ref.getUInt64(WINDOW_AGGREGATE_TOPK_SAMPLE_ROWS, config.aggregate_topk_sample_rows);
+    config.aggregate_topk_high_cardinality_threshold
+        = config_ref.getDouble(WINDOW_AGGREGATE_TOPK_HIGH_CARDINALITY_THRESHOLD, config.aggregate_topk_high_cardinality_threshold);
+    return config;
+}
+}
diff --git a/cpp-ch/local-engine/Common/GlutenConfig.h 
b/cpp-ch/local-engine/Common/GlutenConfig.h
index 8af83329b6..85839b70ec 100644
--- a/cpp-ch/local-engine/Common/GlutenConfig.h
+++ b/cpp-ch/local-engine/Common/GlutenConfig.h
@@ -56,9 +56,12 @@ struct GraceMergingAggregateConfig
 {
     inline static const String MAX_GRACE_AGGREGATE_MERGING_BUCKETS = 
"max_grace_aggregate_merging_buckets";
     inline static const String 
THROW_ON_OVERFLOW_GRACE_AGGREGATE_MERGING_BUCKETS = 
"throw_on_overflow_grace_aggregate_merging_buckets";
-    inline static const String 
AGGREGATED_KEYS_BEFORE_EXTEND_GRACE_AGGREGATE_MERGING_BUCKETS = 
"aggregated_keys_before_extend_grace_aggregate_merging_buckets";
-    inline static const String 
MAX_PENDING_FLUSH_BLOCKS_PER_GRACE_AGGREGATE_MERGING_BUCKET = 
"max_pending_flush_blocks_per_grace_aggregate_merging_bucket";
-    inline static const String 
MAX_ALLOWED_MEMORY_USAGE_RATIO_FOR_AGGREGATE_MERGING = 
"max_allowed_memory_usage_ratio_for_aggregate_merging";
+    inline static const String 
AGGREGATED_KEYS_BEFORE_EXTEND_GRACE_AGGREGATE_MERGING_BUCKETS
+        = "aggregated_keys_before_extend_grace_aggregate_merging_buckets";
+    inline static const String 
MAX_PENDING_FLUSH_BLOCKS_PER_GRACE_AGGREGATE_MERGING_BUCKET
+        = "max_pending_flush_blocks_per_grace_aggregate_merging_bucket";
+    inline static const String 
MAX_ALLOWED_MEMORY_USAGE_RATIO_FOR_AGGREGATE_MERGING
+        = "max_allowed_memory_usage_ratio_for_aggregate_merging";
 
     size_t max_grace_aggregate_merging_buckets = 32;
     bool throw_on_overflow_grace_aggregate_merging_buckets = false;
@@ -73,7 +76,8 @@ struct StreamingAggregateConfig
 {
     inline static const String 
AGGREGATED_KEYS_BEFORE_STREAMING_AGGREGATING_EVICT = 
"aggregated_keys_before_streaming_aggregating_evict";
     inline static const String 
MAX_MEMORY_USAGE_RATIO_FOR_STREAMING_AGGREGATING = 
"max_memory_usage_ratio_for_streaming_aggregating";
-    inline static const String 
HIGH_CARDINALITY_THRESHOLD_FOR_STREAMING_AGGREGATING = 
"high_cardinality_threshold_for_streaming_aggregating";
+    inline static const String 
HIGH_CARDINALITY_THRESHOLD_FOR_STREAMING_AGGREGATING
+        = "high_cardinality_threshold_for_streaming_aggregating";
     inline static const String ENABLE_STREAMING_AGGREGATING = 
"enable_streaming_aggregating";
 
     size_t aggregated_keys_before_streaming_aggregating_evict = 1024;
@@ -154,6 +158,16 @@ struct MergeTreeCacheConfig
     static MergeTreeCacheConfig loadFromContext(const DB::ContextPtr & 
context);
 };
 
+/// Settings for computing a window's topk by aggregation.
+struct WindowConfig
+{
+    /// Config keys.
+    inline static const String WINDOW_AGGREGATE_TOPK_SAMPLE_ROWS = "window.aggregate_topk_sample_rows";
+    inline static const String WINDOW_AGGREGATE_TOPK_HIGH_CARDINALITY_THRESHOLD = "window.aggregate_topk_high_cardinality_threshold";
+
+    /// Setting values; the initializers are the defaults.
+    /// NOTE(review): presumably the number of rows sampled to estimate the partition key cardinality - confirm.
+    size_t aggregate_topk_sample_rows = 5000;
+    /// NOTE(review): presumably the ratio above which the keys count as high cardinality - confirm.
+    double aggregate_topk_high_cardinality_threshold = 0.6;
+
+    static WindowConfig loadFromContext(const DB::ContextPtr & context);
+};
+
 namespace PathConfig
 {
 inline constexpr const char * USE_CURRENT_DIRECTORY_AS_TMP = 
"use_current_directory_as_tmp";
diff --git a/cpp-ch/local-engine/Common/SortUtils.cpp 
b/cpp-ch/local-engine/Common/SortUtils.cpp
new file mode 100644
index 0000000000..1b18cc4bfa
--- /dev/null
+++ b/cpp-ch/local-engine/Common/SortUtils.cpp
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "SortUtils.h"
+#include <IO/Operators.h>
+#include <IO/WriteBufferFromString.h>
+#include <Poco/Logger.h>
+#include <Common/Exception.h>
+#include <Common/logger_useful.h>
+
+namespace DB::ErrorCodes
+{
+extern const int BAD_ARGUMENTS;
+extern const int LOGICAL_ERROR;
+}
+
+namespace local_engine
+{
+/// Converts column-reference expressions into a sort description.
+/// Each selection yields a sort column (direction 1, nulls direction 1) named after the header
+/// column at the referenced position. Literals are skipped; any other expression raises BAD_ARGUMENTS.
+DB::SortDescription parseSortFields(const DB::Block & header, const google::protobuf::RepeatedPtrField<substrait::Expression> & expressions)
+{
+    DB::SortDescription description;
+    for (const auto & expr : expressions)
+    {
+        if (expr.has_selection())
+        {
+            auto field_index = expr.selection().direct_reference().struct_field().field();
+            const auto & column_name = header.getByPosition(field_index).name;
+            description.push_back(DB::SortColumnDescription(column_name, 1, 1));
+            continue;
+        }
+
+        if (!expr.has_literal())
+            throw DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "Unknow expression as sort field: {}", expr.DebugString());
+    }
+    return description;
+}
+
+/// Converts substrait sort fields into a sort description.
+/// Literal sort expressions are skipped. Every other field must be a direct struct-field reference,
+/// otherwise LOGICAL_ERROR is thrown. An unknown sort direction raises LOGICAL_ERROR as well.
+DB::SortDescription parseSortFields(const DB::Block & header, const google::protobuf::RepeatedPtrField<substrait::SortField> & sort_fields)
+{
+    /// substrait sort direction -> (direction, nulls direction):
+    /// 1 = asc nulls first, 2 = asc nulls last, 3 = desc nulls first, 4 = desc nulls last.
+    static const std::map<int, std::pair<int, int>> direction_map = {{1, {1, -1}}, {2, {1, 1}}, {3, {-1, 1}}, {4, {-1, -1}}};
+
+    DB::SortDescription sort_descr;
+    for (const auto & sort_field : sort_fields)
+    {
+        /// There is no meaning to sort a const column.
+        if (sort_field.expr().has_literal())
+            continue;
+
+        if (!sort_field.expr().has_selection() || !sort_field.expr().selection().has_direct_reference()
+            || !sort_field.expr().selection().direct_reference().has_struct_field())
+        {
+            throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Unsupported sort field");
+        }
+        auto field_pos = sort_field.expr().selection().direct_reference().struct_field().field();
+
+        auto direction_iter = direction_map.find(sort_field.direction());
+        if (direction_iter == direction_map.end())
+            throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Unsupported sort direction: {}", sort_field.direction());
+        const auto & col_name = header.getByPosition(field_pos).name;
+        sort_descr.emplace_back(col_name, direction_iter->second.first, direction_iter->second.second);
+    }
+    return sort_descr;
+}
+
+/// Renders `sort_fields` as a SQL-like order by clause, e.g. "`a` asc nulls first,`b` desc nulls last".
+/// Column names are taken from `header` at the referenced positions.
+/// Throws BAD_ARGUMENTS for an unknown sort direction or a sort field that is not a selection.
+std::string
+buildSQLLikeSortDescription(const DB::Block & header, const google::protobuf::RepeatedPtrField<substrait::SortField> & sort_fields)
+{
+    static const std::unordered_map<int, std::string> order_directions
+        = {{1, " asc nulls first"}, {2, " asc nulls last"}, {3, " desc nulls first"}, {4, " desc nulls last"}};
+    bool is_first = true;
+    DB::WriteBufferFromOwnString ostr;
+    for (const auto & sort_field : sort_fields)
+    {
+        auto it = order_directions.find(sort_field.direction());
+        if (it == order_directions.end())
+            throw DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "Unknow sort direction: {}", sort_field.direction());
+        if (!sort_field.expr().has_selection())
+        {
+            throw DB::Exception(
+                DB::ErrorCodes::BAD_ARGUMENTS, "Sort field must be a column reference. but got {}", sort_field.DebugString());
+        }
+        auto ref = sort_field.expr().selection().direct_reference().struct_field().field();
+        const auto & col_name = header.getByPosition(ref).name;
+        if (!is_first)
+            ostr << ",";
+        is_first = false;
+        /// The column name may contain '#', which ClickHouse may fail to parse unquoted, so it is wrapped in backticks.
+        /// NOTE(review): a backtick inside the column name is not escaped here.
+        ostr << "`" << col_name << "`" << it->second;
+    }
+    /// Take the buffer content once and use it for both the log line and the result.
+    std::string order_by_clause = ostr.str();
+    LOG_DEBUG(getLogger("AggregateGroupLimitRelParser"), "Order by clause: {}", order_by_clause);
+    return order_by_clause;
+}
+}
diff --git a/cpp-ch/local-engine/Common/SortUtils.h 
b/cpp-ch/local-engine/Common/SortUtils.h
new file mode 100644
index 0000000000..c460fa758b
--- /dev/null
+++ b/cpp-ch/local-engine/Common/SortUtils.h
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+#include <Core/Block.h>
+#include <Core/SortDescription.h>
+#include <google/protobuf/repeated_field.h>
+#include <substrait/plan.pb.h>
+
+namespace local_engine
+{
+// convert expressions into sort description
+DB::SortDescription
+parseSortFields(const DB::Block & header, const 
google::protobuf::RepeatedPtrField<substrait::Expression> & expressions);
+DB::SortDescription parseSortFields(const DB::Block & header, const 
google::protobuf::RepeatedPtrField<substrait::SortField> & sort_fields);
+
+std::string
+buildSQLLikeSortDescription(const DB::Block & header, const 
google::protobuf::RepeatedPtrField<substrait::SortField> & sort_fields);
+}
diff --git a/cpp-ch/local-engine/Operator/BranchStep.cpp 
b/cpp-ch/local-engine/Operator/BranchStep.cpp
new file mode 100644
index 0000000000..5e379ae9d4
--- /dev/null
+++ b/cpp-ch/local-engine/Operator/BranchStep.cpp
@@ -0,0 +1,265 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "BranchStep.h"
+#include <memory>
+#include <Interpreters/Context_fwd.h>
+#include <Operator/BranchTransform.h>
+#include <Processors/IProcessor.h>
+#include <Processors/ISource.h>
+#include <Processors/Port.h>
+#include <Processors/QueryPlan/BuildQueryPipelineSettings.h>
+#include <Processors/QueryPlan/IQueryPlanStep.h>
+#include <Processors/QueryPlan/Optimizations/QueryPlanOptimizationSettings.h>
+#include <Processors/QueryPlan/ReadFromPreparedSource.h>
+#include <QueryPipeline/Pipe.h>
+#include <QueryPipeline/QueryPipelineBuilder.cpp>
+#include <QueryPipeline/QueryPlanResourceHolder.h>
+#include <Poco/Logger.h>
+#include <Common/logger_useful.h>
+
+namespace local_engine
+{
+
+// Pass-through processor used as the entry point of a branch sub-plan.
+//
+// It is constructed with no inputs and one output, so it can be wrapped as the source of a
+// sub-plan. An input port with the same header is kept aside in `inner_inputs`;
+// enableInputs() swaps it into `inputs` so that the processor can later be connected to an
+// upstream output port.
+class BranchHookSource : public DB::IProcessor
+{
+public:
+    using Status = DB::IProcessor::Status;
+    // `explicit`: a header must never convert into a processor implicitly.
+    explicit BranchHookSource(const DB::Block & header_) : DB::IProcessor({}, {header_}) { inner_inputs.emplace_back(header_, this); }
+    ~BranchHookSource() override = default;
+
+    String getName() const override { return "BranchHookSource"; }
+
+    Status prepare() override;
+    void work() override;
+    // Exchange `inputs` with the hidden `inner_inputs`. prepare() reads `inputs.front()`,
+    // so this must have been called before the processor is executed.
+    void enableInputs() { inputs.swap(inner_inputs); }
+
+private:
+    // Holds the input port until enableInputs() exposes it.
+    DB::InputPorts inner_inputs;
+    // `output_chunk` is waiting to be pushed to the output port.
+    bool has_output = false;
+    DB::Chunk output_chunk;
+    // `input_chunk` has been pulled and is waiting for work().
+    bool has_input = false;
+    DB::Chunk input_chunk;
+};
+
+// Port handling: every chunk received on the input is forwarded to the output unchanged.
+BranchHookSource::Status BranchHookSource::prepare()
+{
+    auto & out_port = outputs.front();
+    auto & in_port = inputs.front();
+
+    // The consumer is done: stop reading as well.
+    if (out_port.isFinished())
+    {
+        in_port.close();
+        return Status::Finished;
+    }
+
+    // A chunk is waiting for delivery; hand it over as soon as the port accepts it.
+    if (has_output)
+    {
+        if (out_port.canPush())
+        {
+            out_port.push(std::move(output_chunk));
+            has_output = false;
+        }
+        return Status::PortFull;
+    }
+
+    // A pulled chunk has not been handled by work() yet.
+    if (has_input)
+        return Status::Ready;
+
+    // Nothing buffered and nothing more to read.
+    if (in_port.isFinished())
+    {
+        out_port.finish();
+        return Status::Finished;
+    }
+
+    in_port.setNeeded();
+    if (in_port.hasData())
+    {
+        input_chunk = in_port.pull(true);
+        has_input = true;
+        return Status::Ready;
+    }
+    return Status::NeedData;
+}
+
+// Move the pulled chunk into the output slot; prepare() pushes it afterwards.
+void BranchHookSource::work()
+{
+    if (!has_input)
+        return;
+
+    output_chunk = std::move(input_chunk);
+    has_input = false;
+    has_output = true;
+}
+
+
+// Traits shared by every step defined in this file: a single output stream is reported,
+// and neither the number of streams, the sorting nor the number of rows is declared preserved.
+static DB::ITransformingStep::Traits getTraits()
+{
+    DB::ITransformingStep::Traits traits{
+        {
+            .returns_single_stream = true,
+            .preserves_number_of_streams = false,
+            .preserves_sorting = false,
+        },
+        {
+            .preserves_number_of_rows = false,
+        }};
+    return traits;
+}
+
+// Plan step whose only effect is to resize the pipeline to `num_streams` streams.
+// The header is passed through unchanged.
+class ResizeStep : public DB::ITransformingStep
+{
+public:
+    explicit ResizeStep(const DB::Block & header_, size_t num_streams_)
+        : DB::ITransformingStep(header_, header_, getTraits()), num_streams(num_streams_)
+    {
+    }
+    ~ResizeStep() override = default;
+
+    // Report this step's own name (it used to return the copy-pasted "UniteBranchesStep").
+    String getName() const override { return "ResizeStep"; }
+
+    void transformPipeline(DB::QueryPipelineBuilder & pipeline, const DB::BuildQueryPipelineSettings &) override
+    {
+        // Routine diagnostic, so it is logged at debug level rather than as an error.
+        LOG_DEBUG(getLogger("ResizeStep"), "Resize pipeline to num_streams: {}", num_streams);
+        pipeline.resize(num_streams);
+    }
+    void describePipeline(DB::IQueryPlanStep::FormatSettings & settings) const override
+    {
+        if (!processors.empty())
+            DB::IQueryPlanStep::describePipeline(processors, settings);
+    }
+
+private:
+    size_t num_streams;
+    // Input and output headers are identical, nothing to recompute.
+    void updateOutputHeader() override {}
+};
+
+// Create the skeleton of a branch sub-plan: a BranchHookSource carrying `header` as the
+// source step, followed by a ResizeStep when more than one stream is requested.
+// Callers append the branch-specific steps to the returned plan.
+DB::QueryPlanPtr BranchStepHelper::createSubPlan(const DB::Block & header, size_t num_streams)
+{
+    auto source = std::make_unique<DB::ReadFromPreparedSource>(DB::Pipe(std::make_shared<BranchHookSource>(header)));
+    source->setStepDescription("Hook node connected to one branch output");
+    auto plan = std::make_unique<DB::QueryPlan>();
+    plan->addStep(std::move(source));
+
+    if (num_streams > 1)
+    {
+        auto resize_step = std::make_unique<ResizeStep>(plan->getCurrentHeader(), num_streams);
+        plan->addStep(std::move(resize_step));
+    }
+    // Returning the local by name lets the compiler elide or implicitly move it;
+    // std::move here would only inhibit copy elision.
+    return plan;
+}
+
+// The header is used as both input and output header of the step.
+// Members are initialized in declaration order (max_sample_rows precedes branches in the
+// class), and the by-value arguments are moved instead of copied.
+StaticBranchStep::StaticBranchStep(
+    DB::ContextPtr context_, const DB::Block & header_, size_t branches_, size_t sample_rows_, BranchSelector selector_)
+    : DB::ITransformingStep(header_, header_, getTraits())
+    , context(std::move(context_))
+    , header(header_)
+    , max_sample_rows(sample_rows_)
+    , branches(branches_)
+    , selector(std::move(selector_))
+{
+}
+
+// Resize the pipeline to one stream and attach a StaticBranchTransform to each resulting
+// output port.
+void StaticBranchStep::transformPipeline(DB::QueryPipelineBuilder & pipeline, const DB::BuildQueryPipelineSettings & settings)
+{
+    auto attach_branch_transforms = [&](DB::OutputPortRawPtrs upstream_ports) -> DB::Processors
+    {
+        DB::Processors created;
+        for (auto & upstream_port : upstream_ports)
+        {
+            if (upstream_port == nullptr)
+                throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Output port is null");
+
+            auto transform = std::make_shared<StaticBranchTransform>(header, max_sample_rows, branches, selector);
+            DB::connect(*upstream_port, transform->getInputs().front());
+            created.push_back(transform);
+        }
+        return created;
+    };
+
+    // The resize must happen before the transforms are attached.
+    pipeline.resize(1);
+    pipeline.transform(attach_branch_transforms);
+}
+
+// Describe the processors collected for this step, if any.
+void StaticBranchStep::describePipeline(DB::IQueryPlanStep::FormatSettings & settings) const
+{
+    if (processors.empty())
+        return;
+    DB::IQueryPlanStep::describePipeline(processors, settings);
+}
+
+// Intentionally empty: the constructor passes the same header as input and output header,
+// and nothing is recomputed here.
+void StaticBranchStep::updateOutputHeader()
+{
+}
+
+// Header of the first branch plan. Throws instead of dereferencing a missing plan.
+// decltype(auto) forwards exactly what getCurrentHeader() returns.
+static decltype(auto) firstBranchHeader(const std::vector<DB::QueryPlanPtr> & plans)
+{
+    if (plans.empty() || !plans.front())
+        throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "UniteBranchesStep requires at least one branch plan");
+    return plans.front()->getCurrentHeader();
+}
+
+// Takes ownership of the branch plans. `header_` is the input header of the step and the
+// first branch's current header is handed to the base class as output header.
+// The base class is initialized before the members, so the plans are still available to
+// firstBranchHeader() when it runs.
+UniteBranchesStep::UniteBranchesStep(const DB::Block & header_, std::vector<DB::QueryPlanPtr> && branch_plans_, size_t num_streams_)
+    : DB::ITransformingStep(header_, firstBranchHeader(branch_plans_), getTraits())
+    , header(header_)
+    , branch_plans(std::move(branch_plans_))
+    , num_streams(num_streams_)
+{
+}
+
+// Build every branch sub-plan into processors, connect the i-th upstream output port to the
+// BranchHookSource of the i-th branch, and finally merge the branches into one stream
+// (resized again to `num_streams` when more than one stream is requested).
+void UniteBranchesStep::transformPipeline(DB::QueryPipelineBuilder & pipeline, const DB::BuildQueryPipelineSettings &)
+{
+    auto add_transform = [&](DB::OutputPortRawPtrs child_outputs) -> DB::Processors
+    {
+        DB::Processors new_processors;
+        size_t branch_index = 0;
+        if (child_outputs.size() != branch_plans.size())
+        {
+            throw DB::Exception(
+                DB::ErrorCodes::LOGICAL_ERROR,
+                "Output port's size({}) is not equal to branches size({})",
+                child_outputs.size(),
+                branch_plans.size());
+        }
+        for (auto * output : child_outputs)
+        {
+            // Same guard as StaticBranchStep: never dereference a missing port.
+            if (!output)
+                throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Output port is null");
+
+            auto & branch_plan = branch_plans[branch_index];
+            DB::QueryPlanOptimizationSettings optimization_settings;
+            DB::BuildQueryPipelineSettings build_settings;
+            // NOTE(review): the holder is local to this iteration - confirm that nothing in
+            // the branch needs the collected resources to outlive this scope.
+            DB::QueryPlanResourceHolder resource_holder;
+
+            auto pipeline_builder = branch_plan->buildQueryPipeline(optimization_settings, build_settings);
+            auto pipe = DB::QueryPipelineBuilder::getPipe(std::move(*pipeline_builder), resource_holder);
+            DB::ProcessorPtr source_node = nullptr;
+            auto processors = DB::Pipe::detachProcessors(std::move(pipe));
+            // Iterate by reference: copying a shared_ptr per element costs atomic ref-count updates.
+            for (const auto & processor : processors)
+            {
+                if (auto * source = typeid_cast<BranchHookSource *>(processor.get()))
+                {
+                    // A hook whose inputs are still empty has not been enabled yet.
+                    if (source->getInputs().empty())
+                    {
+                        if (source_node)
+                            throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "There is multi source in branch plan");
+                        source->enableInputs();
+                        source_node = processor;
+                    }
+                }
+            }
+            if (!source_node)
+                throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Cannot find source node in branch plan");
+            if (source_node->getInputs().empty())
+                throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Source node has no input");
+            DB::connect(*output, source_node->getInputs().front());
+            new_processors.insert(new_processors.end(), processors.begin(), processors.end());
+            branch_index++;
+        }
+        return new_processors;
+    };
+    pipeline.transform(add_transform);
+    pipeline.resize(1);
+    if (num_streams > 1)
+        pipeline.resize(num_streams);
+}
+
+// Describe the processors collected for this step, if any.
+void UniteBranchesStep::describePipeline(DB::IQueryPlanStep::FormatSettings & settings) const
+{
+    if (processors.empty())
+        return;
+    DB::IQueryPlanStep::describePipeline(processors, settings);
+}
+}
diff --git a/cpp-ch/local-engine/Operator/BranchStep.h 
b/cpp-ch/local-engine/Operator/BranchStep.h
new file mode 100644
index 0000000000..ddbd4c6fbb
--- /dev/null
+++ b/cpp-ch/local-engine/Operator/BranchStep.h
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <cstddef>
+#include <Core/Block.h>
+#include <Interpreters/Context_fwd.h>
+#include <Processors/Chunk.h>
+#include <Processors/IProcessor.h>
+#include <Processors/QueryPlan/IQueryPlanStep.h>
+#include <Processors/QueryPlan/ITransformingStep.h>
+#include "Processors/Port.h"
+#include "Processors/QueryPlan/QueryPlan.h"
+
+namespace local_engine
+{
+
+// Static helpers for building the sub-plans of execution branches.
+class BranchStepHelper
+{
+public:
+    // Create a new query plan to be used as the base of one branch's sub-plan.
+    // The plan reads blocks of `header`; `num_streams` is the requested number of streams
+    // for the sub-plan.
+    static DB::QueryPlanPtr createSubPlan(const DB::Block & header, size_t 
num_streams);
+};
+
+// Used to branch the query plan: the pipeline is split into `branches` alternative outputs
+// and `selector` decides which one receives the data.
+class StaticBranchStep : public DB::ITransformingStep
+{
+public:
+    // Receives the chunks sampled so far and returns the index of the branch to use.
+    using BranchSelector = std::function<size_t(const std::list<DB::Chunk> &)>;
+    // `sample_rows` is the number of rows to sample before `selector` is consulted.
+    explicit StaticBranchStep(
+        DB::ContextPtr context_, const DB::Block & header, size_t branches, 
size_t sample_rows, BranchSelector selector);
+    ~StaticBranchStep() override = default;
+
+    String getName() const override { return "StaticBranchStep"; }
+
+    // This resizes the pipeline to 1 stream. You may need to resize again afterwards.
+    void transformPipeline(DB::QueryPipelineBuilder & pipeline, const 
DB::BuildQueryPipelineSettings & settings) override;
+    void describePipeline(DB::IQueryPlanStep::FormatSettings & settings) const 
override;
+
+protected:
+    void updateOutputHeader() override;
+
+private:
+    DB::ContextPtr context;
+    DB::Block header;
+    // NOTE: constructor initializer lists should follow this declaration order.
+    size_t max_sample_rows;
+    size_t branches;
+    BranchSelector selector;
+};
+
+
+// Owns the sub-plans of all branches and turns them into processors when the pipeline is
+// transformed.
+// Design note: it would be better to build the execution branches on the QueryPlan itself.
+class UniteBranchesStep : public DB::ITransformingStep
+{
+public:
+    // `header_` is the input header; `branch_plans_` are moved into the step and must
+    // contain at least one plan (the first one is read by the constructor).
+    explicit UniteBranchesStep(const DB::Block & header_, 
std::vector<DB::QueryPlanPtr> && branch_plans_, size_t num_streams_);
+    ~UniteBranchesStep() override = default;
+
+    String getName() const override { return "UniteBranchesStep"; }
+
+    void transformPipeline(DB::QueryPipelineBuilder & pipelines, const 
DB::BuildQueryPipelineSettings &) override;
+    void describePipeline(DB::IQueryPlanStep::FormatSettings & settings) const 
override;
+
+private:
+    DB::Block header;
+    std::vector<DB::QueryPlanPtr> branch_plans;
+    size_t num_streams;
+
+    // Resets the output header to the header given at construction.
+    void updateOutputHeader() override { output_header = header; };
+};
+
+}
diff --git a/cpp-ch/local-engine/Operator/BranchTransform.cpp 
b/cpp-ch/local-engine/Operator/BranchTransform.cpp
new file mode 100644
index 0000000000..f923f4ac4b
--- /dev/null
+++ b/cpp-ch/local-engine/Operator/BranchTransform.cpp
@@ -0,0 +1,155 @@
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "BranchTransform.h"
+#include <iterator>
+#include <Processors/IProcessor.h>
+#include <Poco/Logger.h>
+#include <Common/Exception.h>
+#include <Common/logger_useful.h>
+
+namespace DB::ErrorCodes
+{
+extern const int LOGICAL_ERROR;
+}
+
+namespace local_engine
+{
+// Create `branches` output ports that all carry `header`.
+static DB::OutputPorts buildOutputPorts(const DB::Block & header, size_t branches)
+{
+    DB::OutputPorts ports;
+    for (size_t remaining = branches; remaining != 0; --remaining)
+        ports.emplace_back(header);
+    return ports;
+}
+// One input and `branches_` outputs, all with `header_`. The selector is taken by value,
+// so it is moved into the member instead of being copied a second time.
+StaticBranchTransform::StaticBranchTransform(const DB::Block & header_, size_t sample_rows_, size_t branches_, BranchSelector selector_)
+    : DB::IProcessor({header_}, buildOutputPorts(header_, branches_)), max_sample_rows(sample_rows_), selector(std::move(selector_))
+{
+}
+
+// True if at least one of the output ports is already finished.
+static bool existFinishedOutput(const DB::OutputPorts & output_ports)
+{
+    auto it = output_ports.begin();
+    while (it != output_ports.end())
+    {
+        if (it->isFinished())
+            return true;
+        ++it;
+    }
+    return false;
+}
+
+// Port handling. Until a branch has been selected, incoming chunks are buffered as samples;
+// afterwards buffered samples and new chunks are forwarded to the selected output only.
+StaticBranchTransform::Status StaticBranchTransform::prepare()
+{
+    auto & in_port = inputs.front();
+
+    // After the selection only the selected port matters; before it, any finished output
+    // ends the transform.
+    const bool downstream_finished = selected_output_port ? selected_output_port->isFinished() : existFinishedOutput(outputs);
+    if (downstream_finished)
+    {
+        in_port.close();
+        return Status::Finished;
+    }
+
+    if (has_output)
+    {
+        assert(selected_output_port != nullptr);
+        if (selected_output_port->canPush())
+        {
+            selected_output_port->push(std::move(output_chunk));
+            has_output = false;
+        }
+        return Status::PortFull;
+    }
+
+    const bool has_buffered_samples = !sample_chunks.empty();
+
+    // Pending work: a freshly pulled chunk, or buffered samples to flush to the selected port.
+    if (has_input || (selected_output_port && has_buffered_samples))
+        return Status::Ready;
+
+    if (in_port.isFinished())
+    {
+        // The input ended before the samples were flushed: let work() flush them.
+        if (has_buffered_samples)
+            return Status::Ready;
+
+        if (selected_output_port)
+        {
+            selected_output_port->finish();
+        }
+        else
+        {
+            for (auto & port : outputs)
+                port.finish();
+        }
+        return Status::Finished;
+    }
+
+    in_port.setNeeded();
+    if (!in_port.hasData())
+        return Status::NeedData;
+    input_chunk = in_port.pull(true);
+    has_input = true;
+    return Status::Ready;
+}
+
+// Either buffer the pulled chunk as a sample (no branch selected yet) or stage the next
+// chunk for the selected output port.
+void StaticBranchTransform::work()
+{
+    if (selected_output_port)
+    {
+        if (sample_chunks.empty())
+        {
+            // No samples left: forward the chunk that was just pulled.
+            assert(has_input);
+            has_input = false;
+            has_output = true;
+            output_chunk.swap(input_chunk);
+            return;
+        }
+
+        // Buffered samples go out first, in arrival order.
+        assert(!has_input);
+        has_output = true;
+        output_chunk.swap(sample_chunks.front());
+        sample_chunks.pop_front();
+        return;
+    }
+
+    if (has_input)
+    {
+        // Still sampling: keep the chunk and select a branch once enough rows were seen.
+        sample_rows += input_chunk.getNumRows();
+        sample_chunks.emplace_back(std::move(input_chunk));
+        if (sample_rows >= max_sample_rows)
+            setupOutputPort();
+        has_input = false;
+        return;
+    }
+
+    if (!sample_chunks.empty())
+    {
+        // No branch is selected at this point (handled above), so select one using the
+        // samples gathered so far.
+        setupOutputPort();
+        output_chunk.swap(sample_chunks.front());
+        sample_chunks.pop_front();
+        has_output = true;
+    }
+}
+
+// Ask the selector for a branch based on the buffered samples, remember the matching
+// output port and finish all the others.
+void StaticBranchTransform::setupOutputPort()
+{
+    const size_t chosen = selector(sample_chunks);
+    LOG_DEBUG(getLogger("StaticBranchTransform"), "Select output port: {}", chosen);
+    if (chosen >= outputs.size())
+        throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Branch index {} is out of range(0, {})", chosen, outputs.size());
+
+    auto chosen_it = outputs.begin();
+    std::advance(chosen_it, chosen);
+    selected_output_port = &(*chosen_it);
+
+    // close other output ports
+    for (auto & port : outputs)
+    {
+        if (&port != selected_output_port)
+            port.finish();
+    }
+}
+} // namespace local_engine
diff --git a/cpp-ch/local-engine/Operator/BranchTransform.h 
b/cpp-ch/local-engine/Operator/BranchTransform.h
new file mode 100644
index 0000000000..f5284b5ae9
--- /dev/null
+++ b/cpp-ch/local-engine/Operator/BranchTransform.h
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+#include <Processors/Chunk.h>
+#include <Processors/IProcessor.h>
+#include <Processors/Port.h>
+
+namespace local_engine
+{
+
+// This is designed for adaptive execution. It has multiple outputs, each of which stands
+// for one execution branch.
+// It accepts a branch selector; the selector analyzes the input data and selects one of the
+// output ports as the only final output port. The other output ports are closed.
+// The output port cannot be changed once it has been selected.
+class StaticBranchTransform : public DB::IProcessor
+{
+public:
+    // Receives the sampled chunks and returns the index of the output port to use.
+    using BranchSelector = std::function<size_t(const std::list<DB::Chunk> &)>;
+    using Status = DB::IProcessor::Status;
+    StaticBranchTransform(const DB::Block & header_, size_t sample_rows_, 
size_t branches_, BranchSelector selector_);
+
+    String getName() const override { return "StaticBranchTransform"; }
+
+    Status prepare() override;
+    void work() override;
+
+private:
+    // Number of sampled rows after which the branch is selected.
+    size_t max_sample_rows;
+    BranchSelector selector;
+    // nullptr until a branch has been selected.
+    DB::OutputPort * selected_output_port = nullptr;
+    // Chunks buffered before the selection; they are forwarded first afterwards.
+    std::list<DB::Chunk> sample_chunks;
+    size_t sample_rows = 0;
+    bool has_input = false;
+    bool has_output = false;
+    DB::Chunk input_chunk;
+    DB::Chunk output_chunk;
+
+    // Run the selector, store the selected port and finish all other ports.
+    void setupOutputPort();
+};
+
+};
diff --git a/cpp-ch/local-engine/Operator/WindowGroupLimitStep.cpp 
b/cpp-ch/local-engine/Operator/WindowGroupLimitStep.cpp
index f25e3f22ac..d2264e24dc 100644
--- a/cpp-ch/local-engine/Operator/WindowGroupLimitStep.cpp
+++ b/cpp-ch/local-engine/Operator/WindowGroupLimitStep.cpp
@@ -37,7 +37,6 @@ enum class WindowGroupLimitFunction
     DenseRank
 };
 
-
 template <WindowGroupLimitFunction function>
 class WindowGroupLimitTransform : public DB::IProcessor
 {
@@ -50,7 +49,6 @@ public:
         , partition_columns(partition_columns_)
         , sort_columns(sort_columns_)
         , limit(limit_)
-
     {
     }
     ~WindowGroupLimitTransform() override = default;
@@ -95,9 +93,7 @@ public:
     void work() override
     {
         if (!has_input) [[unlikely]]
-        {
             return;
-        }
         DB::Block block = header.cloneWithColumns(input_chunk.getColumns());
         size_t partition_start_row = 0;
         size_t chunk_rows = input_chunk.getNumRows();
@@ -151,7 +147,6 @@ private:
     DB::Columns partition_start_row_columns;
     DB::Columns peer_group_start_row_columns;
 
-
     size_t advanceNextPartition(const DB::Chunk & chunk, size_t start_offset)
     {
         if (partition_start_row_columns.empty())
@@ -159,12 +154,8 @@ private:
 
         size_t max_row = chunk.getNumRows();
         for (size_t i = start_offset; i < max_row; ++i)
-        {
             if (!isRowEqual(partition_columns, partition_start_row_columns, 0, 
chunk.getColumns(), i))
-            {
                 return i;
-            }
-        }
         return max_row;
     }
 
@@ -199,7 +190,6 @@ private:
         if (current_row_rank_value > limit)
             return;
 
-
         size_t chunk_rows = chunk.getNumRows();
         auto has_peer_group_ended = [&](size_t offset, size_t 
partition_end_offset, size_t chunk_rows_)
         { return offset < partition_end_offset || end_offset < chunk_rows_; };
@@ -241,6 +231,7 @@ private:
             size_t limit_remained = limit - current_row_rank_value + 1;
             rows = rows > limit_remained ? limit_remained : rows;
             insertResultValue(chunk, start_offset, rows);
+
             current_row_rank_value += rows;
         }
         else
@@ -249,8 +240,8 @@ private:
             while (peer_group_start_offset < end_offset && 
current_row_rank_value <= limit)
             {
                 auto next_peer_group_start_offset = 
advanceNextPeerGroup(chunk, peer_group_start_offset, end_offset);
-
-                insertResultValue(chunk, peer_group_start_offset, 
next_peer_group_start_offset - peer_group_start_offset);
+                size_t group_rows = next_peer_group_start_offset - 
peer_group_start_offset;
+                insertResultValue(chunk, peer_group_start_offset, group_rows);
                 try_end_peer_group(peer_group_start_offset, 
next_peer_group_start_offset, end_offset, chunk_rows);
                 peer_group_start_offset = next_peer_group_start_offset;
             }
@@ -261,12 +252,8 @@ private:
         if (!rows)
             return;
         if (output_columns.empty())
-        {
             for (const auto & col : chunk.getColumns())
-            {
                 output_columns.push_back(col->cloneEmpty());
-            }
-        }
         size_t i = 0;
         for (const auto & col : chunk.getColumns())
         {
@@ -279,12 +266,8 @@ private:
         if (peer_group_start_row_columns.empty())
             peer_group_start_row_columns = extractOneRowColumns(chunk, 
start_offset);
         for (size_t i = start_offset; i < partition_end_offset; ++i)
-        {
             if (!isRowEqual(sort_columns, peer_group_start_row_columns, 0, 
chunk.getColumns(), i))
-            {
                 return i;
-            }
-        }
         return partition_end_offset;
     }
 };
diff --git a/cpp-ch/local-engine/Parser/AdvancedParametersParseUtil.cpp 
b/cpp-ch/local-engine/Parser/AdvancedParametersParseUtil.cpp
index f2cc84082d..49bbe02c55 100644
--- a/cpp-ch/local-engine/Parser/AdvancedParametersParseUtil.cpp
+++ b/cpp-ch/local-engine/Parser/AdvancedParametersParseUtil.cpp
@@ -164,6 +164,7 @@ WindowGroupOptimizationInfo 
WindowGroupOptimizationInfo::parse(const String & ad
     auto kkvs = convertToKVs(advance);
     auto & kvs = kkvs["WindowGroupLimitParameters"];
     tryAssign(kvs, "window_function", info.window_function);
+    tryAssign(kvs, "is_aggregate_group_limit", info.is_aggregate_group_limit);
     return info;
 }
 }
diff --git a/cpp-ch/local-engine/Parser/AdvancedParametersParseUtil.h 
b/cpp-ch/local-engine/Parser/AdvancedParametersParseUtil.h
index 3028639bb3..795577328f 100644
--- a/cpp-ch/local-engine/Parser/AdvancedParametersParseUtil.h
+++ b/cpp-ch/local-engine/Parser/AdvancedParametersParseUtil.h
@@ -49,6 +49,7 @@ struct AggregateOptimizationInfo
 struct WindowGroupOptimizationInfo
 {
     String window_function;
+    bool is_aggregate_group_limit = false;
     static WindowGroupOptimizationInfo parse(const String & advnace);
 };
 }
diff --git a/cpp-ch/local-engine/Parser/RelParsers/GroupLimitRelParser.cpp 
b/cpp-ch/local-engine/Parser/RelParsers/GroupLimitRelParser.cpp
new file mode 100644
index 0000000000..06f68e8ae2
--- /dev/null
+++ b/cpp-ch/local-engine/Parser/RelParsers/GroupLimitRelParser.cpp
@@ -0,0 +1,512 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "GroupLimitRelParser.h"
+#include <memory>
+#include <unordered_map>
+#include <unordered_set>
+#include <utility>
+#include <Columns/IColumn.h>
+#include <Core/ColumnsWithTypeAndName.h>
+#include <Core/Settings.h>
+#include <DataTypes/DataTypeTuple.h>
+#include <DataTypes/IDataType.h>
+#include <DataTypes/Serializations/ISerialization.h>
+#include <IO/WriteBufferFromString.h>
+#include <Interpreters/ActionsDAG.h>
+#include <Interpreters/AggregateDescription.h>
+#include <Interpreters/WindowDescription.h>
+#include <Operator/BranchStep.h>
+#include <Operator/GraceMergingAggregatedStep.h>
+#include <Operator/WindowGroupLimitStep.h>
+#include <Parser/AdvancedParametersParseUtil.h>
+#include <Parser/RelParsers/SortRelParser.h>
+#include <Processors/IProcessor.h>
+#include <Processors/Port.h>
+#include <Processors/QueryPlan/BuildQueryPipelineSettings.h>
+#include <Processors/QueryPlan/ExpressionStep.h>
+#include <Processors/QueryPlan/FilterStep.h>
+#include <Processors/QueryPlan/Optimizations/QueryPlanOptimizationSettings.h>
+#include <Processors/QueryPlan/QueryPlan.h>
+#include <Processors/QueryPlan/ReadFromPreparedSource.h>
+#include <Processors/QueryPlan/SortingStep.h>
+#include <Processors/QueryPlan/WindowStep.h>
+#include <QueryPipeline/QueryPipelineBuilder.h>
+#include <QueryPipeline/QueryPlanResourceHolder.h>
+#include <google/protobuf/repeated_field.h>
+#include <google/protobuf/wrappers.pb.h>
+#include <Common/AggregateUtil.h>
+#include <Common/ArrayJoinHelper.h>
+#include <Common/CHUtil.h>
+#include <Common/GlutenConfig.h>
+#include <Common/QueryContext.h>
+#include <Common/SortUtils.h>
+#include <Common/logger_useful.h>
+
+namespace DB::ErrorCodes
+{
+extern const int BAD_ARGUMENTS;
+}
+
+namespace DB
+{
+namespace Setting
+{
+extern const SettingsMaxThreads max_threads;
+
+}
+}
+
+namespace local_engine
+{
+// Forwards the parser context to the RelParser base; no state of its own is set up here.
+GroupLimitRelParser::GroupLimitRelParser(ParserContextPtr parser_context_) : 
RelParser(parser_context_)
+{
+}
+
+// Entry point for window-group-limit relations. Depending on the
+// `is_aggregate_group_limit` flag of the optimization info, the work is delegated to the
+// aggregation based parser or to the window based parser; the delegate's steps are copied
+// into `steps`.
+DB::QueryPlanPtr
+GroupLimitRelParser::parse(DB::QueryPlanPtr current_plan_, const substrait::Rel & rel, std::list<const substrait::Rel *> & rel_stack_)
+{
+    // Bind by reference: `rel` outlives this call, so there is no need to copy the message.
+    const auto & win_rel_def = rel.windowgrouplimit();
+    google::protobuf::StringValue optimize_info_str;
+    optimize_info_str.ParseFromString(win_rel_def.advanced_extension().optimization().value());
+    const auto optimization_info = WindowGroupOptimizationInfo::parse(optimize_info_str.value());
+
+    if (optimization_info.is_aggregate_group_limit)
+    {
+        AggregateGroupLimitRelParser aggregate_group_limit_parser(parser_context);
+        auto plan = aggregate_group_limit_parser.parse(std::move(current_plan_), rel, rel_stack_);
+        steps = aggregate_group_limit_parser.getSteps();
+        // Returned by name: a local is moved implicitly, std::move would only inhibit elision.
+        return plan;
+    }
+
+    WindowGroupLimitRelParser window_parser(parser_context);
+    auto plan = window_parser.parse(std::move(current_plan_), rel, rel_stack_);
+    steps = window_parser.getSteps();
+    return plan;
+}
+
+// Collect the column positions referenced by the partition expressions.
+// Literal expressions are skipped; anything that is neither a field selection nor a
+// literal is rejected with BAD_ARGUMENTS.
+static std::vector<size_t> parsePartitionFields(const google::protobuf::RepeatedPtrField<substrait::Expression> & expressions)
+{
+    std::vector<size_t> fields;
+    for (const auto & expr : expressions)
+    {
+        if (expr.has_selection())
+            fields.push_back(static_cast<size_t>(expr.selection().direct_reference().struct_field().field()));
+        else if (expr.has_literal())
+            continue;
+        else
+            throw DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "Unknown expression: {}", expr.DebugString());
+    }
+    return fields;
+}
+
+// Collect the column positions referenced by the sort fields.
+// Literal sort expressions are skipped; anything that is neither a literal nor a field
+// selection is rejected with BAD_ARGUMENTS.
+std::vector<size_t> parseSortFields(const google::protobuf::RepeatedPtrField<substrait::SortField> & sort_fields)
+{
+    std::vector<size_t> fields;
+    // Iterate by const reference: iterating by value copied every protobuf message.
+    for (const auto & sort_field : sort_fields)
+    {
+        if (sort_field.expr().has_literal())
+            continue;
+        else if (sort_field.expr().has_selection())
+            fields.push_back(static_cast<size_t>(sort_field.expr().selection().direct_reference().struct_field().field()));
+        else
+            throw DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "Unknown expression: {}", sort_field.expr().DebugString());
+    }
+    return fields;
+}
+
+
+// Forwards the parser context to the RelParser base; no state of its own is set up here.
+WindowGroupLimitRelParser::WindowGroupLimitRelParser(ParserContextPtr 
parser_context_) : RelParser(parser_context_)
+{
+}
+
+// Append a WindowGroupLimitStep to the plan, configured with the window function name from
+// the optimization info and with the partition fields, sort fields and limit of the relation.
+DB::QueryPlanPtr
+WindowGroupLimitRelParser::parse(DB::QueryPlanPtr current_plan_, const substrait::Rel & rel, std::list<const substrait::Rel *> & rel_stack_)
+{
+    // Bind by reference: `rel` outlives this call, so there is no need to copy the message.
+    const auto & win_rel_def = rel.windowgrouplimit();
+    google::protobuf::StringValue optimize_info_str;
+    optimize_info_str.ParseFromString(win_rel_def.advanced_extension().optimization().value());
+    auto optimization_info = WindowGroupOptimizationInfo::parse(optimize_info_str.value());
+    window_function_name = optimization_info.window_function;
+
+    current_plan = std::move(current_plan_);
+
+    auto partition_fields = parsePartitionFields(win_rel_def.partition_expressions());
+    auto sort_fields = parseSortFields(win_rel_def.sorts());
+    size_t limit = static_cast<size_t>(win_rel_def.limit());
+
+    auto window_group_limit_step = std::make_unique<WindowGroupLimitStep>(
+        current_plan->getCurrentHeader(), window_function_name, partition_fields, sort_fields, limit);
+    window_group_limit_step->setStepDescription("Window group limit");
+    // Record the raw pointer before ownership moves into the plan.
+    steps.emplace_back(window_group_limit_step.get());
+    current_plan->addStep(std::move(window_group_limit_step));
+
+    // `current_plan` is a member, so the explicit move is required here.
+    return std::move(current_plan);
+}
+
+// Forwards the parser context to the RelParser base; no state of its own is set up here.
+AggregateGroupLimitRelParser::AggregateGroupLimitRelParser(ParserContextPtr 
parser_context_) : RelParser(parser_context_)
+{
+}
+
+// Used to decide which branch to run, based on the sampled chunks.
+// The number of distinct partition keys is approximated by the number of distinct 32-bit
+// weak hashes of the key columns (hash collisions can only lower the estimate).
+// Returns 0 when distinct / (total_rows + 1) <= high_card_threshold, otherwise 1.
+size_t selectBranchOnPartitionKeysCardinality(
+    const std::vector<size_t> & partition_keys, double high_card_threshold, const std::list<DB::Chunk> & chunks)
+{
+    size_t total_rows = 0;
+    std::unordered_set<UInt32> ids;
+    for (const auto & chunk : chunks)
+    {
+        const size_t chunk_rows = chunk.getNumRows();
+        total_rows += chunk_rows;
+        DB::WeakHash32 hash(chunk_rows);
+        const auto & cols = chunk.getColumns();
+        for (const size_t key_pos : partition_keys)
+            hash.update(cols[key_pos]->getWeakHash32());
+        const auto & data = hash.getData();
+        for (size_t n = 0; n < chunk_rows; ++n)
+            ids.insert(data[n]);
+    }
+    LOG_DEBUG(
+        getLogger("AggregateGroupLimitRelParser"),
+        "Approximate distinct keys {}, total rows: {}, threshold: {}",
+        ids.size(),
+        total_rows,
+        high_card_threshold);
+    // The +1 keeps the ratio defined when no rows were sampled.
+    const double distinct_ratio = static_cast<double>(ids.size()) / static_cast<double>(total_rows + 1);
+    return distinct_ratio <= high_card_threshold ? 0 : 1;
+}
+
+/// Plan a window top-k (WindowGroupLimitRel) with two alternative execution branches.
+///
+/// A StaticBranchStep is given selectBranchOnPartitionKeysCardinality() as its selector.
+/// Two sub plans are built on the same input header and handed to a UniteBranchesStep:
+/// the aggregation plan first, the sort + window + filter plan second. The window plan's
+/// output is converted by position to the aggregation plan's header so both agree.
+///
+/// Throws BAD_ARGUMENTS when the limit is smaller than 1 or the window function is unsupported.
+DB::QueryPlanPtr AggregateGroupLimitRelParser::parse(
+    DB::QueryPlanPtr current_plan_, const substrait::Rel & rel, std::list<const substrait::Rel *> & rel_stack_)
+{
+    // Calculate window's topk by aggregation.
+    // 1. Add a pre-projection that packs the non-partition columns into one tuple, the aggregate function's argument.
+    // 2. Collect the sorting directions for each sorting field and pass them as the aggregate function's parameters.
+    // 3. Add an aggregation step.
+    // 4. Add a post-projection that explodes the aggregate function's result, since the result is an array.
+
+    current_plan = std::move(current_plan_);
+    input_header = current_plan->getCurrentHeader();
+    win_rel_def = &rel.windowgrouplimit();
+
+    google::protobuf::StringValue optimize_info_str;
+    optimize_info_str.ParseFromString(win_rel_def->advanced_extension().optimization().value());
+    auto optimization_info = WindowGroupOptimizationInfo::parse(optimize_info_str.value());
+    aggregate_function_name = getAggregateFunctionName(optimization_info.window_function);
+
+    // Validate the limit as stored in the rel, before converting it to size_t: if the field is
+    // signed, a negative value would otherwise wrap to a huge size_t and slip past the check.
+    const auto raw_limit = win_rel_def->limit();
+    if (raw_limit < 1)
+        throw DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "Invalid limit: {}", raw_limit);
+    limit = static_cast<size_t>(raw_limit);
+
+    auto win_config = WindowConfig::loadFromContext(getContext());
+    auto high_card_threshold = win_config.aggregate_topk_high_cardinality_threshold;
+
+    // Aggregation doesn't perform well on high cardinality keys. We make two execution paths here.
+    // - if the partition keys are low cardinality, run it by aggregation
+    // - if the partition keys are high cardinality, run it by window.
+    auto partition_fields = parsePartitionFields(win_rel_def->partition_expressions());
+    auto branch_in_header = current_plan->getCurrentHeader();
+    auto branch_step = std::make_unique<StaticBranchStep>(
+        getContext(),
+        branch_in_header,
+        2,
+        win_config.aggregate_topk_sample_rows,
+        [partition_fields, high_card_threshold](const std::list<DB::Chunk> & chunks) -> size_t
+        { return selectBranchOnPartitionKeysCardinality(partition_fields, high_card_threshold, chunks); });
+    branch_step->setStepDescription("Window TopK");
+    steps.push_back(branch_step.get());
+    current_plan->addStep(std::move(branch_step));
+
+    // If all partition keys are low cardinality keys, use aggregation to get topk of each partition
+    auto aggregation_plan = BranchStepHelper::createSubPlan(branch_in_header, 1);
+    prePrejectionForAggregateArguments(*aggregation_plan);
+    addGroupLmitAggregationStep(*aggregation_plan);
+    postProjectionForExplodingArrays(*aggregation_plan);
+    LOG_DEBUG(getLogger("AggregateGroupLimitRelParser"), "Aggregate topk plan:\n{}", PlanUtil::explainPlan(*aggregation_plan));
+
+    // Otherwise fall back to sort + window + filter.
+    auto window_plan = BranchStepHelper::createSubPlan(branch_in_header, 1);
+    addSortStep(*window_plan);
+    addWindowLimitStep(*window_plan);
+    // Both branches must expose the same header; align the window plan to the aggregation plan by position.
+    auto convert_actions_dag = DB::ActionsDAG::makeConvertingActions(
+        window_plan->getCurrentHeader().getColumnsWithTypeAndName(),
+        aggregation_plan->getCurrentHeader().getColumnsWithTypeAndName(),
+        DB::ActionsDAG::MatchColumnsMode::Position);
+    auto convert_step = std::make_unique<DB::ExpressionStep>(window_plan->getCurrentHeader(), std::move(convert_actions_dag));
+    convert_step->setStepDescription("Rename rank column name");
+    window_plan->addStep(std::move(convert_step));
+    LOG_DEBUG(getLogger("AggregateGroupLimitRelParser"), "Window topk plan:\n{}", PlanUtil::explainPlan(*window_plan));
+
+    std::vector<DB::QueryPlanPtr> branch_plans;
+    branch_plans.emplace_back(std::move(aggregation_plan));
+    branch_plans.emplace_back(std::move(window_plan));
+    auto unite_branches_step = std::make_unique<UniteBranchesStep>(branch_in_header, std::move(branch_plans), 1);
+    unite_branches_step->setStepDescription("Unite TopK branches");
+    steps.push_back(unite_branches_step.get());
+
+    current_plan->addStep(std::move(unite_branches_step));
+    // current_plan is a member (not a local), so the explicit move is required to hand it back.
+    return std::move(current_plan);
+}
+
+/// Maps a window function name to the aggregate function used for its top-k computation.
+/// Only "row_number" is supported; any other name raises BAD_ARGUMENTS.
+String AggregateGroupLimitRelParser::getAggregateFunctionName(const String & window_function_name)
+{
+    if (window_function_name != "row_number")
+        throw DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "Unsupported window function: {}", window_function_name);
+    return "rowNumGroupArraySorted";
+}
+
+// Pre-projection for the aggregation branch: the partition columns are kept as grouping keys and
+// every other input column is packed into one named tuple, which becomes the aggregate function's argument.
+void AggregateGroupLimitRelParser::prePrejectionForAggregateArguments(DB::QueryPlan & plan)
+{
+    auto projection_dag = std::make_shared<DB::ActionsDAG>(input_header.getColumnsWithTypeAndName());
+
+    const auto partition_positions = parsePartitionFields(win_rel_def->partition_expressions());
+    const std::set<size_t> partition_position_set(partition_positions.begin(), partition_positions.end());
+
+    // Names of the outputs that must survive removeUnusedActions() below.
+    DB::NameSet kept_column_names;
+
+    DB::DataTypes tuple_element_types;
+    Strings tuple_element_names;
+    DB::ActionsDAG::NodeRawConstPtrs tuple_element_nodes;
+    for (size_t pos = 0; pos < input_header.columns(); ++pos)
+    {
+        const auto & column = input_header.getByPosition(pos);
+        if (partition_position_set.count(pos) != 0)
+        {
+            kept_column_names.insert(column.name);
+            aggregate_grouping_keys.push_back(column.name);
+            continue;
+        }
+        tuple_element_types.push_back(column.type);
+        tuple_element_names.push_back(column.name);
+        tuple_element_nodes.push_back(projection_dag->getInputs()[pos]);
+    }
+
+    // tuple(...) followed by a CAST to the named tuple type; the named type carries the original column names.
+    aggregate_tuple_column_name = expression_parser->getUniqueName("aggregate_data_tuple");
+    const auto * raw_tuple_node
+        = expression_parser->toFunctionNode(*projection_dag, "tuple", tuple_element_nodes, aggregate_tuple_column_name);
+    const auto named_tuple_type = std::make_shared<DB::DataTypeTuple>(tuple_element_types, tuple_element_names);
+    DB::ActionsDAG::NodeRawConstPtrs cast_args;
+    cast_args.push_back(raw_tuple_node);
+    cast_args.push_back(
+        expression_parser->addConstColumn(*projection_dag, std::make_shared<DataTypeString>(), named_tuple_type->getName()));
+    const auto * named_tuple_node
+        = expression_parser->toFunctionNode(*projection_dag, "CAST", cast_args, aggregate_tuple_column_name);
+    projection_dag->addOrReplaceInOutputs(*named_tuple_node);
+    kept_column_names.insert(named_tuple_node->result_name);
+
+    projection_dag->removeUnusedActions(kept_column_names);
+    LOG_DEBUG(
+        getLogger("AggregateGroupLimitRelParser"),
+        "Projection for building tuples for aggregate function:\n{}",
+        projection_dag->dumpDAG());
+
+    auto pre_projection_step = std::make_unique<DB::ExpressionStep>(input_header, std::move(*projection_dag));
+    pre_projection_step->setStepDescription("Pre-projection for aggregate group limit arguments");
+    plan.addStep(std::move(pre_projection_step));
+}
+
+/// Describes the top-k aggregate call: the tuple column is its only argument, and the
+/// limit plus the sort description of the rel's sort fields are its parameters.
+DB::AggregateDescription AggregateGroupLimitRelParser::buildAggregateDescription(DB::QueryPlan & plan)
+{
+    // Function parameters: the limit first, then the sort description.
+    DB::Array func_params;
+    func_params.push_back(static_cast<UInt32>(limit));
+    auto sort_description = buildSQLLikeSortDescription(input_header, win_rel_def->sorts());
+    func_params.push_back(sort_description);
+
+    // The argument type is looked up in the plan's current header by the tuple column name.
+    DB::DataTypes argument_types;
+    argument_types.push_back(plan.getCurrentHeader().getByName(aggregate_tuple_column_name).type);
+
+    DB::AggregateFunctionProperties func_properties;
+    DB::AggregateDescription description;
+    description.column_name = aggregate_tuple_column_name;
+    description.argument_names = {aggregate_tuple_column_name};
+    description.function = getAggregateFunction(aggregate_function_name, argument_types, func_properties, func_params);
+    return description;
+}
+
+/// Appends the aggregation step that computes the per-group top-k to `plan`.
+/// The grouping keys are `aggregate_grouping_keys`; the single aggregate comes from
+/// buildAggregateDescription(). The aggregator runs in INIT_TO_COMPLETED mode.
+void AggregateGroupLimitRelParser::addGroupLmitAggregationStep(DB::QueryPlan & plan)
+{
+    // The previously fetched settings reference was never used and has been dropped.
+    DB::AggregateDescriptions agg_descs = {buildAggregateDescription(plan)};
+    auto params = AggregatorParamsHelper::buildParams(
+        getContext(), aggregate_grouping_keys, agg_descs, AggregatorParamsHelper::Mode::INIT_TO_COMPLETED);
+    auto agg_step = std::make_unique<GraceMergingAggregatedStep>(getContext(), plan.getCurrentHeader(), params, true);
+    plan.addStep(std::move(agg_step));
+    LOG_DEBUG(getLogger("AggregateGroupLimitRelParser"), "Plan after add group limit:\n{}", PlanUtil::explainPlan(plan));
+}
+
+/// Turn the aggregation result back into plain rows and columns.
+///
+/// Steps appended to `plan`:
+/// 1. an array join on the last column of the current header (the aggregate result),
+/// 2. a projection that extracts every tuple element with tupleElement, named after the tuple field,
+/// 3. a name-based conversion to the layout `input_header` followed by the last extracted column.
+///
+/// Throws BAD_ARGUMENTS if the array-joined column is not of a tuple type.
+void AggregateGroupLimitRelParser::postProjectionForExplodingArrays(DB::QueryPlan & plan)
+{
+    auto header = plan.getCurrentHeader();
+
+    /// flatten the array column.
+    auto agg_result_index = header.columns() - 1;
+    auto array_join_actions_dag = ArrayJoinHelper::applyArrayJoinOnOneColumn(header, agg_result_index);
+    // NOTE(review): the returned steps are not appended to `steps` here — confirm this is intended.
+    [[maybe_unused]] auto new_steps = ArrayJoinHelper::addArrayJoinStep(getContext(), plan, array_join_actions_dag, false);
+
+    auto array_join_output_header = plan.getCurrentHeader();
+    DB::ActionsDAG flatten_actions_dag(array_join_output_header.getColumnsWithTypeAndName());
+    DB::Names flatten_output_column_names;
+    // Every column except the last one is passed through unchanged.
+    for (size_t i = 0; i < array_join_output_header.columns() - 1; ++i)
+    {
+        const auto & col = array_join_output_header.getByPosition(i);
+        flatten_output_column_names.push_back(col.name);
+    }
+
+    const auto & last_column = array_join_output_header.getByPosition(array_join_output_header.columns() - 1);
+    const auto * tuple_datatype = typeid_cast<const DB::DataTypeTuple *>(last_column.type.get());
+    // Fail with a clear message instead of dereferencing a null pointer below.
+    if (!tuple_datatype)
+        throw DB::Exception(
+            DB::ErrorCodes::BAD_ARGUMENTS, "Expect a tuple column after array join, but got {}", last_column.type->getName());
+
+    const auto & field_names = tuple_datatype->getElementNames();
+    DB::DataTypePtr tuple_index_type = std::make_shared<DB::DataTypeUInt32>();
+    const auto * tuple_node = flatten_actions_dag.getInputs().back();
+    for (size_t i = 0; i < field_names.size(); ++i)
+    {
+        // tupleElement takes a 1-based index, hence i + 1.
+        DB::ActionsDAG::NodeRawConstPtrs tuple_index_args;
+        tuple_index_args.push_back(tuple_node);
+        tuple_index_args.push_back(expression_parser->addConstColumn(flatten_actions_dag, tuple_index_type, i + 1));
+        const auto * field_node
+            = expression_parser->toFunctionNode(flatten_actions_dag, "tupleElement", tuple_index_args, field_names[i]);
+        flatten_actions_dag.addOrReplaceInOutputs(*field_node);
+        flatten_output_column_names.push_back(field_node->result_name);
+    }
+    flatten_actions_dag.removeUnusedActions(flatten_output_column_names);
+    LOG_DEBUG(getLogger("AggregateGroupLimitRelParser"), "Actions dag for untupling aggregate result:\n{}", flatten_actions_dag.dumpDAG());
+    auto flatten_expression_step = std::make_unique<DB::ExpressionStep>(plan.getCurrentHeader(), std::move(flatten_actions_dag));
+    flatten_expression_step->setStepDescription("Untuple the aggregation result");
+    plan.addStep(std::move(flatten_expression_step));
+
+    auto flatten_tuple_output_header = plan.getCurrentHeader();
+    auto window_result_column = flatten_tuple_output_header.getByPosition(flatten_tuple_output_header.columns() - 1);
+    /// The result column is put at the end of the header.
+    auto output_header = input_header;
+    output_header.insert(window_result_column);
+    auto adjust_pos_actions_dag = DB::ActionsDAG::makeConvertingActions(
+        flatten_tuple_output_header.getColumnsWithTypeAndName(),
+        output_header.getColumnsWithTypeAndName(),
+        DB::ActionsDAG::MatchColumnsMode::Name);
+    LOG_DEBUG(getLogger("AggregateGroupLimitRelParser"), "Actions dag for replacing columns:\n{}", adjust_pos_actions_dag.dumpDAG());
+    auto adjust_pos_expression_step
+        = std::make_unique<DB::ExpressionStep>(flatten_tuple_output_header, std::move(adjust_pos_actions_dag));
+    adjust_pos_expression_step->setStepDescription("Adjust position of the output columns");
+    plan.addStep(std::move(adjust_pos_expression_step));
+}
+
+/// Appends a full sort to `plan`: first by the partition columns, then by the rel's sort fields.
+void AggregateGroupLimitRelParser::addSortStep(DB::QueryPlan & plan)
+{
+    auto current_header = plan.getCurrentHeader();
+
+    // Partition columns come first in the sort description.
+    DB::SortDescription sort_description;
+    auto partition_positions = parsePartitionFields(win_rel_def->partition_expressions());
+    for (auto position : partition_positions)
+        sort_description.emplace_back(current_header.getByPosition(position).name, 1, -1);
+
+    // Then the sort fields declared on the rel.
+    auto order_by_description = SortRelParser::parseSortDescription(win_rel_def->sorts(), current_header);
+    sort_description.insert(sort_description.end(), order_by_description.begin(), order_by_description.end());
+
+    DB::SortingStep::Settings sort_settings(*getContext());
+    auto memory_config = MemoryConfig::loadFromContext(getContext());
+    double spill_mem_ratio = memory_config.spill_mem_ratio;
+    // External sort is considered worthwhile once the thread group's memory usage ratio exceeds the configured one.
+    sort_settings.worth_external_sort = [spill_mem_ratio]() -> bool { return currentThreadGroupMemoryUsageRatio() > spill_mem_ratio; };
+
+    auto sort_step = std::make_unique<DB::SortingStep>(plan.getCurrentHeader(), sort_description, 0, sort_settings);
+    sort_step->setStepDescription("Sorting step");
+    plan.addStep(std::move(sort_step));
+}
+
+/// Returns the window frame used for the given window function.
+/// Only "row_number" is accepted; any other name raises BAD_ARGUMENTS.
+static DB::WindowFrame buildWindowFrame(const std::string & ch_function_name)
+{
+    if (ch_function_name != "row_number")
+        throw DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "Unknow window function: {}", ch_function_name);
+
+    // row_number: a ROWS frame whose begin boundary is an offset of 1; other fields keep their defaults.
+    DB::WindowFrame frame;
+    frame.type = DB::WindowFrame::FrameType::ROWS;
+    frame.begin_type = DB::WindowFrame::BoundaryType::Offset;
+    frame.begin_offset = 1;
+    return frame;
+}
+
+/// Builds the description of the window function to evaluate.
+/// Only "row_number" is accepted; any other name raises BAD_ARGUMENTS.
+static DB::WindowFunctionDescription buildWindowFunctionDescription(const std::string & ch_function_name)
+{
+    if (ch_function_name != "row_number")
+        throw DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "Unknow window function: {}", ch_function_name);
+
+    DB::WindowFunctionDescription description;
+    // The output column is named after the function itself.
+    description.column_name = ch_function_name;
+    description.function_node = nullptr;
+    // The function is looked up with no argument types and no parameters.
+    DB::AggregateFunctionProperties agg_props;
+    description.aggregate_function = RelParser::getAggregateFunction(ch_function_name, {}, agg_props, {});
+    return description;
+}
+
+
+// TODO: WindowGroupLimitStep has bad performance, need to improve it. So we still use window + filter here.
+// Appends a window step for the rel's window function, then a filter keeping rows whose
+// last column (the window function result) is <= limit.
+void AggregateGroupLimitRelParser::addWindowLimitStep(DB::QueryPlan & plan)
+{
+    // The window function name is stored in the rel's optimization payload.
+    google::protobuf::StringValue optimize_info_str;
+    optimize_info_str.ParseFromString(win_rel_def->advanced_extension().optimization().value());
+    auto optimization_info = WindowGroupOptimizationInfo::parse(optimize_info_str.value());
+    const auto & function_name = optimization_info.window_function;
+
+    auto window_input_header = plan.getCurrentHeader();
+
+    DB::WindowDescription window_desc;
+    window_desc.frame = buildWindowFrame(function_name);
+    window_desc.partition_by = parseSortFields(window_input_header, win_rel_def->partition_expressions());
+    window_desc.order_by = parseSortFields(window_input_header, win_rel_def->sorts());
+    // Full sort description = partition columns followed by the order-by columns.
+    window_desc.full_sort_description = window_desc.partition_by;
+    window_desc.full_sort_description.insert(
+        window_desc.full_sort_description.end(), window_desc.order_by.begin(), window_desc.order_by.end());
+
+    DB::WriteBufferFromOwnString name_buf;
+    name_buf << "partition by " << DB::dumpSortDescription(window_desc.partition_by);
+    name_buf << " order by " << DB::dumpSortDescription(window_desc.order_by);
+    name_buf << " " << window_desc.frame.toString();
+    window_desc.window_name = name_buf.str();
+
+    window_desc.window_functions.push_back(buildWindowFunctionDescription(function_name));
+
+    auto window_step = std::make_unique<WindowStep>(window_input_header, window_desc, window_desc.window_functions, false);
+    window_step->setStepDescription("Window (" + window_desc.window_name + ")");
+    plan.addStep(std::move(window_step));
+
+    // Filter on the last column of the window step's output.
+    auto window_output_header = plan.getCurrentHeader();
+    DB::ActionsDAG filter_dag(window_output_header.getColumnsWithTypeAndName());
+    const auto * rank_node = filter_dag.getInputs().back();
+    const auto * limit_node = expression_parser->addConstColumn(filter_dag, std::make_shared<DB::DataTypeInt32>(), limit);
+    const auto * predicate_node = expression_parser->toFunctionNode(filter_dag, "lessOrEquals", {rank_node, limit_node});
+    auto predicate_column_name = predicate_node->result_name;
+    filter_dag.addOrReplaceInOutputs(*predicate_node);
+    auto limit_filter_step
+        = std::make_unique<DB::FilterStep>(window_output_header, std::move(filter_dag), predicate_column_name, true);
+    plan.addStep(std::move(limit_filter_step));
+}
+
+/// Registers GroupLimitRelParser as the builder for substrait kWindowGroupLimit rels.
+void registerWindowGroupLimitRelParser(RelParserFactory & factory)
+{
+    factory.registerBuilder(
+        substrait::Rel::RelTypeCase::kWindowGroupLimit,
+        [](ParserContextPtr parser_context) { return std::make_shared<GroupLimitRelParser>(parser_context); });
+}
+}
diff --git a/cpp-ch/local-engine/Parser/RelParsers/GroupLimitRelParser.h 
b/cpp-ch/local-engine/Parser/RelParsers/GroupLimitRelParser.h
new file mode 100644
index 0000000000..b9f3aa6631
--- /dev/null
+++ b/cpp-ch/local-engine/Parser/RelParsers/GroupLimitRelParser.h
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+#include <optional>
+#include <Parser/RelParsers/RelParser.h>
+#include <Processors/QueryPlan/QueryPlan.h>
+#include <Poco/Logger.h>
+#include <Common/logger_useful.h>
+#include "Analyzer/IQueryTreeNode.h"
+#include "substrait/algebra.pb.h"
+
+namespace local_engine
+{
+
+/// Parser for substrait WindowGroupLimitRel; this is the class registered for
+/// kWindowGroupLimit by registerWindowGroupLimitRelParser().
+class GroupLimitRelParser : public RelParser
+{
+public:
+    explicit GroupLimitRelParser(ParserContextPtr parser_context_);
+    ~GroupLimitRelParser() override = default;
+
+    DB::QueryPlanPtr
+    parse(DB::QueryPlanPtr current_plan_, const substrait::Rel & rel, std::list<const substrait::Rel *> & rel_stack_) override;
+
+    /// The single input is the `input` field of the window group limit rel.
+    std::optional<const substrait::Rel *> getSingleInput(const substrait::Rel & rel) override
+    {
+        return &rel.windowgrouplimit().input();
+    }
+};
+
+/// Similar to WindowRelParser, with these differences:
+/// 1. aggregate functions are not supported; only the window functions row_number, rank and dense_rank are
+/// 2. row_number, rank and dense_rank are mapped to new variants
+/// 3. the output columns don't contain window function results
+class WindowGroupLimitRelParser : public RelParser
+{
+public:
+    explicit WindowGroupLimitRelParser(ParserContextPtr parser_context_);
+    ~WindowGroupLimitRelParser() override = default;
+
+    DB::QueryPlanPtr
+    parse(DB::QueryPlanPtr current_plan_, const substrait::Rel & rel, std::list<const substrait::Rel *> & rel_stack_) override;
+
+    /// The single input is the `input` field of the window group limit rel.
+    std::optional<const substrait::Rel *> getSingleInput(const substrait::Rel & rel) override
+    {
+        return &rel.windowgrouplimit().input();
+    }
+
+private:
+    /// Plan being extended by parse().
+    DB::QueryPlanPtr current_plan;
+    /// Window function name taken from the rel's optimization info.
+    String window_function_name;
+};
+
+/// Parser for substrait WindowGroupLimitRel that can compute the window top-k with an aggregation.
+/// parse() builds an aggregation sub plan and a sort + window + filter sub plan from the helpers below.
+class AggregateGroupLimitRelParser : public RelParser
+{
+public:
+    explicit AggregateGroupLimitRelParser(ParserContextPtr parser_context_);
+    ~AggregateGroupLimitRelParser() override = default;
+    DB::QueryPlanPtr
+    parse(DB::QueryPlanPtr current_plan_, const substrait::Rel & rel, std::list<const substrait::Rel *> & rel_stack_) override;
+    /// The single input is the `input` field of the window group limit rel.
+    std::optional<const substrait::Rel *> getSingleInput(const substrait::Rel & rel) override { return &rel.windowgrouplimit().input(); }
+
+private:
+    /// Plan being extended by parse().
+    DB::QueryPlanPtr current_plan;
+    /// Rel currently being parsed; set by parse(). Initialised to nullptr so that it never holds an indeterminate value.
+    const substrait::WindowGroupLimitRel * win_rel_def = nullptr;
+    String aggregate_function_name;
+    size_t limit = 0;
+    /// Header of the plan handed to parse().
+    DB::Block input_header;
+    // DB::Block output_header;
+    /// Names of the partition columns, used as grouping keys of the aggregation.
+    DB::Names aggregate_grouping_keys;
+    /// Name of the tuple column built by the pre-projection and consumed by the aggregate function.
+    String aggregate_tuple_column_name;
+
+    String getAggregateFunctionName(const String & window_function_name);
+
+    void prePrejectionForAggregateArguments(DB::QueryPlan & plan);
+
+    void addGroupLmitAggregationStep(DB::QueryPlan & plan);
+    String parseSortDirections(const google::protobuf::RepeatedPtrField<substrait::SortField> & sort_fields);
+    DB::AggregateDescription buildAggregateDescription(DB::QueryPlan & plan);
+    void postProjectionForExplodingArrays(DB::QueryPlan & plan);
+
+    void addSortStep(DB::QueryPlan & plan);
+    void addWindowLimitStep(DB::QueryPlan & plan);
+};
+}
diff --git a/cpp-ch/local-engine/Parser/RelParsers/ProjectRelParser.cpp 
b/cpp-ch/local-engine/Parser/RelParsers/ProjectRelParser.cpp
index 20af6f83fc..21e2dba44f 100644
--- a/cpp-ch/local-engine/Parser/RelParsers/ProjectRelParser.cpp
+++ b/cpp-ch/local-engine/Parser/RelParsers/ProjectRelParser.cpp
@@ -22,6 +22,7 @@
 #include <Processors/QueryPlan/ArrayJoinStep.h>
 #include <Processors/QueryPlan/ExpressionStep.h>
 #include <Rewriter/ExpressionRewriter.h>
+#include <Common/ArrayJoinHelper.h>
 
 namespace DB
 {
@@ -85,40 +86,6 @@ ProjectRelParser::parseProject(DB::QueryPlanPtr query_plan, 
const substrait::Rel
     }
 }
 
-const DB::ActionsDAG::Node * ProjectRelParser::findArrayJoinNode(const 
ActionsDAG & actions_dag)
-{
-    const ActionsDAG::Node * array_join_node = nullptr;
-    const auto & nodes = actions_dag.getNodes();
-    for (const auto & node : nodes)
-    {
-        if (node.type == ActionsDAG::ActionType::ARRAY_JOIN)
-        {
-            if (array_join_node)
-                throw Exception(ErrorCodes::LOGICAL_ERROR, "Expect single 
ARRAY JOIN node in generate rel");
-            array_join_node = &node;
-        }
-    }
-    return array_join_node;
-}
-
-ProjectRelParser::SplittedActionsDAGs 
ProjectRelParser::splitActionsDAGInGenerate(const ActionsDAG & actions_dag)
-{
-    SplittedActionsDAGs res;
-
-    auto array_join_node = findArrayJoinNode(actions_dag);
-    std::unordered_set<const ActionsDAG::Node *> 
first_split_nodes(array_join_node->children.begin(), 
array_join_node->children.end());
-    auto first_split_result = actions_dag.split(first_split_nodes);
-    res.before_array_join = std::move(first_split_result.first);
-
-    array_join_node = findArrayJoinNode(first_split_result.second);
-    std::unordered_set<const ActionsDAG::Node *> second_split_nodes = 
{array_join_node};
-    auto second_split_result = 
first_split_result.second.split(second_split_nodes);
-    res.array_join = std::move(second_split_result.first);
-    second_split_result.second.removeUnusedActions();
-    res.after_array_join = std::move(second_split_result.second);
-    return res;
-}
-
 bool ProjectRelParser::isReplicateRows(substrait::GenerateRel rel)
 {
     auto signature = 
expression_parser->getFunctionNameInSignature(rel.generator().scalar_function());
@@ -164,7 +131,7 @@ ProjectRelParser::parseGenerate(DB::QueryPlanPtr 
query_plan, const substrait::Re
     auto header = query_plan->getCurrentHeader();
     auto actions_dag = expressionsToActionsDAG(expressions, header);
 
-    if (!findArrayJoinNode(actions_dag))
+    if (!ArrayJoinHelper::findArrayJoinNode(actions_dag))
     {
         /// If generator in generate rel is not explode/posexplode, e.g. 
json_tuple
         auto expression_step = 
std::make_unique<ExpressionStep>(query_plan->getCurrentHeader(), 
std::move(actions_dag));
@@ -174,59 +141,8 @@ ProjectRelParser::parseGenerate(DB::QueryPlanPtr 
query_plan, const substrait::Re
     }
     else
     {
-        /// If generator in generate rel is explode/posexplode, transform 
arrayJoin function to ARRAY JOIN STEP to apply max_block_size
-        /// which avoids OOM when several lateral view explode/posexplode is 
used in spark sqls
-        LOG_DEBUG(logger, "original actions_dag:{}", actions_dag.dumpDAG());
-        auto splitted_actions_dags = splitActionsDAGInGenerate(actions_dag);
-        LOG_DEBUG(logger, "actions_dag before arrayJoin:{}", 
splitted_actions_dags.before_array_join.dumpDAG());
-        LOG_DEBUG(logger, "actions_dag during arrayJoin:{}", 
splitted_actions_dags.array_join.dumpDAG());
-        LOG_DEBUG(logger, "actions_dag after arrayJoin:{}", 
splitted_actions_dags.after_array_join.dumpDAG());
-
-        auto ignore_actions_dag = [](const ActionsDAG & actions_dag_) -> bool
-        {
-            /*
-            We should ignore actions_dag like:
-            0 : INPUT () (no column) String a
-            1 : INPUT () (no column) String b
-            Output nodes: 0, 1
-             */
-            return actions_dag_.getOutputs().size() == 
actions_dag_.getNodes().size()
-                && actions_dag_.getInputs().size() == 
actions_dag_.getNodes().size();
-        };
-
-        /// Pre-projection before array join
-        if (!ignore_actions_dag(splitted_actions_dags.before_array_join))
-        {
-            auto step_before_array_join
-                = 
std::make_unique<ExpressionStep>(query_plan->getCurrentHeader(), 
std::move(splitted_actions_dags.before_array_join));
-            step_before_array_join->setStepDescription("Pre-projection In 
Generate");
-            steps.emplace_back(step_before_array_join.get());
-            query_plan->addStep(std::move(step_before_array_join));
-            // LOG_DEBUG(logger, "plan1:{}", 
PlanUtil::explainPlan(*query_plan));
-        }
-
-        /// ARRAY JOIN
-        Names 
array_joined_columns{findArrayJoinNode(splitted_actions_dags.array_join)->result_name};
-        ArrayJoin array_join;
-        array_join.columns = std::move(array_joined_columns);
-        array_join.is_left = generate_rel.outer();
-        auto array_join_step = std::make_unique<ArrayJoinStep>(
-            query_plan->getCurrentHeader(), std::move(array_join), false, 
getContext()->getSettingsRef()[Setting::max_block_size]);
-        array_join_step->setStepDescription("ARRAY JOIN In Generate");
-        steps.emplace_back(array_join_step.get());
-        query_plan->addStep(std::move(array_join_step));
-        // LOG_DEBUG(logger, "plan2:{}", PlanUtil::explainPlan(*query_plan));
-
-        /// Post-projection after array join(Optional)
-        if (!ignore_actions_dag(splitted_actions_dags.after_array_join))
-        {
-            auto step_after_array_join
-                = 
std::make_unique<ExpressionStep>(query_plan->getCurrentHeader(), 
std::move(splitted_actions_dags.after_array_join));
-            step_after_array_join->setStepDescription("Post-projection In 
Generate");
-            steps.emplace_back(step_after_array_join.get());
-            query_plan->addStep(std::move(step_after_array_join));
-            // LOG_DEBUG(logger, "plan3:{}", 
PlanUtil::explainPlan(*query_plan));
-        }
+        auto new_steps = ArrayJoinHelper::addArrayJoinStep(getContext(), 
*query_plan, actions_dag, generate_rel.outer());
+        steps.insert(steps.end(), new_steps.begin(), new_steps.end());
     }
 
     return query_plan;
diff --git a/cpp-ch/local-engine/Parser/RelParsers/ProjectRelParser.h 
b/cpp-ch/local-engine/Parser/RelParsers/ProjectRelParser.h
index ce7a6faa70..cb6cf84f72 100644
--- a/cpp-ch/local-engine/Parser/RelParsers/ProjectRelParser.h
+++ b/cpp-ch/local-engine/Parser/RelParsers/ProjectRelParser.h
@@ -26,13 +26,6 @@ namespace local_engine
 class ProjectRelParser : public RelParser
 {
 public:
-    struct SplittedActionsDAGs
-    {
-        ActionsDAG before_array_join; /// Optional
-        ActionsDAG array_join;
-        ActionsDAG after_array_join; /// Optional
-    };
-
     explicit ProjectRelParser(ParserContextPtr parser_context_);
     ~ProjectRelParser() override = default;
 
@@ -45,11 +38,6 @@ private:
     DB::QueryPlanPtr parseProject(DB::QueryPlanPtr query_plan, const 
substrait::Rel & rel, std::list<const substrait::Rel *> & rel_stack_);
     DB::QueryPlanPtr parseGenerate(DB::QueryPlanPtr query_plan, const 
substrait::Rel & rel, std::list<const substrait::Rel *> & rel_stack_);
 
-    static const DB::ActionsDAG::Node * findArrayJoinNode(const ActionsDAG & 
actions_dag);
-
-    /// Split actions_dag of generate rel into 3 parts: before array join + 
during array join + after array join
-    static SplittedActionsDAGs splitActionsDAGInGenerate(const ActionsDAG & 
actions_dag);
-
     bool isReplicateRows(substrait::GenerateRel rel);
 
     DB::QueryPlanPtr parseReplicateRows(QueryPlanPtr query_plan, 
substrait::GenerateRel generate_rel);
diff --git 
a/cpp-ch/local-engine/Parser/RelParsers/WindowGroupLimitRelParser.cpp 
b/cpp-ch/local-engine/Parser/RelParsers/WindowGroupLimitRelParser.cpp
deleted file mode 100644
index e82d68c1d1..0000000000
--- a/cpp-ch/local-engine/Parser/RelParsers/WindowGroupLimitRelParser.cpp
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include "WindowGroupLimitRelParser.h"
-#include <Operator/WindowGroupLimitStep.h>
-#include <Parser/AdvancedParametersParseUtil.h>
-#include <Processors/QueryPlan/ExpressionStep.h>
-#include <google/protobuf/repeated_field.h>
-#include <google/protobuf/wrappers.pb.h>
-
-namespace DB::ErrorCodes
-{
-extern const int BAD_ARGUMENTS;
-}
-
-namespace local_engine
-{
-WindowGroupLimitRelParser::WindowGroupLimitRelParser(ParserContextPtr 
parser_context_) : RelParser(parser_context_)
-{
-}
-
-DB::QueryPlanPtr
-WindowGroupLimitRelParser::parse(DB::QueryPlanPtr current_plan_, const 
substrait::Rel & rel, std::list<const substrait::Rel *> & rel_stack_)
-{
-    const auto win_rel_def = rel.windowgrouplimit();
-    google::protobuf::StringValue optimize_info_str;
-    
optimize_info_str.ParseFromString(win_rel_def.advanced_extension().optimization().value());
-    auto optimization_info = 
WindowGroupOptimizationInfo::parse(optimize_info_str.value());
-    window_function_name = optimization_info.window_function;
-
-    current_plan = std::move(current_plan_);
-
-    auto partition_fields = 
parsePartitoinFields(win_rel_def.partition_expressions());
-    auto sort_fields = parseSortFields(win_rel_def.sorts());
-    size_t limit = static_cast<size_t>(win_rel_def.limit());
-
-    auto window_group_limit_step = std::make_unique<WindowGroupLimitStep>(
-        current_plan->getCurrentHeader(), window_function_name, 
partition_fields, sort_fields, limit);
-    window_group_limit_step->setStepDescription("Window group limit");
-    steps.emplace_back(window_group_limit_step.get());
-    current_plan->addStep(std::move(window_group_limit_step));
-
-    return std::move(current_plan);
-}
-
-std::vector<size_t>
-WindowGroupLimitRelParser::parsePartitoinFields(const 
google::protobuf::RepeatedPtrField<substrait::Expression> & expressions)
-{
-    std::vector<size_t> fields;
-    for (const auto & expr : expressions)
-        if (expr.has_selection())
-            
fields.push_back(static_cast<size_t>(expr.selection().direct_reference().struct_field().field()));
-        else if (expr.has_literal())
-            continue;
-        else
-            throw DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "Unknow 
expression: {}", expr.DebugString());
-    return fields;
-}
-
-std::vector<size_t> WindowGroupLimitRelParser::parseSortFields(const 
google::protobuf::RepeatedPtrField<substrait::SortField> & sort_fields)
-{
-    std::vector<size_t> fields;
-    for (const auto sort_field : sort_fields)
-        if (sort_field.expr().has_literal())
-            continue;
-        else if (sort_field.expr().has_selection())
-            
fields.push_back(static_cast<size_t>(sort_field.expr().selection().direct_reference().struct_field().field()));
-        else
-            throw DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "Unknown 
expression: {}", sort_field.expr().DebugString());
-    return fields;
-}
-
-void registerWindowGroupLimitRelParser(RelParserFactory & factory)
-{
-    auto builder = [](ParserContextPtr parser_context) { return 
std::make_shared<WindowGroupLimitRelParser>(parser_context); };
-    factory.registerBuilder(substrait::Rel::RelTypeCase::kWindowGroupLimit, 
builder);
-}
-}
diff --git a/cpp-ch/local-engine/Parser/RelParsers/WindowGroupLimitRelParser.h 
b/cpp-ch/local-engine/Parser/RelParsers/WindowGroupLimitRelParser.h
deleted file mode 100644
index 573b7fad7c..0000000000
--- a/cpp-ch/local-engine/Parser/RelParsers/WindowGroupLimitRelParser.h
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#pragma once
-#include <optional>
-#include <Parser/RelParsers/RelParser.h>
-#include <Processors/QueryPlan/QueryPlan.h>
-#include <Poco/Logger.h>
-#include <Common/logger_useful.h>
-
-namespace local_engine
-{
-/// Similar to WindowRelParser. Some differences
-/// 1. cannot support aggregate functions. only support window functions: 
row_number, rank, dense_rank
-/// 2. row_number, rank and dense_rank are mapped to new variants
-/// 3. the output columns don't contain window function results
-class WindowGroupLimitRelParser : public RelParser
-{
-public:
-    explicit WindowGroupLimitRelParser(ParserContextPtr parser_context_);
-    ~WindowGroupLimitRelParser() override = default;
-    DB::QueryPlanPtr
-    parse(DB::QueryPlanPtr current_plan_, const substrait::Rel & rel, 
std::list<const substrait::Rel *> & rel_stack_) override;
-    std::optional<const substrait::Rel *> getSingleInput(const substrait::Rel 
& rel) override { return &rel.windowgrouplimit().input(); }
-
-private:
-    DB::QueryPlanPtr current_plan;
-    String window_function_name;
-
-    std::vector<size_t> parsePartitoinFields(const 
google::protobuf::RepeatedPtrField<substrait::Expression> & expressions);
-    std::vector<size_t> parseSortFields(const 
google::protobuf::RepeatedPtrField<substrait::SortField> & sort_fields);
-};
-}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to