Repository: storm Updated Branches: refs/heads/master a41fef386 -> 6cf8a9c99
STORM-1693: Move stats cleanup to executor shutdown Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/983c420b Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/983c420b Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/983c420b Branch: refs/heads/master Commit: 983c420bfb170fba0b6ae0fd1b1fb3b99d3a1079 Parents: 6415863 Author: Abhishek Agarwal <[email protected]> Authored: Tue Apr 12 21:12:09 2016 +0530 Committer: Abhishek Agarwal <[email protected]> Committed: Tue Apr 12 21:12:09 2016 +0530 ---------------------------------------------------------------------- .../clj/org/apache/storm/daemon/executor.clj | 3 ++- .../storm/metric/internal/RateTracker.java | 26 -------------------- .../apache/storm/stats/BoltExecutorStats.java | 6 ++--- .../apache/storm/stats/SpoutExecutorStats.java | 4 --- .../org/apache/storm/utils/DisruptorQueue.java | 5 ++++ 5 files changed, 10 insertions(+), 34 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/983c420b/storm-core/src/clj/org/apache/storm/daemon/executor.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/org/apache/storm/daemon/executor.clj b/storm-core/src/clj/org/apache/storm/daemon/executor.clj index 8a77a61..6f7c18c 100644 --- a/storm-core/src/clj/org/apache/storm/daemon/executor.clj +++ b/storm-core/src/clj/org/apache/storm/daemon/executor.clj @@ -376,7 +376,8 @@ (doseq [t threads] (.interrupt t) (.join t)) - + + (.cleanupStats (:stats executor-data)) (doseq [user-context (map #(.getUserContext %) (vals task-datas))] (doseq [hook (.getHooks user-context)] (.cleanup hook))) http://git-wip-us.apache.org/repos/asf/storm/blob/983c420b/storm-core/src/jvm/org/apache/storm/metric/internal/RateTracker.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/metric/internal/RateTracker.java b/storm-core/src/jvm/org/apache/storm/metric/internal/RateTracker.java index 65ec931..92a8205 100644 --- a/storm-core/src/jvm/org/apache/storm/metric/internal/RateTracker.java +++ b/storm-core/src/jvm/org/apache/storm/metric/internal/RateTracker.java @@ -17,7 +17,6 @@ */ package org.apache.storm.metric.internal; -import java.util.Timer; import java.util.TimerTask; import java.util.concurrent.atomic.AtomicLong; @@ -137,29 +136,4 @@ public class RateTracker{ rotateBuckets(System.currentTimeMillis()); } } - - public static void main (String args[]) throws Exception { - final int number = (args.length >= 1) ? Integer.parseInt(args[0]) : 100000000; - for (int i = 0; i < 10; i++) { - testRate(number); - } - } - - private static void testRate(int number) { - RateTracker rt = new RateTracker(10000, 10); - long start = System.currentTimeMillis(); - for (int i = 0; i < number; i++) { - rt.notify(1); - if ((i % 1000000) == 0) { - //There is an issue with some JVM versions where an integer for loop that takes a long time - // can starve other threads resulting in the timer thread not getting called. - // This is a work around for that, and we still get the same results. - Thread.yield(); - } - } - long end = System.currentTimeMillis(); - double rate = rt.reportRate(); - rt.close(); - System.out.printf("time %,8d count %,8d rate %,15.2f reported rate %,15.2f\n", end-start,number, ((number * 1000.0)/(end-start)), rate); - } } http://git-wip-us.apache.org/repos/asf/storm/blob/983c420b/storm-core/src/jvm/org/apache/storm/stats/BoltExecutorStats.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/stats/BoltExecutorStats.java b/storm-core/src/jvm/org/apache/storm/stats/BoltExecutorStats.java index e26e56b..bfd0d36 100644 --- a/storm-core/src/jvm/org/apache/storm/stats/BoltExecutorStats.java +++ b/storm-core/src/jvm/org/apache/storm/stats/BoltExecutorStats.java @@ -18,13 +18,15 @@ package org.apache.storm.stats; import com.google.common.collect.Lists; -import java.util.List; + import org.apache.storm.generated.BoltStats; import org.apache.storm.generated.ExecutorSpecificStats; import org.apache.storm.generated.ExecutorStats; import org.apache.storm.metric.internal.MultiCountStatAndMetric; import org.apache.storm.metric.internal.MultiLatencyStatAndMetric; +import java.util.List; + @SuppressWarnings("unchecked") public class BoltExecutorStats extends CommonStats { @@ -83,8 +85,6 @@ public class BoltExecutorStats extends CommonStats { } public ExecutorStats renderStats() { - cleanupStats(); - ExecutorStats ret = new ExecutorStats(); // common stats ret.set_emitted(valueStat(EMITTED)); http://git-wip-us.apache.org/repos/asf/storm/blob/983c420b/storm-core/src/jvm/org/apache/storm/stats/SpoutExecutorStats.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/stats/SpoutExecutorStats.java b/storm-core/src/jvm/org/apache/storm/stats/SpoutExecutorStats.java index 3c09a38..28c885a 100644 --- a/storm-core/src/jvm/org/apache/storm/stats/SpoutExecutorStats.java +++ b/storm-core/src/jvm/org/apache/storm/stats/SpoutExecutorStats.java @@ -17,8 +17,6 @@ */ package org.apache.storm.stats; -import java.util.HashMap; -import java.util.Map; import org.apache.storm.generated.ExecutorSpecificStats; import org.apache.storm.generated.ExecutorStats; import org.apache.storm.generated.SpoutStats; @@ -61,8 +59,6 @@ public class SpoutExecutorStats extends CommonStats { } public ExecutorStats renderStats() { - cleanupStats(); - ExecutorStats ret = new ExecutorStats(); // common fields ret.set_emitted(valueStat(EMITTED)); http://git-wip-us.apache.org/repos/asf/storm/blob/983c420b/storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java b/storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java index 4482297..05ce565 100644 --- a/storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java +++ b/storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java @@ -340,6 +340,10 @@ public class DisruptorQueue implements IStatefulObject { public void notifyArrivals(long counts) { _rateTracker.notify(counts); } + + public void close() { + _rateTracker.close(); + } } private final RingBuffer<AtomicReference<Object>> _buffer; @@ -396,6 +400,7 @@ public class DisruptorQueue implements IStatefulObject { try { publishDirect(new ArrayList<Object>(Arrays.asList(INTERRUPT)), true); _flusher.close(); + _metrics.close(); } catch (InsufficientCapacityException e) { //This should be impossible throw new RuntimeException(e);
