Repository: storm Updated Branches: refs/heads/1.x-branch ddc3c0458 -> b511a8b4f
STORM-1693: Move stats cleanup to executor shutdown Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/3923e17e Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/3923e17e Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/3923e17e Branch: refs/heads/1.x-branch Commit: 3923e17ec55ce2954ca8c1966dbd017ed6a67fef Parents: a240df5 Author: Abhishek Agarwal <[email protected]> Authored: Tue Apr 12 15:14:42 2016 +0530 Committer: Abhishek Agarwal <[email protected]> Committed: Tue Apr 12 15:14:42 2016 +0530 ---------------------------------------------------------------------- .../clj/org/apache/storm/daemon/executor.clj | 2 +- storm-core/src/clj/org/apache/storm/stats.clj | 20 ++++++++++----- .../storm/metric/internal/RateTracker.java | 26 -------------------- .../org/apache/storm/utils/DisruptorQueue.java | 16 +++++++----- 4 files changed, 25 insertions(+), 39 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/3923e17e/storm-core/src/clj/org/apache/storm/daemon/executor.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/org/apache/storm/daemon/executor.clj b/storm-core/src/clj/org/apache/storm/daemon/executor.clj index e974191..9ea4eb4 100644 --- a/storm-core/src/clj/org/apache/storm/daemon/executor.clj +++ b/storm-core/src/clj/org/apache/storm/daemon/executor.clj @@ -415,7 +415,7 @@ (doseq [t threads] (.interrupt t) (.join t)) - + (stats/cleanup-stats! (:stats executor-data)) (doseq [user-context (map :user-context (vals task-datas))] (doseq [hook (.getHooks user-context)] (.cleanup hook))) http://git-wip-us.apache.org/repos/asf/storm/blob/3923e17e/storm-core/src/clj/org/apache/storm/stats.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/org/apache/storm/stats.clj b/storm-core/src/clj/org/apache/storm/stats.clj index 68b16fd..9f36dbf 100644 --- a/storm-core/src/clj/org/apache/storm/stats.clj +++ b/storm-core/src/clj/org/apache/storm/stats.clj @@ -153,25 +153,25 @@ [^SpoutExecutorStats stats stream latency-ms] (.incBy ^MultiCountStatAndMetric (stats-failed stats) stream (stats-rate stats))) -(defn- cleanup-stat! [stat] +(defn- close-stat! [stat] (.close stat)) (defn- cleanup-common-stats! [^CommonStats stats] (doseq [f COMMON-FIELDS] - (cleanup-stat! (f stats)))) + (close-stat! (f stats)))) (defn cleanup-bolt-stats! [^BoltExecutorStats stats] (cleanup-common-stats! (:common stats)) (doseq [f BOLT-FIELDS] - (cleanup-stat! (f stats)))) + (close-stat! (f stats)))) (defn cleanup-spout-stats! [^SpoutExecutorStats stats] (cleanup-common-stats! (:common stats)) (doseq [f SPOUT-FIELDS] - (cleanup-stat! (f stats)))) + (close-stat! (f stats)))) (defn- value-stats [stats fields] @@ -188,14 +188,12 @@ (defn value-bolt-stats! [^BoltExecutorStats stats] - (cleanup-bolt-stats! stats) (merge (value-common-stats (:common stats)) (value-stats stats BOLT-FIELDS) {:type :bolt})) (defn value-spout-stats! [^SpoutExecutorStats stats] - (cleanup-spout-stats! stats) (merge (value-common-stats (:common stats)) (value-stats stats SPOUT-FIELDS) {:type :spout})) @@ -210,6 +208,16 @@ [stats] (value-bolt-stats! stats)) +(defmulti cleanup-stats! class-selector) + +(defmethod cleanup-stats! SpoutExecutorStats + [stats] + (cleanup-spout-stats! stats)) + +(defmethod cleanup-stats! BoltExecutorStats + [stats] + (cleanup-bolt-stats! stats)) + (defmulti thriftify-specific-stats :type) (defmulti clojurify-specific-stats class-selector) http://git-wip-us.apache.org/repos/asf/storm/blob/3923e17e/storm-core/src/jvm/org/apache/storm/metric/internal/RateTracker.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/metric/internal/RateTracker.java b/storm-core/src/jvm/org/apache/storm/metric/internal/RateTracker.java index 65ec931..92a8205 100644 --- a/storm-core/src/jvm/org/apache/storm/metric/internal/RateTracker.java +++ b/storm-core/src/jvm/org/apache/storm/metric/internal/RateTracker.java @@ -17,7 +17,6 @@ */ package org.apache.storm.metric.internal; -import java.util.Timer; import java.util.TimerTask; import java.util.concurrent.atomic.AtomicLong; @@ -137,29 +136,4 @@ public class RateTracker{ rotateBuckets(System.currentTimeMillis()); } } - - public static void main (String args[]) throws Exception { - final int number = (args.length >= 1) ? Integer.parseInt(args[0]) : 100000000; - for (int i = 0; i < 10; i++) { - testRate(number); - } - } - - private static void testRate(int number) { - RateTracker rt = new RateTracker(10000, 10); - long start = System.currentTimeMillis(); - for (int i = 0; i < number; i++) { - rt.notify(1); - if ((i % 1000000) == 0) { - //There is an issue with some JVM versions where an integer for loop that takes a long time - // can starve other threads resulting in the timer thread not getting called. - // This is a work around for that, and we still get the same results. - Thread.yield(); - } - } - long end = System.currentTimeMillis(); - double rate = rt.reportRate(); - rt.close(); - System.out.printf("time %,8d count %,8d rate %,15.2f reported rate %,15.2f\n", end-start,number, ((number * 1000.0)/(end-start)), rate); - } } http://git-wip-us.apache.org/repos/asf/storm/blob/3923e17e/storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java b/storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java index 19aba06..9dd0019 100644 --- a/storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java +++ b/storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java @@ -30,6 +30,11 @@ import com.lmax.disruptor.TimeoutException; import com.lmax.disruptor.WaitStrategy; import com.lmax.disruptor.dsl.ProducerType; +import org.apache.storm.metric.api.IStatefulObject; +import org.apache.storm.metric.internal.RateTracker; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; @@ -46,12 +51,6 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.ReentrantLock; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.storm.metric.api.IStatefulObject; -import org.apache.storm.metric.internal.RateTracker; - /** * A single consumer queue that uses the LMAX Disruptor. They key to the performance is * the ability to catch up to the producer by processing tuples in batches. @@ -341,6 +340,10 @@ public class DisruptorQueue implements IStatefulObject { public void notifyArrivals(long counts) { _rateTracker.notify(counts); } + + public void close() { + _rateTracker.close(); + } } private final RingBuffer<AtomicReference<Object>> _buffer; @@ -393,6 +396,7 @@ public class DisruptorQueue implements IStatefulObject { try { publishDirect(new ArrayList<Object>(Arrays.asList(INTERRUPT)), true); _flusher.close(); + _metrics.close(); } catch (InsufficientCapacityException e) { //This should be impossible throw new RuntimeException(e);
