http://git-wip-us.apache.org/repos/asf/storm/blob/39ea23cd/storm-core/src/jvm/org/apache/storm/stats/StatsUtil.java
----------------------------------------------------------------------
diff --cc storm-core/src/jvm/org/apache/storm/stats/StatsUtil.java
index 0ed2af9,0000000..351e830
mode 100644,000000..100644
--- a/storm-core/src/jvm/org/apache/storm/stats/StatsUtil.java
+++ b/storm-core/src/jvm/org/apache/storm/stats/StatsUtil.java
@@@ -1,2062 -1,0 +1,2065 @@@
 +/**
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + * http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.storm.stats;
 +
 +import clojure.lang.Keyword;
 +import clojure.lang.RT;
 +import com.google.common.collect.Lists;
 +import java.util.ArrayList;
 +import java.util.HashMap;
 +import java.util.HashSet;
 +import java.util.Iterator;
 +import java.util.List;
 +import java.util.Map;
 +import java.util.Set;
 +import org.apache.storm.cluster.IStormClusterState;
 +import org.apache.storm.generated.Bolt;
 +import org.apache.storm.generated.BoltAggregateStats;
 +import org.apache.storm.generated.BoltStats;
 +import org.apache.storm.generated.CommonAggregateStats;
 +import org.apache.storm.generated.ComponentAggregateStats;
 +import org.apache.storm.generated.ComponentPageInfo;
 +import org.apache.storm.generated.ComponentType;
 +import org.apache.storm.generated.ErrorInfo;
 +import org.apache.storm.generated.ExecutorAggregateStats;
 +import org.apache.storm.generated.ExecutorInfo;
 +import org.apache.storm.generated.ExecutorSpecificStats;
 +import org.apache.storm.generated.ExecutorStats;
 +import org.apache.storm.generated.ExecutorSummary;
 +import org.apache.storm.generated.GlobalStreamId;
 +import org.apache.storm.generated.SpecificAggregateStats;
 +import org.apache.storm.generated.SpoutAggregateStats;
 +import org.apache.storm.generated.SpoutStats;
 +import org.apache.storm.generated.StormTopology;
 +import org.apache.storm.generated.TopologyPageInfo;
 +import org.apache.storm.generated.TopologyStats;
 +import org.apache.storm.utils.Utils;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +@SuppressWarnings("unchecked, unused")
 +public class StatsUtil {
 +    private static final Logger logger = 
LoggerFactory.getLogger(StatsUtil.class);
 +
 +    public static final String TYPE = "type";
 +    private static final String SPOUT = "spout";
 +    private static final String BOLT = "bolt";
 +    public static final Keyword KW_SPOUT = keyword(SPOUT);
 +    public static final Keyword KW_BOLT = keyword(BOLT);
 +
 +    private static final String UPTIME = "uptime";
 +    private static final String HOST = "host";
 +    private static final String PORT = "port";
 +    private static final String NUM_TASKS = "num-tasks";
 +    private static final String NUM_EXECUTORS = "num-executors";
 +    private static final String NUM_WORKERS = "num-workers";
 +    private static final String CAPACITY = "capacity";
 +    private static final String STATS = "stats";
 +    private static final String EXECUTOR_STATS = "executor-stats";
 +    private static final String EXECUTOR_ID = "executor-id";
 +    private static final String LAST_ERROR = "lastError";
 +
 +    private static final String ACKED = "acked";
 +    private static final String FAILED = "failed";
 +    private static final String EXECUTED = "executed";
 +    private static final String EMITTED = "emitted";
 +    private static final String TRANSFERRED = "transferred";
 +
 +    private static final String EXEC_LATENCIES = "execute-latencies";
 +    private static final String PROC_LATENCIES = "process-latencies";
 +    private static final String COMP_LATENCIES = "complete-latencies";
 +
 +    private static final String EXEC_LATENCY = "execute-latency";
 +    private static final String PROC_LATENCY = "process-latency";
 +    private static final String COMP_LATENCY = "complete-latency";
 +
 +    private static final String EXEC_LAT_TOTAL = "executeLatencyTotal";
 +    private static final String PROC_LAT_TOTAL = "processLatencyTotal";
 +    private static final String COMP_LAT_TOTAL = "completeLatencyTotal";
 +
 +    private static final String WIN_TO_EMITTED = "window->emitted";
 +    private static final String WIN_TO_ACKED = "window->acked";
 +    private static final String WIN_TO_FAILED = "window->failed";
 +    private static final String WIN_TO_EXECUTED = "window->executed";
 +    private static final String WIN_TO_TRANSFERRED = "window->transferred";
 +    private static final String WIN_TO_EXEC_LAT = "window->execute-latency";
 +    private static final String WIN_TO_PROC_LAT = "window->process-latency";
 +    private static final String WIN_TO_COMP_LAT = "window->complete-latency";
 +    private static final String WIN_TO_COMP_LAT_WGT_AVG = 
"window->comp-lat-wgt-avg";
 +    private static final String WIN_TO_EXEC_LAT_WGT_AVG = 
"window->exec-lat-wgt-avg";
 +    private static final String WIN_TO_PROC_LAT_WGT_AVG = 
"window->proc-lat-wgt-avg";
 +
 +    private static final String BOLT_TO_STATS = "bolt-id->stats";
 +    private static final String SPOUT_TO_STATS = "spout-id->stats";
 +    private static final String SID_TO_OUT_STATS = "sid->output-stats";
 +    private static final String CID_SID_TO_IN_STATS = "cid+sid->input-stats";
 +    private static final String WORKERS_SET = "workers-set";
 +
 +    public static final int TEN_MIN_IN_SECONDS = 60 * 10;
 +    public static final String TEN_MIN_IN_SECONDS_STR = TEN_MIN_IN_SECONDS + 
"";
 +
 +    public static final IdentityTransformer IDENTITY = new 
IdentityTransformer();
 +    private static final ToStringTransformer TO_STRING = new 
ToStringTransformer();
 +    private static final FromGlobalStreamIdTransformer FROM_GSID = new 
FromGlobalStreamIdTransformer();
 +    public static final ToGlobalStreamIdTransformer TO_GSID = new 
ToGlobalStreamIdTransformer();
 +
 +
 +    // 
=====================================================================================
 +    // aggregation stats methods
 +    // 
=====================================================================================
 +
 +    /**
 +     * Aggregates number executed, process latency, and execute latency 
across all streams.
 +     *
 +     * @param id2execAvg { global stream id -> exec avg value }, e.g., 
{["split" "default"] 0.44313}
 +     * @param id2procAvg { global stream id -> proc avg value }
 +     * @param id2numExec { global stream id -> executed }
 +     */
 +    public static Map aggBoltLatAndCount(Map id2execAvg, Map id2procAvg, Map 
id2numExec) {
 +        Map ret = new HashMap();
-         putRawKV(ret, EXEC_LAT_TOTAL, weightAvgAndSum(id2execAvg, 
id2numExec));
-         putRawKV(ret, PROC_LAT_TOTAL, weightAvgAndSum(id2procAvg, 
id2numExec));
-         putRawKV(ret, EXECUTED, sumValues(id2numExec));
++        putKV(ret, EXEC_LAT_TOTAL, weightAvgAndSum(id2execAvg, id2numExec));
++        putKV(ret, PROC_LAT_TOTAL, weightAvgAndSum(id2procAvg, id2numExec));
++        putKV(ret, EXECUTED, sumValues(id2numExec));
 +
 +        return ret;
 +    }
 +
 +    /**
 +     * Aggregates number acked and complete latencies across all streams.
 +     */
 +    public static Map aggSpoutLatAndCount(Map id2compAvg, Map id2numAcked) {
 +        Map ret = new HashMap();
-         putRawKV(ret, COMP_LAT_TOTAL, weightAvgAndSum(id2compAvg, 
id2numAcked));
-         putRawKV(ret, ACKED, sumValues(id2numAcked));
++        putKV(ret, COMP_LAT_TOTAL, weightAvgAndSum(id2compAvg, id2numAcked));
++        putKV(ret, ACKED, sumValues(id2numAcked));
 +
 +        return ret;
 +    }
 +
 +    /**
 +     * Aggregates number executed and process & execute latencies.
 +     */
 +    public static Map aggBoltStreamsLatAndCount(Map id2execAvg, Map 
id2procAvg, Map id2numExec) {
 +        Map ret = new HashMap();
 +        if (id2execAvg == null || id2procAvg == null || id2numExec == null) {
 +            return ret;
 +        }
 +        for (Object k : id2execAvg.keySet()) {
 +            Map subMap = new HashMap();
-             putRawKV(subMap, EXEC_LAT_TOTAL, weightAvg(id2execAvg, 
id2numExec, k));
-             putRawKV(subMap, PROC_LAT_TOTAL, weightAvg(id2procAvg, 
id2numExec, k));
-             putRawKV(subMap, EXECUTED, id2numExec.get(k));
++            putKV(subMap, EXEC_LAT_TOTAL, weightAvg(id2execAvg, id2numExec, 
k));
++            putKV(subMap, PROC_LAT_TOTAL, weightAvg(id2procAvg, id2numExec, 
k));
++            putKV(subMap, EXECUTED, id2numExec.get(k));
 +            ret.put(k, subMap);
 +        }
 +        return ret;
 +    }
 +
 +    /**
 +     * Aggregates number acked and complete latencies.
 +     */
 +    public static Map aggSpoutStreamsLatAndCount(Map id2compAvg, Map 
id2acked) {
 +        Map ret = new HashMap();
 +        if (id2compAvg == null || id2acked == null) {
 +            return ret;
 +        }
 +        for (Object k : id2compAvg.keySet()) {
 +            Map subMap = new HashMap();
-             putRawKV(subMap, COMP_LAT_TOTAL, weightAvg(id2compAvg, id2acked, 
k));
-             putRawKV(subMap, ACKED, id2acked.get(k));
++            putKV(subMap, COMP_LAT_TOTAL, weightAvg(id2compAvg, id2acked, k));
++            putKV(subMap, ACKED, id2acked.get(k));
 +            ret.put(k, subMap);
 +        }
 +        return ret;
 +    }
 +
 +    public static Map aggPreMergeCompPageBolt(Map m, String window, boolean 
includeSys) {
 +        Map ret = new HashMap();
-         putRawKV(ret, EXECUTOR_ID, getByKeyword(m, "exec-id"));
-         putRawKV(ret, HOST, getByKeyword(m, HOST));
-         putRawKV(ret, PORT, getByKeyword(m, PORT));
-         putRawKV(ret, UPTIME, getByKeyword(m, UPTIME));
-         putRawKV(ret, NUM_EXECUTORS, 1);
-         putRawKV(ret, NUM_TASKS, getByKeyword(m, NUM_TASKS));
++        putKV(ret, EXECUTOR_ID, getByKey(m, "exec-id"));
++        putKV(ret, HOST, getByKey(m, HOST));
++        putKV(ret, PORT, getByKey(m, PORT));
++        putKV(ret, UPTIME, getByKey(m, UPTIME));
++        putKV(ret, NUM_EXECUTORS, 1);
++        putKV(ret, NUM_TASKS, getByKey(m, NUM_TASKS));
 +
-         Map stat2win2sid2num = getMapByKeyword(m, STATS);
-         putRawKV(ret, CAPACITY, computeAggCapacity(stat2win2sid2num, 
getByKeywordOr0(m, UPTIME).intValue()));
++        Map stat2win2sid2num = getMapByKey(m, STATS);
++        putKV(ret, CAPACITY, computeAggCapacity(stat2win2sid2num, 
getByKeywordOr0(m, UPTIME).intValue()));
 +
 +        // calc cid+sid->input_stats
 +        Map inputStats = new HashMap();
-         Map sid2acked = (Map) 
windowSetConverter(getMapByKeyword(stat2win2sid2num, ACKED), 
TO_STRING).get(window);
-         Map sid2failed = (Map) 
windowSetConverter(getMapByKeyword(stat2win2sid2num, FAILED), 
TO_STRING).get(window);
-         putRawKV(inputStats, ACKED, sid2acked != null ? sid2acked : new 
HashMap());
-         putRawKV(inputStats, FAILED, sid2failed != null ? sid2failed : new 
HashMap());
++        Map sid2acked = (Map) 
windowSetConverter(getMapByKey(stat2win2sid2num, ACKED), TO_STRING).get(window);
++        Map sid2failed = (Map) 
windowSetConverter(getMapByKey(stat2win2sid2num, FAILED), 
TO_STRING).get(window);
++        putKV(inputStats, ACKED, sid2acked != null ? sid2acked : new 
HashMap());
++        putKV(inputStats, FAILED, sid2failed != null ? sid2failed : new 
HashMap());
 +
 +        inputStats = swapMapOrder(inputStats);
 +
-         Map sid2execLat = (Map) 
windowSetConverter(getMapByKeyword(stat2win2sid2num, EXEC_LATENCIES), 
TO_STRING).get(window);
-         Map sid2procLat = (Map) 
windowSetConverter(getMapByKeyword(stat2win2sid2num, PROC_LATENCIES), 
TO_STRING).get(window);
-         Map sid2exec = (Map) 
windowSetConverter(getMapByKeyword(stat2win2sid2num, EXECUTED), 
TO_STRING).get(window);
++        Map sid2execLat = (Map) 
windowSetConverter(getMapByKey(stat2win2sid2num, EXEC_LATENCIES), 
TO_STRING).get(window);
++        Map sid2procLat = (Map) 
windowSetConverter(getMapByKey(stat2win2sid2num, PROC_LATENCIES), 
TO_STRING).get(window);
++        Map sid2exec = (Map) windowSetConverter(getMapByKey(stat2win2sid2num, 
EXECUTED), TO_STRING).get(window);
 +        mergeMaps(inputStats, aggBoltStreamsLatAndCount(sid2execLat, 
sid2procLat, sid2exec));
-         putRawKV(ret, CID_SID_TO_IN_STATS, inputStats);
++        putKV(ret, CID_SID_TO_IN_STATS, inputStats);
 +
 +        // calc sid->output_stats
 +        Map outputStats = new HashMap();
-         Map sid2emitted = (Map) 
windowSetConverter(getMapByKeyword(stat2win2sid2num, EMITTED), 
TO_STRING).get(window);
-         Map sid2transferred = (Map) 
windowSetConverter(getMapByKeyword(stat2win2sid2num, TRANSFERRED), 
TO_STRING).get(window);
++        Map sid2emitted = (Map) 
windowSetConverter(getMapByKey(stat2win2sid2num, EMITTED), 
TO_STRING).get(window);
++        Map sid2transferred = (Map) 
windowSetConverter(getMapByKey(stat2win2sid2num, TRANSFERRED), 
TO_STRING).get(window);
 +        if (sid2emitted != null) {
-             putRawKV(outputStats, EMITTED, filterSysStreams(sid2emitted, 
includeSys));
++            putKV(outputStats, EMITTED, filterSysStreams(sid2emitted, 
includeSys));
 +        } else {
-             putRawKV(outputStats, EMITTED, new HashMap());
++            putKV(outputStats, EMITTED, new HashMap());
 +        }
 +        if (sid2transferred != null) {
-             putRawKV(outputStats, TRANSFERRED, 
filterSysStreams(sid2transferred, includeSys));
++            putKV(outputStats, TRANSFERRED, filterSysStreams(sid2transferred, 
includeSys));
 +        } else {
-             putRawKV(outputStats, TRANSFERRED, new HashMap());
++            putKV(outputStats, TRANSFERRED, new HashMap());
 +        }
 +        outputStats = swapMapOrder(outputStats);
-         putRawKV(ret, SID_TO_OUT_STATS, outputStats);
++        putKV(ret, SID_TO_OUT_STATS, outputStats);
 +
 +        return ret;
 +    }
 +
 +    public static Map aggPreMergeCompPageSpout(Map m, String window, boolean 
includeSys) {
 +        Map ret = new HashMap();
-         putRawKV(ret, EXECUTOR_ID, getByKeyword(m, "exec-id"));
-         putRawKV(ret, HOST, getByKeyword(m, HOST));
-         putRawKV(ret, PORT, getByKeyword(m, PORT));
-         putRawKV(ret, UPTIME, getByKeyword(m, UPTIME));
-         putRawKV(ret, NUM_EXECUTORS, 1);
-         putRawKV(ret, NUM_TASKS, getByKeyword(m, NUM_TASKS));
++        putKV(ret, EXECUTOR_ID, getByKey(m, "exec-id"));
++        putKV(ret, HOST, getByKey(m, HOST));
++        putKV(ret, PORT, getByKey(m, PORT));
++        putKV(ret, UPTIME, getByKey(m, UPTIME));
++        putKV(ret, NUM_EXECUTORS, 1);
++        putKV(ret, NUM_TASKS, getByKey(m, NUM_TASKS));
 +
-         Map stat2win2sid2num = getMapByKeyword(m, STATS);
++        Map stat2win2sid2num = getMapByKey(m, STATS);
 +
 +        // calc sid->output-stats
 +        Map outputStats = new HashMap();
-         Map win2sid2acked = 
windowSetConverter(getMapByKeyword(stat2win2sid2num, ACKED), TO_STRING);
-         Map win2sid2failed = 
windowSetConverter(getMapByKeyword(stat2win2sid2num, FAILED), TO_STRING);
-         Map win2sid2emitted = 
windowSetConverter(getMapByKeyword(stat2win2sid2num, EMITTED), TO_STRING);
-         Map win2sid2transferred = 
windowSetConverter(getMapByKeyword(stat2win2sid2num, TRANSFERRED), TO_STRING);
-         Map win2sid2compLat = 
windowSetConverter(getMapByKeyword(stat2win2sid2num, COMP_LATENCIES), 
TO_STRING);
- 
-         putRawKV(outputStats, ACKED, win2sid2acked.get(window));
-         putRawKV(outputStats, FAILED, win2sid2failed.get(window));
-         putRawKV(outputStats, EMITTED, filterSysStreams((Map) 
win2sid2emitted.get(window), includeSys));
-         putRawKV(outputStats, TRANSFERRED, filterSysStreams((Map) 
win2sid2transferred.get(window), includeSys));
++        Map win2sid2acked = windowSetConverter(getMapByKey(stat2win2sid2num, 
ACKED), TO_STRING);
++        Map win2sid2failed = windowSetConverter(getMapByKey(stat2win2sid2num, 
FAILED), TO_STRING);
++        Map win2sid2emitted = 
windowSetConverter(getMapByKey(stat2win2sid2num, EMITTED), TO_STRING);
++        Map win2sid2transferred = 
windowSetConverter(getMapByKey(stat2win2sid2num, TRANSFERRED), TO_STRING);
++        Map win2sid2compLat = 
windowSetConverter(getMapByKey(stat2win2sid2num, COMP_LATENCIES), TO_STRING);
++
++        putKV(outputStats, ACKED, win2sid2acked.get(window));
++        putKV(outputStats, FAILED, win2sid2failed.get(window));
++        putKV(outputStats, EMITTED, filterSysStreams((Map) 
win2sid2emitted.get(window), includeSys));
++        putKV(outputStats, TRANSFERRED, filterSysStreams((Map) 
win2sid2transferred.get(window), includeSys));
 +        outputStats = swapMapOrder(outputStats);
 +
 +        Map sid2compLat = (Map) win2sid2compLat.get(window);
 +        Map sid2acked = (Map) win2sid2acked.get(window);
 +        mergeMaps(outputStats, aggSpoutStreamsLatAndCount(sid2compLat, 
sid2acked));
-         putRawKV(ret, SID_TO_OUT_STATS, outputStats);
++        putKV(ret, SID_TO_OUT_STATS, outputStats);
 +
 +        return ret;
 +    }
 +
 +    public static Map aggPreMergeTopoPageBolt(Map m, String window, boolean 
includeSys) {
 +        Map ret = new HashMap();
 +
 +        Map subRet = new HashMap();
-         putRawKV(subRet, NUM_EXECUTORS, 1);
-         putRawKV(subRet, NUM_TASKS, getByKeyword(m, NUM_TASKS));
++        putKV(subRet, NUM_EXECUTORS, 1);
++        putKV(subRet, NUM_TASKS, getByKey(m, NUM_TASKS));
 +
-         Map stat2win2sid2num = getMapByKeyword(m, STATS);
-         putRawKV(subRet, CAPACITY, computeAggCapacity(stat2win2sid2num, 
getByKeywordOr0(m, UPTIME).intValue()));
++        Map stat2win2sid2num = getMapByKey(m, STATS);
++        putKV(subRet, CAPACITY, computeAggCapacity(stat2win2sid2num, 
getByKeywordOr0(m, UPTIME).intValue()));
 +
 +        for (String key : new String[]{EMITTED, TRANSFERRED, ACKED, FAILED}) {
-             Map stat = (Map) 
windowSetConverter(getMapByKeyword(stat2win2sid2num, key), 
TO_STRING).get(window);
++            Map stat = (Map) windowSetConverter(getMapByKey(stat2win2sid2num, 
key), TO_STRING).get(window);
 +            if (EMITTED.equals(key) || TRANSFERRED.equals(key)) {
 +                stat = filterSysStreams(stat, includeSys);
 +            }
 +            long sum = 0;
 +            if (stat != null) {
 +                for (Object o : stat.values()) {
 +                    sum += ((Number) o).longValue();
 +                }
 +            }
-             putRawKV(subRet, key, sum);
++            putKV(subRet, key, sum);
 +        }
 +
-         Map win2sid2execLat = 
windowSetConverter(getMapByKeyword(stat2win2sid2num, EXEC_LATENCIES), 
TO_STRING);
-         Map win2sid2procLat = 
windowSetConverter(getMapByKeyword(stat2win2sid2num, PROC_LATENCIES), 
TO_STRING);
-         Map win2sid2exec = 
windowSetConverter(getMapByKeyword(stat2win2sid2num, EXECUTED), TO_STRING);
++        Map win2sid2execLat = 
windowSetConverter(getMapByKey(stat2win2sid2num, EXEC_LATENCIES), TO_STRING);
++        Map win2sid2procLat = 
windowSetConverter(getMapByKey(stat2win2sid2num, PROC_LATENCIES), TO_STRING);
++        Map win2sid2exec = windowSetConverter(getMapByKey(stat2win2sid2num, 
EXECUTED), TO_STRING);
 +        subRet.putAll(aggBoltLatAndCount(
 +                (Map) win2sid2execLat.get(window), (Map) 
win2sid2procLat.get(window), (Map) win2sid2exec.get(window)));
 +
-         ret.put(getByKeyword(m, "comp-id"), subRet);
++        ret.put(getByKey(m, "comp-id"), subRet);
 +        return ret;
 +    }
 +
 +    public static Map aggPreMergeTopoPageSpout(Map m, String window, boolean 
includeSys) {
 +        Map ret = new HashMap();
 +
 +        Map subRet = new HashMap();
-         putRawKV(subRet, NUM_EXECUTORS, 1);
-         putRawKV(subRet, NUM_TASKS, getByKeyword(m, NUM_TASKS));
++        putKV(subRet, NUM_EXECUTORS, 1);
++        putKV(subRet, NUM_TASKS, getByKey(m, NUM_TASKS));
 +
 +        // no capacity for spout
-         Map stat2win2sid2num = getMapByKeyword(m, STATS);
++        Map stat2win2sid2num = getMapByKey(m, STATS);
 +        for (String key : new String[]{EMITTED, TRANSFERRED, FAILED}) {
-             Map stat = (Map) 
windowSetConverter(getMapByKeyword(stat2win2sid2num, key), 
TO_STRING).get(window);
++            Map stat = (Map) windowSetConverter(getMapByKey(stat2win2sid2num, 
key), TO_STRING).get(window);
 +            if (EMITTED.equals(key) || TRANSFERRED.equals(key)) {
 +                stat = filterSysStreams(stat, includeSys);
 +            }
 +            long sum = 0;
 +            if (stat != null) {
 +                for (Object o : stat.values()) {
 +                    sum += ((Number) o).longValue();
 +                }
 +            }
-             putRawKV(subRet, key, sum);
++            putKV(subRet, key, sum);
 +        }
 +
-         Map win2sid2compLat = 
windowSetConverter(getMapByKeyword(stat2win2sid2num, COMP_LATENCIES), 
TO_STRING);
-         Map win2sid2acked = 
windowSetConverter(getMapByKeyword(stat2win2sid2num, ACKED), TO_STRING);
++        Map win2sid2compLat = 
windowSetConverter(getMapByKey(stat2win2sid2num, COMP_LATENCIES), TO_STRING);
++        Map win2sid2acked = windowSetConverter(getMapByKey(stat2win2sid2num, 
ACKED), TO_STRING);
 +        subRet.putAll(aggSpoutLatAndCount((Map) win2sid2compLat.get(window), 
(Map) win2sid2acked.get(window)));
 +
-         ret.put(getByKeyword(m, "comp-id"), subRet);
++        ret.put(getByKey(m, "comp-id"), subRet);
 +        return ret;
 +    }
 +
 +    public static Map mergeAggCompStatsCompPageBolt(Map accBoltStats, Map 
boltStats) {
 +        Map ret = new HashMap();
 +
-         Map accIn = getMapByKeyword(accBoltStats, CID_SID_TO_IN_STATS);
-         Map accOut = getMapByKeyword(accBoltStats, SID_TO_OUT_STATS);
-         Map boltIn = getMapByKeyword(boltStats, CID_SID_TO_IN_STATS);
-         Map boltOut = getMapByKeyword(boltStats, SID_TO_OUT_STATS);
++        Map accIn = getMapByKey(accBoltStats, CID_SID_TO_IN_STATS);
++        Map accOut = getMapByKey(accBoltStats, SID_TO_OUT_STATS);
++        Map boltIn = getMapByKey(boltStats, CID_SID_TO_IN_STATS);
++        Map boltOut = getMapByKey(boltStats, SID_TO_OUT_STATS);
 +
 +        int numExecutors = getByKeywordOr0(accBoltStats, 
NUM_EXECUTORS).intValue();
-         putRawKV(ret, NUM_EXECUTORS, numExecutors + 1);
-         putRawKV(ret, NUM_TASKS, sumOr0(
++        putKV(ret, NUM_EXECUTORS, numExecutors + 1);
++        putKV(ret, NUM_TASKS, sumOr0(
 +                getByKeywordOr0(accBoltStats, NUM_TASKS), 
getByKeywordOr0(boltStats, NUM_TASKS)));
 +
 +        // (merge-with (partial merge-with sum-or-0) acc-out spout-out)
-         putRawKV(ret, SID_TO_OUT_STATS, fullMergeWithSum(accOut, boltOut));
-         putRawKV(ret, CID_SID_TO_IN_STATS, fullMergeWithSum(accIn, boltIn));
++        putKV(ret, SID_TO_OUT_STATS, fullMergeWithSum(accOut, boltOut));
++        putKV(ret, CID_SID_TO_IN_STATS, fullMergeWithSum(accIn, boltIn));
 +
 +        long executed = sumStreamsLong(boltIn, EXECUTED);
-         putRawKV(ret, EXECUTED, executed);
++        putKV(ret, EXECUTED, executed);
 +
 +        Map executorStats = new HashMap();
-         putRawKV(executorStats, EXECUTOR_ID, getByKeyword(boltStats, 
EXECUTOR_ID));
-         putRawKV(executorStats, UPTIME, getByKeyword(boltStats, UPTIME));
-         putRawKV(executorStats, HOST, getByKeyword(boltStats, HOST));
-         putRawKV(executorStats, PORT, getByKeyword(boltStats, PORT));
-         putRawKV(executorStats, CAPACITY, getByKeyword(boltStats, CAPACITY));
- 
-         putRawKV(executorStats, EMITTED, sumStreamsLong(boltOut, EMITTED));
-         putRawKV(executorStats, TRANSFERRED, sumStreamsLong(boltOut, 
TRANSFERRED));
-         putRawKV(executorStats, ACKED, sumStreamsLong(boltIn, ACKED));
-         putRawKV(executorStats, FAILED, sumStreamsLong(boltIn, FAILED));
-         putRawKV(executorStats, EXECUTED, executed);
++        putKV(executorStats, EXECUTOR_ID, getByKey(boltStats, EXECUTOR_ID));
++        putKV(executorStats, UPTIME, getByKey(boltStats, UPTIME));
++        putKV(executorStats, HOST, getByKey(boltStats, HOST));
++        putKV(executorStats, PORT, getByKey(boltStats, PORT));
++        putKV(executorStats, CAPACITY, getByKey(boltStats, CAPACITY));
++
++        putKV(executorStats, EMITTED, sumStreamsLong(boltOut, EMITTED));
++        putKV(executorStats, TRANSFERRED, sumStreamsLong(boltOut, 
TRANSFERRED));
++        putKV(executorStats, ACKED, sumStreamsLong(boltIn, ACKED));
++        putKV(executorStats, FAILED, sumStreamsLong(boltIn, FAILED));
++        putKV(executorStats, EXECUTED, executed);
 +
 +        if (executed > 0) {
-             putRawKV(executorStats, EXEC_LATENCY, sumStreamsDouble(boltIn, 
EXEC_LAT_TOTAL) / executed);
-             putRawKV(executorStats, PROC_LATENCY, sumStreamsDouble(boltIn, 
PROC_LAT_TOTAL) / executed);
++            putKV(executorStats, EXEC_LATENCY, sumStreamsDouble(boltIn, 
EXEC_LAT_TOTAL) / executed);
++            putKV(executorStats, PROC_LATENCY, sumStreamsDouble(boltIn, 
PROC_LAT_TOTAL) / executed);
 +        } else {
-             putRawKV(executorStats, EXEC_LATENCY, null);
-             putRawKV(executorStats, PROC_LATENCY, null);
++            putKV(executorStats, EXEC_LATENCY, null);
++            putKV(executorStats, PROC_LATENCY, null);
 +        }
-         List executorStatsList = ((List) getByKeyword(accBoltStats, 
EXECUTOR_STATS));
++        List executorStatsList = ((List) getByKey(accBoltStats, 
EXECUTOR_STATS));
 +        executorStatsList.add(executorStats);
-         putRawKV(ret, EXECUTOR_STATS, executorStatsList);
++        putKV(ret, EXECUTOR_STATS, executorStatsList);
 +
 +        return ret;
 +    }
 +
 +    public static Map mergeAggCompStatsCompPageSpout(Map accSpoutStats, Map 
spoutStats) {
 +        Map ret = new HashMap();
 +
-         Map accOut = getMapByKeyword(accSpoutStats, SID_TO_OUT_STATS);
-         Map spoutOut = getMapByKeyword(spoutStats, SID_TO_OUT_STATS);
++        Map accOut = getMapByKey(accSpoutStats, SID_TO_OUT_STATS);
++        Map spoutOut = getMapByKey(spoutStats, SID_TO_OUT_STATS);
 +
 +        int numExecutors = getByKeywordOr0(accSpoutStats, 
NUM_EXECUTORS).intValue();
-         putRawKV(ret, NUM_EXECUTORS, numExecutors + 1);
-         putRawKV(ret, NUM_TASKS, sumOr0(
++        putKV(ret, NUM_EXECUTORS, numExecutors + 1);
++        putKV(ret, NUM_TASKS, sumOr0(
 +                getByKeywordOr0(accSpoutStats, NUM_TASKS), 
getByKeywordOr0(spoutStats, NUM_TASKS)));
-         putRawKV(ret, SID_TO_OUT_STATS, fullMergeWithSum(accOut, spoutOut));
++        putKV(ret, SID_TO_OUT_STATS, fullMergeWithSum(accOut, spoutOut));
 +
 +        Map executorStats = new HashMap();
-         putRawKV(executorStats, EXECUTOR_ID, getByKeyword(spoutStats, 
EXECUTOR_ID));
-         putRawKV(executorStats, UPTIME, getByKeyword(spoutStats, UPTIME));
-         putRawKV(executorStats, HOST, getByKeyword(spoutStats, HOST));
-         putRawKV(executorStats, PORT, getByKeyword(spoutStats, PORT));
- 
-         putRawKV(executorStats, EMITTED, sumStreamsLong(spoutOut, EMITTED));
-         putRawKV(executorStats, TRANSFERRED, sumStreamsLong(spoutOut, 
TRANSFERRED));
-         putRawKV(executorStats, FAILED, sumStreamsLong(spoutOut, FAILED));
++        putKV(executorStats, EXECUTOR_ID, getByKey(spoutStats, EXECUTOR_ID));
++        putKV(executorStats, UPTIME, getByKey(spoutStats, UPTIME));
++        putKV(executorStats, HOST, getByKey(spoutStats, HOST));
++        putKV(executorStats, PORT, getByKey(spoutStats, PORT));
++
++        putKV(executorStats, EMITTED, sumStreamsLong(spoutOut, EMITTED));
++        putKV(executorStats, TRANSFERRED, sumStreamsLong(spoutOut, 
TRANSFERRED));
++        putKV(executorStats, FAILED, sumStreamsLong(spoutOut, FAILED));
 +        long acked = sumStreamsLong(spoutOut, ACKED);
-         putRawKV(executorStats, ACKED, acked);
++        putKV(executorStats, ACKED, acked);
 +        if (acked > 0) {
-             putRawKV(executorStats, COMP_LATENCY, sumStreamsDouble(spoutOut, 
COMP_LAT_TOTAL) / acked);
++            putKV(executorStats, COMP_LATENCY, sumStreamsDouble(spoutOut, 
COMP_LAT_TOTAL) / acked);
 +        } else {
-             putRawKV(executorStats, COMP_LATENCY, null);
++            putKV(executorStats, COMP_LATENCY, null);
 +        }
-         List executorStatsList = ((List) getByKeyword(accSpoutStats, 
EXECUTOR_STATS));
++        List executorStatsList = ((List) getByKey(accSpoutStats, 
EXECUTOR_STATS));
 +        executorStatsList.add(executorStats);
-         putRawKV(ret, EXECUTOR_STATS, executorStatsList);
++        putKV(ret, EXECUTOR_STATS, executorStatsList);
 +
 +        return ret;
 +    }
 +
 +    public static Map mergeAggCompStatsTopoPageBolt(Map accBoltStats, Map 
boltStats) {
 +        Map ret = new HashMap();
 +        Integer numExecutors = getByKeywordOr0(accBoltStats, 
NUM_EXECUTORS).intValue();
-         putRawKV(ret, NUM_EXECUTORS, numExecutors + 1);
-         putRawKV(ret, NUM_TASKS, sumOr0(
++        putKV(ret, NUM_EXECUTORS, numExecutors + 1);
++        putKV(ret, NUM_TASKS, sumOr0(
 +                getByKeywordOr0(accBoltStats, NUM_TASKS), 
getByKeywordOr0(boltStats, NUM_TASKS)));
-         putRawKV(ret, EMITTED, sumOr0(
++        putKV(ret, EMITTED, sumOr0(
 +                getByKeywordOr0(accBoltStats, EMITTED), 
getByKeywordOr0(boltStats, EMITTED)));
-         putRawKV(ret, TRANSFERRED, sumOr0(
++        putKV(ret, TRANSFERRED, sumOr0(
 +                getByKeywordOr0(accBoltStats, TRANSFERRED), 
getByKeywordOr0(boltStats, TRANSFERRED)));
-         putRawKV(ret, EXEC_LAT_TOTAL, sumOr0(
++        putKV(ret, EXEC_LAT_TOTAL, sumOr0(
 +                getByKeywordOr0(accBoltStats, EXEC_LAT_TOTAL), 
getByKeywordOr0(boltStats, EXEC_LAT_TOTAL)));
-         putRawKV(ret, PROC_LAT_TOTAL, sumOr0(
++        putKV(ret, PROC_LAT_TOTAL, sumOr0(
 +                getByKeywordOr0(accBoltStats, PROC_LAT_TOTAL), 
getByKeywordOr0(boltStats, PROC_LAT_TOTAL)));
-         putRawKV(ret, EXECUTED, sumOr0(
++        putKV(ret, EXECUTED, sumOr0(
 +                getByKeywordOr0(accBoltStats, EXECUTED), 
getByKeywordOr0(boltStats, EXECUTED)));
-         putRawKV(ret, ACKED, sumOr0(
++        putKV(ret, ACKED, sumOr0(
 +                getByKeywordOr0(accBoltStats, ACKED), 
getByKeywordOr0(boltStats, ACKED)));
-         putRawKV(ret, FAILED, sumOr0(
++        putKV(ret, FAILED, sumOr0(
 +                getByKeywordOr0(accBoltStats, FAILED), 
getByKeywordOr0(boltStats, FAILED)));
-         putRawKV(ret, CAPACITY, maxOr0(
++        putKV(ret, CAPACITY, maxOr0(
 +                getByKeywordOr0(accBoltStats, CAPACITY), 
getByKeywordOr0(boltStats, CAPACITY)));
 +
 +        return ret;
 +    }
 +
 +    public static Map mergeAggCompStatsTopoPageSpout(Map accSpoutStats, Map 
spoutStats) {
 +        Map ret = new HashMap();
 +        Integer numExecutors = getByKeywordOr0(accSpoutStats, 
NUM_EXECUTORS).intValue();
-         putRawKV(ret, NUM_EXECUTORS, numExecutors + 1);
-         putRawKV(ret, NUM_TASKS, sumOr0(
++        putKV(ret, NUM_EXECUTORS, numExecutors + 1);
++        putKV(ret, NUM_TASKS, sumOr0(
 +                getByKeywordOr0(accSpoutStats, NUM_TASKS), 
getByKeywordOr0(spoutStats, NUM_TASKS)));
-         putRawKV(ret, EMITTED, sumOr0(
++        putKV(ret, EMITTED, sumOr0(
 +                getByKeywordOr0(accSpoutStats, EMITTED), 
getByKeywordOr0(spoutStats, EMITTED)));
-         putRawKV(ret, TRANSFERRED, sumOr0(
++        putKV(ret, TRANSFERRED, sumOr0(
 +                getByKeywordOr0(accSpoutStats, TRANSFERRED), 
getByKeywordOr0(spoutStats, TRANSFERRED)));
-         putRawKV(ret, COMP_LAT_TOTAL, sumOr0(
++        putKV(ret, COMP_LAT_TOTAL, sumOr0(
 +                getByKeywordOr0(accSpoutStats, COMP_LAT_TOTAL), 
getByKeywordOr0(spoutStats, COMP_LAT_TOTAL)));
-         putRawKV(ret, ACKED, sumOr0(
++        putKV(ret, ACKED, sumOr0(
 +                getByKeywordOr0(accSpoutStats, ACKED), 
getByKeywordOr0(spoutStats, ACKED)));
-         putRawKV(ret, FAILED, sumOr0(
++        putKV(ret, FAILED, sumOr0(
 +                getByKeywordOr0(accSpoutStats, FAILED), 
getByKeywordOr0(spoutStats, FAILED)));
 +
 +        return ret;
 +    }
 +
 +    /**
 +     * A helper function that does the common work to aggregate stats of one
 +     * executor with the given map for the topology page.
 +     */
 +    public static Map aggTopoExecStats(String window, boolean includeSys, Map 
accStats, Map newData, String compType) {
 +        Map ret = new HashMap();
 +
-         Set workerSet = (Set) getByKeyword(accStats, WORKERS_SET);
-         Map bolt2stats = getMapByKeyword(accStats, BOLT_TO_STATS);
-         Map spout2stats = getMapByKeyword(accStats, SPOUT_TO_STATS);
-         Map win2emitted = getMapByKeyword(accStats, WIN_TO_EMITTED);
-         Map win2transferred = getMapByKeyword(accStats, WIN_TO_TRANSFERRED);
-         Map win2compLatWgtAvg = getMapByKeyword(accStats, 
WIN_TO_COMP_LAT_WGT_AVG);
-         Map win2acked = getMapByKeyword(accStats, WIN_TO_ACKED);
-         Map win2failed = getMapByKeyword(accStats, WIN_TO_FAILED);
-         Map stats = getMapByKeyword(newData, STATS);
++        Set workerSet = (Set) getByKey(accStats, WORKERS_SET);
++        Map bolt2stats = getMapByKey(accStats, BOLT_TO_STATS);
++        Map spout2stats = getMapByKey(accStats, SPOUT_TO_STATS);
++        Map win2emitted = getMapByKey(accStats, WIN_TO_EMITTED);
++        Map win2transferred = getMapByKey(accStats, WIN_TO_TRANSFERRED);
++        Map win2compLatWgtAvg = getMapByKey(accStats, 
WIN_TO_COMP_LAT_WGT_AVG);
++        Map win2acked = getMapByKey(accStats, WIN_TO_ACKED);
++        Map win2failed = getMapByKey(accStats, WIN_TO_FAILED);
++        Map stats = getMapByKey(newData, STATS);
 +
 +        boolean isSpout = compType.equals(SPOUT);
 +        Map cid2stat2num;
 +        if (isSpout) {
 +            cid2stat2num = aggPreMergeTopoPageSpout(newData, window, 
includeSys);
 +        } else {
 +            cid2stat2num = aggPreMergeTopoPageBolt(newData, window, 
includeSys);
 +        }
 +
 +        Map w2compLatWgtAvg, w2acked;
-         Map compLatStats = getMapByKeyword(stats, COMP_LATENCIES);
++        Map compLatStats = getMapByKey(stats, COMP_LATENCIES);
 +        if (isSpout) { // agg spout stats
 +            Map mm = new HashMap();
 +
-             Map acked = getMapByKeyword(stats, ACKED);
++            Map acked = getMapByKey(stats, ACKED);
 +            for (Object win : acked.keySet()) {
 +                mm.put(win, aggSpoutLatAndCount((Map) compLatStats.get(win), 
(Map) acked.get(win)));
 +            }
 +            mm = swapMapOrder(mm);
-             w2compLatWgtAvg = getMapByKeyword(mm, COMP_LAT_TOTAL);
-             w2acked = getMapByKeyword(mm, ACKED);
++            w2compLatWgtAvg = getMapByKey(mm, COMP_LAT_TOTAL);
++            w2acked = getMapByKey(mm, ACKED);
 +        } else {
 +            w2compLatWgtAvg = null;
-             w2acked = aggregateCountStreams(getMapByKeyword(stats, ACKED));
++            w2acked = aggregateCountStreams(getMapByKey(stats, ACKED));
 +        }
 +
-         workerSet.add(Lists.newArrayList(getByKeyword(newData, HOST), 
getByKeyword(newData, PORT)));
-         putRawKV(ret, WORKERS_SET, workerSet);
-         putRawKV(ret, BOLT_TO_STATS, bolt2stats);
-         putRawKV(ret, SPOUT_TO_STATS, spout2stats);
-         putRawKV(ret, WIN_TO_EMITTED, mergeWithSum(win2emitted, 
aggregateCountStreams(
-                 filterSysStreams(getMapByKeyword(stats, EMITTED), 
includeSys))));
-         putRawKV(ret, WIN_TO_TRANSFERRED, mergeWithSum(win2transferred, 
aggregateCountStreams(
-                 filterSysStreams(getMapByKeyword(stats, TRANSFERRED), 
includeSys))));
-         putRawKV(ret, WIN_TO_COMP_LAT_WGT_AVG, 
mergeWithSum(win2compLatWgtAvg, w2compLatWgtAvg));
++        workerSet.add(Lists.newArrayList(getByKey(newData, HOST), 
getByKey(newData, PORT)));
++        putKV(ret, WORKERS_SET, workerSet);
++        putKV(ret, BOLT_TO_STATS, bolt2stats);
++        putKV(ret, SPOUT_TO_STATS, spout2stats);
++        putKV(ret, WIN_TO_EMITTED, mergeWithSum(win2emitted, 
aggregateCountStreams(
++                filterSysStreams(getMapByKey(stats, EMITTED), includeSys))));
++        putKV(ret, WIN_TO_TRANSFERRED, mergeWithSum(win2transferred, 
aggregateCountStreams(
++                filterSysStreams(getMapByKey(stats, TRANSFERRED), 
includeSys))));
++        putKV(ret, WIN_TO_COMP_LAT_WGT_AVG, mergeWithSum(win2compLatWgtAvg, 
w2compLatWgtAvg));
 +
-         //boolean isSpoutStat = SPOUT.equals(((Keyword) getByKeyword(stats, 
TYPE)).getName());
-         putRawKV(ret, WIN_TO_ACKED, isSpout ? mergeWithSum(win2acked, 
w2acked) : win2acked);
-         putRawKV(ret, WIN_TO_FAILED, isSpout ?
-                 mergeWithSum(aggregateCountStreams(getMapByKeyword(stats, 
FAILED)), win2failed) : win2failed);
-         putRawKV(ret, TYPE, getByKeyword(stats, TYPE));
++        //boolean isSpoutStat = SPOUT.equals(((Keyword) getByKey(stats, 
TYPE)).getName());
++        putKV(ret, WIN_TO_ACKED, isSpout ? mergeWithSum(win2acked, w2acked) : 
win2acked);
++        putKV(ret, WIN_TO_FAILED, isSpout ?
++                mergeWithSum(aggregateCountStreams(getMapByKey(stats, 
FAILED)), win2failed) : win2failed);
++        putKV(ret, TYPE, getByKey(stats, TYPE));
 +
 +        // (merge-with merge-agg-comp-stats-topo-page-bolt/spout (acc-stats 
comp-key) cid->statk->num)
 +        // (acc-stats comp-key) ==> bolt2stats/spout2stats
 +        if (isSpout) {
 +            Set<Object> keySet = new HashSet<>();
 +            keySet.addAll(spout2stats.keySet());
 +            keySet.addAll(cid2stat2num.keySet());
 +
 +            Map mm = new HashMap();
 +            for (Object k : keySet) {
 +                mm.put(k, mergeAggCompStatsTopoPageSpout((Map) 
spout2stats.get(k), (Map) cid2stat2num.get(k)));
 +            }
-             putRawKV(ret, SPOUT_TO_STATS, mm);
++            putKV(ret, SPOUT_TO_STATS, mm);
 +        } else {
 +            Set<Object> keySet = new HashSet<>();
 +            keySet.addAll(bolt2stats.keySet());
 +            keySet.addAll(cid2stat2num.keySet());
 +
 +            Map mm = new HashMap();
 +            for (Object k : keySet) {
 +                mm.put(k, mergeAggCompStatsTopoPageBolt((Map) 
bolt2stats.get(k), (Map) cid2stat2num.get(k)));
 +            }
-             putRawKV(ret, BOLT_TO_STATS, mm);
++            putKV(ret, BOLT_TO_STATS, mm);
 +        }
 +
 +        return ret;
 +    }
 +
 +    public static TopologyPageInfo aggTopoExecsStats(
 +            String topologyId, Map exec2nodePort, Map task2component,
 +            Map beats, StormTopology topology, String window, boolean 
includeSys, IStormClusterState clusterState) {
 +        List beatList = extractDataFromHb(exec2nodePort, task2component, 
beats, includeSys, topology);
 +        Map topoStats = aggregateTopoStats(window, includeSys, beatList);
 +        topoStats = postAggregateTopoStats(task2component, exec2nodePort, 
topoStats, topologyId, clusterState);
 +
 +        return thriftifyTopoPageData(topologyId, topoStats);
 +    }
 +
 +    public static Map aggregateTopoStats(String win, boolean includeSys, List 
data) {
 +        Map initVal = new HashMap();
-         putRawKV(initVal, WORKERS_SET, new HashSet());
-         putRawKV(initVal, BOLT_TO_STATS, new HashMap());
-         putRawKV(initVal, SPOUT_TO_STATS, new HashMap());
-         putRawKV(initVal, WIN_TO_EMITTED, new HashMap());
-         putRawKV(initVal, WIN_TO_TRANSFERRED, new HashMap());
-         putRawKV(initVal, WIN_TO_COMP_LAT_WGT_AVG, new HashMap());
-         putRawKV(initVal, WIN_TO_ACKED, new HashMap());
-         putRawKV(initVal, WIN_TO_FAILED, new HashMap());
++        putKV(initVal, WORKERS_SET, new HashSet());
++        putKV(initVal, BOLT_TO_STATS, new HashMap());
++        putKV(initVal, SPOUT_TO_STATS, new HashMap());
++        putKV(initVal, WIN_TO_EMITTED, new HashMap());
++        putKV(initVal, WIN_TO_TRANSFERRED, new HashMap());
++        putKV(initVal, WIN_TO_COMP_LAT_WGT_AVG, new HashMap());
++        putKV(initVal, WIN_TO_ACKED, new HashMap());
++        putKV(initVal, WIN_TO_FAILED, new HashMap());
 +
 +        for (Object o : data) {
 +            Map newData = (Map) o;
-             String compType = ((Keyword) getByKeyword(newData, 
TYPE)).getName();
++            String compType = ((Keyword) getByKey(newData, TYPE)).getName();
 +            initVal = aggTopoExecStats(win, includeSys, initVal, newData, 
compType);
 +        }
 +
 +        return initVal;
 +    }
 +
 +    public static Map postAggregateTopoStats(
 +            Map task2comp, Map exec2nodePort, Map accData, String topologyId, 
IStormClusterState clusterState) {
 +        Map ret = new HashMap();
-         putRawKV(ret, NUM_TASKS, task2comp.size());
-         putRawKV(ret, NUM_WORKERS, ((Set) getByKeyword(accData, 
WORKERS_SET)).size());
-         putRawKV(ret, NUM_EXECUTORS, exec2nodePort != null ? 
exec2nodePort.size() : 0);
++        putKV(ret, NUM_TASKS, task2comp.size());
++        putKV(ret, NUM_WORKERS, ((Set) getByKey(accData, 
WORKERS_SET)).size());
++        putKV(ret, NUM_EXECUTORS, exec2nodePort != null ? 
exec2nodePort.size() : 0);
 +
-         Map bolt2stats = getMapByKeyword(accData, BOLT_TO_STATS);
++        Map bolt2stats = getMapByKey(accData, BOLT_TO_STATS);
 +        Map aggBolt2stats = new HashMap();
 +        for (Object o : bolt2stats.entrySet()) {
 +            Map.Entry e = (Map.Entry) o;
 +            String id = (String) e.getKey();
 +            Map m = (Map) e.getValue();
 +            long executed = getByKeywordOr0(m, EXECUTED).longValue();
 +            if (executed > 0) {
 +                double execLatencyTotal = getByKeywordOr0(m, 
EXEC_LAT_TOTAL).doubleValue();
-                 putRawKV(m, EXEC_LATENCY, execLatencyTotal / executed);
++                putKV(m, EXEC_LATENCY, execLatencyTotal / executed);
 +
 +                double procLatencyTotal = getByKeywordOr0(m, 
PROC_LAT_TOTAL).doubleValue();
-                 putRawKV(m, PROC_LATENCY, procLatencyTotal / executed);
++                putKV(m, PROC_LATENCY, procLatencyTotal / executed);
 +            }
-             removeByKeyword(m, EXEC_LAT_TOTAL);
-             removeByKeyword(m, PROC_LAT_TOTAL);
-             putRawKV(m, "last-error", getLastError(clusterState, topologyId, 
id));
++            remove(m, EXEC_LAT_TOTAL);
++            remove(m, PROC_LAT_TOTAL);
++            putKV(m, "last-error", getLastError(clusterState, topologyId, 
id));
 +
 +            aggBolt2stats.put(id, m);
 +        }
-         putRawKV(ret, BOLT_TO_STATS, aggBolt2stats);
++        putKV(ret, BOLT_TO_STATS, aggBolt2stats);
 +
-         Map spout2stats = getMapByKeyword(accData, SPOUT_TO_STATS);
++        Map spout2stats = getMapByKey(accData, SPOUT_TO_STATS);
 +        Map spoutBolt2stats = new HashMap();
 +        for (Object o : spout2stats.entrySet()) {
 +            Map.Entry e = (Map.Entry) o;
 +            String id = (String) e.getKey();
 +            Map m = (Map) e.getValue();
 +            long acked = getByKeywordOr0(m, ACKED).longValue();
 +            if (acked > 0) {
 +                double compLatencyTotal = getByKeywordOr0(m, 
COMP_LAT_TOTAL).doubleValue();
-                 putRawKV(m, COMP_LATENCY, compLatencyTotal / acked);
++                putKV(m, COMP_LATENCY, compLatencyTotal / acked);
 +            }
-             removeByKeyword(m, COMP_LAT_TOTAL);
-             putRawKV(m, "last-error", getLastError(clusterState, topologyId, 
id));
++            remove(m, COMP_LAT_TOTAL);
++            putKV(m, "last-error", getLastError(clusterState, topologyId, 
id));
 +
 +            spoutBolt2stats.put(id, m);
 +        }
-         putRawKV(ret, SPOUT_TO_STATS, spoutBolt2stats);
++        putKV(ret, SPOUT_TO_STATS, spoutBolt2stats);
 +
-         putRawKV(ret, WIN_TO_EMITTED, mapKeyStr(getMapByKeyword(accData, 
WIN_TO_EMITTED)));
-         putRawKV(ret, WIN_TO_TRANSFERRED, mapKeyStr(getMapByKeyword(accData, 
WIN_TO_TRANSFERRED)));
-         putRawKV(ret, WIN_TO_ACKED, mapKeyStr(getMapByKeyword(accData, 
WIN_TO_ACKED)));
-         putRawKV(ret, WIN_TO_FAILED, mapKeyStr(getMapByKeyword(accData, 
WIN_TO_FAILED)));
-         putRawKV(ret, WIN_TO_COMP_LAT, computeWeightedAveragesPerWindow(
++        putKV(ret, WIN_TO_EMITTED, mapKeyStr(getMapByKey(accData, 
WIN_TO_EMITTED)));
++        putKV(ret, WIN_TO_TRANSFERRED, mapKeyStr(getMapByKey(accData, 
WIN_TO_TRANSFERRED)));
++        putKV(ret, WIN_TO_ACKED, mapKeyStr(getMapByKey(accData, 
WIN_TO_ACKED)));
++        putKV(ret, WIN_TO_FAILED, mapKeyStr(getMapByKey(accData, 
WIN_TO_FAILED)));
++        putKV(ret, WIN_TO_COMP_LAT, computeWeightedAveragesPerWindow(
 +                accData, WIN_TO_COMP_LAT_WGT_AVG, WIN_TO_ACKED));
 +        return ret;
 +    }
 +
 +    /**
 +     * aggregate bolt stats
 +     *
 +     * @param statsSeq   a seq of ExecutorStats
 +     * @param includeSys whether to include system streams
 +     * @return aggregated bolt stats
 +     */
 +    public static Map aggregateBoltStats(List statsSeq, boolean includeSys) {
 +        Map ret = new HashMap();
 +
 +        Map commonStats = 
preProcessStreamSummary(aggregateCommonStats(statsSeq), includeSys);
 +        List acked = new ArrayList();
 +        List failed = new ArrayList();
 +        List executed = new ArrayList();
 +        List processLatencies = new ArrayList();
 +        List executeLatencies = new ArrayList();
 +        for (Object o : statsSeq) {
 +            ExecutorStats stat = (ExecutorStats) o;
 +            acked.add(stat.get_specific().get_bolt().get_acked());
 +            failed.add(stat.get_specific().get_bolt().get_failed());
 +            executed.add(stat.get_specific().get_bolt().get_executed());
 +            
processLatencies.add(stat.get_specific().get_bolt().get_process_ms_avg());
 +            
executeLatencies.add(stat.get_specific().get_bolt().get_execute_ms_avg());
 +        }
 +        mergeMaps(ret, commonStats);
-         putRawKV(ret, ACKED, aggregateCounts(acked));
-         putRawKV(ret, FAILED, aggregateCounts(failed));
-         putRawKV(ret, EXECUTED, aggregateCounts(executed));
-         putRawKV(ret, PROC_LATENCIES, aggregateAverages(processLatencies, 
acked));
-         putRawKV(ret, EXEC_LATENCIES, aggregateAverages(executeLatencies, 
executed));
++        putKV(ret, ACKED, aggregateCounts(acked));
++        putKV(ret, FAILED, aggregateCounts(failed));
++        putKV(ret, EXECUTED, aggregateCounts(executed));
++        putKV(ret, PROC_LATENCIES, aggregateAverages(processLatencies, 
acked));
++        putKV(ret, EXEC_LATENCIES, aggregateAverages(executeLatencies, 
executed));
 +
 +        return ret;
 +    }
 +
 +    /**
 +     * aggregate spout stats
 +     *
 +     * @param statsSeq   a seq of ExecutorStats
 +     * @param includeSys whether to include system streams
 +     * @return aggregated spout stats
 +     */
 +    public static Map aggregateSpoutStats(List statsSeq, boolean includeSys) {
 +        Map ret = new HashMap();
 +
 +        Map commonStats = 
preProcessStreamSummary(aggregateCommonStats(statsSeq), includeSys);
 +        List acked = new ArrayList();
 +        List failed = new ArrayList();
 +        List completeLatencies = new ArrayList();
 +        for (Object o : statsSeq) {
 +            ExecutorStats stat = (ExecutorStats) o;
 +            acked.add(stat.get_specific().get_spout().get_acked());
 +            failed.add(stat.get_specific().get_spout().get_failed());
 +            
completeLatencies.add(stat.get_specific().get_spout().get_complete_ms_avg());
 +        }
 +        mergeMaps(ret, commonStats);
-         putRawKV(ret, ACKED, aggregateCounts(acked));
-         putRawKV(ret, FAILED, aggregateCounts(failed));
-         putRawKV(ret, COMP_LATENCIES, aggregateAverages(completeLatencies, 
acked));
++        putKV(ret, ACKED, aggregateCounts(acked));
++        putKV(ret, FAILED, aggregateCounts(failed));
++        putKV(ret, COMP_LATENCIES, aggregateAverages(completeLatencies, 
acked));
 +
 +        return ret;
 +    }
 +
 +    public static Map aggregateCommonStats(List statsSeq) {
 +        Map ret = new HashMap();
 +
 +        List emitted = new ArrayList();
 +        List transferred = new ArrayList();
 +        for (Object o : statsSeq) {
 +            ExecutorStats stat = (ExecutorStats) o;
 +            emitted.add(stat.get_emitted());
 +            transferred.add(stat.get_transferred());
 +        }
 +
-         putRawKV(ret, EMITTED, aggregateCounts(emitted));
-         putRawKV(ret, TRANSFERRED, aggregateCounts(transferred));
++        putKV(ret, EMITTED, aggregateCounts(emitted));
++        putKV(ret, TRANSFERRED, aggregateCounts(transferred));
 +        return ret;
 +    }
 +
 +    public static Map preProcessStreamSummary(Map streamSummary, boolean 
includeSys) {
-         Map emitted = getMapByKeyword(streamSummary, EMITTED);
-         Map transferred = getMapByKeyword(streamSummary, TRANSFERRED);
++        Map emitted = getMapByKey(streamSummary, EMITTED);
++        Map transferred = getMapByKey(streamSummary, TRANSFERRED);
 +
-         putRawKV(streamSummary, EMITTED, filterSysStreams(emitted, 
includeSys));
-         putRawKV(streamSummary, TRANSFERRED, filterSysStreams(transferred, 
includeSys));
++        putKV(streamSummary, EMITTED, filterSysStreams(emitted, includeSys));
++        putKV(streamSummary, TRANSFERRED, filterSysStreams(transferred, 
includeSys));
 +
 +        return streamSummary;
 +    }
 +
 +    public static Map aggregateCountStreams(Map stats) {
 +        Map ret = new HashMap();
 +        for (Object o : stats.entrySet()) {
 +            Map.Entry entry = (Map.Entry) o;
 +            Map value = (Map) entry.getValue();
 +            long sum = 0l;
 +            for (Object num : value.values()) {
 +                sum += ((Number) num).longValue();
 +            }
 +            ret.put(entry.getKey(), sum);
 +        }
 +        return ret;
 +    }
 +
 +    public static Map aggregateAverages(List avgSeq, List countSeq) {
 +        Map ret = new HashMap();
 +
 +        Map expands = expandAveragesSeq(avgSeq, countSeq);
 +        for (Object o : expands.entrySet()) {
 +            Map.Entry entry = (Map.Entry) o;
 +            Object k = entry.getKey();
 +
 +            Map tmp = new HashMap();
 +            Map inner = (Map) entry.getValue();
 +            for (Object kk : inner.keySet()) {
 +                List vv = (List) inner.get(kk);
 +                tmp.put(kk, valAvg(((Number) vv.get(0)).doubleValue(), 
((Number) vv.get(1)).longValue()));
 +            }
 +            ret.put(k, tmp);
 +        }
 +
 +        return ret;
 +    }
 +
 +    public static Map aggregateAvgStreams(Map avgs, Map counts) {
 +        Map ret = new HashMap();
 +
 +        Map expands = expandAverages(avgs, counts);
 +        for (Object o : expands.entrySet()) {
 +            Map.Entry e = (Map.Entry) o;
 +            Object win = e.getKey();
 +
 +            double avgTotal = 0.0;
 +            long cntTotal = 0l;
 +            Map inner = (Map) e.getValue();
 +            for (Object kk : inner.keySet()) {
 +                List vv = (List) inner.get(kk);
 +                avgTotal += ((Number) vv.get(0)).doubleValue();
 +                cntTotal += ((Number) vv.get(1)).longValue();
 +            }
 +            ret.put(win, valAvg(avgTotal, cntTotal));
 +        }
 +
 +        return ret;
 +    }
 +
 +    public static Map spoutStreamsStats(List summs, boolean includeSys) {
 +        List statsSeq = getFilledStats(summs);
 +        return aggregateSpoutStreams(aggregateSpoutStats(statsSeq, 
includeSys));
 +    }
 +
 +    public static Map boltStreamsStats(List summs, boolean includeSys) {
 +        List statsSeq = getFilledStats(summs);
 +        return aggregateBoltStreams(aggregateBoltStats(statsSeq, includeSys));
 +    }
 +
 +    public static Map aggregateSpoutStreams(Map stats) {
 +        Map ret = new HashMap();
-         putRawKV(ret, ACKED, aggregateCountStreams(getMapByKeyword(stats, 
ACKED)));
-         putRawKV(ret, FAILED, aggregateCountStreams(getMapByKeyword(stats, 
FAILED)));
-         putRawKV(ret, EMITTED, aggregateCountStreams(getMapByKeyword(stats, 
EMITTED)));
-         putRawKV(ret, TRANSFERRED, 
aggregateCountStreams(getMapByKeyword(stats, TRANSFERRED)));
-         putRawKV(ret, COMP_LATENCIES, aggregateAvgStreams(
-                 getMapByKeyword(stats, COMP_LATENCIES), 
getMapByKeyword(stats, ACKED)));
++        putKV(ret, ACKED, aggregateCountStreams(getMapByKey(stats, ACKED)));
++        putKV(ret, FAILED, aggregateCountStreams(getMapByKey(stats, FAILED)));
++        putKV(ret, EMITTED, aggregateCountStreams(getMapByKey(stats, 
EMITTED)));
++        putKV(ret, TRANSFERRED, aggregateCountStreams(getMapByKey(stats, 
TRANSFERRED)));
++        putKV(ret, COMP_LATENCIES, aggregateAvgStreams(
++                getMapByKey(stats, COMP_LATENCIES), getMapByKey(stats, 
ACKED)));
 +        return ret;
 +    }
 +
 +    public static Map aggregateBoltStreams(Map stats) {
 +        Map ret = new HashMap();
-         putRawKV(ret, ACKED, aggregateCountStreams(getMapByKeyword(stats, 
ACKED)));
-         putRawKV(ret, FAILED, aggregateCountStreams(getMapByKeyword(stats, 
FAILED)));
-         putRawKV(ret, EMITTED, aggregateCountStreams(getMapByKeyword(stats, 
EMITTED)));
-         putRawKV(ret, TRANSFERRED, 
aggregateCountStreams(getMapByKeyword(stats, TRANSFERRED)));
-         putRawKV(ret, EXECUTED, aggregateCountStreams(getMapByKeyword(stats, 
EXECUTED)));
-         putRawKV(ret, PROC_LATENCIES, aggregateAvgStreams(
-                 getMapByKeyword(stats, PROC_LATENCIES), 
getMapByKeyword(stats, ACKED)));
-         putRawKV(ret, EXEC_LATENCIES, aggregateAvgStreams(
-                 getMapByKeyword(stats, EXEC_LATENCIES), 
getMapByKeyword(stats, EXECUTED)));
++        putKV(ret, ACKED, aggregateCountStreams(getMapByKey(stats, ACKED)));
++        putKV(ret, FAILED, aggregateCountStreams(getMapByKey(stats, FAILED)));
++        putKV(ret, EMITTED, aggregateCountStreams(getMapByKey(stats, 
EMITTED)));
++        putKV(ret, TRANSFERRED, aggregateCountStreams(getMapByKey(stats, 
TRANSFERRED)));
++        putKV(ret, EXECUTED, aggregateCountStreams(getMapByKey(stats, 
EXECUTED)));
++        putKV(ret, PROC_LATENCIES, aggregateAvgStreams(
++                getMapByKey(stats, PROC_LATENCIES), getMapByKey(stats, 
ACKED)));
++        putKV(ret, EXEC_LATENCIES, aggregateAvgStreams(
++                getMapByKey(stats, EXEC_LATENCIES), getMapByKey(stats, 
EXECUTED)));
 +        return ret;
 +    }
 +
 +    /**
 +     * A helper function that aggregates windowed stats from one spout 
executor.
 +     */
 +    public static Map aggBoltExecWinStats(Map accStats, Map newStats, boolean 
includeSys) {
 +        Map ret = new HashMap();
 +
 +        Map m = new HashMap();
-         for (Object win : getMapByKeyword(newStats, EXECUTED).keySet()) {
++        for (Object win : getMapByKey(newStats, EXECUTED).keySet()) {
 +            m.put(win, aggBoltLatAndCount(
-                     (Map) (getMapByKeyword(newStats, 
EXEC_LATENCIES)).get(win),
-                     (Map) (getMapByKeyword(newStats, 
PROC_LATENCIES)).get(win),
-                     (Map) (getMapByKeyword(newStats, EXECUTED)).get(win)));
++                    (Map) (getMapByKey(newStats, EXEC_LATENCIES)).get(win),
++                    (Map) (getMapByKey(newStats, PROC_LATENCIES)).get(win),
++                    (Map) (getMapByKey(newStats, EXECUTED)).get(win)));
 +        }
 +        m = swapMapOrder(m);
 +
-         Map win2execLatWgtAvg = getMapByKeyword(m, EXEC_LAT_TOTAL);
-         Map win2procLatWgtAvg = getMapByKeyword(m, PROC_LAT_TOTAL);
-         Map win2executed = getMapByKeyword(m, EXECUTED);
++        Map win2execLatWgtAvg = getMapByKey(m, EXEC_LAT_TOTAL);
++        Map win2procLatWgtAvg = getMapByKey(m, PROC_LAT_TOTAL);
++        Map win2executed = getMapByKey(m, EXECUTED);
 +
-         Map emitted = getMapByKeyword(newStats, EMITTED);
++        Map emitted = getMapByKey(newStats, EMITTED);
 +        emitted = 
mergeWithSum(aggregateCountStreams(filterSysStreams(emitted, includeSys)),
-                 getMapByKeyword(accStats, WIN_TO_EMITTED));
-         putRawKV(ret, WIN_TO_EMITTED, emitted);
++                getMapByKey(accStats, WIN_TO_EMITTED));
++        putKV(ret, WIN_TO_EMITTED, emitted);
 +
-         Map transferred = getMapByKeyword(newStats, TRANSFERRED);
++        Map transferred = getMapByKey(newStats, TRANSFERRED);
 +        transferred = 
mergeWithSum(aggregateCountStreams(filterSysStreams(transferred, includeSys)),
-                 getMapByKeyword(accStats, WIN_TO_TRANSFERRED));
-         putRawKV(ret, WIN_TO_TRANSFERRED, transferred);
- 
-         putRawKV(ret, WIN_TO_EXEC_LAT_WGT_AVG, mergeWithSum(
-                 getMapByKeyword(accStats, WIN_TO_EXEC_LAT_WGT_AVG), 
win2execLatWgtAvg));
-         putRawKV(ret, WIN_TO_PROC_LAT_WGT_AVG, mergeWithSum(
-                 getMapByKeyword(accStats, WIN_TO_PROC_LAT_WGT_AVG), 
win2procLatWgtAvg));
-         putRawKV(ret, WIN_TO_EXECUTED, mergeWithSum(
-                 getMapByKeyword(accStats, WIN_TO_EXECUTED), win2executed));
-         putRawKV(ret, WIN_TO_ACKED, mergeWithSum(
-                 aggregateCountStreams(getMapByKeyword(newStats, ACKED)), 
getMapByKeyword(accStats, WIN_TO_ACKED)));
-         putRawKV(ret, WIN_TO_FAILED, mergeWithSum(
-                 aggregateCountStreams(getMapByKeyword(newStats, FAILED)), 
getMapByKeyword(accStats, WIN_TO_FAILED)));
++                getMapByKey(accStats, WIN_TO_TRANSFERRED));
++        putKV(ret, WIN_TO_TRANSFERRED, transferred);
++
++        putKV(ret, WIN_TO_EXEC_LAT_WGT_AVG, mergeWithSum(
++                getMapByKey(accStats, WIN_TO_EXEC_LAT_WGT_AVG), 
win2execLatWgtAvg));
++        putKV(ret, WIN_TO_PROC_LAT_WGT_AVG, mergeWithSum(
++                getMapByKey(accStats, WIN_TO_PROC_LAT_WGT_AVG), 
win2procLatWgtAvg));
++        putKV(ret, WIN_TO_EXECUTED, mergeWithSum(
++                getMapByKey(accStats, WIN_TO_EXECUTED), win2executed));
++        putKV(ret, WIN_TO_ACKED, mergeWithSum(
++                aggregateCountStreams(getMapByKey(newStats, ACKED)), 
getMapByKey(accStats, WIN_TO_ACKED)));
++        putKV(ret, WIN_TO_FAILED, mergeWithSum(
++                aggregateCountStreams(getMapByKey(newStats, FAILED)), 
getMapByKey(accStats, WIN_TO_FAILED)));
 +
 +        return ret;
 +    }
 +
 +    /**
 +     * A helper function that aggregates windowed stats from one spout 
executor.
 +     */
 +    public static Map aggSpoutExecWinStats(Map accStats, Map newStats, 
boolean includeSys) {
 +        Map ret = new HashMap();
 +
 +        Map m = new HashMap();
-         for (Object win : getMapByKeyword(newStats, ACKED).keySet()) {
++        for (Object win : getMapByKey(newStats, ACKED).keySet()) {
 +            m.put(win, aggSpoutLatAndCount(
-                     (Map) (getMapByKeyword(newStats, 
COMP_LATENCIES)).get(win),
-                     (Map) (getMapByKeyword(newStats, ACKED)).get(win)));
++                    (Map) (getMapByKey(newStats, COMP_LATENCIES)).get(win),
++                    (Map) (getMapByKey(newStats, ACKED)).get(win)));
 +        }
 +        m = swapMapOrder(m);
 +
-         Map win2compLatWgtAvg = getMapByKeyword(m, COMP_LAT_TOTAL);
-         Map win2acked = getMapByKeyword(m, ACKED);
++        Map win2compLatWgtAvg = getMapByKey(m, COMP_LAT_TOTAL);
++        Map win2acked = getMapByKey(m, ACKED);
 +
-         Map emitted = getMapByKeyword(newStats, EMITTED);
++        Map emitted = getMapByKey(newStats, EMITTED);
 +        emitted = 
mergeWithSum(aggregateCountStreams(filterSysStreams(emitted, includeSys)),
-                 getMapByKeyword(accStats, WIN_TO_EMITTED));
-         putRawKV(ret, WIN_TO_EMITTED, emitted);
++                getMapByKey(accStats, WIN_TO_EMITTED));
++        putKV(ret, WIN_TO_EMITTED, emitted);
 +
-         Map transferred = getMapByKeyword(newStats, TRANSFERRED);
++        Map transferred = getMapByKey(newStats, TRANSFERRED);
 +        transferred = 
mergeWithSum(aggregateCountStreams(filterSysStreams(transferred, includeSys)),
-                 getMapByKeyword(accStats, WIN_TO_TRANSFERRED));
-         putRawKV(ret, WIN_TO_TRANSFERRED, transferred);
++                getMapByKey(accStats, WIN_TO_TRANSFERRED));
++        putKV(ret, WIN_TO_TRANSFERRED, transferred);
 +
-         putRawKV(ret, WIN_TO_COMP_LAT_WGT_AVG, mergeWithSum(
-                 getMapByKeyword(accStats, WIN_TO_COMP_LAT_WGT_AVG), 
win2compLatWgtAvg));
-         putRawKV(ret, WIN_TO_ACKED, mergeWithSum(
-                 getMapByKeyword(accStats, WIN_TO_ACKED), win2acked));
-         putRawKV(ret, WIN_TO_FAILED, mergeWithSum(
-                 aggregateCountStreams(getMapByKeyword(newStats, FAILED)), 
getMapByKeyword(accStats, WIN_TO_FAILED)));
++        putKV(ret, WIN_TO_COMP_LAT_WGT_AVG, mergeWithSum(
++                getMapByKey(accStats, WIN_TO_COMP_LAT_WGT_AVG), 
win2compLatWgtAvg));
++        putKV(ret, WIN_TO_ACKED, mergeWithSum(
++                getMapByKey(accStats, WIN_TO_ACKED), win2acked));
++        putKV(ret, WIN_TO_FAILED, mergeWithSum(
++                aggregateCountStreams(getMapByKey(newStats, FAILED)), 
getMapByKey(accStats, WIN_TO_FAILED)));
 +
 +        return ret;
 +    }
 +
 +
 +    /**
 +     * aggregate counts
 +     *
 +     * @param countsSeq a seq of {win -> GlobalStreamId -> value}
 +     */
 +    public static Map aggregateCounts(List countsSeq) {
 +        Map ret = new HashMap();
 +        for (Object counts : countsSeq) {
 +            for (Object o : ((Map) counts).entrySet()) {
 +                Map.Entry e = (Map.Entry) o;
 +                Object win = e.getKey();
 +                Map stream2count = (Map) e.getValue();
 +
 +                if (!ret.containsKey(win)) {
 +                    ret.put(win, stream2count);
 +                } else {
 +                    Map existing = (Map) ret.get(win);
 +                    for (Object oo : stream2count.entrySet()) {
 +                        Map.Entry ee = (Map.Entry) oo;
 +                        Object stream = ee.getKey();
 +                        if (!existing.containsKey(stream)) {
 +                            existing.put(stream, ee.getValue());
 +                        } else {
 +                            existing.put(stream, (Long) ee.getValue() + 
(Long) existing.get(stream));
 +                        }
 +                    }
 +                }
 +            }
 +        }
 +        return ret;
 +    }
 +
 +    public static Map aggregateCompStats(String window, boolean includeSys, 
List data, String compType) {
 +        boolean isSpout = SPOUT.equals(compType);
 +
 +        Map initVal = new HashMap();
-         putRawKV(initVal, WIN_TO_ACKED, new HashMap());
-         putRawKV(initVal, WIN_TO_FAILED, new HashMap());
-         putRawKV(initVal, WIN_TO_EMITTED, new HashMap());
-         putRawKV(initVal, WIN_TO_TRANSFERRED, new HashMap());
++        putKV(initVal, WIN_TO_ACKED, new HashMap());
++        putKV(initVal, WIN_TO_FAILED, new HashMap());
++        putKV(initVal, WIN_TO_EMITTED, new HashMap());
++        putKV(initVal, WIN_TO_TRANSFERRED, new HashMap());
 +
 +        Map stats = new HashMap();
-         putRawKV(stats, EXECUTOR_STATS, new ArrayList());
-         putRawKV(stats, SID_TO_OUT_STATS, new HashMap());
++        putKV(stats, EXECUTOR_STATS, new ArrayList());
++        putKV(stats, SID_TO_OUT_STATS, new HashMap());
 +        if (isSpout) {
-             putRawKV(initVal, TYPE, KW_SPOUT);
-             putRawKV(initVal, WIN_TO_COMP_LAT_WGT_AVG, new HashMap());
++            putKV(initVal, TYPE, KW_SPOUT);
++            putKV(initVal, WIN_TO_COMP_LAT_WGT_AVG, new HashMap());
 +        } else {
-             putRawKV(initVal, TYPE, KW_BOLT);
-             putRawKV(initVal, WIN_TO_EXECUTED, new HashMap());
-             putRawKV(stats, CID_SID_TO_IN_STATS, new HashMap());
-             putRawKV(initVal, WIN_TO_EXEC_LAT_WGT_AVG, new HashMap());
-             putRawKV(initVal, WIN_TO_PROC_LAT_WGT_AVG, new HashMap());
++            putKV(initVal, TYPE, KW_BOLT);
++            putKV(initVal, WIN_TO_EXECUTED, new HashMap());
++            putKV(stats, CID_SID_TO_IN_STATS, new HashMap());
++            putKV(initVal, WIN_TO_EXEC_LAT_WGT_AVG, new HashMap());
++            putKV(initVal, WIN_TO_PROC_LAT_WGT_AVG, new HashMap());
 +        }
-         putRawKV(initVal, STATS, stats);
++        putKV(initVal, STATS, stats);
 +
 +        for (Object o : data) {
 +            initVal = aggCompExecStats(window, includeSys, initVal, (Map) o, 
compType);
 +        }
 +
 +        return initVal;
 +    }
 +
 +    /**
 +     * Combines the aggregate stats of one executor with the given map, 
selecting
 +     * the appropriate window and including system components as specified.
 +     */
 +    public static Map aggCompExecStats(String window, boolean includeSys, Map 
accStats, Map newData, String compType) {
 +        Map ret = new HashMap();
 +        if (SPOUT.equals(compType)) {
-             ret.putAll(aggSpoutExecWinStats(accStats, 
getMapByKeyword(newData, STATS), includeSys));
-             putRawKV(ret, STATS, mergeAggCompStatsCompPageSpout(
-                     getMapByKeyword(accStats, STATS),
++            ret.putAll(aggSpoutExecWinStats(accStats, getMapByKey(newData, 
STATS), includeSys));
++            putKV(ret, STATS, mergeAggCompStatsCompPageSpout(
++                    getMapByKey(accStats, STATS),
 +                    aggPreMergeCompPageSpout(newData, window, includeSys)));
 +        } else {
-             ret.putAll(aggBoltExecWinStats(accStats, getMapByKeyword(newData, 
STATS), includeSys));
-             putRawKV(ret, STATS, mergeAggCompStatsCompPageBolt(
-                     getMapByKeyword(accStats, STATS),
++            ret.putAll(aggBoltExecWinStats(accStats, getMapByKey(newData, 
STATS), includeSys));
++            putKV(ret, STATS, mergeAggCompStatsCompPageBolt(
++                    getMapByKey(accStats, STATS),
 +                    aggPreMergeCompPageBolt(newData, window, includeSys)));
 +        }
-         putRawKV(ret, TYPE, keyword(compType));
++        putKV(ret, TYPE, keyword(compType));
 +
 +        return ret;
 +    }
 +
 +    public static Map postAggregateCompStats(Map task2component, Map 
exec2hostPort, Map accData) {
 +        Map ret = new HashMap();
 +
-         String compType = ((Keyword) getByKeyword(accData, TYPE)).getName();
-         Map stats = getMapByKeyword(accData, STATS);
++        String compType = ((Keyword) getByKey(accData, TYPE)).getName();
++        Map stats = getMapByKey(accData, STATS);
 +        Integer numTasks = getByKeywordOr0(stats, NUM_TASKS).intValue();
 +        Integer numExecutors = getByKeywordOr0(stats, 
NUM_EXECUTORS).intValue();
-         Map outStats = getMapByKeyword(stats, SID_TO_OUT_STATS);
++        Map outStats = getMapByKey(stats, SID_TO_OUT_STATS);
 +
-         putRawKV(ret, TYPE, keyword(compType));
-         putRawKV(ret, NUM_TASKS, numTasks);
-         putRawKV(ret, NUM_EXECUTORS, numExecutors);
-         putRawKV(ret, EXECUTOR_STATS, getByKeyword(stats, EXECUTOR_STATS));
-         putRawKV(ret, WIN_TO_EMITTED, mapKeyStr(getMapByKeyword(accData, 
WIN_TO_EMITTED)));
-         putRawKV(ret, WIN_TO_TRANSFERRED, mapKeyStr(getMapByKeyword(accData, 
WIN_TO_TRANSFERRED)));
-         putRawKV(ret, WIN_TO_ACKED, mapKeyStr(getMapByKeyword(accData, 
WIN_TO_ACKED)));
-         putRawKV(ret, WIN_TO_FAILED, mapKeyStr(getMapByKeyword(accData, 
WIN_TO_FAILED)));
++        putKV(ret, TYPE, keyword(compType));
++        putKV(ret, NUM_TASKS, numTasks);
++        putKV(ret, NUM_EXECUTORS, numExecutors);
++        putKV(ret, EXECUTOR_STATS, getByKey(stats, EXECUTOR_STATS));
++        putKV(ret, WIN_TO_EMITTED, mapKeyStr(getMapByKey(accData, 
WIN_TO_EMITTED)));
++        putKV(ret, WIN_TO_TRANSFERRED, mapKeyStr(getMapByKey(accData, 
WIN_TO_TRANSFERRED)));
++        putKV(ret, WIN_TO_ACKED, mapKeyStr(getMapByKey(accData, 
WIN_TO_ACKED)));
++        putKV(ret, WIN_TO_FAILED, mapKeyStr(getMapByKey(accData, 
WIN_TO_FAILED)));
 +
 +        if (BOLT.equals(compType)) {
-             Map inStats = getMapByKeyword(stats, CID_SID_TO_IN_STATS);
++            Map inStats = getMapByKey(stats, CID_SID_TO_IN_STATS);
 +
 +            Map inStats2 = new HashMap();
 +            for (Object o : inStats.entrySet()) {
 +                Map.Entry e = (Map.Entry) o;
 +                Object k = e.getKey();
 +                Map v = (Map) e.getValue();
 +                long executed = getByKeywordOr0(v, EXECUTED).longValue();
 +                if (executed > 0) {
 +                    double executeLatencyTotal = getByKeywordOr0(v, 
EXEC_LAT_TOTAL).doubleValue();
 +                    double processLatencyTotal = getByKeywordOr0(v, 
PROC_LAT_TOTAL).doubleValue();
-                     putRawKV(v, EXEC_LATENCY, executeLatencyTotal / executed);
-                     putRawKV(v, PROC_LATENCY, processLatencyTotal / executed);
++                    putKV(v, EXEC_LATENCY, executeLatencyTotal / executed);
++                    putKV(v, PROC_LATENCY, processLatencyTotal / executed);
 +                } else {
-                     putRawKV(v, EXEC_LATENCY, 0.0);
-                     putRawKV(v, PROC_LATENCY, 0.0);
++                    putKV(v, EXEC_LATENCY, 0.0);
++                    putKV(v, PROC_LATENCY, 0.0);
 +                }
-                 removeByKeyword(v, EXEC_LAT_TOTAL);
-                 removeByKeyword(v, PROC_LAT_TOTAL);
++                remove(v, EXEC_LAT_TOTAL);
++                remove(v, PROC_LAT_TOTAL);
 +                inStats2.put(k, v);
 +            }
-             putRawKV(ret, CID_SID_TO_IN_STATS, inStats2);
++            putKV(ret, CID_SID_TO_IN_STATS, inStats2);
 +
-             putRawKV(ret, SID_TO_OUT_STATS, outStats);
-             putRawKV(ret, WIN_TO_EXECUTED, mapKeyStr(getMapByKeyword(accData, 
WIN_TO_EXECUTED)));
-             putRawKV(ret, WIN_TO_EXEC_LAT, computeWeightedAveragesPerWindow(
++            putKV(ret, SID_TO_OUT_STATS, outStats);
++            putKV(ret, WIN_TO_EXECUTED, mapKeyStr(getMapByKey(accData, 
WIN_TO_EXECUTED)));
++            putKV(ret, WIN_TO_EXEC_LAT, computeWeightedAveragesPerWindow(
 +                    accData, WIN_TO_EXEC_LAT_WGT_AVG, WIN_TO_EXECUTED));
-             putRawKV(ret, WIN_TO_PROC_LAT, computeWeightedAveragesPerWindow(
++            putKV(ret, WIN_TO_PROC_LAT, computeWeightedAveragesPerWindow(
 +                    accData, WIN_TO_PROC_LAT_WGT_AVG, WIN_TO_EXECUTED));
 +        } else {
 +            Map outStats2 = new HashMap();
 +            for (Object o : outStats.entrySet()) {
 +                Map.Entry e = (Map.Entry) o;
 +                Object k = e.getKey();
 +                Map v = (Map) e.getValue();
 +                long acked = getByKeywordOr0(v, ACKED).longValue();
 +                if (acked > 0) {
 +                    double compLatencyTotal = getByKeywordOr0(v, 
COMP_LAT_TOTAL).doubleValue();
-                     putRawKV(v, COMP_LATENCY, compLatencyTotal / acked);
++                    putKV(v, COMP_LATENCY, compLatencyTotal / acked);
 +                } else {
-                     putRawKV(v, COMP_LATENCY, 0.0);
++                    putKV(v, COMP_LATENCY, 0.0);
 +                }
-                 removeByKeyword(v, COMP_LAT_TOTAL);
++                remove(v, COMP_LAT_TOTAL);
 +                outStats2.put(k, v);
 +            }
-             putRawKV(ret, SID_TO_OUT_STATS, outStats2);
-             putRawKV(ret, WIN_TO_COMP_LAT, computeWeightedAveragesPerWindow(
++            putKV(ret, SID_TO_OUT_STATS, outStats2);
++            putKV(ret, WIN_TO_COMP_LAT, computeWeightedAveragesPerWindow(
 +                    accData, WIN_TO_COMP_LAT_WGT_AVG, WIN_TO_ACKED));
 +        }
 +
 +        return ret;
 +    }
 +
 +    public static ComponentPageInfo aggCompExecsStats(
 +            Map exec2hostPort, Map task2component, Map beats, String window, 
boolean includeSys,
 +            String topologyId, StormTopology topology, String componentId) {
 +
 +        List beatList = extractDataFromHb(exec2hostPort, task2component, 
beats, includeSys, topology, componentId);
 +        Map compStats = aggregateCompStats(window, includeSys, beatList, 
componentType(topology, componentId).getName());
 +        compStats = postAggregateCompStats(task2component, exec2hostPort, 
compStats);
 +        return thriftifyCompPageData(topologyId, topology, componentId, 
compStats);
 +    }
 +
 +
 +    // 
=====================================================================================
 +    // clojurify stats methods
 +    // 
=====================================================================================
 +
 +    public static Map clojurifyStats(Map stats) {
 +        Map ret = new HashMap();
 +        for (Object o : stats.entrySet()) {
 +            Map.Entry entry = (Map.Entry) o;
 +            ExecutorInfo executorInfo = (ExecutorInfo) entry.getKey();
 +            ExecutorStats executorStats = (ExecutorStats) entry.getValue();
 +
 +            ret.put(Lists.newArrayList(executorInfo.get_task_start(), 
executorInfo.get_task_end()),
 +                    clojurifyExecutorStats(executorStats));
 +        }
 +        return ret;
 +    }
 +
 +    public static Map clojurifyExecutorStats(ExecutorStats stats) {
 +        Map ret = new HashMap();
 +
-         putRawKV(ret, EMITTED, stats.get_emitted());
-         putRawKV(ret, TRANSFERRED, stats.get_transferred());
-         putRawKV(ret, "rate", stats.get_rate());
++        putKV(ret, EMITTED, stats.get_emitted());
++        putKV(ret, TRANSFERRED, stats.get_transferred());
++        putKV(ret, "rate", stats.get_rate());
 +
 +        if (stats.get_specific().is_set_bolt()) {
 +            mergeMaps(ret, 
clojurifySpecificStats(stats.get_specific().get_bolt()));
-             putRawKV(ret, TYPE, KW_BOLT);
++            putKV(ret, TYPE, KW_BOLT);
 +        } else {
 +            mergeMaps(ret, 
clojurifySpecificStats(stats.get_specific().get_spout()));
-             putRawKV(ret, TYPE, KW_SPOUT);
++            putKV(ret, TYPE, KW_SPOUT);
 +        }
 +
 +        return ret;
 +    }
 +
 +    public static Map clojurifySpecificStats(SpoutStats stats) {
 +        Map ret = new HashMap();
-         putRawKV(ret, ACKED, stats.get_acked());
-         putRawKV(ret, FAILED, stats.get_failed());
-         putRawKV(ret, COMP_LATENCIES, stats.get_complete_ms_avg());
++        putKV(ret, ACKED, stats.get_acked());
++        putKV(ret, FAILED, stats.get_failed());
++        putKV(ret, COMP_LATENCIES, stats.get_complete_ms_avg());
 +
 +        return ret;
 +    }
 +
 +    public static Map clojurifySpecificStats(BoltStats stats) {
 +        Map ret = new HashMap();
 +
 +        Map acked = windowSetConverter(stats.get_acked(), FROM_GSID, 
IDENTITY);
 +        Map failed = windowSetConverter(stats.get_failed(), FROM_GSID, 
IDENTITY);
 +        Map processAvg = windowSetConverter(stats.get_process_ms_avg(), 
FROM_GSID, IDENTITY);
 +        Map executed = windowSetConverter(stats.get_executed(), FROM_GSID, 
IDENTITY);
 +        Map executeAvg = windowSetConverter(stats.get_execute_ms_avg(), 
FROM_GSID, IDENTITY);
 +
-         putRawKV(ret, ACKED, acked);
-         putRawKV(ret, FAILED, failed);
-         putRawKV(ret, PROC_LATENCIES, processAvg);
-         putRawKV(ret, EXECUTED, executed);
-         putRawKV(ret, EXEC_LATENCIES, executeAvg);
++        putKV(ret, ACKED, acked);
++        putKV(ret, FAILED, failed);
++        putKV(ret, PROC_LATENCIES, processAvg);
++        putKV(ret, EXECUTED, executed);
++        putKV(ret, EXEC_LATENCIES, executeAvg);
 +
 +        return ret;
 +    }
 +
 +    public static List extractNodeInfosFromHbForComp(
 +            Map exec2hostPort, Map task2component, boolean includeSys, String 
compId) {
 +        List ret = new ArrayList();
 +
 +        Set<List> hostPorts = new HashSet<>();
 +        for (Object o : exec2hostPort.entrySet()) {
 +            Map.Entry entry = (Map.Entry) o;
 +            List key = (List) entry.getKey();
 +            List value = (List) entry.getValue();
 +
 +            Integer start = ((Number) key.get(0)).intValue();
 +            String host = (String) value.get(0);
 +            Integer port = (Integer) value.get(1);
 +            String comp = (String) task2component.get(start);
 +            if ((compId == null || compId.equals(comp)) && (includeSys || 
!Utils.isSystemId(comp))) {
 +                hostPorts.add(Lists.newArrayList(host, port));
 +            }
 +        }
 +
 +        for (List hostPort : hostPorts) {
 +            Map m = new HashMap();
-             putRawKV(m, HOST, hostPort.get(0));
-             putRawKV(m, PORT, hostPort.get(1));
++            putKV(m, HOST, hostPort.get(0));
++            putKV(m, PORT, hostPort.get(1));
 +            ret.add(m);
 +        }
 +
 +        return ret;
 +    }
 +
-     public static List extractDataFromHb(Map executor2hostPort, Map 
task2component, Map beats,
-                                          boolean includeSys, StormTopology 
topology) {
++    /**
++     * extracts a list of executor data from heart beats
++     */
++    public static List<Map<String, Object>> extractDataFromHb(Map 
executor2hostPort, Map task2component, Map beats,
++                                                              boolean 
includeSys, StormTopology topology) {
 +        return extractDataFromHb(executor2hostPort, task2component, beats, 
includeSys, topology, null);
 +    }
 +
-     public static List extractDataFromHb(Map executor2hostPort, Map 
task2component, Map beats,
-                                          boolean includeSys, StormTopology 
topology, String compId) {
-         List ret = new ArrayList();
++    public static List<Map<String, Object>> extractDataFromHb(Map 
executor2hostPort, Map task2component, Map beats,
++                                                              boolean 
includeSys, StormTopology topology, String compId) {
++        List<Map<String, Object>> ret = new ArrayList<>();
 +        if (executor2hostPort == null) {
 +            return ret;
 +        }
 +        for (Object o : executor2hostPort.entrySet()) {
 +            Map.Entry entry = (Map.Entry) o;
 +            List key = (List) entry.getKey();
 +            List value = (List) entry.getValue();
 +
 +            Integer start = ((Number) key.get(0)).intValue();
 +            Integer end = ((Number) key.get(1)).intValue();
 +
 +            String host = (String) value.get(0);
 +            Integer port = ((Number) value.get(1)).intValue();
 +
 +            Map beat = (Map) beats.get(key);
 +            if (beat == null) {
 +                continue;
 +            }
 +            String id = (String) task2component.get(start);
 +
-             Map m = new HashMap();
++            Map<String, Object> m = new HashMap<>();
 +            if ((compId == null || compId.equals(id)) && (includeSys || 
!Utils.isSystemId(id))) {
-                 putRawKV(m, "exec-id", entry.getKey());
-                 putRawKV(m, "comp-id", id);
-                 putRawKV(m, NUM_TASKS, end - start + 1);
-                 putRawKV(m, HOST, host);
-                 putRawKV(m, PORT, port);
-                 putRawKV(m, UPTIME, beat.get(keyword(UPTIME)));
-                 putRawKV(m, STATS, beat.get(keyword(STATS)));
++                putKV(m, "exec-id", entry.getKey());
++                putKV(m, "comp-id", id);
++                putKV(m, NUM_TASKS, end - start + 1);
++                putKV(m, HOST, host);
++                putKV(m, PORT, port);
++                putKV(m, UPTIME, beat.get(keyword(UPTIME)));
++                putKV(m, STATS, beat.get(keyword(STATS)));
 +
 +                Keyword type = componentType(topology, compId);
 +                if (type != null) {
-                     putRawKV(m, TYPE, type);
++                    putKV(m, TYPE, type);
 +                } else {
-                     putRawKV(m, TYPE, getByKeyword(getMapByKeyword(beat, 
STATS), TYPE));
++                    putKV(m, TYPE, getByKey(getMapByKey(beat, STATS), TYPE));
 +                }
 +                ret.add(m);
 +            }
 +        }
 +        return ret;
 +    }
 +
 +    private static Map computeWeightedAveragesPerWindow(Map accData, String 
wgtAvgKey, String divisorKey) {
 +        Map ret = new HashMap();
-         for (Object o : getMapByKeyword(accData, wgtAvgKey).entrySet()) {
++        for (Object o : getMapByKey(accData, wgtAvgKey).entrySet()) {
 +            Map.Entry e = (Map.Entry) o;
 +            Object window = e.getKey();
 +            double wgtAvg = ((Number) e.getValue()).doubleValue();
-             long divisor = ((Number) getMapByKeyword(accData, 
divisorKey).get(window)).longValue();
++            long divisor = ((Number) getMapByKey(accData, 
divisorKey).get(window)).longValue();
 +            if (divisor > 0) {
 +                ret.put(window.toString(), wgtAvg / divisor);
 +            }
 +        }
 +        return ret;
 +    }
 +
 +
 +    /**
 +     * computes max bolt capacity
 +     *
 +     * @param executorSumms a list of ExecutorSummary
 +     * @return max bolt capacity
 +     */
 +    public static double computeBoltCapacity(List executorSumms) {
 +        double max = 0.0;
 +        for (Object o : executorSumms) {
 +            ExecutorSummary summary = (ExecutorSummary) o;
 +            double capacity = computeExecutorCapacity(summary);
 +            if (capacity > max) {
 +                max = capacity;
 +            }
 +        }
 +        return max;
 +    }
 +
 +    public static double computeExecutorCapacity(ExecutorSummary summ) {
 +        ExecutorStats stats = summ.get_stats();
 +        if (stats == null) {
 +            return 0.0;
 +        } else {
 +            Map m = aggregateBoltStats(Lists.newArrayList(stats), true);
 +            m = swapMapOrder(aggregateBoltStreams(m));
-             Map data = getMapByKeyword(m, TEN_MIN_IN_SECONDS_STR);
++            Map data = getMapByKey(m, TEN_MIN_IN_SECONDS_STR);
 +
 +            int uptime = summ.get_uptime_secs();
 +            int win = Math.min(uptime, TEN_MIN_IN_SECONDS);
 +            long executed = getByKeywordOr0(data, EXECUTED).longValue();
 +            double latency = getByKeywordOr0(data, 
EXEC_LATENCIES).doubleValue();
 +            if (win > 0) {
 +                return executed * latency / (1000 * win);
 +            }
 +            return 0.0;
 +        }
 +    }
 +
 +    /**
 +     * filter ExecutorSummary whose stats is null
 +     *
 +     * @param summs a list of ExecutorSummary
 +     * @return filtered summs
 +     */
 +    public static List getFilledStats(List summs) {
 +        for (Iterator itr = summs.iterator(); itr.hasNext(); ) {
 +            ExecutorSummary summ = (ExecutorSummary) itr.next();
 +            if (summ.get_stats() == null) {
 +                itr.remove();
 +            }
 +        }
 +        return summs;
 +    }
 +
 +    private static Map mapKeyStr(Map m) {
 +        Map ret = new HashMap();
 +        for (Object k : m.keySet()) {
 +            ret.put(k.toString(), m.get(k));
 +        }
 +        return ret;
 +    }
 +
 +    private static long sumStreamsLong(Map m, String key) {
 +        long sum = 0;
 +        if (m == null) {
 +            return sum;
 +        }
 +        for (Object v : m.values()) {
 +            Map sub = (Map) v;
 +            for (Object o : sub.entrySet()) {
 +                Map.Entry e = (Map.Entry) o;
 +                if (((Keyword) e.getKey()).getName().equals(key)) {
 +                    sum += ((Number) e.getValue()).longValue();
 +                }
 +            }
 +        }
 +        return sum;
 +    }
 +
 +    private static double sumStreamsDouble(Map m, String key) {
 +        double sum = 0;
 +        if (m == null) {
 +            return sum;
 +        }
 +        for (Object v : m.values()) {
 +            Map sub = (Map) v;
 +            for (Object o : sub.entrySet()) {
 +                Map.Entry e = (Map.Entry) o;
 +                if (((Keyword) e.getKey()).getName().equals(key)) {
 +                    sum += ((Number) e.getValue()).doubleValue();
 +                }
 +            }
 +        }
 +        return sum;
 +    }
 +
 +    /**
 +     * same as clojure's (merge-with merge m1 m2)
 +     */
 +    private static Map mergeMaps(Map m1, Map m2) {
 +        if (m2 == null) {
 +            return m1;
 +        }
 +        for (Object o : m2.entrySet()) {
 +            Map.Entry entry = (Map.Entry) o;
 +            Object k = entry.getKey();
 +
 +            Map existing = (Map) m1.get(k);
 +            if (existing == null) {
 +                m1.put(k, entry.getValue());
 +            } else {
 +                existing.putAll((Map) m2.get(k));
 +            }
 +        }
 +        return m1;
 +    }
 +
 +    /**
 +     * filter system streams from stats
 +     *
 +     * @param stats      { win -> stream id -> value }
 +     * @param includeSys whether to filter system streams
 +     * @return filtered stats
 +     */
 +    private static Map filterSysStreams(Map stats, boolean includeSys) {
 +        if (!includeSys) {
 +            for (Iterator itr = stats.keySet().iterator(); itr.hasNext(); ) {
 +                Object winOrStream = itr.next();
 +                if (isWindow(winOrStream)) {
 +                    Map stream2stat = (Map) stats.get(winOrStream);
 +                    for (Iterator subItr = stream2stat.keySet().iterator(); 
subItr.hasNext(); ) {
 +                        Object key = subItr.next();
 +                        if (key instanceof String && 
Utils.isSystemId((String) key)) {
 +                            subItr.remove();
 +                        }
 +                    }
 +                } else {
 +                    if (winOrStream instanceof String && 
Utils.isSystemId((String) winOrStream)) {
 +                        itr.remove();
 +                    }
 +                }
 +            }
 +        }
 +        return stats;
 +    }
 +
 +    private static boolean isWindow(Object key) {
 +        return key.equals("600") || key.equals("10800") || 
key.equals("86400") || key.equals(":all-time");
 +    }
 +
 +    /**
 +     * equals to clojure's: (merge-with (partial merge-with sum-or-0) acc-out 
spout-out)
 +     */
 +    private static Map fullMergeWithSum(Map m1, Map m2) {
 +        Set<Object> allKeys = new HashSet<>();
 +        if (m1 != null) {
 +            allKeys.addAll(m1.keySet());
 +        }
 +        if (m2 != null) {
 +            allKeys.addAll(m2.keySet());
 +        }
 +
 +        Map ret = new HashMap();
 +        for (Object k : allKeys) {
 +            Map mm1 = null, mm2 = null;
 +            if (m1 != null) {
 +                mm1 = (Map) m1.get(k);
 +            }
 +            if (m2 != null) {
 +                mm2 = (Map) m2.get(k);
 +            }
 +            ret.put(k, mergeWithSum(mm1, mm2));
 +        }
 +
 +        return ret;
 +    }
 +
 +    private static Map mergeWithSum(Map m1, Map m2) {
 +        Map ret = new HashMap();
 +
 +        Set<Object> allKeys = new HashSet<>();
 +        if (m1 != null) {
 +            allKeys.addAll(m1.keySet());
 +        }
 +        if (m2 != null) {
 +            allKeys.addAll(m2.keySet());
 +        }
 +
 +        for (Object k : allKeys) {
 +            Number n1 = getOr0(m1, k);
 +            Number n2 = getOr0(m2, k);
 +            ret.put(k, add(n1, n2));
 +        }
 +        return ret;
 +    }
 +
 +    /**
 +     * this method merges 2 two-level-deep maps, which is different from 
mergeWithSum, and we expect the two maps
 +     * have the same keys
 +     */
 +    private static Map mergeWithAddPair(Map m1, Map m2) {
 +        Map ret = new HashMap();
 +
 +        Set<Object> allKeys = new HashSet<>();
 +        if (m1 != null) {
 +            allKeys.addAll(m1.keySet());
 +        }
 +        if (m2 != null) {
 +            allKeys.addAll(m2.keySet());
 +        }
 +
 +        for (Object k : allKeys) {
 +            Map mm1 = (m1 != null) ? (Map) m1.get(k) : null;
 +            Map mm2 = (m2 != null) ? (Map) m2.get(k) : null;
 +            if (mm1 == null && mm2 == null) {
 +                continue;
 +            } else if (mm1 == null) {
 +                ret.put(k, mm2);
 +            } else if (mm2 == null) {
 +                ret.put(k, mm1);
 +            } else {
 +                Map tmp = new HashMap();
 +                for (Object kk : mm1.keySet()) {
 +                    List seq1 = (List) mm1.get(kk);
 +                    List seq2 = (List) mm2.get(kk);
 +                    List sums = new ArrayList();
 +                    for (int i = 0; i < seq1.size(); i++) {
 +                        sums.add(add((Number) seq1.get(i), (Number) 
seq2.get(i)));
 +                    }
 +                    tmp.put(kk, sums);
 +                }
 +                ret.put(k, t

<TRUNCATED>

Reply via email to