Repository: spark Updated Branches: refs/heads/branch-2.3 38c0bd7db -> 0bfbcaf66
[SPARK-23785][LAUNCHER] LauncherBackend doesn't check state of connection before setting state ## What changes were proposed in this pull request? Changed `LauncherBackend` `set` method so that it checks if the connection is open or not before writing to it (uses `isConnected`). ## How was this patch tested? None Author: Sahil Takiar <[email protected]> Closes #20893 from sahilTakiar/master. (cherry picked from commit 491ec114fd3886ebd9fa29a482e3d112fb5a088c) Signed-off-by: Marcelo Vanzin <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0bfbcaf6 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0bfbcaf6 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0bfbcaf6 Branch: refs/heads/branch-2.3 Commit: 0bfbcaf6696570b74923047266b00ba4dc2ba97c Parents: 38c0bd7 Author: Sahil Takiar <[email protected]> Authored: Thu Mar 29 10:23:23 2018 -0700 Committer: Marcelo Vanzin <[email protected]> Committed: Thu Mar 29 10:23:35 2018 -0700 ---------------------------------------------------------------------- .../apache/spark/launcher/LauncherBackend.scala | 6 +++--- .../spark/launcher/LauncherServerSuite.java | 20 ++++++++++++++++++++ 2 files changed, 23 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/0bfbcaf6/core/src/main/scala/org/apache/spark/launcher/LauncherBackend.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/launcher/LauncherBackend.scala b/core/src/main/scala/org/apache/spark/launcher/LauncherBackend.scala index aaae33c..1b049b7 100644 --- a/core/src/main/scala/org/apache/spark/launcher/LauncherBackend.scala +++ b/core/src/main/scala/org/apache/spark/launcher/LauncherBackend.scala @@ -67,13 +67,13 @@ private[spark] abstract class LauncherBackend { } def setAppId(appId: String): Unit = { - if (connection != null) { + if (connection != null && isConnected) { connection.send(new SetAppId(appId)) } } def setState(state: SparkAppHandle.State): Unit = { - if (connection != null && lastState != state) { + if (connection != null && isConnected && lastState != state) { connection.send(new SetState(state)) lastState = state } @@ -114,10 +114,10 @@ private[spark] abstract class LauncherBackend { override def close(): Unit = { try { + _isConnected = false super.close() } finally { onDisconnected() - _isConnected = false } } http://git-wip-us.apache.org/repos/asf/spark/blob/0bfbcaf6/launcher/src/test/java/org/apache/spark/launcher/LauncherServerSuite.java ---------------------------------------------------------------------- diff --git a/launcher/src/test/java/org/apache/spark/launcher/LauncherServerSuite.java b/launcher/src/test/java/org/apache/spark/launcher/LauncherServerSuite.java index d16337a..5413d3a 100644 --- a/launcher/src/test/java/org/apache/spark/launcher/LauncherServerSuite.java +++ b/launcher/src/test/java/org/apache/spark/launcher/LauncherServerSuite.java @@ -185,6 +185,26 @@ public class LauncherServerSuite extends BaseSuite { } } + @Test + public void testAppHandleDisconnect() throws Exception { + LauncherServer server = LauncherServer.getOrCreateServer(); + ChildProcAppHandle handle = new ChildProcAppHandle(server); + String secret = server.registerHandle(handle); + + TestClient client = null; + try { + Socket s = new Socket(InetAddress.getLoopbackAddress(), server.getPort()); + client = new TestClient(s); + client.send(new Hello(secret, "1.4.0")); + handle.disconnect(); + waitForError(client, secret); + } finally { + handle.kill(); + close(client); + client.clientThread.join(); + } + } + private void close(Closeable c) { if (c != null) { try { --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
