This is an automated email from the ASF dual-hosted git repository.

zhli pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 1cc4ef43e [VL] Add schema validation for all operators (#6406)
1cc4ef43e is described below

commit 1cc4ef43e325fe62436fc86a332454d629e55b18
Author: Zhen Li <[email protected]>
AuthorDate: Thu Jul 11 22:07:06 2024 +0800

    [VL] Add schema validation for all operators (#6406)
    
    [VL] Add schema validation for all operators.
---
 .../apache/gluten/execution/RowToVeloxColumnarExec.scala  |  9 ---------
 .../scala/org/apache/gluten/execution/TestOperator.scala  | 10 ++++++++++
 .../execution/BasicPhysicalOperatorTransformer.scala      |  9 ---------
 .../scala/org/apache/gluten/extension/GlutenPlan.scala    | 10 ++++++++++
 .../sql/execution/ColumnarBroadcastExchangeExec.scala     | 15 +--------------
 5 files changed, 21 insertions(+), 32 deletions(-)

diff --git a/backends-velox/src/main/scala/org/apache/gluten/execution/RowToVeloxColumnarExec.scala b/backends-velox/src/main/scala/org/apache/gluten/execution/RowToVeloxColumnarExec.scala
index 29478fe9d..2f3e88f9a 100644
--- a/backends-velox/src/main/scala/org/apache/gluten/execution/RowToVeloxColumnarExec.scala
+++ b/backends-velox/src/main/scala/org/apache/gluten/execution/RowToVeloxColumnarExec.scala
@@ -17,9 +17,7 @@
 package org.apache.gluten.execution
 
 import org.apache.gluten.GlutenConfig
-import org.apache.gluten.backendsapi.BackendsApiManager
 import org.apache.gluten.columnarbatch.ColumnarBatches
-import org.apache.gluten.exception.GlutenException
 import org.apache.gluten.exec.Runtimes
 import org.apache.gluten.memory.arrow.alloc.ArrowBufferAllocators
 import org.apache.gluten.utils.ArrowAbiUtil
@@ -47,13 +45,6 @@ import scala.collection.mutable.ListBuffer
 case class RowToVeloxColumnarExec(child: SparkPlan) extends 
RowToColumnarExecBase(child = child) {
 
   override def doExecuteColumnarInternal(): RDD[ColumnarBatch] = {
-    
BackendsApiManager.getValidatorApiInstance.doSchemaValidate(schema).foreach {
-      reason =>
-        throw new GlutenException(
-          s"Input schema contains unsupported type when convert row to 
columnar for $schema " +
-            s"due to $reason")
-    }
-
     val numInputRows = longMetric("numInputRows")
     val numOutputBatches = longMetric("numOutputBatches")
     val convertTime = longMetric("convertTime")
diff --git a/backends-velox/src/test/scala/org/apache/gluten/execution/TestOperator.scala b/backends-velox/src/test/scala/org/apache/gluten/execution/TestOperator.scala
index 230fc565d..07f910137 100644
--- a/backends-velox/src/test/scala/org/apache/gluten/execution/TestOperator.scala
+++ b/backends-velox/src/test/scala/org/apache/gluten/execution/TestOperator.scala
@@ -258,6 +258,16 @@ class TestOperator extends VeloxWholeStageTransformerSuite 
with AdaptiveSparkPla
     checkLengthAndPlan(df, 5)
   }
 
+  testWithSpecifiedSparkVersion("coalesce validation", Some("3.4")) {
+    withTempPath {
+      path =>
+        val data = "2019-09-09 01:02:03.456789"
+        val df = Seq(data).toDF("strTs").selectExpr(s"CAST(strTs AS 
TIMESTAMP_NTZ) AS ts")
+        df.coalesce(1).write.format("parquet").save(path.getCanonicalPath)
+        spark.read.parquet(path.getCanonicalPath).collect
+    }
+  }
+
   test("groupby") {
     val df = runQueryAndCompare(
       "select l_orderkey, sum(l_partkey) as sum from lineitem " +
diff --git a/gluten-core/src/main/scala/org/apache/gluten/execution/BasicPhysicalOperatorTransformer.scala b/gluten-core/src/main/scala/org/apache/gluten/execution/BasicPhysicalOperatorTransformer.scala
index 0b792d52e..97b4c3a3f 100644
--- a/gluten-core/src/main/scala/org/apache/gluten/execution/BasicPhysicalOperatorTransformer.scala
+++ b/gluten-core/src/main/scala/org/apache/gluten/execution/BasicPhysicalOperatorTransformer.scala
@@ -311,15 +311,6 @@ case class ColumnarUnionExec(children: Seq[SparkPlan]) 
extends SparkPlan with Gl
   }
 
   override protected def doExecuteColumnar(): RDD[ColumnarBatch] = 
columnarInputRDD
-
-  override protected def doValidateInternal(): ValidationResult = {
-    BackendsApiManager.getValidatorApiInstance
-      .doSchemaValidate(schema)
-      .map {
-        reason => ValidationResult.notOk(s"Found schema check failure for 
$schema, due to: $reason")
-      }
-      .getOrElse(ValidationResult.ok)
-  }
 }
 
 /**
diff --git a/gluten-core/src/main/scala/org/apache/gluten/extension/GlutenPlan.scala b/gluten-core/src/main/scala/org/apache/gluten/extension/GlutenPlan.scala
index 8f1004be4..71a76ff63 100644
--- a/gluten-core/src/main/scala/org/apache/gluten/extension/GlutenPlan.scala
+++ b/gluten-core/src/main/scala/org/apache/gluten/extension/GlutenPlan.scala
@@ -63,6 +63,16 @@ trait GlutenPlan extends SparkPlan with 
Convention.KnownBatchType with LogLevelU
    * Validate whether this SparkPlan supports to be transformed into substrait 
node in Native Code.
    */
   final def doValidate(): ValidationResult = {
+    val schemaVaidationResult = BackendsApiManager.getValidatorApiInstance
+      .doSchemaValidate(schema)
+      .map {
+        reason => ValidationResult.notOk(s"Found schema check failure for 
$schema, due to: $reason")
+      }
+      .getOrElse(ValidationResult.ok)
+    if (!schemaVaidationResult.isValid) {
+      TestStats.addFallBackClassName(this.getClass.toString)
+      return schemaVaidationResult
+    }
     try {
       TransformerState.enterValidation
       val res = doValidateInternal()
diff --git 
a/gluten-core/src/main/scala/org/apache/spark/sql/execution/ColumnarBroadcastExchangeExec.scala
 
b/gluten-core/src/main/scala/org/apache/spark/sql/execution/ColumnarBroadcastExchangeExec.scala
index 4da7a2f6f..d55733fe4 100644
--- 
a/gluten-core/src/main/scala/org/apache/spark/sql/execution/ColumnarBroadcastExchangeExec.scala
+++ 
b/gluten-core/src/main/scala/org/apache/spark/sql/execution/ColumnarBroadcastExchangeExec.scala
@@ -17,7 +17,7 @@
 package org.apache.spark.sql.execution
 
 import org.apache.gluten.backendsapi.BackendsApiManager
-import org.apache.gluten.extension.{GlutenPlan, ValidationResult}
+import org.apache.gluten.extension.GlutenPlan
 import org.apache.gluten.metrics.GlutenTimeMetric
 import org.apache.gluten.sql.shims.SparkShimLoader
 
@@ -133,19 +133,6 @@ case class ColumnarBroadcastExchangeExec(mode: 
BroadcastMode, child: SparkPlan)
     ColumnarBroadcastExchangeExec(canonicalized, child.canonicalized)
   }
 
-  override protected def doValidateInternal(): ValidationResult = {
-    BackendsApiManager.getValidatorApiInstance
-      .doSchemaValidate(schema)
-      .map {
-        reason =>
-          {
-            ValidationResult.notOk(
-              s"Unsupported schema in broadcast exchange: $schema, reason: 
$reason")
-          }
-      }
-      .getOrElse(ValidationResult.ok)
-  }
-
   override def doPrepare(): Unit = {
     // Materialize the future.
     relationFuture


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to