This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new bbd9373 [SPARK-31903][SQL][PYSPARK][R] Fix toPandas with Arrow enabled to show metrics in Query UI
bbd9373 is described below
commit bbd93739cc47d0bcf11dd59fdd5fd8b3a4c434f8
Author: Takuya UESHIN <[email protected]>
AuthorDate: Fri Jun 5 12:53:58 2020 +0900
[SPARK-31903][SQL][PYSPARK][R] Fix toPandas with Arrow enabled to show metrics in Query UI
### What changes were proposed in this pull request?
In `Dataset.collectAsArrowToR` and `Dataset.collectAsArrowToPython`, the code
block passed to `serveToStream` runs in a separate thread, so `withAction`
finishes as soon as that thread is started. As a result, it doesn't collect the
metrics of the actual action, and the Query UI shows the plan graph without metrics.
We should call `serveToStream` first and then call `withAction` inside its code block.
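To make the ordering problem concrete, here is a simplified Python model; it is not Spark code. `with_action`, `serve_to_stream` and `run_query` are hypothetical stand-ins for `withAction`, `serveToStream` and the Arrow collection job. The model assumes that `with_action` times its body and snapshots the metrics when the body returns, and that `serve_to_stream` runs its writer on a background thread and returns immediately, as the paragraph above describes.

```py
import queue
import threading
import time

# Simplified model of the bug; every name here is hypothetical, not a Spark API.
recorded = []   # what a "Query UI" gets to see, one entry per action
received = []   # rows the caller reads back from the stream

def with_action(name, body):
    """Stand-in for withAction: time `body` and snapshot its metrics."""
    start = time.monotonic()
    metrics = {}
    body(metrics)
    recorded.append({"action": name,
                     "duration": time.monotonic() - start,
                     "metrics": dict(metrics)})  # snapshot taken when body returns

def serve_to_stream(write):
    """Stand-in for serveToStream: run `write` on a background thread, return at once."""
    out = queue.Queue()
    worker = threading.Thread(target=write, args=(out,))
    worker.start()
    return out, worker

def run_query(out, metrics):
    """The actual work: a simulated job that updates metrics and emits rows."""
    time.sleep(0.2)
    metrics["numOutputRows"] = 2
    out.put([(1, 10, "abc"), (2, 20, "def")])

def collect_buggy():
    # Old order: with_action wraps serve_to_stream, so it only times the thread start.
    handles = []
    with_action("buggy", lambda metrics: handles.append(
        serve_to_stream(lambda out: run_query(out, metrics))))
    return handles[0]

def collect_fixed():
    # New order: serve_to_stream first, with_action inside the background thread.
    return serve_to_stream(
        lambda out: with_action("fixed", lambda metrics: run_query(out, metrics)))

for collect in (collect_buggy, collect_fixed):
    out, worker = collect()
    received.append(out.get())  # the caller gets the rows either way
    worker.join()               # wait until the worker (and any with_action in it) is done

for entry in recorded:
    print(entry["action"], f"{entry['duration']:.2f}s", entry["metrics"])
```

With the old order, the `buggy` entry records a near-zero duration and empty metrics even though the caller still receives both rows; with the new order, the `fixed` entry spans the whole simulated job and includes its metric. Spark's real `withAction` does considerably more than this, so the model only mirrors the ordering problem.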
### Why are the changes needed?
When calling `toPandas`, the Query UI usually shows each plan node's metrics and
the corresponding Stage ID and Task ID:
```py
>>> df = spark.createDataFrame([(1, 10, 'abc'), (2, 20, 'def')], schema=['x', 'y', 'z'])
>>> df.toPandas()
   x   y    z
0  1  10  abc
1  2  20  def
```

but if Arrow execution is enabled, it shows only the plan nodes, without metrics,
and the duration is not correct:
```py
>>> spark.conf.set('spark.sql.execution.arrow.pyspark.enabled', True)
>>> df.toPandas()
   x   y    z
0  1  10  abc
1  2  20  def
```

### Does this PR introduce _any_ user-facing change?
Yes, the Query UI will show the plan with the correct metrics.
### How was this patch tested?
I checked it manually on my local machine.

Closes #28730 from ueshin/issues/SPARK-31903/to_pandas_with_arrow_query_ui.
Authored-by: Takuya UESHIN <[email protected]>
Signed-off-by: HyukjinKwon <[email protected]>
(cherry picked from commit 632b5bce23c94d25712b43be83252b34ebfd3e72)
Signed-off-by: HyukjinKwon <[email protected]>
---
sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala | 8 ++++----
1 file changed, 4 insertions(+), 4 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index c17535a..9d4312b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -3481,8 +3481,8 @@ class Dataset[T] private[sql](
   private[sql] def collectAsArrowToR(): Array[Any] = {
     val timeZoneId = sparkSession.sessionState.conf.sessionLocalTimeZone
-    withAction("collectAsArrowToR", queryExecution) { plan =>
-      RRDD.serveToStream("serve-Arrow") { outputStream =>
+    RRDD.serveToStream("serve-Arrow") { outputStream =>
+      withAction("collectAsArrowToR", queryExecution) { plan =>
         val buffer = new ByteArrayOutputStream()
         val out = new DataOutputStream(outputStream)
         val batchWriter = new ArrowBatchStreamWriter(schema, buffer, timeZoneId)
@@ -3535,8 +3535,8 @@ class Dataset[T] private[sql](
   private[sql] def collectAsArrowToPython: Array[Any] = {
     val timeZoneId = sparkSession.sessionState.conf.sessionLocalTimeZone
-    withAction("collectAsArrowToPython", queryExecution) { plan =>
-      PythonRDD.serveToStream("serve-Arrow") { outputStream =>
+    PythonRDD.serveToStream("serve-Arrow") { outputStream =>
+      withAction("collectAsArrowToPython", queryExecution) { plan =>
         val out = new DataOutputStream(outputStream)
         val batchWriter = new ArrowBatchStreamWriter(schema, out, timeZoneId)