Repository: apex-core Updated Branches: refs/heads/master 989536773 -> 0670eb3e0
Fixing cleanup during shutdown to avoid intermittent errors Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/0670eb3e Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/0670eb3e Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/0670eb3e Branch: refs/heads/master Commit: 0670eb3e0be01097123263406692238e974f142a Parents: 9895367 Author: Pramod Immaneni <[email protected]> Authored: Fri Feb 26 16:01:20 2016 -0800 Committer: Pramod Immaneni <[email protected]> Committed: Tue Sep 13 10:57:56 2016 -0700 ---------------------------------------------------------------------- .../datatorrent/stram/StramLocalCluster.java | 23 ++++++++++++++++---- .../com/datatorrent/stram/PartitioningTest.java | 2 +- 2 files changed, 20 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/apex-core/blob/0670eb3e/engine/src/main/java/com/datatorrent/stram/StramLocalCluster.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/StramLocalCluster.java b/engine/src/main/java/com/datatorrent/stram/StramLocalCluster.java index 23737d0..dd28304 100644 --- a/engine/src/main/java/com/datatorrent/stram/StramLocalCluster.java +++ b/engine/src/main/java/com/datatorrent/stram/StramLocalCluster.java @@ -24,6 +24,7 @@ import java.io.File; import java.io.IOException; import java.net.InetSocketAddress; import java.util.Collections; +import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.concurrent.Callable; @@ -236,13 +237,15 @@ public class StramLocalCluster implements Runnable, Controller /** * Starts the child "container" as thread. */ - private class LocalStramChildLauncher implements Runnable + private class LocalStreamingContainerLauncher implements Runnable { final String containerId; final LocalStreamingContainer child; + Thread launchThread; + @SuppressWarnings("CallToThreadStartDuringObjectConstruction") - private LocalStramChildLauncher(ContainerStartRequest cdr) + private LocalStreamingContainerLauncher(ContainerStartRequest cdr) { this.containerId = "container-" + containerSeq++; WindowGenerator wingen = null; @@ -253,7 +256,7 @@ public class StramLocalCluster implements Runnable, Controller ContainerResource cr = new ContainerResource(cdr.container.getResourceRequestPriority(), containerId, "localhost", cdr.container.getRequiredMemoryMB(), cdr.container.getRequiredVCores(), null); StreamingContainerAgent sca = dnmgr.assignContainer(cr, perContainerBufferServer ? null : bufferServerAddress); if (sca != null) { - Thread launchThread = new Thread(this, containerId); + launchThread = new Thread(this, containerId); launchThread.start(); childContainers.put(containerId, child); LOG.info("Started container {}", containerId); @@ -437,6 +440,7 @@ public class StramLocalCluster implements Runnable, Controller public void run(long runMillis) { long endMillis = System.currentTimeMillis() + runMillis; + List<Thread> containerThreads = new LinkedList<Thread>(); while (!appDone) { @@ -458,7 +462,10 @@ public class StramLocalCluster implements Runnable, Controller while (!dnmgr.containerStartRequests.isEmpty()) { ContainerStartRequest cdr = dnmgr.containerStartRequests.poll(); if (cdr != null) { - new LocalStramChildLauncher(cdr); + LocalStreamingContainerLauncher launcher = new LocalStreamingContainerLauncher(cdr); + if (launcher.launchThread != null) { + containerThreads.add(launcher.launchThread); + } } } @@ -502,6 +509,14 @@ public class StramLocalCluster implements Runnable, Controller lsc.triggerHeartbeat(); } + for (Thread thread: containerThreads) { + try { + thread.join(1000); + } catch (InterruptedException e) { + LOG.warn("Container thread didn't finish {}", thread.getName()); + } + } + dnmgr.teardown(); LOG.info("Application finished."); http://git-wip-us.apache.org/repos/asf/apex-core/blob/0670eb3e/engine/src/test/java/com/datatorrent/stram/PartitioningTest.java ---------------------------------------------------------------------- diff --git a/engine/src/test/java/com/datatorrent/stram/PartitioningTest.java b/engine/src/test/java/com/datatorrent/stram/PartitioningTest.java index 4f8becd..9eef586 100644 --- a/engine/src/test/java/com/datatorrent/stram/PartitioningTest.java +++ b/engine/src/test/java/com/datatorrent/stram/PartitioningTest.java @@ -260,7 +260,7 @@ public class PartitioningTest }; StramTestSupport.awaitCompletion(c, 10000); - Assert.assertTrue("Number partitions " + ow, c.isComplete()); + Assert.assertTrue("Number partitions match " + ow, c.isComplete()); return lc.getPlanOperators(ow); }
