Repository: storm Updated Branches: refs/heads/1.x-branch e178990cb -> e462979b3
STORM-2724 Shutdown ExecutorService in WaterMarkEventGenerator in shutdown phase * also names unnamed executor services in windowing package * address review comment from @srdo Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/a648db54 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/a648db54 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/a648db54 Branch: refs/heads/1.x-branch Commit: a648db54b9848aac651dbff88ebe721369a7f477 Parents: fd6185f Author: Jungtaek Lim <[email protected]> Authored: Thu Sep 7 23:36:10 2017 +0900 Committer: Jungtaek Lim <[email protected]> Committed: Sat Sep 9 08:05:43 2017 +0900 ---------------------------------------------------------------------- .../storm/topology/WindowedBoltExecutor.java | 1 + .../storm/windowing/TimeTriggerPolicy.java | 8 ++++++- .../windowing/WaterMarkEventGenerator.java | 25 +++++++++++++++++++- .../windowing/WaterMarkEventGeneratorTest.java | 6 +++++ 4 files changed, 38 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/a648db54/storm-core/src/jvm/org/apache/storm/topology/WindowedBoltExecutor.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/topology/WindowedBoltExecutor.java b/storm-core/src/jvm/org/apache/storm/topology/WindowedBoltExecutor.java index 965d11c..26c69a2 100644 --- a/storm-core/src/jvm/org/apache/storm/topology/WindowedBoltExecutor.java +++ b/storm-core/src/jvm/org/apache/storm/topology/WindowedBoltExecutor.java @@ -299,6 +299,7 @@ public class WindowedBoltExecutor implements IRichBolt { @Override public void cleanup() { + waterMarkEventGenerator.shutdown(); windowManager.shutdown(); bolt.cleanup(); } http://git-wip-us.apache.org/repos/asf/storm/blob/a648db54/storm-core/src/jvm/org/apache/storm/windowing/TimeTriggerPolicy.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/windowing/TimeTriggerPolicy.java b/storm-core/src/jvm/org/apache/storm/windowing/TimeTriggerPolicy.java index f6e0197..882b6be 100644 --- a/storm-core/src/jvm/org/apache/storm/windowing/TimeTriggerPolicy.java +++ b/storm-core/src/jvm/org/apache/storm/windowing/TimeTriggerPolicy.java @@ -17,6 +17,7 @@ */ package org.apache.storm.windowing; +import com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.storm.topology.FailedException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -25,6 +26,7 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; /** @@ -46,7 +48,11 @@ public class TimeTriggerPolicy<T> implements TriggerPolicy<T> { public TimeTriggerPolicy(long millis, TriggerHandler handler, EvictionPolicy<T> evictionPolicy) { this.duration = millis; this.handler = handler; - this.executor = Executors.newSingleThreadScheduledExecutor(); + ThreadFactory threadFactory = new ThreadFactoryBuilder() + .setNameFormat("time-trigger-policy-%d") + .setDaemon(true) + .build(); + this.executor = Executors.newSingleThreadScheduledExecutor(threadFactory); this.evictionPolicy = evictionPolicy; } http://git-wip-us.apache.org/repos/asf/storm/blob/a648db54/storm-core/src/jvm/org/apache/storm/windowing/WaterMarkEventGenerator.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/windowing/WaterMarkEventGenerator.java b/storm-core/src/jvm/org/apache/storm/windowing/WaterMarkEventGenerator.java index d4f431f..194c359 100644 --- a/storm-core/src/jvm/org/apache/storm/windowing/WaterMarkEventGenerator.java +++ b/storm-core/src/jvm/org/apache/storm/windowing/WaterMarkEventGenerator.java @@ -25,8 +25,11 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; + +import com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.storm.generated.GlobalStreamId; import org.apache.storm.topology.FailedException; import org.slf4j.Logger; @@ -60,7 +63,13 @@ public class WaterMarkEventGenerator<T> implements Runnable { int eventTsLagMs, Set<GlobalStreamId> inputStreams) { this.windowManager = windowManager; streamToTs = new ConcurrentHashMap<>(); - executorService = Executors.newSingleThreadScheduledExecutor(); + + ThreadFactory threadFactory = new ThreadFactoryBuilder() + .setNameFormat("watermark-event-generator-%d") + .setDaemon(true) + .build(); + executorService = Executors.newSingleThreadScheduledExecutor(threadFactory); + this.interval = intervalMs; this.eventTsLag = eventTsLagMs; this.inputStreams = inputStreams; @@ -126,4 +135,18 @@ public class WaterMarkEventGenerator<T> implements Runnable { public void start() { this.executorFuture = executorService.scheduleAtFixedRate(this, interval, interval, TimeUnit.MILLISECONDS); } + + public void shutdown() { + LOG.debug("Shutting down WaterMarkEventGenerator"); + executorService.shutdown(); + + try { + if (!executorService.awaitTermination(2, TimeUnit.SECONDS)) { + executorService.shutdownNow(); + } + } catch (InterruptedException ie) { + executorService.shutdownNow(); + Thread.currentThread().interrupt(); + } + } } http://git-wip-us.apache.org/repos/asf/storm/blob/a648db54/storm-core/test/jvm/org/apache/storm/windowing/WaterMarkEventGeneratorTest.java ---------------------------------------------------------------------- diff --git a/storm-core/test/jvm/org/apache/storm/windowing/WaterMarkEventGeneratorTest.java b/storm-core/test/jvm/org/apache/storm/windowing/WaterMarkEventGeneratorTest.java index 9f4bcf8..e735619 100644 --- a/storm-core/test/jvm/org/apache/storm/windowing/WaterMarkEventGeneratorTest.java +++ b/storm-core/test/jvm/org/apache/storm/windowing/WaterMarkEventGeneratorTest.java @@ -18,6 +18,7 @@ package org.apache.storm.windowing; import org.apache.storm.generated.GlobalStreamId; +import org.junit.After; import org.junit.Before; import org.junit.Test; import org.mockito.Mockito; @@ -56,6 +57,11 @@ public class WaterMarkEventGeneratorTest { waterMarkEventGenerator.start(); } + @After + public void tearDown() { + waterMarkEventGenerator.shutdown(); + } + @Test public void testTrackSingleStream() throws Exception { waterMarkEventGenerator.track(streamId("s1"), 100);
