This is an automated email from the ASF dual-hosted git repository.

yuanzhou pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 621a479de [GLUTEN-5771][VL] Add metrics for 
ColumnarArrowEvalPythonExec (#5772)
621a479de is described below

commit 621a479de0496d65b4bd76a37f613f67ceaaabcb
Author: Yan Ma <[email protected]>
AuthorDate: Thu May 23 08:29:01 2024 +0800

    [GLUTEN-5771][VL] Add metrics for ColumnarArrowEvalPythonExec (#5772)
    
    This patch adds metric for ColumnarArrowEvalPythonExec, also a brief guide 
on how to use pandas UDF is added.
    
    
    ---------
    
    Co-authored-by: Hongze Zhang <[email protected]>
---
 .../api}/python/ColumnarArrowEvalPythonExec.scala  | 42 +++++++++++++++++++-
 docs/developers/{VeloxNativeUDF.md => VeloxUDF.md} | 45 ++++++++++++++++++++++
 2 files changed, 86 insertions(+), 1 deletion(-)

diff --git 
a/backends-velox/src/main/scala/org/apache/gluten/execution/python/ColumnarArrowEvalPythonExec.scala
 
b/backends-velox/src/main/scala/org/apache/spark/api/python/ColumnarArrowEvalPythonExec.scala
similarity index 90%
rename from 
backends-velox/src/main/scala/org/apache/gluten/execution/python/ColumnarArrowEvalPythonExec.scala
rename to 
backends-velox/src/main/scala/org/apache/spark/api/python/ColumnarArrowEvalPythonExec.scala
index fd8dfc25b..d5639057d 100644
--- 
a/backends-velox/src/main/scala/org/apache/gluten/execution/python/ColumnarArrowEvalPythonExec.scala
+++ 
b/backends-velox/src/main/scala/org/apache/spark/api/python/ColumnarArrowEvalPythonExec.scala
@@ -28,6 +28,7 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.execution.{ProjectExec, SparkPlan}
+import org.apache.spark.sql.execution.metric.SQLMetrics
 import org.apache.spark.sql.execution.python.{ArrowEvalPythonExec, 
BasePythonRunnerShim, EvalPythonExec, PythonUDFRunner}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.{DataType, StructField, StructType}
@@ -209,7 +210,13 @@ case class ColumnarArrowEvalPythonExec(
   extends EvalPythonExec
   with GlutenPlan {
   override def supportsColumnar: Boolean = true
-  // FIXME: incorrect metrics updater
+
+  override lazy val metrics = Map(
+    "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output 
rows"),
+    "numOutputBatches" -> SQLMetrics.createMetric(sparkContext, 
"output_batches"),
+    "numInputRows" -> SQLMetrics.createMetric(sparkContext, "number of input 
rows"),
+    "processTime" -> SQLMetrics.createTimingMetric(sparkContext, 
"totaltime_arrow_udf")
+  )
 
   override protected def evaluate(
       funcs: Seq[ChainedPythonFunctions],
@@ -277,6 +284,10 @@ case class ColumnarArrowEvalPythonExec(
   }
 
   override protected def doExecuteColumnar(): RDD[ColumnarBatch] = {
+    val numOutputRows = longMetric("numOutputRows")
+    val numOutputBatches = longMetric("numOutputBatches")
+    val numInputRows = longMetric("numInputRows")
+    val procTime = longMetric("processTime")
     val inputRDD = child.executeColumnar()
     inputRDD.mapPartitions {
       iter =>
@@ -318,12 +329,15 @@ case class ColumnarArrowEvalPythonExec(
 
         val contextAwareIterator = new ContextAwareIterator(context, iter)
         val inputCbCache = new ArrayBuffer[ColumnarBatch]()
+        var start_time: Long = 0
         val inputBatchIter = contextAwareIterator.map {
           inputCb =>
+            start_time = System.nanoTime()
             
ColumnarBatches.ensureLoaded(ArrowBufferAllocators.contextInstance, inputCb)
             ColumnarBatches.retain(inputCb)
             // 0. cache input for later merge
             inputCbCache += inputCb
+            numInputRows += inputCb.numRows
             // We only need to pass the referred cols data to python worker 
for evaluation.
             var colsForEval = new ArrayBuffer[ColumnVector]()
             for (i <- originalOffsets) {
@@ -341,11 +355,20 @@ case class ColumnarArrowEvalPythonExec(
               val joinedVectors = (0 until inputCb.numCols).toArray.map(
                 i => inputCb.column(i)) ++ (0 until 
outputCb.numCols).toArray.map(
                 i => outputCb.column(i))
+              // Columns in outputCb has random 0 or 1 refCnt and will fail 
checks in ensureOffload,
+              // so we do a hard reset here.
+              (0 until joinedVectors.length).foreach(
+                i => {
+                  
adjustRefCnt(joinedVectors(i).asInstanceOf[ArrowWritableColumnVector], 1)
+                })
               val numRows = inputCb.numRows
+              numOutputBatches += 1
+              numOutputRows += numRows
               val batch = new ColumnarBatch(joinedVectors, numRows)
               val offloaded =
                 
ColumnarBatches.ensureOffloaded(ArrowBufferAllocators.contextInstance, batch)
               ColumnarBatches.release(outputCb)
+              procTime += (System.nanoTime() - start_time) / 1000000
               offloaded
           }
         Iterators
@@ -358,6 +381,23 @@ case class ColumnarArrowEvalPythonExec(
     }
   }
 
+  private def adjustRefCnt(vector: ArrowWritableColumnVector, to: Long): Unit 
= {
+    val from = vector.refCnt()
+    if (from == to) {
+      return
+    }
+    if (from > to) {
+      do {
+        vector.close()
+      } while (vector.refCnt() == to)
+      return
+    }
+    // from < to
+    do {
+      vector.retain()
+    } while (vector.refCnt() == to)
+  }
+
   override protected def withNewChildInternal(newChild: SparkPlan): 
ColumnarArrowEvalPythonExec =
     copy(udfs, resultAttrs, newChild)
 }
diff --git a/docs/developers/VeloxNativeUDF.md b/docs/developers/VeloxUDF.md
similarity index 78%
rename from docs/developers/VeloxNativeUDF.md
rename to docs/developers/VeloxUDF.md
index b95190593..b88c4de15 100644
--- a/docs/developers/VeloxNativeUDF.md
+++ b/docs/developers/VeloxUDF.md
@@ -179,3 +179,48 @@ The output from spark-shell will be like
 +------------------+----------------+
 ```
 
+# Pandas UDFs (a.k.a. Vectorized UDFs)
+
+## Introduction
+
+Pandas UDFs are user defined functions that are executed by Spark using Arrow 
to transfer data and Pandas to work with the data, which allows vectorized 
operations. A Pandas UDF is defined using the pandas_udf() as a decorator or to 
wrap the function, and no additional configuration is required.
+A Pandas UDF behaves as a regular PySpark function API in general. For more 
details, you can refer 
[doc](https://spark.apache.org/docs/latest/api/python/user_guide/sql/arrow_pandas.html).
+
+## Using Pandas UDFs in Gluten with Velox Backend
+
+Similar as in vanilla Spark, user needs to set up pyspark/arrow dependencies 
properly first. You may can refer following steps:
+
+```
+pip3 install pyspark==$SPARK_VERSION cython
+pip3 install pandas pyarrow
+```
+
+Gluten provides a config to control enable `ColumnarArrowEvalPython` or not, 
with `true` as defalt.
+
+```
+spark.gluten.sql.columnar.arrowUdf
+```
+
+Then take following `PySpark` code for example:
+
+```
+from pyspark.sql.functions import pandas_udf, PandasUDFType
+import pyspark.sql.functions as F
+import os
+@pandas_udf('long')
+def pandas_plus_one(v):
+    return (v + 1)
+df = 
spark.read.orc("path_to_file").select("quantity").withColumn("processed_quantity",
 pandas_plus_one("quantity")).select("quantity")
+```
+
+The expected physical plan will be:
+
+```
+== Physical Plan ==
+VeloxColumnarToRowExec
++- ^(2) ProjectExecTransformer [pythonUDF0#45L AS processed_quantity#41L]
+   +- ^(2) InputIteratorTransformer[quantity#2L, pythonUDF0#45L]
+      +- ^(2) InputAdapter
+         +- ^(2) ColumnarArrowEvalPython [pandas_plus_one(quantity#2L)#40L], 
[pythonUDF0#45L], 200
+            +- ^(1) NativeFileScan orc [quantity#2L] Batched: true, 
DataFilters: [], Format: ORC, Location: InMemoryFileIndex(1 paths)[file:/***], 
PartitionFilters: [], PushedFilters: [], ReadSchema: struct<quantity:bigint>
+```


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to