STORM-3162: Cleanup heartbeats cache and make it thread safe
Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/9d3feb0e Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/9d3feb0e Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/9d3feb0e Branch: refs/heads/master Commit: 9d3feb0e18a96a90cdce9c2ff1727c9c9ecca4a1 Parents: eaed3cb Author: Robert (Bobby) Evans <ev...@yahoo-inc.com> Authored: Fri Sep 14 14:48:33 2018 -0500 Committer: Robert (Bobby) Evans <ev...@yahoo-inc.com> Committed: Mon Sep 17 14:56:56 2018 -0500 ---------------------------------------------------------------------- storm-client/pom.xml | 2 +- .../org/apache/storm/daemon/worker/Worker.java | 10 +- .../jvm/org/apache/storm/executor/Executor.java | 8 +- .../storm/executor/bolt/BoltExecutor.java | 4 +- .../storm/executor/spout/SpoutExecutor.java | 4 +- .../apache/storm/stats/BoltExecutorStats.java | 10 +- .../org/apache/storm/stats/ClientStatsUtil.java | 202 ++ .../jvm/org/apache/storm/stats/StatsUtil.java | 2610 ------------------ .../test/clj/org/apache/storm/nimbus_test.clj | 22 +- storm-server/pom.xml | 2 +- .../storm/daemon/nimbus/HeartbeatCache.java | 229 ++ .../org/apache/storm/daemon/nimbus/Nimbus.java | 87 +- .../java/org/apache/storm/stats/StatsUtil.java | 2388 ++++++++++++++++ 13 files changed, 2866 insertions(+), 2712 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/9d3feb0e/storm-client/pom.xml ---------------------------------------------------------------------- diff --git a/storm-client/pom.xml b/storm-client/pom.xml index 1a120d8..2e60d19 100644 --- a/storm-client/pom.xml +++ b/storm-client/pom.xml @@ -164,7 +164,7 @@ <!--Note - the version would be inherited--> <configuration> <excludes>**/generated/**</excludes> - <maxAllowedViolations>3285</maxAllowedViolations> + <maxAllowedViolations>3067</maxAllowedViolations> </configuration> </plugin> <plugin> http://git-wip-us.apache.org/repos/asf/storm/blob/9d3feb0e/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java ---------------------------------------------------------------------- diff --git a/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java b/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java index d02de9b..048425e 100644 --- a/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java +++ b/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java @@ -55,7 +55,7 @@ import org.apache.storm.shade.com.google.common.base.Preconditions; import org.apache.storm.shade.org.apache.commons.io.FileUtils; import org.apache.storm.shade.org.apache.commons.lang.ObjectUtils; import org.apache.storm.shade.uk.org.lidalia.sysoutslf4j.context.SysOutOverSLF4J; -import org.apache.storm.stats.StatsUtil; +import org.apache.storm.stats.ClientStatsUtil; import org.apache.storm.utils.ConfigUtils; import org.apache.storm.utils.LocalState; import org.apache.storm.utils.NimbusClient; @@ -346,17 +346,17 @@ public class Worker implements Shutdownable, DaemonCommon { Map<List<Integer>, ExecutorStats> stats; List<IRunningExecutor> executors = this.executorsAtom.get(); if (null == executors) { - stats = StatsUtil.mkEmptyExecutorZkHbs(workerState.localExecutors); + stats = ClientStatsUtil.mkEmptyExecutorZkHbs(workerState.localExecutors); } else { - stats = StatsUtil.convertExecutorZkHbs(executors.stream().collect(Collectors + stats = ClientStatsUtil.convertExecutorZkHbs(executors.stream().collect(Collectors .toMap(IRunningExecutor::getExecutorId, IRunningExecutor::renderStats))); } - Map<String, Object> zkHB = StatsUtil.mkZkWorkerHb(workerState.topologyId, stats, workerState.uptime.upTime()); + Map<String, Object> zkHB = ClientStatsUtil.mkZkWorkerHb(workerState.topologyId, stats, workerState.uptime.upTime()); try { workerState.stormClusterState .workerHeartbeat(workerState.topologyId, workerState.assignmentId, (long) workerState.port, - StatsUtil.thriftifyZkWorkerHb(zkHB)); + ClientStatsUtil.thriftifyZkWorkerHb(zkHB)); } catch (Exception ex) { LOG.error("Worker failed to write heartbeats to ZK or Pacemaker...will retry", ex); } http://git-wip-us.apache.org/repos/asf/storm/blob/9d3feb0e/storm-client/src/jvm/org/apache/storm/executor/Executor.java ---------------------------------------------------------------------- diff --git a/storm-client/src/jvm/org/apache/storm/executor/Executor.java b/storm-client/src/jvm/org/apache/storm/executor/Executor.java index e3de2e1..2352eec 100644 --- a/storm-client/src/jvm/org/apache/storm/executor/Executor.java +++ b/storm-client/src/jvm/org/apache/storm/executor/Executor.java @@ -60,8 +60,8 @@ import org.apache.storm.shade.com.google.common.collect.Lists; import org.apache.storm.shade.org.jctools.queues.MpscChunkedArrayQueue; import org.apache.storm.shade.org.json.simple.JSONValue; import org.apache.storm.shade.org.json.simple.parser.ParseException; +import org.apache.storm.stats.ClientStatsUtil; import org.apache.storm.stats.CommonStats; -import org.apache.storm.stats.StatsUtil; import org.apache.storm.task.WorkerTopologyContext; import org.apache.storm.tuple.AddressedTuple; import org.apache.storm.tuple.Fields; @@ -177,7 +177,7 @@ public abstract class Executor implements Callable, JCQueue.Consumer { String componentId = workerTopologyContext.getComponentId(taskIds.get(0)); String type = getExecutorType(workerTopologyContext, componentId); - if (StatsUtil.SPOUT.equals(type)) { + if (ClientStatsUtil.SPOUT.equals(type)) { executor = new SpoutExecutor(workerState, executorId, credentials); } else { executor = new BoltExecutor(workerState, executorId, credentials); @@ -205,9 +205,9 @@ public abstract class Executor implements Callable, JCQueue.Consumer { Map<String, SpoutSpec> spouts = topology.get_spouts(); Map<String, Bolt> bolts = topology.get_bolts(); if (spouts.containsKey(componentId)) { - return StatsUtil.SPOUT; + return ClientStatsUtil.SPOUT; } else if (bolts.containsKey(componentId)) { - return StatsUtil.BOLT; + return ClientStatsUtil.BOLT; } else { throw new RuntimeException("Could not find " + componentId + " in " + topology); } http://git-wip-us.apache.org/repos/asf/storm/blob/9d3feb0e/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltExecutor.java ---------------------------------------------------------------------- diff --git a/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltExecutor.java b/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltExecutor.java index 1008eba..48d39c6 100644 --- a/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltExecutor.java +++ b/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltExecutor.java @@ -38,7 +38,7 @@ import org.apache.storm.policy.WaitStrategyPark; import org.apache.storm.security.auth.IAutoCredentials; import org.apache.storm.shade.com.google.common.collect.ImmutableMap; import org.apache.storm.stats.BoltExecutorStats; -import org.apache.storm.stats.StatsUtil; +import org.apache.storm.stats.ClientStatsUtil; import org.apache.storm.task.IBolt; import org.apache.storm.task.OutputCollector; import org.apache.storm.task.TopologyContext; @@ -68,7 +68,7 @@ public class BoltExecutor extends Executor { private BoltOutputCollectorImpl outputCollector; public BoltExecutor(WorkerState workerData, List<Long> executorId, Map<String, String> credentials) { - super(workerData, executorId, credentials, StatsUtil.BOLT); + super(workerData, executorId, credentials, ClientStatsUtil.BOLT); this.executeSampler = ConfigUtils.mkStatsSampler(topoConf); this.isSystemBoltExecutor = (executorId == Constants.SYSTEM_EXECUTOR_ID); if (isSystemBoltExecutor) { http://git-wip-us.apache.org/repos/asf/storm/blob/9d3feb0e/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java ---------------------------------------------------------------------- diff --git a/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java b/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java index cd4bb09..c46a7b9 100644 --- a/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java +++ b/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java @@ -37,8 +37,8 @@ import org.apache.storm.policy.IWaitStrategy.WAIT_SITUATION; import org.apache.storm.shade.com.google.common.collect.ImmutableMap; import org.apache.storm.spout.ISpout; import org.apache.storm.spout.SpoutOutputCollector; +import org.apache.storm.stats.ClientStatsUtil; import org.apache.storm.stats.SpoutExecutorStats; -import org.apache.storm.stats.StatsUtil; import org.apache.storm.tuple.AddressedTuple; import org.apache.storm.tuple.TupleImpl; import org.apache.storm.utils.ConfigUtils; @@ -73,7 +73,7 @@ public class SpoutExecutor extends Executor { private long threadId = 0; public SpoutExecutor(final WorkerState workerData, final List<Long> executorId, Map<String, String> credentials) { - super(workerData, executorId, credentials, StatsUtil.SPOUT); + super(workerData, executorId, credentials, ClientStatsUtil.SPOUT); this.spoutWaitStrategy = ReflectionUtils.newInstance((String) topoConf.get(Config.TOPOLOGY_SPOUT_WAIT_STRATEGY)); this.spoutWaitStrategy.prepare(topoConf, WAIT_SITUATION.SPOUT_WAIT); this.backPressureWaitStrategy = ReflectionUtils.newInstance((String) topoConf.get(Config.TOPOLOGY_BACKPRESSURE_WAIT_STRATEGY)); http://git-wip-us.apache.org/repos/asf/storm/blob/9d3feb0e/storm-client/src/jvm/org/apache/storm/stats/BoltExecutorStats.java ---------------------------------------------------------------------- diff --git a/storm-client/src/jvm/org/apache/storm/stats/BoltExecutorStats.java b/storm-client/src/jvm/org/apache/storm/stats/BoltExecutorStats.java index 6fea28a..26b2d74 100644 --- a/storm-client/src/jvm/org/apache/storm/stats/BoltExecutorStats.java +++ b/storm-client/src/jvm/org/apache/storm/stats/BoltExecutorStats.java @@ -83,11 +83,11 @@ public class BoltExecutorStats extends CommonStats { // bolt stats BoltStats boltStats = new BoltStats( - StatsUtil.windowSetConverter(valueStat(getAcked()), StatsUtil.TO_GSID, StatsUtil.IDENTITY), - StatsUtil.windowSetConverter(valueStat(getFailed()), StatsUtil.TO_GSID, StatsUtil.IDENTITY), - StatsUtil.windowSetConverter(valueStat(processLatencyStats), StatsUtil.TO_GSID, StatsUtil.IDENTITY), - StatsUtil.windowSetConverter(valueStat(executedStats), StatsUtil.TO_GSID, StatsUtil.IDENTITY), - StatsUtil.windowSetConverter(valueStat(executeLatencyStats), StatsUtil.TO_GSID, StatsUtil.IDENTITY)); + ClientStatsUtil.windowSetConverter(valueStat(getAcked()), ClientStatsUtil.TO_GSID, ClientStatsUtil.IDENTITY), + ClientStatsUtil.windowSetConverter(valueStat(getFailed()), ClientStatsUtil.TO_GSID, ClientStatsUtil.IDENTITY), + ClientStatsUtil.windowSetConverter(valueStat(processLatencyStats), ClientStatsUtil.TO_GSID, ClientStatsUtil.IDENTITY), + ClientStatsUtil.windowSetConverter(valueStat(executedStats), ClientStatsUtil.TO_GSID, ClientStatsUtil.IDENTITY), + ClientStatsUtil.windowSetConverter(valueStat(executeLatencyStats), ClientStatsUtil.TO_GSID, ClientStatsUtil.IDENTITY)); ret.set_specific(ExecutorSpecificStats.bolt(boltStats)); return ret; http://git-wip-us.apache.org/repos/asf/storm/blob/9d3feb0e/storm-client/src/jvm/org/apache/storm/stats/ClientStatsUtil.java ---------------------------------------------------------------------- diff --git a/storm-client/src/jvm/org/apache/storm/stats/ClientStatsUtil.java b/storm-client/src/jvm/org/apache/storm/stats/ClientStatsUtil.java new file mode 100644 index 0000000..3f63db3 --- /dev/null +++ b/storm-client/src/jvm/org/apache/storm/stats/ClientStatsUtil.java @@ -0,0 +1,202 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.stats; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import org.apache.storm.generated.ClusterWorkerHeartbeat; +import org.apache.storm.generated.ExecutorInfo; +import org.apache.storm.generated.ExecutorStats; +import org.apache.storm.generated.GlobalStreamId; +import org.apache.storm.shade.com.google.common.collect.Lists; +import org.apache.storm.utils.Time; + +/** + * Stats calculations needed by storm client code. + */ +public class ClientStatsUtil { + public static final String SPOUT = "spout"; + public static final String BOLT = "bolt"; + static final String EXECUTOR_STATS = "executor-stats"; + static final String UPTIME = "uptime"; + public static final String TIME_SECS = "time-secs"; + public static final ToGlobalStreamIdTransformer TO_GSID = new ToGlobalStreamIdTransformer(); + public static final IdentityTransformer IDENTITY = new IdentityTransformer(); + + /** + * Convert a List<Long> executor to java List<Integer>. + */ + public static List<Integer> convertExecutor(List<Long> executor) { + return Lists.newArrayList(executor.get(0).intValue(), executor.get(1).intValue()); + } + + /** + * Make and map of executors to empty stats. + * @param executors the executors as keys of the map. + * @return and empty map of executors to stats. + */ + public static Map<List<Integer>, ExecutorStats> mkEmptyExecutorZkHbs(Set<List<Long>> executors) { + Map<List<Integer>, ExecutorStats> ret = new HashMap<>(); + for (Object executor : executors) { + List startEnd = (List) executor; + ret.put(convertExecutor(startEnd), null); + } + return ret; + } + + /** + * Convert Long Executor Ids in ZkHbs to Integer ones structure to java maps. + */ + public static Map<List<Integer>, ExecutorStats> convertExecutorZkHbs(Map<List<Long>, ExecutorStats> executorBeats) { + Map<List<Integer>, ExecutorStats> ret = new HashMap<>(); + for (Map.Entry<List<Long>, ExecutorStats> entry : executorBeats.entrySet()) { + ret.put(convertExecutor(entry.getKey()), entry.getValue()); + } + return ret; + } + + /** + * Create a new worker heartbeat for zookeeper. + * @param topoId the topology id + * @param executorStats the stats for the executors + * @param uptime the uptime for the worker. + * @return the heartbeat map. + */ + public static Map<String, Object> mkZkWorkerHb(String topoId, Map<List<Integer>, ExecutorStats> executorStats, Integer uptime) { + Map<String, Object> ret = new HashMap<>(); + ret.put("storm-id", topoId); + ret.put(EXECUTOR_STATS, executorStats); + ret.put(UPTIME, uptime); + ret.put(TIME_SECS, Time.currentTimeSecs()); + + return ret; + } + + private static Number getByKeyOr0(Map<String, Object> m, String k) { + if (m == null) { + return 0; + } + + Number n = (Number) m.get(k); + if (n == null) { + return 0; + } + return n; + } + + /** + * Get a sub-map by a given key. + * @param map the original map + * @param key the key to get it from. + * @return the map stored under key. + */ + public static <K, V> Map<K, V> getMapByKey(Map map, String key) { + if (map == null) { + return null; + } + return (Map<K, V>) map.get(key); + } + + public static ClusterWorkerHeartbeat thriftifyZkWorkerHb(Map<String, Object> heartbeat) { + ClusterWorkerHeartbeat ret = new ClusterWorkerHeartbeat(); + ret.set_uptime_secs(getByKeyOr0(heartbeat, UPTIME).intValue()); + ret.set_storm_id((String) heartbeat.get("storm-id")); + ret.set_time_secs(getByKeyOr0(heartbeat, TIME_SECS).intValue()); + + Map<ExecutorInfo, ExecutorStats> convertedStats = new HashMap<>(); + + Map<List<Integer>, ExecutorStats> executorStats = getMapByKey(heartbeat, EXECUTOR_STATS); + if (executorStats != null) { + for (Map.Entry<List<Integer>, ExecutorStats> entry : executorStats.entrySet()) { + List<Integer> executor = entry.getKey(); + ExecutorStats stats = entry.getValue(); + if (null != stats) { + convertedStats.put(new ExecutorInfo(executor.get(0), executor.get(1)), stats); + } + } + } + ret.set_executor_stats(convertedStats); + + return ret; + } + + /** + * Converts stats to be over given windows of time. + * @param stats the stats + * @param secKeyFunc transform the sub-key + * @param firstKeyFunc transform the main key + */ + public static <K1, K2> Map windowSetConverter( + Map stats, KeyTransformer<K2> secKeyFunc, KeyTransformer<K1> firstKeyFunc) { + Map ret = new HashMap(); + + for (Object o : stats.entrySet()) { + Map.Entry entry = (Map.Entry) o; + K1 key1 = firstKeyFunc.transform(entry.getKey()); + + Map subRetMap = (Map) ret.get(key1); + if (subRetMap == null) { + subRetMap = new HashMap(); + } + ret.put(key1, subRetMap); + + Map value = (Map) entry.getValue(); + for (Object oo : value.entrySet()) { + Map.Entry subEntry = (Map.Entry) oo; + K2 key2 = secKeyFunc.transform(subEntry.getKey()); + subRetMap.put(key2, subEntry.getValue()); + } + } + return ret; + } + + // ===================================================================================== + // key transformers + // ===================================================================================== + + /** + * Provides a way to transform one key into another. + * @param <T> + */ + interface KeyTransformer<T> { + T transform(Object key); + } + + static class ToGlobalStreamIdTransformer implements KeyTransformer<GlobalStreamId> { + @Override + public GlobalStreamId transform(Object key) { + if (key instanceof List) { + List l = (List) key; + if (l.size() > 1) { + return new GlobalStreamId((String) l.get(0), (String) l.get(1)); + } + } + return new GlobalStreamId("", key.toString()); + } + } + + static class IdentityTransformer implements KeyTransformer<Object> { + @Override + public Object transform(Object key) { + return key; + } + } +}