This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new fb8c74b8d6e [SPARK-44411][SQL] Use PartitionEvaluator API in ArrowEvalPythonExec and BatchEvalPythonExec
fb8c74b8d6e is described below
commit fb8c74b8d6e46df40bb2d8ec33589406f64dcb02
Author: Vinod KC <[email protected]>
AuthorDate: Wed Jul 19 11:58:22 2023 +0800
[SPARK-44411][SQL] Use PartitionEvaluator API in ArrowEvalPythonExec and BatchEvalPythonExec
### What changes were proposed in this pull request?
The SQL operators `ArrowEvalPythonExec` and `BatchEvalPythonExec` are updated to
use the `PartitionEvaluator` API for execution.
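For readers new to the API: a `PartitionEvaluatorFactory` is a serializable object that is shipped to executors in place of a closure; each task calls `createEvaluator()` and passes its partition iterator to `eval`. A minimal, self-contained sketch of the pattern (the `AddOneEvaluatorFactory` name is made up for illustration and is not part of this commit):

```scala
import org.apache.spark.{PartitionEvaluator, PartitionEvaluatorFactory}

// Illustrative factory: it carries only the state it is constructed with,
// rather than capturing the enclosing operator in a lambda.
class AddOneEvaluatorFactory extends PartitionEvaluatorFactory[Int, Int] {
  override def createEvaluator(): PartitionEvaluator[Int, Int] =
    new PartitionEvaluator[Int, Int] {
      // One iterator per input RDD; this sketch assumes a single input.
      override def eval(partitionIndex: Int, inputs: Iterator[Int]*): Iterator[Int] =
        inputs.head.map(_ + 1)
    }
}
```

The two operators below follow the same shape: each builds an `EvalPythonEvaluatorFactory` from plain constructor arguments (batch size, time zone, metrics, and so on) instead of reading them from the operator inside a closure.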
### Why are the changes needed?
To avoid the use of lambdas during distributed execution.
See SPARK-43061 for more details.
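Roughly: a `mapPartitions` lambda closes over the operator instance, so whatever the enclosing scope references is serialized into every task, while an evaluator factory ships only its explicit fields. A hedged end-to-end contrast, reusing the illustrative factory sketched above:

```scala
import org.apache.spark.sql.SparkSession

object ClosureVsEvaluator {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").getOrCreate()
    val rdd = spark.sparkContext.parallelize(1 to 10, numSlices = 2)

    // Lambda path: the closure itself is serialized into each task.
    val viaLambda = rdd.mapPartitions(iter => iter.map(_ + 1))

    // Evaluator path: only the factory object travels to the executors.
    val viaEvaluator = rdd.mapPartitionsWithEvaluator(new AddOneEvaluatorFactory)

    assert(viaLambda.collect().sameElements(viaEvaluator.collect()))
    spark.stop()
  }
}
```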
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Existing test cases. Once all SQL operators are refactored,
`spark.sql.execution.usePartitionEvaluator` will be enabled by default so that all
tests cover this code path.
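Until that default flips, the new path can be exercised explicitly. A minimal opt-in sketch (the session setup is illustrative):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[*]")
  // Route SQL operators that support it through the PartitionEvaluator API.
  .config("spark.sql.execution.usePartitionEvaluator", "true")
  .getOrCreate()

// Python UDF evaluation planned as ArrowEvalPythonExec or BatchEvalPythonExec
// will now run through the evaluator factories introduced in this commit.
```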
Closes #41998 from vinodkc/br_SPARK-44411_EvalPython.
Authored-by: Vinod KC <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
---
.../sql/execution/python/ArrowEvalPythonExec.scala | 43 +++++++---
.../sql/execution/python/BatchEvalPythonExec.scala | 29 +++++--
...Exec.scala => EvalPythonEvaluatorFactory.scala} | 91 ++++++++--------------
.../sql/execution/python/EvalPythonExec.scala | 84 ++------------------
4 files changed, 98 insertions(+), 149 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowEvalPythonExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowEvalPythonExec.scala
index 2e25ee2ba74..0e4a420b4b3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowEvalPythonExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowEvalPythonExec.scala
@@ -24,6 +24,7 @@ import org.apache.spark.api.python.ChainedPythonFunctions
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.metric.SQLMetric
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.ArrowUtils
@@ -63,20 +64,47 @@ case class ArrowEvalPythonExec(udfs: Seq[PythonUDF], resultAttrs: Seq[Attribute]
evalType: Int)
extends EvalPythonExec with PythonSQLMetrics {
- private val batchSize = conf.arrowMaxRecordsPerBatch
- private val sessionLocalTimeZone = conf.sessionLocalTimeZone
- private val largeVarTypes = conf.arrowUseLargeVarTypes
- private val pythonRunnerConf = ArrowUtils.getPythonRunnerConfMap(conf)
private[this] val jobArtifactUUID = JobArtifactSet.getCurrentJobArtifactState.map(_.uuid)
- protected override def evaluate(
+ override protected def evaluatorFactory: EvalPythonEvaluatorFactory = {
+ new ArrowEvalPythonEvaluatorFactory(
+ child.output,
+ udfs,
+ output,
+ conf.arrowMaxRecordsPerBatch,
+ evalType,
+ conf.sessionLocalTimeZone,
+ conf.arrowUseLargeVarTypes,
+ ArrowUtils.getPythonRunnerConfMap(conf),
+ pythonMetrics,
+ jobArtifactUUID)
+ }
+
+ override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan =
+ copy(child = newChild)
+}
+
+class ArrowEvalPythonEvaluatorFactory(
+ childOutput: Seq[Attribute],
+ udfs: Seq[PythonUDF],
+ output: Seq[Attribute],
+ batchSize: Int,
+ evalType: Int,
+ sessionLocalTimeZone: String,
+ largeVarTypes: Boolean,
+ pythonRunnerConf: Map[String, String],
+ pythonMetrics: Map[String, SQLMetric],
+ jobArtifactUUID: Option[String])
+ extends EvalPythonEvaluatorFactory(childOutput, udfs, output) {
+
+ override def evaluate(
funcs: Seq[ChainedPythonFunctions],
argOffsets: Array[Array[Int]],
iter: Iterator[InternalRow],
schema: StructType,
context: TaskContext): Iterator[InternalRow] = {
- val outputTypes = output.drop(child.output.length).map(_.dataType)
+ val outputTypes = output.drop(childOutput.length).map(_.dataType)
// DO NOT use iter.grouped(). See BatchIterator.
val batchIter = if (batchSize > 0) new BatchIterator(iter, batchSize) else Iterator(iter)
@@ -99,7 +127,4 @@ case class ArrowEvalPythonExec(udfs: Seq[PythonUDF], resultAttrs: Seq[Attribute]
batch.rowIterator.asScala
}
}
-
- override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan =
- copy(child = newChild)
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExec.scala
index 71f1610bcec..1de8f55d84b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExec.scala
@@ -26,6 +26,7 @@ import org.apache.spark.api.python.{ChainedPythonFunctions, PythonEvalType}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.metric.SQLMetric
import org.apache.spark.sql.types.{StructField, StructType}
/**
@@ -36,13 +37,34 @@ case class BatchEvalPythonExec(udfs: Seq[PythonUDF], resultAttrs: Seq[Attribute]
private[this] val jobArtifactUUID = JobArtifactSet.getCurrentJobArtifactState.map(_.uuid)
- protected override def evaluate(
+ override protected def evaluatorFactory: EvalPythonEvaluatorFactory = {
+ new BatchEvalPythonEvaluatorFactory(
+ child.output,
+ udfs,
+ output,
+ pythonMetrics,
+ jobArtifactUUID)
+ }
+
+ override protected def withNewChildInternal(newChild: SparkPlan): BatchEvalPythonExec =
+ copy(child = newChild)
+}
+
+class BatchEvalPythonEvaluatorFactory(
+ childOutput: Seq[Attribute],
+ udfs: Seq[PythonUDF],
+ output: Seq[Attribute],
+ pythonMetrics: Map[String, SQLMetric],
+ jobArtifactUUID: Option[String])
+ extends EvalPythonEvaluatorFactory(childOutput, udfs, output) {
+
+ override def evaluate(
funcs: Seq[ChainedPythonFunctions],
argOffsets: Array[Array[Int]],
iter: Iterator[InternalRow],
schema: StructType,
context: TaskContext): Iterator[InternalRow] = {
- EvaluatePython.registerPicklers() // register pickler for Row
+ EvaluatePython.registerPicklers() // register pickler for Row
// Input iterator to Python.
val inputIterator = BatchEvalPythonExec.getInputIterator(iter, schema)
@@ -77,9 +99,6 @@ case class BatchEvalPythonExec(udfs: Seq[PythonUDF], resultAttrs: Seq[Attribute]
}
}
}
-
- override protected def withNewChildInternal(newChild: SparkPlan): BatchEvalPythonExec =
- copy(child = newChild)
}
object BatchEvalPythonExec {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonEvaluatorFactory.scala
similarity index 53%
copy from sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala
copy to sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonEvaluatorFactory.scala
index 70bcbe77fba..10bb3a45be9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonEvaluatorFactory.scala
@@ -21,61 +21,18 @@ import java.io.File
import scala.collection.mutable.ArrayBuffer
-import org.apache.spark.{ContextAwareIterator, SparkEnv, TaskContext}
+import org.apache.spark.{ContextAwareIterator, PartitionEvaluator, PartitionEvaluatorFactory, SparkEnv, TaskContext}
import org.apache.spark.api.python.ChainedPythonFunctions
-import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.execution.UnaryExecNode
import org.apache.spark.sql.types.{DataType, StructField, StructType}
import org.apache.spark.util.Utils
-
-/**
- * A physical plan that evaluates a [[PythonUDF]], one partition of tuples at a time.
- *
- * Python evaluation works by sending the necessary (projected) input data via a socket to an
- * external Python process, and combine the result from the Python process with the original row.
- *
- * For each row we send to Python, we also put it in a queue first. For each output row from Python,
- * we drain the queue to find the original input row. Note that if the Python process is way too
- * slow, this could lead to the queue growing unbounded and spill into disk when run out of memory.
- *
- * Here is a diagram to show how this works:
- *
- *            Downstream (for parent)
- *             /      \
- *            /     socket  (output of UDF)
- *           /         \
- *        RowQueue    Python
- *           \         /
- *            \     socket  (input of UDF)
- *             \     /
- *          upstream (from child)
- *
- * The rows sent to and received from Python are packed into batches (100 rows) and serialized,
- * there should be always some rows buffered in the socket or Python process, so the pulling from
- * RowQueue ALWAYS happened after pushing into it.
- */
-trait EvalPythonExec extends UnaryExecNode {
- def udfs: Seq[PythonUDF]
- def resultAttrs: Seq[Attribute]
-
- override def output: Seq[Attribute] = child.output ++ resultAttrs
-
- override def producedAttributes: AttributeSet = AttributeSet(resultAttrs)
-
- private def collectFunctions(udf: PythonUDF): (ChainedPythonFunctions, Seq[Expression]) = {
- udf.children match {
- case Seq(u: PythonUDF) =>
- val (chained, children) = collectFunctions(u)
- (ChainedPythonFunctions(chained.funcs ++ Seq(udf.func)), children)
- case children =>
- // There should not be any other UDFs, or the children can't be evaluated directly.
- assert(children.forall(!_.exists(_.isInstanceOf[PythonUDF])))
- (ChainedPythonFunctions(Seq(udf.func)), udf.children)
- }
- }
+abstract class EvalPythonEvaluatorFactory(
+ childOutput: Seq[Attribute],
+ udfs: Seq[PythonUDF],
+ output: Seq[Attribute])
+ extends PartitionEvaluatorFactory[InternalRow, InternalRow] {
protected def evaluate(
funcs: Seq[ChainedPythonFunctions],
@@ -84,17 +41,35 @@ trait EvalPythonExec extends UnaryExecNode {
schema: StructType,
context: TaskContext): Iterator[InternalRow]
- protected override def doExecute(): RDD[InternalRow] = {
- val inputRDD = child.execute().map(_.copy())
-
- inputRDD.mapPartitions { iter =>
+ override def createEvaluator(): PartitionEvaluator[InternalRow, InternalRow] =
+ new EvalPythonPartitionEvaluator
+
+ private class EvalPythonPartitionEvaluator
+ extends PartitionEvaluator[InternalRow, InternalRow] {
+ private def collectFunctions(udf: PythonUDF): (ChainedPythonFunctions, Seq[Expression]) = {
+ udf.children match {
+ case Seq(u: PythonUDF) =>
+ val (chained, children) = collectFunctions(u)
+ (ChainedPythonFunctions(chained.funcs ++ Seq(udf.func)), children)
+ case children =>
+ // There should not be any other UDFs, or the children can't be evaluated directly.
+ assert(children.forall(!_.exists(_.isInstanceOf[PythonUDF])))
+ (ChainedPythonFunctions(Seq(udf.func)), udf.children)
+ }
+ }
+ override def eval(
+ partitionIndex: Int,
+ iters: Iterator[InternalRow]*): Iterator[InternalRow] = {
+ val iter = iters.head
val context = TaskContext.get()
val contextAwareIterator = new ContextAwareIterator(context, iter)
// The queue used to buffer input rows so we can drain it to
// combine input with output from Python.
- val queue = HybridRowQueue(context.taskMemoryManager(),
- new File(Utils.getLocalDir(SparkEnv.get.conf)), child.output.length)
+ val queue = HybridRowQueue(
+ context.taskMemoryManager(),
+ new File(Utils.getLocalDir(SparkEnv.get.conf)),
+ childOutput.length)
context.addTaskCompletionListener[Unit] { ctx =>
queue.close()
}
@@ -115,7 +90,7 @@ trait EvalPythonExec extends UnaryExecNode {
}
}.toArray
}.toArray
- val projection = MutableProjection.create(allInputs.toSeq, child.output)
+ val projection = MutableProjection.create(allInputs.toSeq, childOutput)
projection.initialize(context.partitionId())
val schema = StructType(dataTypes.zipWithIndex.map { case (dt, i) =>
StructField(s"_$i", dt)
@@ -127,8 +102,8 @@ trait EvalPythonExec extends UnaryExecNode {
projection(inputRow)
}
- val outputRowIterator = evaluate(
- pyFuncs, argOffsets, projectedRowIter, schema, context)
+ val outputRowIterator =
+ evaluate(pyFuncs, argOffsets, projectedRowIter, schema, context)
val joined = new JoinedRow
val resultProj = UnsafeProjection.create(output, output)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala
index 70bcbe77fba..1c8b0f2228f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala
@@ -17,19 +17,10 @@
package org.apache.spark.sql.execution.python
-import java.io.File
-
-import scala.collection.mutable.ArrayBuffer
-
-import org.apache.spark.{ContextAwareIterator, SparkEnv, TaskContext}
-import org.apache.spark.api.python.ChainedPythonFunctions
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.execution.UnaryExecNode
-import org.apache.spark.sql.types.{DataType, StructField, StructType}
-import org.apache.spark.util.Utils
-
/**
* A physical plan that evaluates a [[PythonUDF]], one partition of tuples at a time.
@@ -61,80 +52,19 @@ trait EvalPythonExec extends UnaryExecNode {
def udfs: Seq[PythonUDF]
def resultAttrs: Seq[Attribute]
+ protected def evaluatorFactory: EvalPythonEvaluatorFactory
+
override def output: Seq[Attribute] = child.output ++ resultAttrs
override def producedAttributes: AttributeSet = AttributeSet(resultAttrs)
- private def collectFunctions(udf: PythonUDF): (ChainedPythonFunctions, Seq[Expression]) = {
- udf.children match {
- case Seq(u: PythonUDF) =>
- val (chained, children) = collectFunctions(u)
- (ChainedPythonFunctions(chained.funcs ++ Seq(udf.func)), children)
- case children =>
- // There should not be any other UDFs, or the children can't be evaluated directly.
- assert(children.forall(!_.exists(_.isInstanceOf[PythonUDF])))
- (ChainedPythonFunctions(Seq(udf.func)), udf.children)
- }
- }
-
- protected def evaluate(
- funcs: Seq[ChainedPythonFunctions],
- argOffsets: Array[Array[Int]],
- iter: Iterator[InternalRow],
- schema: StructType,
- context: TaskContext): Iterator[InternalRow]
-
protected override def doExecute(): RDD[InternalRow] = {
val inputRDD = child.execute().map(_.copy())
-
- inputRDD.mapPartitions { iter =>
- val context = TaskContext.get()
- val contextAwareIterator = new ContextAwareIterator(context, iter)
-
- // The queue used to buffer input rows so we can drain it to
- // combine input with output from Python.
- val queue = HybridRowQueue(context.taskMemoryManager(),
- new File(Utils.getLocalDir(SparkEnv.get.conf)), child.output.length)
- context.addTaskCompletionListener[Unit] { ctx =>
- queue.close()
- }
-
- val (pyFuncs, inputs) = udfs.map(collectFunctions).unzip
-
- // flatten all the arguments
- val allInputs = new ArrayBuffer[Expression]
- val dataTypes = new ArrayBuffer[DataType]
- val argOffsets = inputs.map { input =>
- input.map { e =>
- if (allInputs.exists(_.semanticEquals(e))) {
- allInputs.indexWhere(_.semanticEquals(e))
- } else {
- allInputs += e
- dataTypes += e.dataType
- allInputs.length - 1
- }
- }.toArray
- }.toArray
- val projection = MutableProjection.create(allInputs.toSeq, child.output)
- projection.initialize(context.partitionId())
- val schema = StructType(dataTypes.zipWithIndex.map { case (dt, i) =>
- StructField(s"_$i", dt)
- }.toArray)
-
- // Add rows to queue to join later with the result.
- val projectedRowIter = contextAwareIterator.map { inputRow =>
- queue.add(inputRow.asInstanceOf[UnsafeRow])
- projection(inputRow)
- }
-
- val outputRowIterator = evaluate(
- pyFuncs, argOffsets, projectedRowIter, schema, context)
-
- val joined = new JoinedRow
- val resultProj = UnsafeProjection.create(output, output)
-
- outputRowIterator.map { outputRow =>
- resultProj(joined(queue.remove(), outputRow))
+ if (conf.usePartitionEvaluator) {
+ inputRDD.mapPartitionsWithEvaluator(evaluatorFactory)
+ } else {
+ inputRDD.mapPartitionsWithIndexInternal { (index, iter) =>
+ evaluatorFactory.createEvaluator().eval(index, iter)
}
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]