This is an automated email from the ASF dual-hosted git repository.

chengchengjin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 4e5125c8ce [GLUTEN-8912][VL] Add Offset support for CollectLimitExec  
(#8914)
4e5125c8ce is described below

commit 4e5125c8ce017f3aaacdd30939564125d700cdc0
Author: Arnav Balyan <[email protected]>
AuthorDate: Fri Apr 18 23:46:47 2025 +0530

    [GLUTEN-8912][VL] Add Offset support for CollectLimitExec  (#8914)
    
    Thanks for your contribution!
---
 .../clickhouse/CHSparkPlanExecApi.scala            |   3 +-
 .../gluten/backendsapi/velox/VeloxRuleApi.scala    |   2 +
 .../backendsapi/velox/VeloxSparkPlanExecApi.scala  |   5 +-
 .../execution/ColumnarCollectLimitExec.scala       | 142 ++++++-----
 .../gluten/backendsapi/SparkPlanExecApi.scala      |   5 +-
 .../execution/ColumnarCollectLimitBaseExec.scala   |  22 +-
 .../columnar/CollectLimitTransformerRule.scala     |  41 ++++
 .../columnar/offload/OffloadSingleNodeRules.scala  |   4 +-
 .../gluten/utils/velox/VeloxTestSettings.scala     |   4 +
 .../execution/GlutenSQLCollectLimitExecSuite.scala |  26 +-
 .../gluten/utils/velox/VeloxTestSettings.scala     |   3 +
 .../execution/GlutenSQLCollectLimitExecSuite.scala |  30 ++-
 .../gluten/utils/velox/VeloxTestSettings.scala     |   6 +-
 .../execution/GlutenSQLCollectLimitExecSuite.scala | 262 ++++++++++++++++++++
 .../gluten/utils/velox/VeloxTestSettings.scala     |   4 +-
 .../execution/GlutenSQLCollectLimitExecSuite.scala | 264 +++++++++++++++++++++
 .../org/apache/gluten/sql/shims/SparkShims.scala   |   4 +-
 .../gluten/sql/shims/spark34/Spark34Shims.scala    |   6 +-
 .../gluten/sql/shims/spark35/Spark35Shims.scala    |   6 +-
 19 files changed, 722 insertions(+), 117 deletions(-)

diff --git 
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
 
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
index 0ec44a207a..217c4d08d8 100644
--- 
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
+++ 
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
@@ -944,7 +944,8 @@ class CHSparkPlanExecApi extends SparkPlanExecApi with 
Logging {
 
   override def genColumnarCollectLimitExec(
       limit: Int,
-      child: SparkPlan): ColumnarCollectLimitBaseExec =
+      child: SparkPlan,
+      offset: Int): ColumnarCollectLimitBaseExec =
     throw new GlutenNotSupportException("ColumnarCollectLimit is not supported 
in ch backend.")
 
   override def genColumnarRangeExec(
diff --git 
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala
 
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala
index f14d08ed86..a4e1eb24b8 100644
--- 
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala
+++ 
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala
@@ -100,6 +100,7 @@ object VeloxRuleApi {
     injector.injectPostTransform(_ => CollapseProjectExecTransformer)
     injector.injectPostTransform(c => 
FlushableHashAggregateRule.apply(c.session))
     injector.injectPostTransform(c => 
HashAggregateIgnoreNullKeysRule.apply(c.session))
+    injector.injectPostTransform(_ => CollectLimitTransformerRule())
     injector.injectPostTransform(c => 
InsertTransitions.create(c.outputsColumnar, VeloxBatch))
 
     // Gluten columnar: Fallback policies.
@@ -187,6 +188,7 @@ object VeloxRuleApi {
     injector.injectPostTransform(_ => CollapseProjectExecTransformer)
     injector.injectPostTransform(c => 
FlushableHashAggregateRule.apply(c.session))
     injector.injectPostTransform(c => 
HashAggregateIgnoreNullKeysRule.apply(c.session))
+    injector.injectPostTransform(_ => CollectLimitTransformerRule())
     injector.injectPostTransform(c => 
InsertTransitions.create(c.outputsColumnar, VeloxBatch))
     injector.injectPostTransform(c => RemoveTopmostColumnarToRow(c.session, 
c.caller.isAqe()))
     SparkShimLoader.getSparkShims
diff --git 
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala
 
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala
index 0dc83e98d4..44aa8e836f 100644
--- 
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala
+++ 
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala
@@ -920,8 +920,9 @@ class VeloxSparkPlanExecApi extends SparkPlanExecApi {
 
   override def genColumnarCollectLimitExec(
       limit: Int,
-      child: SparkPlan): ColumnarCollectLimitBaseExec =
-    ColumnarCollectLimitExec(limit, child)
+      child: SparkPlan,
+      offset: Int): ColumnarCollectLimitBaseExec =
+    ColumnarCollectLimitExec(limit, child, offset)
 
   override def genColumnarRangeExec(
       start: Long,
diff --git 
a/backends-velox/src/main/scala/org/apache/gluten/execution/ColumnarCollectLimitExec.scala
 
b/backends-velox/src/main/scala/org/apache/gluten/execution/ColumnarCollectLimitExec.scala
index 8c328430ed..6fbbbfab66 100644
--- 
a/backends-velox/src/main/scala/org/apache/gluten/execution/ColumnarCollectLimitExec.scala
+++ 
b/backends-velox/src/main/scala/org/apache/gluten/execution/ColumnarCollectLimitExec.scala
@@ -32,88 +32,99 @@ import org.apache.spark.sql.vectorized.ColumnarBatch
 
 case class ColumnarCollectLimitExec(
     limit: Int,
-    child: SparkPlan
-) extends ColumnarCollectLimitBaseExec(limit, child) {
+    child: SparkPlan,
+    offset: Int = 0
+) extends ColumnarCollectLimitBaseExec(limit, child, offset) {
 
   override def batchType(): Convention.BatchType =
     BackendsApiManager.getSettings.primaryBatchType
 
+  private lazy val writeMetrics =
+    SQLShuffleWriteMetricsReporter.createShuffleWriteMetrics(sparkContext)
+
+  private lazy val readMetrics =
+    
SQLColumnarShuffleReadMetricsReporter.createShuffleReadMetrics(sparkContext)
+
+  private lazy val useSortBasedShuffle: Boolean =
+    BackendsApiManager.getSparkPlanExecApiInstance
+      .useSortBasedShuffle(outputPartitioning, child.output)
+
+  @transient private lazy val serializer: Serializer =
+    BackendsApiManager.getSparkPlanExecApiInstance
+      .createColumnarBatchSerializer(child.schema, metrics, 
useSortBasedShuffle)
+
+  @transient override lazy val metrics: Map[String, SQLMetric] =
+    BackendsApiManager.getMetricsApiInstance
+      .genColumnarShuffleExchangeMetrics(sparkContext, useSortBasedShuffle) ++
+      readMetrics ++ writeMetrics
+
   /**
-   * Returns an iterator that yields up to `limit` rows in total from the 
input partitionIter.
+   * Skips the first `offset` rows of inputIter, then yields at most `limit` 
rows in total.
    * Either retain the entire batch if it fits within the remaining limit, or 
prune it if it
-   * partially exceeds the remaining limit.
+   * partially exceeds the remaining limit/offset.
    */
-  private def collectLimitedRows(
-      partitionIter: Iterator[ColumnarBatch],
-      limit: Int
-  ): Iterator[ColumnarBatch] = {
-    if (partitionIter.isEmpty) {
-      return Iterator.empty
-    }
-    new Iterator[ColumnarBatch] {
+  private def collectWithOffsetAndLimit(
+      inputIter: Iterator[ColumnarBatch],
+      offset: Int,
+      limit: Int): Iterator[ColumnarBatch] = {
+
+    val unlimited = limit < 0
+    var rowsToSkip = math.max(offset, 0)
+    var rowsToCollect = if (unlimited) Int.MaxValue else limit
 
-      private var rowsCollected = 0
+    new Iterator[ColumnarBatch] {
       private var nextBatch: Option[ColumnarBatch] = None
 
       override def hasNext: Boolean = {
-        nextBatch.isDefined || fetchNext()
+        nextBatch.isDefined || fetchNextBatch()
       }
 
       override def next(): ColumnarBatch = {
-        if (!hasNext) {
-          throw new NoSuchElementException("No more batches available.")
-        }
+        if (!hasNext) throw new NoSuchElementException("No more batches 
available.")
         val batch = nextBatch.get
         nextBatch = None
         batch
       }
 
       /**
-       * Attempt to fetch the next batch from the underlying iterator if we 
haven't yet hit the
-       * limit. Returns true if we found a new batch, false otherwise.
+       * Advance the iterator until we find a batch (possibly sliced) that we 
can return, or exhaust
+       * the input.
        */
-      private def fetchNext(): Boolean = {
-        if (rowsCollected >= limit || !partitionIter.hasNext) {
-          return false
+      private def fetchNextBatch(): Boolean = {
+
+        if (rowsToCollect <= 0) return false
+
+        while (inputIter.hasNext) {
+          val batch = inputIter.next()
+          val batchSize = batch.numRows()
+
+          if (rowsToSkip >= batchSize) {
+            rowsToSkip -= batchSize
+          } else {
+            val startIndex = rowsToSkip
+            val leftoverAfterSkip = batchSize - startIndex
+            rowsToSkip = 0
+
+            val needed = math.min(rowsToCollect, leftoverAfterSkip)
+
+            val prunedBatch =
+              if (startIndex == 0 && needed == batchSize) {
+                ColumnarBatches.retain(batch)
+                batch
+              } else {
+                VeloxColumnarBatches.slice(batch, startIndex, needed)
+              }
+
+            rowsToCollect -= needed
+            nextBatch = Some(prunedBatch)
+            return true
+          }
         }
-
-        val currentBatch = partitionIter.next()
-        val currentBatchRowCount = currentBatch.numRows()
-        val remaining = limit - rowsCollected
-
-        if (currentBatchRowCount <= remaining) {
-          rowsCollected += currentBatchRowCount
-          ColumnarBatches.retain(currentBatch)
-          nextBatch = Some(currentBatch)
-        } else {
-          val prunedBatch = VeloxColumnarBatches.slice(currentBatch, 0, 
remaining)
-          rowsCollected += remaining
-          nextBatch = Some(prunedBatch)
-        }
-        true
+        false
       }
     }
   }
 
-  private lazy val writeMetrics =
-    SQLShuffleWriteMetricsReporter.createShuffleWriteMetrics(sparkContext)
-
-  private lazy val readMetrics =
-    
SQLColumnarShuffleReadMetricsReporter.createShuffleReadMetrics(sparkContext)
-
-  private lazy val useSortBasedShuffle: Boolean =
-    BackendsApiManager.getSparkPlanExecApiInstance
-      .useSortBasedShuffle(outputPartitioning, child.output)
-
-  @transient private lazy val serializer: Serializer =
-    BackendsApiManager.getSparkPlanExecApiInstance
-      .createColumnarBatchSerializer(child.schema, metrics, 
useSortBasedShuffle)
-
-  @transient override lazy val metrics: Map[String, SQLMetric] =
-    BackendsApiManager.getMetricsApiInstance
-      .genColumnarShuffleExchangeMetrics(sparkContext, useSortBasedShuffle) ++
-      readMetrics ++ writeMetrics
-
   override protected def doExecuteColumnar(): RDD[ColumnarBatch] = {
     val childRDD = child.executeColumnar()
 
@@ -125,11 +136,26 @@ case class ColumnarCollectLimitExec(
       if (childRDD.getNumPartitions == 1) childRDD
       else shuffleLimitedPartitions(childRDD)
 
-    processedRDD.mapPartitions(partition => collectLimitedRows(partition, 
limit))
+    processedRDD.mapPartitions(
+      partition => {
+        if (limit > 0) {
+          val adjusted = math.max(0, limit - offset)
+          collectWithOffsetAndLimit(partition, offset, adjusted)
+        } else {
+          collectWithOffsetAndLimit(partition, offset, -1)
+        }
+      })
   }
 
   private def shuffleLimitedPartitions(childRDD: RDD[ColumnarBatch]): 
RDD[ColumnarBatch] = {
-    val locallyLimited = childRDD.mapPartitions(partition => 
collectLimitedRows(partition, limit))
+    val applyLocalLimit = (offset == 0 && limit >= 0)
+    val locallyLimited = if (applyLocalLimit) {
+      childRDD.mapPartitions {
+        collectWithOffsetAndLimit(_, 0, limit)
+      }
+    } else {
+      childRDD
+    }
     new ShuffledColumnarBatchRDD(
       BackendsApiManager.getSparkPlanExecApiInstance.genShuffleDependency(
         locallyLimited,
diff --git 
a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala
 
b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala
index a798053f6f..a899099032 100644
--- 
a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala
+++ 
b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala
@@ -710,7 +710,10 @@ trait SparkPlanExecApi {
       original: StringSplit): ExpressionTransformer =
     GenericExpressionTransformer(substraitExprName, Seq(srcExpr, regexExpr, 
limitExpr), original)
 
-  def genColumnarCollectLimitExec(limit: Int, plan: SparkPlan): 
ColumnarCollectLimitBaseExec
+  def genColumnarCollectLimitExec(
+      limit: Int,
+      plan: SparkPlan,
+      offset: Int): ColumnarCollectLimitBaseExec
 
   def genColumnarRangeExec(
       start: Long,
diff --git 
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/ColumnarCollectLimitBaseExec.scala
 
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/ColumnarCollectLimitBaseExec.scala
index c68d0cd51c..fc551e64d6 100644
--- 
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/ColumnarCollectLimitBaseExec.scala
+++ 
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/ColumnarCollectLimitBaseExec.scala
@@ -16,27 +16,22 @@
  */
 package org.apache.gluten.execution
 
-import org.apache.gluten.backendsapi.BackendsApiManager
 import org.apache.gluten.extension.ValidationResult
-import org.apache.gluten.sql.shims.SparkShimLoader
 
 import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, 
SinglePartition}
-import org.apache.spark.sql.execution.{CollectLimitExec, LimitExec, SparkPlan}
+import org.apache.spark.sql.execution.{LimitExec, SparkPlan}
 
 abstract class ColumnarCollectLimitBaseExec(
     limit: Int,
-    childPlan: SparkPlan
+    childPlan: SparkPlan,
+    offset: Int = 0
 ) extends LimitExec
   with ValidatablePlan {
 
   override def outputPartitioning: Partitioning = SinglePartition
 
   override protected def doValidateInternal(): ValidationResult = {
-    if (!SparkShimLoader.getSparkShims.isColumnarLimitExecSupported()) {
-      return ValidationResult.failed(
-        "Columnar collect-limit is unsupported under the current Spark 
version")
-    }
-    ValidationResult.succeeded
+    ValidationResult.failed("Columnar shuffle not enabled or child does not 
support columnar.")
   }
 
   override protected def doExecute()
@@ -45,12 +40,3 @@ abstract class ColumnarCollectLimitBaseExec(
   }
 
 }
-object ColumnarCollectLimitBaseExec {
-  def from(collectLimitExec: CollectLimitExec): ColumnarCollectLimitBaseExec = 
{
-    BackendsApiManager.getSparkPlanExecApiInstance
-      .genColumnarCollectLimitExec(
-        collectLimitExec.limit,
-        collectLimitExec.child
-      )
-  }
-}
diff --git 
a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/CollectLimitTransformerRule.scala
 
b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/CollectLimitTransformerRule.scala
new file mode 100644
index 0000000000..bb4761a07e
--- /dev/null
+++ 
b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/CollectLimitTransformerRule.scala
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.extension.columnar
+
+import org.apache.gluten.backendsapi.BackendsApiManager
+import org.apache.gluten.config.GlutenConfig
+import org.apache.gluten.sql.shims.SparkShimLoader
+
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.{CollectLimitExec, SparkPlan}
+
+case class CollectLimitTransformerRule() extends Rule[SparkPlan] {
+  override def apply(plan: SparkPlan): SparkPlan = {
+    if (!GlutenConfig.get.enableColumnarCollectLimit) {
+      return plan
+    }
+
+    val transformed = plan.transformUp {
+      case exec: CollectLimitExec if exec.child.supportsColumnar =>
+        val offset = SparkShimLoader.getSparkShims.getCollectLimitOffset(exec)
+        BackendsApiManager.getSparkPlanExecApiInstance
+          .genColumnarCollectLimitExec(exec.limit, exec.child, offset)
+    }
+
+    transformed
+  }
+}
diff --git 
a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/offload/OffloadSingleNodeRules.scala
 
b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/offload/OffloadSingleNodeRules.scala
index 750fc060cd..2ad6701742 100644
--- 
a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/offload/OffloadSingleNodeRules.scala
+++ 
b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/offload/OffloadSingleNodeRules.scala
@@ -345,9 +345,11 @@ object OffloadOthers {
             child)
         case plan: CollectLimitExec =>
           logDebug(s"Columnar Processing for ${plan.getClass} is currently 
supported.")
+          val offset = 
SparkShimLoader.getSparkShims.getCollectLimitOffset(plan)
           
BackendsApiManager.getSparkPlanExecApiInstance.genColumnarCollectLimitExec(
             plan.limit,
-            plan.child
+            plan.child,
+            offset
           )
         case plan: RDDScanExec if 
RDDScanTransformer.isSupportRDDScanExec(plan) =>
           logDebug(s"Columnar Processing for ${plan.getClass} is currently 
supported.")
diff --git 
a/gluten-ut/spark32/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
 
b/gluten-ut/spark32/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
index 66e2d3159e..60d81f106f 100644
--- 
a/gluten-ut/spark32/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
+++ 
b/gluten-ut/spark32/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
@@ -16,6 +16,7 @@
  */
 package org.apache.gluten.utils.velox
 
+import org.apache.gluten.execution.GlutenSQLCollectLimitExecSuite
 import org.apache.gluten.utils.{BackendTestSettings, SQLQueryTestSettings}
 
 import org.apache.spark.GlutenSortShuffleSuite
@@ -780,6 +781,8 @@ class VeloxTestSettings extends BackendTestSettings {
   enableSuite[GlutenConfigBehaviorSuite]
     // Will be fixed by cleaning up ColumnarShuffleExchangeExec.
     .exclude("SPARK-22160 
spark.sql.execution.rangeExchange.sampleSizePerPartition")
+    // Gluten columnar operator will have different number of jobs
+    .exclude("SPARK-40211: customize initialNumPartitions for take")
   enableSuite[GlutenCountMinSketchAggQuerySuite]
   enableSuite[GlutenCsvFunctionsSuite]
   enableSuite[GlutenCTEHintSuite]
@@ -812,6 +815,7 @@ class VeloxTestSettings extends BackendTestSettings {
   enableSuite[GlutenHiveSQLQuerySuite]
   enableSuite[GlutenCollapseProjectExecTransformerSuite]
   enableSuite[GlutenSparkSessionExtensionSuite]
+  enableSuite[GlutenSQLCollectLimitExecSuite]
 
   override def getSQLQueryTestSettings: SQLQueryTestSettings = 
VeloxSQLQueryTestSettings
 }
diff --git 
a/backends-velox/src/test/scala/org/apache/gluten/execution/GlutenSQLCollectLimitExecSuite.scala
 
b/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/GlutenSQLCollectLimitExecSuite.scala
similarity index 77%
copy from 
backends-velox/src/test/scala/org/apache/gluten/execution/GlutenSQLCollectLimitExecSuite.scala
copy to 
gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/GlutenSQLCollectLimitExecSuite.scala
index 7ac89c33de..d54d1573a5 100644
--- 
a/backends-velox/src/test/scala/org/apache/gluten/execution/GlutenSQLCollectLimitExecSuite.scala
+++ 
b/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/GlutenSQLCollectLimitExecSuite.scala
@@ -17,14 +17,11 @@
 package org.apache.gluten.execution
 
 import org.apache.spark.SparkConf
-import org.apache.spark.sql.{DataFrame, Row}
+import org.apache.spark.sql.{DataFrame, GlutenSQLTestsTrait, Row}
 
-class GlutenSQLCollectLimitExecSuite extends WholeStageTransformerSuite {
+class GlutenSQLCollectLimitExecSuite extends GlutenSQLTestsTrait {
 
-  override protected val resourcePath: String = "N/A"
-  override protected val fileFormat: String = "N/A"
-
-  override protected def sparkConf: SparkConf = {
+  override def sparkConf: SparkConf = {
     super.sparkConf
       .set("spark.shuffle.manager", 
"org.apache.spark.shuffle.sort.ColumnarShuffleManager")
   }
@@ -56,7 +53,7 @@ class GlutenSQLCollectLimitExecSuite extends 
WholeStageTransformerSuite {
     assert(assertionCondition, assertionMessage)
   }
 
-  testWithSpecifiedSparkVersion("ColumnarCollectLimitExec - basic limit test", 
"3.2", "3.3") {
+  test("ColumnarCollectLimitExec - basic limit test") {
     val df = spark.range(0, 1000, 1).toDF("id").limit(5)
     val expectedData = Seq(Row(0L), Row(1L), Row(2L), Row(3L), Row(4L))
 
@@ -65,7 +62,7 @@ class GlutenSQLCollectLimitExecSuite extends 
WholeStageTransformerSuite {
     assertGlutenOperatorMatch[ColumnarCollectLimitBaseExec](df, checkMatch = 
true)
   }
 
-  testWithSpecifiedSparkVersion("ColumnarCollectLimitExec - with filter", 
"3.2", "3.3") {
+  test("ColumnarCollectLimitExec - with filter") {
     val df = spark
       .range(0, 20, 1)
       .toDF("id")
@@ -78,21 +75,20 @@ class GlutenSQLCollectLimitExecSuite extends 
WholeStageTransformerSuite {
     assertGlutenOperatorMatch[ColumnarCollectLimitBaseExec](df, checkMatch = 
true)
   }
 
-  testWithSpecifiedSparkVersion("ColumnarCollectLimitExec - range with 
repartition", "3.2", "3.3") {
+  test("ColumnarCollectLimitExec - range with repartition") {
 
     val df = spark
       .range(0, 10, 1)
       .toDF("id")
       .repartition(3)
+      .orderBy("id")
       .limit(3)
-    val expectedData = Seq(Row(1L), Row(2L), Row(4L))
+    val expectedData = Seq(Row(0L), Row(1L), Row(2L))
 
     checkAnswer(df, expectedData)
-
-    assertGlutenOperatorMatch[ColumnarCollectLimitBaseExec](df, checkMatch = 
true)
   }
 
-  testWithSpecifiedSparkVersion("ColumnarCollectLimitExec - with distinct 
values", "3.2", "3.3") {
+  test("ColumnarCollectLimitExec - with distinct values") {
     val df = spark
       .range(0, 10, 1)
       .toDF("id")
@@ -106,7 +102,7 @@ class GlutenSQLCollectLimitExecSuite extends 
WholeStageTransformerSuite {
     assertGlutenOperatorMatch[ColumnarCollectLimitBaseExec](df, checkMatch = 
true)
   }
 
-  testWithSpecifiedSparkVersion("ColumnarCollectLimitExec - chained limit", 
"3.2", "3.3") {
+  test("ColumnarCollectLimitExec - chained limit") {
     val df = spark
       .range(0, 10, 1)
       .toDF("id")
@@ -119,7 +115,7 @@ class GlutenSQLCollectLimitExecSuite extends 
WholeStageTransformerSuite {
     assertGlutenOperatorMatch[ColumnarCollectLimitBaseExec](df, checkMatch = 
true)
   }
 
-  testWithSpecifiedSparkVersion("ColumnarCollectLimitExec - limit after 
union", "3.2", "3.3") {
+  test("ColumnarCollectLimitExec - limit after union") {
     val df1 = spark.range(0, 5).toDF("id")
     val df2 = spark.range(5, 10).toDF("id")
     val unionDf = df1.union(df2).limit(3)
diff --git 
a/gluten-ut/spark33/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
 
b/gluten-ut/spark33/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
index 8f0d562f95..676b562889 100644
--- 
a/gluten-ut/spark33/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
+++ 
b/gluten-ut/spark33/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
@@ -632,6 +632,8 @@ class VeloxTestSettings extends BackendTestSettings {
   enableSuite[GlutenConfigBehaviorSuite]
     // Will be fixed by cleaning up ColumnarShuffleExchangeExec.
     .exclude("SPARK-22160 
spark.sql.execution.rangeExchange.sampleSizePerPartition")
+    // Gluten columnar operator will have different number of jobs
+    .exclude("SPARK-40211: customize initialNumPartitions for take")
   enableSuite[GlutenCountMinSketchAggQuerySuite]
   enableSuite[GlutenCsvFunctionsSuite]
   enableSuite[GlutenCTEHintSuite]
@@ -862,6 +864,7 @@ class VeloxTestSettings extends BackendTestSettings {
   enableSuite[GlutenImplicitsTest]
   enableSuite[GlutenCollapseProjectExecTransformerSuite]
   enableSuite[GlutenSparkSessionExtensionSuite]
+  enableSuite[GlutenSQLCollectLimitExecSuite]
 
   override def getSQLQueryTestSettings: SQLQueryTestSettings = 
VeloxSQLQueryTestSettings
 }
diff --git 
a/backends-velox/src/test/scala/org/apache/gluten/execution/GlutenSQLCollectLimitExecSuite.scala
 
b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/GlutenSQLCollectLimitExecSuite.scala
similarity index 76%
rename from 
backends-velox/src/test/scala/org/apache/gluten/execution/GlutenSQLCollectLimitExecSuite.scala
rename to 
gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/GlutenSQLCollectLimitExecSuite.scala
index 7ac89c33de..069dea32f4 100644
--- 
a/backends-velox/src/test/scala/org/apache/gluten/execution/GlutenSQLCollectLimitExecSuite.scala
+++ 
b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/GlutenSQLCollectLimitExecSuite.scala
@@ -14,17 +14,16 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.gluten.execution
+package org.apache.spark.sql.execution
 
-import org.apache.spark.SparkConf
-import org.apache.spark.sql.{DataFrame, Row}
+import org.apache.gluten.execution.ColumnarCollectLimitBaseExec
 
-class GlutenSQLCollectLimitExecSuite extends WholeStageTransformerSuite {
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.{DataFrame, GlutenSQLTestsTrait, Row}
 
-  override protected val resourcePath: String = "N/A"
-  override protected val fileFormat: String = "N/A"
+class GlutenSQLCollectLimitExecSuite extends GlutenSQLTestsTrait {
 
-  override protected def sparkConf: SparkConf = {
+  override def sparkConf: SparkConf = {
     super.sparkConf
       .set("spark.shuffle.manager", 
"org.apache.spark.shuffle.sort.ColumnarShuffleManager")
   }
@@ -56,7 +55,7 @@ class GlutenSQLCollectLimitExecSuite extends 
WholeStageTransformerSuite {
     assert(assertionCondition, assertionMessage)
   }
 
-  testWithSpecifiedSparkVersion("ColumnarCollectLimitExec - basic limit test", 
"3.2", "3.3") {
+  test("ColumnarCollectLimitExec - basic limit test") {
     val df = spark.range(0, 1000, 1).toDF("id").limit(5)
     val expectedData = Seq(Row(0L), Row(1L), Row(2L), Row(3L), Row(4L))
 
@@ -65,7 +64,7 @@ class GlutenSQLCollectLimitExecSuite extends 
WholeStageTransformerSuite {
     assertGlutenOperatorMatch[ColumnarCollectLimitBaseExec](df, checkMatch = 
true)
   }
 
-  testWithSpecifiedSparkVersion("ColumnarCollectLimitExec - with filter", 
"3.2", "3.3") {
+  test("ColumnarCollectLimitExec - with filter") {
     val df = spark
       .range(0, 20, 1)
       .toDF("id")
@@ -78,21 +77,20 @@ class GlutenSQLCollectLimitExecSuite extends 
WholeStageTransformerSuite {
     assertGlutenOperatorMatch[ColumnarCollectLimitBaseExec](df, checkMatch = 
true)
   }
 
-  testWithSpecifiedSparkVersion("ColumnarCollectLimitExec - range with 
repartition", "3.2", "3.3") {
+  test("ColumnarCollectLimitExec - range with repartition") {
 
     val df = spark
       .range(0, 10, 1)
       .toDF("id")
       .repartition(3)
+      .orderBy("id")
       .limit(3)
-    val expectedData = Seq(Row(1L), Row(2L), Row(4L))
+    val expectedData = Seq(Row(0L), Row(1L), Row(2L))
 
     checkAnswer(df, expectedData)
-
-    assertGlutenOperatorMatch[ColumnarCollectLimitBaseExec](df, checkMatch = 
true)
   }
 
-  testWithSpecifiedSparkVersion("ColumnarCollectLimitExec - with distinct 
values", "3.2", "3.3") {
+  test("ColumnarCollectLimitExec - with distinct values") {
     val df = spark
       .range(0, 10, 1)
       .toDF("id")
@@ -106,7 +104,7 @@ class GlutenSQLCollectLimitExecSuite extends 
WholeStageTransformerSuite {
     assertGlutenOperatorMatch[ColumnarCollectLimitBaseExec](df, checkMatch = 
true)
   }
 
-  testWithSpecifiedSparkVersion("ColumnarCollectLimitExec - chained limit", 
"3.2", "3.3") {
+  test("ColumnarCollectLimitExec - chained limit") {
     val df = spark
       .range(0, 10, 1)
       .toDF("id")
@@ -119,7 +117,7 @@ class GlutenSQLCollectLimitExecSuite extends 
WholeStageTransformerSuite {
     assertGlutenOperatorMatch[ColumnarCollectLimitBaseExec](df, checkMatch = 
true)
   }
 
-  testWithSpecifiedSparkVersion("ColumnarCollectLimitExec - limit after 
union", "3.2", "3.3") {
+  test("ColumnarCollectLimitExec - limit after union") {
     val df1 = spark.range(0, 5).toDF("id")
     val df2 = spark.range(5, 10).toDF("id")
     val unionDf = df1.union(df2).limit(3)
diff --git 
a/gluten-ut/spark34/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
 
b/gluten-ut/spark34/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
index d688cc1272..3f6ed85f87 100644
--- 
a/gluten-ut/spark34/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
+++ 
b/gluten-ut/spark34/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
@@ -16,6 +16,7 @@
  */
 package org.apache.gluten.utils.velox
 
+import org.apache.gluten.execution.GlutenSQLCollectLimitExecSuite
 import org.apache.gluten.utils.{BackendTestSettings, SQLQueryTestSettings}
 
 import org.apache.spark.GlutenSortShuffleSuite
@@ -185,7 +186,6 @@ class VeloxTestSettings extends BackendTestSettings {
   enableSuite[VeloxAdaptiveQueryExecSuite]
     .includeAllGlutenTests()
     .includeByPrefix(
-      "SPARK-29906",
       "SPARK-30291",
       "SPARK-30403",
       "SPARK-30719",
@@ -667,6 +667,8 @@ class VeloxTestSettings extends BackendTestSettings {
   enableSuite[GlutenConfigBehaviorSuite]
     // Will be fixed by cleaning up ColumnarShuffleExchangeExec.
     .exclude("SPARK-22160 
spark.sql.execution.rangeExchange.sampleSizePerPartition")
+    // Gluten columnar operator will have different number of jobs
+    .exclude("SPARK-40211: customize initialNumPartitions for take")
   enableSuite[GlutenCountMinSketchAggQuerySuite]
   enableSuite[GlutenCsvFunctionsSuite]
   enableSuite[GlutenCTEHintSuite]
@@ -899,6 +901,8 @@ class VeloxTestSettings extends BackendTestSettings {
   enableSuite[GlutenParquetRowIndexSuite]
     .excludeByPrefix("row index generation")
     .excludeByPrefix("invalid row index column type")
+  enableSuite[GlutenSQLCollectLimitExecSuite]
+
   override def getSQLQueryTestSettings: SQLQueryTestSettings = 
VeloxSQLQueryTestSettings
 }
 // scalastyle:on line.size.limit
diff --git 
a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/GlutenSQLCollectLimitExecSuite.scala
 
b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/GlutenSQLCollectLimitExecSuite.scala
new file mode 100644
index 0000000000..11f773a2c6
--- /dev/null
+++ 
b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/GlutenSQLCollectLimitExecSuite.scala
@@ -0,0 +1,262 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.execution
+
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.{DataFrame, GlutenSQLTestsTrait, Row}
+
+class GlutenSQLCollectLimitExecSuite extends GlutenSQLTestsTrait {
+
+  override def sparkConf: SparkConf = {
+    super.sparkConf
+      .set("spark.shuffle.manager", 
"org.apache.spark.shuffle.sort.ColumnarShuffleManager")
+  }
+
+  /**
+   * Asserts that an operator whose runtime class is `T` is present in, or absent from, the
+   * executed plan of `df`.
+   *
+   * @param df
+   *   the DataFrame whose executed plan is searched
+   * @param checkMatch
+   *   `true` to require that such an operator exists, `false` to require that none does
+   */
+  private def assertGlutenOperatorMatch[T: reflect.ClassTag](
+      df: DataFrame,
+      checkMatch: Boolean): Unit = {
+    val executedPlan = getExecutedPlan(df)
+    // Resolve the target class once instead of summoning the ClassTag at every use site.
+    val operatorClass = implicitly[reflect.ClassTag[T]].runtimeClass
+
+    // Class.isInstance is a plain type check that does not throw, so it needs no try/catch;
+    // a catch-all on Throwable here could only hide fatal JVM errors.
+    val operatorFound = executedPlan.exists(plan => operatorClass.isInstance(plan))
+
+    val verdict = if (checkMatch) "not found" else "was found"
+    assert(
+      operatorFound == checkMatch,
+      s"Operator ${operatorClass.getSimpleName} $verdict in executed plan:\n $executedPlan")
+  }
+
+  test("ColumnarCollectLimitExec - basic limit test") {
+    val df = spark.range(0, 1000, 1).toDF("id").limit(5)
+    val expectedData = Seq(Row(0L), Row(1L), Row(2L), Row(3L), Row(4L))
+
+    checkAnswer(df, expectedData)
+
+    assertGlutenOperatorMatch[ColumnarCollectLimitBaseExec](df, checkMatch = 
true)
+  }
+
+  test("ColumnarCollectLimitExec - with filter") {
+    val df = spark
+      .range(0, 20, 1)
+      .toDF("id")
+      .filter("id % 2 == 0")
+      .limit(5)
+    val expectedData = Seq(Row(0L), Row(2L), Row(4L), Row(6L), Row(8L))
+
+    checkAnswer(df, expectedData)
+
+    assertGlutenOperatorMatch[ColumnarCollectLimitBaseExec](df, checkMatch = 
true)
+  }
+
+  test("ColumnarCollectLimitExec - range with repartition") {
+
+    val df = spark
+      .range(0, 10, 1)
+      .toDF("id")
+      .repartition(3)
+      .orderBy("id")
+      .limit(3)
+    val expectedData = Seq(Row(0L), Row(1L), Row(2L))
+
+    checkAnswer(df, expectedData)
+  }
+
+  test("ColumnarCollectLimitExec - with distinct values") {
+    val df = spark
+      .range(0, 10, 1)
+      .toDF("id")
+      .select("id")
+      .distinct()
+      .limit(5)
+    val expectedData = Seq(Row(0L), Row(1L), Row(2L), Row(3L), Row(4L))
+
+    checkAnswer(df, expectedData)
+
+    assertGlutenOperatorMatch[ColumnarCollectLimitBaseExec](df, checkMatch = 
true)
+  }
+
+  test("ColumnarCollectLimitExec - chained limit") {
+    val df = spark
+      .range(0, 10, 1)
+      .toDF("id")
+      .limit(8)
+      .limit(3)
+    val expectedData = Seq(Row(0L), Row(1L), Row(2L))
+
+    checkAnswer(df, expectedData)
+
+    assertGlutenOperatorMatch[ColumnarCollectLimitBaseExec](df, checkMatch = 
true)
+  }
+
+  test("ColumnarCollectLimitExec - limit after union") {
+    val df1 = spark.range(0, 5).toDF("id")
+    val df2 = spark.range(5, 10).toDF("id")
+    val unionDf = df1.union(df2).limit(3)
+
+    val expectedData = Seq(Row(0L), Row(1L), Row(2L))
+
+    checkAnswer(unionDf, expectedData)
+
+    assertGlutenOperatorMatch[ColumnarCollectLimitBaseExec](unionDf, 
checkMatch = true)
+  }
+
+  test("ColumnarCollectLimitExec - offset test") {
+    val df1 = spark.range(0, 10, 1).toDF("id").limit(5).offset(2)
+    val expectedData1 = Seq(Row(2L), Row(3L), Row(4L))
+
+    checkAnswer(df1, expectedData1)
+    assertGlutenOperatorMatch[ColumnarCollectLimitBaseExec](df1, checkMatch = 
true)
+
+    val df2 = spark.range(0, 20, 1).toDF("id").limit(12).offset(5)
+    val expectedData2 = Seq(Row(5L), Row(6L), Row(7L), Row(8L), Row(9L), 
Row(10L), Row(11L))
+
+    checkAnswer(df2, expectedData2)
+    assertGlutenOperatorMatch[ColumnarCollectLimitBaseExec](df2, checkMatch = 
true)
+
+    val df3 = spark.range(0, 30, 1).toDF("id").limit(10).offset(3)
+    val expectedData3 = Seq(Row(3L), Row(4L), Row(5L), Row(6L), Row(7L), 
Row(8L), Row(9L))
+
+    checkAnswer(df3, expectedData3)
+    assertGlutenOperatorMatch[ColumnarCollectLimitBaseExec](df3, checkMatch = 
true)
+
+    val df4 = spark.range(0, 15, 1).toDF("id").limit(8).offset(4)
+    val expectedData4 = Seq(Row(4L), Row(5L), Row(6L), Row(7L))
+
+    checkAnswer(df4, expectedData4)
+    assertGlutenOperatorMatch[ColumnarCollectLimitBaseExec](df4, checkMatch = 
true)
+
+    val df5 = spark.range(0, 50, 1).toDF("id").limit(20).offset(10)
+    val expectedData5 = Seq(
+      Row(10L),
+      Row(11L),
+      Row(12L),
+      Row(13L),
+      Row(14L),
+      Row(15L),
+      Row(16L),
+      Row(17L),
+      Row(18L),
+      Row(19L))
+
+    checkAnswer(df5, expectedData5)
+    assertGlutenOperatorMatch[ColumnarCollectLimitBaseExec](df5, checkMatch = 
true)
+  }
+
+  test("ColumnarCollectLimitExec - pure offset test") {
+    val df1 = spark.range(0, 20, 1).toDF("id").offset(5)
+    val expectedData1 = Seq(
+      Row(5L),
+      Row(6L),
+      Row(7L),
+      Row(8L),
+      Row(9L),
+      Row(10L),
+      Row(11L),
+      Row(12L),
+      Row(13L),
+      Row(14L),
+      Row(15L),
+      Row(16L),
+      Row(17L),
+      Row(18L),
+      Row(19L))
+
+    checkAnswer(df1, expectedData1)
+    assertGlutenOperatorMatch[ColumnarCollectLimitBaseExec](df1, checkMatch = 
true)
+
+    val df2 = spark.range(0, 50, 1).toDF("id").offset(10)
+    val expectedData2 = (10L to 49L).map(Row(_))
+
+    checkAnswer(df2, expectedData2)
+    assertGlutenOperatorMatch[ColumnarCollectLimitBaseExec](df2, checkMatch = 
true)
+
+    val df3 = spark.range(0, 100, 2).toDF("id").offset(15)
+    val expectedData3 = (30L to 98L by 2).map(Row(_))
+
+    checkAnswer(df3, expectedData3)
+    assertGlutenOperatorMatch[ColumnarCollectLimitBaseExec](df3, checkMatch = 
true)
+
+    val df4 = spark.range(0, 30, 1).toDF("id").offset(20)
+    val expectedData4 = (20L to 29L).map(Row(_))
+
+    checkAnswer(df4, expectedData4)
+    assertGlutenOperatorMatch[ColumnarCollectLimitBaseExec](df4, checkMatch = 
true)
+
+    val df5 = spark.range(0, 200, 5).toDF("id").offset(10)
+    val expectedData5 = (50L to 195L by 5).map(Row(_))
+
+    checkAnswer(df5, expectedData5)
+    assertGlutenOperatorMatch[ColumnarCollectLimitBaseExec](df5, checkMatch = 
true)
+
+    val df6 = spark.range(0, 5, 1).toDF("id").limit(10)
+    val expectedData6 = (0L to 4L).map(Row(_))
+
+    checkAnswer(df6, expectedData6)
+  }
+
+  test("ColumnarCollectLimitExec - offset with filter") {
+    val df = spark.range(0, 10, 1).toDF("id").filter("id % 2 == 
0").limit(5).offset(2)
+    val expectedData = Seq(Row(4L), Row(6L), Row(8L))
+
+    checkAnswer(df, expectedData)
+    assertGlutenOperatorMatch[ColumnarCollectLimitBaseExec](df, checkMatch = 
true)
+  }
+
+  test("ColumnarCollectLimitExec - offset after union") {
+    val df1 = spark.range(0, 5).toDF("id")
+    val df2 = spark.range(5, 10).toDF("id")
+    val unionDf = df1.union(df2).limit(6).offset(3)
+
+    val expectedData = Seq(Row(3L), Row(4L), Row(5L))
+    checkAnswer(unionDf, expectedData)
+    assertGlutenOperatorMatch[ColumnarCollectLimitBaseExec](unionDf, 
checkMatch = true)
+  }
+
+  test("ColumnarCollectLimitExec - single partition with limit, offset, and 
limit + offset") {
+
+    val singlePartitionDF = spark.range(0, 10, 1).toDF("id").coalesce(1)
+
+    val limitDF = singlePartitionDF.limit(5)
+    val expectedLimitData = Seq(Row(0L), Row(1L), Row(2L), Row(3L), Row(4L))
+    checkAnswer(limitDF, expectedLimitData)
+    assertGlutenOperatorMatch[ColumnarCollectLimitBaseExec](limitDF, 
checkMatch = true)
+
+    val offsetDF = singlePartitionDF.offset(3)
+    val expectedOffsetData = Seq(Row(3L), Row(4L), Row(5L), Row(6L), Row(7L), 
Row(8L), Row(9L))
+    checkAnswer(offsetDF, expectedOffsetData)
+    assertGlutenOperatorMatch[ColumnarCollectLimitBaseExec](offsetDF, 
checkMatch = true)
+
+    val limitOffsetDF = singlePartitionDF.limit(5).offset(2)
+    val expectedLimitOffsetData = Seq(Row(2L), Row(3L), Row(4L))
+    checkAnswer(limitOffsetDF, expectedLimitOffsetData)
+    assertGlutenOperatorMatch[ColumnarCollectLimitBaseExec](limitOffsetDF, 
checkMatch = true)
+  }
+
+}
diff --git 
a/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
 
b/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
index a069724fa9..c84ae5803e 100644
--- 
a/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
+++ 
b/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
@@ -188,7 +188,6 @@ class VeloxTestSettings extends BackendTestSettings {
   enableSuite[VeloxAdaptiveQueryExecSuite]
     .includeAllGlutenTests()
     .includeByPrefix(
-      "SPARK-29906",
       "SPARK-30291",
       "SPARK-30403",
       "SPARK-30719",
@@ -687,6 +686,8 @@ class VeloxTestSettings extends BackendTestSettings {
   enableSuite[GlutenConfigBehaviorSuite]
     // Will be fixed by cleaning up ColumnarShuffleExchangeExec.
     .exclude("SPARK-22160 
spark.sql.execution.rangeExchange.sampleSizePerPartition")
+    // Gluten columnar operator will have different number of jobs
+    .exclude("SPARK-40211: customize initialNumPartitions for take")
   enableSuite[GlutenCountMinSketchAggQuerySuite]
   enableSuite[GlutenCsvFunctionsSuite]
   enableSuite[GlutenCTEHintSuite]
@@ -936,6 +937,7 @@ class VeloxTestSettings extends BackendTestSettings {
   enableSuite[GlutenParquetFileMetadataStructRowIndexSuite]
   enableSuite[GlutenTableLocationSuite]
   enableSuite[GlutenRemoveRedundantWindowGroupLimitsSuite]
+  enableSuite[GlutenSQLCollectLimitExecSuite]
 
   override def getSQLQueryTestSettings: SQLQueryTestSettings = 
VeloxSQLQueryTestSettings
 }
diff --git 
a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/GlutenSQLCollectLimitExecSuite.scala
 
b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/GlutenSQLCollectLimitExecSuite.scala
new file mode 100644
index 0000000000..a946843d3f
--- /dev/null
+++ 
b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/GlutenSQLCollectLimitExecSuite.scala
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution
+
+import org.apache.gluten.execution.ColumnarCollectLimitBaseExec
+
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.{DataFrame, GlutenSQLTestsTrait, Row}
+
+class GlutenSQLCollectLimitExecSuite extends GlutenSQLTestsTrait {
+
+  override def sparkConf: SparkConf = {
+    super.sparkConf
+      .set("spark.shuffle.manager", 
"org.apache.spark.shuffle.sort.ColumnarShuffleManager")
+  }
+
+  /**
+   * Asserts that an operator whose runtime class is `T` is present in, or absent from, the
+   * executed plan of `df`.
+   *
+   * @param df
+   *   the DataFrame whose executed plan is searched
+   * @param checkMatch
+   *   `true` to require that such an operator exists, `false` to require that none does
+   */
+  private def assertGlutenOperatorMatch[T: reflect.ClassTag](
+      df: DataFrame,
+      checkMatch: Boolean): Unit = {
+    val executedPlan = getExecutedPlan(df)
+    // Resolve the target class once instead of summoning the ClassTag at every use site.
+    val operatorClass = implicitly[reflect.ClassTag[T]].runtimeClass
+
+    // Class.isInstance is a plain type check that does not throw, so it needs no try/catch;
+    // a catch-all on Throwable here could only hide fatal JVM errors.
+    val operatorFound = executedPlan.exists(plan => operatorClass.isInstance(plan))
+
+    val verdict = if (checkMatch) "not found" else "was found"
+    assert(
+      operatorFound == checkMatch,
+      s"Operator ${operatorClass.getSimpleName} $verdict in executed plan:\n $executedPlan")
+  }
+
+  test("ColumnarCollectLimitExec - basic limit test") {
+    val df = spark.range(0, 1000, 1).toDF("id").limit(5)
+    val expectedData = Seq(Row(0L), Row(1L), Row(2L), Row(3L), Row(4L))
+
+    checkAnswer(df, expectedData)
+
+    assertGlutenOperatorMatch[ColumnarCollectLimitBaseExec](df, checkMatch = 
true)
+  }
+
+  test("ColumnarCollectLimitExec - with filter") {
+    val df = spark
+      .range(0, 20, 1)
+      .toDF("id")
+      .filter("id % 2 == 0")
+      .limit(5)
+    val expectedData = Seq(Row(0L), Row(2L), Row(4L), Row(6L), Row(8L))
+
+    checkAnswer(df, expectedData)
+
+    assertGlutenOperatorMatch[ColumnarCollectLimitBaseExec](df, checkMatch = 
true)
+  }
+
+  test("ColumnarCollectLimitExec - range with repartition") {
+
+    val df = spark
+      .range(0, 10, 1)
+      .toDF("id")
+      .repartition(3)
+      .orderBy("id")
+      .limit(3)
+    val expectedData = Seq(Row(0L), Row(1L), Row(2L))
+
+    checkAnswer(df, expectedData)
+  }
+
+  test("ColumnarCollectLimitExec - with distinct values") {
+    val df = spark
+      .range(0, 10, 1)
+      .toDF("id")
+      .select("id")
+      .distinct()
+      .limit(5)
+    val expectedData = Seq(Row(0L), Row(1L), Row(2L), Row(3L), Row(4L))
+
+    checkAnswer(df, expectedData)
+
+    assertGlutenOperatorMatch[ColumnarCollectLimitBaseExec](df, checkMatch = 
true)
+  }
+
+  test("ColumnarCollectLimitExec - chained limit") {
+    val df = spark
+      .range(0, 10, 1)
+      .toDF("id")
+      .limit(8)
+      .limit(3)
+    val expectedData = Seq(Row(0L), Row(1L), Row(2L))
+
+    checkAnswer(df, expectedData)
+
+    assertGlutenOperatorMatch[ColumnarCollectLimitBaseExec](df, checkMatch = 
true)
+  }
+
+  test("ColumnarCollectLimitExec - limit after union") {
+    val df1 = spark.range(0, 5).toDF("id")
+    val df2 = spark.range(5, 10).toDF("id")
+    val unionDf = df1.union(df2).limit(3)
+
+    val expectedData = Seq(Row(0L), Row(1L), Row(2L))
+
+    checkAnswer(unionDf, expectedData)
+
+    assertGlutenOperatorMatch[ColumnarCollectLimitBaseExec](unionDf, 
checkMatch = true)
+  }
+
+  test("ColumnarCollectLimitExec - offset test") {
+    val df1 = spark.range(0, 10, 1).toDF("id").limit(5).offset(2)
+    val expectedData1 = Seq(Row(2L), Row(3L), Row(4L))
+
+    checkAnswer(df1, expectedData1)
+    assertGlutenOperatorMatch[ColumnarCollectLimitBaseExec](df1, checkMatch = 
true)
+
+    val df2 = spark.range(0, 20, 1).toDF("id").limit(12).offset(5)
+    val expectedData2 = Seq(Row(5L), Row(6L), Row(7L), Row(8L), Row(9L), 
Row(10L), Row(11L))
+
+    checkAnswer(df2, expectedData2)
+    assertGlutenOperatorMatch[ColumnarCollectLimitBaseExec](df2, checkMatch = 
true)
+
+    val df3 = spark.range(0, 30, 1).toDF("id").limit(10).offset(3)
+    val expectedData3 = Seq(Row(3L), Row(4L), Row(5L), Row(6L), Row(7L), 
Row(8L), Row(9L))
+
+    checkAnswer(df3, expectedData3)
+    assertGlutenOperatorMatch[ColumnarCollectLimitBaseExec](df3, checkMatch = 
true)
+
+    val df4 = spark.range(0, 15, 1).toDF("id").limit(8).offset(4)
+    val expectedData4 = Seq(Row(4L), Row(5L), Row(6L), Row(7L))
+
+    checkAnswer(df4, expectedData4)
+    assertGlutenOperatorMatch[ColumnarCollectLimitBaseExec](df4, checkMatch = 
true)
+
+    val df5 = spark.range(0, 50, 1).toDF("id").limit(20).offset(10)
+    val expectedData5 = Seq(
+      Row(10L),
+      Row(11L),
+      Row(12L),
+      Row(13L),
+      Row(14L),
+      Row(15L),
+      Row(16L),
+      Row(17L),
+      Row(18L),
+      Row(19L))
+
+    checkAnswer(df5, expectedData5)
+    assertGlutenOperatorMatch[ColumnarCollectLimitBaseExec](df5, checkMatch = 
true)
+  }
+
+  test("ColumnarCollectLimitExec - pure offset test") {
+    val df1 = spark.range(0, 20, 1).toDF("id").offset(5)
+    val expectedData1 = Seq(
+      Row(5L),
+      Row(6L),
+      Row(7L),
+      Row(8L),
+      Row(9L),
+      Row(10L),
+      Row(11L),
+      Row(12L),
+      Row(13L),
+      Row(14L),
+      Row(15L),
+      Row(16L),
+      Row(17L),
+      Row(18L),
+      Row(19L))
+
+    checkAnswer(df1, expectedData1)
+    assertGlutenOperatorMatch[ColumnarCollectLimitBaseExec](df1, checkMatch = 
true)
+
+    val df2 = spark.range(0, 50, 1).toDF("id").offset(10)
+    val expectedData2 = (10L to 49L).map(Row(_))
+
+    checkAnswer(df2, expectedData2)
+    assertGlutenOperatorMatch[ColumnarCollectLimitBaseExec](df2, checkMatch = 
true)
+
+    val df3 = spark.range(0, 100, 2).toDF("id").offset(15)
+    val expectedData3 = (30L to 98L by 2).map(Row(_))
+
+    checkAnswer(df3, expectedData3)
+    assertGlutenOperatorMatch[ColumnarCollectLimitBaseExec](df3, checkMatch = 
true)
+
+    val df4 = spark.range(0, 30, 1).toDF("id").offset(20)
+    val expectedData4 = (20L to 29L).map(Row(_))
+
+    checkAnswer(df4, expectedData4)
+    assertGlutenOperatorMatch[ColumnarCollectLimitBaseExec](df4, checkMatch = 
true)
+
+    val df5 = spark.range(0, 200, 5).toDF("id").offset(10)
+    val expectedData5 = (50L to 195L by 5).map(Row(_))
+
+    checkAnswer(df5, expectedData5)
+    assertGlutenOperatorMatch[ColumnarCollectLimitBaseExec](df5, checkMatch = 
true)
+
+    val df6 = spark.range(0, 5, 1).toDF("id").limit(10)
+    val expectedData6 = (0L to 4L).map(Row(_))
+
+    checkAnswer(df6, expectedData6)
+  }
+
+  test("ColumnarCollectLimitExec - offset with filter") {
+    val df = spark.range(0, 10, 1).toDF("id").filter("id % 2 == 
0").limit(5).offset(2)
+    val expectedData = Seq(Row(4L), Row(6L), Row(8L))
+
+    checkAnswer(df, expectedData)
+    assertGlutenOperatorMatch[ColumnarCollectLimitBaseExec](df, checkMatch = 
true)
+  }
+
+  test("ColumnarCollectLimitExec - offset after union") {
+    val df1 = spark.range(0, 5).toDF("id")
+    val df2 = spark.range(5, 10).toDF("id")
+    val unionDf = df1.union(df2).limit(6).offset(3)
+
+    val expectedData = Seq(Row(3L), Row(4L), Row(5L))
+    checkAnswer(unionDf, expectedData)
+    assertGlutenOperatorMatch[ColumnarCollectLimitBaseExec](unionDf, 
checkMatch = true)
+  }
+
+  test("ColumnarCollectLimitExec - single partition with limit, offset, and 
limit + offset") {
+
+    val singlePartitionDF = spark.range(0, 10, 1).toDF("id").coalesce(1)
+
+    val limitDF = singlePartitionDF.limit(5)
+    val expectedLimitData = Seq(Row(0L), Row(1L), Row(2L), Row(3L), Row(4L))
+    checkAnswer(limitDF, expectedLimitData)
+    assertGlutenOperatorMatch[ColumnarCollectLimitBaseExec](limitDF, 
checkMatch = true)
+
+    val offsetDF = singlePartitionDF.offset(3)
+    val expectedOffsetData = Seq(Row(3L), Row(4L), Row(5L), Row(6L), Row(7L), 
Row(8L), Row(9L))
+    checkAnswer(offsetDF, expectedOffsetData)
+    assertGlutenOperatorMatch[ColumnarCollectLimitBaseExec](offsetDF, 
checkMatch = true)
+
+    val limitOffsetDF = singlePartitionDF.limit(5).offset(2)
+    val expectedLimitOffsetData = Seq(Row(2L), Row(3L), Row(4L))
+    checkAnswer(limitOffsetDF, expectedLimitOffsetData)
+    assertGlutenOperatorMatch[ColumnarCollectLimitBaseExec](limitOffsetDF, 
checkMatch = true)
+  }
+
+}
diff --git 
a/shims/common/src/main/scala/org/apache/gluten/sql/shims/SparkShims.scala 
b/shims/common/src/main/scala/org/apache/gluten/sql/shims/SparkShims.scala
index 0149e2ea55..537385fa3d 100644
--- a/shims/common/src/main/scala/org/apache/gluten/sql/shims/SparkShims.scala
+++ b/shims/common/src/main/scala/org/apache/gluten/sql/shims/SparkShims.scala
@@ -37,7 +37,7 @@ import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.connector.catalog.Table
 import org.apache.spark.sql.connector.expressions.Transform
 import org.apache.spark.sql.connector.read.{InputPartition, Scan}
-import org.apache.spark.sql.execution.{FileSourceScanExec, GlobalLimitExec, 
SparkPlan, TakeOrderedAndProjectExec}
+import org.apache.spark.sql.execution.{CollectLimitExec, FileSourceScanExec, 
GlobalLimitExec, SparkPlan, TakeOrderedAndProjectExec}
 import org.apache.spark.sql.execution.command.DataWritingCommandExec
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.execution.datasources.parquet.ParquetFilters
@@ -316,4 +316,6 @@ trait SparkShims {
 
   def getOtherConstantMetadataColumnValues(file: PartitionedFile): 
JMap[String, Object] =
     Map.empty[String, Any].asJava.asInstanceOf[JMap[String, Object]]
+
+  /**
+   * Returns the offset associated with the given [[CollectLimitExec]]. This default reports no
+   * offset (0); the Spark 3.4 and 3.5 shims override it to return `plan.offset`.
+   */
+  def getCollectLimitOffset(plan: CollectLimitExec): Int = 0
 }
diff --git 
a/shims/spark34/src/main/scala/org/apache/gluten/sql/shims/spark34/Spark34Shims.scala
 
b/shims/spark34/src/main/scala/org/apache/gluten/sql/shims/spark34/Spark34Shims.scala
index 8ff5e17873..f43d212274 100644
--- 
a/shims/spark34/src/main/scala/org/apache/gluten/sql/shims/spark34/Spark34Shims.scala
+++ 
b/shims/spark34/src/main/scala/org/apache/gluten/sql/shims/spark34/Spark34Shims.scala
@@ -628,6 +628,10 @@ class Spark34Shims extends SparkShims {
     }
   }
 
-  override def isColumnarLimitExecSupported(): Boolean = false
+  override def isColumnarLimitExecSupported(): Boolean = true
+
+  /** Reads the offset directly from the Spark 3.4 [[CollectLimitExec]] node. */
+  override def getCollectLimitOffset(plan: CollectLimitExec): Int = plan.offset
 
 }
diff --git 
a/shims/spark35/src/main/scala/org/apache/gluten/sql/shims/spark35/Spark35Shims.scala
 
b/shims/spark35/src/main/scala/org/apache/gluten/sql/shims/spark35/Spark35Shims.scala
index 43a3239a2a..88df86a35e 100644
--- 
a/shims/spark35/src/main/scala/org/apache/gluten/sql/shims/spark35/Spark35Shims.scala
+++ 
b/shims/spark35/src/main/scala/org/apache/gluten/sql/shims/spark35/Spark35Shims.scala
@@ -684,8 +684,12 @@ class Spark35Shims extends SparkShims {
     }
   }
 
-  override def isColumnarLimitExecSupported(): Boolean = false
+  override def isColumnarLimitExecSupported(): Boolean = true
 
   override def getOtherConstantMetadataColumnValues(file: PartitionedFile): 
JMap[String, Object] =
     file.otherConstantMetadataColumnValues.asJava.asInstanceOf[JMap[String, 
Object]]
+
+  /** Reads the offset directly from the Spark 3.5 [[CollectLimitExec]] node. */
+  override def getCollectLimitOffset(plan: CollectLimitExec): Int = plan.offset
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to