This is an automated email from the ASF dual-hosted git repository.
zhli pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 1cc4ef43e [VL] Add schema validation for all operators (#6406)
1cc4ef43e is described below
commit 1cc4ef43e325fe62436fc86a332454d629e55b18
Author: Zhen Li <[email protected]>
AuthorDate: Thu Jul 11 22:07:06 2024 +0800
[VL] Add schema validation for all operators (#6406)
[VL] Add schema validation for all operators.
---
.../apache/gluten/execution/RowToVeloxColumnarExec.scala | 9 ---------
.../scala/org/apache/gluten/execution/TestOperator.scala | 10 ++++++++++
.../execution/BasicPhysicalOperatorTransformer.scala | 9 ---------
.../scala/org/apache/gluten/extension/GlutenPlan.scala | 10 ++++++++++
.../sql/execution/ColumnarBroadcastExchangeExec.scala | 15 +--------------
5 files changed, 21 insertions(+), 32 deletions(-)
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/execution/RowToVeloxColumnarExec.scala
b/backends-velox/src/main/scala/org/apache/gluten/execution/RowToVeloxColumnarExec.scala
index 29478fe9d..2f3e88f9a 100644
---
a/backends-velox/src/main/scala/org/apache/gluten/execution/RowToVeloxColumnarExec.scala
+++
b/backends-velox/src/main/scala/org/apache/gluten/execution/RowToVeloxColumnarExec.scala
@@ -17,9 +17,7 @@
package org.apache.gluten.execution
import org.apache.gluten.GlutenConfig
-import org.apache.gluten.backendsapi.BackendsApiManager
import org.apache.gluten.columnarbatch.ColumnarBatches
-import org.apache.gluten.exception.GlutenException
import org.apache.gluten.exec.Runtimes
import org.apache.gluten.memory.arrow.alloc.ArrowBufferAllocators
import org.apache.gluten.utils.ArrowAbiUtil
@@ -47,13 +45,6 @@ import scala.collection.mutable.ListBuffer
case class RowToVeloxColumnarExec(child: SparkPlan) extends
RowToColumnarExecBase(child = child) {
override def doExecuteColumnarInternal(): RDD[ColumnarBatch] = {
-
BackendsApiManager.getValidatorApiInstance.doSchemaValidate(schema).foreach {
- reason =>
- throw new GlutenException(
- s"Input schema contains unsupported type when convert row to
columnar for $schema " +
- s"due to $reason")
- }
-
val numInputRows = longMetric("numInputRows")
val numOutputBatches = longMetric("numOutputBatches")
val convertTime = longMetric("convertTime")
diff --git
a/backends-velox/src/test/scala/org/apache/gluten/execution/TestOperator.scala
b/backends-velox/src/test/scala/org/apache/gluten/execution/TestOperator.scala
index 230fc565d..07f910137 100644
---
a/backends-velox/src/test/scala/org/apache/gluten/execution/TestOperator.scala
+++
b/backends-velox/src/test/scala/org/apache/gluten/execution/TestOperator.scala
@@ -258,6 +258,16 @@ class TestOperator extends VeloxWholeStageTransformerSuite
with AdaptiveSparkPla
checkLengthAndPlan(df, 5)
}
+ testWithSpecifiedSparkVersion("coalesce validation", Some("3.4")) {
+ withTempPath {
+ path =>
+ val data = "2019-09-09 01:02:03.456789"
+ val df = Seq(data).toDF("strTs").selectExpr(s"CAST(strTs AS
TIMESTAMP_NTZ) AS ts")
+ df.coalesce(1).write.format("parquet").save(path.getCanonicalPath)
+ spark.read.parquet(path.getCanonicalPath).collect
+ }
+ }
+
test("groupby") {
val df = runQueryAndCompare(
"select l_orderkey, sum(l_partkey) as sum from lineitem " +
diff --git
a/gluten-core/src/main/scala/org/apache/gluten/execution/BasicPhysicalOperatorTransformer.scala
b/gluten-core/src/main/scala/org/apache/gluten/execution/BasicPhysicalOperatorTransformer.scala
index 0b792d52e..97b4c3a3f 100644
---
a/gluten-core/src/main/scala/org/apache/gluten/execution/BasicPhysicalOperatorTransformer.scala
+++
b/gluten-core/src/main/scala/org/apache/gluten/execution/BasicPhysicalOperatorTransformer.scala
@@ -311,15 +311,6 @@ case class ColumnarUnionExec(children: Seq[SparkPlan])
extends SparkPlan with Gl
}
override protected def doExecuteColumnar(): RDD[ColumnarBatch] =
columnarInputRDD
-
- override protected def doValidateInternal(): ValidationResult = {
- BackendsApiManager.getValidatorApiInstance
- .doSchemaValidate(schema)
- .map {
- reason => ValidationResult.notOk(s"Found schema check failure for
$schema, due to: $reason")
- }
- .getOrElse(ValidationResult.ok)
- }
}
/**
diff --git
a/gluten-core/src/main/scala/org/apache/gluten/extension/GlutenPlan.scala
b/gluten-core/src/main/scala/org/apache/gluten/extension/GlutenPlan.scala
index 8f1004be4..71a76ff63 100644
--- a/gluten-core/src/main/scala/org/apache/gluten/extension/GlutenPlan.scala
+++ b/gluten-core/src/main/scala/org/apache/gluten/extension/GlutenPlan.scala
@@ -63,6 +63,16 @@ trait GlutenPlan extends SparkPlan with
Convention.KnownBatchType with LogLevelU
* Validate whether this SparkPlan supports to be transformed into substrait
node in Native Code.
*/
final def doValidate(): ValidationResult = {
+ val schemaValidationResult = BackendsApiManager.getValidatorApiInstance
+ .doSchemaValidate(schema)
+ .map {
+ reason => ValidationResult.notOk(s"Found schema check failure for
$schema, due to: $reason")
+ }
+ .getOrElse(ValidationResult.ok)
+ if (!schemaValidationResult.isValid) {
+ TestStats.addFallBackClassName(this.getClass.toString)
+ return schemaValidationResult
+ }
try {
TransformerState.enterValidation
val res = doValidateInternal()
diff --git
a/gluten-core/src/main/scala/org/apache/spark/sql/execution/ColumnarBroadcastExchangeExec.scala
b/gluten-core/src/main/scala/org/apache/spark/sql/execution/ColumnarBroadcastExchangeExec.scala
index 4da7a2f6f..d55733fe4 100644
---
a/gluten-core/src/main/scala/org/apache/spark/sql/execution/ColumnarBroadcastExchangeExec.scala
+++
b/gluten-core/src/main/scala/org/apache/spark/sql/execution/ColumnarBroadcastExchangeExec.scala
@@ -17,7 +17,7 @@
package org.apache.spark.sql.execution
import org.apache.gluten.backendsapi.BackendsApiManager
-import org.apache.gluten.extension.{GlutenPlan, ValidationResult}
+import org.apache.gluten.extension.GlutenPlan
import org.apache.gluten.metrics.GlutenTimeMetric
import org.apache.gluten.sql.shims.SparkShimLoader
@@ -133,19 +133,6 @@ case class ColumnarBroadcastExchangeExec(mode:
BroadcastMode, child: SparkPlan)
ColumnarBroadcastExchangeExec(canonicalized, child.canonicalized)
}
- override protected def doValidateInternal(): ValidationResult = {
- BackendsApiManager.getValidatorApiInstance
- .doSchemaValidate(schema)
- .map {
- reason =>
- {
- ValidationResult.notOk(
- s"Unsupported schema in broadcast exchange: $schema, reason:
$reason")
- }
- }
- .getOrElse(ValidationResult.ok)
- }
-
override def doPrepare(): Unit = {
// Materialize the future.
relationFuture
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]