This is an automated email from the ASF dual-hosted git repository.

ueshin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new cf64008fce7 [SPARK-44433][PYTHON][CONNECT][SS][FOLLOWUP] Set back USE_DAEMON after creating streaming python processes
cf64008fce7 is described below

commit cf64008fce77b38d1237874b04f5ac124b01b3a8
Author: Wei Liu <wei....@databricks.com>
AuthorDate: Fri Aug 4 17:41:27 2023 -0700

    [SPARK-44433][PYTHON][CONNECT][SS][FOLLOWUP] Set back USE_DAEMON after creating streaming python processes
    
    ### What changes were proposed in this pull request?
    
    Follow-up to this review comment: https://github.com/apache/spark/pull/42283#discussion_r1283804782
    Restore the previous value of the `PYTHON_USE_DAEMON` Spark conf after creating the streaming Python worker process.
    
    ### Why are the changes needed?
    
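    Bug fix. Previously `StreamingPythonRunner` set `PYTHON_USE_DAEMON` to false on the conf and never restored the prior value.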
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    Config-only change
    
    Closes #42341 from WweiL/SPARK-44433-followup-USEDAEMON.
    
    Authored-by: Wei Liu <wei....@databricks.com>
    Signed-off-by: Takuya UESHIN <ues...@databricks.com>
---
 .../spark/api/python/StreamingPythonRunner.scala       | 18 ++++++++++++------
 1 file changed, 12 insertions(+), 6 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/api/python/StreamingPythonRunner.scala b/core/src/main/scala/org/apache/spark/api/python/StreamingPythonRunner.scala
index f14289f984a..a079743c847 100644
--- a/core/src/main/scala/org/apache/spark/api/python/StreamingPythonRunner.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/StreamingPythonRunner.scala
@@ -66,14 +66,19 @@ private[spark] class StreamingPythonRunner(
 
     envVars.put("SPARK_AUTH_SOCKET_TIMEOUT", authSocketTimeout.toString)
     envVars.put("SPARK_BUFFER_SIZE", bufferSize.toString)
-    conf.set(PYTHON_USE_DAEMON, false)
     envVars.put("SPARK_CONNECT_LOCAL_URL", connectUrl)
 
-    val (worker, _) = env.createPythonWorker(
-      pythonExec, workerModule, envVars.asScala.toMap)
-    pythonWorker = Some(worker)
+    val prevConf = conf.get(PYTHON_USE_DAEMON)
+    conf.set(PYTHON_USE_DAEMON, false)
+    try {
+      val (worker, _) = env.createPythonWorker(
+        pythonExec, workerModule, envVars.asScala.toMap)
+      pythonWorker = Some(worker)
+    } finally {
+      conf.set(PYTHON_USE_DAEMON, prevConf)
+    }
 
-    val stream = new BufferedOutputStream(worker.getOutputStream, bufferSize)
+    val stream = new BufferedOutputStream(pythonWorker.get.getOutputStream, 
bufferSize)
     val dataOut = new DataOutputStream(stream)
 
     // TODO(SPARK-44461): verify python version
@@ -87,7 +92,8 @@ private[spark] class StreamingPythonRunner(
     dataOut.write(command.toArray)
     dataOut.flush()
 
-    val dataIn = new DataInputStream(new 
BufferedInputStream(worker.getInputStream, bufferSize))
+    val dataIn = new DataInputStream(
+      new BufferedInputStream(pythonWorker.get.getInputStream, bufferSize))
 
     val resFromPython = dataIn.readInt()
     logInfo(s"Runner initialization returned $resFromPython")


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to