Repository: zeppelin Updated Branches: refs/heads/master 05af3bf4b -> c7c9aa1cc
[ZEPPELIN-2502] RemoteInterpreterServer hang forever during shutdown ### What is this PR for? There is a chance that RemoteInterpreterServer hangs forever during shutdown ### What type of PR is it? [Bug Fix] ### What is the Jira issue? [ZEPPELIN-2502] ### How should this be tested? Unit test provided for the fix. ### Questions: * Are there breaking changes for older versions? * Does this need documentation? Author: andrea <[email protected]> Closes #2322 from andreaTP/processHang and squashes the following commits: e58483e [andrea] [ZEPPELIN-2502] RemoteInterpreterServer hang forever during shutdown Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/c7c9aa1c Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/c7c9aa1c Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/c7c9aa1c Branch: refs/heads/master Commit: c7c9aa1ccb14b8394cc01d91976f6ba61458f378 Parents: 05af3bf Author: andrea <[email protected]> Authored: Thu May 4 13:52:22 2017 +0100 Committer: Lee moon soo <[email protected]> Committed: Sat May 20 15:07:47 2017 -0400 ---------------------------------------------------------------------- .../remote/RemoteInterpreterEventClient.java | 7 ++- .../remote/RemoteInterpreterServer.java | 7 ++- .../remote/RemoteInterpreterServerTest.java | 63 ++++++++++++++++++++ 3 files changed, 73 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/zeppelin/blob/c7c9aa1c/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventClient.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventClient.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventClient.java index bb6de31..2cdbf39 
100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventClient.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventClient.java @@ -477,15 +477,18 @@ public class RemoteInterpreterEventClient implements ResourcePoolConnector { /** * Wait for eventQueue becomes empty */ - public void waitForEventQueueBecomesEmpty() { + public void waitForEventQueueBecomesEmpty(long atMost) { + long startTime = System.currentTimeMillis(); synchronized (eventQueue) { - while (!eventQueue.isEmpty()) { + while (!eventQueue.isEmpty() && (System.currentTimeMillis() - startTime) < atMost) { try { eventQueue.wait(100); } catch (InterruptedException e) { // ignore exception } } + if (!eventQueue.isEmpty()) + eventQueue.clear(); } } } http://git-wip-us.apache.org/repos/asf/zeppelin/blob/c7c9aa1c/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java index 50881ca..719d2dd 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java @@ -80,6 +80,8 @@ public class RemoteInterpreterServer private Map<String, Object> remoteWorksResponsePool; private ZeppelinRemoteWorksController remoteWorksController; + private final long DEFAULT_SHUTDOWN_TIMEOUT = 2000; + public RemoteInterpreterServer(int port) throws TTransportException { this.port = port; @@ -99,7 +101,7 @@ public class RemoteInterpreterServer @Override public void shutdown() throws TException { - eventClient.waitForEventQueueBecomesEmpty(); + 
eventClient.waitForEventQueueBecomesEmpty(DEFAULT_SHUTDOWN_TIMEOUT); if (interpreterGroup != null) { interpreterGroup.close(); } @@ -111,7 +113,8 @@ public class RemoteInterpreterServer // this case, need to force kill the process long startTime = System.currentTimeMillis(); - while (System.currentTimeMillis() - startTime < 2000 && server.isServing()) { + while (System.currentTimeMillis() - startTime < DEFAULT_SHUTDOWN_TIMEOUT && + server.isServing()) { try { Thread.sleep(300); } catch (InterruptedException e) { http://git-wip-us.apache.org/repos/asf/zeppelin/blob/c7c9aa1c/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServerTest.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServerTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServerTest.java index af6b4bd..a4b3a25 100644 --- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServerTest.java +++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServerTest.java @@ -20,6 +20,9 @@ package org.apache.zeppelin.interpreter.remote; import static org.junit.Assert.assertEquals; import java.io.IOException; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import org.apache.thrift.TException; import org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer; @@ -72,5 +75,65 @@ public class RemoteInterpreterServerTest { assertEquals(false, running); } + class ShutdownRun implements Runnable { + private RemoteInterpreterServer serv = null; + public ShutdownRun(RemoteInterpreterServer serv) { + this.serv = serv; + } + @Override + public void run() { + try { + serv.shutdown(); + } catch (Exception ex) {}; + } + }; + + @Test + 
public void testStartStopWithQueuedEvents() throws InterruptedException, IOException, TException { + RemoteInterpreterServer server = new RemoteInterpreterServer( + RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces()); + assertEquals(false, server.isRunning()); + + server.start(); + long startTime = System.currentTimeMillis(); + boolean running = false; + + while (System.currentTimeMillis() - startTime < 10 * 1000) { + if (server.isRunning()) { + running = true; + break; + } else { + Thread.sleep(200); + } + } + + assertEquals(true, running); + assertEquals(true, RemoteInterpreterUtils.checkIfRemoteEndpointAccessible("localhost", server.getPort())); + + //just send an event on the client queue + server.eventClient.onAppStatusUpdate("","","",""); + + ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(); + + Runnable task = new ShutdownRun(server); + + executor.schedule(task, 0, TimeUnit.MILLISECONDS); + + while (System.currentTimeMillis() - startTime < 10 * 1000) { + if (server.isRunning()) { + Thread.sleep(200); + } else { + running = false; + break; + } + } + + executor.shutdown(); + + //cleanup environment for next tests + server.shutdown(); + + assertEquals(false, running); + } }
