This is an automated email from the ASF dual-hosted git repository.

philo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 69ffe2ccf3 [GLUTEN-9492][CH] Add Support for CollectTail operator 
(#9476)
69ffe2ccf3 is described below

commit 69ffe2ccf3460e547d37d98360895327f6d1d447
Author: Arnav Balyan <[email protected]>
AuthorDate: Thu May 8 12:14:53 2025 +0530

    [GLUTEN-9492][CH] Add Support for CollectTail operator (#9476)
---
 .../gluten/backendsapi/clickhouse/CHRuleApi.scala  |   1 +
 .../clickhouse/CHSparkPlanExecApi.scala            |   2 +-
 .../execution/CHColumnarCollectTailExec.scala      |  80 +++++++++++++
 .../GlutenClickHouseCollectTailExecSuite.scala     | 129 +++++++++++++++++++++
 .../gluten/backendsapi/velox/VeloxBackend.scala    |   2 -
 .../gluten/backendsapi/BackendSettingsApi.scala    |   2 -
 .../extension/columnar/validator/Validators.scala  |   3 -
 7 files changed, 211 insertions(+), 8 deletions(-)

diff --git 
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHRuleApi.scala
 
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHRuleApi.scala
index 8a4fde534f..5f64238436 100644
--- 
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHRuleApi.scala
+++ 
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHRuleApi.scala
@@ -124,6 +124,7 @@ object CHRuleApi {
           
SparkPlanRules.extendedColumnarRule(c.glutenConf.extendedColumnarTransformRules)(
             c.session)))
     injector.injectPostTransform(_ => CollectLimitTransformerRule())
+    injector.injectPostTransform(_ => CollectTailTransformerRule())
     injector.injectPostTransform(c => 
InsertTransitions.create(c.outputsColumnar, CHBatch))
     injector.injectPostTransform(c => RemoveDuplicatedColumns(c.session))
     injector.injectPostTransform(c => AddPreProjectionForHashJoin(c.session))
diff --git 
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
 
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
index 69da8ba2a2..aa58b5065e 100644
--- 
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
+++ 
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
@@ -949,7 +949,7 @@ class CHSparkPlanExecApi extends SparkPlanExecApi with 
Logging {
     CHColumnarCollectLimitExec(limit, offset, child)
 
   override def genColumnarTailExec(limit: Int, child: SparkPlan): 
ColumnarCollectTailBaseExec =
-    throw new GlutenNotSupportException("ColumnarTail is not supported in ch 
backend.")
+    CHColumnarCollectTailExec(limit, child)
 
   override def genColumnarRangeExec(
       start: Long,
diff --git 
a/backends-clickhouse/src/main/scala/org/apache/gluten/execution/CHColumnarCollectTailExec.scala
 
b/backends-clickhouse/src/main/scala/org/apache/gluten/execution/CHColumnarCollectTailExec.scala
new file mode 100644
index 0000000000..5b40c6ab54
--- /dev/null
+++ 
b/backends-clickhouse/src/main/scala/org/apache/gluten/execution/CHColumnarCollectTailExec.scala
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.execution
+
+import org.apache.gluten.vectorized.CHNativeBlock
+
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.vectorized.ColumnarBatch
+
+import scala.collection.mutable
+import scala.util.control.Breaks._
+
+case class CHColumnarCollectTailExec(
+    limit: Int,
+    child: SparkPlan
+) extends ColumnarCollectTailBaseExec(limit, child) {
+
+  override protected def collectTailRows(
+      partitionIter: Iterator[ColumnarBatch],
+      limit: Int
+  ): Iterator[ColumnarBatch] = {
+    if (!partitionIter.hasNext || limit <= 0) {
+      return Iterator.empty
+    }
+
+    val tailQueue = new mutable.ListBuffer[ColumnarBatch]()
+    var totalRowsInTail = 0L
+
+    while (partitionIter.hasNext) {
+      val batch = partitionIter.next()
+      val tailBatch = 
CHNativeBlock.fromColumnarBatch(batch).copyColumnarBatch()
+      val batchRows = tailBatch.numRows()
+      tailQueue += tailBatch
+      totalRowsInTail += batchRows
+
+      breakable {
+        while (tailQueue.nonEmpty) {
+          val front = tailQueue.head
+          val frontRows = front.numRows()
+
+          if (totalRowsInTail - frontRows >= limit) {
+            val dropped = tailQueue.remove(0)
+            dropped.close()
+            totalRowsInTail -= frontRows
+          } else {
+            break
+          }
+        }
+      }
+    }
+
+    val overflow = totalRowsInTail - limit
+    if (overflow > 0) {
+      val first = tailQueue.remove(0)
+      val keep = first.numRows() - overflow
+      val sliced = CHNativeBlock.slice(first, overflow.toInt, keep.toInt)
+      tailQueue.prepend(sliced)
+      first.close()
+    }
+
+    tailQueue.iterator
+  }
+
+  override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan =
+    copy(child = newChild)
+}
diff --git 
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseCollectTailExecSuite.scala
 
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseCollectTailExecSuite.scala
new file mode 100644
index 0000000000..a6ffdeac39
--- /dev/null
+++ 
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseCollectTailExecSuite.scala
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.execution
+
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.{DataFrame, Row}
+import org.apache.spark.sql.execution.QueryExecution
+import org.apache.spark.sql.util.QueryExecutionListener
+
+import java.util.concurrent.{CountDownLatch, TimeUnit}
+
+class GlutenClickHouseCollectTailExecSuite extends 
GlutenClickHouseWholeStageTransformerSuite {
+
+  override protected def sparkConf: SparkConf = {
+    super.sparkConf
+      .set("spark.shuffle.manager", 
"org.apache.spark.shuffle.sort.ColumnarShuffleManager")
+  }
+
+  private def verifyTailExec(df: DataFrame, expectedRows: Seq[Row], tailCount: 
Int): Unit = {
+
+    val latch = new CountDownLatch(1)
+
+    @volatile var listenerException: Option[Throwable] = None
+
+    class TailExecListener extends QueryExecutionListener {
+
+      override def onSuccess(funcName: String, qe: QueryExecution, durationNs: 
Long): Unit = {
+        try {
+          val latestPlan = qe.executedPlan.toString()
+          if (!latestPlan.contains("CHColumnarCollectTail")) {
+            throw new Exception("CHColumnarCollectTail not found in: " + 
latestPlan)
+          }
+        } catch {
+          case ex: Throwable =>
+            listenerException = Some(ex)
+        } finally {
+          latch.countDown()
+        }
+      }
+
+      override def onFailure(funcName: String, qe: QueryExecution, error: 
Exception): Unit = {
+        listenerException = Some(error)
+        latch.countDown()
+      }
+    }
+
+    val tailExecListener = new TailExecListener()
+    spark.listenerManager.register(tailExecListener)
+
+    val tailArray = df.tail(tailCount)
+    latch.await(10, TimeUnit.SECONDS)
+    listenerException.foreach(throw _)
+
+    assert(
+      tailArray.sameElements(expectedRows),
+      s"""
+         |Tail output [${tailArray.mkString(", ")}]
+         |did not match expected [${expectedRows.mkString(", ")}].
+         """.stripMargin
+    )
+
+    spark.listenerManager.unregister(tailExecListener)
+  }
+
+  test("CHColumnarCollectTailExec - verify CollectTailExec in physical plan") {
+    val df = spark.range(0, 10000, 1).toDF("id").orderBy("id")
+    val expected = Seq(Row(9996L), Row(9997L), Row(9998L), Row(9999L))
+    verifyTailExec(df, expected, tailCount = 4)
+  }
+
+  test("CHColumnarCollectTailExec - basic tail test") {
+    val df = spark.range(0, 10000, 1).toDF("id").orderBy("id")
+    val expected = (3000L to 9999L).map(Row(_))
+    verifyTailExec(df, expected, tailCount = 7000)
+  }
+
+  test("CHColumnarCollectTailExec - with filter") {
+    val df = spark.range(0, 10000, 1).toDF("id").filter("id % 2 == 
0").orderBy("id")
+    val evenCount = 5000
+    val expected = (9990L to 9998L by 2).map(Row(_)).takeRight(5)
+    verifyTailExec(df, expected, tailCount = 5)
+  }
+
+  test("CHColumnarCollectTailExec - range with repartition") {
+    val df = spark.range(0, 10000, 1).toDF("id").repartition(3).orderBy("id")
+    val expected = (9997L to 9999L).map(Row(_))
+    verifyTailExec(df, expected, tailCount = 3)
+  }
+
+  test("CHColumnarCollectTailExec - with distinct values") {
+    val df = spark.range(0, 10000, 1).toDF("id").distinct().orderBy("id")
+    val expected = (9995L to 9999L).map(Row(_))
+    verifyTailExec(df, expected, tailCount = 5)
+  }
+
+  test("CHColumnarCollectTailExec - chained tail") {
+    val df = spark.range(0, 10000, 1).toDF("id").orderBy("id")
+    val expected = (9992L to 9999L).map(Row(_))
+    verifyTailExec(df, expected, tailCount = 8)
+  }
+
+  test("CHColumnarCollectTailExec - tail after union") {
+    val df1 = spark.range(0, 5000).toDF("id")
+    val df2 = spark.range(5000, 10000).toDF("id")
+    val unionDf = df1.union(df2).orderBy("id")
+    val expected = (9997L to 9999L).map(Row(_))
+    verifyTailExec(unionDf, expected, tailCount = 3)
+  }
+
+  test("CHColumnarCollectTailExec - tail spans across two columnar batches") {
+    val df = spark.range(0, 4101).toDF("id").orderBy("id")
+    val expected = (4095L to 4100L).map(Row(_))
+    verifyTailExec(df, expected, tailCount = 6)
+  }
+}
diff --git 
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
 
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
index 07009dc1cb..56e5a07239 100644
--- 
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
+++ 
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
@@ -558,8 +558,6 @@ object VeloxBackendSettings extends BackendSettingsApi {
 
   override def needPreComputeRangeFrameBoundary(): Boolean = true
 
-  override def supportCollectTailExec(): Boolean = true
-
   override def broadcastNestedLoopJoinSupportsFullOuterJoin(): Boolean = true
 
   override def supportIcebergEqualityDeleteRead(): Boolean = false
diff --git 
a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
 
b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
index 3f45e53e7b..de390b256c 100644
--- 
a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
+++ 
b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
@@ -152,8 +152,6 @@ trait BackendSettingsApi {
 
   def needPreComputeRangeFrameBoundary(): Boolean = false
 
-  def supportCollectTailExec(): Boolean = false
-
   def broadcastNestedLoopJoinSupportsFullOuterJoin(): Boolean = false
 
   def supportIcebergEqualityDeleteRead(): Boolean = true
diff --git 
a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala
 
b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala
index 73f2da891e..149cd63462 100644
--- 
a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala
+++ 
b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala
@@ -136,9 +136,6 @@ object Validators {
         fail(p)
       case p: CartesianProductExec if !settings.supportCartesianProductExec() 
=> fail(p)
       case p: TakeOrderedAndProjectExec if 
!settings.supportColumnarShuffleExec() => fail(p)
-      // Add a tag for failing validation since CH is not supported. This tag 
is not used explicitly
-      // by post-transform rules, rather marks validation for the appropriate 
backend.
-      case p: CollectTailExec if !settings.supportCollectTailExec() => fail(p)
       case _ => pass()
     }
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to