This is an automated email from the ASF dual-hosted git repository.
yuanzhou pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 621a479de [GLUTEN-5771][VL] Add metrics for
ColumnarArrowEvalPythonExec (#5772)
621a479de is described below
commit 621a479de0496d65b4bd76a37f613f67ceaaabcb
Author: Yan Ma <[email protected]>
AuthorDate: Thu May 23 08:29:01 2024 +0800
[GLUTEN-5771][VL] Add metrics for ColumnarArrowEvalPythonExec (#5772)
This patch adds metrics for ColumnarArrowEvalPythonExec; a brief guide
on how to use pandas UDFs is also added.
---------
Co-authored-by: Hongze Zhang <[email protected]>
---
.../api}/python/ColumnarArrowEvalPythonExec.scala | 42 +++++++++++++++++++-
docs/developers/{VeloxNativeUDF.md => VeloxUDF.md} | 45 ++++++++++++++++++++++
2 files changed, 86 insertions(+), 1 deletion(-)
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/execution/python/ColumnarArrowEvalPythonExec.scala
b/backends-velox/src/main/scala/org/apache/spark/api/python/ColumnarArrowEvalPythonExec.scala
similarity index 90%
rename from
backends-velox/src/main/scala/org/apache/gluten/execution/python/ColumnarArrowEvalPythonExec.scala
rename to
backends-velox/src/main/scala/org/apache/spark/api/python/ColumnarArrowEvalPythonExec.scala
index fd8dfc25b..d5639057d 100644
---
a/backends-velox/src/main/scala/org/apache/gluten/execution/python/ColumnarArrowEvalPythonExec.scala
+++
b/backends-velox/src/main/scala/org/apache/spark/api/python/ColumnarArrowEvalPythonExec.scala
@@ -28,6 +28,7 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.execution.{ProjectExec, SparkPlan}
+import org.apache.spark.sql.execution.metric.SQLMetrics
import org.apache.spark.sql.execution.python.{ArrowEvalPythonExec,
BasePythonRunnerShim, EvalPythonExec, PythonUDFRunner}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{DataType, StructField, StructType}
@@ -209,7 +210,13 @@ case class ColumnarArrowEvalPythonExec(
extends EvalPythonExec
with GlutenPlan {
override def supportsColumnar: Boolean = true
- // FIXME: incorrect metrics updater
+
+ override lazy val metrics = Map(
+ "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output
rows"),
+ "numOutputBatches" -> SQLMetrics.createMetric(sparkContext,
"output_batches"),
+ "numInputRows" -> SQLMetrics.createMetric(sparkContext, "number of input
rows"),
+ "processTime" -> SQLMetrics.createTimingMetric(sparkContext,
"totaltime_arrow_udf")
+ )
override protected def evaluate(
funcs: Seq[ChainedPythonFunctions],
@@ -277,6 +284,10 @@ case class ColumnarArrowEvalPythonExec(
}
override protected def doExecuteColumnar(): RDD[ColumnarBatch] = {
+ val numOutputRows = longMetric("numOutputRows")
+ val numOutputBatches = longMetric("numOutputBatches")
+ val numInputRows = longMetric("numInputRows")
+ val procTime = longMetric("processTime")
val inputRDD = child.executeColumnar()
inputRDD.mapPartitions {
iter =>
@@ -318,12 +329,15 @@ case class ColumnarArrowEvalPythonExec(
val contextAwareIterator = new ContextAwareIterator(context, iter)
val inputCbCache = new ArrayBuffer[ColumnarBatch]()
+ var start_time: Long = 0
val inputBatchIter = contextAwareIterator.map {
inputCb =>
+ start_time = System.nanoTime()
ColumnarBatches.ensureLoaded(ArrowBufferAllocators.contextInstance, inputCb)
ColumnarBatches.retain(inputCb)
// 0. cache input for later merge
inputCbCache += inputCb
+ numInputRows += inputCb.numRows
// We only need to pass the referred cols data to python worker
for evaluation.
var colsForEval = new ArrayBuffer[ColumnVector]()
for (i <- originalOffsets) {
@@ -341,11 +355,20 @@ case class ColumnarArrowEvalPythonExec(
val joinedVectors = (0 until inputCb.numCols).toArray.map(
i => inputCb.column(i)) ++ (0 until
outputCb.numCols).toArray.map(
i => outputCb.column(i))
+ // Columns in outputCb has random 0 or 1 refCnt and will fail
checks in ensureOffload,
+ // so we do a hard reset here.
+ (0 until joinedVectors.length).foreach(
+ i => {
+
adjustRefCnt(joinedVectors(i).asInstanceOf[ArrowWritableColumnVector], 1)
+ })
val numRows = inputCb.numRows
+ numOutputBatches += 1
+ numOutputRows += numRows
val batch = new ColumnarBatch(joinedVectors, numRows)
val offloaded =
ColumnarBatches.ensureOffloaded(ArrowBufferAllocators.contextInstance, batch)
ColumnarBatches.release(outputCb)
+ procTime += (System.nanoTime() - start_time) / 1000000
offloaded
}
Iterators
@@ -358,6 +381,23 @@ case class ColumnarArrowEvalPythonExec(
}
}
+ private def adjustRefCnt(vector: ArrowWritableColumnVector, to: Long): Unit
= {
+ val from = vector.refCnt()
+ if (from == to) {
+ return
+ }
+ if (from > to) {
+ do {
+ vector.close()
+ } while (vector.refCnt() == to)
+ return
+ }
+ // from < to
+ do {
+ vector.retain()
+ } while (vector.refCnt() == to)
+ }
+
override protected def withNewChildInternal(newChild: SparkPlan):
ColumnarArrowEvalPythonExec =
copy(udfs, resultAttrs, newChild)
}
diff --git a/docs/developers/VeloxNativeUDF.md b/docs/developers/VeloxUDF.md
similarity index 78%
rename from docs/developers/VeloxNativeUDF.md
rename to docs/developers/VeloxUDF.md
index b95190593..b88c4de15 100644
--- a/docs/developers/VeloxNativeUDF.md
+++ b/docs/developers/VeloxUDF.md
@@ -179,3 +179,48 @@ The output from spark-shell will be like
+------------------+----------------+
```
+# Pandas UDFs (a.k.a. Vectorized UDFs)
+
+## Introduction
+
+Pandas UDFs are user-defined functions that are executed by Spark using Arrow
to transfer data and Pandas to work with the data, which allows vectorized
operations. A Pandas UDF is defined using `pandas_udf()` as a decorator or to
wrap the function, and no additional configuration is required.
+A Pandas UDF behaves like a regular PySpark function API in general. For more
details, you can refer to the
[doc](https://spark.apache.org/docs/latest/api/python/user_guide/sql/arrow_pandas.html).
+
+## Using Pandas UDFs in Gluten with Velox Backend
+
+Similar to vanilla Spark, users need to set up pyspark/arrow dependencies
properly first. You may refer to the following steps:
+
+```
+pip3 install pyspark==$SPARK_VERSION cython
+pip3 install pandas pyarrow
+```
+
+Gluten provides a config to control whether `ColumnarArrowEvalPython` is enabled,
with `true` as the default.
+
+```
+spark.gluten.sql.columnar.arrowUdf
+```
+
+Then take following `PySpark` code for example:
+
+```
+from pyspark.sql.functions import pandas_udf, PandasUDFType
+import pyspark.sql.functions as F
+import os
+@pandas_udf('long')
+def pandas_plus_one(v):
+ return (v + 1)
+df =
spark.read.orc("path_to_file").select("quantity").withColumn("processed_quantity",
pandas_plus_one("quantity")).select("quantity")
+```
+
+The expected physical plan will be:
+
+```
+== Physical Plan ==
+VeloxColumnarToRowExec
++- ^(2) ProjectExecTransformer [pythonUDF0#45L AS processed_quantity#41L]
+ +- ^(2) InputIteratorTransformer[quantity#2L, pythonUDF0#45L]
+ +- ^(2) InputAdapter
+ +- ^(2) ColumnarArrowEvalPython [pandas_plus_one(quantity#2L)#40L],
[pythonUDF0#45L], 200
+ +- ^(1) NativeFileScan orc [quantity#2L] Batched: true,
DataFilters: [], Format: ORC, Location: InMemoryFileIndex(1 paths)[file:/***],
PartitionFilters: [], PushedFilters: [], ReadSchema: struct<quantity:bigint>
+```
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]