This is an automated email from the ASF dual-hosted git repository.
chengchengjin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 4e5125c8ce [GLUTEN-8912][VL] Add Offset support for CollectLimitExec (#8914)
4e5125c8ce is described below
commit 4e5125c8ce017f3aaacdd30939564125d700cdc0
Author: Arnav Balyan <[email protected]>
AuthorDate: Fri Apr 18 23:46:47 2025 +0530
[GLUTEN-8912][VL] Add Offset support for CollectLimitExec (#8914)
Thanks for your contribution!
---
.../clickhouse/CHSparkPlanExecApi.scala | 3 +-
.../gluten/backendsapi/velox/VeloxRuleApi.scala | 2 +
.../backendsapi/velox/VeloxSparkPlanExecApi.scala | 5 +-
.../execution/ColumnarCollectLimitExec.scala | 142 ++++++-----
.../gluten/backendsapi/SparkPlanExecApi.scala | 5 +-
.../execution/ColumnarCollectLimitBaseExec.scala | 22 +-
.../columnar/CollectLimitTransformerRule.scala | 41 ++++
.../columnar/offload/OffloadSingleNodeRules.scala | 4 +-
.../gluten/utils/velox/VeloxTestSettings.scala | 4 +
.../execution/GlutenSQLCollectLimitExecSuite.scala | 26 +-
.../gluten/utils/velox/VeloxTestSettings.scala | 3 +
.../execution/GlutenSQLCollectLimitExecSuite.scala | 30 ++-
.../gluten/utils/velox/VeloxTestSettings.scala | 6 +-
.../execution/GlutenSQLCollectLimitExecSuite.scala | 262 ++++++++++++++++++++
.../gluten/utils/velox/VeloxTestSettings.scala | 4 +-
.../execution/GlutenSQLCollectLimitExecSuite.scala | 264 +++++++++++++++++++++
.../org/apache/gluten/sql/shims/SparkShims.scala | 4 +-
.../gluten/sql/shims/spark34/Spark34Shims.scala | 6 +-
.../gluten/sql/shims/spark35/Spark35Shims.scala | 6 +-
19 files changed, 722 insertions(+), 117 deletions(-)
diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
index 0ec44a207a..217c4d08d8 100644
--- a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
+++ b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
@@ -944,7 +944,8 @@ class CHSparkPlanExecApi extends SparkPlanExecApi with Logging {
override def genColumnarCollectLimitExec(
limit: Int,
- child: SparkPlan): ColumnarCollectLimitBaseExec =
+ child: SparkPlan,
+ offset: Int): ColumnarCollectLimitBaseExec =
throw new GlutenNotSupportException("ColumnarCollectLimit is not supported in ch backend.")
override def genColumnarRangeExec(
diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala
index f14d08ed86..a4e1eb24b8 100644
--- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala
+++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala
@@ -100,6 +100,7 @@ object VeloxRuleApi {
injector.injectPostTransform(_ => CollapseProjectExecTransformer)
injector.injectPostTransform(c => FlushableHashAggregateRule.apply(c.session))
injector.injectPostTransform(c => HashAggregateIgnoreNullKeysRule.apply(c.session))
+ injector.injectPostTransform(_ => CollectLimitTransformerRule())
injector.injectPostTransform(c => InsertTransitions.create(c.outputsColumnar, VeloxBatch))
// Gluten columnar: Fallback policies.
@@ -187,6 +188,7 @@ object VeloxRuleApi {
injector.injectPostTransform(_ => CollapseProjectExecTransformer)
injector.injectPostTransform(c => FlushableHashAggregateRule.apply(c.session))
injector.injectPostTransform(c => HashAggregateIgnoreNullKeysRule.apply(c.session))
+ injector.injectPostTransform(_ => CollectLimitTransformerRule())
injector.injectPostTransform(c => InsertTransitions.create(c.outputsColumnar, VeloxBatch))
injector.injectPostTransform(c => RemoveTopmostColumnarToRow(c.session, c.caller.isAqe()))
SparkShimLoader.getSparkShims
diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala
index 0dc83e98d4..44aa8e836f 100644
--- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala
+++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala
@@ -920,8 +920,9 @@ class VeloxSparkPlanExecApi extends SparkPlanExecApi {
override def genColumnarCollectLimitExec(
limit: Int,
- child: SparkPlan): ColumnarCollectLimitBaseExec =
- ColumnarCollectLimitExec(limit, child)
+ child: SparkPlan,
+ offset: Int): ColumnarCollectLimitBaseExec =
+ ColumnarCollectLimitExec(limit, child, offset)
override def genColumnarRangeExec(
start: Long,
diff --git a/backends-velox/src/main/scala/org/apache/gluten/execution/ColumnarCollectLimitExec.scala b/backends-velox/src/main/scala/org/apache/gluten/execution/ColumnarCollectLimitExec.scala
index 8c328430ed..6fbbbfab66 100644
--- a/backends-velox/src/main/scala/org/apache/gluten/execution/ColumnarCollectLimitExec.scala
+++ b/backends-velox/src/main/scala/org/apache/gluten/execution/ColumnarCollectLimitExec.scala
@@ -32,88 +32,99 @@ import org.apache.spark.sql.vectorized.ColumnarBatch
case class ColumnarCollectLimitExec(
limit: Int,
- child: SparkPlan
-) extends ColumnarCollectLimitBaseExec(limit, child) {
+ child: SparkPlan,
+ offset: Int = 0
+) extends ColumnarCollectLimitBaseExec(limit, child, offset) {
override def batchType(): Convention.BatchType =
BackendsApiManager.getSettings.primaryBatchType
+ private lazy val writeMetrics =
+ SQLShuffleWriteMetricsReporter.createShuffleWriteMetrics(sparkContext)
+
+ private lazy val readMetrics =
+ SQLColumnarShuffleReadMetricsReporter.createShuffleReadMetrics(sparkContext)
+
+ private lazy val useSortBasedShuffle: Boolean =
+ BackendsApiManager.getSparkPlanExecApiInstance
+ .useSortBasedShuffle(outputPartitioning, child.output)
+
+ @transient private lazy val serializer: Serializer =
+ BackendsApiManager.getSparkPlanExecApiInstance
+ .createColumnarBatchSerializer(child.schema, metrics, useSortBasedShuffle)
+
+ @transient override lazy val metrics: Map[String, SQLMetric] =
+ BackendsApiManager.getMetricsApiInstance
+ .genColumnarShuffleExchangeMetrics(sparkContext, useSortBasedShuffle) ++
+ readMetrics ++ writeMetrics
+
/**
- * Returns an iterator that yields up to `limit` rows in total from the input partitionIter.
+ * Returns an iterator that skips the first `offset` rows and then yields up to `limit` rows in total from the input partitionIter.
* Either retain the entire batch if it fits within the remaining limit, or prune it if it
- * partially exceeds the remaining limit.
+ * partially exceeds the remaining limit/offset.
*/
- private def collectLimitedRows(
- partitionIter: Iterator[ColumnarBatch],
- limit: Int
- ): Iterator[ColumnarBatch] = {
- if (partitionIter.isEmpty) {
- return Iterator.empty
- }
- new Iterator[ColumnarBatch] {
+ private def collectWithOffsetAndLimit(
+ inputIter: Iterator[ColumnarBatch],
+ offset: Int,
+ limit: Int): Iterator[ColumnarBatch] = {
+
+ val unlimited = limit < 0
+ var rowsToSkip = math.max(offset, 0)
+ var rowsToCollect = if (unlimited) Int.MaxValue else limit
- private var rowsCollected = 0
+ new Iterator[ColumnarBatch] {
private var nextBatch: Option[ColumnarBatch] = None
override def hasNext: Boolean = {
- nextBatch.isDefined || fetchNext()
+ nextBatch.isDefined || fetchNextBatch()
}
override def next(): ColumnarBatch = {
- if (!hasNext) {
- throw new NoSuchElementException("No more batches available.")
- }
+ if (!hasNext) throw new NoSuchElementException("No more batches available.")
val batch = nextBatch.get
nextBatch = None
batch
}
/**
- * Attempt to fetch the next batch from the underlying iterator if we haven't yet hit the
- * limit. Returns true if we found a new batch, false otherwise.
+ * Advance the iterator until we find a batch (possibly sliced) that we can return, or exhaust
+ * the input.
*/
- private def fetchNext(): Boolean = {
- if (rowsCollected >= limit || !partitionIter.hasNext) {
- return false
+ private def fetchNextBatch(): Boolean = {
+
+ if (rowsToCollect <= 0) return false
+
+ while (inputIter.hasNext) {
+ val batch = inputIter.next()
+ val batchSize = batch.numRows()
+
+ if (rowsToSkip >= batchSize) {
+ rowsToSkip -= batchSize
+ } else {
+ val startIndex = rowsToSkip
+ val leftoverAfterSkip = batchSize - startIndex
+ rowsToSkip = 0
+
+ val needed = math.min(rowsToCollect, leftoverAfterSkip)
+
+ val prunedBatch =
+ if (startIndex == 0 && needed == batchSize) {
+ ColumnarBatches.retain(batch)
+ batch
+ } else {
+ VeloxColumnarBatches.slice(batch, startIndex, needed)
+ }
+
+ rowsToCollect -= needed
+ nextBatch = Some(prunedBatch)
+ return true
+ }
}
-
- val currentBatch = partitionIter.next()
- val currentBatchRowCount = currentBatch.numRows()
- val remaining = limit - rowsCollected
-
- if (currentBatchRowCount <= remaining) {
- rowsCollected += currentBatchRowCount
- ColumnarBatches.retain(currentBatch)
- nextBatch = Some(currentBatch)
- } else {
- val prunedBatch = VeloxColumnarBatches.slice(currentBatch, 0, remaining)
- rowsCollected += remaining
- nextBatch = Some(prunedBatch)
- }
- true
+ false
}
}
}
- private lazy val writeMetrics =
- SQLShuffleWriteMetricsReporter.createShuffleWriteMetrics(sparkContext)
-
- private lazy val readMetrics =
- SQLColumnarShuffleReadMetricsReporter.createShuffleReadMetrics(sparkContext)
-
- private lazy val useSortBasedShuffle: Boolean =
- BackendsApiManager.getSparkPlanExecApiInstance
- .useSortBasedShuffle(outputPartitioning, child.output)
-
- @transient private lazy val serializer: Serializer =
- BackendsApiManager.getSparkPlanExecApiInstance
- .createColumnarBatchSerializer(child.schema, metrics, useSortBasedShuffle)
-
- @transient override lazy val metrics: Map[String, SQLMetric] =
- BackendsApiManager.getMetricsApiInstance
- .genColumnarShuffleExchangeMetrics(sparkContext, useSortBasedShuffle) ++
- readMetrics ++ writeMetrics
-
override protected def doExecuteColumnar(): RDD[ColumnarBatch] = {
val childRDD = child.executeColumnar()
@@ -125,11 +136,26 @@ case class ColumnarCollectLimitExec(
if (childRDD.getNumPartitions == 1) childRDD
else shuffleLimitedPartitions(childRDD)
- processedRDD.mapPartitions(partition => collectLimitedRows(partition, limit))
+ processedRDD.mapPartitions(
+ partition => {
+ if (limit > 0) {
+ val adjusted = math.max(0, limit - offset)
+ collectWithOffsetAndLimit(partition, offset, adjusted)
+ } else {
+ collectWithOffsetAndLimit(partition, offset, -1)
+ }
+ })
}
private def shuffleLimitedPartitions(childRDD: RDD[ColumnarBatch]): RDD[ColumnarBatch] = {
- val locallyLimited = childRDD.mapPartitions(partition => collectLimitedRows(partition, limit))
+ val applyLocalLimit = (offset == 0 && limit >= 0)
+ val locallyLimited = if (applyLocalLimit) {
+ childRDD.mapPartitions {
+ collectWithOffsetAndLimit(_, 0, limit)
+ }
+ } else {
+ childRDD
+ }
new ShuffledColumnarBatchRDD(
BackendsApiManager.getSparkPlanExecApiInstance.genShuffleDependency(
locallyLimited,
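
For readers skimming the patch, the per-partition loop above is the heart of the change. Below is a minimal, self-contained sketch (not part of the commit) of the same skip-then-collect arithmetic, using Array[Int] batches in place of ColumnarBatch so it runs without a Gluten build; all names here are illustrative only.

object OffsetLimitSketch {
  // Same skip-then-collect arithmetic as collectWithOffsetAndLimit, on plain arrays.
  def collectWithOffsetAndLimit(
      batches: Iterator[Array[Int]],
      offset: Int,
      limit: Int): Iterator[Array[Int]] = {
    var rowsToSkip = math.max(offset, 0)
    var rowsToCollect = if (limit < 0) Int.MaxValue else limit
    batches.flatMap { batch =>
      if (rowsToCollect <= 0) {
        None // limit exhausted: drop the remaining batches
      } else if (rowsToSkip >= batch.length) {
        rowsToSkip -= batch.length // the whole batch falls inside the offset
        None
      } else {
        val start = rowsToSkip // partial skip, then take what the limit still allows
        rowsToSkip = 0
        val needed = math.min(rowsToCollect, batch.length - start)
        rowsToCollect -= needed
        Some(batch.slice(start, start + needed)) // stands in for VeloxColumnarBatches.slice
      }
    }
  }

  def main(args: Array[String]): Unit = {
    // Three 4-row "batches"; skipping 5 rows and collecting 4 should yield rows 5..8.
    val batches = Iterator(Array(0, 1, 2, 3), Array(4, 5, 6, 7), Array(8, 9, 10, 11))
    println(collectWithOffsetAndLimit(batches, offset = 5, limit = 4).flatten.toList)
    // prints: List(5, 6, 7, 8)
  }
}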
diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala
index a798053f6f..a899099032 100644
--- a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala
+++ b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala
@@ -710,7 +710,10 @@ trait SparkPlanExecApi {
original: StringSplit): ExpressionTransformer =
GenericExpressionTransformer(substraitExprName, Seq(srcExpr, regexExpr, limitExpr), original)
- def genColumnarCollectLimitExec(limit: Int, plan: SparkPlan): ColumnarCollectLimitBaseExec
+ def genColumnarCollectLimitExec(
+ limit: Int,
+ plan: SparkPlan,
+ offset: Int): ColumnarCollectLimitBaseExec
def genColumnarRangeExec(
start: Long,
diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/execution/ColumnarCollectLimitBaseExec.scala b/gluten-substrait/src/main/scala/org/apache/gluten/execution/ColumnarCollectLimitBaseExec.scala
index c68d0cd51c..fc551e64d6 100644
--- a/gluten-substrait/src/main/scala/org/apache/gluten/execution/ColumnarCollectLimitBaseExec.scala
+++ b/gluten-substrait/src/main/scala/org/apache/gluten/execution/ColumnarCollectLimitBaseExec.scala
@@ -16,27 +16,22 @@
*/
package org.apache.gluten.execution
-import org.apache.gluten.backendsapi.BackendsApiManager
import org.apache.gluten.extension.ValidationResult
-import org.apache.gluten.sql.shims.SparkShimLoader
import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, SinglePartition}
-import org.apache.spark.sql.execution.{CollectLimitExec, LimitExec, SparkPlan}
+import org.apache.spark.sql.execution.{LimitExec, SparkPlan}
abstract class ColumnarCollectLimitBaseExec(
limit: Int,
- childPlan: SparkPlan
+ childPlan: SparkPlan,
+ offset: Int = 0
) extends LimitExec
with ValidatablePlan {
override def outputPartitioning: Partitioning = SinglePartition
override protected def doValidateInternal(): ValidationResult = {
- if (!SparkShimLoader.getSparkShims.isColumnarLimitExecSupported()) {
- return ValidationResult.failed(
- "Columnar collect-limit is unsupported under the current Spark version")
- }
- ValidationResult.succeeded
+ ValidationResult.failed("Columnar shuffle not enabled or child does not support columnar.")
}
override protected def doExecute()
@@ -45,12 +40,3 @@ abstract class ColumnarCollectLimitBaseExec(
}
}
-object ColumnarCollectLimitBaseExec {
- def from(collectLimitExec: CollectLimitExec): ColumnarCollectLimitBaseExec = {
- BackendsApiManager.getSparkPlanExecApiInstance
- .genColumnarCollectLimitExec(
- collectLimitExec.limit,
- collectLimitExec.child
- )
- }
-}
diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/CollectLimitTransformerRule.scala b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/CollectLimitTransformerRule.scala
new file mode 100644
index 0000000000..bb4761a07e
--- /dev/null
+++ b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/CollectLimitTransformerRule.scala
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.extension.columnar
+
+import org.apache.gluten.backendsapi.BackendsApiManager
+import org.apache.gluten.config.GlutenConfig
+import org.apache.gluten.sql.shims.SparkShimLoader
+
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.{CollectLimitExec, SparkPlan}
+
+case class CollectLimitTransformerRule() extends Rule[SparkPlan] {
+ override def apply(plan: SparkPlan): SparkPlan = {
+ if (!GlutenConfig.get.enableColumnarCollectLimit) {
+ return plan
+ }
+
+ val transformed = plan.transformUp {
+ case exec: CollectLimitExec if exec.child.supportsColumnar =>
+ val offset = SparkShimLoader.getSparkShims.getCollectLimitOffset(exec)
+ BackendsApiManager.getSparkPlanExecApiInstance
+ .genColumnarCollectLimitExec(exec.limit, exec.child, offset)
+ }
+
+ transformed
+ }
+}
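
A note on the offset semantics the new rule relies on: in Spark 3.4/3.5, CollectLimitExec carries both limit and offset, where limit counts rows from the start of the result and the first offset rows are then dropped. That is why doExecuteColumnar above passes math.max(0, limit - offset) as the remaining row budget after the skip. A hedged sketch of that mapping (the helper name is hypothetical, not from the patch):

// Hypothetical helper: how many rows a CollectLimitExec with the given
// limit/offset ultimately returns (-1 is Spark's "no limit" sentinel).
def rowsReturned(limit: Int, offset: Int): Int =
  if (limit < 0) -1 else math.max(0, limit - offset)

assert(rowsReturned(limit = 5, offset = 2) == 3) // df.limit(5).offset(2) keeps rows 2, 3, 4
assert(rowsReturned(limit = 12, offset = 5) == 7) // matches the "offset test" in the suites below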
diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/offload/OffloadSingleNodeRules.scala b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/offload/OffloadSingleNodeRules.scala
index 750fc060cd..2ad6701742 100644
--- a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/offload/OffloadSingleNodeRules.scala
+++ b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/offload/OffloadSingleNodeRules.scala
@@ -345,9 +345,11 @@ object OffloadOthers {
child)
case plan: CollectLimitExec =>
logDebug(s"Columnar Processing for ${plan.getClass} is currently
supported.")
+ val offset =
SparkShimLoader.getSparkShims.getCollectLimitOffset(plan)
BackendsApiManager.getSparkPlanExecApiInstance.genColumnarCollectLimitExec(
plan.limit,
- plan.child
+ plan.child,
+ offset
)
case plan: RDDScanExec if
RDDScanTransformer.isSupportRDDScanExec(plan) =>
logDebug(s"Columnar Processing for ${plan.getClass} is currently
supported.")
diff --git a/gluten-ut/spark32/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala b/gluten-ut/spark32/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
index 66e2d3159e..60d81f106f 100644
--- a/gluten-ut/spark32/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
+++ b/gluten-ut/spark32/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
@@ -16,6 +16,7 @@
*/
package org.apache.gluten.utils.velox
+import org.apache.gluten.execution.GlutenSQLCollectLimitExecSuite
import org.apache.gluten.utils.{BackendTestSettings, SQLQueryTestSettings}
import org.apache.spark.GlutenSortShuffleSuite
@@ -780,6 +781,8 @@ class VeloxTestSettings extends BackendTestSettings {
enableSuite[GlutenConfigBehaviorSuite]
// Will be fixed by cleaning up ColumnarShuffleExchangeExec.
.exclude("SPARK-22160
spark.sql.execution.rangeExchange.sampleSizePerPartition")
+ // Gluten columnar operator will have different number of jobs
+ .exclude("SPARK-40211: customize initialNumPartitions for take")
enableSuite[GlutenCountMinSketchAggQuerySuite]
enableSuite[GlutenCsvFunctionsSuite]
enableSuite[GlutenCTEHintSuite]
@@ -812,6 +815,7 @@ class VeloxTestSettings extends BackendTestSettings {
enableSuite[GlutenHiveSQLQuerySuite]
enableSuite[GlutenCollapseProjectExecTransformerSuite]
enableSuite[GlutenSparkSessionExtensionSuite]
+ enableSuite[GlutenSQLCollectLimitExecSuite]
override def getSQLQueryTestSettings: SQLQueryTestSettings =
VeloxSQLQueryTestSettings
}
diff --git a/backends-velox/src/test/scala/org/apache/gluten/execution/GlutenSQLCollectLimitExecSuite.scala b/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/GlutenSQLCollectLimitExecSuite.scala
similarity index 77%
copy from backends-velox/src/test/scala/org/apache/gluten/execution/GlutenSQLCollectLimitExecSuite.scala
copy to gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/GlutenSQLCollectLimitExecSuite.scala
index 7ac89c33de..d54d1573a5 100644
--- a/backends-velox/src/test/scala/org/apache/gluten/execution/GlutenSQLCollectLimitExecSuite.scala
+++ b/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/GlutenSQLCollectLimitExecSuite.scala
@@ -17,14 +17,11 @@
package org.apache.gluten.execution
import org.apache.spark.SparkConf
-import org.apache.spark.sql.{DataFrame, Row}
+import org.apache.spark.sql.{DataFrame, GlutenSQLTestsTrait, Row}
-class GlutenSQLCollectLimitExecSuite extends WholeStageTransformerSuite {
+class GlutenSQLCollectLimitExecSuite extends GlutenSQLTestsTrait {
- override protected val resourcePath: String = "N/A"
- override protected val fileFormat: String = "N/A"
-
- override protected def sparkConf: SparkConf = {
+ override def sparkConf: SparkConf = {
super.sparkConf
.set("spark.shuffle.manager",
"org.apache.spark.shuffle.sort.ColumnarShuffleManager")
}
@@ -56,7 +53,7 @@ class GlutenSQLCollectLimitExecSuite extends WholeStageTransformerSuite {
assert(assertionCondition, assertionMessage)
}
- testWithSpecifiedSparkVersion("ColumnarCollectLimitExec - basic limit test", "3.2", "3.3") {
+ test("ColumnarCollectLimitExec - basic limit test") {
val df = spark.range(0, 1000, 1).toDF("id").limit(5)
val expectedData = Seq(Row(0L), Row(1L), Row(2L), Row(3L), Row(4L))
@@ -65,7 +62,7 @@ class GlutenSQLCollectLimitExecSuite extends WholeStageTransformerSuite {
assertGlutenOperatorMatch[ColumnarCollectLimitBaseExec](df, checkMatch = true)
}
- testWithSpecifiedSparkVersion("ColumnarCollectLimitExec - with filter", "3.2", "3.3") {
+ test("ColumnarCollectLimitExec - with filter") {
val df = spark
.range(0, 20, 1)
.toDF("id")
@@ -78,21 +75,20 @@ class GlutenSQLCollectLimitExecSuite extends WholeStageTransformerSuite {
assertGlutenOperatorMatch[ColumnarCollectLimitBaseExec](df, checkMatch = true)
}
- testWithSpecifiedSparkVersion("ColumnarCollectLimitExec - range with repartition", "3.2", "3.3") {
+ test("ColumnarCollectLimitExec - range with repartition") {
val df = spark
.range(0, 10, 1)
.toDF("id")
.repartition(3)
+ .orderBy("id")
.limit(3)
- val expectedData = Seq(Row(1L), Row(2L), Row(4L))
+ val expectedData = Seq(Row(0L), Row(1L), Row(2L))
checkAnswer(df, expectedData)
-
- assertGlutenOperatorMatch[ColumnarCollectLimitBaseExec](df, checkMatch = true)
}
- testWithSpecifiedSparkVersion("ColumnarCollectLimitExec - with distinct values", "3.2", "3.3") {
+ test("ColumnarCollectLimitExec - with distinct values") {
val df = spark
.range(0, 10, 1)
.toDF("id")
@@ -106,7 +102,7 @@ class GlutenSQLCollectLimitExecSuite extends WholeStageTransformerSuite {
assertGlutenOperatorMatch[ColumnarCollectLimitBaseExec](df, checkMatch = true)
}
- testWithSpecifiedSparkVersion("ColumnarCollectLimitExec - chained limit", "3.2", "3.3") {
+ test("ColumnarCollectLimitExec - chained limit") {
val df = spark
.range(0, 10, 1)
.toDF("id")
@@ -119,7 +115,7 @@ class GlutenSQLCollectLimitExecSuite extends WholeStageTransformerSuite {
assertGlutenOperatorMatch[ColumnarCollectLimitBaseExec](df, checkMatch = true)
}
- testWithSpecifiedSparkVersion("ColumnarCollectLimitExec - limit after union", "3.2", "3.3") {
+ test("ColumnarCollectLimitExec - limit after union") {
val df1 = spark.range(0, 5).toDF("id")
val df2 = spark.range(5, 10).toDF("id")
val unionDf = df1.union(df2).limit(3)
diff --git a/gluten-ut/spark33/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala b/gluten-ut/spark33/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
index 8f0d562f95..676b562889 100644
--- a/gluten-ut/spark33/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
+++ b/gluten-ut/spark33/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
@@ -632,6 +632,8 @@ class VeloxTestSettings extends BackendTestSettings {
enableSuite[GlutenConfigBehaviorSuite]
// Will be fixed by cleaning up ColumnarShuffleExchangeExec.
.exclude("SPARK-22160
spark.sql.execution.rangeExchange.sampleSizePerPartition")
+ // Gluten columnar operator will have different number of jobs
+ .exclude("SPARK-40211: customize initialNumPartitions for take")
enableSuite[GlutenCountMinSketchAggQuerySuite]
enableSuite[GlutenCsvFunctionsSuite]
enableSuite[GlutenCTEHintSuite]
@@ -862,6 +864,7 @@ class VeloxTestSettings extends BackendTestSettings {
enableSuite[GlutenImplicitsTest]
enableSuite[GlutenCollapseProjectExecTransformerSuite]
enableSuite[GlutenSparkSessionExtensionSuite]
+ enableSuite[GlutenSQLCollectLimitExecSuite]
override def getSQLQueryTestSettings: SQLQueryTestSettings =
VeloxSQLQueryTestSettings
}
diff --git a/backends-velox/src/test/scala/org/apache/gluten/execution/GlutenSQLCollectLimitExecSuite.scala b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/GlutenSQLCollectLimitExecSuite.scala
similarity index 76%
rename from backends-velox/src/test/scala/org/apache/gluten/execution/GlutenSQLCollectLimitExecSuite.scala
rename to gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/GlutenSQLCollectLimitExecSuite.scala
index 7ac89c33de..069dea32f4 100644
--- a/backends-velox/src/test/scala/org/apache/gluten/execution/GlutenSQLCollectLimitExecSuite.scala
+++ b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/GlutenSQLCollectLimitExecSuite.scala
@@ -14,17 +14,16 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.gluten.execution
+package org.apache.spark.sql.execution
-import org.apache.spark.SparkConf
-import org.apache.spark.sql.{DataFrame, Row}
+import org.apache.gluten.execution.ColumnarCollectLimitBaseExec
-class GlutenSQLCollectLimitExecSuite extends WholeStageTransformerSuite {
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.{DataFrame, GlutenSQLTestsTrait, Row}
- override protected val resourcePath: String = "N/A"
- override protected val fileFormat: String = "N/A"
+class GlutenSQLCollectLimitExecSuite extends GlutenSQLTestsTrait {
- override protected def sparkConf: SparkConf = {
+ override def sparkConf: SparkConf = {
super.sparkConf
.set("spark.shuffle.manager",
"org.apache.spark.shuffle.sort.ColumnarShuffleManager")
}
@@ -56,7 +55,7 @@ class GlutenSQLCollectLimitExecSuite extends WholeStageTransformerSuite {
assert(assertionCondition, assertionMessage)
}
- testWithSpecifiedSparkVersion("ColumnarCollectLimitExec - basic limit test", "3.2", "3.3") {
+ test("ColumnarCollectLimitExec - basic limit test") {
val df = spark.range(0, 1000, 1).toDF("id").limit(5)
val expectedData = Seq(Row(0L), Row(1L), Row(2L), Row(3L), Row(4L))
@@ -65,7 +64,7 @@ class GlutenSQLCollectLimitExecSuite extends WholeStageTransformerSuite {
assertGlutenOperatorMatch[ColumnarCollectLimitBaseExec](df, checkMatch = true)
}
- testWithSpecifiedSparkVersion("ColumnarCollectLimitExec - with filter", "3.2", "3.3") {
+ test("ColumnarCollectLimitExec - with filter") {
val df = spark
.range(0, 20, 1)
.toDF("id")
@@ -78,21 +77,20 @@ class GlutenSQLCollectLimitExecSuite extends WholeStageTransformerSuite {
assertGlutenOperatorMatch[ColumnarCollectLimitBaseExec](df, checkMatch = true)
}
- testWithSpecifiedSparkVersion("ColumnarCollectLimitExec - range with repartition", "3.2", "3.3") {
+ test("ColumnarCollectLimitExec - range with repartition") {
val df = spark
.range(0, 10, 1)
.toDF("id")
.repartition(3)
+ .orderBy("id")
.limit(3)
- val expectedData = Seq(Row(1L), Row(2L), Row(4L))
+ val expectedData = Seq(Row(0L), Row(1L), Row(2L))
checkAnswer(df, expectedData)
-
- assertGlutenOperatorMatch[ColumnarCollectLimitBaseExec](df, checkMatch = true)
}
- testWithSpecifiedSparkVersion("ColumnarCollectLimitExec - with distinct values", "3.2", "3.3") {
+ test("ColumnarCollectLimitExec - with distinct values") {
val df = spark
.range(0, 10, 1)
.toDF("id")
@@ -106,7 +104,7 @@ class GlutenSQLCollectLimitExecSuite extends WholeStageTransformerSuite {
assertGlutenOperatorMatch[ColumnarCollectLimitBaseExec](df, checkMatch = true)
}
- testWithSpecifiedSparkVersion("ColumnarCollectLimitExec - chained limit", "3.2", "3.3") {
+ test("ColumnarCollectLimitExec - chained limit") {
val df = spark
.range(0, 10, 1)
.toDF("id")
@@ -119,7 +117,7 @@ class GlutenSQLCollectLimitExecSuite extends WholeStageTransformerSuite {
assertGlutenOperatorMatch[ColumnarCollectLimitBaseExec](df, checkMatch = true)
}
- testWithSpecifiedSparkVersion("ColumnarCollectLimitExec - limit after union", "3.2", "3.3") {
+ test("ColumnarCollectLimitExec - limit after union") {
val df1 = spark.range(0, 5).toDF("id")
val df2 = spark.range(5, 10).toDF("id")
val unionDf = df1.union(df2).limit(3)
diff --git a/gluten-ut/spark34/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala b/gluten-ut/spark34/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
index d688cc1272..3f6ed85f87 100644
--- a/gluten-ut/spark34/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
+++ b/gluten-ut/spark34/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
@@ -16,6 +16,7 @@
*/
package org.apache.gluten.utils.velox
+import org.apache.gluten.execution.GlutenSQLCollectLimitExecSuite
import org.apache.gluten.utils.{BackendTestSettings, SQLQueryTestSettings}
import org.apache.spark.GlutenSortShuffleSuite
@@ -185,7 +186,6 @@ class VeloxTestSettings extends BackendTestSettings {
enableSuite[VeloxAdaptiveQueryExecSuite]
.includeAllGlutenTests()
.includeByPrefix(
- "SPARK-29906",
"SPARK-30291",
"SPARK-30403",
"SPARK-30719",
@@ -667,6 +667,8 @@ class VeloxTestSettings extends BackendTestSettings {
enableSuite[GlutenConfigBehaviorSuite]
// Will be fixed by cleaning up ColumnarShuffleExchangeExec.
.exclude("SPARK-22160
spark.sql.execution.rangeExchange.sampleSizePerPartition")
+ // Gluten columnar operator will have different number of jobs
+ .exclude("SPARK-40211: customize initialNumPartitions for take")
enableSuite[GlutenCountMinSketchAggQuerySuite]
enableSuite[GlutenCsvFunctionsSuite]
enableSuite[GlutenCTEHintSuite]
@@ -899,6 +901,8 @@ class VeloxTestSettings extends BackendTestSettings {
enableSuite[GlutenParquetRowIndexSuite]
.excludeByPrefix("row index generation")
.excludeByPrefix("invalid row index column type")
+ enableSuite[GlutenSQLCollectLimitExecSuite]
+
override def getSQLQueryTestSettings: SQLQueryTestSettings =
VeloxSQLQueryTestSettings
}
// scalastyle:on line.size.limit
diff --git a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/GlutenSQLCollectLimitExecSuite.scala b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/GlutenSQLCollectLimitExecSuite.scala
new file mode 100644
index 0000000000..11f773a2c6
--- /dev/null
+++ b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/GlutenSQLCollectLimitExecSuite.scala
@@ -0,0 +1,262 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.execution
+
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.{DataFrame, GlutenSQLTestsTrait, Row}
+
+class GlutenSQLCollectLimitExecSuite extends GlutenSQLTestsTrait {
+
+ override def sparkConf: SparkConf = {
+ super.sparkConf
+ .set("spark.shuffle.manager",
"org.apache.spark.shuffle.sort.ColumnarShuffleManager")
+ }
+
+ private def assertGlutenOperatorMatch[T: reflect.ClassTag](
+ df: DataFrame,
+ checkMatch: Boolean): Unit = {
+ val executedPlan = getExecutedPlan(df)
+
+ val operatorFound = executedPlan.exists {
+ plan =>
+ try {
+ implicitly[reflect.ClassTag[T]].runtimeClass.isInstance(plan)
+ } catch {
+ case _: Throwable => false
+ }
+ }
+
+ val assertionCondition = operatorFound == checkMatch
+ val assertionMessage =
+ if (checkMatch) {
+ s"Operator
${implicitly[reflect.ClassTag[T]].runtimeClass.getSimpleName} not found " +
+ s"in executed plan:\n $executedPlan"
+ } else {
+ s"Operator
${implicitly[reflect.ClassTag[T]].runtimeClass.getSimpleName} was found " +
+ s"in executed plan:\n $executedPlan"
+ }
+
+ assert(assertionCondition, assertionMessage)
+ }
+
+ test("ColumnarCollectLimitExec - basic limit test") {
+ val df = spark.range(0, 1000, 1).toDF("id").limit(5)
+ val expectedData = Seq(Row(0L), Row(1L), Row(2L), Row(3L), Row(4L))
+
+ checkAnswer(df, expectedData)
+
+ assertGlutenOperatorMatch[ColumnarCollectLimitBaseExec](df, checkMatch = true)
+ }
+
+ test("ColumnarCollectLimitExec - with filter") {
+ val df = spark
+ .range(0, 20, 1)
+ .toDF("id")
+ .filter("id % 2 == 0")
+ .limit(5)
+ val expectedData = Seq(Row(0L), Row(2L), Row(4L), Row(6L), Row(8L))
+
+ checkAnswer(df, expectedData)
+
+ assertGlutenOperatorMatch[ColumnarCollectLimitBaseExec](df, checkMatch = true)
+ }
+
+ test("ColumnarCollectLimitExec - range with repartition") {
+
+ val df = spark
+ .range(0, 10, 1)
+ .toDF("id")
+ .repartition(3)
+ .orderBy("id")
+ .limit(3)
+ val expectedData = Seq(Row(0L), Row(1L), Row(2L))
+
+ checkAnswer(df, expectedData)
+ }
+
+ test("ColumnarCollectLimitExec - with distinct values") {
+ val df = spark
+ .range(0, 10, 1)
+ .toDF("id")
+ .select("id")
+ .distinct()
+ .limit(5)
+ val expectedData = Seq(Row(0L), Row(1L), Row(2L), Row(3L), Row(4L))
+
+ checkAnswer(df, expectedData)
+
+ assertGlutenOperatorMatch[ColumnarCollectLimitBaseExec](df, checkMatch = true)
+ }
+
+ test("ColumnarCollectLimitExec - chained limit") {
+ val df = spark
+ .range(0, 10, 1)
+ .toDF("id")
+ .limit(8)
+ .limit(3)
+ val expectedData = Seq(Row(0L), Row(1L), Row(2L))
+
+ checkAnswer(df, expectedData)
+
+ assertGlutenOperatorMatch[ColumnarCollectLimitBaseExec](df, checkMatch = true)
+ }
+
+ test("ColumnarCollectLimitExec - limit after union") {
+ val df1 = spark.range(0, 5).toDF("id")
+ val df2 = spark.range(5, 10).toDF("id")
+ val unionDf = df1.union(df2).limit(3)
+
+ val expectedData = Seq(Row(0L), Row(1L), Row(2L))
+
+ checkAnswer(unionDf, expectedData)
+
+ assertGlutenOperatorMatch[ColumnarCollectLimitBaseExec](unionDf, checkMatch = true)
+ }
+
+ test("ColumnarCollectLimitExec - offset test") {
+ val df1 = spark.range(0, 10, 1).toDF("id").limit(5).offset(2)
+ val expectedData1 = Seq(Row(2L), Row(3L), Row(4L))
+
+ checkAnswer(df1, expectedData1)
+ assertGlutenOperatorMatch[ColumnarCollectLimitBaseExec](df1, checkMatch = true)
+
+ val df2 = spark.range(0, 20, 1).toDF("id").limit(12).offset(5)
+ val expectedData2 = Seq(Row(5L), Row(6L), Row(7L), Row(8L), Row(9L), Row(10L), Row(11L))
+
+ checkAnswer(df2, expectedData2)
+ assertGlutenOperatorMatch[ColumnarCollectLimitBaseExec](df2, checkMatch = true)
+
+ val df3 = spark.range(0, 30, 1).toDF("id").limit(10).offset(3)
+ val expectedData3 = Seq(Row(3L), Row(4L), Row(5L), Row(6L), Row(7L), Row(8L), Row(9L))
+
+ checkAnswer(df3, expectedData3)
+ assertGlutenOperatorMatch[ColumnarCollectLimitBaseExec](df3, checkMatch = true)
+
+ val df4 = spark.range(0, 15, 1).toDF("id").limit(8).offset(4)
+ val expectedData4 = Seq(Row(4L), Row(5L), Row(6L), Row(7L))
+
+ checkAnswer(df4, expectedData4)
+ assertGlutenOperatorMatch[ColumnarCollectLimitBaseExec](df4, checkMatch = true)
+
+ val df5 = spark.range(0, 50, 1).toDF("id").limit(20).offset(10)
+ val expectedData5 = Seq(
+ Row(10L),
+ Row(11L),
+ Row(12L),
+ Row(13L),
+ Row(14L),
+ Row(15L),
+ Row(16L),
+ Row(17L),
+ Row(18L),
+ Row(19L))
+
+ checkAnswer(df5, expectedData5)
+ assertGlutenOperatorMatch[ColumnarCollectLimitBaseExec](df5, checkMatch = true)
+ }
+
+ test("ColumnarCollectLimitExec - pure offset test") {
+ val df1 = spark.range(0, 20, 1).toDF("id").offset(5)
+ val expectedData1 = Seq(
+ Row(5L),
+ Row(6L),
+ Row(7L),
+ Row(8L),
+ Row(9L),
+ Row(10L),
+ Row(11L),
+ Row(12L),
+ Row(13L),
+ Row(14L),
+ Row(15L),
+ Row(16L),
+ Row(17L),
+ Row(18L),
+ Row(19L))
+
+ checkAnswer(df1, expectedData1)
+ assertGlutenOperatorMatch[ColumnarCollectLimitBaseExec](df1, checkMatch = true)
+
+ val df2 = spark.range(0, 50, 1).toDF("id").offset(10)
+ val expectedData2 = (10L to 49L).map(Row(_))
+
+ checkAnswer(df2, expectedData2)
+ assertGlutenOperatorMatch[ColumnarCollectLimitBaseExec](df2, checkMatch = true)
+
+ val df3 = spark.range(0, 100, 2).toDF("id").offset(15)
+ val expectedData3 = (30L to 98L by 2).map(Row(_))
+
+ checkAnswer(df3, expectedData3)
+ assertGlutenOperatorMatch[ColumnarCollectLimitBaseExec](df3, checkMatch = true)
+
+ val df4 = spark.range(0, 30, 1).toDF("id").offset(20)
+ val expectedData4 = (20L to 29L).map(Row(_))
+
+ checkAnswer(df4, expectedData4)
+ assertGlutenOperatorMatch[ColumnarCollectLimitBaseExec](df4, checkMatch = true)
+
+ val df5 = spark.range(0, 200, 5).toDF("id").offset(10)
+ val expectedData5 = (50L to 195L by 5).map(Row(_))
+
+ checkAnswer(df5, expectedData5)
+ assertGlutenOperatorMatch[ColumnarCollectLimitBaseExec](df5, checkMatch = true)
+
+ val df6 = spark.range(0, 5, 1).toDF("id").limit(10)
+ val expectedData6 = (0L to 4L).map(Row(_))
+
+ checkAnswer(df6, expectedData6)
+ }
+
+ test("ColumnarCollectLimitExec - offset with filter") {
+ val df = spark.range(0, 10, 1).toDF("id").filter("id % 2 ==
0").limit(5).offset(2)
+ val expectedData = Seq(Row(4L), Row(6L), Row(8L))
+
+ checkAnswer(df, expectedData)
+ assertGlutenOperatorMatch[ColumnarCollectLimitBaseExec](df, checkMatch =
true)
+ }
+
+ test("ColumnarCollectLimitExec - offset after union") {
+ val df1 = spark.range(0, 5).toDF("id")
+ val df2 = spark.range(5, 10).toDF("id")
+ val unionDf = df1.union(df2).limit(6).offset(3)
+
+ val expectedData = Seq(Row(3L), Row(4L), Row(5L))
+ checkAnswer(unionDf, expectedData)
+ assertGlutenOperatorMatch[ColumnarCollectLimitBaseExec](unionDf, checkMatch = true)
+ }
+
+ test("ColumnarCollectLimitExec - single partition with limit, offset, and
limit + offset") {
+
+ val singlePartitionDF = spark.range(0, 10, 1).toDF("id").coalesce(1)
+
+ val limitDF = singlePartitionDF.limit(5)
+ val expectedLimitData = Seq(Row(0L), Row(1L), Row(2L), Row(3L), Row(4L))
+ checkAnswer(limitDF, expectedLimitData)
+ assertGlutenOperatorMatch[ColumnarCollectLimitBaseExec](limitDF, checkMatch = true)
+
+ val offsetDF = singlePartitionDF.offset(3)
+ val expectedOffsetData = Seq(Row(3L), Row(4L), Row(5L), Row(6L), Row(7L), Row(8L), Row(9L))
+ checkAnswer(offsetDF, expectedOffsetData)
+ assertGlutenOperatorMatch[ColumnarCollectLimitBaseExec](offsetDF, checkMatch = true)
+
+ val limitOffsetDF = singlePartitionDF.limit(5).offset(2)
+ val expectedLimitOffsetData = Seq(Row(2L), Row(3L), Row(4L))
+ checkAnswer(limitOffsetDF, expectedLimitOffsetData)
+ assertGlutenOperatorMatch[ColumnarCollectLimitBaseExec](limitOffsetDF, checkMatch = true)
+ }
+
+}
diff --git a/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala b/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
index a069724fa9..c84ae5803e 100644
--- a/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
+++ b/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
@@ -188,7 +188,6 @@ class VeloxTestSettings extends BackendTestSettings {
enableSuite[VeloxAdaptiveQueryExecSuite]
.includeAllGlutenTests()
.includeByPrefix(
- "SPARK-29906",
"SPARK-30291",
"SPARK-30403",
"SPARK-30719",
@@ -687,6 +686,8 @@ class VeloxTestSettings extends BackendTestSettings {
enableSuite[GlutenConfigBehaviorSuite]
// Will be fixed by cleaning up ColumnarShuffleExchangeExec.
.exclude("SPARK-22160
spark.sql.execution.rangeExchange.sampleSizePerPartition")
+ // Gluten columnar operator will have different number of jobs
+ .exclude("SPARK-40211: customize initialNumPartitions for take")
enableSuite[GlutenCountMinSketchAggQuerySuite]
enableSuite[GlutenCsvFunctionsSuite]
enableSuite[GlutenCTEHintSuite]
@@ -936,6 +937,7 @@ class VeloxTestSettings extends BackendTestSettings {
enableSuite[GlutenParquetFileMetadataStructRowIndexSuite]
enableSuite[GlutenTableLocationSuite]
enableSuite[GlutenRemoveRedundantWindowGroupLimitsSuite]
+ enableSuite[GlutenSQLCollectLimitExecSuite]
override def getSQLQueryTestSettings: SQLQueryTestSettings =
VeloxSQLQueryTestSettings
}
diff --git a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/GlutenSQLCollectLimitExecSuite.scala b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/GlutenSQLCollectLimitExecSuite.scala
new file mode 100644
index 0000000000..a946843d3f
--- /dev/null
+++ b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/GlutenSQLCollectLimitExecSuite.scala
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution
+
+import org.apache.gluten.execution.ColumnarCollectLimitBaseExec
+
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.{DataFrame, GlutenSQLTestsTrait, Row}
+
+class GlutenSQLCollectLimitExecSuite extends GlutenSQLTestsTrait {
+
+ override def sparkConf: SparkConf = {
+ super.sparkConf
+ .set("spark.shuffle.manager",
"org.apache.spark.shuffle.sort.ColumnarShuffleManager")
+ }
+
+ private def assertGlutenOperatorMatch[T: reflect.ClassTag](
+ df: DataFrame,
+ checkMatch: Boolean): Unit = {
+ val executedPlan = getExecutedPlan(df)
+
+ val operatorFound = executedPlan.exists {
+ plan =>
+ try {
+ implicitly[reflect.ClassTag[T]].runtimeClass.isInstance(plan)
+ } catch {
+ case _: Throwable => false
+ }
+ }
+
+ val assertionCondition = operatorFound == checkMatch
+ val assertionMessage =
+ if (checkMatch) {
+ s"Operator
${implicitly[reflect.ClassTag[T]].runtimeClass.getSimpleName} not found " +
+ s"in executed plan:\n $executedPlan"
+ } else {
+ s"Operator
${implicitly[reflect.ClassTag[T]].runtimeClass.getSimpleName} was found " +
+ s"in executed plan:\n $executedPlan"
+ }
+
+ assert(assertionCondition, assertionMessage)
+ }
+
+ test("ColumnarCollectLimitExec - basic limit test") {
+ val df = spark.range(0, 1000, 1).toDF("id").limit(5)
+ val expectedData = Seq(Row(0L), Row(1L), Row(2L), Row(3L), Row(4L))
+
+ checkAnswer(df, expectedData)
+
+ assertGlutenOperatorMatch[ColumnarCollectLimitBaseExec](df, checkMatch = true)
+ }
+
+ test("ColumnarCollectLimitExec - with filter") {
+ val df = spark
+ .range(0, 20, 1)
+ .toDF("id")
+ .filter("id % 2 == 0")
+ .limit(5)
+ val expectedData = Seq(Row(0L), Row(2L), Row(4L), Row(6L), Row(8L))
+
+ checkAnswer(df, expectedData)
+
+ assertGlutenOperatorMatch[ColumnarCollectLimitBaseExec](df, checkMatch = true)
+ }
+
+ test("ColumnarCollectLimitExec - range with repartition") {
+
+ val df = spark
+ .range(0, 10, 1)
+ .toDF("id")
+ .repartition(3)
+ .orderBy("id")
+ .limit(3)
+ val expectedData = Seq(Row(0L), Row(1L), Row(2L))
+
+ checkAnswer(df, expectedData)
+ }
+
+ test("ColumnarCollectLimitExec - with distinct values") {
+ val df = spark
+ .range(0, 10, 1)
+ .toDF("id")
+ .select("id")
+ .distinct()
+ .limit(5)
+ val expectedData = Seq(Row(0L), Row(1L), Row(2L), Row(3L), Row(4L))
+
+ checkAnswer(df, expectedData)
+
+ assertGlutenOperatorMatch[ColumnarCollectLimitBaseExec](df, checkMatch = true)
+ }
+
+ test("ColumnarCollectLimitExec - chained limit") {
+ val df = spark
+ .range(0, 10, 1)
+ .toDF("id")
+ .limit(8)
+ .limit(3)
+ val expectedData = Seq(Row(0L), Row(1L), Row(2L))
+
+ checkAnswer(df, expectedData)
+
+ assertGlutenOperatorMatch[ColumnarCollectLimitBaseExec](df, checkMatch = true)
+ }
+
+ test("ColumnarCollectLimitExec - limit after union") {
+ val df1 = spark.range(0, 5).toDF("id")
+ val df2 = spark.range(5, 10).toDF("id")
+ val unionDf = df1.union(df2).limit(3)
+
+ val expectedData = Seq(Row(0L), Row(1L), Row(2L))
+
+ checkAnswer(unionDf, expectedData)
+
+ assertGlutenOperatorMatch[ColumnarCollectLimitBaseExec](unionDf, checkMatch = true)
+ }
+
+ test("ColumnarCollectLimitExec - offset test") {
+ val df1 = spark.range(0, 10, 1).toDF("id").limit(5).offset(2)
+ val expectedData1 = Seq(Row(2L), Row(3L), Row(4L))
+
+ checkAnswer(df1, expectedData1)
+ assertGlutenOperatorMatch[ColumnarCollectLimitBaseExec](df1, checkMatch = true)
+
+ val df2 = spark.range(0, 20, 1).toDF("id").limit(12).offset(5)
+ val expectedData2 = Seq(Row(5L), Row(6L), Row(7L), Row(8L), Row(9L), Row(10L), Row(11L))
+
+ checkAnswer(df2, expectedData2)
+ assertGlutenOperatorMatch[ColumnarCollectLimitBaseExec](df2, checkMatch = true)
+
+ val df3 = spark.range(0, 30, 1).toDF("id").limit(10).offset(3)
+ val expectedData3 = Seq(Row(3L), Row(4L), Row(5L), Row(6L), Row(7L), Row(8L), Row(9L))
+
+ checkAnswer(df3, expectedData3)
+ assertGlutenOperatorMatch[ColumnarCollectLimitBaseExec](df3, checkMatch = true)
+
+ val df4 = spark.range(0, 15, 1).toDF("id").limit(8).offset(4)
+ val expectedData4 = Seq(Row(4L), Row(5L), Row(6L), Row(7L))
+
+ checkAnswer(df4, expectedData4)
+ assertGlutenOperatorMatch[ColumnarCollectLimitBaseExec](df4, checkMatch = true)
+
+ val df5 = spark.range(0, 50, 1).toDF("id").limit(20).offset(10)
+ val expectedData5 = Seq(
+ Row(10L),
+ Row(11L),
+ Row(12L),
+ Row(13L),
+ Row(14L),
+ Row(15L),
+ Row(16L),
+ Row(17L),
+ Row(18L),
+ Row(19L))
+
+ checkAnswer(df5, expectedData5)
+ assertGlutenOperatorMatch[ColumnarCollectLimitBaseExec](df5, checkMatch = true)
+ }
+
+ test("ColumnarCollectLimitExec - pure offset test") {
+ val df1 = spark.range(0, 20, 1).toDF("id").offset(5)
+ val expectedData1 = Seq(
+ Row(5L),
+ Row(6L),
+ Row(7L),
+ Row(8L),
+ Row(9L),
+ Row(10L),
+ Row(11L),
+ Row(12L),
+ Row(13L),
+ Row(14L),
+ Row(15L),
+ Row(16L),
+ Row(17L),
+ Row(18L),
+ Row(19L))
+
+ checkAnswer(df1, expectedData1)
+ assertGlutenOperatorMatch[ColumnarCollectLimitBaseExec](df1, checkMatch = true)
+
+ val df2 = spark.range(0, 50, 1).toDF("id").offset(10)
+ val expectedData2 = (10L to 49L).map(Row(_))
+
+ checkAnswer(df2, expectedData2)
+ assertGlutenOperatorMatch[ColumnarCollectLimitBaseExec](df2, checkMatch = true)
+
+ val df3 = spark.range(0, 100, 2).toDF("id").offset(15)
+ val expectedData3 = (30L to 98L by 2).map(Row(_))
+
+ checkAnswer(df3, expectedData3)
+ assertGlutenOperatorMatch[ColumnarCollectLimitBaseExec](df3, checkMatch = true)
+
+ val df4 = spark.range(0, 30, 1).toDF("id").offset(20)
+ val expectedData4 = (20L to 29L).map(Row(_))
+
+ checkAnswer(df4, expectedData4)
+ assertGlutenOperatorMatch[ColumnarCollectLimitBaseExec](df4, checkMatch = true)
+
+ val df5 = spark.range(0, 200, 5).toDF("id").offset(10)
+ val expectedData5 = (50L to 195L by 5).map(Row(_))
+
+ checkAnswer(df5, expectedData5)
+ assertGlutenOperatorMatch[ColumnarCollectLimitBaseExec](df5, checkMatch = true)
+
+ val df6 = spark.range(0, 5, 1).toDF("id").limit(10)
+ val expectedData6 = (0L to 4L).map(Row(_))
+
+ checkAnswer(df6, expectedData6)
+ }
+
+ test("ColumnarCollectLimitExec - offset with filter") {
+ val df = spark.range(0, 10, 1).toDF("id").filter("id % 2 ==
0").limit(5).offset(2)
+ val expectedData = Seq(Row(4L), Row(6L), Row(8L))
+
+ checkAnswer(df, expectedData)
+ assertGlutenOperatorMatch[ColumnarCollectLimitBaseExec](df, checkMatch =
true)
+ }
+
+ test("ColumnarCollectLimitExec - offset after union") {
+ val df1 = spark.range(0, 5).toDF("id")
+ val df2 = spark.range(5, 10).toDF("id")
+ val unionDf = df1.union(df2).limit(6).offset(3)
+
+ val expectedData = Seq(Row(3L), Row(4L), Row(5L))
+ checkAnswer(unionDf, expectedData)
+ assertGlutenOperatorMatch[ColumnarCollectLimitBaseExec](unionDf, checkMatch = true)
+ }
+
+ test("ColumnarCollectLimitExec - single partition with limit, offset, and
limit + offset") {
+
+ val singlePartitionDF = spark.range(0, 10, 1).toDF("id").coalesce(1)
+
+ val limitDF = singlePartitionDF.limit(5)
+ val expectedLimitData = Seq(Row(0L), Row(1L), Row(2L), Row(3L), Row(4L))
+ checkAnswer(limitDF, expectedLimitData)
+ assertGlutenOperatorMatch[ColumnarCollectLimitBaseExec](limitDF, checkMatch = true)
+
+ val offsetDF = singlePartitionDF.offset(3)
+ val expectedOffsetData = Seq(Row(3L), Row(4L), Row(5L), Row(6L), Row(7L), Row(8L), Row(9L))
+ checkAnswer(offsetDF, expectedOffsetData)
+ assertGlutenOperatorMatch[ColumnarCollectLimitBaseExec](offsetDF, checkMatch = true)
+
+ val limitOffsetDF = singlePartitionDF.limit(5).offset(2)
+ val expectedLimitOffsetData = Seq(Row(2L), Row(3L), Row(4L))
+ checkAnswer(limitOffsetDF, expectedLimitOffsetData)
+ assertGlutenOperatorMatch[ColumnarCollectLimitBaseExec](limitOffsetDF, checkMatch = true)
+ }
+
+}
diff --git a/shims/common/src/main/scala/org/apache/gluten/sql/shims/SparkShims.scala b/shims/common/src/main/scala/org/apache/gluten/sql/shims/SparkShims.scala
index 0149e2ea55..537385fa3d 100644
--- a/shims/common/src/main/scala/org/apache/gluten/sql/shims/SparkShims.scala
+++ b/shims/common/src/main/scala/org/apache/gluten/sql/shims/SparkShims.scala
@@ -37,7 +37,7 @@ import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.connector.catalog.Table
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.connector.read.{InputPartition, Scan}
-import org.apache.spark.sql.execution.{FileSourceScanExec, GlobalLimitExec, SparkPlan, TakeOrderedAndProjectExec}
+import org.apache.spark.sql.execution.{CollectLimitExec, FileSourceScanExec, GlobalLimitExec, SparkPlan, TakeOrderedAndProjectExec}
import org.apache.spark.sql.execution.command.DataWritingCommandExec
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.execution.datasources.parquet.ParquetFilters
@@ -316,4 +316,6 @@ trait SparkShims {
def getOtherConstantMetadataColumnValues(file: PartitionedFile): JMap[String, Object] =
Map.empty[String, Any].asJava.asInstanceOf[JMap[String, Object]]
+
+ def getCollectLimitOffset(plan: CollectLimitExec): Int = 0
}
diff --git a/shims/spark34/src/main/scala/org/apache/gluten/sql/shims/spark34/Spark34Shims.scala b/shims/spark34/src/main/scala/org/apache/gluten/sql/shims/spark34/Spark34Shims.scala
index 8ff5e17873..f43d212274 100644
--- a/shims/spark34/src/main/scala/org/apache/gluten/sql/shims/spark34/Spark34Shims.scala
+++ b/shims/spark34/src/main/scala/org/apache/gluten/sql/shims/spark34/Spark34Shims.scala
@@ -628,6 +628,10 @@ class Spark34Shims extends SparkShims {
}
}
- override def isColumnarLimitExecSupported(): Boolean = false
+ override def isColumnarLimitExecSupported(): Boolean = true
+
+ override def getCollectLimitOffset(plan: CollectLimitExec): Int = {
+ plan.offset
+ }
}
diff --git a/shims/spark35/src/main/scala/org/apache/gluten/sql/shims/spark35/Spark35Shims.scala b/shims/spark35/src/main/scala/org/apache/gluten/sql/shims/spark35/Spark35Shims.scala
index 43a3239a2a..88df86a35e 100644
--- a/shims/spark35/src/main/scala/org/apache/gluten/sql/shims/spark35/Spark35Shims.scala
+++ b/shims/spark35/src/main/scala/org/apache/gluten/sql/shims/spark35/Spark35Shims.scala
@@ -684,8 +684,12 @@ class Spark35Shims extends SparkShims {
}
}
- override def isColumnarLimitExecSupported(): Boolean = false
+ override def isColumnarLimitExecSupported(): Boolean = true
override def getOtherConstantMetadataColumnValues(file: PartitionedFile): JMap[String, Object] =
file.otherConstantMetadataColumnValues.asJava.asInstanceOf[JMap[String, Object]]
+
+ override def getCollectLimitOffset(plan: CollectLimitExec): Int = {
+ plan.offset
+ }
}
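
Taken together, on a Gluten+Velox build with the columnar collect-limit feature enabled (the GlutenConfig enableColumnarCollectLimit flag checked by CollectLimitTransformerRule), a limit-plus-offset query should now execute through the offloaded operator. A hypothetical spark-shell sanity check, for illustration only:

// Assumes a Spark 3.4/3.5 session with the Gluten plugin on the classpath.
val df = spark.range(0, 20).toDF("id").limit(12).offset(5)
df.collect() // rows 5..11, i.e. limit - offset = 7 rows
// The physical plan should carry the offloaded operator instead of CollectLimitExec:
assert(df.queryExecution.executedPlan.toString.contains("ColumnarCollectLimit"))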