This is an automated email from the ASF dual-hosted git repository.
yuanzhou pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 8b4eff01d1 [GLUTEN-11088][VL] Spark 4.0: Fix ArrowEvalPythonExecSuite (#11288)
8b4eff01d1 is described below
commit 8b4eff01d1c3a261a07ded8b3e901d969795f059
Author: Hongze Zhang <[email protected]>
AuthorDate: Tue Dec 16 16:01:24 2025 +0800
[GLUTEN-11088][VL] Spark 4.0: Fix ArrowEvalPythonExecSuite (#11288)
* [GLUTEN-11088][VL] Spark 4.0: Fix ArrowEvalPythonExecSuite
---
.../api/python/ColumnarArrowEvalPythonExec.scala | 83 +++++++++++++++-------
.../python/ArrowEvalPythonExecSuite.scala | 8 +--
.../execution/python/BasePythonRunnerShim.scala | 13 ++--
.../sql/execution/python/EvalPythonExecBase.scala | 7 ++
.../execution/python/BasePythonRunnerShim.scala | 13 ++--
.../sql/execution/python/EvalPythonExecBase.scala | 7 ++
.../execution/python/BasePythonRunnerShim.scala | 13 ++--
.../sql/execution/python/EvalPythonExecBase.scala | 7 ++
.../execution/python/BasePythonRunnerShim.scala | 10 +--
.../sql/execution/python/EvalPythonExecBase.scala | 10 +++
.../execution/python/BasePythonRunnerShim.scala | 15 ++--
.../sql/execution/python/EvalPythonExecBase.scala | 11 ++-
12 files changed, 147 insertions(+), 50 deletions(-)
diff --git a/backends-velox/src/main/scala/org/apache/spark/api/python/ColumnarArrowEvalPythonExec.scala b/backends-velox/src/main/scala/org/apache/spark/api/python/ColumnarArrowEvalPythonExec.scala
index 0c623b2b5f..bfa33a804c 100644
--- a/backends-velox/src/main/scala/org/apache/spark/api/python/ColumnarArrowEvalPythonExec.scala
+++ b/backends-velox/src/main/scala/org/apache/spark/api/python/ColumnarArrowEvalPythonExec.scala
@@ -35,7 +35,7 @@ import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{DataType, StructField, StructType}
import org.apache.spark.sql.utils.{SparkArrowUtil, SparkSchemaUtil, SparkVectorUtil}
import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector}
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{SparkVersionUtil, Utils}
import org.apache.arrow.vector.{VectorLoader, VectorSchemaRoot}
import org.apache.arrow.vector.ipc.{ArrowStreamReader, ArrowStreamWriter}
@@ -49,12 +49,12 @@ import scala.collection.mutable.ArrayBuffer
class ColumnarArrowPythonRunner(
funcs: Seq[(ChainedPythonFunctions, Long)],
evalType: Int,
- argOffsets: Array[Array[Int]],
+ argMetas: Array[Array[(Int, Option[String])]],
schema: StructType,
timeZoneId: String,
conf: Map[String, String],
pythonMetrics: Map[String, SQLMetric])
- extends BasePythonRunnerShim(funcs, evalType, argOffsets, pythonMetrics) {
+ extends BasePythonRunnerShim(funcs, evalType, argMetas, pythonMetrics) {
override val simplifiedTraceback: Boolean =
SQLConf.get.pysparkSimplifiedTraceback
@@ -149,7 +149,7 @@ class ColumnarArrowPythonRunner(
PythonRDD.writeUTF(k, dataOut)
PythonRDD.writeUTF(v, dataOut)
}
- ColumnarArrowPythonRunner.this.writeUdf(dataOut, argOffsets)
+ ColumnarArrowPythonRunner.this.writeUdf(dataOut, argMetas)
}
// For Spark earlier than 4.0. It overrides the corresponding abstract method
@@ -165,6 +165,12 @@ class ColumnarArrowPythonRunner(
}
def writeToStreamHelper(dataOut: DataOutputStream): Boolean = {
+ if (!inputIterator.hasNext) {
+ // See https://issues.apache.org/jira/browse/SPARK-44705:
+ // Starting from Spark 4.0, we should return false once the iterator is drained out,
+ // otherwise Spark won't stop calling this method repeatedly.
+ return false
+ }
var numRows: Long = 0
val arrowSchema = SparkSchemaUtil.toArrowSchema(schema, timeZoneId)
val allocator = ArrowBufferAllocators.contextInstance()
@@ -264,7 +270,7 @@ case class ColumnarArrowEvalPythonExec(
protected def evaluateColumnar(
funcs: Seq[(ChainedPythonFunctions, Long)],
- argOffsets: Array[Array[Int]],
+ argMetas: Array[Array[(Int, Option[String])]],
iter: Iterator[ColumnarBatch],
schema: StructType,
context: TaskContext): Iterator[ColumnarBatch] = {
@@ -274,7 +280,7 @@ case class ColumnarArrowEvalPythonExec(
val columnarBatchIter = new ColumnarArrowPythonRunner(
funcs,
evalType,
- argOffsets,
+ argMetas,
schema,
sessionLocalTimeZone,
pythonRunnerConf,
@@ -306,22 +312,51 @@ case class ColumnarArrowEvalPythonExec(
val allInputs = new ArrayBuffer[Expression]
val dataTypes = new ArrayBuffer[DataType]
val originalOffsets = new ArrayBuffer[Int]
- val argOffsets = inputs.map {
- input =>
- input.map {
- e =>
- if (allInputs.exists(_.semanticEquals(e))) {
- allInputs.indexWhere(_.semanticEquals(e))
- } else {
- val offset = child.output.indexWhere(
- _.exprId.equals(e.asInstanceOf[AttributeReference].exprId))
- originalOffsets += offset
- allInputs += e
- dataTypes += e.dataType
- allInputs.length - 1
- }
- }.toArray
- }.toArray
+ val argMetas: Array[Array[(Int, Option[String])]] = if (SparkVersionUtil.gteSpark40) {
+ // Spark 4.0 requires ArgumentMetadata rather than trivial integer-based offset.
+ // See https://issues.apache.org/jira/browse/SPARK-44918.
+ inputs.map {
+ input =>
+ input.map {
+ e =>
+ val (key, value) = e match {
+ case EvalPythonExecBase.NamedArgumentExpressionShim(key, value) =>
+ (Some(key), value)
+ case _ =>
+ (None, e)
+ }
+ val pair: (Int, Option[String]) = if (allInputs.exists(_.semanticEquals(value))) {
+ allInputs.indexWhere(_.semanticEquals(value)) -> key
+ } else {
+ val offset = child.output.indexWhere(
+ _.exprId.equals(e.asInstanceOf[AttributeReference].exprId))
+ originalOffsets += offset
+ allInputs += value
+ dataTypes += value.dataType
+ (allInputs.length - 1) -> key
+ }
+ pair
+ }.toArray
+ }.toArray
+ } else {
+ inputs.map {
+ input =>
+ input.map {
+ e =>
+ val pair: (Int, Option[String]) = if (allInputs.exists(_.semanticEquals(e))) {
+ allInputs.indexWhere(_.semanticEquals(e)) -> None
+ } else {
+ val offset = child.output.indexWhere(
+ _.exprId.equals(e.asInstanceOf[AttributeReference].exprId))
+ originalOffsets += offset
+ allInputs += e
+ dataTypes += e.dataType
+ (allInputs.length - 1) -> None
+ }
+ pair
+ }.toArray
+ }.toArray
+ }
val schema = StructType(dataTypes.zipWithIndex.map {
case (dt, i) =>
StructField(s"_$i", dt)
@@ -339,7 +374,7 @@ case class ColumnarArrowEvalPythonExec(
inputCbCache += inputCb
numInputRows += inputCb.numRows
// We only need to pass the referred cols data to python worker for evaluation.
- var colsForEval = new ArrayBuffer[ColumnVector]()
+ val colsForEval = new ArrayBuffer[ColumnVector]()
for (i <- originalOffsets) {
colsForEval += inputCb.column(i)
}
@@ -347,7 +382,7 @@ case class ColumnarArrowEvalPythonExec(
}
val outputColumnarBatchIterator =
- evaluateColumnar(pyFuncs, argOffsets, inputBatchIter, schema, context)
+ evaluateColumnar(pyFuncs, argMetas, inputBatchIter, schema, context)
val res =
outputColumnarBatchIterator.zipWithIndex.map {
case (outputCb, batchId) =>
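
A note on the SPARK-44705 comment in writeToStreamHelper above: on Spark 4.0 the per-call writer reports whether more input remains, and returning false is what stops the runner from invoking it again. A minimal sketch of that contract, with hypothetical names (writeBatches and the byte-array iterator are illustrative, not the actual Gluten/Spark API):

    import java.io.DataOutputStream

    // Returns true while batches remain; false once the input is drained
    // (SPARK-44705). Pre-4.0 runners simply ignore the return value.
    def writeBatches(input: Iterator[Array[Byte]], dataOut: DataOutputStream): Boolean = {
      if (!input.hasNext) {
        return false
      }
      val bytes = input.next()
      dataOut.writeInt(bytes.length) // length-prefixed payload, purely illustrative
      dataOut.write(bytes)
      true
    }
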
diff --git a/backends-velox/src/test/scala/org/apache/gluten/execution/python/ArrowEvalPythonExecSuite.scala b/backends-velox/src/test/scala/org/apache/gluten/execution/python/ArrowEvalPythonExecSuite.scala
index 0ea34ec6ad..52a17995f3 100644
--- a/backends-velox/src/test/scala/org/apache/gluten/execution/python/ArrowEvalPythonExecSuite.scala
+++ b/backends-velox/src/test/scala/org/apache/gluten/execution/python/ArrowEvalPythonExecSuite.scala
@@ -39,8 +39,7 @@ class ArrowEvalPythonExecSuite extends WholeStageTransformerSuite {
.set("spark.executor.cores", "1")
}
- // TODO: fix on spark-4.0
- testWithMaxSparkVersion("arrow_udf test: without projection", "3.5") {
+ test("arrow_udf test: without projection") {
lazy val base =
Seq(("1", 1), ("1", 2), ("2", 1), ("2", 2), ("3", 1), ("3", 2), ("0",
1), ("3", 0))
.toDF("a", "b")
@@ -60,8 +59,7 @@ class ArrowEvalPythonExecSuite extends WholeStageTransformerSuite {
checkAnswer(df2, expected)
}
- // TODO: fix on spark-4.0
- testWithMaxSparkVersion("arrow_udf test: with unrelated projection", "3.5") {
+ test("arrow_udf test: with unrelated projection") {
lazy val base =
Seq(("1", 1), ("1", 2), ("2", 1), ("2", 2), ("3", 1), ("3", 2), ("0",
1), ("3", 0))
.toDF("a", "b")
@@ -81,7 +79,7 @@ class ArrowEvalPythonExecSuite extends WholeStageTransformerSuite {
checkAnswer(df, expected)
}
- // TODO: fix on spark-4.0
+ // A fix is needed for the Spark 4.0 change in https://github.com/apache/spark/pull/42864.
testWithMaxSparkVersion("arrow_udf test: with preprojection", "3.5") {
lazy val base =
Seq(("1", 1), ("1", 2), ("2", 1), ("2", 2), ("3", 1), ("3", 2), ("0",
1), ("3", 0))
diff --git a/shims/spark32/src/main/scala/org/apache/spark/sql/execution/python/BasePythonRunnerShim.scala b/shims/spark32/src/main/scala/org/apache/spark/sql/execution/python/BasePythonRunnerShim.scala
index fc0f826905..82d971d5d6 100644
--- a/shims/spark32/src/main/scala/org/apache/spark/sql/execution/python/BasePythonRunnerShim.scala
+++ b/shims/spark32/src/main/scala/org/apache/spark/sql/execution/python/BasePythonRunnerShim.scala
@@ -28,9 +28,12 @@ import java.net.Socket
abstract class BasePythonRunnerShim(
funcs: Seq[(ChainedPythonFunctions, Long)],
evalType: Int,
- argOffsets: Array[Array[Int]],
+ argMetas: Array[Array[(Int, Option[String])]],
pythonMetrics: Map[String, SQLMetric])
- extends BasePythonRunner[ColumnarBatch, ColumnarBatch](funcs.map(_._1), evalType, argOffsets) {
+ extends BasePythonRunner[ColumnarBatch, ColumnarBatch](
+ funcs.map(_._1),
+ evalType,
+ argMetas.map(_.map(_._1))) {
// The type aliases below provide consistent type names in child classes,
// ensuring code compatibility with both Spark 4.0 and earlier versions.
type Writer = WriterThread
@@ -43,8 +46,10 @@ abstract class BasePythonRunnerShim(
partitionIndex: Int,
context: TaskContext): Writer
- protected def writeUdf(dataOut: DataOutputStream, argOffsets: Array[Array[Int]]): Unit = {
- PythonUDFRunner.writeUDFs(dataOut, funcs.map(_._1), argOffsets)
+ protected def writeUdf(
+ dataOut: DataOutputStream,
+ argMetas: Array[Array[(Int, Option[String])]]): Unit = {
+ PythonUDFRunner.writeUDFs(dataOut, funcs.map(_._1), argMetas.map(_.map(_._1)))
}
override protected def newWriterThread(
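
The "type aliases" comment above refers to a small cross-version trick: the pre-4.0 shims re-export the Spark 4.0 names (Writer, newWriter-style factory) on top of the old WriterThread API so the Velox runner can be written once against the 4.0-style names. A simplified sketch of the idea, using stand-in types rather than the real BasePythonRunner signatures:

    // Old (pre-4.0) style API that the shim sits on top of.
    abstract class OldRunner {
      type WriterThread = Thread
      protected def newWriterThread(): WriterThread
    }

    // Shim: expose 4.0-style names; the old entry point delegates to them.
    abstract class RunnerShim extends OldRunner {
      type Writer = WriterThread
      protected def newWriter(): Writer
      override protected def newWriterThread(): WriterThread = newWriter()
    }
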
diff --git a/shims/spark32/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExecBase.scala b/shims/spark32/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExecBase.scala
index 843d7e0169..7221e330a7 100644
--- a/shims/spark32/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExecBase.scala
+++ b/shims/spark32/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExecBase.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.python
import org.apache.spark.TaskContext
import org.apache.spark.api.python.ChainedPythonFunctions
import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.types.StructType
abstract class EvalPythonExecBase extends EvalPythonExec {
@@ -32,3 +33,9 @@ abstract class EvalPythonExecBase extends EvalPythonExec {
throw new IllegalStateException("EvalPythonExecTransformer doesn't support evaluate")
}
}
+
+object EvalPythonExecBase {
+ object NamedArgumentExpressionShim {
+ def unapply(expr: Expression): Option[(String, Expression)] = None
+ }
+}
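
The extractor added above is the version shim for named UDF arguments: on Spark 3.2/3.3/3.4 it never matches, so every argument falls through to the positional case, while the Spark 3.5 and 4.0 variants further below delegate to NamedArgumentExpression. A hedged sketch of how a caller consumes it, mirroring ColumnarArrowEvalPythonExec (expr stands for one UDF argument Expression):

    val (name, value) = expr match {
      case EvalPythonExecBase.NamedArgumentExpressionShim(key, v) =>
        (Some(key), v) // named argument, e.g. udf(x = col("a")); only matches on Spark 3.5+
      case other =>
        (None, other) // positional argument on every Spark version
    }
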
diff --git a/shims/spark33/src/main/scala/org/apache/spark/sql/execution/python/BasePythonRunnerShim.scala b/shims/spark33/src/main/scala/org/apache/spark/sql/execution/python/BasePythonRunnerShim.scala
index fc0f826905..82d971d5d6 100644
--- a/shims/spark33/src/main/scala/org/apache/spark/sql/execution/python/BasePythonRunnerShim.scala
+++ b/shims/spark33/src/main/scala/org/apache/spark/sql/execution/python/BasePythonRunnerShim.scala
@@ -28,9 +28,12 @@ import java.net.Socket
abstract class BasePythonRunnerShim(
funcs: Seq[(ChainedPythonFunctions, Long)],
evalType: Int,
- argOffsets: Array[Array[Int]],
+ argMetas: Array[Array[(Int, Option[String])]],
pythonMetrics: Map[String, SQLMetric])
- extends BasePythonRunner[ColumnarBatch, ColumnarBatch](funcs.map(_._1), evalType, argOffsets) {
+ extends BasePythonRunner[ColumnarBatch, ColumnarBatch](
+ funcs.map(_._1),
+ evalType,
+ argMetas.map(_.map(_._1))) {
// The type aliases below provide consistent type names in child classes,
// ensuring code compatibility with both Spark 4.0 and earlier versions.
type Writer = WriterThread
@@ -43,8 +46,10 @@ abstract class BasePythonRunnerShim(
partitionIndex: Int,
context: TaskContext): Writer
- protected def writeUdf(dataOut: DataOutputStream, argOffsets: Array[Array[Int]]): Unit = {
- PythonUDFRunner.writeUDFs(dataOut, funcs.map(_._1), argOffsets)
+ protected def writeUdf(
+ dataOut: DataOutputStream,
+ argMetas: Array[Array[(Int, Option[String])]]): Unit = {
+ PythonUDFRunner.writeUDFs(dataOut, funcs.map(_._1), argMetas.map(_.map(_._1)))
}
override protected def newWriterThread(
diff --git a/shims/spark33/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExecBase.scala b/shims/spark33/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExecBase.scala
index 843d7e0169..7221e330a7 100644
--- a/shims/spark33/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExecBase.scala
+++ b/shims/spark33/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExecBase.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.python
import org.apache.spark.TaskContext
import org.apache.spark.api.python.ChainedPythonFunctions
import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.types.StructType
abstract class EvalPythonExecBase extends EvalPythonExec {
@@ -32,3 +33,9 @@ abstract class EvalPythonExecBase extends EvalPythonExec {
throw new IllegalStateException("EvalPythonExecTransformer doesn't support evaluate")
}
}
+
+object EvalPythonExecBase {
+ object NamedArgumentExpressionShim {
+ def unapply(expr: Expression): Option[(String, Expression)] = None
+ }
+}
diff --git a/shims/spark34/src/main/scala/org/apache/spark/sql/execution/python/BasePythonRunnerShim.scala b/shims/spark34/src/main/scala/org/apache/spark/sql/execution/python/BasePythonRunnerShim.scala
index fc0f826905..82d971d5d6 100644
--- a/shims/spark34/src/main/scala/org/apache/spark/sql/execution/python/BasePythonRunnerShim.scala
+++ b/shims/spark34/src/main/scala/org/apache/spark/sql/execution/python/BasePythonRunnerShim.scala
@@ -28,9 +28,12 @@ import java.net.Socket
abstract class BasePythonRunnerShim(
funcs: Seq[(ChainedPythonFunctions, Long)],
evalType: Int,
- argOffsets: Array[Array[Int]],
+ argMetas: Array[Array[(Int, Option[String])]],
pythonMetrics: Map[String, SQLMetric])
- extends BasePythonRunner[ColumnarBatch, ColumnarBatch](funcs.map(_._1), evalType, argOffsets) {
+ extends BasePythonRunner[ColumnarBatch, ColumnarBatch](
+ funcs.map(_._1),
+ evalType,
+ argMetas.map(_.map(_._1))) {
// The type aliases below provide consistent type names in child classes,
// ensuring code compatibility with both Spark 4.0 and earlier versions.
type Writer = WriterThread
@@ -43,8 +46,10 @@ abstract class BasePythonRunnerShim(
partitionIndex: Int,
context: TaskContext): Writer
- protected def writeUdf(dataOut: DataOutputStream, argOffsets: Array[Array[Int]]): Unit = {
- PythonUDFRunner.writeUDFs(dataOut, funcs.map(_._1), argOffsets)
+ protected def writeUdf(
+ dataOut: DataOutputStream,
+ argMetas: Array[Array[(Int, Option[String])]]): Unit = {
+ PythonUDFRunner.writeUDFs(dataOut, funcs.map(_._1), argMetas.map(_.map(_._1)))
}
override protected def newWriterThread(
diff --git a/shims/spark34/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExecBase.scala b/shims/spark34/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExecBase.scala
index 843d7e0169..7221e330a7 100644
--- a/shims/spark34/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExecBase.scala
+++ b/shims/spark34/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExecBase.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.python
import org.apache.spark.TaskContext
import org.apache.spark.api.python.ChainedPythonFunctions
import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.types.StructType
abstract class EvalPythonExecBase extends EvalPythonExec {
@@ -32,3 +33,9 @@ abstract class EvalPythonExecBase extends EvalPythonExec {
throw new IllegalStateException("EvalPythonExecTransformer doesn't support evaluate")
}
}
+
+object EvalPythonExecBase {
+ object NamedArgumentExpressionShim {
+ def unapply(expr: Expression): Option[(String, Expression)] = None
+ }
+}
diff --git a/shims/spark35/src/main/scala/org/apache/spark/sql/execution/python/BasePythonRunnerShim.scala b/shims/spark35/src/main/scala/org/apache/spark/sql/execution/python/BasePythonRunnerShim.scala
index 1a14622f87..ecabc9a93b 100644
--- a/shims/spark35/src/main/scala/org/apache/spark/sql/execution/python/BasePythonRunnerShim.scala
+++ b/shims/spark35/src/main/scala/org/apache/spark/sql/execution/python/BasePythonRunnerShim.scala
@@ -28,12 +28,12 @@ import java.net.Socket
abstract class BasePythonRunnerShim(
funcs: Seq[(ChainedPythonFunctions, Long)],
evalType: Int,
- argOffsets: Array[Array[Int]],
+ argMetas: Array[Array[(Int, Option[String])]],
pythonMetrics: Map[String, SQLMetric])
extends BasePythonRunner[ColumnarBatch, ColumnarBatch](
funcs.map(_._1),
evalType,
- argOffsets,
+ argMetas.map(_.map(_._1)),
None) {
// The type aliases below provide consistent type names in child classes,
// ensuring code compatibility with both Spark 4.0 and earlier versions.
@@ -47,8 +47,10 @@ abstract class BasePythonRunnerShim(
partitionIndex: Int,
context: TaskContext): Writer
- protected def writeUdf(dataOut: DataOutputStream, argOffsets: Array[Array[Int]]): Unit = {
- PythonUDFRunner.writeUDFs(dataOut, funcs.map(_._1), argOffsets)
+ protected def writeUdf(
+ dataOut: DataOutputStream,
+ argMetas: Array[Array[(Int, Option[String])]]): Unit = {
+ PythonUDFRunner.writeUDFs(dataOut, funcs.map(_._1), argMetas.map(_.map(_._1)))
}
override protected def newWriterThread(
diff --git a/shims/spark35/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExecBase.scala b/shims/spark35/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExecBase.scala
index 843d7e0169..3e74f54901 100644
--- a/shims/spark35/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExecBase.scala
+++ b/shims/spark35/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExecBase.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.python
import org.apache.spark.TaskContext
import org.apache.spark.api.python.ChainedPythonFunctions
import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{Expression, NamedArgumentExpression}
import org.apache.spark.sql.types.StructType
abstract class EvalPythonExecBase extends EvalPythonExec {
@@ -32,3 +33,12 @@ abstract class EvalPythonExecBase extends EvalPythonExec {
throw new IllegalStateException("EvalPythonExecTransformer doesn't support evaluate")
}
}
+
+object EvalPythonExecBase {
+ object NamedArgumentExpressionShim {
+ def unapply(expr: Expression): Option[(String, Expression)] = expr match {
+ case NamedArgumentExpression(key, value) => Some((key, value))
+ case _ => None
+ }
+ }
+}
diff --git a/shims/spark40/src/main/scala/org/apache/spark/sql/execution/python/BasePythonRunnerShim.scala b/shims/spark40/src/main/scala/org/apache/spark/sql/execution/python/BasePythonRunnerShim.scala
index b84ff9b9f6..127a0fc3cf 100644
--- a/shims/spark40/src/main/scala/org/apache/spark/sql/execution/python/BasePythonRunnerShim.scala
+++ b/shims/spark40/src/main/scala/org/apache/spark/sql/execution/python/BasePythonRunnerShim.scala
@@ -20,6 +20,7 @@ import org.apache.spark.SparkEnv
import org.apache.spark.TaskContext
import org.apache.spark.api.python.{BasePythonRunner, ChainedPythonFunctions, PythonWorker}
import org.apache.spark.sql.execution.metric.SQLMetric
+import org.apache.spark.sql.execution.python.EvalPythonExec.ArgumentMetadata
import org.apache.spark.sql.vectorized.ColumnarBatch
import java.io.DataOutputStream
@@ -27,12 +28,12 @@ import java.io.DataOutputStream
abstract class BasePythonRunnerShim(
funcs: Seq[(ChainedPythonFunctions, Long)],
evalType: Int,
- argOffsets: Array[Array[Int]],
+ argMetas: Array[Array[(Int, Option[String])]],
pythonMetrics: Map[String, SQLMetric])
extends BasePythonRunner[ColumnarBatch, ColumnarBatch](
funcs.map(_._1),
evalType,
- argOffsets,
+ argMetas.map(_.map(_._1)),
None,
pythonMetrics) {
@@ -43,8 +44,14 @@ abstract class BasePythonRunnerShim(
partitionIndex: Int,
context: TaskContext): Writer
- protected def writeUdf(dataOut: DataOutputStream, argOffsets: Array[Array[Int]]): Unit = {
- PythonUDFRunner.writeUDFs(dataOut, funcs, argOffsets, None)
+ protected def writeUdf(
+ dataOut: DataOutputStream,
+ argOffsets: Array[Array[(Int, Option[String])]]): Unit = {
+ PythonUDFRunner.writeUDFs(
+ dataOut,
+ funcs,
+ argOffsets.map(_.map(pair => ArgumentMetadata(pair._1, pair._2))),
+ None)
}
override protected def newWriter(
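
For readability, the two conversions the shims perform on the (offset, name) pairs, sketched under the assumption that argMetas has the Array[Array[(Int, Option[String])]] shape used above: pre-4.0 code paths (and the BasePythonRunner constructor) keep only the integer offsets, while the Spark 4.0 path rebuilds EvalPythonExec.ArgumentMetadata before calling PythonUDFRunner.writeUDFs.

    import org.apache.spark.sql.execution.python.EvalPythonExec.ArgumentMetadata

    // Hypothetical input: first argument positional, second named "x".
    val argMetas: Array[Array[(Int, Option[String])]] = Array(Array((0, None), (1, Some("x"))))
    // Pre-Spark-4.0 / constructor argument: plain integer offsets.
    val argOffsets: Array[Array[Int]] = argMetas.map(_.map(_._1))
    // Spark 4.0 writeUDFs: offset plus optional name wrapped into ArgumentMetadata.
    val metas: Array[Array[ArgumentMetadata]] =
      argMetas.map(_.map { case (offset, name) => ArgumentMetadata(offset, name) })
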
diff --git a/shims/spark40/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExecBase.scala b/shims/spark40/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExecBase.scala
index 99acc2644f..7ad7ca6b09 100644
--- a/shims/spark40/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExecBase.scala
+++ b/shims/spark40/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExecBase.scala
@@ -16,7 +16,7 @@
*/
package org.apache.spark.sql.execution.python
-import org.apache.spark.sql.execution.python.EvalPythonEvaluatorFactory
+import org.apache.spark.sql.catalyst.expressions.{Expression, NamedArgumentExpression}
abstract class EvalPythonExecBase extends EvalPythonExec {
@@ -24,3 +24,12 @@ abstract class EvalPythonExecBase extends EvalPythonExec {
throw new IllegalStateException("EvalPythonExecTransformer doesn't support evaluate")
}
}
+
+object EvalPythonExecBase {
+ object NamedArgumentExpressionShim {
+ def unapply(expr: Expression): Option[(String, Expression)] = expr match {
+ case NamedArgumentExpression(key, value) => Some((key, value))
+ case _ => None
+ }
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]