This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.5 by this push:
     new 4f22b2f944d [SPARK-44361][SQL] Use PartitionEvaluator API in MapInBatchExec
4f22b2f944d is described below

commit 4f22b2f944d3253898bddff9b0958b7a2c813bc7
Author: Vinod KC <vinod.kc...@gmail.com>
AuthorDate: Wed Jul 19 12:02:52 2023 +0800

    [SPARK-44361][SQL] Use PartitionEvaluator API in MapInBatchExec
    
    ### What changes were proposed in this pull request?
    
    The SQL operator `MapInBatchExec` is updated to use the `PartitionEvaluator` API for execution.
    Added a new method `mapPartitionsWithEvaluator` in `RDDBarrier`.
    
    ### Why are the changes needed?
    
    To avoid the use of lambdas during distributed execution.
    See SPARK-43061 for more details.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    Existing test cases. Once all SQL operators are refactored, `spark.sql.execution.usePartitionEvaluator` will be enabled by default so that all tests cover this code path.
    
    Closes #42024 from vinodkc/br_SPARK-44361.
    
    Authored-by: Vinod KC <vinod.kc...@gmail.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
    (cherry picked from commit 9b43a9f3ea551a594835a4742f7b2d1fdb1cf518)
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../scala/org/apache/spark/rdd/RDDBarrier.scala    | 16 +++-
 .../python/MapInBatchEvaluatorFactory.scala        | 92 ++++++++++++++++++++++
 .../sql/execution/python/MapInBatchExec.scala      | 80 ++++++++-----------
 3 files changed, 137 insertions(+), 51 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/rdd/RDDBarrier.scala 
b/core/src/main/scala/org/apache/spark/rdd/RDDBarrier.scala
index b70ea0073c9..13ce8f1e1b5 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDDBarrier.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDDBarrier.scala
@@ -19,8 +19,8 @@ package org.apache.spark.rdd
 
 import scala.reflect.ClassTag
 
-import org.apache.spark.TaskContext
-import org.apache.spark.annotation.{Experimental, Since}
+import org.apache.spark.{PartitionEvaluatorFactory, TaskContext}
+import org.apache.spark.annotation.{DeveloperApi, Experimental, Since}
 
 /**
  * :: Experimental ::
@@ -76,5 +76,17 @@ class RDDBarrier[T: ClassTag] private[spark] (rdd: RDD[T]) {
     )
   }
 
+  /**
+   * Return a new RDD by applying an evaluator to each partition of the 
wrapped RDD. The given
+   * evaluator factory will be serialized and sent to executors, and each task 
will create an
+   * evaluator with the factory, and use the evaluator to transform the data 
of the input
+   * partition.
+   */
+  @DeveloperApi
+  @Since("3.5.0")
+  def mapPartitionsWithEvaluator[U: ClassTag](
+      evaluatorFactory: PartitionEvaluatorFactory[T, U]): RDD[U] = 
rdd.withScope {
+    new MapPartitionsWithEvaluatorRDD(rdd, evaluatorFactory)
+  }
   // TODO: [SPARK-25247] add extra conf to RDDBarrier, e.g., timeout.
 }
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/MapInBatchEvaluatorFactory.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/MapInBatchEvaluatorFactory.scala
new file mode 100644
index 00000000000..efb063476a4
--- /dev/null
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/MapInBatchEvaluatorFactory.scala
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.python
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.{ContextAwareIterator, TaskContext}
+import org.apache.spark.{PartitionEvaluator, PartitionEvaluatorFactory}
+import org.apache.spark.api.python.{ChainedPythonFunctions}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.execution.metric.SQLMetric
+import org.apache.spark.sql.types.{StructField, StructType}
+import org.apache.spark.sql.vectorized.{ArrowColumnVector, ColumnarBatch}
+
+class MapInBatchEvaluatorFactory(
+    output: Seq[Attribute],
+    chainedFunc: Seq[ChainedPythonFunctions],
+    outputTypes: StructType,
+    batchSize: Int,
+    pythonEvalType: Int,
+    sessionLocalTimeZone: String,
+    largeVarTypes: Boolean,
+    pythonRunnerConf: Map[String, String],
+    pythonMetrics: Map[String, SQLMetric],
+    jobArtifactUUID: Option[String])
+    extends PartitionEvaluatorFactory[InternalRow, InternalRow] {
+
+  override def createEvaluator(): PartitionEvaluator[InternalRow, InternalRow] 
=
+    new MapInBatchEvaluator
+
+  private class MapInBatchEvaluator extends PartitionEvaluator[InternalRow, 
InternalRow] {
+    override def eval(
+        partitionIndex: Int,
+        inputs: Iterator[InternalRow]*): Iterator[InternalRow] = {
+      assert(inputs.length == 1)
+      val inputIter = inputs.head
+      // Single function with one struct.
+      val argOffsets = Array(Array(0))
+      val context = TaskContext.get()
+      val contextAwareIterator = new ContextAwareIterator(context, inputIter)
+
+      // Here we wrap it via another row so that Python sides understand it
+      // as a DataFrame.
+      val wrappedIter = contextAwareIterator.map(InternalRow(_))
+
+      // DO NOT use iter.grouped(). See BatchIterator.
+      val batchIter =
+        if (batchSize > 0) new BatchIterator(wrappedIter, batchSize) else 
Iterator(wrappedIter)
+
+      val columnarBatchIter = new ArrowPythonRunner(
+        chainedFunc,
+        pythonEvalType,
+        argOffsets,
+        StructType(Array(StructField("struct", outputTypes))),
+        sessionLocalTimeZone,
+        largeVarTypes,
+        pythonRunnerConf,
+        pythonMetrics,
+        jobArtifactUUID).compute(batchIter, context.partitionId(), context)
+
+      val unsafeProj = UnsafeProjection.create(output, output)
+
+      columnarBatchIter
+        .flatMap { batch =>
+          // Scalar Iterator UDF returns a StructType column in ColumnarBatch, 
select
+          // the children here
+          val structVector = batch.column(0).asInstanceOf[ArrowColumnVector]
+          val outputVectors = output.indices.map(structVector.getChild)
+          val flattenedBatch = new ColumnarBatch(outputVectors.toArray)
+          flattenedBatch.setNumRows(batch.numRows())
+          flattenedBatch.rowIterator.asScala
+        }
+        .map(unsafeProj)
+    }
+  }
+}
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/MapInBatchExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/MapInBatchExec.scala
index b4af3db3c83..0703f57c33d 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/MapInBatchExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/MapInBatchExec.scala
@@ -17,18 +17,14 @@
 
 package org.apache.spark.sql.execution.python
 
-import scala.collection.JavaConverters._
-
-import org.apache.spark.{ContextAwareIterator, JobArtifactSet, TaskContext}
+import org.apache.spark.JobArtifactSet
 import org.apache.spark.api.python.ChainedPythonFunctions
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.physical._
 import org.apache.spark.sql.execution.UnaryExecNode
-import org.apache.spark.sql.types.{StructField, StructType}
 import org.apache.spark.sql.util.ArrowUtils
-import org.apache.spark.sql.vectorized.{ArrowColumnVector, ColumnarBatch}
 
 /**
  * A relation produced by applying a function that takes an iterator of batches
@@ -56,53 +52,39 @@ trait MapInBatchExec extends UnaryExecNode with 
PythonSQLMetrics {
   override def outputPartitioning: Partitioning = child.outputPartitioning
 
   override protected def doExecute(): RDD[InternalRow] = {
-    def mapper(inputIter: Iterator[InternalRow]): Iterator[InternalRow] = {
-      // Single function with one struct.
-      val argOffsets = Array(Array(0))
-      val chainedFunc = Seq(ChainedPythonFunctions(Seq(pythonFunction)))
-      val sessionLocalTimeZone = conf.sessionLocalTimeZone
-      val pythonRunnerConf = ArrowUtils.getPythonRunnerConfMap(conf)
-      val outputTypes = child.schema
-
-      val context = TaskContext.get()
-      val contextAwareIterator = new ContextAwareIterator(context, inputIter)
-
-      // Here we wrap it via another row so that Python sides understand it
-      // as a DataFrame.
-      val wrappedIter = contextAwareIterator.map(InternalRow(_))
-
-      // DO NOT use iter.grouped(). See BatchIterator.
-      val batchIter =
-        if (batchSize > 0) new BatchIterator(wrappedIter, batchSize) else 
Iterator(wrappedIter)
-
-      val columnarBatchIter = new ArrowPythonRunner(
-        chainedFunc,
-        pythonEvalType,
-        argOffsets,
-        StructType(Array(StructField("struct", outputTypes))),
-        sessionLocalTimeZone,
-        largeVarTypes,
-        pythonRunnerConf,
-        pythonMetrics,
-        jobArtifactUUID).compute(batchIter, context.partitionId(), context)
-
-      val unsafeProj = UnsafeProjection.create(output, output)
-
-      columnarBatchIter.flatMap { batch =>
-        // Scalar Iterator UDF returns a StructType column in ColumnarBatch, 
select
-        // the children here
-        val structVector = batch.column(0).asInstanceOf[ArrowColumnVector]
-        val outputVectors = output.indices.map(structVector.getChild)
-        val flattenedBatch = new ColumnarBatch(outputVectors.toArray)
-        flattenedBatch.setNumRows(batch.numRows())
-        flattenedBatch.rowIterator.asScala
-      }.map(unsafeProj)
-    }
+    val pythonRunnerConf = ArrowUtils.getPythonRunnerConfMap(conf)
+    val pythonFunction = func.asInstanceOf[PythonUDF].func
+    val chainedFunc = Seq(ChainedPythonFunctions(Seq(pythonFunction)))
+    val evaluatorFactory = new MapInBatchEvaluatorFactory(
+      output,
+      chainedFunc,
+      child.schema,
+      conf.arrowMaxRecordsPerBatch,
+      pythonEvalType,
+      conf.sessionLocalTimeZone,
+      conf.arrowUseLargeVarTypes,
+      pythonRunnerConf,
+      pythonMetrics,
+      jobArtifactUUID)
 
     if (isBarrier) {
-      child.execute().barrier().mapPartitions(mapper)
+      val rddBarrier = child.execute().barrier()
+      if (conf.usePartitionEvaluator) {
+        rddBarrier.mapPartitionsWithEvaluator(evaluatorFactory)
+      } else {
+        rddBarrier.mapPartitions { iter =>
+          evaluatorFactory.createEvaluator().eval(0, iter)
+        }
+      }
     } else {
-      child.execute().mapPartitionsInternal(mapper)
+      val inputRdd = child.execute()
+      if (conf.usePartitionEvaluator) {
+        inputRdd.mapPartitionsWithEvaluator(evaluatorFactory)
+      } else {
+        inputRdd.mapPartitionsInternal { iter =>
+          evaluatorFactory.createEvaluator().eval(0, iter)
+        }
+      }
     }
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to