This is an automated email from the ASF dual-hosted git repository.
chengchengjin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new c9e2e920da [GLUTEN-9073][VL] Add support for CollectTail Operator
(#9074)
c9e2e920da is described below
commit c9e2e920dad580ade737eff87e9995b8db80d127
Author: Arnav Balyan <[email protected]>
AuthorDate: Thu May 1 21:20:57 2025 +0530
[GLUTEN-9073][VL] Add support for CollectTail Operator (#9074)
---
.../clickhouse/CHSparkPlanExecApi.scala | 3 +
.../gluten/backendsapi/velox/VeloxBackend.scala | 2 +
.../gluten/backendsapi/velox/VeloxRuleApi.scala | 2 +
.../backendsapi/velox/VeloxSparkPlanExecApi.scala | 3 +
.../gluten/execution/ColumnarCollectTailExec.scala | 81 ++++++++++++
.../execution/GlutenSQLCollectTailExecSuite.scala | 138 +++++++++++++++++++++
.../gluten/backendsapi/BackendSettingsApi.scala | 2 +
.../gluten/backendsapi/SparkPlanExecApi.scala | 2 +
.../execution/ColumnarCollectTailBaseExec.scala | 112 +++++++++++++++++
.../columnar/CollectTailTransformerRule.scala | 39 ++++++
.../extension/columnar/validator/Validators.scala | 3 +
11 files changed, 387 insertions(+)
diff --git
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
index ea992d4beb..69da8ba2a2 100644
---
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
+++
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
@@ -948,6 +948,9 @@ class CHSparkPlanExecApi extends SparkPlanExecApi with
Logging {
offset: Int): ColumnarCollectLimitBaseExec =
CHColumnarCollectLimitExec(limit, offset, child)
+ override def genColumnarTailExec(limit: Int, child: SparkPlan):
ColumnarCollectTailBaseExec =
+ throw new GlutenNotSupportException("ColumnarTail is not supported in ch
backend.")
+
override def genColumnarRangeExec(
start: Long,
end: Long,
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
index 56e5a07239..07009dc1cb 100644
---
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
+++
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
@@ -558,6 +558,8 @@ object VeloxBackendSettings extends BackendSettingsApi {
override def needPreComputeRangeFrameBoundary(): Boolean = true
+ override def supportCollectTailExec(): Boolean = true
+
override def broadcastNestedLoopJoinSupportsFullOuterJoin(): Boolean = true
override def supportIcebergEqualityDeleteRead(): Boolean = false
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala
index fe785c6fea..443ad147de 100644
---
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala
+++
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala
@@ -102,6 +102,7 @@ object VeloxRuleApi {
injector.injectPostTransform(c =>
FlushableHashAggregateRule.apply(c.session))
injector.injectPostTransform(c =>
HashAggregateIgnoreNullKeysRule.apply(c.session))
injector.injectPostTransform(_ => CollectLimitTransformerRule())
+ injector.injectPostTransform(_ => CollectTailTransformerRule())
injector.injectPostTransform(c =>
InsertTransitions.create(c.outputsColumnar, VeloxBatch))
// Gluten columnar: Fallback policies.
@@ -191,6 +192,7 @@ object VeloxRuleApi {
injector.injectPostTransform(c =>
FlushableHashAggregateRule.apply(c.session))
injector.injectPostTransform(c =>
HashAggregateIgnoreNullKeysRule.apply(c.session))
injector.injectPostTransform(_ => CollectLimitTransformerRule())
+ injector.injectPostTransform(_ => CollectTailTransformerRule())
injector.injectPostTransform(c =>
InsertTransitions.create(c.outputsColumnar, VeloxBatch))
injector.injectPostTransform(c => RemoveTopmostColumnarToRow(c.session,
c.caller.isAqe()))
SparkShimLoader.getSparkShims
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala
index f6c66585da..199a3d235b 100644
---
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala
+++
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala
@@ -921,4 +921,7 @@ class VeloxSparkPlanExecApi extends SparkPlanExecApi {
child: Seq[SparkPlan]): ColumnarRangeBaseExec =
ColumnarRangeExec(start, end, step, numSlices, numElements,
outputAttributes, child)
+ override def genColumnarTailExec(limit: Int, child: SparkPlan):
ColumnarCollectTailBaseExec =
+ ColumnarCollectTailExec(limit, child)
+
}
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/execution/ColumnarCollectTailExec.scala
b/backends-velox/src/main/scala/org/apache/gluten/execution/ColumnarCollectTailExec.scala
new file mode 100644
index 0000000000..7977c9d610
--- /dev/null
+++
b/backends-velox/src/main/scala/org/apache/gluten/execution/ColumnarCollectTailExec.scala
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.execution
+
+import org.apache.gluten.columnarbatch.ColumnarBatches
+import org.apache.gluten.columnarbatch.VeloxColumnarBatches
+
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.vectorized.ColumnarBatch
+
+import scala.collection.mutable
+import scala.util.control.Breaks._
+
/**
 * Velox implementation of the columnar "collect tail" operator: for each input
 * partition, keeps only the trailing `limit` rows.
 */
case class ColumnarCollectTailExec(
    limit: Int,
    child: SparkPlan
) extends ColumnarCollectTailBaseExec(limit, child) {

  /**
   * Buffers a sliding window of trailing batches whose combined row count is at
   * least `limit`, then trims the window's head batch so that exactly `limit`
   * rows remain (when more than `limit` rows were seen).
   *
   * Reference counting: every buffered batch is retained on entry and closed
   * exactly once when evicted (or when replaced by its sliced remainder), so
   * refcounts stay balanced for the batches this method consumes.
   *
   * @param partitionIter batches of one partition, in order
   * @param limit number of trailing rows to keep; non-positive yields no output
   * @return batches containing only the last `limit` rows of the partition
   */
  override protected def collectTailRows(
      partitionIter: Iterator[ColumnarBatch],
      limit: Int
  ): Iterator[ColumnarBatch] = {
    if (!partitionIter.hasNext || limit <= 0) {
      return Iterator.empty
    }

    // Window of the most recent batches; head is the oldest buffered batch.
    val tailQueue = new mutable.ListBuffer[ColumnarBatch]()
    var totalRowsInTail = 0L

    while (partitionIter.hasNext) {
      val batch = partitionIter.next()
      val batchRows = batch.numRows()
      // Take a reference on the buffered batch; released via close() below
      // when the batch is evicted from the window.
      ColumnarBatches.retain(batch)
      tailQueue += batch
      totalRowsInTail += batchRows

      // Evict leading batches that are no longer needed to cover `limit`
      // rows, i.e. while the remaining batches alone still hold >= limit rows.
      breakable {
        while (tailQueue.nonEmpty) {
          val front = tailQueue.head
          val frontRows = front.numRows()

          if (totalRowsInTail - frontRows >= limit) {
            val dropped = tailQueue.remove(0)
            dropped.close()
            totalRowsInTail -= frontRows
          } else {
            break
          }
        }
      }
    }

    // The window may still hold up to (head.numRows - 1) excess rows at its
    // front; slice the head batch so exactly `limit` rows remain overall.
    val overflow = totalRowsInTail - limit
    if (overflow > 0) {
      val first = tailQueue.remove(0)
      val keep = first.numRows() - overflow
      // NOTE(review): assumes slice(batch, offset, length) keeps rows
      // [offset, offset + length) and returns an independently-closeable
      // batch — confirm against VeloxColumnarBatches.slice.
      val sliced = VeloxColumnarBatches.slice(first, overflow.toInt, keep.toInt)
      tailQueue.prepend(sliced)
      first.close()
    }

    tailQueue.iterator
  }

  override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan =
    copy(child = newChild)
}
diff --git
a/backends-velox/src/test/scala/org/apache/gluten/execution/GlutenSQLCollectTailExecSuite.scala
b/backends-velox/src/test/scala/org/apache/gluten/execution/GlutenSQLCollectTailExecSuite.scala
new file mode 100644
index 0000000000..d92fc4eac3
--- /dev/null
+++
b/backends-velox/src/test/scala/org/apache/gluten/execution/GlutenSQLCollectTailExecSuite.scala
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.execution
+
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.{DataFrame, Row}
+import org.apache.spark.sql.execution.QueryExecution
+import org.apache.spark.sql.util.QueryExecutionListener
+
+import java.util.concurrent.{CountDownLatch, TimeUnit}
+
class GlutenSQLCollectTailExecSuite extends WholeStageTransformerSuite {

  override protected val resourcePath: String = "N/A"
  override protected val fileFormat: String = "N/A"

  override protected def sparkConf: SparkConf = {
    super.sparkConf
      .set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.ColumnarShuffleManager")
  }

  /**
   * Verifies that `df.tail(tailCount)`:
   *   1. executes through the columnar operator — "ColumnarCollectTail" must appear in the
   *      final physical plan, observed via a QueryExecutionListener;
   *   2. returns exactly `expectedRows`.
   */
  private def verifyTailExec(df: DataFrame, expectedRows: Seq[Row], tailCount: Int): Unit = {

    val latch = new CountDownLatch(1)

    // Written from the listener thread, read from the test thread.
    @volatile var listenerException: Option[Throwable] = None

    class TailExecListener extends QueryExecutionListener {

      override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = {
        try {
          val latestPlan = qe.executedPlan.toString()
          if (!latestPlan.contains("ColumnarCollectTail")) {
            throw new Exception("ColumnarCollectTail not found in: " + latestPlan)
          }
        } catch {
          case ex: Throwable =>
            listenerException = Some(ex)
        } finally {
          latch.countDown()
        }
      }

      override def onFailure(funcName: String, qe: QueryExecution, error: Exception): Unit = {
        listenerException = Some(error)
        latch.countDown()
      }
    }

    val tailExecListener = new TailExecListener()
    spark.listenerManager.register(tailExecListener)

    try {
      val tailArray = df.tail(tailCount)
      // Fail loudly on timeout instead of silently skipping the plan check
      // (await's boolean result was previously ignored).
      assert(
        latch.await(10, TimeUnit.SECONDS),
        "QueryExecutionListener was not notified within 10 seconds")
      listenerException.foreach(throw _)

      assert(
        tailArray.sameElements(expectedRows),
        s"""
           |Tail output [${tailArray.mkString(", ")}]
           |did not match expected [${expectedRows.mkString(", ")}].
         """.stripMargin
      )
    } finally {
      // Always unregister so a failing test does not leak its listener into later tests.
      spark.listenerManager.unregister(tailExecListener)
    }
  }

  test("ColumnarCollectTailExec - verify CollectTailExec in physical plan") {
    val df = spark.range(0, 10000, 1).toDF("id").orderBy("id")
    val expected = Seq(Row(9996L), Row(9997L), Row(9998L), Row(9999L))
    verifyTailExec(df, expected, tailCount = 4)
  }

  test("ColumnarCollectTailExec - basic tail test") {
    val df = spark.range(0, 10000, 1).toDF("id").orderBy("id")
    val expected = (3000L to 9999L).map(Row(_))
    verifyTailExec(df, expected, tailCount = 7000)
  }

  test("ColumnarCollectTailExec - with filter") {
    val df = spark.range(0, 10000, 1).toDF("id").filter("id % 2 == 0").orderBy("id")
    // Last 5 even ids below 10000. (Removed an unused local and a no-op
    // takeRight(5) on this already 5-element sequence.)
    val expected = (9990L to 9998L by 2).map(Row(_))
    verifyTailExec(df, expected, tailCount = 5)
  }

  test("ColumnarCollectTailExec - range with repartition") {
    val df = spark.range(0, 10000, 1).toDF("id").repartition(3).orderBy("id")
    val expected = (9997L to 9999L).map(Row(_))
    verifyTailExec(df, expected, tailCount = 3)
  }

  test("ColumnarCollectTailExec - with distinct values") {
    val df = spark.range(0, 10000, 1).toDF("id").distinct().orderBy("id")
    val expected = (9995L to 9999L).map(Row(_))
    verifyTailExec(df, expected, tailCount = 5)
  }

  // NOTE(review): despite the name, this issues a single tail(8); no tail is chained.
  test("ColumnarCollectTailExec - chained tail") {
    val df = spark.range(0, 10000, 1).toDF("id").orderBy("id")
    val expected = (9992L to 9999L).map(Row(_))
    verifyTailExec(df, expected, tailCount = 8)
  }

  test("ColumnarCollectTailExec - tail after union") {
    val df1 = spark.range(0, 5000).toDF("id")
    val df2 = spark.range(5000, 10000).toDF("id")
    val unionDf = df1.union(df2).orderBy("id")
    val expected = (9997L to 9999L).map(Row(_))
    verifyTailExec(unionDf, expected, tailCount = 3)
  }

  test("ColumnarCollectTailExec - tail spans across two columnar batches") {
    // 4101 rows straddles the default 4096-row batch size boundary.
    val df = spark.range(0, 4101).toDF("id").orderBy("id")
    val expected = (4095L to 4100L).map(Row(_))
    verifyTailExec(df, expected, tailCount = 6)
  }

}
diff --git
a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
index de390b256c..3f45e53e7b 100644
---
a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
+++
b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
@@ -152,6 +152,8 @@ trait BackendSettingsApi {
def needPreComputeRangeFrameBoundary(): Boolean = false
+ def supportCollectTailExec(): Boolean = false
+
def broadcastNestedLoopJoinSupportsFullOuterJoin(): Boolean = false
def supportIcebergEqualityDeleteRead(): Boolean = true
diff --git
a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala
b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala
index a899099032..15bf11304d 100644
---
a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala
+++
b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala
@@ -724,6 +724,8 @@ trait SparkPlanExecApi {
outputAttributes: Seq[Attribute],
child: Seq[SparkPlan]): ColumnarRangeBaseExec
+ def genColumnarTailExec(limit: Int, plan: SparkPlan):
ColumnarCollectTailBaseExec
+
def expressionFlattenSupported(expr: Expression): Boolean = false
def genFlattenedExpressionTransformer(
diff --git
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/ColumnarCollectTailBaseExec.scala
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/ColumnarCollectTailBaseExec.scala
new file mode 100644
index 0000000000..169e6df6ef
--- /dev/null
+++
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/ColumnarCollectTailBaseExec.scala
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.execution
+
+import org.apache.gluten.backendsapi.BackendsApiManager
+import org.apache.gluten.extension.ValidationResult
+import org.apache.gluten.extension.columnar.transition.Convention
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.serializer.Serializer
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.plans.physical.{Partitioning,
SinglePartition}
+import org.apache.spark.sql.execution.{LimitExec, ShuffledColumnarBatchRDD,
SparkPlan}
+import org.apache.spark.sql.execution.metric.{SQLMetric,
SQLShuffleWriteMetricsReporter}
+import org.apache.spark.sql.metric.SQLColumnarShuffleReadMetricsReporter
+import org.apache.spark.sql.vectorized.ColumnarBatch
+
/**
 * Base operator for columnar "collect tail": produces the last `limit` rows of
 * the child's output.
 *
 * Execution (see doExecuteColumnar): each child partition is first trimmed
 * locally to its trailing `limit` rows, the trimmed partitions are shuffled
 * into a single partition, and the backend's collectTailRows performs the
 * final trim on that single partition.
 */
abstract class ColumnarCollectTailBaseExec(
    limit: Int,
    childPlan: SparkPlan
) extends LimitExec
  with ValidatablePlan {

  override def batchType(): Convention.BatchType =
    BackendsApiManager.getSettings.primaryBatchType

  // Shuffle write/read metrics feed into the exchange used by
  // shuffleLimitedPartitions below.
  private lazy val writeMetrics =
    SQLShuffleWriteMetricsReporter.createShuffleWriteMetrics(sparkContext)

  private lazy val readMetrics =
    SQLColumnarShuffleReadMetricsReporter.createShuffleReadMetrics(sparkContext)

  private lazy val useSortBasedShuffle: Boolean =
    BackendsApiManager.getSparkPlanExecApiInstance
      .useSortBasedShuffle(outputPartitioning, child.output)

  // Serializer for the single-partition shuffle; @transient because it is
  // only needed on the driver when building the shuffle dependency.
  @transient private lazy val serializer: Serializer =
    BackendsApiManager.getSparkPlanExecApiInstance
      .createColumnarBatchSerializer(child.schema, metrics, useSortBasedShuffle)

  @transient override lazy val metrics: Map[String, SQLMetric] =
    BackendsApiManager.getMetricsApiInstance
      .genColumnarShuffleExchangeMetrics(sparkContext, useSortBasedShuffle) ++
      readMetrics ++ writeMetrics

  override def rowType0(): Convention.RowType = Convention.RowType.None

  override def output: Seq[Attribute] = child.output

  // All rows are funneled into one partition so the global tail can be taken.
  override def outputPartitioning: Partitioning = SinglePartition

  // NOTE(review): always fails self-validation; offload appears to be decided
  // by CollectTailTransformerRule plus the backend validator instead of the
  // regular transform-validation path — confirm this is intentional.
  override protected def doValidateInternal(): ValidationResult =
    ValidationResult.failed("Columnar shuffle not enabled or child does not support columnar.")

  override protected def doExecute(): RDD[InternalRow] =
    throw new UnsupportedOperationException(s"This operator doesn't support doExecute().")

  override protected def doExecuteColumnar(): RDD[ColumnarBatch] = {
    val childExecution = child.executeColumnar()

    // Zero input partitions: return an empty single-partition RDD.
    if (childExecution.getNumPartitions == 0) {
      return sparkContext.parallelize(Seq.empty[ColumnarBatch], 1)
    }

    // A single partition needs no shuffle; otherwise pre-trim and shuffle
    // everything into one partition first.
    val processedRDD =
      if (childExecution.getNumPartitions == 1) childExecution
      else shuffleLimitedPartitions(childExecution)

    // Final trim to the global tail of `limit` rows.
    processedRDD.mapPartitions(partition => collectTailRows(partition, limit))
  }

  /**
   * Trims each partition to its trailing `limit` rows (an upper bound of the
   * rows it can contribute to the global tail), then shuffles the result into
   * a single partition.
   */
  private def shuffleLimitedPartitions(childRDD: RDD[ColumnarBatch]): RDD[ColumnarBatch] = {
    val locallyLimited = if (limit >= 0) {
      childRDD.mapPartitions(partition => collectTailRows(partition, limit))
    } else {
      childRDD
    }
    new ShuffledColumnarBatchRDD(
      BackendsApiManager.getSparkPlanExecApiInstance.genShuffleDependency(
        locallyLimited,
        child.output,
        child.output,
        SinglePartition,
        serializer,
        writeMetrics,
        metrics,
        useSortBasedShuffle
      ),
      readMetrics
    )
  }

  /**
   * Backend-specific kernel: returns batches holding only the last `limit`
   * rows of the given partition iterator.
   */
  protected def collectTailRows(
      partitionIter: Iterator[ColumnarBatch],
      limit: Int
  ): Iterator[ColumnarBatch]
}
diff --git
a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/CollectTailTransformerRule.scala
b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/CollectTailTransformerRule.scala
new file mode 100644
index 0000000000..7b2cf7d6f5
--- /dev/null
+++
b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/CollectTailTransformerRule.scala
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.extension.columnar
+
+import org.apache.gluten.backendsapi.BackendsApiManager
+import org.apache.gluten.config.GlutenConfig
+
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.{CollectTailExec, SparkPlan}
+
/**
 * Replaces vanilla [[CollectTailExec]] nodes whose child is columnar with the
 * backend-provided columnar tail operator. Gated on the same flag as the
 * columnar collect-limit rewrite.
 */
case class CollectTailTransformerRule() extends Rule[SparkPlan] {
  override def apply(plan: SparkPlan): SparkPlan =
    if (GlutenConfig.get.enableColumnarCollectLimit) {
      plan.transformUp {
        case tail: CollectTailExec if tail.child.supportsColumnar =>
          BackendsApiManager.getSparkPlanExecApiInstance
            .genColumnarTailExec(tail.limit, tail.child)
      }
    } else {
      plan
    }
}
diff --git
a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala
b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala
index 36126cf4c7..cc9258d85c 100644
---
a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala
+++
b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala
@@ -136,6 +136,9 @@ object Validators {
fail(p)
case p: CartesianProductExec if !settings.supportCartesianProductExec()
=> fail(p)
case p: TakeOrderedAndProjectExec if
!settings.supportColumnarShuffleExec() => fail(p)
+      // Fail validation for CollectTailExec when the backend does not support
+      // it (e.g. the ClickHouse backend), so the vanilla Spark operator is
+      // kept in the plan.
+ case p: CollectTailExec if !settings.supportCollectTailExec() => fail(p)
case _ => pass()
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]