This is an automated email from the ASF dual-hosted git repository.

lmccay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-livy.git


The following commit(s) were added to refs/heads/master by this push:
     new 96b1eb61 [LIVY-995][REPL] JsonParseException is thrown when closing 
Livy session when using python profile (#439)
96b1eb61 is described below

commit 96b1eb617b18302ae842d0ebed298e5ec61429f5
Author: jianzhenwu <[email protected]>
AuthorDate: Fri Jan 26 07:56:55 2024 +0800

    [LIVY-995][REPL] JsonParseException is thrown when closing Livy session 
when using python profile (#439)
    
    * [LIVY-995][REPL] JsonParseException is thrown when closing Livy session 
when using python profile
---
 .../org/apache/livy/repl/PythonInterpreter.scala   | 16 ++++++++++---
 .../apache/livy/repl/PythonInterpreterSpec.scala   | 26 ++++++++++++++++++++++
 2 files changed, 39 insertions(+), 3 deletions(-)

diff --git a/repl/src/main/scala/org/apache/livy/repl/PythonInterpreter.scala 
b/repl/src/main/scala/org/apache/livy/repl/PythonInterpreter.scala
index 36c09deb..58b7147a 100644
--- a/repl/src/main/scala/org/apache/livy/repl/PythonInterpreter.scala
+++ b/repl/src/main/scala/org/apache/livy/repl/PythonInterpreter.scala
@@ -269,11 +269,21 @@ private class PythonInterpreter(
   }
 
   override protected def sendShutdownRequest(): Unit = {
-    sendRequest(Map(
+    stdin.println(write(Map(
       "msg_type" -> "shutdown_request",
       "content" -> ()
-    )).foreach { case rep =>
-      warn(f"process failed to shut down while returning $rep")
+    )))
+    stdin.flush()
+
+    // Pyspark prints profile info to stdout when enabling 
spark.python.profile. see SPARK-37443
+    var lines = Seq[String]()
+    var line = stdout.readLine()
+    while(line != null) {
+      lines :+= line
+      line = stdout.readLine()
+    }
+    if (lines.nonEmpty) {
+      warn(f"python process shut down while returning ${lines.mkString("\n")}")
     }
   }
 
diff --git 
a/repl/src/test/scala/org/apache/livy/repl/PythonInterpreterSpec.scala 
b/repl/src/test/scala/org/apache/livy/repl/PythonInterpreterSpec.scala
index 4a78c61f..52a42918 100644
--- a/repl/src/test/scala/org/apache/livy/repl/PythonInterpreterSpec.scala
+++ b/repl/src/test/scala/org/apache/livy/repl/PythonInterpreterSpec.scala
@@ -249,6 +249,32 @@ abstract class PythonBaseInterpreterSpec extends 
BaseInterpreterSpec {
       )
     ))
   }
+
+  it should "work when interpreter exit with json stdout" in {
+    noException should be thrownBy {
+      withInterpreter { intp =>
+        val response = intp.execute(
+          """import atexit, sys
+            |atexit.register(sys.stdout.write, '{}')
+            |""".stripMargin
+        )
+        response shouldBe a[Interpreter.ExecuteSuccess]
+      }
+    }
+  }
+
+  it should "work when interpreter exit with non-json stdout" in {
+    noException should be thrownBy {
+      withInterpreter { intp =>
+        val response = intp.execute(
+          """import atexit, sys
+            |atexit.register(sys.stdout.write, 'line1\nline2')
+            |""".stripMargin
+        )
+        response shouldBe a[Interpreter.ExecuteSuccess]
+      }
+    }
+  }
 }
 
 class Python2InterpreterSpec extends PythonBaseInterpreterSpec {

Reply via email to