This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 84fabc27d68 [SPARK-41005][CONNECT][DOC][FOLLOW-UP] Document the reason 
of sending batch in main thread
84fabc27d68 is described below

commit 84fabc27d688601feabb42abfc7356cd743b3c38
Author: Ruifeng Zheng <[email protected]>
AuthorDate: Tue Nov 15 10:08:35 2022 +0900

    [SPARK-41005][CONNECT][DOC][FOLLOW-UP] Document the reason of sending batch 
in main thread
    
    ### What changes were proposed in this pull request?
    Document the reason of sending batch in main thread
    
    ### Why are the changes needed?
    as per https://github.com/apache/spark/pull/38613#discussion_r1021041413
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    no, doc-only
    
    Closes #38654 from zhengruifeng/connect_doc_collect.
    
    Lead-authored-by: Ruifeng Zheng <[email protected]>
    Co-authored-by: Ruifeng Zheng <[email protected]>
    Signed-off-by: Hyukjin Kwon <[email protected]>
---
 .../spark/sql/connect/service/SparkConnectStreamHandler.scala       | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)

diff --git 
a/connector/connect/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamHandler.scala
 
b/connector/connect/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamHandler.scala
index 55e091bd8d0..ec2db3efa96 100644
--- 
a/connector/connect/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamHandler.scala
+++ 
b/connector/connect/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamHandler.scala
@@ -168,8 +168,12 @@ class SparkConnectStreamHandler(responseObserver: 
StreamObserver[Response]) exte
           resultHandler = resultHandler,
           resultFunc = () => ())
 
-        // The man thread will wait until 0-th partition is available,
+        // The main thread will wait until 0-th partition is available,
         // then send it to client and wait for the next partition.
+        // Different from the implementation of 
[[Dataset#collectAsArrowToPython]], it sends
+        // the arrow batches in main thread to avoid DAGScheduler thread been 
blocked for
+        // tasks not related to scheduling. This is particularly important if 
there are
+        // multiple users or clients running code at the same time.
         var currentPartitionId = 0
         while (currentPartitionId < numPartitions) {
           val partition = signal.synchronized {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to