This is an automated email from the ASF dual-hosted git repository.

chengchengjin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new c9e2e920da [GLUTEN-9073][VL] Add support for CollectTail Operator 
(#9074)
c9e2e920da is described below

commit c9e2e920dad580ade737eff87e9995b8db80d127
Author: Arnav Balyan <[email protected]>
AuthorDate: Thu May 1 21:20:57 2025 +0530

    [GLUTEN-9073][VL] Add support for CollectTail Operator (#9074)
---
 .../clickhouse/CHSparkPlanExecApi.scala            |   3 +
 .../gluten/backendsapi/velox/VeloxBackend.scala    |   2 +
 .../gluten/backendsapi/velox/VeloxRuleApi.scala    |   2 +
 .../backendsapi/velox/VeloxSparkPlanExecApi.scala  |   3 +
 .../gluten/execution/ColumnarCollectTailExec.scala |  81 ++++++++++++
 .../execution/GlutenSQLCollectTailExecSuite.scala  | 138 +++++++++++++++++++++
 .../gluten/backendsapi/BackendSettingsApi.scala    |   2 +
 .../gluten/backendsapi/SparkPlanExecApi.scala      |   2 +
 .../execution/ColumnarCollectTailBaseExec.scala    | 112 +++++++++++++++++
 .../columnar/CollectTailTransformerRule.scala      |  39 ++++++
 .../extension/columnar/validator/Validators.scala  |   3 +
 11 files changed, 387 insertions(+)

diff --git 
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
 
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
index ea992d4beb..69da8ba2a2 100644
--- 
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
+++ 
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
@@ -948,6 +948,9 @@ class CHSparkPlanExecApi extends SparkPlanExecApi with 
Logging {
       offset: Int): ColumnarCollectLimitBaseExec =
     CHColumnarCollectLimitExec(limit, offset, child)
 
+  override def genColumnarTailExec(limit: Int, child: SparkPlan): 
ColumnarCollectTailBaseExec =
+    throw new GlutenNotSupportException("ColumnarTail is not supported in ch 
backend.")
+
   override def genColumnarRangeExec(
       start: Long,
       end: Long,
diff --git 
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
 
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
index 56e5a07239..07009dc1cb 100644
--- 
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
+++ 
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
@@ -558,6 +558,8 @@ object VeloxBackendSettings extends BackendSettingsApi {
 
   override def needPreComputeRangeFrameBoundary(): Boolean = true
 
+  override def supportCollectTailExec(): Boolean = true
+
   override def broadcastNestedLoopJoinSupportsFullOuterJoin(): Boolean = true
 
   override def supportIcebergEqualityDeleteRead(): Boolean = false
diff --git 
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala
 
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala
index fe785c6fea..443ad147de 100644
--- 
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala
+++ 
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala
@@ -102,6 +102,7 @@ object VeloxRuleApi {
     injector.injectPostTransform(c => 
FlushableHashAggregateRule.apply(c.session))
     injector.injectPostTransform(c => 
HashAggregateIgnoreNullKeysRule.apply(c.session))
     injector.injectPostTransform(_ => CollectLimitTransformerRule())
+    injector.injectPostTransform(_ => CollectTailTransformerRule())
     injector.injectPostTransform(c => 
InsertTransitions.create(c.outputsColumnar, VeloxBatch))
 
     // Gluten columnar: Fallback policies.
@@ -191,6 +192,7 @@ object VeloxRuleApi {
     injector.injectPostTransform(c => 
FlushableHashAggregateRule.apply(c.session))
     injector.injectPostTransform(c => 
HashAggregateIgnoreNullKeysRule.apply(c.session))
     injector.injectPostTransform(_ => CollectLimitTransformerRule())
+    injector.injectPostTransform(_ => CollectTailTransformerRule())
     injector.injectPostTransform(c => 
InsertTransitions.create(c.outputsColumnar, VeloxBatch))
     injector.injectPostTransform(c => RemoveTopmostColumnarToRow(c.session, 
c.caller.isAqe()))
     SparkShimLoader.getSparkShims
diff --git 
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala
 
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala
index f6c66585da..199a3d235b 100644
--- 
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala
+++ 
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala
@@ -921,4 +921,7 @@ class VeloxSparkPlanExecApi extends SparkPlanExecApi {
       child: Seq[SparkPlan]): ColumnarRangeBaseExec =
     ColumnarRangeExec(start, end, step, numSlices, numElements, 
outputAttributes, child)
 
+  override def genColumnarTailExec(limit: Int, child: SparkPlan): 
ColumnarCollectTailBaseExec =
+    ColumnarCollectTailExec(limit, child)
+
 }
diff --git 
a/backends-velox/src/main/scala/org/apache/gluten/execution/ColumnarCollectTailExec.scala
 
b/backends-velox/src/main/scala/org/apache/gluten/execution/ColumnarCollectTailExec.scala
new file mode 100644
index 0000000000..7977c9d610
--- /dev/null
+++ 
b/backends-velox/src/main/scala/org/apache/gluten/execution/ColumnarCollectTailExec.scala
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.execution
+
+import org.apache.gluten.columnarbatch.ColumnarBatches
+import org.apache.gluten.columnarbatch.VeloxColumnarBatches
+
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.vectorized.ColumnarBatch
+
+import scala.collection.mutable
+import scala.util.control.Breaks._
+
+case class ColumnarCollectTailExec(
+    limit: Int,
+    child: SparkPlan
+) extends ColumnarCollectTailBaseExec(limit, child) {
+
+  override protected def collectTailRows(
+      partitionIter: Iterator[ColumnarBatch],
+      limit: Int
+  ): Iterator[ColumnarBatch] = {
+    if (!partitionIter.hasNext || limit <= 0) {
+      return Iterator.empty
+    }
+
+    val tailQueue = new mutable.ListBuffer[ColumnarBatch]()
+    var totalRowsInTail = 0L
+
+    while (partitionIter.hasNext) {
+      val batch = partitionIter.next()
+      val batchRows = batch.numRows()
+      ColumnarBatches.retain(batch)
+      tailQueue += batch
+      totalRowsInTail += batchRows
+
+      breakable {
+        while (tailQueue.nonEmpty) {
+          val front = tailQueue.head
+          val frontRows = front.numRows()
+
+          if (totalRowsInTail - frontRows >= limit) {
+            val dropped = tailQueue.remove(0)
+            dropped.close()
+            totalRowsInTail -= frontRows
+          } else {
+            break
+          }
+        }
+      }
+    }
+
+    val overflow = totalRowsInTail - limit
+    if (overflow > 0) {
+      val first = tailQueue.remove(0)
+      val keep = first.numRows() - overflow
+      val sliced = VeloxColumnarBatches.slice(first, overflow.toInt, 
keep.toInt)
+      tailQueue.prepend(sliced)
+      first.close()
+    }
+
+    tailQueue.iterator
+  }
+
+  override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan =
+    copy(child = newChild)
+}
diff --git 
a/backends-velox/src/test/scala/org/apache/gluten/execution/GlutenSQLCollectTailExecSuite.scala
 
b/backends-velox/src/test/scala/org/apache/gluten/execution/GlutenSQLCollectTailExecSuite.scala
new file mode 100644
index 0000000000..d92fc4eac3
--- /dev/null
+++ 
b/backends-velox/src/test/scala/org/apache/gluten/execution/GlutenSQLCollectTailExecSuite.scala
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.execution
+
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.{DataFrame, Row}
+import org.apache.spark.sql.execution.QueryExecution
+import org.apache.spark.sql.util.QueryExecutionListener
+
+import java.util.concurrent.{CountDownLatch, TimeUnit}
+
+class GlutenSQLCollectTailExecSuite extends WholeStageTransformerSuite {
+
+  override protected val resourcePath: String = "N/A"
+  override protected val fileFormat: String = "N/A"
+
+  override protected def sparkConf: SparkConf = {
+    super.sparkConf
+      .set("spark.shuffle.manager", 
"org.apache.spark.shuffle.sort.ColumnarShuffleManager")
+  }
+
+  /**
+   * Helper method for: 1) Registers a Listener that checks for 
"ColumnarCollectTail" in the final
+   * plan. 2) Calls df.tail(tailCount) to physically trigger CollectTailExec. 
3) Asserts the
+   * returned rows match expectedRows.
+   */
+  private def verifyTailExec(df: DataFrame, expectedRows: Seq[Row], tailCount: 
Int): Unit = {
+
+    val latch = new CountDownLatch(1)
+
+    @volatile var listenerException: Option[Throwable] = None
+
+    class TailExecListener extends QueryExecutionListener {
+
+      override def onSuccess(funcName: String, qe: QueryExecution, durationNs: 
Long): Unit = {
+        try {
+          val latestPlan = qe.executedPlan.toString()
+          if (!latestPlan.contains("ColumnarCollectTail")) {
+            throw new Exception("ColumnarCollectTail not found in: " + 
latestPlan)
+          }
+        } catch {
+          case ex: Throwable =>
+            listenerException = Some(ex)
+        } finally {
+          latch.countDown()
+        }
+      }
+
+      override def onFailure(funcName: String, qe: QueryExecution, error: 
Exception): Unit = {
+        listenerException = Some(error)
+        latch.countDown()
+      }
+    }
+
+    val tailExecListener = new TailExecListener()
+    spark.listenerManager.register(tailExecListener)
+
+    val tailArray = df.tail(tailCount)
+    latch.await(10, TimeUnit.SECONDS)
+    listenerException.foreach(throw _)
+
+    assert(
+      tailArray.sameElements(expectedRows),
+      s"""
+         |Tail output [${tailArray.mkString(", ")}]
+         |did not match expected [${expectedRows.mkString(", ")}].
+         """.stripMargin
+    )
+
+    spark.listenerManager.unregister(tailExecListener)
+  }
+
+  test("ColumnarCollectTailExec - verify CollectTailExec in physical plan") {
+    val df = spark.range(0, 10000, 1).toDF("id").orderBy("id")
+    val expected = Seq(Row(9996L), Row(9997L), Row(9998L), Row(9999L))
+    verifyTailExec(df, expected, tailCount = 4)
+  }
+
+  test("ColumnarCollectTailExec - basic tail test") {
+    val df = spark.range(0, 10000, 1).toDF("id").orderBy("id")
+    val expected = (3000L to 9999L).map(Row(_))
+    verifyTailExec(df, expected, tailCount = 7000)
+  }
+
+  test("ColumnarCollectTailExec - with filter") {
+    val df = spark.range(0, 10000, 1).toDF("id").filter("id % 2 == 
0").orderBy("id")
+    val evenCount = 5000
+    val expected = (9990L to 9998L by 2).map(Row(_)).takeRight(5)
+    verifyTailExec(df, expected, tailCount = 5)
+  }
+
+  test("ColumnarCollectTailExec - range with repartition") {
+    val df = spark.range(0, 10000, 1).toDF("id").repartition(3).orderBy("id")
+    val expected = (9997L to 9999L).map(Row(_))
+    verifyTailExec(df, expected, tailCount = 3)
+  }
+
+  test("ColumnarCollectTailExec - with distinct values") {
+    val df = spark.range(0, 10000, 1).toDF("id").distinct().orderBy("id")
+    val expected = (9995L to 9999L).map(Row(_))
+    verifyTailExec(df, expected, tailCount = 5)
+  }
+
+  test("ColumnarCollectTailExec - chained tail") {
+    val df = spark.range(0, 10000, 1).toDF("id").orderBy("id")
+    val expected = (9992L to 9999L).map(Row(_))
+    verifyTailExec(df, expected, tailCount = 8)
+  }
+
+  test("ColumnarCollectTailExec - tail after union") {
+    val df1 = spark.range(0, 5000).toDF("id")
+    val df2 = spark.range(5000, 10000).toDF("id")
+    val unionDf = df1.union(df2).orderBy("id")
+    val expected = (9997L to 9999L).map(Row(_))
+    verifyTailExec(unionDf, expected, tailCount = 3)
+  }
+
+  test("ColumnarCollectTailExec - tail spans across two columnar batches") {
+    val df = spark.range(0, 4101).toDF("id").orderBy("id")
+    val expected = (4095L to 4100L).map(Row(_))
+    verifyTailExec(df, expected, tailCount = 6)
+  }
+
+}
diff --git 
a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
 
b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
index de390b256c..3f45e53e7b 100644
--- 
a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
+++ 
b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
@@ -152,6 +152,8 @@ trait BackendSettingsApi {
 
   def needPreComputeRangeFrameBoundary(): Boolean = false
 
+  def supportCollectTailExec(): Boolean = false
+
   def broadcastNestedLoopJoinSupportsFullOuterJoin(): Boolean = false
 
   def supportIcebergEqualityDeleteRead(): Boolean = true
diff --git 
a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala
 
b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala
index a899099032..15bf11304d 100644
--- 
a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala
+++ 
b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala
@@ -724,6 +724,8 @@ trait SparkPlanExecApi {
       outputAttributes: Seq[Attribute],
       child: Seq[SparkPlan]): ColumnarRangeBaseExec
 
+  def genColumnarTailExec(limit: Int, plan: SparkPlan): 
ColumnarCollectTailBaseExec
+
   def expressionFlattenSupported(expr: Expression): Boolean = false
 
   def genFlattenedExpressionTransformer(
diff --git 
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/ColumnarCollectTailBaseExec.scala
 
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/ColumnarCollectTailBaseExec.scala
new file mode 100644
index 0000000000..169e6df6ef
--- /dev/null
+++ 
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/ColumnarCollectTailBaseExec.scala
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.execution
+
+import org.apache.gluten.backendsapi.BackendsApiManager
+import org.apache.gluten.extension.ValidationResult
+import org.apache.gluten.extension.columnar.transition.Convention
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.serializer.Serializer
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, 
SinglePartition}
+import org.apache.spark.sql.execution.{LimitExec, ShuffledColumnarBatchRDD, 
SparkPlan}
+import org.apache.spark.sql.execution.metric.{SQLMetric, 
SQLShuffleWriteMetricsReporter}
+import org.apache.spark.sql.metric.SQLColumnarShuffleReadMetricsReporter
+import org.apache.spark.sql.vectorized.ColumnarBatch
+
+abstract class ColumnarCollectTailBaseExec(
+    limit: Int,
+    childPlan: SparkPlan
+) extends LimitExec
+  with ValidatablePlan {
+
+  override def batchType(): Convention.BatchType =
+    BackendsApiManager.getSettings.primaryBatchType
+
+  private lazy val writeMetrics =
+    SQLShuffleWriteMetricsReporter.createShuffleWriteMetrics(sparkContext)
+
+  private lazy val readMetrics =
+    
SQLColumnarShuffleReadMetricsReporter.createShuffleReadMetrics(sparkContext)
+
+  private lazy val useSortBasedShuffle: Boolean =
+    BackendsApiManager.getSparkPlanExecApiInstance
+      .useSortBasedShuffle(outputPartitioning, child.output)
+
+  @transient private lazy val serializer: Serializer =
+    BackendsApiManager.getSparkPlanExecApiInstance
+      .createColumnarBatchSerializer(child.schema, metrics, 
useSortBasedShuffle)
+
+  @transient override lazy val metrics: Map[String, SQLMetric] =
+    BackendsApiManager.getMetricsApiInstance
+      .genColumnarShuffleExchangeMetrics(sparkContext, useSortBasedShuffle) ++
+      readMetrics ++ writeMetrics
+
+  override def rowType0(): Convention.RowType = Convention.RowType.None
+
+  override def output: Seq[Attribute] = child.output
+
+  override def outputPartitioning: Partitioning = SinglePartition
+
+  override protected def doValidateInternal(): ValidationResult =
+    ValidationResult.failed("Columnar shuffle not enabled or child does not 
support columnar.")
+
+  override protected def doExecute(): RDD[InternalRow] =
+    throw new UnsupportedOperationException(s"This operator doesn't support 
doExecute().")
+
+  override protected def doExecuteColumnar(): RDD[ColumnarBatch] = {
+    val childExecution = child.executeColumnar()
+
+    if (childExecution.getNumPartitions == 0) {
+      return sparkContext.parallelize(Seq.empty[ColumnarBatch], 1)
+    }
+
+    val processedRDD =
+      if (childExecution.getNumPartitions == 1) childExecution
+      else shuffleLimitedPartitions(childExecution)
+
+    processedRDD.mapPartitions(partition => collectTailRows(partition, limit))
+  }
+
+  private def shuffleLimitedPartitions(childRDD: RDD[ColumnarBatch]): 
RDD[ColumnarBatch] = {
+    val locallyLimited = if (limit >= 0) {
+      childRDD.mapPartitions(partition => collectTailRows(partition, limit))
+    } else {
+      childRDD
+    }
+    new ShuffledColumnarBatchRDD(
+      BackendsApiManager.getSparkPlanExecApiInstance.genShuffleDependency(
+        locallyLimited,
+        child.output,
+        child.output,
+        SinglePartition,
+        serializer,
+        writeMetrics,
+        metrics,
+        useSortBasedShuffle
+      ),
+      readMetrics
+    )
+  }
+
+  protected def collectTailRows(
+      partitionIter: Iterator[ColumnarBatch],
+      limit: Int
+  ): Iterator[ColumnarBatch]
+}
diff --git 
a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/CollectTailTransformerRule.scala
 
b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/CollectTailTransformerRule.scala
new file mode 100644
index 0000000000..7b2cf7d6f5
--- /dev/null
+++ 
b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/CollectTailTransformerRule.scala
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.extension.columnar
+
+import org.apache.gluten.backendsapi.BackendsApiManager
+import org.apache.gluten.config.GlutenConfig
+
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.{CollectTailExec, SparkPlan}
+
+case class CollectTailTransformerRule() extends Rule[SparkPlan] {
+  override def apply(plan: SparkPlan): SparkPlan = {
+    if (!GlutenConfig.get.enableColumnarCollectLimit) {
+      return plan
+    }
+
+    val transformed = plan.transformUp {
+      case exec: CollectTailExec if exec.child.supportsColumnar =>
+        BackendsApiManager.getSparkPlanExecApiInstance
+          .genColumnarTailExec(exec.limit, exec.child)
+    }
+
+    transformed
+  }
+}
diff --git 
a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala
 
b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala
index 36126cf4c7..cc9258d85c 100644
--- 
a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala
+++ 
b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala
@@ -136,6 +136,9 @@ object Validators {
         fail(p)
       case p: CartesianProductExec if !settings.supportCartesianProductExec() 
=> fail(p)
       case p: TakeOrderedAndProjectExec if 
!settings.supportColumnarShuffleExec() => fail(p)
+      // Add a tag for failing validation since CH is not supported. This tag 
is not used explicitly
+      // by post-transform rules, rather marks validation for the appropriate 
backend.
+      case p: CollectTailExec if !settings.supportCollectTailExec() => fail(p)
       case _ => pass()
     }
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to