This is an automated email from the ASF dual-hosted git repository.
chengchengjin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new cfd817ae1a [VL] Fallback pandas udf when input is not an instance of
AttributeReference (#9385)
cfd817ae1a is described below
commit cfd817ae1af3706f8675b48a0ee2a5e350848be5
Author: Surbhi-Vijay <[email protected]>
AuthorDate: Fri Apr 25 20:40:04 2025 +0530
[VL] Fallback pandas udf when input is not an instance of
AttributeReference (#9385)
Co-authored-by: suvijayv <[email protected]>
---
.../api/python/ColumnarArrowEvalPythonExec.scala | 34 +++++++++++++---------
1 file changed, 21 insertions(+), 13 deletions(-)
diff --git a/backends-velox/src/main/scala/org/apache/spark/api/python/ColumnarArrowEvalPythonExec.scala b/backends-velox/src/main/scala/org/apache/spark/api/python/ColumnarArrowEvalPythonExec.scala
index 4a5f43b7fd..7564500aae 100644
--- a/backends-velox/src/main/scala/org/apache/spark/api/python/ColumnarArrowEvalPythonExec.scala
+++ b/backends-velox/src/main/scala/org/apache/spark/api/python/ColumnarArrowEvalPythonExec.scala
@@ -18,8 +18,8 @@ package org.apache.spark.api.python
import org.apache.gluten.columnarbatch.ArrowBatches.ArrowJavaBatch
import org.apache.gluten.columnarbatch.ColumnarBatches
-import org.apache.gluten.exception.GlutenException
-import org.apache.gluten.execution.GlutenPlan
+import org.apache.gluten.execution.ValidatablePlan
+import org.apache.gluten.extension.ValidationResult
import org.apache.gluten.extension.columnar.transition.{Convention, ConventionReq}
import org.apache.gluten.iterator.Iterators
import org.apache.gluten.memory.arrow.alloc.ArrowBufferAllocators
@@ -211,12 +211,28 @@ case class ColumnarArrowEvalPythonExec(
child: SparkPlan,
evalType: Int)
extends EvalPythonExec
- with GlutenPlan {
+ with ValidatablePlan {
override def batchType(): Convention.BatchType = ArrowJavaBatch
override def rowType0(): Convention.RowType = Convention.RowType.None
+ override protected def doValidateInternal(): ValidationResult = {
+ val (_, inputs) = udfs.map(collectFunctions).unzip
+ inputs.foreach {
+ input =>
+ input.foreach {
+ case e: AttributeReference if child.output.exists(_.exprId == e.exprId) =>
+ // Valid case, continue validation
+ case _: AttributeReference =>
+ return ValidationResult.failed("Expression Id does not exist for AttributeReference")
+ case _ =>
+ return ValidationResult.failed("UDF input is not an instance of AttributeReference")
+ }
+ }
+ super.doValidateInternal()
+ }
+
override def requiredChildConvention(): Seq[ConventionReq] = List(
ConventionReq.ofBatch(ConventionReq.BatchType.Is(ArrowJavaBatch)))
@@ -311,19 +327,11 @@ case class ColumnarArrowEvalPythonExec(
input =>
input.map {
e =>
- if (!e.isInstanceOf[AttributeReference]) {
- throw new GlutenException(
- "ColumnarArrowEvalPythonExec should only have [AttributeReference] inputs.")
- } else if (allInputs.exists(_.semanticEquals(e))) {
+ if (allInputs.exists(_.semanticEquals(e))) {
allInputs.indexWhere(_.semanticEquals(e))
} else {
- var offset: Int = -1
- offset = child.output.indexWhere(
+ val offset = child.output.indexWhere(
_.exprId.equals(e.asInstanceOf[AttributeReference].exprId))
- if (offset == -1) {
- throw new GlutenException(
- "ColumnarArrowEvalPythonExec can't find referred input col.")
- }
originalOffsets += offset
allInputs += e
dataTypes += e.dataType
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]