Repository: storm
Updated Branches:
  refs/heads/master a41fef386 -> 6cf8a9c99


STORM-1693: Move stats cleanup to executor shutdown


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/983c420b
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/983c420b
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/983c420b

Branch: refs/heads/master
Commit: 983c420bfb170fba0b6ae0fd1b1fb3b99d3a1079
Parents: 6415863
Author: Abhishek Agarwal <[email protected]>
Authored: Tue Apr 12 21:12:09 2016 +0530
Committer: Abhishek Agarwal <[email protected]>
Committed: Tue Apr 12 21:12:09 2016 +0530

----------------------------------------------------------------------
 .../clj/org/apache/storm/daemon/executor.clj    |  3 ++-
 .../storm/metric/internal/RateTracker.java      | 26 --------------------
 .../apache/storm/stats/BoltExecutorStats.java   |  6 ++---
 .../apache/storm/stats/SpoutExecutorStats.java  |  4 ---
 .../org/apache/storm/utils/DisruptorQueue.java  |  5 ++++
 5 files changed, 10 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/983c420b/storm-core/src/clj/org/apache/storm/daemon/executor.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/executor.clj 
b/storm-core/src/clj/org/apache/storm/daemon/executor.clj
index 8a77a61..6f7c18c 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/executor.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/executor.clj
@@ -376,7 +376,8 @@
         (doseq [t threads]
           (.interrupt t)
           (.join t))
-        
+
+        (.cleanupStats (:stats executor-data))
         (doseq [user-context (map #(.getUserContext %) (vals task-datas))]
           (doseq [hook (.getHooks user-context)]
             (.cleanup hook)))

http://git-wip-us.apache.org/repos/asf/storm/blob/983c420b/storm-core/src/jvm/org/apache/storm/metric/internal/RateTracker.java
----------------------------------------------------------------------
diff --git 
a/storm-core/src/jvm/org/apache/storm/metric/internal/RateTracker.java 
b/storm-core/src/jvm/org/apache/storm/metric/internal/RateTracker.java
index 65ec931..92a8205 100644
--- a/storm-core/src/jvm/org/apache/storm/metric/internal/RateTracker.java
+++ b/storm-core/src/jvm/org/apache/storm/metric/internal/RateTracker.java
@@ -17,7 +17,6 @@
  */
 package org.apache.storm.metric.internal;
 
-import java.util.Timer;
 import java.util.TimerTask;
 import java.util.concurrent.atomic.AtomicLong;
 
@@ -137,29 +136,4 @@ public class RateTracker{
             rotateBuckets(System.currentTimeMillis());
         }
     }
-
-    public static void main (String args[]) throws Exception {
-        final int number = (args.length >= 1) ? Integer.parseInt(args[0]) : 
100000000;
-        for (int i = 0; i < 10; i++) {
-            testRate(number);
-        }
-    }
-
-    private static void testRate(int number) {
-        RateTracker rt = new RateTracker(10000, 10);
-        long start = System.currentTimeMillis();
-        for (int i = 0; i < number; i++) {
-            rt.notify(1);
-            if ((i % 1000000) == 0) {
-                //There is an issue with some JVM versions where an integer 
for loop that takes a long time
-                // can starve other threads resulting in  the timer thread not 
getting called.
-                // This is a work around for that, and we still get the same 
results.
-                Thread.yield();
-            }
-        }
-        long end = System.currentTimeMillis();
-        double rate = rt.reportRate();
-        rt.close();
-        System.out.printf("time %,8d count %,8d rate %,15.2f reported rate 
%,15.2f\n", end-start,number, ((number * 1000.0)/(end-start)), rate);
-    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/983c420b/storm-core/src/jvm/org/apache/storm/stats/BoltExecutorStats.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/stats/BoltExecutorStats.java 
b/storm-core/src/jvm/org/apache/storm/stats/BoltExecutorStats.java
index e26e56b..bfd0d36 100644
--- a/storm-core/src/jvm/org/apache/storm/stats/BoltExecutorStats.java
+++ b/storm-core/src/jvm/org/apache/storm/stats/BoltExecutorStats.java
@@ -18,13 +18,15 @@
 package org.apache.storm.stats;
 
 import com.google.common.collect.Lists;
-import java.util.List;
+
 import org.apache.storm.generated.BoltStats;
 import org.apache.storm.generated.ExecutorSpecificStats;
 import org.apache.storm.generated.ExecutorStats;
 import org.apache.storm.metric.internal.MultiCountStatAndMetric;
 import org.apache.storm.metric.internal.MultiLatencyStatAndMetric;
 
+import java.util.List;
+
 @SuppressWarnings("unchecked")
 public class BoltExecutorStats extends CommonStats {
 
@@ -83,8 +85,6 @@ public class BoltExecutorStats extends CommonStats {
     }
 
     public ExecutorStats renderStats() {
-        cleanupStats();
-
         ExecutorStats ret = new ExecutorStats();
         // common stats
         ret.set_emitted(valueStat(EMITTED));

http://git-wip-us.apache.org/repos/asf/storm/blob/983c420b/storm-core/src/jvm/org/apache/storm/stats/SpoutExecutorStats.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/stats/SpoutExecutorStats.java 
b/storm-core/src/jvm/org/apache/storm/stats/SpoutExecutorStats.java
index 3c09a38..28c885a 100644
--- a/storm-core/src/jvm/org/apache/storm/stats/SpoutExecutorStats.java
+++ b/storm-core/src/jvm/org/apache/storm/stats/SpoutExecutorStats.java
@@ -17,8 +17,6 @@
  */
 package org.apache.storm.stats;
 
-import java.util.HashMap;
-import java.util.Map;
 import org.apache.storm.generated.ExecutorSpecificStats;
 import org.apache.storm.generated.ExecutorStats;
 import org.apache.storm.generated.SpoutStats;
@@ -61,8 +59,6 @@ public class SpoutExecutorStats extends CommonStats {
     }
 
     public ExecutorStats renderStats() {
-        cleanupStats();
-
         ExecutorStats ret = new ExecutorStats();
         // common fields
         ret.set_emitted(valueStat(EMITTED));

http://git-wip-us.apache.org/repos/asf/storm/blob/983c420b/storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java 
b/storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java
index 4482297..05ce565 100644
--- a/storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java
+++ b/storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java
@@ -340,6 +340,10 @@ public class DisruptorQueue implements IStatefulObject {
         public void notifyArrivals(long counts) {
             _rateTracker.notify(counts);
         }
+
+        public void close() {
+            _rateTracker.close();
+        }
     }
 
     private final RingBuffer<AtomicReference<Object>> _buffer;
@@ -396,6 +400,7 @@ public class DisruptorQueue implements IStatefulObject {
         try {
             publishDirect(new ArrayList<Object>(Arrays.asList(INTERRUPT)), 
true);
             _flusher.close();
+            _metrics.close();
         } catch (InsufficientCapacityException e) {
             //This should be impossible
             throw new RuntimeException(e);

Reply via email to