[ https://issues.apache.org/jira/browse/APEXCORE-734?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16088247#comment-16088247 ]
ASF GitHub Bot commented on APEXCORE-734: ----------------------------------------- PramodSSImmaneni commented on a change in pull request #532: APEXCORE-734 StramLocalCluster may not terminate properly URL: https://github.com/apache/apex-core/pull/532#discussion_r127563982 ########## File path: engine/src/main/java/com/datatorrent/stram/StramLocalCluster.java ########## @@ -463,103 +463,123 @@ public void run() @SuppressWarnings({"SleepWhileInLoop", "ResultOfObjectAllocationIgnored"}) public void run(long runMillis) { - if (!perContainerBufferServer) { - StreamingContainer.eventloop.start(); - bufferServer = new Server(StreamingContainer.eventloop, 0, 1024 * 1024, 8); - try { - bufferServer.setSpoolStorage(new DiskStorage()); - } catch (IOException e) { - throw new RuntimeException(e); + Thread eventLoopThread = null; + List<Thread> containerThreads = new LinkedList<>(); + try { + if (!perContainerBufferServer) { + eventLoopThread = StreamingContainer.eventloop.start(); + bufferServer = new Server(StreamingContainer.eventloop, 0, 1024 * 1024, 8); + try { + bufferServer.setSpoolStorage(new DiskStorage()); + } catch (IOException e) { + throw new RuntimeException(e); + } + bufferServerAddress = InetSocketAddress.createUnresolved(LOCALHOST, bufferServer.run().getPort()); + LOG.info("Buffer server started: {}", bufferServerAddress); } - bufferServerAddress = InetSocketAddress.createUnresolved(LOCALHOST, bufferServer.run().getPort()); - LOG.info("Buffer server started: {}", bufferServerAddress); - } - long endMillis = System.currentTimeMillis() + runMillis; - List<Thread> containerThreads = new LinkedList<>(); + long endMillis = System.currentTimeMillis() + runMillis; - while (!appDone) { + while (!appDone) { - for (String containerIdStr: dnmgr.containerStopRequests.values()) { - // teardown child thread - StreamingContainer c = childContainers.get(containerIdStr); - if (c != null) { - ContainerHeartbeatResponse r = new ContainerHeartbeatResponse(); - r.shutdown = 
StreamingContainerUmbilicalProtocol.ShutdownType.ABORT; - c.processHeartbeatResponse(r); + for (String containerIdStr : dnmgr.containerStopRequests.values()) { + // teardown child thread + StreamingContainer c = childContainers.get(containerIdStr); + if (c != null) { + ContainerHeartbeatResponse r = new ContainerHeartbeatResponse(); + r.shutdown = StreamingContainerUmbilicalProtocol.ShutdownType.ABORT; + c.processHeartbeatResponse(r); + } + dnmgr.containerStopRequests.remove(containerIdStr); + LOG.info("Container {} restart.", containerIdStr); + dnmgr.scheduleContainerRestart(containerIdStr); + //dnmgr.removeContainerAgent(containerIdStr); } - dnmgr.containerStopRequests.remove(containerIdStr); - LOG.info("Container {} restart.", containerIdStr); - dnmgr.scheduleContainerRestart(containerIdStr); - //dnmgr.removeContainerAgent(containerIdStr); - } - // start containers - while (!dnmgr.containerStartRequests.isEmpty()) { - ContainerStartRequest cdr = dnmgr.containerStartRequests.poll(); - if (cdr != null) { - new LocalStreamingContainerLauncher(cdr, containerThreads); + // start containers + while (!dnmgr.containerStartRequests.isEmpty()) { + ContainerStartRequest cdr = dnmgr.containerStartRequests.poll(); + if (cdr != null) { + new LocalStreamingContainerLauncher(cdr, containerThreads); + } } - } - if (heartbeatMonitoringEnabled) { - // monitor child containers - dnmgr.monitorHeartbeat(false); - } - - if (childContainers.isEmpty() && dnmgr.containerStartRequests.isEmpty()) { - appDone = true; - } + if (heartbeatMonitoringEnabled) { + // monitor child containers + dnmgr.monitorHeartbeat(false); + } - if (runMillis > 0 && System.currentTimeMillis() > endMillis) { - appDone = true; - } + if (childContainers.isEmpty() && dnmgr.containerStartRequests.isEmpty()) { + appDone = true; + } - try { - if (exitCondition != null && exitCondition.call()) { + if (runMillis > 0 && System.currentTimeMillis() > endMillis) { appDone = true; } - } catch (Exception ex) { - break; - } - 
if (Thread.interrupted()) { - break; + try { + if (exitCondition != null && exitCondition.call()) { + appDone = true; + } + } catch (Exception ex) { + break; + } + + if (Thread.interrupted()) { + break; + } + + if (!appDone) { + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + LOG.debug("Sleep interrupted", e); + break; + } + } + } + } finally { + for (LocalStreamingContainer lsc : childContainers.values()) { + injectShutdown.put(lsc.getContainerId(), lsc); + lsc.triggerHeartbeat(); } - if (!appDone) { + for (Thread thread : containerThreads) { try { - Thread.sleep(1000); + thread.join(1000); } catch (InterruptedException e) { LOG.debug("Sleep interrupted", e); - break; + } + if (thread.isAlive()) { + LOG.warn("Container thread {} didn't finish", thread.getName()); } } - } - - for (LocalStreamingContainer lsc: childContainers.values()) { - injectShutdown.put(lsc.getContainerId(), lsc); - lsc.triggerHeartbeat(); - } - for (Thread thread: containerThreads) { try { - thread.join(1000); - } catch (InterruptedException e) { - LOG.debug("Sleep interrupted", e); - } - if (thread.isAlive()) { - LOG.warn("Container thread {} didn't finish", thread.getName()); + dnmgr.teardown(); + } catch (RuntimeException e) { + LOG.warn("Exception during StreamingContainerManager teardown", e); } - } - dnmgr.teardown(); + if (bufferServerAddress != null) { + try { + bufferServer.stop(); + } catch (RuntimeException e) { + LOG.warn("Exception during BufferServer stop", e); + } + } - LOG.info("Application finished."); - if (!perContainerBufferServer) { - bufferServer.stop(); - StreamingContainer.eventloop.stop(); + if (eventLoopThread != null) { + try { + StreamingContainer.eventloop.stop(); + eventLoopThread.join(1000); + } catch (InterruptedException ie) { + LOG.debug("Sleep interrupted", ie); Review comment: Should this be debug or warn? ---------------------------------------------------------------- This is an automated message from the Apache Git Service. 
To respond to the message, please log in to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > StramLocalCluster may not terminate properly > -------------------------------------------- > > Key: APEXCORE-734 > URL: https://issues.apache.org/jira/browse/APEXCORE-734 > Project: Apache Apex Core > Issue Type: Improvement > Reporter: Vlad Rozov > Assignee: Vlad Rozov > > When StramLocalCluster is run asynchronously, it may be shut down during > StramLocalCluster initialization, leading to termination without performing > the necessary termination sequence. A runtime exception during the run may also > lead to an improper termination sequence. For example: > {noformat} > Exception in thread "master" java.lang.RuntimeException: > java.lang.InterruptedException > at com.datatorrent.bufferserver.server.Server.run(Server.java:154) > at > com.datatorrent.stram.StramLocalCluster.run(StramLocalCluster.java:474) > at > com.datatorrent.stram.StramLocalCluster.run(StramLocalCluster.java:459) > at java.lang.Thread.run(Thread.java:745) > Caused by: java.lang.InterruptedException > at > java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1302) > at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231) > at com.datatorrent.bufferserver.server.Server.run(Server.java:152) > ... 3 more > {noformat} -- This message was sent by Atlassian JIRA (v6.4.14#64029)