This is an automated email from the ASF dual-hosted git repository.

hongze pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 07f84be2ba [GLUTEN-7143][VL] RAS: Fix failed UTs in GlutenSQLQueryTestSuite (#7754)
07f84be2ba is described below

commit 07f84be2ba2b95717ea74356859a9d08bb9dab10
Author: Hongze Zhang <hongze.zh...@intel.com>
AuthorDate: Fri Nov 1 10:15:55 2024 +0800

    [GLUTEN-7143][VL] RAS: Fix failed UTs in GlutenSQLQueryTestSuite (#7754)
---
 .../backendsapi/clickhouse/CHValidatorApi.scala      |  3 ++-
 .../gluten/backendsapi/velox/VeloxValidatorApi.scala |  9 +++++++--
 .../org/apache/gluten/backendsapi/ValidatorApi.scala |  3 ++-
 .../extension/columnar/enumerated/RasOffload.scala   | 20 ++++++++++++++------
 .../sql/execution/ColumnarShuffleExchangeExec.scala  |  2 +-
 5 files changed, 26 insertions(+), 11 deletions(-)

diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHValidatorApi.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHValidatorApi.scala
index 5fe7406946..eed493cffe 100644
--- a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHValidatorApi.scala
+++ b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHValidatorApi.scala
@@ -28,7 +28,7 @@ import org.apache.gluten.vectorized.CHNativeExpressionEvaluator
 
 import org.apache.spark.internal.Logging
 import org.apache.spark.shuffle.utils.RangePartitionerBoundsGenerator
-import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
 import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, 
Partitioning, RangePartitioning}
 import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
@@ -71,6 +71,7 @@ class CHValidatorApi extends ValidatorApi with AdaptiveSparkPlanHelper with Logg
   }
 
   override def doColumnarShuffleExchangeExecValidate(
+      outputAttributes: Seq[Attribute],
       outputPartitioning: Partitioning,
       child: SparkPlan): Option[String] = {
     val outputAttributes = child.output
diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxValidatorApi.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxValidatorApi.scala
index 00a8f8cb0e..ddf77e5fa3 100644
--- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxValidatorApi.scala
+++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxValidatorApi.scala
@@ -22,7 +22,7 @@ import org.apache.gluten.substrait.plan.PlanNode
 import org.apache.gluten.validate.NativePlanValidationInfo
 import org.apache.gluten.vectorized.NativePlanEvaluator
 
-import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
 import org.apache.spark.sql.catalyst.plans.physical.Partitioning
 import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.types._
@@ -87,11 +87,16 @@ class VeloxValidatorApi extends ValidatorApi {
   }
 
   override def doColumnarShuffleExchangeExecValidate(
+      outputAttributes: Seq[Attribute],
       outputPartitioning: Partitioning,
       child: SparkPlan): Option[String] = {
+    if (outputAttributes.isEmpty) {
+      // See: https://github.com/apache/incubator-gluten/issues/7600.
+      return Some("Shuffle with empty output schema is not supported")
+    }
     if (child.output.isEmpty) {
       // See: https://github.com/apache/incubator-gluten/issues/7600.
-      return Some("Shuffle with empty schema is not supported")
+      return Some("Shuffle with empty input schema is not supported")
     }
     doSchemaValidate(child.schema)
   }
diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/ValidatorApi.scala b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/ValidatorApi.scala
index 90f132d78b..4a18a618bf 100644
--- a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/ValidatorApi.scala
+++ b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/ValidatorApi.scala
@@ -19,7 +19,7 @@ package org.apache.gluten.backendsapi
 import org.apache.gluten.extension.ValidationResult
 import org.apache.gluten.substrait.plan.PlanNode
 
-import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
 import org.apache.spark.sql.catalyst.plans.physical.Partitioning
 import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.types.DataType
@@ -58,6 +58,7 @@ trait ValidatorApi {
 
   /** Validate against ColumnarShuffleExchangeExec. */
   def doColumnarShuffleExchangeExecValidate(
+      outputAttributes: Seq[Attribute],
       outputPartitioning: Partitioning,
       child: SparkPlan): Option[String]
 }
diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/enumerated/RasOffload.scala b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/enumerated/RasOffload.scala
index 1299fffb99..15522e4986 100644
--- a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/enumerated/RasOffload.scala
+++ b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/enumerated/RasOffload.scala
@@ -77,7 +77,15 @@ object RasOffload {
         // 0. If the node is already offloaded, fail fast.
         assert(typeIdentifier.isInstance(node))
 
-        // 1. Rewrite the node to form that native library supports.
+        // 1. Pre-validate the input node. Fast fail if no good.
+        validator.validate(node) match {
+          case Validator.Passed =>
+          case Validator.Failed(reason) =>
+            FallbackTags.add(node, reason)
+            return List.empty
+        }
+
+        // 2. Rewrite the node to form that native library supports.
         val rewritten = rewrites.foldLeft(node) {
           case (node, rewrite) =>
             node.transformUp {
@@ -87,17 +95,17 @@ object RasOffload {
             }
         }
 
-        // 2. Walk the rewritten tree.
+        // 3. Walk the rewritten tree.
         val offloaded = rewritten.transformUp {
           case from if typeIdentifier.isInstance(from) =>
-            // 3. Validate current node. If passed, offload it.
+            // 4. Validate current node. If passed, offload it.
             validator.validate(from) match {
               case Validator.Passed =>
                 val offloadedPlan = base.offload(from)
                 val offloadedNodes = offloadedPlan.collect[GlutenPlan] { case 
t: GlutenPlan => t }
                 val outComes = 
offloadedNodes.map(_.doValidate()).filter(!_.ok())
                 if (outComes.nonEmpty) {
-                  // 4. If native validation fails on the offloaded node, 
return the
+                  // 5. If native validation fails on the offloaded node, 
return the
                   // original one.
                   outComes.foreach(FallbackTags.add(from, _))
                   from
@@ -110,12 +118,12 @@ object RasOffload {
             }
         }
 
-        // 5. If rewritten plan is not offload-able, discard it.
+        // 6. If rewritten plan is not offload-able, discard it.
         if (offloaded.fastEquals(rewritten)) {
           return List.empty
         }
 
-        // 6. Otherwise, return the final tree.
+        // 7. Otherwise, return the final tree.
         List(offloaded)
       }
 
diff --git a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/ColumnarShuffleExchangeExec.scala b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/ColumnarShuffleExchangeExec.scala
index f12e3ae0b3..4f62377b09 100644
--- a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/ColumnarShuffleExchangeExec.scala
+++ b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/ColumnarShuffleExchangeExec.scala
@@ -115,7 +115,7 @@ case class ColumnarShuffleExchangeExec(
 
   override protected def doValidateInternal(): ValidationResult = {
     BackendsApiManager.getValidatorApiInstance
-      .doColumnarShuffleExchangeExecValidate(outputPartitioning, child)
+      .doColumnarShuffleExchangeExecValidate(output, outputPartitioning, child)
       .map {
         reason =>
           ValidationResult.failed(


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@gluten.apache.org
For additional commands, e-mail: commits-h...@gluten.apache.org

Reply via email to