This is an automated email from the ASF dual-hosted git repository.
chengchengjin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gluten.git
The following commit(s) were added to refs/heads/main by this push:
new b64cd7bcf1 [GLUTEN-11678][VL] Native validation should check
CrossRelNode's expression (#11679)
b64cd7bcf1 is described below
commit b64cd7bcf198bdc948b0fdb696a344dc5069dc76
Author: Wechar Yu <[email protected]>
AuthorDate: Fri Apr 3 22:35:56 2026 +0800
[GLUTEN-11678][VL] Native validation should check CrossRelNode's expression
(#11679)
Check the CrossRelNode's expression and fall back if the expression is not
supported.
Fix #11678.
---
.../apache/gluten/execution/FallbackSuite.scala | 37 +++++++++++++++++++++-
.../substrait/SubstraitToVeloxPlanValidator.cc | 3 ++
2 files changed, 39 insertions(+), 1 deletion(-)
diff --git
a/backends-velox/src/test/scala/org/apache/gluten/execution/FallbackSuite.scala
b/backends-velox/src/test/scala/org/apache/gluten/execution/FallbackSuite.scala
index 29b0470951..b06db41fe2 100644
---
a/backends-velox/src/test/scala/org/apache/gluten/execution/FallbackSuite.scala
+++
b/backends-velox/src/test/scala/org/apache/gluten/execution/FallbackSuite.scala
@@ -24,7 +24,7 @@ import org.apache.spark.scheduler.{SparkListener,
SparkListenerEvent}
import org.apache.spark.sql.execution.{ColumnarBroadcastExchangeExec,
ColumnarShuffleExchangeExec, SortExec, SparkPlan}
import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanHelper,
AQEShuffleReadExec}
import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
-import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec,
SortMergeJoinExec}
+import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec,
BroadcastNestedLoopJoinExec, SortMergeJoinExec}
import org.apache.spark.utils.GlutenSuiteUtils
import scala.collection.mutable.ArrayBuffer
@@ -352,4 +352,39 @@ class FallbackSuite extends
VeloxWholeStageTransformerSuite with AdaptiveSparkPl
}
}
}
+
+ test("fallback when nested loop join has unsupported expression") {
+ val events = new ArrayBuffer[GlutenPlanFallbackEvent]
+ val listener = new SparkListener {
+ override def onOtherEvent(event: SparkListenerEvent): Unit = {
+ event match {
+ case e: GlutenPlanFallbackEvent => events.append(e)
+ case _ =>
+ }
+ }
+ }
+ spark.sparkContext.addSparkListener(listener)
+
+ try {
+ val df = spark.sql("""
+ |select tmp1.c1, tmp1.c2 from tmp1
+ |left join tmp2 on (
+ | tmp1.c1 = regexp_extract(tmp2.c1,
'(?<=@)[^.]+(?=\.)', 0)
+ | or tmp2.c1 > 10
+ |)
+ |""".stripMargin)
+ df.collect()
+ GlutenSuiteUtils.waitUntilEmpty(spark.sparkContext)
+
+ val nestedLoopJoin = find(df.queryExecution.executedPlan) {
+ _.isInstanceOf[BroadcastNestedLoopJoinExec]
+ }
+ assert(nestedLoopJoin.isDefined)
+ val fallbackReasons = events.flatMap(_.fallbackNodeToReason.values)
+ assert(fallbackReasons.nonEmpty)
+ assert(fallbackReasons.forall(_.contains("regexp_extract due to
Pattern")))
+ } finally {
+ spark.sparkContext.removeSparkListener(listener)
+ }
+ }
}
diff --git a/cpp/velox/substrait/SubstraitToVeloxPlanValidator.cc
b/cpp/velox/substrait/SubstraitToVeloxPlanValidator.cc
index 39c67c0f8c..8a24111c0d 100644
--- a/cpp/velox/substrait/SubstraitToVeloxPlanValidator.cc
+++ b/cpp/velox/substrait/SubstraitToVeloxPlanValidator.cc
@@ -1124,6 +1124,9 @@ bool SubstraitToVeloxPlanValidator::validate(const
::substrait::CrossRel& crossR
auto rowType = std::make_shared<RowType>(std::move(names), std::move(types));
if (crossRel.has_expression()) {
+ if (!validateExpression(crossRel.expression(), rowType)) {
+ return false;
+ }
auto expression = exprConverter_->toVeloxExpr(crossRel.expression(),
rowType);
exec::ExprSet exprSet({std::move(expression)}, execCtx_.get());
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]