This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new bbd9373  [SPARK-31903][SQL][PYSPARK][R] Fix toPandas with Arrow enabled to show metrics in Query UI
bbd9373 is described below

commit bbd93739cc47d0bcf11dd59fdd5fd8b3a4c434f8
Author: Takuya UESHIN <ues...@databricks.com>
AuthorDate: Fri Jun 5 12:53:58 2020 +0900

    [SPARK-31903][SQL][PYSPARK][R] Fix toPandas with Arrow enabled to show metrics in Query UI
    
    ### What changes were proposed in this pull request?
    
    In `Dataset.collectAsArrowToR` and `Dataset.collectAsArrowToPython`, the code block passed to `serveToStream` runs in a separate thread, so `withAction` finishes as soon as that thread starts. As a result, it doesn't collect the metrics of the actual action, and the Query UI shows the plan graph without metrics.
    
    We should call `serveToStream` first, then `withAction` inside it.
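    
    A minimal sketch of the inversion, using hypothetical, simplified stand-ins for the real `serveToStream` and `withAction` helpers (not the actual Spark signatures), to show only the threading behavior that matters here:
    
    ```scala
    import java.io.{ByteArrayOutputStream, OutputStream}
    
    // Hypothetical stand-ins, simplified for illustration.
    object MetricsOrderingSketch {
      // Like RRDD/PythonRDD.serveToStream: runs its body in a serving
      // thread and returns immediately.
      def serveToStream(name: String)(body: OutputStream => Unit): Unit =
        new Thread(() => body(new ByteArrayOutputStream()), name).start()
    
      // Like Dataset.withAction: brackets `action` and records how long it took.
      def withAction[T](name: String)(action: => T): T = {
        val start = System.nanoTime()
        try action
        finally println(s"$name finished in ${(System.nanoTime() - start) / 1e6} ms")
      }
    
      // Before the fix: withAction returns as soon as the serving thread
      // starts, so the recorded metrics miss the actual work.
      def before(): Unit = withAction("collect") {
        serveToStream("serve-Arrow") { _ => Thread.sleep(100) /* write batches */ }
      }
    
      // After the fix: the action runs inside the serving thread, so
      // withAction brackets the real work and its metrics reflect it.
      def after(): Unit = serveToStream("serve-Arrow") { _ =>
        withAction("collect") { Thread.sleep(100) /* write batches */ }
      }
    }
    ```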
    
    ### Why are the changes needed?
    
    When calling `toPandas`, the Query UI usually shows each plan node's metrics and the corresponding Stage and Task IDs:
    
    ```py
    >>> df = spark.createDataFrame([(1, 10, 'abc'), (2, 20, 'def')], schema=['x', 'y', 'z'])
    >>> df.toPandas()
       x   y    z
    0  1  10  abc
    1  2  20  def
    ```
    
    ![Screen Shot 2020-06-03 at 4 47 07 PM](https://user-images.githubusercontent.com/506656/83815735-bec22380-a675-11ea-8ecc-bf2954731f35.png)
    
    but if Arrow execution is enabled, it shows only the plan nodes, and the reported duration is not correct:
    
    ```py
    >>> spark.conf.set('spark.sql.execution.arrow.pyspark.enabled', True)
    >>> df.toPandas()
       x   y    z
    0  1  10  abc
    1  2  20  def
    ```
    
    ![Screen Shot 2020-06-03 at 4 47 27 PM](https://user-images.githubusercontent.com/506656/83815804-de594c00-a675-11ea-933a-d0ffc0f534dd.png)
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes, the Query UI will show the plan with the correct metrics.
    
    ### How was this patch tested?
    
    I checked it manually in my local environment.
    
    ![Screen Shot 2020-06-04 at 3 19 41 PM](https://user-images.githubusercontent.com/506656/83816265-d77f0900-a676-11ea-84b8-2a8d80428bc6.png)
    
    Closes #28730 from ueshin/issues/SPARK-31903/to_pandas_with_arrow_query_ui.
    
    Authored-by: Takuya UESHIN <ues...@databricks.com>
    Signed-off-by: HyukjinKwon <gurwls...@apache.org>
    (cherry picked from commit 632b5bce23c94d25712b43be83252b34ebfd3e72)
    Signed-off-by: HyukjinKwon <gurwls...@apache.org>
---
 sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index c17535a..9d4312b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -3481,8 +3481,8 @@ class Dataset[T] private[sql](
   private[sql] def collectAsArrowToR(): Array[Any] = {
     val timeZoneId = sparkSession.sessionState.conf.sessionLocalTimeZone
 
-    withAction("collectAsArrowToR", queryExecution) { plan =>
-      RRDD.serveToStream("serve-Arrow") { outputStream =>
+    RRDD.serveToStream("serve-Arrow") { outputStream =>
+      withAction("collectAsArrowToR", queryExecution) { plan =>
         val buffer = new ByteArrayOutputStream()
         val out = new DataOutputStream(outputStream)
         val batchWriter = new ArrowBatchStreamWriter(schema, buffer, timeZoneId)
@@ -3535,8 +3535,8 @@ class Dataset[T] private[sql](
   private[sql] def collectAsArrowToPython: Array[Any] = {
     val timeZoneId = sparkSession.sessionState.conf.sessionLocalTimeZone
 
-    withAction("collectAsArrowToPython", queryExecution) { plan =>
-      PythonRDD.serveToStream("serve-Arrow") { outputStream =>
+    PythonRDD.serveToStream("serve-Arrow") { outputStream =>
+      withAction("collectAsArrowToPython", queryExecution) { plan =>
         val out = new DataOutputStream(outputStream)
         val batchWriter = new ArrowBatchStreamWriter(schema, out, timeZoneId)
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
