added method comments
Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/7b354287 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/7b354287 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/7b354287 Branch: refs/heads/master Commit: 7b354287227de358cc357ac45c18ac2b1a679202 Parents: 5bd5bd7 Author: 卫乐 <[email protected]> Authored: Wed Mar 9 13:05:36 2016 +0800 Committer: 卫乐 <[email protected]> Committed: Wed Mar 9 13:05:36 2016 +0800 ---------------------------------------------------------------------- .../jvm/org/apache/storm/stats/StatsUtil.java | 313 ++++++++++++++----- 1 file changed, 231 insertions(+), 82 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/7b354287/storm-core/src/jvm/org/apache/storm/stats/StatsUtil.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/stats/StatsUtil.java b/storm-core/src/jvm/org/apache/storm/stats/StatsUtil.java index 7650ab1..aa1b234 100644 --- a/storm-core/src/jvm/org/apache/storm/stats/StatsUtil.java +++ b/storm-core/src/jvm/org/apache/storm/stats/StatsUtil.java @@ -144,9 +144,10 @@ public class StatsUtil { } /** - * Aggregates number acked and complete latencies across all streams. + * aggregate number acked and complete latencies across all streams. */ - public static Map<String, Number> aggSpoutLatAndCount(Map<String, Double> id2compAvg, Map<String, Long> id2numAcked) { + public static Map<String, Number> aggSpoutLatAndCount(Map<String, Double> id2compAvg, + Map<String, Long> id2numAcked) { Map<String, Number> ret = new HashMap<>(); putKV(ret, COMP_LAT_TOTAL, weightAvgAndSum(id2compAvg, id2numAcked)); putKV(ret, ACKED, sumValues(id2numAcked)); @@ -155,15 +156,17 @@ public class StatsUtil { } /** - * Aggregates number executed and process & execute latencies. 
+ * aggregate number executed and process & execute latencies. */ - public static Map aggBoltStreamsLatAndCount(Map id2execAvg, Map id2procAvg, Map id2numExec) { - Map ret = new HashMap(); + public static <K> Map<K, Map> aggBoltStreamsLatAndCount(Map<K, Double> id2execAvg, + Map<K, Double> id2procAvg, + Map<K, Long> id2numExec) { + Map<K, Map> ret = new HashMap<>(); if (id2execAvg == null || id2procAvg == null || id2numExec == null) { return ret; } - for (Object k : id2execAvg.keySet()) { - Map subMap = new HashMap(); + for (K k : id2execAvg.keySet()) { + Map<String, Object> subMap = new HashMap<>(); putKV(subMap, EXEC_LAT_TOTAL, weightAvg(id2execAvg, id2numExec, k)); putKV(subMap, PROC_LAT_TOTAL, weightAvg(id2procAvg, id2numExec, k)); putKV(subMap, EXECUTED, id2numExec.get(k)); @@ -175,12 +178,13 @@ public class StatsUtil { /** * Aggregates number acked and complete latencies. */ - public static Map aggSpoutStreamsLatAndCount(Map id2compAvg, Map id2acked) { - Map ret = new HashMap(); + public static <K> Map<K, Map> aggSpoutStreamsLatAndCount(Map<K, Double> id2compAvg, + Map<K, Long> id2acked) { + Map<K, Map> ret = new HashMap<>(); if (id2compAvg == null || id2acked == null) { return ret; } - for (Object k : id2compAvg.keySet()) { + for (K k : id2compAvg.keySet()) { Map subMap = new HashMap(); putKV(subMap, COMP_LAT_TOTAL, weightAvg(id2compAvg, id2acked, k)); putKV(subMap, ACKED, id2acked.get(k)); @@ -189,17 +193,29 @@ public class StatsUtil { return ret; } - public static Map aggPreMergeCompPageBolt(Map<String, Object> m, String window, boolean includeSys) { - Map ret = new HashMap(); - putKV(ret, EXECUTOR_ID, getByKey(m, "exec-id")); - putKV(ret, HOST, getByKey(m, HOST)); - putKV(ret, PORT, getByKey(m, PORT)); - putKV(ret, UPTIME, getByKey(m, UPTIME)); + /** + * pre-merge component page bolt stats from an executor heartbeat + * 1. computes component capacity + * 2. converts map keys of stats + * 3. 
filters streams if necessary + * + * @param beat executor heartbeat data + * @param window specified window + * @param includeSys whether to include system streams + * @return per-merged stats + */ + public static Map<String, Object> aggPreMergeCompPageBolt(Map<String, Object> beat, String window, boolean includeSys) { + Map<String, Object> ret = new HashMap<>(); + + putKV(ret, EXECUTOR_ID, getByKey(beat, "exec-id")); + putKV(ret, HOST, getByKey(beat, HOST)); + putKV(ret, PORT, getByKey(beat, PORT)); + putKV(ret, UPTIME, getByKey(beat, UPTIME)); putKV(ret, NUM_EXECUTORS, 1); - putKV(ret, NUM_TASKS, getByKey(m, NUM_TASKS)); + putKV(ret, NUM_TASKS, getByKey(beat, NUM_TASKS)); - Map stat2win2sid2num = getMapByKey(m, STATS); - putKV(ret, CAPACITY, computeAggCapacity(stat2win2sid2num, getByKeyOr0(m, UPTIME).intValue())); + Map stat2win2sid2num = getMapByKey(beat, STATS); + putKV(ret, CAPACITY, computeAggCapacity(stat2win2sid2num, getByKeyOr0(beat, UPTIME).intValue())); // calc cid+sid->input_stats Map inputStats = new HashMap(); @@ -236,16 +252,27 @@ public class StatsUtil { return ret; } - public static Map<String, Object> aggPreMergeCompPageSpout(Map<String, Object> m, String window, boolean includeSys) { + /** + * pre-merge component page spout stats from an executor heartbeat + * 1. computes component capacity + * 2. converts map keys of stats + * 3. 
filters streams if necessary + * + * @param beat executor heartbeat data + * @param window specified window + * @param includeSys whether to include system streams + * @return per-merged stats + */ + public static Map<String, Object> aggPreMergeCompPageSpout(Map<String, Object> beat, String window, boolean includeSys) { Map<String, Object> ret = new HashMap<>(); - putKV(ret, EXECUTOR_ID, getByKey(m, "exec-id")); - putKV(ret, HOST, getByKey(m, HOST)); - putKV(ret, PORT, getByKey(m, PORT)); - putKV(ret, UPTIME, getByKey(m, UPTIME)); + putKV(ret, EXECUTOR_ID, getByKey(beat, "exec-id")); + putKV(ret, HOST, getByKey(beat, HOST)); + putKV(ret, PORT, getByKey(beat, PORT)); + putKV(ret, UPTIME, getByKey(beat, UPTIME)); putKV(ret, NUM_EXECUTORS, 1); - putKV(ret, NUM_TASKS, getByKey(m, NUM_TASKS)); + putKV(ret, NUM_TASKS, getByKey(beat, NUM_TASKS)); - Map stat2win2sid2num = getMapByKey(m, STATS); + Map stat2win2sid2num = getMapByKey(beat, STATS); // calc sid->output-stats Map outputStats = new HashMap(); @@ -269,16 +296,24 @@ public class StatsUtil { return ret; } + /** + * pre-merge component stats of specified bolt id + * + * @param beat executor heartbeat data + * @param window specified window + * @param includeSys whether to include system streams + * @return { comp id -> comp-stats } + */ public static <K, V extends Number> Map<String, Object> aggPreMergeTopoPageBolt( - Map<String, Object> m, String window, boolean includeSys) { + Map<String, Object> beat, String window, boolean includeSys) { Map<String, Object> ret = new HashMap<>(); Map<String, Object> subRet = new HashMap<>(); putKV(subRet, NUM_EXECUTORS, 1); - putKV(subRet, NUM_TASKS, getByKey(m, NUM_TASKS)); + putKV(subRet, NUM_TASKS, getByKey(beat, NUM_TASKS)); - Map<String, Object> stat2win2sid2num = getMapByKey(m, STATS); - putKV(subRet, CAPACITY, computeAggCapacity(stat2win2sid2num, getByKeyOr0(m, UPTIME).intValue())); + Map<String, Object> stat2win2sid2num = getMapByKey(beat, STATS); + putKV(subRet, CAPACITY, 
computeAggCapacity(stat2win2sid2num, getByKeyOr0(beat, UPTIME).intValue())); for (String key : new String[]{EMITTED, TRANSFERRED, ACKED, FAILED}) { Map<String, Map<K, V>> stat = windowSetConverter(getMapByKey(stat2win2sid2num, key), TO_STRING); @@ -304,12 +339,12 @@ public class StatsUtil { subRet.putAll(aggBoltLatAndCount( win2sid2execLat.get(window), win2sid2procLat.get(window), win2sid2exec.get(window))); - ret.put((String) getByKey(m, "comp-id"), subRet); + ret.put((String) getByKey(beat, "comp-id"), subRet); return ret; } /** - * returns { comp id -> comp-stats } + * pre-merge component stats of specified spout id and returns { comp id -> comp-stats } */ public static <K, V extends Number> Map<String, Object> aggPreMergeTopoPageSpout( Map<String, Object> m, String window, boolean includeSys) { @@ -346,6 +381,13 @@ public class StatsUtil { return ret; } + /** + * merge accumulated bolt stats with pre-merged component stats + * + * @param accBoltStats accumulated bolt stats + * @param boltStats pre-merged component stats + * @return merged stats + */ public static Map<String, Object> mergeAggCompStatsCompPageBolt( Map<String, Object> accBoltStats, Map<String, Object> boltStats) { Map<String, Object> ret = new HashMap<>(); @@ -395,6 +437,9 @@ public class StatsUtil { return ret; } + /** + * merge accumulated bolt stats with pre-merged component stats + */ public static Map<String, Object> mergeAggCompStatsCompPageSpout( Map<String, Object> accSpoutStats, Map<String, Object> spoutStats) { Map<String, Object> ret = new HashMap<>(); @@ -432,7 +477,15 @@ public class StatsUtil { return ret; } - public static Map<String, Object> mergeAggCompStatsTopoPageBolt(Map<String, Object> accBoltStats, Map<String, Object> boltStats) { + /** + * merge accumulated bolt stats with new bolt stats + * + * @param accBoltStats accumulated bolt stats + * @param boltStats new input bolt stats + * @return merged bolt stats + */ + public static Map<String, Object> 
mergeAggCompStatsTopoPageBolt(Map<String, Object> accBoltStats, + Map<String, Object> boltStats) { Map<String, Object> ret = new HashMap<>(); Integer numExecutors = getByKeyOr0(accBoltStats, NUM_EXECUTORS).intValue(); @@ -459,7 +512,11 @@ public class StatsUtil { return ret; } - public static Map<String, Object> mergeAggCompStatsTopoPageSpout(Map<String, Object> accSpoutStats, Map<String, Object> spoutStats) { + /** + * merge accumulated bolt stats with new bolt stats + */ + public static Map<String, Object> mergeAggCompStatsTopoPageSpout(Map<String, Object> accSpoutStats, + Map<String, Object> spoutStats) { Map<String, Object> ret = new HashMap<>(); Integer numExecutors = getByKeyOr0(accSpoutStats, NUM_EXECUTORS).intValue(); @@ -485,7 +542,7 @@ public class StatsUtil { * executor with the given map for the topology page. */ public static Map<String, Object> aggTopoExecStats( - String window, boolean includeSys, Map<String, Object> accStats, Map<String, Object> newData, String compType) { + String window, boolean includeSys, Map<String, Object> accStats, Map<String, Object> beat, String compType) { Map<String, Object> ret = new HashMap<>(); Set workerSet = (Set) accStats.get(WORKERS_SET); @@ -501,12 +558,12 @@ public class StatsUtil { // component id -> stats Map<String, Object> cid2stats; if (isSpout) { - cid2stats = aggPreMergeTopoPageSpout(newData, window, includeSys); + cid2stats = aggPreMergeTopoPageSpout(beat, window, includeSys); } else { - cid2stats = aggPreMergeTopoPageBolt(newData, window, includeSys); + cid2stats = aggPreMergeTopoPageBolt(beat, window, includeSys); } - Map stats = getMapByKey(newData, STATS); + Map stats = getMapByKey(beat, STATS); Map w2compLatWgtAvg, w2acked; Map compLatStats = getMapByKey(stats, COMP_LATENCIES); if (isSpout) { // agg spout stats @@ -524,7 +581,7 @@ public class StatsUtil { w2acked = aggregateCountStreams(getMapByKey(stats, ACKED)); } - workerSet.add(Lists.newArrayList(getByKey(newData, HOST), getByKey(newData, 
PORT))); + workerSet.add(Lists.newArrayList(getByKey(beat, HOST), getByKey(beat, PORT))); putKV(ret, WORKERS_SET, workerSet); putKV(ret, BOLT_TO_STATS, bolt2stats); putKV(ret, SPOUT_TO_STATS, spout2stats); @@ -543,23 +600,23 @@ public class StatsUtil { // (merge-with merge-agg-comp-stats-topo-page-bolt/spout (acc-stats comp-key) cid->statk->num) // (acc-stats comp-key) ==> bolt2stats/spout2stats if (isSpout) { - Set<String> keySet = new HashSet<>(); - keySet.addAll(spout2stats.keySet()); - keySet.addAll(cid2stats.keySet()); + Set<String> spouts = new HashSet<>(); + spouts.addAll(spout2stats.keySet()); + spouts.addAll(cid2stats.keySet()); - Map mm = new HashMap(); - for (String k : keySet) { - mm.put(k, mergeAggCompStatsTopoPageSpout((Map) spout2stats.get(k), (Map) cid2stats.get(k))); + Map<String, Object> mm = new HashMap<>(); + for (String spout : spouts) { + mm.put(spout, mergeAggCompStatsTopoPageSpout((Map) spout2stats.get(spout), (Map) cid2stats.get(spout))); } putKV(ret, SPOUT_TO_STATS, mm); } else { - Set<String> keySet = new HashSet<>(); - keySet.addAll(bolt2stats.keySet()); - keySet.addAll(cid2stats.keySet()); + Set<String> bolts = new HashSet<>(); + bolts.addAll(bolt2stats.keySet()); + bolts.addAll(cid2stats.keySet()); - Map mm = new HashMap(); - for (String k : keySet) { - mm.put(k, mergeAggCompStatsTopoPageBolt((Map) bolt2stats.get(k), (Map) cid2stats.get(k))); + Map<String, Object> mm = new HashMap<>(); + for (String bolt : bolts) { + mm.put(bolt, mergeAggCompStatsTopoPageBolt((Map) bolt2stats.get(bolt), (Map) cid2stats.get(bolt))); } putKV(ret, BOLT_TO_STATS, mm); } @@ -674,12 +731,13 @@ public class StatsUtil { * * @param statsSeq a seq of ExecutorStats * @param includeSys whether to include system streams - * @return aggregated bolt stats + * @return aggregated bolt stats: {metric -> win -> global stream id -> value} */ public static <T> Map<String, Map> aggregateBoltStats(List<ExecutorSummary> statsSeq, boolean includeSys) { Map<String, Map> ret = 
new HashMap<>(); Map<String, Map<String, Map<T, Long>>> commonStats = aggregateCommonStats(statsSeq); + // filter sys streams if necessary commonStats = preProcessStreamSummary(commonStats, includeSys); List<Map<String, Map<GlobalStreamId, Long>>> acked = new ArrayList<>(); @@ -710,13 +768,14 @@ public class StatsUtil { * * @param statsSeq a seq of ExecutorStats * @param includeSys whether to include system streams - * @return aggregated spout stats + * @return aggregated spout stats: {metric -> win -> global stream id -> value} */ public static Map<String, Map> aggregateSpoutStats(List<ExecutorSummary> statsSeq, boolean includeSys) { // actually Map<String, Map<String, Map<String, Long/Double>>> Map<String, Map> ret = new HashMap<>(); Map<String, Map<String, Map<String, Long>>> commonStats = aggregateCommonStats(statsSeq); + // filter sys streams if necessary commonStats = preProcessStreamSummary(commonStats, includeSys); List<Map<String, Map<String, Long>>> acked = new ArrayList<>(); @@ -736,6 +795,9 @@ public class StatsUtil { return ret; } + /** + * aggregate common stats from a spout/bolt, called in aggregateSpoutStats/aggregateBoltStats + */ public static <T> Map<String, Map<String, Map<T, Long>>> aggregateCommonStats(List<ExecutorSummary> statsSeq) { Map<String, Map<String, Map<T, Long>>> ret = new HashMap<>(); @@ -751,6 +813,9 @@ public class StatsUtil { return ret; } + /** + * filter system streams of aggregated spout/bolt stats if necessary + */ public static <T> Map<String, Map<String, Map<T, Long>>> preProcessStreamSummary( Map<String, Map<String, Map<T, Long>>> streamSummary, boolean includeSys) { Map<String, Map<T, Long>> emitted = getMapByKey(streamSummary, EMITTED); @@ -762,6 +827,12 @@ public class StatsUtil { return streamSummary; } + /** + * aggregate count streams by window + * + * @param stats a Map of value: {win -> stream -> value} + * @return a Map of value: {win -> value} + */ public static <K, V extends Number> Map<String, Long> 
aggregateCountStreams( Map<String, Map<K, V>> stats) { Map<String, Long> ret = new HashMap<>(); @@ -776,6 +847,14 @@ public class StatsUtil { return ret; } + /** + * compute an weighted average from a list of average maps and a corresponding count maps + * extracted from a list of ExecutorSummary + * + * @param avgSeq a list of {win -> global stream id -> avg value} + * @param countSeq a list of {win -> global stream id -> count value} + * @return a Map of {win -> global stream id -> weighted avg value} + */ public static <K> Map<String, Map<K, Double>> aggregateAverages(List<Map<String, Map<K, Double>>> avgSeq, List<Map<String, Map<K, Long>>> countSeq) { Map<String, Map<K, Double>> ret = new HashMap<>(); @@ -796,8 +875,15 @@ public class StatsUtil { return ret; } - public static <K> Map<String, Double> aggregateAvgStreams( - Map<String, Map<K, Double>> avgs, Map<String, Map<K, Long>> counts) { + /** + * aggregate weighted average of all streams + * + * @param avgs a Map of {win -> stream -> average value} + * @param counts a Map of {win -> stream -> count value} + * @return a Map of {win -> aggregated value} + */ + public static <K> Map<String, Double> aggregateAvgStreams(Map<String, Map<K, Double>> avgs, + Map<String, Map<K, Long>> counts) { Map<String, Double> ret = new HashMap<>(); Map<String, Map<K, List>> expands = expandAverages(avgs, counts); @@ -818,14 +904,21 @@ public class StatsUtil { return ret; } + /** + * aggregates spout stream stats, returns a Map of {metric -> win -> aggregated value} + */ public static Map<String, Map> spoutStreamsStats(List<ExecutorSummary> summs, boolean includeSys) { if (summs == null) { return new HashMap<>(); } + // filter ExecutorSummary's with empty stats List<ExecutorSummary> statsSeq = getFilledStats(summs); return aggregateSpoutStreams(aggregateSpoutStats(statsSeq, includeSys)); } + /** + * aggregates bolt stream stats, returns a Map of {metric -> win -> aggregated value} + */ public static Map<String, Map> 
boltStreamsStats(List<ExecutorSummary> summs, boolean includeSys) { if (summs == null) { return new HashMap<>(); @@ -834,6 +927,12 @@ public class StatsUtil { return aggregateBoltStreams(aggregateBoltStats(statsSeq, includeSys)); } + /** + * aggregate all spout streams + * + * @param stats a Map of {metric -> win -> stream id -> value} + * @return a Map of {metric -> win -> aggregated value} + */ public static Map<String, Map> aggregateSpoutStreams(Map<String, Map> stats) { // actual ret is Map<String, Map<String, Long/Double>> Map<String, Map> ret = new HashMap<>(); @@ -846,6 +945,12 @@ public class StatsUtil { return ret; } + /** + * aggregate all bolt streams + * + * @param stats a Map of {metric -> win -> stream id -> value} + * @return a Map of {metric -> win -> aggregated value} + */ public static Map<String, Map> aggregateBoltStreams(Map<String, Map> stats) { Map<String, Map> ret = new HashMap<>(); putKV(ret, ACKED, aggregateCountStreams(getMapByKey(stats, ACKED))); @@ -861,7 +966,7 @@ public class StatsUtil { } /** - * A helper function that aggregates windowed stats from one spout executor. + * aggregate windowed stats from a bolt executor stats with a Map of accumulated stats */ public static Map<String, Object> aggBoltExecWinStats( Map<String, Object> accStats, Map<String, Object> newStats, boolean includeSys) { @@ -905,7 +1010,7 @@ public class StatsUtil { } /** - * A helper function that aggregates windowed stats from one spout executor. 
+ * aggregate windowed stats from a spout executor stats with a Map of accumulated stats */ public static Map<String, Object> aggSpoutExecWinStats( Map<String, Object> accStats, Map<String, Object> beat, boolean includeSys) { @@ -944,7 +1049,7 @@ public class StatsUtil { /** - * aggregate counts + * aggregate a list of count maps into one map * * @param countsSeq a seq of {win -> GlobalStreamId -> value} */ @@ -973,8 +1078,8 @@ public class StatsUtil { return ret; } - public static Map<String, Object> aggregateCompStats(String window, boolean includeSys, - List<Map<String, Object>> beats, String compType) { + public static Map<String, Object> aggregateCompStats( + String window, boolean includeSys, List<Map<String, Object>> beats, String compType) { boolean isSpout = SPOUT.equals(compType); Map<String, Object> initVal = new HashMap<>(); @@ -998,6 +1103,7 @@ public class StatsUtil { } putKV(initVal, STATS, stats); + // iterate through all executor heartbeats for (Map<String, Object> beat : beats) { initVal = aggCompExecStats(window, includeSys, initVal, beat, compType); } @@ -1029,14 +1135,14 @@ public class StatsUtil { } /** - * post aggregate component stats + * post aggregate component stats: + * 1. computes execute-latency/process-latency from execute/process latency total + * 2. computes windowed weight avgs + * 3. 
transform Map keys * - * @param task2component task -> component, note it's a clojure map - * @param exec2hostPort executor -> host+port, note it's a clojure map - * @param compStats accumulated comp stats - * @return + * @param compStats accumulated comp stats */ - public static Map<String, Object> postAggregateCompStats(Map task2component, Map exec2hostPort, Map<String, Object> compStats) { + public static Map<String, Object> postAggregateCompStats(Map<String, Object> compStats) { Map<String, Object> ret = new HashMap<>(); String compType = (String) compStats.get(TYPE); @@ -1108,6 +1214,19 @@ public class StatsUtil { return ret; } + /** + * aggregate component executor stats + * + * @param exec2hostPort a Map of {executor -> host+port}, note it's a clojure map + * @param task2component a Map of {task id -> component}, note it's a clojure map + * @param beats a converted HashMap of executor heartbeats, {executor -> heartbeat} + * @param window specified window + * @param includeSys whether to include system streams + * @param topologyId topology id + * @param topology storm topology + * @param componentId component id + * @return ComponentPageInfo thrift structure + */ public static ComponentPageInfo aggCompExecsStats( Map exec2hostPort, Map task2component, Map<List<Integer>, Map<String, Object>> beats, String window, boolean includeSys, String topologyId, StormTopology topology, String componentId) { @@ -1115,7 +1234,7 @@ public class StatsUtil { List<Map<String, Object>> beatList = extractDataFromHb(exec2hostPort, task2component, beats, includeSys, topology, componentId); Map<String, Object> compStats = aggregateCompStats(window, includeSys, beatList, componentType(topology, componentId)); - compStats = postAggregateCompStats(task2component, exec2hostPort, compStats); + compStats = postAggregateCompStats(compStats); return thriftifyCompPageData(topologyId, topology, componentId, compStats); } @@ -1124,6 +1243,9 @@ public class StatsUtil { // convert thrift stats 
to java maps // ===================================================================================== + /** + * convert thrift executor heartbeats into a java HashMap + */ public static Map<List<Integer>, Map<String, Object>> convertExecutorBeats(Map<ExecutorInfo, ExecutorBeat> beats) { Map<List<Integer>, Map<String, Object>> ret = new HashMap<>(); for (Map.Entry<ExecutorInfo, ExecutorBeat> beat : beats.entrySet()) { @@ -1150,6 +1272,12 @@ public class StatsUtil { return ret; } + /** + * convert a thrift worker heartbeat into a java HashMap + * + * @param workerHb + * @return + */ public static Map<String, Object> convertZkWorkerHb(ClusterWorkerHeartbeat workerHb) { Map<String, Object> ret = new HashMap<>(); if (workerHb != null) { @@ -1224,6 +1352,15 @@ public class StatsUtil { return ret; } + /** + * extract a list of host port info for specified component + * + * @param exec2hostPort {executor -> host+port}, note it's a clojure map + * @param task2component {task id -> component}, note it's a clojure map + * @param includeSys whether to include system streams + * @param compId component id + * @return a list of host+port + */ public static List<Map<String, Object>> extractNodeInfosFromHbForComp( Map exec2hostPort, Map task2component, boolean includeSys, String compId) { List<Map<String, Object>> ret = new ArrayList<>(); @@ -1384,6 +1521,14 @@ public class StatsUtil { return ret; } + /** + * compute weighted avg from a Map of stats and given avg/count keys + * + * @param accData a Map of {win -> key -> value} + * @param wgtAvgKey weighted average key + * @param divisorKey count key + * @return a Map of {win -> weighted avg value} + */ private static Map<String, Double> computeWeightedAveragesPerWindow(Map<String, Object> accData, String wgtAvgKey, String divisorKey) { Map<String, Double> ret = new HashMap<>(); @@ -1400,6 +1545,9 @@ public class StatsUtil { } + /** + * convert a list of clojure executors to a java Set of List<Integer> + */ public static 
Set<List<Integer>> convertExecutors(Set executors) { Set<List<Integer>> convertedExecutors = new HashSet<>(); for (Object executor : executors) { @@ -1438,7 +1586,7 @@ public class StatsUtil { if (stats == null) { return 0.0; } else { - // Map<String, Map<String/GlobalStreamId, Long/Double>> {win -> stream -> value} + // actual value of m is: Map<String, Map<String/GlobalStreamId, Long/Double>> ({win -> stream -> value}) Map<String, Map> m = aggregateBoltStats(Lists.newArrayList(summary), true); // {metric -> win -> value} ==> {win -> metric -> value} m = swapMapOrder(aggregateBoltStreams(m)); @@ -1495,17 +1643,15 @@ public class StatsUtil { return sum; } - private static double sumStreamsDouble(Map m, String key) { + private static <K1, K2> double sumStreamsDouble(Map<K1, Map<K2, ?>> m, String key) { double sum = 0; if (m == null) { return sum; } - for (Object v : m.values()) { - Map sub = (Map) v; - for (Object o : sub.entrySet()) { - Map.Entry e = (Map.Entry) o; - if (e.getKey().equals(key)) { - sum += ((Number) e.getValue()).doubleValue(); + for (Map<K2, ?> v : m.values()) { + for (Map.Entry<K2, ?> entry : v.entrySet()) { + if (entry.getKey().equals(key)) { + sum += ((Number) entry.getValue()).doubleValue(); } } } @@ -1607,7 +1753,7 @@ public class StatsUtil { return ret; } - private static <K> Map mergeWithSumLong(Map<K, Long> m1, Map<K, Long> m2) { + private static <K> Map<K, Long> mergeWithSumLong(Map<K, Long> m1, Map<K, Long> m2) { Map<K, Long> ret = new HashMap<>(); Set<K> allKeys = new HashSet<>(); @@ -1626,7 +1772,7 @@ public class StatsUtil { return ret; } - private static <K> Map mergeWithSumDouble(Map<K, Double> m1, Map<K, Double> m2) { + private static <K> Map<K, Double> mergeWithSumDouble(Map<K, Double> m1, Map<K, Double> m2) { Map<K, Double> ret = new HashMap<>(); Set<K> allKeys = new HashSet<>(); @@ -2042,14 +2188,12 @@ public class StatsUtil { for (Map.Entry<T, V1> entry : id2Avg.entrySet()) { T k = entry.getKey(); - double v = 
entry.getValue().doubleValue(); - long n = id2num.get(k).longValue(); - ret += productOr0(v, n); + ret += productOr0(entry.getValue(), id2num.get(k)); } return ret; } - private static double weightAvg(Map id2Avg, Map id2num, Object key) { + private static <K, V1 extends Number, V2 extends Number> double weightAvg(Map<K, V1> id2Avg, Map<K, V2> id2num, K key) { if (id2Avg == null || id2num == null) { return 0.0; } @@ -2087,7 +2231,7 @@ public class StatsUtil { return (Map) map.get(key); } - private static <T, V extends Number> long sumValues(Map<T, V> m) { + private static <K, V extends Number> long sumValues(Map<K, V> m) { long ret = 0L; if (m == null) { return ret; @@ -2223,6 +2367,11 @@ public class StatsUtil { return stormClusterState.lastError(stormId, compId); } + + // ===================================================================================== + // key transformers + // ===================================================================================== + interface KeyTransformer<T> { T transform(Object key); }
