Repository: storm
Updated Branches:
  refs/heads/1.x-branch ddc3c0458 -> b511a8b4f


STORM-1693: Move stats cleanup to executor shutdown


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/3923e17e
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/3923e17e
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/3923e17e

Branch: refs/heads/1.x-branch
Commit: 3923e17ec55ce2954ca8c1966dbd017ed6a67fef
Parents: a240df5
Author: Abhishek Agarwal <[email protected]>
Authored: Tue Apr 12 15:14:42 2016 +0530
Committer: Abhishek Agarwal <[email protected]>
Committed: Tue Apr 12 15:14:42 2016 +0530

----------------------------------------------------------------------
 .../clj/org/apache/storm/daemon/executor.clj    |  2 +-
 storm-core/src/clj/org/apache/storm/stats.clj   | 20 ++++++++++-----
 .../storm/metric/internal/RateTracker.java      | 26 --------------------
 .../org/apache/storm/utils/DisruptorQueue.java  | 16 +++++++-----
 4 files changed, 25 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/3923e17e/storm-core/src/clj/org/apache/storm/daemon/executor.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/executor.clj 
b/storm-core/src/clj/org/apache/storm/daemon/executor.clj
index e974191..9ea4eb4 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/executor.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/executor.clj
@@ -415,7 +415,7 @@
         (doseq [t threads]
           (.interrupt t)
           (.join t))
-        
+        (stats/cleanup-stats! (:stats executor-data))
         (doseq [user-context (map :user-context (vals task-datas))]
           (doseq [hook (.getHooks user-context)]
             (.cleanup hook)))

http://git-wip-us.apache.org/repos/asf/storm/blob/3923e17e/storm-core/src/clj/org/apache/storm/stats.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/stats.clj 
b/storm-core/src/clj/org/apache/storm/stats.clj
index 68b16fd..9f36dbf 100644
--- a/storm-core/src/clj/org/apache/storm/stats.clj
+++ b/storm-core/src/clj/org/apache/storm/stats.clj
@@ -153,25 +153,25 @@
   [^SpoutExecutorStats stats stream latency-ms]
   (.incBy ^MultiCountStatAndMetric (stats-failed stats) stream (stats-rate 
stats)))
 
-(defn- cleanup-stat! [stat]
+(defn- close-stat! [stat]
   (.close stat))
 
 (defn- cleanup-common-stats!
   [^CommonStats stats]
   (doseq [f COMMON-FIELDS]
-    (cleanup-stat! (f stats))))
+    (close-stat! (f stats))))
 
 (defn cleanup-bolt-stats!
   [^BoltExecutorStats stats]
   (cleanup-common-stats! (:common stats))
   (doseq [f BOLT-FIELDS]
-    (cleanup-stat! (f stats))))
+    (close-stat! (f stats))))
 
 (defn cleanup-spout-stats!
   [^SpoutExecutorStats stats]
   (cleanup-common-stats! (:common stats))
   (doseq [f SPOUT-FIELDS]
-    (cleanup-stat! (f stats))))
+    (close-stat! (f stats))))
 
 (defn- value-stats
   [stats fields]
@@ -188,14 +188,12 @@
 
 (defn value-bolt-stats!
   [^BoltExecutorStats stats]
-  (cleanup-bolt-stats! stats)
   (merge (value-common-stats (:common stats))
          (value-stats stats BOLT-FIELDS)
          {:type :bolt}))
 
 (defn value-spout-stats!
   [^SpoutExecutorStats stats]
-  (cleanup-spout-stats! stats)
   (merge (value-common-stats (:common stats))
          (value-stats stats SPOUT-FIELDS)
          {:type :spout}))
@@ -210,6 +208,16 @@
   [stats]
   (value-bolt-stats! stats))
 
+(defmulti cleanup-stats! class-selector)
+
+(defmethod cleanup-stats! SpoutExecutorStats
+  [stats]
+  (cleanup-spout-stats! stats))
+
+(defmethod cleanup-stats! BoltExecutorStats
+  [stats]
+  (cleanup-bolt-stats! stats))
+
 (defmulti thriftify-specific-stats :type)
 (defmulti clojurify-specific-stats class-selector)
 

http://git-wip-us.apache.org/repos/asf/storm/blob/3923e17e/storm-core/src/jvm/org/apache/storm/metric/internal/RateTracker.java
----------------------------------------------------------------------
diff --git 
a/storm-core/src/jvm/org/apache/storm/metric/internal/RateTracker.java 
b/storm-core/src/jvm/org/apache/storm/metric/internal/RateTracker.java
index 65ec931..92a8205 100644
--- a/storm-core/src/jvm/org/apache/storm/metric/internal/RateTracker.java
+++ b/storm-core/src/jvm/org/apache/storm/metric/internal/RateTracker.java
@@ -17,7 +17,6 @@
  */
 package org.apache.storm.metric.internal;
 
-import java.util.Timer;
 import java.util.TimerTask;
 import java.util.concurrent.atomic.AtomicLong;
 
@@ -137,29 +136,4 @@ public class RateTracker{
             rotateBuckets(System.currentTimeMillis());
         }
     }
-
-    public static void main (String args[]) throws Exception {
-        final int number = (args.length >= 1) ? Integer.parseInt(args[0]) : 
100000000;
-        for (int i = 0; i < 10; i++) {
-            testRate(number);
-        }
-    }
-
-    private static void testRate(int number) {
-        RateTracker rt = new RateTracker(10000, 10);
-        long start = System.currentTimeMillis();
-        for (int i = 0; i < number; i++) {
-            rt.notify(1);
-            if ((i % 1000000) == 0) {
-                //There is an issue with some JVM versions where an integer 
for loop that takes a long time
-                // can starve other threads resulting in  the timer thread not 
getting called.
-                // This is a work around for that, and we still get the same 
results.
-                Thread.yield();
-            }
-        }
-        long end = System.currentTimeMillis();
-        double rate = rt.reportRate();
-        rt.close();
-        System.out.printf("time %,8d count %,8d rate %,15.2f reported rate 
%,15.2f\n", end-start,number, ((number * 1000.0)/(end-start)), rate);
-    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/3923e17e/storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java 
b/storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java
index 19aba06..9dd0019 100644
--- a/storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java
+++ b/storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java
@@ -30,6 +30,11 @@ import com.lmax.disruptor.TimeoutException;
 import com.lmax.disruptor.WaitStrategy;
 import com.lmax.disruptor.dsl.ProducerType;
 
+import org.apache.storm.metric.api.IStatefulObject;
+import org.apache.storm.metric.internal.RateTracker;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
@@ -46,12 +51,6 @@ import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.ReentrantLock;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.storm.metric.api.IStatefulObject;
-import org.apache.storm.metric.internal.RateTracker;
-
 /**
  * A single consumer queue that uses the LMAX Disruptor. They key to the 
performance is
  * the ability to catch up to the producer by processing tuples in batches.
@@ -341,6 +340,10 @@ public class DisruptorQueue implements IStatefulObject {
         public void notifyArrivals(long counts) {
             _rateTracker.notify(counts);
         }
+
+        public void close() {
+            _rateTracker.close();
+        }
     }
 
     private final RingBuffer<AtomicReference<Object>> _buffer;
@@ -393,6 +396,7 @@ public class DisruptorQueue implements IStatefulObject {
         try {
             publishDirect(new ArrayList<Object>(Arrays.asList(INTERRUPT)), 
true);
             _flusher.close();
+            _metrics.close();
         } catch (InsufficientCapacityException e) {
             //This should be impossible
             throw new RuntimeException(e);

Reply via email to