Move the tuple-stat update and renderStats methods from StatsUtil to the corresponding executor stats classes (CommonStats, BoltExecutorStats, SpoutExecutorStats)


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/f61ea0c0
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/f61ea0c0
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/f61ea0c0

Branch: refs/heads/master
Commit: f61ea0c0196da4f31126f3f96ffb2bf5551a01d2
Parents: 52d3b58
Author: 卫乐 <[email protected]>
Authored: Thu Feb 25 10:59:42 2016 +0800
Committer: 卫乐 <[email protected]>
Committed: Thu Feb 25 10:59:42 2016 +0800

----------------------------------------------------------------------
 .../clj/org/apache/storm/daemon/executor.clj    |  18 +--
 .../src/clj/org/apache/storm/daemon/task.clj    |   9 +-
 .../apache/storm/stats/BoltExecutorStats.java   |  45 ++++++
 .../jvm/org/apache/storm/stats/CommonStats.java |  40 +++++
 .../apache/storm/stats/SpoutExecutorStats.java  |  35 +++++
 .../jvm/org/apache/storm/stats/StatsUtil.java   | 147 +------------------
 .../test/clj/org/apache/storm/nimbus_test.clj   |   4 +-
 7 files changed, 139 insertions(+), 159 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/f61ea0c0/storm-core/src/clj/org/apache/storm/daemon/executor.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/executor.clj b/storm-core/src/clj/org/apache/storm/daemon/executor.clj
index bca03df..8009f6c 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/executor.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/executor.clj
@@ -17,7 +17,7 @@
   (:use [org.apache.storm.daemon common])
   (:import [org.apache.storm.generated Grouping Grouping$_Fields]
            [java.io Serializable]
-           [org.apache.storm.stats StatsUtil])
+           [org.apache.storm.stats BoltExecutorStats SpoutExecutorStats])
   (:use [org.apache.storm util config log])
   (:import [java.util List Random HashMap ArrayList LinkedList Map])
   (:import [org.apache.storm ICredentialsListener Thrift])
@@ -408,7 +408,7 @@
     (reify
       RunningExecutor
       (render-stats [this]
-        (clojurify-structure (StatsUtil/renderStats (:stats executor-data))))
+        (clojurify-structure (.renderStats (:stats executor-data))))
       (get-executor-id [this]
         executor-id)
       (credentials-changed [this creds]
@@ -448,7 +448,7 @@
     (.fail spout msg-id)
     (task/apply-hooks (:user-context task-data) .spoutFail (SpoutFailInfo. msg-id task-id time-delta))
     (when time-delta
-      (StatsUtil/spoutFailedTuple (:stats executor-data) (:stream tuple-info) time-delta))))
+      (.spoutFailedTuple (:stats executor-data) (:stream tuple-info) time-delta))))
 
 (defn- ack-spout-msg [executor-data task-data msg-id tuple-info time-delta id]
   (let [storm-conf (:storm-conf executor-data)
@@ -459,7 +459,7 @@
     (.ack spout msg-id)
     (task/apply-hooks (:user-context task-data) .spoutAck (SpoutAckInfo. msg-id task-id time-delta))
     (when time-delta
-      (StatsUtil/spoutAckedTuple (:stats executor-data) (:stream tuple-info) time-delta))))
+      (.spoutAckedTuple (:stats executor-data) (:stream tuple-info) time-delta))))
 
 (defn mk-task-receiver [executor-data tuple-action-fn]
   (let [task-ids (:task-ids executor-data)
@@ -740,7 +740,7 @@
 
                                   (task/apply-hooks user-context .boltExecute 
(BoltExecuteInfo. tuple task-id delta))
                                   (when delta
-                                    (StatsUtil/boltExecuteTuple executor-stats
+                                    (.boltExecuteTuple executor-stats
                                                                
(.getSourceComponent tuple)
                                                                
(.getSourceStreamId tuple)
                                                                delta)))))))
@@ -813,7 +813,7 @@
                                                 (log-message "BOLT ack TASK: " 
task-id " TIME: " delta " TUPLE: " tuple))
                                               (task/apply-hooks user-context 
.boltAck (BoltAckInfo. tuple task-id delta))
                                               (when delta
-                                                (StatsUtil/boltAckedTuple 
executor-stats
+                                                (.boltAckedTuple executor-stats
                                                                          
(.getSourceComponent tuple)
                                                                          
(.getSourceStreamId tuple)
                                                                          
delta))))
@@ -828,7 +828,7 @@
                                                 (log-message "BOLT fail TASK: 
" task-id " TIME: " delta " TUPLE: " tuple))
                                               (task/apply-hooks user-context 
.boltFail (BoltFailInfo. tuple task-id delta))
                                               (when delta
-                                                (StatsUtil/boltFailedTuple 
executor-stats
+                                                (.boltFailedTuple 
executor-stats
                                                                           
(.getSourceComponent tuple)
                                                                           
(.getSourceStreamId tuple)
                                                                           
delta))))
@@ -863,7 +863,7 @@
 
 ;; TODO: refactor this to be part of an executor-specific map
 (defmethod mk-executor-stats :spout [_ rate]
-  (StatsUtil/mkSpoutStats rate))
+  (SpoutExecutorStats/mkSpoutStats rate))
 
 (defmethod mk-executor-stats :bolt [_ rate]
-  (StatsUtil/mkBoltStats rate))
+  (BoltExecutorStats/mkBoltStats rate))

http://git-wip-us.apache.org/repos/asf/storm/blob/f61ea0c0/storm-core/src/clj/org/apache/storm/daemon/task.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/task.clj b/storm-core/src/clj/org/apache/storm/daemon/task.clj
index c9f6828..707cdda 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/task.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/task.clj
@@ -26,7 +26,6 @@
   (:import [org.apache.storm.utils Utils ConfigUtils])
   (:import [org.apache.storm.generated ShellComponent JavaObject])
   (:import [org.apache.storm.spout ShellSpout])
-  (:import [org.apache.storm.stats StatsUtil])
   (:import [java.util Collection List ArrayList])
   (:import [org.apache.storm Thrift])
   (:require [org.apache.storm.daemon.builtin-metrics :as builtin-metrics]))
@@ -140,9 +139,9 @@
               (throw (IllegalArgumentException. "Cannot emitDirect to a task 
expecting a regular grouping")))                          
             (apply-hooks user-context .emit (EmitInfo. values stream task-id [out-task-id]))
             (when (emit-sampler)
-              (StatsUtil/emittedTuple executor-stats stream)
+              (.emittedTuple executor-stats stream)
               (if out-task-id
-                (StatsUtil/transferredTuples executor-stats stream, 1)))
+                (.transferredTuples executor-stats stream, 1)))
             (if out-task-id [out-task-id])
             ))
         ([^String stream ^List values]
@@ -162,8 +161,8 @@
                    )))
              (apply-hooks user-context .emit (EmitInfo. values stream task-id out-tasks))
              (when (emit-sampler)
-               (StatsUtil/emittedTuple executor-stats stream)
-               (StatsUtil/transferredTuples executor-stats stream (count out-tasks)))
+               (.emittedTuple executor-stats stream)
+               (.transferredTuples executor-stats stream (count out-tasks)))
              out-tasks)))
     ))
 

http://git-wip-us.apache.org/repos/asf/storm/blob/f61ea0c0/storm-core/src/jvm/org/apache/storm/stats/BoltExecutorStats.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/stats/BoltExecutorStats.java b/storm-core/src/jvm/org/apache/storm/stats/BoltExecutorStats.java
index 7909a08..d694bc3 100644
--- a/storm-core/src/jvm/org/apache/storm/stats/BoltExecutorStats.java
+++ b/storm-core/src/jvm/org/apache/storm/stats/BoltExecutorStats.java
@@ -17,9 +17,13 @@
  */
 package org.apache.storm.stats;
 
+import clojure.lang.PersistentVector;
+import java.util.HashMap;
+import java.util.Map;
 import org.apache.storm.metric.internal.MultiCountStatAndMetric;
 import org.apache.storm.metric.internal.MultiLatencyStatAndMetric;
 
+@SuppressWarnings("unchecked")
 public class BoltExecutorStats extends CommonStats {
 
     public static final String ACKED = "acked";
@@ -59,4 +63,45 @@ public class BoltExecutorStats extends CommonStats {
     public MultiLatencyStatAndMetric getExecuteLatencies() {
         return (MultiLatencyStatAndMetric) this.get(EXECUTE_LATENCIES);
     }
+
+    public void boltExecuteTuple(String component, String stream, long latencyMs) {
+        Object key = PersistentVector.create(component, stream);
+        this.getExecuted().incBy(key, this.rate);
+        this.getExecuteLatencies().record(key, latencyMs);
+    }
+
+    public void boltAckedTuple(String component, String stream, long latencyMs) {
+        Object key = PersistentVector.create(component, stream);
+        this.getAcked().incBy(key, this.rate);
+        this.getProcessLatencies().record(key, latencyMs);
+    }
+
+    public void boltFailedTuple(String component, String stream, long latencyMs) {
+        Object key = PersistentVector.create(component, stream);
+        this.getFailed().incBy(key, this.rate);
+
+    }
+
+    public Map renderStats() {
+        cleanupStats();
+        Map ret = new HashMap();
+        ret.putAll(valueStats(CommonStats.COMMON_FIELDS));
+        ret.putAll(valueStats(BoltExecutorStats.BOLT_FIELDS));
+        StatsUtil.putRawKV(ret, StatsUtil.TYPE, StatsUtil.KW_BOLT);
+
+        return ret;
+    }
+
+    public void cleanupStats() {
+        super.cleanupStats();
+        for (String field : BOLT_FIELDS) {
+            cleanupStat(this.get(field));
+        }
+    }
+
+    public static BoltExecutorStats mkBoltStats(int rate) {
+        BoltExecutorStats stats = new BoltExecutorStats();
+        stats.setRate(rate);
+        return stats;
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/f61ea0c0/storm-core/src/jvm/org/apache/storm/stats/CommonStats.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/stats/CommonStats.java b/storm-core/src/jvm/org/apache/storm/stats/CommonStats.java
index a8bf706..93d42a4 100644
--- a/storm-core/src/jvm/org/apache/storm/stats/CommonStats.java
+++ b/storm-core/src/jvm/org/apache/storm/stats/CommonStats.java
@@ -21,7 +21,9 @@ import java.util.HashMap;
 import java.util.Map;
 import org.apache.storm.metric.api.IMetric;
 import org.apache.storm.metric.internal.MultiCountStatAndMetric;
+import org.apache.storm.metric.internal.MultiLatencyStatAndMetric;
 
+@SuppressWarnings("unchecked")
 public class CommonStats {
     public static final int NUM_STAT_BUCKETS = 20;
 
@@ -62,4 +64,42 @@ public class CommonStats {
     protected void put(String field, Object value) {
         StatsUtil.putRawKV(metricMap, field, value);
     }
+
+    public void emittedTuple(String stream) {
+        this.getEmitted().incBy(stream, this.rate);
+    }
+
+    public void transferredTuples(String stream, int amount) {
+        this.getTransferred().incBy(stream, this.rate * amount);
+    }
+
+    protected void cleanupStats() {
+        for (String field : COMMON_FIELDS) {
+            cleanupStat(this.get(field));
+        }
+    }
+
+    protected void cleanupStat(IMetric metric) {
+        if (metric instanceof MultiCountStatAndMetric) {
+            ((MultiCountStatAndMetric) metric).close();
+        } else if (metric instanceof MultiLatencyStatAndMetric) {
+            ((MultiLatencyStatAndMetric) metric).close();
+        }
+    }
+
+    protected Map valueStats(String[] fields) {
+        Map ret = new HashMap();
+        for (String field : fields) {
+            IMetric metric = this.get(field);
+            if (metric instanceof MultiCountStatAndMetric) {
+                StatsUtil.putRawKV(ret, field, ((MultiCountStatAndMetric) metric).getTimeCounts());
+            } else if (metric instanceof MultiLatencyStatAndMetric) {
+                StatsUtil.putRawKV(ret, field, ((MultiLatencyStatAndMetric) metric).getTimeLatAvg());
+            }
+        }
+        StatsUtil.putRawKV(ret, CommonStats.RATE, this.getRate());
+
+        return ret;
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/f61ea0c0/storm-core/src/jvm/org/apache/storm/stats/SpoutExecutorStats.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/stats/SpoutExecutorStats.java b/storm-core/src/jvm/org/apache/storm/stats/SpoutExecutorStats.java
index 621ac24..d6d9162 100644
--- a/storm-core/src/jvm/org/apache/storm/stats/SpoutExecutorStats.java
+++ b/storm-core/src/jvm/org/apache/storm/stats/SpoutExecutorStats.java
@@ -17,9 +17,12 @@
  */
 package org.apache.storm.stats;
 
+import java.util.HashMap;
+import java.util.Map;
 import org.apache.storm.metric.internal.MultiCountStatAndMetric;
 import org.apache.storm.metric.internal.MultiLatencyStatAndMetric;
 
+@SuppressWarnings("unchecked")
 public class SpoutExecutorStats extends CommonStats {
 
     public static final String ACKED = "acked";
@@ -46,4 +49,36 @@ public class SpoutExecutorStats extends CommonStats {
     public MultiLatencyStatAndMetric getCompleteLatencies() {
         return (MultiLatencyStatAndMetric) this.get(COMPLETE_LATENCIES);
     }
+
+    public void spoutAckedTuple(String stream, long latencyMs) {
+        this.getAcked().incBy(stream, this.rate);
+        this.getCompleteLatencies().record(stream, latencyMs);
+    }
+
+    public void spoutFailedTuple(String stream, long latencyMs) {
+        this.getFailed().incBy(stream, this.rate);
+    }
+
+    public Map renderStats() {
+        cleanupStats();
+        Map ret = new HashMap();
+        ret.putAll(valueStats(CommonStats.COMMON_FIELDS));
+        ret.putAll(valueStats(SpoutExecutorStats.SPOUT_FIELDS));
+        StatsUtil.putRawKV(ret, StatsUtil.TYPE, StatsUtil.KW_SPOUT);
+
+        return ret;
+    }
+
+    public void cleanupStats() {
+        super.cleanupStats();
+        for (String field : SpoutExecutorStats.SPOUT_FIELDS) {
+            cleanupStat(this.get(field));
+        }
+    }
+
+    public static SpoutExecutorStats mkSpoutStats(int rate) {
+        SpoutExecutorStats stats = new SpoutExecutorStats();
+        stats.setRate(rate);
+        return stats;
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/f61ea0c0/storm-core/src/jvm/org/apache/storm/stats/StatsUtil.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/stats/StatsUtil.java b/storm-core/src/jvm/org/apache/storm/stats/StatsUtil.java
index 144872f..22ececf 100644
--- a/storm-core/src/jvm/org/apache/storm/stats/StatsUtil.java
+++ b/storm-core/src/jvm/org/apache/storm/stats/StatsUtil.java
@@ -48,9 +48,6 @@ import org.apache.storm.generated.SpoutStats;
 import org.apache.storm.generated.StormTopology;
 import org.apache.storm.generated.TopologyPageInfo;
 import org.apache.storm.generated.TopologyStats;
-import org.apache.storm.metric.api.IMetric;
-import org.apache.storm.metric.internal.MultiCountStatAndMetric;
-import org.apache.storm.metric.internal.MultiLatencyStatAndMetric;
 import org.apache.storm.utils.Utils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -59,9 +56,11 @@ import org.slf4j.LoggerFactory;
 public class StatsUtil {
     private static final Logger logger = LoggerFactory.getLogger(StatsUtil.class);
 
-    private static final String TYPE = "type";
+    public static final String TYPE = "type";
     private static final String SPOUT = "spout";
     private static final String BOLT = "bolt";
+    public static final Keyword KW_SPOUT = keyword(SPOUT);
+    public static final Keyword KW_BOLT = keyword(BOLT);
 
     private static final String UPTIME = "uptime";
     private static final String HOST = "host";
@@ -111,9 +110,6 @@ public class StatsUtil {
     private static final String CID_SID_TO_IN_STATS = "cid+sid->input-stats";
     private static final String WORKERS_SET = "workers-set";
 
-    private static final Keyword KW_SPOUT = keyword(SPOUT);
-    private static final Keyword KW_BOLT = keyword(BOLT);
-
     public static final int TEN_MIN_IN_SECONDS = 60 * 10;
     public static final String TEN_MIN_IN_SECONDS_STR = TEN_MIN_IN_SECONDS + "";
 
@@ -124,120 +120,6 @@ public class StatsUtil {
 
 
     // 
=====================================================================================
-    // update stats methods
-    // 
=====================================================================================
-
-    public static BoltExecutorStats mkBoltStats(int rate) {
-        BoltExecutorStats stats = new BoltExecutorStats();
-        stats.setRate(rate);
-        return stats;
-    }
-
-    public static SpoutExecutorStats mkSpoutStats(int rate) {
-        SpoutExecutorStats stats = new SpoutExecutorStats();
-        stats.setRate(rate);
-        return stats;
-    }
-
-    public static void emittedTuple(CommonStats stats, String stream) {
-        stats.getEmitted().incBy(stream, stats.rate);
-    }
-
-    public static void transferredTuples(CommonStats stats, String stream, int 
amount) {
-        stats.getTransferred().incBy(stream, stats.rate * amount);
-    }
-
-    public static void boltExecuteTuple(BoltExecutorStats stats, String 
component, String stream, long latencyMs) {
-        Object key = PersistentVector.create(component, stream);
-        stats.getExecuted().incBy(key, stats.rate);
-        stats.getExecuteLatencies().record(key, latencyMs);
-    }
-
-    public static void boltAckedTuple(BoltExecutorStats stats, String 
component, String stream, long latencyMs) {
-        Object key = PersistentVector.create(component, stream);
-        stats.getAcked().incBy(key, stats.rate);
-        stats.getProcessLatencies().record(key, latencyMs);
-    }
-
-    public static void boltFailedTuple(BoltExecutorStats stats, String 
component, String stream, long latencyMs) {
-        Object key = PersistentVector.create(component, stream);
-        stats.getFailed().incBy(key, stats.rate);
-
-    }
-
-    public static void spoutAckedTuple(SpoutExecutorStats stats, String 
stream, long latencyMs) {
-        stats.getAcked().incBy(stream, stats.rate);
-        stats.getCompleteLatencies().record(stream, latencyMs);
-    }
-
-    public static void spoutFailedTuple(SpoutExecutorStats stats, String 
stream, long latencyMs) {
-        stats.getFailed().incBy(stream, stats.rate);
-    }
-
-    private static void cleanupStat(IMetric metric) {
-        if (metric instanceof MultiCountStatAndMetric) {
-            ((MultiCountStatAndMetric) metric).close();
-        } else if (metric instanceof MultiLatencyStatAndMetric) {
-            ((MultiLatencyStatAndMetric) metric).close();
-        }
-    }
-
-    public static Map renderStats(SpoutExecutorStats stats) {
-        cleanupSpoutStats(stats);
-        Map ret = new HashMap();
-        ret.putAll(valueStats(stats, CommonStats.COMMON_FIELDS));
-        ret.putAll(valueStats(stats, SpoutExecutorStats.SPOUT_FIELDS));
-        putRawKV(ret, TYPE, KW_SPOUT);
-
-        return ret;
-    }
-
-    public static Map renderStats(BoltExecutorStats stats) {
-        cleanupBoltStats(stats);
-        Map ret = new HashMap();
-        ret.putAll(valueStats(stats, CommonStats.COMMON_FIELDS));
-        ret.putAll(valueStats(stats, BoltExecutorStats.BOLT_FIELDS));
-        putRawKV(ret, TYPE, KW_BOLT);
-
-        return ret;
-    }
-
-    public static void cleanupSpoutStats(SpoutExecutorStats stats) {
-        cleanupCommonStats(stats);
-        for (String field : SpoutExecutorStats.SPOUT_FIELDS) {
-            cleanupStat(stats.get(field));
-        }
-    }
-
-    public static void cleanupBoltStats(BoltExecutorStats stats) {
-        cleanupCommonStats(stats);
-        for (String field : BoltExecutorStats.BOLT_FIELDS) {
-            cleanupStat(stats.get(field));
-        }
-    }
-
-    public static void cleanupCommonStats(CommonStats stats) {
-        for (String field : CommonStats.COMMON_FIELDS) {
-            cleanupStat(stats.get(field));
-        }
-    }
-
-    private static Map valueStats(CommonStats stats, String[] fields) {
-        Map ret = new HashMap();
-        for (String field : fields) {
-            IMetric metric = stats.get(field);
-            if (metric instanceof MultiCountStatAndMetric) {
-                putRawKV(ret, field, ((MultiCountStatAndMetric) 
metric).getTimeCounts());
-            } else if (metric instanceof MultiLatencyStatAndMetric) {
-                putRawKV(ret, field, ((MultiLatencyStatAndMetric) 
metric).getTimeLatAvg());
-            }
-        }
-        putRawKV(ret, CommonStats.RATE, stats.getRate());
-
-        return ret;
-    }
-
-    // 
=====================================================================================
     // aggregation stats methods
     // 
=====================================================================================
 
@@ -1166,9 +1048,6 @@ public class StatsUtil {
         return ret;
     }
 
-    /**
-     * called in nimbus.clj
-     */
     public static ComponentPageInfo aggCompExecsStats(
             Map exec2hostPort, Map task2component, Map beats, String window, 
boolean includeSys,
             String topologyId, StormTopology topology, String componentId) {
@@ -1184,9 +1063,6 @@ public class StatsUtil {
     // clojurify stats methods
     // 
=====================================================================================
 
-    /**
-     * called in converter.clj
-     */
     public static Map clojurifyStats(Map stats) {
         Map ret = new HashMap();
         for (Object o : stats.entrySet()) {
@@ -1245,9 +1121,6 @@ public class StatsUtil {
         return ret;
     }
 
-    /**
-     * caller: nimbus.clj
-     */
     public static List extractNodeInfosFromHbForComp(
             Map exec2hostPort, Map task2component, boolean includeSys, String 
compId) {
         List ret = new ArrayList();
@@ -1340,7 +1213,7 @@ public class StatsUtil {
 
 
     /**
-     * caller: core.clj
+     * computes max bolt capacity
      *
      * @param executorSumms a list of ExecutorSummary
      * @return max bolt capacity
@@ -1774,9 +1647,6 @@ public class StatsUtil {
         return ret;
     }
 
-    /**
-     * called in converter.clj
-     */
     public static Map thriftifyStats(List stats) {
         Map ret = new HashMap();
         for (Object o : stats) {
@@ -1791,9 +1661,6 @@ public class StatsUtil {
         return ret;
     }
 
-    /**
-     * called in nimbus.clj
-     */
     public static ExecutorStats thriftifyExecutorStats(Map stats) {
         ExecutorStats ret = new ExecutorStats();
         ExecutorSpecificStats specificStats = thriftifySpecificStats(stats);
@@ -2091,16 +1958,10 @@ public class StatsUtil {
         return t / c;
     }
 
-    /**
-     * caller: core.clj
-     */
     public static String floatStr(double n) {
         return String.format("%.3f", n);
     }
 
-    /**
-     * caller: core.clj
-     */
     public static String errorSubset(String errorStr) {
         return errorStr.substring(0, 200);
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/f61ea0c0/storm-core/test/clj/org/apache/storm/nimbus_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/org/apache/storm/nimbus_test.clj b/storm-core/test/clj/org/apache/storm/nimbus_test.clj
index a76db54..5964e6f 100644
--- a/storm-core/test/clj/org/apache/storm/nimbus_test.clj
+++ b/storm-core/test/clj/org/apache/storm/nimbus_test.clj
@@ -23,7 +23,7 @@
            [org.apache.storm.nimbus InMemoryTopologyActionNotifier]
            [org.apache.storm.generated GlobalStreamId]
            [org.apache.storm Thrift]
-           [org.apache.storm.stats StatsUtil])
+           [org.apache.storm.stats BoltExecutorStats])
   (:import [org.apache.storm.testing.staticmocking MockedZookeeper])
   (:import [org.apache.storm.scheduler INimbus])
   (:import [org.apache.storm.nimbus ILeaderElector NimbusInfo])
@@ -141,7 +141,7 @@
         stats (:executor-stats curr-beat)]
     (.worker-heartbeat! state storm-id node port
       {:storm-id storm-id :time-secs (Time/currentTimeSecs) :uptime 10
-       :executor-stats (merge stats {executor (clojurify-structure (StatsUtil/renderStats (StatsUtil/mkBoltStats 20)))})}
+       :executor-stats (merge stats {executor (clojurify-structure (.renderStats (BoltExecutorStats/mkBoltStats 20)))})}
       )))
 
 (defn slot-assignments [cluster storm-id]

Reply via email to