Repository: apex-core Updated Branches: refs/heads/master a6dd73b96 -> 58930cc57
APEXCORE-645 StramLocalCluster does not wait for master thread termination Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/e233a26a Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/e233a26a Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/e233a26a Branch: refs/heads/master Commit: e233a26aa3ff08c564015ac700df4e3b15962226 Parents: 74f732a Author: Vlad Rozov <[email protected]> Authored: Mon Feb 13 08:58:18 2017 -0800 Committer: Vlad Rozov <[email protected]> Committed: Tue Feb 14 13:49:10 2017 -0800 ---------------------------------------------------------------------- .../datatorrent/stram/StramLocalCluster.java | 29 ++++++++++++++++++-- 1 file changed, 26 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/apex-core/blob/e233a26a/engine/src/main/java/com/datatorrent/stram/StramLocalCluster.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/StramLocalCluster.java b/engine/src/main/java/com/datatorrent/stram/StramLocalCluster.java index 2ffbabd..35b3d1c 100644 --- a/engine/src/main/java/com/datatorrent/stram/StramLocalCluster.java +++ b/engine/src/main/java/com/datatorrent/stram/StramLocalCluster.java @@ -81,6 +81,7 @@ public class StramLocalCluster implements Runnable, Controller private final Map<String, StreamingContainer> injectShutdown = new ConcurrentHashMap<>(); private boolean heartbeatMonitoringEnabled = true; private Callable<Boolean> exitCondition; + private Thread master; public interface MockComponentFactory { @@ -404,13 +405,32 @@ public class StramLocalCluster implements Runnable, Controller @Override public void runAsync() { - new Thread(this, "master").start(); + master = new Thread(this, "master"); + master.start(); } @Override public void shutdown() { appDone = true; + awaitTermination(0); + } + + private void awaitTermination(long millis) + { + if (master != null) { + try { + master.interrupt(); + master.join(millis); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } finally { + if (master.isAlive()) { + LOG.warn("{} {} did not terminate.", this.getClass().getSimpleName(), master.getName()); + } + master = null; + } + } } public boolean isFinished() @@ -512,7 +532,7 @@ public class StramLocalCluster implements Runnable, Controller try { Thread.sleep(1000); } catch (InterruptedException e) { - LOG.info("Sleep interrupted " + e.getMessage()); + LOG.debug("Sleep interrupted", e); break; } } @@ -527,7 +547,10 @@ public class StramLocalCluster implements Runnable, Controller try { thread.join(1000); } catch (InterruptedException e) { - LOG.warn("Container thread didn't finish {}", thread.getName()); + LOG.debug("Sleep interrupted", e); + } + if (thread.isAlive()) { + LOG.warn("Container thread {} didn't finish", thread.getName()); } }
