This is an automated email from the ASF dual-hosted git repository.
exmy pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 2f13452955 [GLUTEN-9137][CH] Support CollectLimit for CH backend
(#9139)
2f13452955 is described below
commit 2f13452955474ee2320b7d902d5df36bd552f18a
Author: exmy <[email protected]>
AuthorDate: Fri Apr 25 09:24:24 2025 +0800
[GLUTEN-9137][CH] Support CollectLimit for CH backend (#9139)
[GLUTEN-9137][CH] Support CollectLimit for CH backend
---
.../apache/gluten/vectorized/CHNativeBlock.java | 16 +++
.../gluten/backendsapi/clickhouse/CHRuleApi.scala | 43 +++----
.../clickhouse/CHSparkPlanExecApi.scala | 2 +-
.../execution/CHColumnarCollectLimitExec.scala | 95 ++++++++++++++
.../extension/BasicExpressionRewriteRule.scala | 4 +-
.../extension/CoalesceAggregationUnion.scala | 4 +-
.../CommonSubexpressionEliminateRule.scala | 4 +-
.../gluten/extension/ExtendedColumnPruning.scala | 2 +-
.../RewriteDateTimestampComparisonRule.scala | 2 +-
.../extension/RewriteToDateExpresstionRule.scala | 4 +-
.../GlutenClickHouseCollectLimitExecSuite.scala | 139 +++++++++++++++++++++
.../gluten/backendsapi/velox/VeloxBackend.scala | 2 -
.../execution/ColumnarCollectLimitExec.scala | 86 +------------
cpp-ch/local-engine/local_engine_jni.cpp | 13 +-
.../gluten/backendsapi/BackendSettingsApi.scala | 2 -
.../execution/ColumnarCollectLimitBaseExec.scala | 87 ++++++++++++-
.../extension/columnar/validator/Validators.scala | 1 -
.../utils/clickhouse/ClickHouseTestSettings.scala | 2 +
.../utils/clickhouse/ClickHouseTestSettings.scala | 4 +-
19 files changed, 392 insertions(+), 120 deletions(-)
diff --git
a/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/CHNativeBlock.java
b/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/CHNativeBlock.java
index 8cf365335c..ffc8b70402 100644
---
a/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/CHNativeBlock.java
+++
b/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/CHNativeBlock.java
@@ -83,6 +83,12 @@ public class CHNativeBlock {
return new CHNativeBlock(copyBlock(blockAddress)).toColumnarBatch();
}
+ private native long nativeSlice(long blockAddress, int offset, int limit);
+
+ public long nativeSlice(int offset, int limit) {
+ return nativeSlice(blockAddress, offset, limit);
+ }
+
public void close() {
if (blockAddress != 0) {
nativeClose(blockAddress);
@@ -111,4 +117,14 @@ public class CHNativeBlock {
}
return new ColumnarBatch(vectors, numRows);
}
+
+ public static ColumnarBatch slice(ColumnarBatch batch, int offset, int
limit) {
+ if (offset + limit > batch.numRows()) {
+ throw new GlutenException(
+ "Parameter out of bound in slice function, offset: " + offset + ",
limit: " + limit);
+ }
+ CHNativeBlock block = CHNativeBlock.fromColumnarBatch(batch);
+ long blockAddress = block.nativeSlice(offset, limit);
+ return new CHNativeBlock(blockAddress).toColumnarBatch();
+ }
}
diff --git
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHRuleApi.scala
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHRuleApi.scala
index 411351e9a4..8a4fde534f 100644
---
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHRuleApi.scala
+++
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHRuleApi.scala
@@ -63,16 +63,16 @@ object CHRuleApi {
injector.injectResolutionRule(spark => new
JoinAggregateToAggregateUnion(spark))
// CoalesceAggregationUnion and CoalesceProjectionUnion should follows
// JoinAggregateToAggregateUnion
- injector.injectResolutionRule(spark => new CoalesceAggregationUnion(spark))
- injector.injectResolutionRule(spark => new CoalesceProjectionUnion(spark))
- injector.injectResolutionRule(spark => new
RewriteToDateExpresstionRule(spark))
- injector.injectResolutionRule(spark => new
RewriteDateTimestampComparisonRule(spark))
- injector.injectResolutionRule(spark => new
CollapseGetJsonObjectExpressionRule(spark))
- injector.injectResolutionRule(spark => new
RepalceFromJsonWithGetJsonObject(spark))
- injector.injectOptimizerRule(spark => new
CommonSubexpressionEliminateRule(spark))
- injector.injectOptimizerRule(spark => new AggregateIfToFilterRule(spark))
- injector.injectOptimizerRule(spark => new SimplifySumRule(spark))
- injector.injectOptimizerRule(spark => new ExtendedColumnPruning(spark))
+ injector.injectResolutionRule(spark => CoalesceAggregationUnion(spark))
+ injector.injectResolutionRule(spark => CoalesceProjectionUnion(spark))
+ injector.injectResolutionRule(spark => RewriteToDateExpresstionRule(spark))
+ injector.injectResolutionRule(spark =>
RewriteDateTimestampComparisonRule(spark))
+ injector.injectResolutionRule(spark =>
CollapseGetJsonObjectExpressionRule(spark))
+ injector.injectResolutionRule(spark =>
RepalceFromJsonWithGetJsonObject(spark))
+ injector.injectOptimizerRule(spark =>
CommonSubexpressionEliminateRule(spark))
+ injector.injectOptimizerRule(spark => AggregateIfToFilterRule(spark))
+ injector.injectOptimizerRule(spark => SimplifySumRule(spark))
+ injector.injectOptimizerRule(spark => ExtendedColumnPruning(spark))
injector.injectOptimizerRule(spark =>
CHAggregateFunctionRewriteRule(spark))
injector.injectOptimizerRule(_ => CountDistinctWithoutExpand)
injector.injectOptimizerRule(_ => EqualToRewrite)
@@ -83,11 +83,11 @@ object CHRuleApi {
// Legacy: Pre-transform rules.
injector.injectPreTransform(_ => RemoveTransitions)
injector.injectPreTransform(_ => PushDownInputFileExpression.PreOffload)
- injector.injectPreTransform(c => FallbackOnANSIMode.apply(c.session))
- injector.injectPreTransform(c => FallbackMultiCodegens.apply(c.session))
+ injector.injectPreTransform(c => FallbackOnANSIMode(c.session))
+ injector.injectPreTransform(c => FallbackMultiCodegens(c.session))
injector.injectPreTransform(_ => RewriteSubqueryBroadcast())
- injector.injectPreTransform(c =>
FallbackBroadcastHashJoin.apply(c.session))
- injector.injectPreTransform(c =>
MergeTwoPhasesHashBaseAggregate.apply(c.session))
+ injector.injectPreTransform(c => FallbackBroadcastHashJoin(c.session))
+ injector.injectPreTransform(c =>
MergeTwoPhasesHashBaseAggregate(c.session))
injector.injectPreTransform(_ => WriteFilesWithBucketValue)
// Legacy: The legacy transform rule.
@@ -114,21 +114,22 @@ object CHRuleApi {
injector.injectPostTransform(_ => EnsureLocalSortRequirements)
injector.injectPostTransform(_ => EliminateLocalSort)
injector.injectPostTransform(_ => CollapseProjectExecTransformer)
- injector.injectPostTransform(c =>
RewriteSortMergeJoinToHashJoinRule.apply(c.session))
- injector.injectPostTransform(c =>
PushdownAggregatePreProjectionAheadExpand.apply(c.session))
- injector.injectPostTransform(c => LazyAggregateExpandRule.apply(c.session))
+ injector.injectPostTransform(c =>
RewriteSortMergeJoinToHashJoinRule(c.session))
+ injector.injectPostTransform(c =>
PushdownAggregatePreProjectionAheadExpand(c.session))
+ injector.injectPostTransform(c => LazyAggregateExpandRule(c.session))
injector.injectPostTransform(c =>
ConverRowNumbertWindowToAggregateRule(c.session))
injector.injectPostTransform(
c =>
intercept(
SparkPlanRules.extendedColumnarRule(c.glutenConf.extendedColumnarTransformRules)(
c.session)))
+ injector.injectPostTransform(_ => CollectLimitTransformerRule())
injector.injectPostTransform(c =>
InsertTransitions.create(c.outputsColumnar, CHBatch))
- injector.injectPostTransform(c => RemoveDuplicatedColumns.apply(c.session))
- injector.injectPostTransform(c =>
AddPreProjectionForHashJoin.apply(c.session))
- injector.injectPostTransform(c =>
ReplaceSubStringComparison.apply(c.session))
+ injector.injectPostTransform(c => RemoveDuplicatedColumns(c.session))
+ injector.injectPostTransform(c => AddPreProjectionForHashJoin(c.session))
+ injector.injectPostTransform(c => ReplaceSubStringComparison(c.session))
injector.injectPostTransform(c =>
EliminateDeduplicateAggregateWithAnyJoin(c.session))
- injector.injectPostTransform(c =>
FlattenNestedExpressions.apply(c.session))
+ injector.injectPostTransform(c => FlattenNestedExpressions(c.session))
// Gluten columnar: Fallback policies.
injector.injectFallbackPolicy(c => p =>
ExpandFallbackPolicy(c.caller.isAqe(), p))
diff --git
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
index 217c4d08d8..ea992d4beb 100644
---
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
+++
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
@@ -946,7 +946,7 @@ class CHSparkPlanExecApi extends SparkPlanExecApi with
Logging {
limit: Int,
child: SparkPlan,
offset: Int): ColumnarCollectLimitBaseExec =
- throw new GlutenNotSupportException("ColumnarCollectLimit is not supported
in ch backend.")
+ CHColumnarCollectLimitExec(limit, offset, child)
override def genColumnarRangeExec(
start: Long,
diff --git
a/backends-clickhouse/src/main/scala/org/apache/gluten/execution/CHColumnarCollectLimitExec.scala
b/backends-clickhouse/src/main/scala/org/apache/gluten/execution/CHColumnarCollectLimitExec.scala
new file mode 100644
index 0000000000..2450babe14
--- /dev/null
+++
b/backends-clickhouse/src/main/scala/org/apache/gluten/execution/CHColumnarCollectLimitExec.scala
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.execution
+
+import org.apache.gluten.vectorized.CHNativeBlock
+
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.vectorized.ColumnarBatch
+
+case class CHColumnarCollectLimitExec(limit: Int, offset: Int, child:
SparkPlan)
+ extends ColumnarCollectLimitBaseExec(limit, child, offset) {
+
+ /**
+ * Returns an iterator that gives offset to limit rows in total from the
input partitionIter.
+ * Either retain the entire batch if it fits within the remaining limit, or
prune it if it
+ * partially exceeds the remaining limit/offset.
+ */
+ override def collectWithOffsetAndLimit(
+ inputIter: Iterator[ColumnarBatch],
+ offset: Int,
+ limit: Int): Iterator[ColumnarBatch] = {
+
+ val unlimited = limit < 0
+ var rowsToSkip = math.max(offset, 0)
+ var rowsToCollect = if (unlimited) Int.MaxValue else limit
+
+ new Iterator[ColumnarBatch] {
+ private var nextBatch: Option[ColumnarBatch] = None
+
+ override def hasNext: Boolean = {
+ nextBatch.isDefined || fetchNextBatch()
+ }
+
+ override def next(): ColumnarBatch = {
+ if (!hasNext) throw new NoSuchElementException("No more batches
available.")
+ val batch = nextBatch.get
+ nextBatch = None
+ batch
+ }
+
+ /**
+ * Advance the iterator until we find a batch (possibly sliced) that we
can return, or exhaust
+ * the input.
+ */
+ private def fetchNextBatch(): Boolean = {
+
+ if (rowsToCollect <= 0) return false
+
+ while (inputIter.hasNext) {
+ val batch = inputIter.next()
+ val batchSize = batch.numRows()
+
+ if (rowsToSkip >= batchSize) {
+ rowsToSkip -= batchSize
+ } else {
+ val startIndex = rowsToSkip
+ val leftoverAfterSkip = batchSize - startIndex
+ rowsToSkip = 0
+
+ val needed = math.min(rowsToCollect, leftoverAfterSkip)
+
+ val prunedBatch =
+ if (startIndex == 0 && needed == batchSize) {
+ batch
+ } else {
+ CHNativeBlock.slice(batch, startIndex, needed)
+ }
+
+ rowsToCollect -= needed
+ nextBatch = Some(prunedBatch)
+ return true
+ }
+ }
+ false
+ }
+ }
+ }
+
+ override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan =
+ copy(child = newChild)
+}
diff --git
a/backends-clickhouse/src/main/scala/org/apache/gluten/extension/BasicExpressionRewriteRule.scala
b/backends-clickhouse/src/main/scala/org/apache/gluten/extension/BasicExpressionRewriteRule.scala
index 200a460c3d..ffabfe39a5 100644
---
a/backends-clickhouse/src/main/scala/org/apache/gluten/extension/BasicExpressionRewriteRule.scala
+++
b/backends-clickhouse/src/main/scala/org/apache/gluten/extension/BasicExpressionRewriteRule.scala
@@ -44,7 +44,9 @@ object PlanResolvedChecker {
}
}
-class RepalceFromJsonWithGetJsonObject(spark: SparkSession) extends
Rule[LogicalPlan] with Logging {
+case class RepalceFromJsonWithGetJsonObject(spark: SparkSession)
+ extends Rule[LogicalPlan]
+ with Logging {
override def apply(plan: LogicalPlan): LogicalPlan = {
if (
!CHBackendSettings.enableReplaceFromJsonWithGetJsonObject ||
!PlanResolvedChecker.check(plan)
diff --git
a/backends-clickhouse/src/main/scala/org/apache/gluten/extension/CoalesceAggregationUnion.scala
b/backends-clickhouse/src/main/scala/org/apache/gluten/extension/CoalesceAggregationUnion.scala
index cfae1deeaa..05e522ae82 100644
---
a/backends-clickhouse/src/main/scala/org/apache/gluten/extension/CoalesceAggregationUnion.scala
+++
b/backends-clickhouse/src/main/scala/org/apache/gluten/extension/CoalesceAggregationUnion.scala
@@ -336,7 +336,7 @@ object CoalesceUnionUtil extends Logging {
* really slow. The rewritten query only scan `t` once, and the performance is
much better.
*/
-class CoalesceAggregationUnion(spark: SparkSession) extends Rule[LogicalPlan]
with Logging {
+case class CoalesceAggregationUnion(spark: SparkSession) extends
Rule[LogicalPlan] with Logging {
case class PlanAnalyzer(originalAggregate: Aggregate) extends
AbstractPlanAnalyzer {
@@ -825,7 +825,7 @@ class CoalesceAggregationUnion(spark: SparkSession) extends
Rule[LogicalPlan] wi
* select array(if(d=1, named_struct('f0', a, 'f1', b, 'f2', 1), null),
if(d=2, named_struct('f0',
* a, 'f1', b, 'f2', 2), null)) as s from t where d = 1 or d = 2 ) ) where s
is not null
*/
-class CoalesceProjectionUnion(spark: SparkSession) extends Rule[LogicalPlan]
with Logging {
+case class CoalesceProjectionUnion(spark: SparkSession) extends
Rule[LogicalPlan] with Logging {
case class PlanAnalyzer(originalPlan: LogicalPlan) extends
AbstractPlanAnalyzer {
def extractFilter(): Option[Filter] = {
diff --git
a/backends-clickhouse/src/main/scala/org/apache/gluten/extension/CommonSubexpressionEliminateRule.scala
b/backends-clickhouse/src/main/scala/org/apache/gluten/extension/CommonSubexpressionEliminateRule.scala
index 7e674367f1..cefee23dbf 100644
---
a/backends-clickhouse/src/main/scala/org/apache/gluten/extension/CommonSubexpressionEliminateRule.scala
+++
b/backends-clickhouse/src/main/scala/org/apache/gluten/extension/CommonSubexpressionEliminateRule.scala
@@ -32,7 +32,9 @@ import scala.collection.mutable
// 2. append two options to spark config
// --conf spark.sql.planChangeLog.level=error
// --conf spark.sql.planChangeLog.batches=all
-class CommonSubexpressionEliminateRule(spark: SparkSession) extends
Rule[LogicalPlan] with Logging {
+case class CommonSubexpressionEliminateRule(spark: SparkSession)
+ extends Rule[LogicalPlan]
+ with Logging {
private var lastPlan: LogicalPlan = null
diff --git
a/backends-clickhouse/src/main/scala/org/apache/gluten/extension/ExtendedColumnPruning.scala
b/backends-clickhouse/src/main/scala/org/apache/gluten/extension/ExtendedColumnPruning.scala
index 53b22f217f..6515ab99fb 100644
---
a/backends-clickhouse/src/main/scala/org/apache/gluten/extension/ExtendedColumnPruning.scala
+++
b/backends-clickhouse/src/main/scala/org/apache/gluten/extension/ExtendedColumnPruning.scala
@@ -356,7 +356,7 @@ object ExtendedGeneratorNestedColumnAliasing {
// ExtendedColumnPruning process Project(Filter(Generate)),
// which is ignored by vanilla spark in optimization rule: ColumnPruning
-class ExtendedColumnPruning(spark: SparkSession) extends Rule[LogicalPlan]
with Logging {
+case class ExtendedColumnPruning(spark: SparkSession) extends
Rule[LogicalPlan] with Logging {
override def apply(plan: LogicalPlan): LogicalPlan =
plan.transformWithPruning(AlwaysProcess.fn) {
diff --git
a/backends-clickhouse/src/main/scala/org/apache/gluten/extension/RewriteDateTimestampComparisonRule.scala
b/backends-clickhouse/src/main/scala/org/apache/gluten/extension/RewriteDateTimestampComparisonRule.scala
index 6873c06348..68c9eb4dbd 100644
---
a/backends-clickhouse/src/main/scala/org/apache/gluten/extension/RewriteDateTimestampComparisonRule.scala
+++
b/backends-clickhouse/src/main/scala/org/apache/gluten/extension/RewriteDateTimestampComparisonRule.scala
@@ -36,7 +36,7 @@ import org.apache.spark.unsafe.types.UTF8String
// This rule try to make the filter condition into integer comparison, which
is more efficient.
// The above example will be rewritten into
// select * from table where to_unixtime('2023-11-02', 'yyyy-MM-dd') >=
unix_timestamp
-class RewriteDateTimestampComparisonRule(spark: SparkSession)
+case class RewriteDateTimestampComparisonRule(spark: SparkSession)
extends Rule[LogicalPlan]
with Logging {
diff --git
a/backends-clickhouse/src/main/scala/org/apache/gluten/extension/RewriteToDateExpresstionRule.scala
b/backends-clickhouse/src/main/scala/org/apache/gluten/extension/RewriteToDateExpresstionRule.scala
index a4eb7464de..4495cd8db2 100644
---
a/backends-clickhouse/src/main/scala/org/apache/gluten/extension/RewriteToDateExpresstionRule.scala
+++
b/backends-clickhouse/src/main/scala/org/apache/gluten/extension/RewriteToDateExpresstionRule.scala
@@ -38,7 +38,9 @@ import org.apache.spark.sql.types._
// Under ch backend, the StringType can be directly converted into DateType,
// and the functions `from_unixtime` and `unix_timestamp` can be optimized
here.
// Optimized result is `to_date(stringType)`
-class RewriteToDateExpresstionRule(spark: SparkSession) extends
Rule[LogicalPlan] with Logging {
+case class RewriteToDateExpresstionRule(spark: SparkSession)
+ extends Rule[LogicalPlan]
+ with Logging {
override def apply(plan: LogicalPlan): LogicalPlan = {
if (
diff --git
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseCollectLimitExecSuite.scala
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseCollectLimitExecSuite.scala
new file mode 100644
index 0000000000..729504b56b
--- /dev/null
+++
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseCollectLimitExecSuite.scala
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.execution
+
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.{DataFrame, Row}
+
+class GlutenClickHouseCollectLimitExecSuite extends
GlutenClickHouseWholeStageTransformerSuite {
+
+ protected val tablesPath: String = basePath + "/tpch-data"
+ protected val tpchQueries: String =
+ rootPath +
"../../../../tools/gluten-it/common/src/main/resources/tpch-queries"
+ protected val queriesResults: String = rootPath + "queries-output"
+
+ override protected def sparkConf: SparkConf = {
+ super.sparkConf
+ .set("spark.shuffle.manager",
"org.apache.spark.shuffle.sort.ColumnarShuffleManager")
+ .set("spark.io.compression.codec", "LZ4")
+ .set("spark.sql.shuffle.partitions", "5")
+ .set("spark.sql.autoBroadcastJoinThreshold", "10MB")
+ .set("spark.sql.adaptive.enabled", "true")
+ }
+
+ private def assertGlutenOperatorMatch[T: reflect.ClassTag](
+ df: DataFrame,
+ checkMatch: Boolean): Unit = {
+ val executedPlan = getExecutedPlan(df)
+
+ val operatorFound = executedPlan.exists {
+ plan =>
+ try {
+ implicitly[reflect.ClassTag[T]].runtimeClass.isInstance(plan)
+ } catch {
+ case _: Throwable => false
+ }
+ }
+
+ val assertionCondition = operatorFound == checkMatch
+ val assertionMessage =
+ if (checkMatch) {
+ s"Operator
${implicitly[reflect.ClassTag[T]].runtimeClass.getSimpleName} not found " +
+ s"in executed plan:\n $executedPlan"
+ } else {
+ s"Operator
${implicitly[reflect.ClassTag[T]].runtimeClass.getSimpleName} was found " +
+ s"in executed plan:\n $executedPlan"
+ }
+
+ assert(assertionCondition, assertionMessage)
+ }
+
+ test("ColumnarCollectLimitExec - basic limit test") {
+ val df = spark.range(0, 1000, 1).toDF("id").limit(5)
+ val expectedData = Seq(Row(0L), Row(1L), Row(2L), Row(3L), Row(4L))
+
+ checkAnswer(df, expectedData)
+
+ assertGlutenOperatorMatch[ColumnarCollectLimitBaseExec](df, checkMatch =
true)
+ }
+
+ test("ColumnarCollectLimitExec - with filter") {
+ val df = spark
+ .range(0, 20, 1)
+ .toDF("id")
+ .filter("id % 2 == 0")
+ .limit(5)
+ val expectedData = Seq(Row(0L), Row(2L), Row(4L), Row(6L), Row(8L))
+
+ checkAnswer(df, expectedData)
+
+ assertGlutenOperatorMatch[ColumnarCollectLimitBaseExec](df, checkMatch =
true)
+ }
+
+ test("ColumnarCollectLimitExec - range with repartition") {
+ val df = spark
+ .range(0, 10, 1)
+ .toDF("id")
+ .repartition(3)
+ .limit(3)
+ val expectedData = Seq(Row(0L), Row(3L), Row(6L))
+
+ checkAnswer(df, expectedData)
+
+ assertGlutenOperatorMatch[ColumnarCollectLimitBaseExec](df, checkMatch =
true)
+ }
+
+ test("ColumnarCollectLimitExec - with distinct values") {
+ val df = spark
+ .range(0, 10, 1)
+ .toDF("id")
+ .select("id")
+ .distinct()
+ .limit(5)
+ val expectedData = Seq(Row(0L), Row(4L), Row(3L), Row(2L), Row(5L))
+
+ checkAnswer(df, expectedData)
+
+ assertGlutenOperatorMatch[ColumnarCollectLimitBaseExec](df, checkMatch =
true)
+ }
+
+ test("ColumnarCollectLimitExec - chained limit") {
+ val df = spark
+ .range(0, 10, 1)
+ .toDF("id")
+ .limit(8)
+ .limit(3)
+ val expectedData = Seq(Row(0L), Row(1L), Row(2L))
+
+ checkAnswer(df, expectedData)
+
+ assertGlutenOperatorMatch[ColumnarCollectLimitBaseExec](df, checkMatch =
true)
+ }
+
+ test("ColumnarCollectLimitExec - limit after union") {
+ val df1 = spark.range(0, 5).toDF("id")
+ val df2 = spark.range(5, 10).toDF("id")
+ val unionDf = df1.union(df2).limit(3)
+
+ val expectedData = Seq(Row(0L), Row(1L), Row(2L))
+
+ checkAnswer(unionDf, expectedData)
+
+ assertGlutenOperatorMatch[ColumnarCollectLimitBaseExec](unionDf,
checkMatch = true)
+ }
+
+}
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
index b194e2183e..56e5a07239 100644
---
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
+++
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
@@ -558,8 +558,6 @@ object VeloxBackendSettings extends BackendSettingsApi {
override def needPreComputeRangeFrameBoundary(): Boolean = true
- override def supportCollectLimitExec(): Boolean = true
-
override def broadcastNestedLoopJoinSupportsFullOuterJoin(): Boolean = true
override def supportIcebergEqualityDeleteRead(): Boolean = false
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/execution/ColumnarCollectLimitExec.scala
b/backends-velox/src/main/scala/org/apache/gluten/execution/ColumnarCollectLimitExec.scala
index 6fbbbfab66..61c42715ea 100644
---
a/backends-velox/src/main/scala/org/apache/gluten/execution/ColumnarCollectLimitExec.scala
+++
b/backends-velox/src/main/scala/org/apache/gluten/execution/ColumnarCollectLimitExec.scala
@@ -16,18 +16,10 @@
*/
package org.apache.gluten.execution
-import org.apache.gluten.backendsapi.BackendsApiManager
import org.apache.gluten.columnarbatch.ColumnarBatches
import org.apache.gluten.columnarbatch.VeloxColumnarBatches
-import org.apache.gluten.extension.columnar.transition.Convention
-
-import org.apache.spark.rdd.RDD
-import org.apache.spark.serializer.Serializer
-import org.apache.spark.sql.catalyst.expressions.Attribute
-import org.apache.spark.sql.catalyst.plans.physical.SinglePartition
-import org.apache.spark.sql.execution.{ShuffledColumnarBatchRDD, SparkPlan}
-import org.apache.spark.sql.execution.metric.{SQLMetric,
SQLShuffleWriteMetricsReporter}
-import org.apache.spark.sql.metric.SQLColumnarShuffleReadMetricsReporter
+
+import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.vectorized.ColumnarBatch
case class ColumnarCollectLimitExec(
@@ -36,34 +28,12 @@ case class ColumnarCollectLimitExec(
offset: Int = 0
) extends ColumnarCollectLimitBaseExec(limit, child, offset) {
- override def batchType(): Convention.BatchType =
- BackendsApiManager.getSettings.primaryBatchType
-
- private lazy val writeMetrics =
- SQLShuffleWriteMetricsReporter.createShuffleWriteMetrics(sparkContext)
-
- private lazy val readMetrics =
-
SQLColumnarShuffleReadMetricsReporter.createShuffleReadMetrics(sparkContext)
-
- private lazy val useSortBasedShuffle: Boolean =
- BackendsApiManager.getSparkPlanExecApiInstance
- .useSortBasedShuffle(outputPartitioning, child.output)
-
- @transient private lazy val serializer: Serializer =
- BackendsApiManager.getSparkPlanExecApiInstance
- .createColumnarBatchSerializer(child.schema, metrics,
useSortBasedShuffle)
-
- @transient override lazy val metrics: Map[String, SQLMetric] =
- BackendsApiManager.getMetricsApiInstance
- .genColumnarShuffleExchangeMetrics(sparkContext, useSortBasedShuffle) ++
- readMetrics ++ writeMetrics
-
/**
* Returns an iterator that gives offset to limit rows in total from the
input partitionIter.
* Either retain the entire batch if it fits within the remaining limit, or
prune it if it
* partially exceeds the remaining limit/offset.
*/
- private def collectWithOffsetAndLimit(
+ override def collectWithOffsetAndLimit(
inputIter: Iterator[ColumnarBatch],
offset: Int,
limit: Int): Iterator[ColumnarBatch] = {
@@ -125,56 +95,6 @@ case class ColumnarCollectLimitExec(
}
}
- override protected def doExecuteColumnar(): RDD[ColumnarBatch] = {
- val childRDD = child.executeColumnar()
-
- if (childRDD.getNumPartitions == 0) {
- return sparkContext.parallelize(Seq.empty[ColumnarBatch], 1)
- }
-
- val processedRDD =
- if (childRDD.getNumPartitions == 1) childRDD
- else shuffleLimitedPartitions(childRDD)
-
- processedRDD.mapPartitions(
- partition => {
- if (limit > 0) {
- val adjusted = math.max(0, limit - offset)
- collectWithOffsetAndLimit(partition, offset, adjusted)
- } else {
- collectWithOffsetAndLimit(partition, offset, -1)
- }
- })
- }
-
- private def shuffleLimitedPartitions(childRDD: RDD[ColumnarBatch]):
RDD[ColumnarBatch] = {
- val applyLocalLimit = (offset == 0 && limit >= 0)
- val locallyLimited = if (applyLocalLimit) {
- childRDD.mapPartitions {
- collectWithOffsetAndLimit(_, 0, limit)
- }
- } else {
- childRDD
- }
- new ShuffledColumnarBatchRDD(
- BackendsApiManager.getSparkPlanExecApiInstance.genShuffleDependency(
- locallyLimited,
- child.output,
- child.output,
- SinglePartition,
- serializer,
- writeMetrics,
- metrics,
- useSortBasedShuffle
- ),
- readMetrics
- )
- }
-
- override def rowType0(): Convention.RowType = Convention.RowType.None
-
- override def output: Seq[Attribute] = child.output
-
override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan =
copy(child = newChild)
}
diff --git a/cpp-ch/local-engine/local_engine_jni.cpp
b/cpp-ch/local-engine/local_engine_jni.cpp
index 8092c7b878..b9497aa920 100644
--- a/cpp-ch/local-engine/local_engine_jni.cpp
+++ b/cpp-ch/local-engine/local_engine_jni.cpp
@@ -543,11 +543,22 @@
Java_org_apache_gluten_vectorized_CHNativeBlock_copyBlock(JNIEnv * env, jobject
DB::Block * block = reinterpret_cast<DB::Block *>(block_address);
auto copied_block = block->cloneWithColumns(block->getColumns());
- auto a = new DB::Block(copied_block);
+ auto * a = new DB::Block(std::move(copied_block));
return reinterpret_cast<jlong>(a);
LOCAL_ENGINE_JNI_METHOD_END(env, -1)
}
+JNIEXPORT jlong
+Java_org_apache_gluten_vectorized_CHNativeBlock_nativeSlice(JNIEnv * env,
jobject /* obj */, jlong block_address, jint offset, jint limit)
+{
+ LOCAL_ENGINE_JNI_METHOD_START
+ DB::Block * block = reinterpret_cast<DB::Block *>(block_address);
+ DB::Block cut_block = block->cloneWithCutColumns(offset, limit);
+
+ return reinterpret_cast<jlong>(new DB::Block(std::move(cut_block)));
+ LOCAL_ENGINE_JNI_METHOD_END(env, -1)
+}
+
JNIEXPORT jlong
Java_org_apache_gluten_vectorized_CHStreamReader_createNativeShuffleReader(
JNIEnv * env, jclass /*clazz*/, jobject input_stream, jboolean compressed,
jlong max_shuffle_read_rows, jlong max_shuffle_read_bytes)
{
diff --git
a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
index c9a3fdbc6d..de390b256c 100644
---
a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
+++
b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
@@ -152,8 +152,6 @@ trait BackendSettingsApi {
def needPreComputeRangeFrameBoundary(): Boolean = false
- def supportCollectLimitExec(): Boolean = false
-
def broadcastNestedLoopJoinSupportsFullOuterJoin(): Boolean = false
def supportIcebergEqualityDeleteRead(): Boolean = true
diff --git
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/ColumnarCollectLimitBaseExec.scala
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/ColumnarCollectLimitBaseExec.scala
index fc551e64d6..50ce2bc208 100644
---
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/ColumnarCollectLimitBaseExec.scala
+++
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/ColumnarCollectLimitBaseExec.scala
@@ -16,10 +16,18 @@
*/
package org.apache.gluten.execution
+import org.apache.gluten.backendsapi.BackendsApiManager
import org.apache.gluten.extension.ValidationResult
+import org.apache.gluten.extension.columnar.transition.Convention
+import org.apache.spark.rdd.RDD
+import org.apache.spark.serializer.Serializer
+import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.physical.{Partitioning,
SinglePartition}
-import org.apache.spark.sql.execution.{LimitExec, SparkPlan}
+import org.apache.spark.sql.execution.{LimitExec, ShuffledColumnarBatchRDD,
SparkPlan}
+import org.apache.spark.sql.execution.metric.{SQLMetric,
SQLShuffleWriteMetricsReporter}
+import org.apache.spark.sql.metric.SQLColumnarShuffleReadMetricsReporter
+import org.apache.spark.sql.vectorized.ColumnarBatch
abstract class ColumnarCollectLimitBaseExec(
limit: Int,
@@ -28,6 +36,83 @@ abstract class ColumnarCollectLimitBaseExec(
) extends LimitExec
with ValidatablePlan {
+ override def rowType0(): Convention.RowType = Convention.RowType.None
+
+ override def batchType(): Convention.BatchType =
+ BackendsApiManager.getSettings.primaryBatchType
+
+ private lazy val writeMetrics =
+ SQLShuffleWriteMetricsReporter.createShuffleWriteMetrics(sparkContext)
+
+ private lazy val readMetrics =
+
SQLColumnarShuffleReadMetricsReporter.createShuffleReadMetrics(sparkContext)
+
+ private lazy val useSortBasedShuffle: Boolean =
+ BackendsApiManager.getSparkPlanExecApiInstance
+ .useSortBasedShuffle(outputPartitioning, child.output)
+
+ @transient private lazy val serializer: Serializer =
+ BackendsApiManager.getSparkPlanExecApiInstance
+ .createColumnarBatchSerializer(child.schema, metrics,
useSortBasedShuffle)
+
+ @transient override lazy val metrics: Map[String, SQLMetric] =
+ BackendsApiManager.getMetricsApiInstance
+ .genColumnarShuffleExchangeMetrics(sparkContext, useSortBasedShuffle) ++
+ readMetrics ++ writeMetrics
+
+ override protected def doExecuteColumnar(): RDD[ColumnarBatch] = {
+ val childRDD = child.executeColumnar()
+
+ if (childRDD.getNumPartitions == 0) {
+ return sparkContext.parallelize(Seq.empty[ColumnarBatch], 1)
+ }
+
+ val processedRDD =
+ if (childRDD.getNumPartitions == 1) childRDD
+ else shuffleLimitedPartitions(childRDD)
+
+ processedRDD.mapPartitions(
+ partition => {
+ if (limit > 0) {
+ val adjusted = math.max(0, limit - offset)
+ collectWithOffsetAndLimit(partition, offset, adjusted)
+ } else {
+ collectWithOffsetAndLimit(partition, offset, -1)
+ }
+ })
+ }
+
+ private def shuffleLimitedPartitions(childRDD: RDD[ColumnarBatch]):
RDD[ColumnarBatch] = {
+ val applyLocalLimit = (offset == 0 && limit >= 0)
+ val locallyLimited = if (applyLocalLimit) {
+ childRDD.mapPartitions {
+ collectWithOffsetAndLimit(_, 0, limit)
+ }
+ } else {
+ childRDD
+ }
+ new ShuffledColumnarBatchRDD(
+ BackendsApiManager.getSparkPlanExecApiInstance.genShuffleDependency(
+ locallyLimited,
+ child.output,
+ child.output,
+ SinglePartition,
+ serializer,
+ writeMetrics,
+ metrics,
+ useSortBasedShuffle
+ ),
+ readMetrics
+ )
+ }
+
+ protected def collectWithOffsetAndLimit(
+ inputIter: Iterator[ColumnarBatch],
+ offset: Int,
+ limit: Int): Iterator[ColumnarBatch]
+
+ override def output: Seq[Attribute] = child.output
+
override def outputPartitioning: Partitioning = SinglePartition
override protected def doValidateInternal(): ValidationResult = {
diff --git
a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala
b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala
index 57a60af3b8..36126cf4c7 100644
---
a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala
+++
b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala
@@ -136,7 +136,6 @@ object Validators {
fail(p)
case p: CartesianProductExec if !settings.supportCartesianProductExec()
=> fail(p)
case p: TakeOrderedAndProjectExec if
!settings.supportColumnarShuffleExec() => fail(p)
- case p: CollectLimitExec if !settings.supportCollectLimitExec() =>
fail(p)
case _ => pass()
}
}
diff --git
a/gluten-ut/spark33/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala
b/gluten-ut/spark33/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala
index 0e4666c604..2d9444e7a6 100644
---
a/gluten-ut/spark33/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala
+++
b/gluten-ut/spark33/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala
@@ -1158,6 +1158,8 @@ class ClickHouseTestSettings extends BackendTestSettings {
"- single join")
.exclude("SPARK-35455: Unify empty relation optimization between normal
and AQE optimizer " +
"- multi join")
+ // Gluten columnar operators will produce a different number of shuffles
+ .exclude("SPARK-29906: AQE should not introduce extra shuffle for
outermost limit")
.excludeGlutenTest("Empty stage coalesced to 1-partition RDD")
.excludeGlutenTest(
"Avoid changing merge join to broadcast join if too many empty
partitions on build plan")
diff --git
a/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala
b/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala
index 0caa3008e1..ff01c5452d 100644
---
a/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala
+++
b/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala
@@ -49,7 +49,7 @@ class ClickHouseTestSettings extends BackendTestSettings {
enableSuite[ClickHouseAdaptiveQueryExecSuite]
.includeAllGlutenTests()
.includeByPrefix(
- "SPARK-29906",
+ // exclude SPARK-29906 because gluten columnar operators will produce a
different number of shuffles
"SPARK-30291",
"SPARK-30403",
"SPARK-30719",
@@ -434,6 +434,8 @@ class ClickHouseTestSettings extends BackendTestSettings {
enableSuite[GlutenConfigBehaviorSuite]
// Will be fixed by cleaning up ColumnarShuffleExchangeExec.
.exclude("SPARK-22160
spark.sql.execution.rangeExchange.sampleSizePerPartition")
+ // Gluten columnar operator will have different number of jobs
+ .exclude("SPARK-40211: customize initialNumPartitions for take")
enableSuite[GlutenCountMinSketchAggQuerySuite]
enableSuite[GlutenCreateTableAsSelectSuite]
.exclude("CREATE TABLE USING AS SELECT based on the file without write
permission")
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]