This is an automated email from the ASF dual-hosted git repository.
lmccay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-livy.git
The following commit(s) were added to refs/heads/master by this push:
new 96b1eb61 [LIVY-995][REPL] JsonParseException is thrown when closing
Livy session when using python profile (#439)
96b1eb61 is described below
commit 96b1eb617b18302ae842d0ebed298e5ec61429f5
Author: jianzhenwu <[email protected]>
AuthorDate: Fri Jan 26 07:56:55 2024 +0800
[LIVY-995][REPL] JsonParseException is thrown when closing Livy session
when using python profile (#439)
* [LIVY-995][REPL] JsonParseException is thrown when closing Livy session
when using python profile
---
.../org/apache/livy/repl/PythonInterpreter.scala | 16 ++++++++++---
.../apache/livy/repl/PythonInterpreterSpec.scala | 26 ++++++++++++++++++++++
2 files changed, 39 insertions(+), 3 deletions(-)
diff --git a/repl/src/main/scala/org/apache/livy/repl/PythonInterpreter.scala
b/repl/src/main/scala/org/apache/livy/repl/PythonInterpreter.scala
index 36c09deb..58b7147a 100644
--- a/repl/src/main/scala/org/apache/livy/repl/PythonInterpreter.scala
+++ b/repl/src/main/scala/org/apache/livy/repl/PythonInterpreter.scala
@@ -269,11 +269,21 @@ private class PythonInterpreter(
}
override protected def sendShutdownRequest(): Unit = {
- sendRequest(Map(
+ stdin.println(write(Map(
"msg_type" -> "shutdown_request",
"content" -> ()
- )).foreach { case rep =>
- warn(f"process failed to shut down while returning $rep")
+ )))
+ stdin.flush()
+
+ // Pyspark prints profile info to stdout when enabling
spark.python.profile. see SPARK-37443
+ var lines = Seq[String]()
+ var line = stdout.readLine()
+ while(line != null) {
+ lines :+= line
+ line = stdout.readLine()
+ }
+ if (lines.nonEmpty) {
+ warn(f"python process shut down while returning ${lines.mkString("\n")}")
}
}
diff --git
a/repl/src/test/scala/org/apache/livy/repl/PythonInterpreterSpec.scala
b/repl/src/test/scala/org/apache/livy/repl/PythonInterpreterSpec.scala
index 4a78c61f..52a42918 100644
--- a/repl/src/test/scala/org/apache/livy/repl/PythonInterpreterSpec.scala
+++ b/repl/src/test/scala/org/apache/livy/repl/PythonInterpreterSpec.scala
@@ -249,6 +249,32 @@ abstract class PythonBaseInterpreterSpec extends
BaseInterpreterSpec {
)
))
}
+
+ it should "work when interpreter exit with json stdout" in {
+ noException should be thrownBy {
+ withInterpreter { intp =>
+ val response = intp.execute(
+ """import atexit, sys
+ |atexit.register(sys.stdout.write, '{}')
+ |""".stripMargin
+ )
+ response shouldBe a[Interpreter.ExecuteSuccess]
+ }
+ }
+ }
+
+ it should "work when interpreter exit with non-json stdout" in {
+ noException should be thrownBy {
+ withInterpreter { intp =>
+ val response = intp.execute(
+ """import atexit, sys
+ |atexit.register(sys.stdout.write, 'line1\nline2')
+ |""".stripMargin
+ )
+ response shouldBe a[Interpreter.ExecuteSuccess]
+ }
+ }
+ }
}
class Python2InterpreterSpec extends PythonBaseInterpreterSpec {