This is an automated email from the ASF dual-hosted git repository.

exmy pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 2f13452955 [GLUTEN-9137][CH] Support CollectLimit for CH backend 
(#9139)
2f13452955 is described below

commit 2f13452955474ee2320b7d902d5df36bd552f18a
Author: exmy <[email protected]>
AuthorDate: Fri Apr 25 09:24:24 2025 +0800

    [GLUTEN-9137][CH] Support CollectLimit for CH backend (#9139)
    
    [GLUTEN-9137][CH] Support CollectLimit for CH backend
---
 .../apache/gluten/vectorized/CHNativeBlock.java    |  16 +++
 .../gluten/backendsapi/clickhouse/CHRuleApi.scala  |  43 +++----
 .../clickhouse/CHSparkPlanExecApi.scala            |   2 +-
 .../execution/CHColumnarCollectLimitExec.scala     |  95 ++++++++++++++
 .../extension/BasicExpressionRewriteRule.scala     |   4 +-
 .../extension/CoalesceAggregationUnion.scala       |   4 +-
 .../CommonSubexpressionEliminateRule.scala         |   4 +-
 .../gluten/extension/ExtendedColumnPruning.scala   |   2 +-
 .../RewriteDateTimestampComparisonRule.scala       |   2 +-
 .../extension/RewriteToDateExpresstionRule.scala   |   4 +-
 .../GlutenClickHouseCollectLimitExecSuite.scala    | 139 +++++++++++++++++++++
 .../gluten/backendsapi/velox/VeloxBackend.scala    |   2 -
 .../execution/ColumnarCollectLimitExec.scala       |  86 +------------
 cpp-ch/local-engine/local_engine_jni.cpp           |  13 +-
 .../gluten/backendsapi/BackendSettingsApi.scala    |   2 -
 .../execution/ColumnarCollectLimitBaseExec.scala   |  87 ++++++++++++-
 .../extension/columnar/validator/Validators.scala  |   1 -
 .../utils/clickhouse/ClickHouseTestSettings.scala  |   2 +
 .../utils/clickhouse/ClickHouseTestSettings.scala  |   4 +-
 19 files changed, 392 insertions(+), 120 deletions(-)

diff --git 
a/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/CHNativeBlock.java
 
b/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/CHNativeBlock.java
index 8cf365335c..ffc8b70402 100644
--- 
a/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/CHNativeBlock.java
+++ 
b/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/CHNativeBlock.java
@@ -83,6 +83,12 @@ public class CHNativeBlock {
     return new CHNativeBlock(copyBlock(blockAddress)).toColumnarBatch();
   }
 
+  private native long nativeSlice(long blockAddress, int offset, int limit);
+
+  public long nativeSlice(int offset, int limit) {
+    return nativeSlice(blockAddress, offset, limit);
+  }
+
   public void close() {
     if (blockAddress != 0) {
       nativeClose(blockAddress);
@@ -111,4 +117,14 @@ public class CHNativeBlock {
     }
     return new ColumnarBatch(vectors, numRows);
   }
+
+  public static ColumnarBatch slice(ColumnarBatch batch, int offset, int 
limit) {
+    if (offset + limit > batch.numRows()) {
+      throw new GlutenException(
+          "Parameter out of bound in slice function, offset: " + offset + ", 
limit: " + limit);
+    }
+    CHNativeBlock block = CHNativeBlock.fromColumnarBatch(batch);
+    long blockAddress = block.nativeSlice(offset, limit);
+    return new CHNativeBlock(blockAddress).toColumnarBatch();
+  }
 }
diff --git 
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHRuleApi.scala
 
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHRuleApi.scala
index 411351e9a4..8a4fde534f 100644
--- 
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHRuleApi.scala
+++ 
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHRuleApi.scala
@@ -63,16 +63,16 @@ object CHRuleApi {
     injector.injectResolutionRule(spark => new 
JoinAggregateToAggregateUnion(spark))
     // CoalesceAggregationUnion and CoalesceProjectionUnion should follows
     // JoinAggregateToAggregateUnion
-    injector.injectResolutionRule(spark => new CoalesceAggregationUnion(spark))
-    injector.injectResolutionRule(spark => new CoalesceProjectionUnion(spark))
-    injector.injectResolutionRule(spark => new 
RewriteToDateExpresstionRule(spark))
-    injector.injectResolutionRule(spark => new 
RewriteDateTimestampComparisonRule(spark))
-    injector.injectResolutionRule(spark => new 
CollapseGetJsonObjectExpressionRule(spark))
-    injector.injectResolutionRule(spark => new 
RepalceFromJsonWithGetJsonObject(spark))
-    injector.injectOptimizerRule(spark => new 
CommonSubexpressionEliminateRule(spark))
-    injector.injectOptimizerRule(spark => new AggregateIfToFilterRule(spark))
-    injector.injectOptimizerRule(spark => new SimplifySumRule(spark))
-    injector.injectOptimizerRule(spark => new ExtendedColumnPruning(spark))
+    injector.injectResolutionRule(spark => CoalesceAggregationUnion(spark))
+    injector.injectResolutionRule(spark => CoalesceProjectionUnion(spark))
+    injector.injectResolutionRule(spark => RewriteToDateExpresstionRule(spark))
+    injector.injectResolutionRule(spark => 
RewriteDateTimestampComparisonRule(spark))
+    injector.injectResolutionRule(spark => 
CollapseGetJsonObjectExpressionRule(spark))
+    injector.injectResolutionRule(spark => 
RepalceFromJsonWithGetJsonObject(spark))
+    injector.injectOptimizerRule(spark => 
CommonSubexpressionEliminateRule(spark))
+    injector.injectOptimizerRule(spark => AggregateIfToFilterRule(spark))
+    injector.injectOptimizerRule(spark => SimplifySumRule(spark))
+    injector.injectOptimizerRule(spark => ExtendedColumnPruning(spark))
     injector.injectOptimizerRule(spark => 
CHAggregateFunctionRewriteRule(spark))
     injector.injectOptimizerRule(_ => CountDistinctWithoutExpand)
     injector.injectOptimizerRule(_ => EqualToRewrite)
@@ -83,11 +83,11 @@ object CHRuleApi {
     // Legacy: Pre-transform rules.
     injector.injectPreTransform(_ => RemoveTransitions)
     injector.injectPreTransform(_ => PushDownInputFileExpression.PreOffload)
-    injector.injectPreTransform(c => FallbackOnANSIMode.apply(c.session))
-    injector.injectPreTransform(c => FallbackMultiCodegens.apply(c.session))
+    injector.injectPreTransform(c => FallbackOnANSIMode(c.session))
+    injector.injectPreTransform(c => FallbackMultiCodegens(c.session))
     injector.injectPreTransform(_ => RewriteSubqueryBroadcast())
-    injector.injectPreTransform(c => 
FallbackBroadcastHashJoin.apply(c.session))
-    injector.injectPreTransform(c => 
MergeTwoPhasesHashBaseAggregate.apply(c.session))
+    injector.injectPreTransform(c => FallbackBroadcastHashJoin(c.session))
+    injector.injectPreTransform(c => 
MergeTwoPhasesHashBaseAggregate(c.session))
     injector.injectPreTransform(_ => WriteFilesWithBucketValue)
 
     // Legacy: The legacy transform rule.
@@ -114,21 +114,22 @@ object CHRuleApi {
     injector.injectPostTransform(_ => EnsureLocalSortRequirements)
     injector.injectPostTransform(_ => EliminateLocalSort)
     injector.injectPostTransform(_ => CollapseProjectExecTransformer)
-    injector.injectPostTransform(c => 
RewriteSortMergeJoinToHashJoinRule.apply(c.session))
-    injector.injectPostTransform(c => 
PushdownAggregatePreProjectionAheadExpand.apply(c.session))
-    injector.injectPostTransform(c => LazyAggregateExpandRule.apply(c.session))
+    injector.injectPostTransform(c => 
RewriteSortMergeJoinToHashJoinRule(c.session))
+    injector.injectPostTransform(c => 
PushdownAggregatePreProjectionAheadExpand(c.session))
+    injector.injectPostTransform(c => LazyAggregateExpandRule(c.session))
     injector.injectPostTransform(c => 
ConverRowNumbertWindowToAggregateRule(c.session))
     injector.injectPostTransform(
       c =>
         intercept(
           
SparkPlanRules.extendedColumnarRule(c.glutenConf.extendedColumnarTransformRules)(
             c.session)))
+    injector.injectPostTransform(_ => CollectLimitTransformerRule())
     injector.injectPostTransform(c => 
InsertTransitions.create(c.outputsColumnar, CHBatch))
-    injector.injectPostTransform(c => RemoveDuplicatedColumns.apply(c.session))
-    injector.injectPostTransform(c => 
AddPreProjectionForHashJoin.apply(c.session))
-    injector.injectPostTransform(c => 
ReplaceSubStringComparison.apply(c.session))
+    injector.injectPostTransform(c => RemoveDuplicatedColumns(c.session))
+    injector.injectPostTransform(c => AddPreProjectionForHashJoin(c.session))
+    injector.injectPostTransform(c => ReplaceSubStringComparison(c.session))
     injector.injectPostTransform(c => 
EliminateDeduplicateAggregateWithAnyJoin(c.session))
-    injector.injectPostTransform(c => 
FlattenNestedExpressions.apply(c.session))
+    injector.injectPostTransform(c => FlattenNestedExpressions(c.session))
 
     // Gluten columnar: Fallback policies.
     injector.injectFallbackPolicy(c => p => 
ExpandFallbackPolicy(c.caller.isAqe(), p))
diff --git 
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
 
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
index 217c4d08d8..ea992d4beb 100644
--- 
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
+++ 
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
@@ -946,7 +946,7 @@ class CHSparkPlanExecApi extends SparkPlanExecApi with 
Logging {
       limit: Int,
       child: SparkPlan,
       offset: Int): ColumnarCollectLimitBaseExec =
-    throw new GlutenNotSupportException("ColumnarCollectLimit is not supported 
in ch backend.")
+    CHColumnarCollectLimitExec(limit, offset, child)
 
   override def genColumnarRangeExec(
       start: Long,
diff --git 
a/backends-clickhouse/src/main/scala/org/apache/gluten/execution/CHColumnarCollectLimitExec.scala
 
b/backends-clickhouse/src/main/scala/org/apache/gluten/execution/CHColumnarCollectLimitExec.scala
new file mode 100644
index 0000000000..2450babe14
--- /dev/null
+++ 
b/backends-clickhouse/src/main/scala/org/apache/gluten/execution/CHColumnarCollectLimitExec.scala
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.execution
+
+import org.apache.gluten.vectorized.CHNativeBlock
+
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.vectorized.ColumnarBatch
+
+case class CHColumnarCollectLimitExec(limit: Int, offset: Int, child: 
SparkPlan)
+  extends ColumnarCollectLimitBaseExec(limit, child, offset) {
+
+  /**
+   * Returns an iterator that gives offset to limit rows in total from the 
input partitionIter.
+   * Either retain the entire batch if it fits within the remaining limit, or 
prune it if it
+   * partially exceeds the remaining limit/offset.
+   */
+  override def collectWithOffsetAndLimit(
+      inputIter: Iterator[ColumnarBatch],
+      offset: Int,
+      limit: Int): Iterator[ColumnarBatch] = {
+
+    val unlimited = limit < 0
+    var rowsToSkip = math.max(offset, 0)
+    var rowsToCollect = if (unlimited) Int.MaxValue else limit
+
+    new Iterator[ColumnarBatch] {
+      private var nextBatch: Option[ColumnarBatch] = None
+
+      override def hasNext: Boolean = {
+        nextBatch.isDefined || fetchNextBatch()
+      }
+
+      override def next(): ColumnarBatch = {
+        if (!hasNext) throw new NoSuchElementException("No more batches 
available.")
+        val batch = nextBatch.get
+        nextBatch = None
+        batch
+      }
+
+      /**
+       * Advance the iterator until we find a batch (possibly sliced) that we 
can return, or exhaust
+       * the input.
+       */
+      private def fetchNextBatch(): Boolean = {
+
+        if (rowsToCollect <= 0) return false
+
+        while (inputIter.hasNext) {
+          val batch = inputIter.next()
+          val batchSize = batch.numRows()
+
+          if (rowsToSkip >= batchSize) {
+            rowsToSkip -= batchSize
+          } else {
+            val startIndex = rowsToSkip
+            val leftoverAfterSkip = batchSize - startIndex
+            rowsToSkip = 0
+
+            val needed = math.min(rowsToCollect, leftoverAfterSkip)
+
+            val prunedBatch =
+              if (startIndex == 0 && needed == batchSize) {
+                batch
+              } else {
+                CHNativeBlock.slice(batch, startIndex, needed)
+              }
+
+            rowsToCollect -= needed
+            nextBatch = Some(prunedBatch)
+            return true
+          }
+        }
+        false
+      }
+    }
+  }
+
+  override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan =
+    copy(child = newChild)
+}
diff --git 
a/backends-clickhouse/src/main/scala/org/apache/gluten/extension/BasicExpressionRewriteRule.scala
 
b/backends-clickhouse/src/main/scala/org/apache/gluten/extension/BasicExpressionRewriteRule.scala
index 200a460c3d..ffabfe39a5 100644
--- 
a/backends-clickhouse/src/main/scala/org/apache/gluten/extension/BasicExpressionRewriteRule.scala
+++ 
b/backends-clickhouse/src/main/scala/org/apache/gluten/extension/BasicExpressionRewriteRule.scala
@@ -44,7 +44,9 @@ object PlanResolvedChecker {
   }
 }
 
-class RepalceFromJsonWithGetJsonObject(spark: SparkSession) extends 
Rule[LogicalPlan] with Logging {
+case class RepalceFromJsonWithGetJsonObject(spark: SparkSession)
+  extends Rule[LogicalPlan]
+  with Logging {
   override def apply(plan: LogicalPlan): LogicalPlan = {
     if (
       !CHBackendSettings.enableReplaceFromJsonWithGetJsonObject || 
!PlanResolvedChecker.check(plan)
diff --git 
a/backends-clickhouse/src/main/scala/org/apache/gluten/extension/CoalesceAggregationUnion.scala
 
b/backends-clickhouse/src/main/scala/org/apache/gluten/extension/CoalesceAggregationUnion.scala
index cfae1deeaa..05e522ae82 100644
--- 
a/backends-clickhouse/src/main/scala/org/apache/gluten/extension/CoalesceAggregationUnion.scala
+++ 
b/backends-clickhouse/src/main/scala/org/apache/gluten/extension/CoalesceAggregationUnion.scala
@@ -336,7 +336,7 @@ object CoalesceUnionUtil extends Logging {
  * really slow. The rewritten query only scan `t` once, and the performance is 
much better.
  */
 
-class CoalesceAggregationUnion(spark: SparkSession) extends Rule[LogicalPlan] 
with Logging {
+case class CoalesceAggregationUnion(spark: SparkSession) extends 
Rule[LogicalPlan] with Logging {
 
   case class PlanAnalyzer(originalAggregate: Aggregate) extends 
AbstractPlanAnalyzer {
 
@@ -825,7 +825,7 @@ class CoalesceAggregationUnion(spark: SparkSession) extends 
Rule[LogicalPlan] wi
  * select array(if(d=1, named_struct('f0', a, 'f1', b, 'f2', 1), null), 
if(d=2, named_struct('f0',
  * a, 'f1', b, 'f2', 2), null)) as s from t where d = 1 or d = 2 ) ) where s 
is not null
  */
-class CoalesceProjectionUnion(spark: SparkSession) extends Rule[LogicalPlan] 
with Logging {
+case class CoalesceProjectionUnion(spark: SparkSession) extends 
Rule[LogicalPlan] with Logging {
 
   case class PlanAnalyzer(originalPlan: LogicalPlan) extends 
AbstractPlanAnalyzer {
     def extractFilter(): Option[Filter] = {
diff --git 
a/backends-clickhouse/src/main/scala/org/apache/gluten/extension/CommonSubexpressionEliminateRule.scala
 
b/backends-clickhouse/src/main/scala/org/apache/gluten/extension/CommonSubexpressionEliminateRule.scala
index 7e674367f1..cefee23dbf 100644
--- 
a/backends-clickhouse/src/main/scala/org/apache/gluten/extension/CommonSubexpressionEliminateRule.scala
+++ 
b/backends-clickhouse/src/main/scala/org/apache/gluten/extension/CommonSubexpressionEliminateRule.scala
@@ -32,7 +32,9 @@ import scala.collection.mutable
 // 2. append two options to spark config
 //    --conf spark.sql.planChangeLog.level=error
 //    --conf spark.sql.planChangeLog.batches=all
-class CommonSubexpressionEliminateRule(spark: SparkSession) extends 
Rule[LogicalPlan] with Logging {
+case class CommonSubexpressionEliminateRule(spark: SparkSession)
+  extends Rule[LogicalPlan]
+  with Logging {
 
   private var lastPlan: LogicalPlan = null
 
diff --git 
a/backends-clickhouse/src/main/scala/org/apache/gluten/extension/ExtendedColumnPruning.scala
 
b/backends-clickhouse/src/main/scala/org/apache/gluten/extension/ExtendedColumnPruning.scala
index 53b22f217f..6515ab99fb 100644
--- 
a/backends-clickhouse/src/main/scala/org/apache/gluten/extension/ExtendedColumnPruning.scala
+++ 
b/backends-clickhouse/src/main/scala/org/apache/gluten/extension/ExtendedColumnPruning.scala
@@ -356,7 +356,7 @@ object ExtendedGeneratorNestedColumnAliasing {
 
 // ExtendedColumnPruning process Project(Filter(Generate)),
 // which is ignored by vanilla spark in optimization rule: ColumnPruning
-class ExtendedColumnPruning(spark: SparkSession) extends Rule[LogicalPlan] 
with Logging {
+case class ExtendedColumnPruning(spark: SparkSession) extends 
Rule[LogicalPlan] with Logging {
 
   override def apply(plan: LogicalPlan): LogicalPlan =
     plan.transformWithPruning(AlwaysProcess.fn) {
diff --git 
a/backends-clickhouse/src/main/scala/org/apache/gluten/extension/RewriteDateTimestampComparisonRule.scala
 
b/backends-clickhouse/src/main/scala/org/apache/gluten/extension/RewriteDateTimestampComparisonRule.scala
index 6873c06348..68c9eb4dbd 100644
--- 
a/backends-clickhouse/src/main/scala/org/apache/gluten/extension/RewriteDateTimestampComparisonRule.scala
+++ 
b/backends-clickhouse/src/main/scala/org/apache/gluten/extension/RewriteDateTimestampComparisonRule.scala
@@ -36,7 +36,7 @@ import org.apache.spark.unsafe.types.UTF8String
 // This rule try to make the filter condition into integer comparison, which 
is more efficient.
 // The above example will be rewritten into
 //    select * from table where to_unixtime('2023-11-02', 'yyyy-MM-dd') >= 
unix_timestamp
-class RewriteDateTimestampComparisonRule(spark: SparkSession)
+case class RewriteDateTimestampComparisonRule(spark: SparkSession)
   extends Rule[LogicalPlan]
   with Logging {
 
diff --git 
a/backends-clickhouse/src/main/scala/org/apache/gluten/extension/RewriteToDateExpresstionRule.scala
 
b/backends-clickhouse/src/main/scala/org/apache/gluten/extension/RewriteToDateExpresstionRule.scala
index a4eb7464de..4495cd8db2 100644
--- 
a/backends-clickhouse/src/main/scala/org/apache/gluten/extension/RewriteToDateExpresstionRule.scala
+++ 
b/backends-clickhouse/src/main/scala/org/apache/gluten/extension/RewriteToDateExpresstionRule.scala
@@ -38,7 +38,9 @@ import org.apache.spark.sql.types._
 // Under ch backend, the StringType can be directly converted into DateType,
 //     and the functions `from_unixtime` and `unix_timestamp` can be optimized 
here.
 // Optimized result is `to_date(stringType)`
-class RewriteToDateExpresstionRule(spark: SparkSession) extends 
Rule[LogicalPlan] with Logging {
+case class RewriteToDateExpresstionRule(spark: SparkSession)
+  extends Rule[LogicalPlan]
+  with Logging {
 
   override def apply(plan: LogicalPlan): LogicalPlan = {
     if (
diff --git 
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseCollectLimitExecSuite.scala
 
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseCollectLimitExecSuite.scala
new file mode 100644
index 0000000000..729504b56b
--- /dev/null
+++ 
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseCollectLimitExecSuite.scala
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.execution
+
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.{DataFrame, Row}
+
+class GlutenClickHouseCollectLimitExecSuite extends 
GlutenClickHouseWholeStageTransformerSuite {
+
+  protected val tablesPath: String = basePath + "/tpch-data"
+  protected val tpchQueries: String =
+    rootPath + 
"../../../../tools/gluten-it/common/src/main/resources/tpch-queries"
+  protected val queriesResults: String = rootPath + "queries-output"
+
+  override protected def sparkConf: SparkConf = {
+    super.sparkConf
+      .set("spark.shuffle.manager", 
"org.apache.spark.shuffle.sort.ColumnarShuffleManager")
+      .set("spark.io.compression.codec", "LZ4")
+      .set("spark.sql.shuffle.partitions", "5")
+      .set("spark.sql.autoBroadcastJoinThreshold", "10MB")
+      .set("spark.sql.adaptive.enabled", "true")
+  }
+
+  private def assertGlutenOperatorMatch[T: reflect.ClassTag](
+      df: DataFrame,
+      checkMatch: Boolean): Unit = {
+    val executedPlan = getExecutedPlan(df)
+
+    val operatorFound = executedPlan.exists {
+      plan =>
+        try {
+          implicitly[reflect.ClassTag[T]].runtimeClass.isInstance(plan)
+        } catch {
+          case _: Throwable => false
+        }
+    }
+
+    val assertionCondition = operatorFound == checkMatch
+    val assertionMessage =
+      if (checkMatch) {
+        s"Operator 
${implicitly[reflect.ClassTag[T]].runtimeClass.getSimpleName} not found " +
+          s"in executed plan:\n $executedPlan"
+      } else {
+        s"Operator 
${implicitly[reflect.ClassTag[T]].runtimeClass.getSimpleName} was found " +
+          s"in executed plan:\n $executedPlan"
+      }
+
+    assert(assertionCondition, assertionMessage)
+  }
+
+  test("ColumnarCollectLimitExec - basic limit test") {
+    val df = spark.range(0, 1000, 1).toDF("id").limit(5)
+    val expectedData = Seq(Row(0L), Row(1L), Row(2L), Row(3L), Row(4L))
+
+    checkAnswer(df, expectedData)
+
+    assertGlutenOperatorMatch[ColumnarCollectLimitBaseExec](df, checkMatch = 
true)
+  }
+
+  test("ColumnarCollectLimitExec - with filter") {
+    val df = spark
+      .range(0, 20, 1)
+      .toDF("id")
+      .filter("id % 2 == 0")
+      .limit(5)
+    val expectedData = Seq(Row(0L), Row(2L), Row(4L), Row(6L), Row(8L))
+
+    checkAnswer(df, expectedData)
+
+    assertGlutenOperatorMatch[ColumnarCollectLimitBaseExec](df, checkMatch = 
true)
+  }
+
+  test("ColumnarCollectLimitExec - range with repartition") {
+    val df = spark
+      .range(0, 10, 1)
+      .toDF("id")
+      .repartition(3)
+      .limit(3)
+    val expectedData = Seq(Row(0L), Row(3L), Row(6L))
+
+    checkAnswer(df, expectedData)
+
+    assertGlutenOperatorMatch[ColumnarCollectLimitBaseExec](df, checkMatch = 
true)
+  }
+
+  test("ColumnarCollectLimitExec - with distinct values") {
+    val df = spark
+      .range(0, 10, 1)
+      .toDF("id")
+      .select("id")
+      .distinct()
+      .limit(5)
+    val expectedData = Seq(Row(0L), Row(4L), Row(3L), Row(2L), Row(5L))
+
+    checkAnswer(df, expectedData)
+
+    assertGlutenOperatorMatch[ColumnarCollectLimitBaseExec](df, checkMatch = 
true)
+  }
+
+  test("ColumnarCollectLimitExec - chained limit") {
+    val df = spark
+      .range(0, 10, 1)
+      .toDF("id")
+      .limit(8)
+      .limit(3)
+    val expectedData = Seq(Row(0L), Row(1L), Row(2L))
+
+    checkAnswer(df, expectedData)
+
+    assertGlutenOperatorMatch[ColumnarCollectLimitBaseExec](df, checkMatch = 
true)
+  }
+
+  test("ColumnarCollectLimitExec - limit after union") {
+    val df1 = spark.range(0, 5).toDF("id")
+    val df2 = spark.range(5, 10).toDF("id")
+    val unionDf = df1.union(df2).limit(3)
+
+    val expectedData = Seq(Row(0L), Row(1L), Row(2L))
+
+    checkAnswer(unionDf, expectedData)
+
+    assertGlutenOperatorMatch[ColumnarCollectLimitBaseExec](unionDf, 
checkMatch = true)
+  }
+
+}
diff --git 
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
 
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
index b194e2183e..56e5a07239 100644
--- 
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
+++ 
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
@@ -558,8 +558,6 @@ object VeloxBackendSettings extends BackendSettingsApi {
 
   override def needPreComputeRangeFrameBoundary(): Boolean = true
 
-  override def supportCollectLimitExec(): Boolean = true
-
   override def broadcastNestedLoopJoinSupportsFullOuterJoin(): Boolean = true
 
   override def supportIcebergEqualityDeleteRead(): Boolean = false
diff --git 
a/backends-velox/src/main/scala/org/apache/gluten/execution/ColumnarCollectLimitExec.scala
 
b/backends-velox/src/main/scala/org/apache/gluten/execution/ColumnarCollectLimitExec.scala
index 6fbbbfab66..61c42715ea 100644
--- 
a/backends-velox/src/main/scala/org/apache/gluten/execution/ColumnarCollectLimitExec.scala
+++ 
b/backends-velox/src/main/scala/org/apache/gluten/execution/ColumnarCollectLimitExec.scala
@@ -16,18 +16,10 @@
  */
 package org.apache.gluten.execution
 
-import org.apache.gluten.backendsapi.BackendsApiManager
 import org.apache.gluten.columnarbatch.ColumnarBatches
 import org.apache.gluten.columnarbatch.VeloxColumnarBatches
-import org.apache.gluten.extension.columnar.transition.Convention
-
-import org.apache.spark.rdd.RDD
-import org.apache.spark.serializer.Serializer
-import org.apache.spark.sql.catalyst.expressions.Attribute
-import org.apache.spark.sql.catalyst.plans.physical.SinglePartition
-import org.apache.spark.sql.execution.{ShuffledColumnarBatchRDD, SparkPlan}
-import org.apache.spark.sql.execution.metric.{SQLMetric, 
SQLShuffleWriteMetricsReporter}
-import org.apache.spark.sql.metric.SQLColumnarShuffleReadMetricsReporter
+
+import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.vectorized.ColumnarBatch
 
 case class ColumnarCollectLimitExec(
@@ -36,34 +28,12 @@ case class ColumnarCollectLimitExec(
     offset: Int = 0
 ) extends ColumnarCollectLimitBaseExec(limit, child, offset) {
 
-  override def batchType(): Convention.BatchType =
-    BackendsApiManager.getSettings.primaryBatchType
-
-  private lazy val writeMetrics =
-    SQLShuffleWriteMetricsReporter.createShuffleWriteMetrics(sparkContext)
-
-  private lazy val readMetrics =
-    
SQLColumnarShuffleReadMetricsReporter.createShuffleReadMetrics(sparkContext)
-
-  private lazy val useSortBasedShuffle: Boolean =
-    BackendsApiManager.getSparkPlanExecApiInstance
-      .useSortBasedShuffle(outputPartitioning, child.output)
-
-  @transient private lazy val serializer: Serializer =
-    BackendsApiManager.getSparkPlanExecApiInstance
-      .createColumnarBatchSerializer(child.schema, metrics, 
useSortBasedShuffle)
-
-  @transient override lazy val metrics: Map[String, SQLMetric] =
-    BackendsApiManager.getMetricsApiInstance
-      .genColumnarShuffleExchangeMetrics(sparkContext, useSortBasedShuffle) ++
-      readMetrics ++ writeMetrics
-
   /**
    * Returns an iterator that gives offset to limit rows in total from the 
input partitionIter.
    * Either retain the entire batch if it fits within the remaining limit, or 
prune it if it
    * partially exceeds the remaining limit/offset.
    */
-  private def collectWithOffsetAndLimit(
+  override def collectWithOffsetAndLimit(
       inputIter: Iterator[ColumnarBatch],
       offset: Int,
       limit: Int): Iterator[ColumnarBatch] = {
@@ -125,56 +95,6 @@ case class ColumnarCollectLimitExec(
     }
   }
 
-  override protected def doExecuteColumnar(): RDD[ColumnarBatch] = {
-    val childRDD = child.executeColumnar()
-
-    if (childRDD.getNumPartitions == 0) {
-      return sparkContext.parallelize(Seq.empty[ColumnarBatch], 1)
-    }
-
-    val processedRDD =
-      if (childRDD.getNumPartitions == 1) childRDD
-      else shuffleLimitedPartitions(childRDD)
-
-    processedRDD.mapPartitions(
-      partition => {
-        if (limit > 0) {
-          val adjusted = math.max(0, limit - offset)
-          collectWithOffsetAndLimit(partition, offset, adjusted)
-        } else {
-          collectWithOffsetAndLimit(partition, offset, -1)
-        }
-      })
-  }
-
-  private def shuffleLimitedPartitions(childRDD: RDD[ColumnarBatch]): 
RDD[ColumnarBatch] = {
-    val applyLocalLimit = (offset == 0 && limit >= 0)
-    val locallyLimited = if (applyLocalLimit) {
-      childRDD.mapPartitions {
-        collectWithOffsetAndLimit(_, 0, limit)
-      }
-    } else {
-      childRDD
-    }
-    new ShuffledColumnarBatchRDD(
-      BackendsApiManager.getSparkPlanExecApiInstance.genShuffleDependency(
-        locallyLimited,
-        child.output,
-        child.output,
-        SinglePartition,
-        serializer,
-        writeMetrics,
-        metrics,
-        useSortBasedShuffle
-      ),
-      readMetrics
-    )
-  }
-
-  override def rowType0(): Convention.RowType = Convention.RowType.None
-
-  override def output: Seq[Attribute] = child.output
-
   override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan =
     copy(child = newChild)
 }
diff --git a/cpp-ch/local-engine/local_engine_jni.cpp 
b/cpp-ch/local-engine/local_engine_jni.cpp
index 8092c7b878..b9497aa920 100644
--- a/cpp-ch/local-engine/local_engine_jni.cpp
+++ b/cpp-ch/local-engine/local_engine_jni.cpp
@@ -543,11 +543,22 @@ 
Java_org_apache_gluten_vectorized_CHNativeBlock_copyBlock(JNIEnv * env, jobject
     DB::Block * block = reinterpret_cast<DB::Block *>(block_address);
 
     auto copied_block = block->cloneWithColumns(block->getColumns());
-    auto a = new DB::Block(copied_block);
+    auto * a = new DB::Block(std::move(copied_block));
     return reinterpret_cast<jlong>(a);
     LOCAL_ENGINE_JNI_METHOD_END(env, -1)
 }
 
+JNIEXPORT jlong
+Java_org_apache_gluten_vectorized_CHNativeBlock_nativeSlice(JNIEnv * env, 
jobject /* obj */, jlong block_address, jint offset, jint limit)
+{
+    LOCAL_ENGINE_JNI_METHOD_START
+    DB::Block * block = reinterpret_cast<DB::Block *>(block_address);
+    DB::Block cut_block = block->cloneWithCutColumns(offset, limit);
+
+    return reinterpret_cast<jlong>(new DB::Block(std::move(cut_block)));
+    LOCAL_ENGINE_JNI_METHOD_END(env, -1)
+}
+
 JNIEXPORT jlong 
Java_org_apache_gluten_vectorized_CHStreamReader_createNativeShuffleReader(
     JNIEnv * env, jclass /*clazz*/, jobject input_stream, jboolean compressed, 
jlong max_shuffle_read_rows, jlong max_shuffle_read_bytes)
 {
diff --git 
a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
 
b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
index c9a3fdbc6d..de390b256c 100644
--- 
a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
+++ 
b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
@@ -152,8 +152,6 @@ trait BackendSettingsApi {
 
   def needPreComputeRangeFrameBoundary(): Boolean = false
 
-  def supportCollectLimitExec(): Boolean = false
-
   def broadcastNestedLoopJoinSupportsFullOuterJoin(): Boolean = false
 
   def supportIcebergEqualityDeleteRead(): Boolean = true
diff --git 
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/ColumnarCollectLimitBaseExec.scala
 
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/ColumnarCollectLimitBaseExec.scala
index fc551e64d6..50ce2bc208 100644
--- 
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/ColumnarCollectLimitBaseExec.scala
+++ 
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/ColumnarCollectLimitBaseExec.scala
@@ -16,10 +16,18 @@
  */
 package org.apache.gluten.execution
 
+import org.apache.gluten.backendsapi.BackendsApiManager
 import org.apache.gluten.extension.ValidationResult
+import org.apache.gluten.extension.columnar.transition.Convention
 
+import org.apache.spark.rdd.RDD
+import org.apache.spark.serializer.Serializer
+import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, 
SinglePartition}
-import org.apache.spark.sql.execution.{LimitExec, SparkPlan}
+import org.apache.spark.sql.execution.{LimitExec, ShuffledColumnarBatchRDD, 
SparkPlan}
+import org.apache.spark.sql.execution.metric.{SQLMetric, 
SQLShuffleWriteMetricsReporter}
+import org.apache.spark.sql.metric.SQLColumnarShuffleReadMetricsReporter
+import org.apache.spark.sql.vectorized.ColumnarBatch
 
 abstract class ColumnarCollectLimitBaseExec(
     limit: Int,
@@ -28,6 +36,83 @@ abstract class ColumnarCollectLimitBaseExec(
 ) extends LimitExec
   with ValidatablePlan {
 
+  override def rowType0(): Convention.RowType = Convention.RowType.None
+
+  override def batchType(): Convention.BatchType =
+    BackendsApiManager.getSettings.primaryBatchType
+
+  private lazy val writeMetrics =
+    SQLShuffleWriteMetricsReporter.createShuffleWriteMetrics(sparkContext)
+
+  private lazy val readMetrics =
+    
SQLColumnarShuffleReadMetricsReporter.createShuffleReadMetrics(sparkContext)
+
+  private lazy val useSortBasedShuffle: Boolean =
+    BackendsApiManager.getSparkPlanExecApiInstance
+      .useSortBasedShuffle(outputPartitioning, child.output)
+
+  @transient private lazy val serializer: Serializer =
+    BackendsApiManager.getSparkPlanExecApiInstance
+      .createColumnarBatchSerializer(child.schema, metrics, 
useSortBasedShuffle)
+
+  @transient override lazy val metrics: Map[String, SQLMetric] =
+    BackendsApiManager.getMetricsApiInstance
+      .genColumnarShuffleExchangeMetrics(sparkContext, useSortBasedShuffle) ++
+      readMetrics ++ writeMetrics
+
+  override protected def doExecuteColumnar(): RDD[ColumnarBatch] = {
+    val childRDD = child.executeColumnar()
+
+    if (childRDD.getNumPartitions == 0) {
+      return sparkContext.parallelize(Seq.empty[ColumnarBatch], 1)
+    }
+
+    val processedRDD =
+      if (childRDD.getNumPartitions == 1) childRDD
+      else shuffleLimitedPartitions(childRDD)
+
+    processedRDD.mapPartitions(
+      partition => {
+        if (limit > 0) {
+          val adjusted = math.max(0, limit - offset)
+          collectWithOffsetAndLimit(partition, offset, adjusted)
+        } else {
+          collectWithOffsetAndLimit(partition, offset, -1)
+        }
+      })
+  }
+
+  private def shuffleLimitedPartitions(childRDD: RDD[ColumnarBatch]): 
RDD[ColumnarBatch] = {
+    val applyLocalLimit = (offset == 0 && limit >= 0)
+    val locallyLimited = if (applyLocalLimit) {
+      childRDD.mapPartitions {
+        collectWithOffsetAndLimit(_, 0, limit)
+      }
+    } else {
+      childRDD
+    }
+    new ShuffledColumnarBatchRDD(
+      BackendsApiManager.getSparkPlanExecApiInstance.genShuffleDependency(
+        locallyLimited,
+        child.output,
+        child.output,
+        SinglePartition,
+        serializer,
+        writeMetrics,
+        metrics,
+        useSortBasedShuffle
+      ),
+      readMetrics
+    )
+  }
+
+  protected def collectWithOffsetAndLimit(
+      inputIter: Iterator[ColumnarBatch],
+      offset: Int,
+      limit: Int): Iterator[ColumnarBatch]
+
+  override def output: Seq[Attribute] = child.output
+
   override def outputPartitioning: Partitioning = SinglePartition
 
   override protected def doValidateInternal(): ValidationResult = {
diff --git 
a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala
 
b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala
index 57a60af3b8..36126cf4c7 100644
--- 
a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala
+++ 
b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala
@@ -136,7 +136,6 @@ object Validators {
         fail(p)
       case p: CartesianProductExec if !settings.supportCartesianProductExec() 
=> fail(p)
       case p: TakeOrderedAndProjectExec if 
!settings.supportColumnarShuffleExec() => fail(p)
-      case p: CollectLimitExec if !settings.supportCollectLimitExec() => 
fail(p)
       case _ => pass()
     }
   }
diff --git 
a/gluten-ut/spark33/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala
 
b/gluten-ut/spark33/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala
index 0e4666c604..2d9444e7a6 100644
--- 
a/gluten-ut/spark33/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala
+++ 
b/gluten-ut/spark33/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala
@@ -1158,6 +1158,8 @@ class ClickHouseTestSettings extends BackendTestSettings {
       "- single join")
     .exclude("SPARK-35455: Unify empty relation optimization between normal 
and AQE optimizer " +
       "- multi join")
+    // Gluten columnar operator will have different number of shuffles
+    .exclude("SPARK-29906: AQE should not introduce extra shuffle for 
outermost limit")
     .excludeGlutenTest("Empty stage coalesced to 1-partition RDD")
     .excludeGlutenTest(
       "Avoid changing merge join to broadcast join if too many empty 
partitions on build plan")
diff --git 
a/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala
 
b/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala
index 0caa3008e1..ff01c5452d 100644
--- 
a/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala
+++ 
b/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala
@@ -49,7 +49,7 @@ class ClickHouseTestSettings extends BackendTestSettings {
   enableSuite[ClickHouseAdaptiveQueryExecSuite]
     .includeAllGlutenTests()
     .includeByPrefix(
-      "SPARK-29906",
+      // exclude SPARK-29906 because gluten columnar operator will have 
different number of shuffles
       "SPARK-30291",
       "SPARK-30403",
       "SPARK-30719",
@@ -434,6 +434,8 @@ class ClickHouseTestSettings extends BackendTestSettings {
   enableSuite[GlutenConfigBehaviorSuite]
     // Will be fixed by cleaning up ColumnarShuffleExchangeExec.
     .exclude("SPARK-22160 
spark.sql.execution.rangeExchange.sampleSizePerPartition")
+    // Gluten columnar operator will have different number of jobs
+    .exclude("SPARK-40211: customize initialNumPartitions for take")
   enableSuite[GlutenCountMinSketchAggQuerySuite]
   enableSuite[GlutenCreateTableAsSelectSuite]
     .exclude("CREATE TABLE USING AS SELECT based on the file without write 
permission")


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to