Repository: apex-core
Updated Branches:
  refs/heads/master 0b518dea6 -> 388bb3877
APEXCORE-527 - Minor changes in LocalStramChildLauncher to help with unit test failures Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/1388bca7 Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/1388bca7 Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/1388bca7 Branch: refs/heads/master Commit: 1388bca782f88216c3888b7d6c260b89006fdcea Parents: 0670eb3 Author: Vlad Rozov <[email protected]> Authored: Mon Sep 12 19:00:09 2016 -0700 Committer: Vlad Rozov <[email protected]> Committed: Tue Sep 13 13:17:26 2016 -0700 ---------------------------------------------------------------------- .../datatorrent/stram/StramLocalCluster.java | 25 ++++++++------------ 1 file changed, 10 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/apex-core/blob/1388bca7/engine/src/main/java/com/datatorrent/stram/StramLocalCluster.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/StramLocalCluster.java b/engine/src/main/java/com/datatorrent/stram/StramLocalCluster.java index dd28304..48ed070 100644 --- a/engine/src/main/java/com/datatorrent/stram/StramLocalCluster.java +++ b/engine/src/main/java/com/datatorrent/stram/StramLocalCluster.java @@ -172,7 +172,7 @@ public class StramLocalCluster implements Runnable, Controller public void run(StreamingContainerContext ctx) throws Exception { - LOG.debug("Got context: " + ctx); + LOG.debug("container {} context {}", getContainerId(), ctx); setup(ctx); if (bufferServerAddress != null && !bufferServerAddress.getAddress().isLoopbackAddress()) { bufferServerAddress = InetSocketAddress.createUnresolved(LOCALHOST, bufferServerAddress.getPort()); @@ -242,10 +242,8 @@ public class StramLocalCluster implements Runnable, Controller final String containerId; final LocalStreamingContainer 
child; - Thread launchThread; - @SuppressWarnings("CallToThreadStartDuringObjectConstruction") - private LocalStreamingContainerLauncher(ContainerStartRequest cdr) + private LocalStreamingContainerLauncher(ContainerStartRequest cdr, List<Thread> containerThreads) { this.containerId = "container-" + containerSeq++; WindowGenerator wingen = null; @@ -256,10 +254,10 @@ public class StramLocalCluster implements Runnable, Controller ContainerResource cr = new ContainerResource(cdr.container.getResourceRequestPriority(), containerId, "localhost", cdr.container.getRequiredMemoryMB(), cdr.container.getRequiredVCores(), null); StreamingContainerAgent sca = dnmgr.assignContainer(cr, perContainerBufferServer ? null : bufferServerAddress); if (sca != null) { - launchThread = new Thread(this, containerId); - launchThread.start(); childContainers.put(containerId, child); - LOG.info("Started container {}", containerId); + Thread launchThread = new Thread(this, containerId); + containerThreads.add(launchThread); + launchThread.start(); } } @@ -268,10 +266,10 @@ public class StramLocalCluster implements Runnable, Controller { try { StreamingContainerContext ctx = umbilical.getInitContext(containerId); + LOG.info("Started container {}", containerId); child.run(ctx); - } catch (Exception e) { - LOG.error("Container {} failed", containerId, e); - throw new RuntimeException(e); + } catch (Exception | Error e) { + LOG.error("Fatal {} in container {}", e instanceof Error ? 
"error" : "exception", containerId, e); } finally { childContainers.remove(containerId); LOG.info("Container {} terminating.", containerId); @@ -440,7 +438,7 @@ public class StramLocalCluster implements Runnable, Controller public void run(long runMillis) { long endMillis = System.currentTimeMillis() + runMillis; - List<Thread> containerThreads = new LinkedList<Thread>(); + List<Thread> containerThreads = new LinkedList<>(); while (!appDone) { @@ -462,10 +460,7 @@ public class StramLocalCluster implements Runnable, Controller while (!dnmgr.containerStartRequests.isEmpty()) { ContainerStartRequest cdr = dnmgr.containerStartRequests.poll(); if (cdr != null) { - LocalStreamingContainerLauncher launcher = new LocalStreamingContainerLauncher(cdr); - if (launcher.launchThread != null) { - containerThreads.add(launcher.launchThread); - } + new LocalStreamingContainerLauncher(cdr, containerThreads); } }
