This is an automated email from the ASF dual-hosted git repository.
philo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 69ffe2ccf3 [GLUTEN-9492][CH] Add Support for CollectTail operator
(#9476)
69ffe2ccf3 is described below
commit 69ffe2ccf3460e547d37d98360895327f6d1d447
Author: Arnav Balyan <[email protected]>
AuthorDate: Thu May 8 12:14:53 2025 +0530
[GLUTEN-9492][CH] Add Support for CollectTail operator (#9476)
---
.../gluten/backendsapi/clickhouse/CHRuleApi.scala | 1 +
.../clickhouse/CHSparkPlanExecApi.scala | 2 +-
.../execution/CHColumnarCollectTailExec.scala | 80 +++++++++++++
.../GlutenClickHouseCollectTailExecSuite.scala | 129 +++++++++++++++++++++
.../gluten/backendsapi/velox/VeloxBackend.scala | 2 -
.../gluten/backendsapi/BackendSettingsApi.scala | 2 -
.../extension/columnar/validator/Validators.scala | 3 -
7 files changed, 211 insertions(+), 8 deletions(-)
diff --git
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHRuleApi.scala
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHRuleApi.scala
index 8a4fde534f..5f64238436 100644
---
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHRuleApi.scala
+++
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHRuleApi.scala
@@ -124,6 +124,7 @@ object CHRuleApi {
SparkPlanRules.extendedColumnarRule(c.glutenConf.extendedColumnarTransformRules)(
c.session)))
injector.injectPostTransform(_ => CollectLimitTransformerRule())
+ injector.injectPostTransform(_ => CollectTailTransformerRule())
injector.injectPostTransform(c =>
InsertTransitions.create(c.outputsColumnar, CHBatch))
injector.injectPostTransform(c => RemoveDuplicatedColumns(c.session))
injector.injectPostTransform(c => AddPreProjectionForHashJoin(c.session))
diff --git
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
index 69da8ba2a2..aa58b5065e 100644
---
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
+++
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
@@ -949,7 +949,7 @@ class CHSparkPlanExecApi extends SparkPlanExecApi with
Logging {
CHColumnarCollectLimitExec(limit, offset, child)
override def genColumnarTailExec(limit: Int, child: SparkPlan):
ColumnarCollectTailBaseExec =
- throw new GlutenNotSupportException("ColumnarTail is not supported in ch
backend.")
+ CHColumnarCollectTailExec(limit, child)
override def genColumnarRangeExec(
start: Long,
diff --git
a/backends-clickhouse/src/main/scala/org/apache/gluten/execution/CHColumnarCollectTailExec.scala
b/backends-clickhouse/src/main/scala/org/apache/gluten/execution/CHColumnarCollectTailExec.scala
new file mode 100644
index 0000000000..5b40c6ab54
--- /dev/null
+++
b/backends-clickhouse/src/main/scala/org/apache/gluten/execution/CHColumnarCollectTailExec.scala
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.execution
+
+import org.apache.gluten.vectorized.CHNativeBlock
+
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.vectorized.ColumnarBatch
+
+import scala.collection.mutable
+import scala.util.control.Breaks._
+
/**
 * ClickHouse backend implementation of the columnar TAIL operator: keeps only
 * the last `limit` rows of each partition's stream of [[ColumnarBatch]]es.
 *
 * @param limit maximum number of trailing rows to retain per partition
 * @param child the child plan producing the columnar input
 */
case class CHColumnarCollectTailExec(
    limit: Int,
    child: SparkPlan
) extends ColumnarCollectTailBaseExec(limit, child) {

  /**
   * Consumes the partition's batches while maintaining a sliding window (queue)
   * of copied batches whose total row count is just enough to cover `limit`
   * trailing rows.
   *
   * @param partitionIter batches of the current partition, in input order
   * @param limit number of trailing rows to keep; non-positive yields no output
   * @return iterator over the retained tail batches (oldest batch possibly sliced)
   */
  override protected def collectTailRows(
      partitionIter: Iterator[ColumnarBatch],
      limit: Int
  ): Iterator[ColumnarBatch] = {
    if (!partitionIter.hasNext || limit <= 0) {
      return Iterator.empty
    }

    // Window of retained batches, oldest first; total row count tracked alongside.
    val tailQueue = new mutable.ListBuffer[ColumnarBatch]()
    var totalRowsInTail = 0L

    while (partitionIter.hasNext) {
      val batch = partitionIter.next()
      // Copy the native block before queueing it -- presumably the upstream
      // iterator reuses the input batch's memory, so an owned copy is needed to
      // outlive this loop iteration. TODO(review): confirm against
      // CHNativeBlock.copyColumnarBatch semantics.
      val tailBatch =
        CHNativeBlock.fromColumnarBatch(batch).copyColumnarBatch()
      val batchRows = tailBatch.numRows()
      tailQueue += tailBatch
      totalRowsInTail += batchRows

      // Evict whole batches from the front while the remaining ones still hold
      // at least `limit` rows; close each evicted copy to release its memory.
      breakable {
        while (tailQueue.nonEmpty) {
          val front = tailQueue.head
          val frontRows = front.numRows()

          if (totalRowsInTail - frontRows >= limit) {
            val dropped = tailQueue.remove(0)
            dropped.close()
            totalRowsInTail -= frontRows
          } else {
            // Dropping the front would leave fewer than `limit` rows; stop.
            break
          }
        }
      }
    }

    // The window may still hold surplus rows (up to oldest-batch-size - 1);
    // trim them off the front of the oldest batch.
    val overflow = totalRowsInTail - limit
    if (overflow > 0) {
      val first = tailQueue.remove(0)
      val keep = first.numRows() - overflow
      // slice(batch, overflow, keep) -- presumably (row offset, row count);
      // TODO(review): confirm against CHNativeBlock.slice.
      val sliced = CHNativeBlock.slice(first, overflow.toInt, keep.toInt)
      tailQueue.prepend(sliced)
      // Close the unsliced original only after the slice has been created.
      first.close()
    }

    tailQueue.iterator
  }

  override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan =
    copy(child = newChild)
}
diff --git
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseCollectTailExecSuite.scala
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseCollectTailExecSuite.scala
new file mode 100644
index 0000000000..a6ffdeac39
--- /dev/null
+++
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseCollectTailExecSuite.scala
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.execution
+
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.{DataFrame, Row}
+import org.apache.spark.sql.execution.QueryExecution
+import org.apache.spark.sql.util.QueryExecutionListener
+
+import java.util.concurrent.{CountDownLatch, TimeUnit}
+
/**
 * Verifies the ClickHouse columnar TAIL operator (CHColumnarCollectTailExec):
 * each test runs `DataFrame.tail` and checks both the returned rows and that
 * the executed physical plan actually contains the CH tail node.
 */
class GlutenClickHouseCollectTailExecSuite extends GlutenClickHouseWholeStageTransformerSuite {

  override protected def sparkConf: SparkConf = {
    super.sparkConf
      .set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.ColumnarShuffleManager")
  }

  /**
   * Runs `df.tail(tailCount)` and asserts that (a) the executed plan contains
   * CHColumnarCollectTail and (b) the returned rows equal `expectedRows`.
   *
   * A QueryExecutionListener observes the executed plan asynchronously; a latch
   * bounds the wait and any listener-side failure is rethrown on the test thread.
   */
  private def verifyTailExec(df: DataFrame, expectedRows: Seq[Row], tailCount: Int): Unit = {

    val latch = new CountDownLatch(1)

    // Written on the listener thread, read on the test thread.
    @volatile var listenerException: Option[Throwable] = None

    class TailExecListener extends QueryExecutionListener {

      override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = {
        try {
          val latestPlan = qe.executedPlan.toString()
          if (!latestPlan.contains("CHColumnarCollectTail")) {
            throw new Exception("CHColumnarCollectTail not found in: " + latestPlan)
          }
        } catch {
          // Deliberately broad: any listener-side failure must be surfaced on
          // the test thread instead of being swallowed by the listener bus.
          case ex: Throwable =>
            listenerException = Some(ex)
        } finally {
          latch.countDown()
        }
      }

      override def onFailure(funcName: String, qe: QueryExecution, error: Exception): Unit = {
        listenerException = Some(error)
        latch.countDown()
      }
    }

    val tailExecListener = new TailExecListener()
    spark.listenerManager.register(tailExecListener)
    try {
      val tailArray = df.tail(tailCount)

      // Fix: the await result was previously ignored, so a listener that never
      // fired would silently skip the plan verification.
      assert(
        latch.await(10, TimeUnit.SECONDS),
        "Timed out waiting for the query execution listener to observe the plan")
      listenerException.foreach(throw _)

      assert(
        tailArray.sameElements(expectedRows),
        s"""
           |Tail output [${tailArray.mkString(", ")}]
           |did not match expected [${expectedRows.mkString(", ")}].
         """.stripMargin
      )
    } finally {
      // Always unregister, even on assertion failure, so listeners do not leak
      // into subsequent tests.
      spark.listenerManager.unregister(tailExecListener)
    }
  }

  test("CHColumnarCollectTailExec - verify CollectTailExec in physical plan") {
    val df = spark.range(0, 10000, 1).toDF("id").orderBy("id")
    val expected = Seq(Row(9996L), Row(9997L), Row(9998L), Row(9999L))
    verifyTailExec(df, expected, tailCount = 4)
  }

  test("CHColumnarCollectTailExec - basic tail test") {
    val df = spark.range(0, 10000, 1).toDF("id").orderBy("id")
    val expected = (3000L to 9999L).map(Row(_))
    verifyTailExec(df, expected, tailCount = 7000)
  }

  test("CHColumnarCollectTailExec - with filter") {
    val df = spark.range(0, 10000, 1).toDF("id").filter("id % 2 == 0").orderBy("id")
    // Last 5 even ids below 10000. (Removed an unused `evenCount` local and a
    // no-op takeRight(5) on an exactly-5-element sequence.)
    val expected = (9990L to 9998L by 2).map(Row(_))
    verifyTailExec(df, expected, tailCount = 5)
  }

  test("CHColumnarCollectTailExec - range with repartition") {
    val df = spark.range(0, 10000, 1).toDF("id").repartition(3).orderBy("id")
    val expected = (9997L to 9999L).map(Row(_))
    verifyTailExec(df, expected, tailCount = 3)
  }

  test("CHColumnarCollectTailExec - with distinct values") {
    val df = spark.range(0, 10000, 1).toDF("id").distinct().orderBy("id")
    val expected = (9995L to 9999L).map(Row(_))
    verifyTailExec(df, expected, tailCount = 5)
  }

  test("CHColumnarCollectTailExec - chained tail") {
    val df = spark.range(0, 10000, 1).toDF("id").orderBy("id")
    val expected = (9992L to 9999L).map(Row(_))
    verifyTailExec(df, expected, tailCount = 8)
  }

  test("CHColumnarCollectTailExec - tail after union") {
    val df1 = spark.range(0, 5000).toDF("id")
    val df2 = spark.range(5000, 10000).toDF("id")
    val unionDf = df1.union(df2).orderBy("id")
    val expected = (9997L to 9999L).map(Row(_))
    verifyTailExec(unionDf, expected, tailCount = 3)
  }

  test("CHColumnarCollectTailExec - tail spans across two columnar batches") {
    val df = spark.range(0, 4101).toDF("id").orderBy("id")
    val expected = (4095L to 4100L).map(Row(_))
    verifyTailExec(df, expected, tailCount = 6)
  }
}
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
index 07009dc1cb..56e5a07239 100644
---
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
+++
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
@@ -558,8 +558,6 @@ object VeloxBackendSettings extends BackendSettingsApi {
override def needPreComputeRangeFrameBoundary(): Boolean = true
- override def supportCollectTailExec(): Boolean = true
-
override def broadcastNestedLoopJoinSupportsFullOuterJoin(): Boolean = true
override def supportIcebergEqualityDeleteRead(): Boolean = false
diff --git
a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
index 3f45e53e7b..de390b256c 100644
---
a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
+++
b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
@@ -152,8 +152,6 @@ trait BackendSettingsApi {
def needPreComputeRangeFrameBoundary(): Boolean = false
- def supportCollectTailExec(): Boolean = false
-
def broadcastNestedLoopJoinSupportsFullOuterJoin(): Boolean = false
def supportIcebergEqualityDeleteRead(): Boolean = true
diff --git
a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala
b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala
index 73f2da891e..149cd63462 100644
---
a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala
+++
b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala
@@ -136,9 +136,6 @@ object Validators {
fail(p)
case p: CartesianProductExec if !settings.supportCartesianProductExec()
=> fail(p)
case p: TakeOrderedAndProjectExec if
!settings.supportColumnarShuffleExec() => fail(p)
- // Add a tag for failing validation since CH is not supported. This tag
is not used explicitly
- // by post-transform rules, rather marks validation for the appropriate
backend.
- case p: CollectTailExec if !settings.supportCollectTailExec() => fail(p)
case _ => pass()
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]