STORM-2153: Replace timer with ScheduledThreadPoolExecutor
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/e13f9034
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/e13f9034
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/e13f9034

Branch: refs/heads/1.x-branch
Commit: e13f903452585ab84e08d5a7ac3a79c43141f232
Parents: 868de5b
Author: P. Taylor Goetz <[email protected]>
Authored: Wed Jan 3 14:48:42 2018 -0500
Committer: P. Taylor Goetz <[email protected]>
Committed: Wed Jan 3 14:48:42 2018 -0500

----------------------------------------------------------------------
 .../jvm/org/apache/storm/utils/DisruptorQueue.java | 15 +++++++--------
 1 file changed, 7 insertions(+), 8 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/storm/blob/e13f9034/storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java b/storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java
index d7cf401..6ea3683 100644
--- a/storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java
+++ b/storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java
@@ -48,6 +48,7 @@ import java.util.TimerTask;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
@@ -65,7 +66,7 @@ public class DisruptorQueue implements IStatefulObject {
     private static final Object INTERRUPT = new Object();
     private static final String PREFIX = "disruptor-";
     private static final FlusherPool FLUSHER = new FlusherPool();
-    private static final Timer METRICS_TIMER = new Timer("disruptor-metrics-timer", true);
+    private static final ScheduledThreadPoolExecutor METRICS_REPORTER_EXECUTOR = new ScheduledThreadPoolExecutor(1);
 
     private static int getNumFlusherPoolThreads() {
         int numThreads = 100;
@@ -437,17 +438,15 @@ public class DisruptorQueue implements IStatefulObject {
         _flusher = new Flusher(Math.max(flushInterval, 1), _queueName);
         _flusher.start();
 
-        try {
-            METRICS_TIMER.schedule(new TimerTask() {
+        if(!METRICS_REPORTER_EXECUTOR.isShutdown()) {
+            METRICS_REPORTER_EXECUTOR.scheduleAtFixedRate(new Runnable() {
                 @Override
                 public void run() {
                     _disruptorMetrics.set(_metrics);
                 }
-            }, 15000, 15000);
-        } catch (IllegalStateException e){
-            // Ignore. IllegalStateException is thrown by Timer.schedule() if the timer
-            // has been cancelled. (This happens in unit tests)
+            }, 15, 15, TimeUnit.SECONDS);
         }
+
     }
 
     public String getName() {
@@ -463,7 +462,7 @@ public class DisruptorQueue implements IStatefulObject {
             publishDirect(new ArrayList<Object>(Arrays.asList(INTERRUPT)), true);
             _flusher.close();
             _metrics.close();
-            METRICS_TIMER.cancel();
+            METRICS_REPORTER_EXECUTOR.shutdown();
         } catch (InsufficientCapacityException e) {
             //This should be impossible
             throw new RuntimeException(e);
