Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2836#discussion_r218174414
  
    --- Diff: 
storm-server/src/main/java/org/apache/storm/daemon/nimbus/HeartbeatCache.java 
---
    @@ -0,0 +1,229 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.storm.daemon.nimbus;
    +
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.concurrent.ConcurrentHashMap;
    +import java.util.function.Function;
    +import org.apache.storm.generated.Assignment;
    +import org.apache.storm.generated.ExecutorInfo;
    +import org.apache.storm.generated.SupervisorWorkerHeartbeat;
    +import 
org.apache.storm.shade.com.google.common.annotations.VisibleForTesting;
    +import org.apache.storm.stats.ClientStatsUtil;
    +import org.apache.storm.stats.StatsUtil;
    +import org.apache.storm.utils.Time;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +/**
    + * Holds a cache of heartbeats from the workers.
    + */
    +public class HeartbeatCache {
    +    private static final Logger LOG = 
LoggerFactory.getLogger(HeartbeatCache.class);
    +    private static final Function<String, ConcurrentHashMap<List<Integer>, 
ExecutorCache>> MAKE_MAP = (k) -> new ConcurrentHashMap<>();
    +
    +    private static class ExecutorCache {
    +        private Boolean isTimedOut;
    +        private Integer nimbusTime;
    +        private Integer executorReportedTime;
    +
    +        public ExecutorCache(Map<String, Object> newBeat) {
    +            if (newBeat != null) {
    +                executorReportedTime = (Integer) 
newBeat.getOrDefault(ClientStatsUtil.TIME_SECS, 0);
    +            } else {
    +                executorReportedTime = 0;
    +            }
    +
    +            nimbusTime = Time.currentTimeSecs();
    +            isTimedOut = false;
    +        }
    +
    +        public ExecutorCache(boolean isTimedOut, Integer nimbusTime, 
Integer executorReportedTime) {
    +            this.isTimedOut = isTimedOut;
    +            this.nimbusTime = nimbusTime;
    +            this.executorReportedTime = executorReportedTime;
    +        }
    +
    +        public synchronized Boolean isTimedOut() {
    +            return isTimedOut;
    +        }
    +
    +        public synchronized Integer getNimbusTime() {
    +            return nimbusTime;
    +        }
    +
    +        public synchronized Integer getExecutorReportedTime() {
    +            return executorReportedTime;
    +        }
    +
    +        public synchronized void updateTimeout(Integer timeout) {
    +            isTimedOut = Time.deltaSecs(getNimbusTime()) >= timeout;
    +        }
    +
    +        public synchronized void updateFromHb(Integer timeout, 
Map<String,Object> newBeat) {
    +            if (newBeat != null) {
    +                Integer newReportedTime = (Integer) 
newBeat.getOrDefault(ClientStatsUtil.TIME_SECS, 0);
    +                if (!newReportedTime.equals(executorReportedTime)) {
    +                    nimbusTime = Time.currentTimeSecs();
    +                }
    +                executorReportedTime = newReportedTime;
    +            }
    +            updateTimeout(timeout);
    +        }
    +    }
    +
    +    //Topology Id -> executor ids -> cached heartbeat state (ExecutorCache)
    +    private final ConcurrentHashMap<String, 
ConcurrentHashMap<List<Integer>, ExecutorCache>> cache;
    +
    /**
     * Create an empty cache. Per-topology entries are created lazily as heartbeats arrive.
     */
    public HeartbeatCache() {
        this.cache = new ConcurrentHashMap<>();
    }
    +
    +    /**
    +     * Add an empty topology to the cache for testing purposes.
    +     * @param topoId the id of the topology to add.
    +     */
    +    @VisibleForTesting
    +    public void addEmptyTopoForTests(String topoId) {
    +        cache.put(topoId, new ConcurrentHashMap<>());
    +    }
    +
    +    /**
    +     * Get the number of topologies with cached heartbeats.
    +     * @return the number of topologies with cached heartbeats.
    +     */
    +    @VisibleForTesting
    +    public int getNumToposCached() {
    +        return cache.size();
    +    }
    +
    /**
     * Get the topology ids with cached heartbeats.
     * Note: this returns a live view of the cache's key set, not a snapshot.
     * @return the set of topology ids with cached heartbeats.
     */
    @VisibleForTesting
    public Set<String> getTopologyIds() {
        return cache.keySet();
    }
    +
    /**
     * Remove a specific topology from the cache.
     * A no-op if the topology is not cached.
     * @param topoId the id of the topology to remove.
     */
    public void removeTopo(String topoId) {
        cache.remove(topoId);
    }
    +
    +    /**
    +     * Update the heartbeats for a topology with no heartbeats that came 
in.
    +     * @param topoId the id of the topology to look at.
    +     * @param taskTimeoutSecs the timeout to know if they are too old.
    +     */
    +    public void timeoutOldHeartbeats(String topoId, Integer 
taskTimeoutSecs) {
    +        Map<List<Integer>, ExecutorCache> topoCache = 
cache.computeIfAbsent(topoId, MAKE_MAP);
    +        //if not executor beats, refresh is-timed-out of the cache which 
is done by master
    +        for (ExecutorCache ec : topoCache.values()) {
    +            ec.updateTimeout(taskTimeoutSecs);
    +        }
    +    }
    +
    +    /**
    +     * Update the cache with heartbeats from a worker through zookeeper.
    +     * @param topoId the id to the topology.
    +     * @param executorBeats the HB data.
    +     * @param allExecutors the executors.
    +     * @param timeout the timeout.
    +     */
    +    public void updateFromZkHeartbeat(String topoId, Map<List<Integer>, 
Map<String,Object>> executorBeats,
    +                                      Set<List<Integer>> allExecutors, 
Integer timeout) {
    +        Map<List<Integer>, ExecutorCache> topoCache = 
cache.computeIfAbsent(topoId, MAKE_MAP);
    +        if (executorBeats == null) {
    +            executorBeats = new HashMap<>();
    +        }
    +
    +        for (List<Integer> executor : allExecutors) {
    +            final Map<String, Object> newBeat = 
executorBeats.get(executor);
    +            ExecutorCache currBeat = topoCache.computeIfAbsent(executor, 
(k) -> new ExecutorCache(newBeat));
    +            currBeat.updateFromHb(timeout, newBeat);
    +        }
    +    }
    +
    +    /**
    +     * Update the heartbeats for a given worker.
    +     * @param workerHeartbeat the heartbeats from the worker.
    +     * @param taskTimeoutSecs the timeout we should be looking at.
    +     */
    +    public void updateHeartbeat(SupervisorWorkerHeartbeat workerHeartbeat, 
Integer taskTimeoutSecs) {
    +        Map<List<Integer>, Map<String, Object>> executorBeats = 
StatsUtil.convertWorkerBeats(workerHeartbeat);
    +        String topoId = workerHeartbeat.get_storm_id();
    +        Map<List<Integer>, ExecutorCache> topoCache = 
cache.computeIfAbsent(topoId, MAKE_MAP);
    +
    +        for (ExecutorInfo executorInfo : workerHeartbeat.get_executors()) {
    +            List<Integer> executor = 
Arrays.asList(executorInfo.get_task_start(), executorInfo.get_task_end());
    +            final Map<String, Object> newBeat = 
executorBeats.get(executor);
    +            ExecutorCache currBeat = topoCache.computeIfAbsent(executor, 
(k) -> new ExecutorCache(newBeat));
    +            currBeat.updateFromHb(taskTimeoutSecs, newBeat);
    +        }
    +    }
    +
    +    /**
    +     * Get all of the alive executors for a given topology.
    +     * @param topoId the id of the topology we are looking for.
    +     * @param allExecutors all of the executors for this topology.
    +     * @param assignment the current topology assignment.
    +     * @param taskLaunchSecs timeout for right after a worker is launched.
    +     * @return the set of tasks that are alive.
    +     */
    +    public Set<List<Integer>> getAliveExecutors(String topoId, 
Set<List<Integer>> allExecutors, Assignment assignment, int taskLaunchSecs) {
    +        Map<List<Integer>, ExecutorCache> topoCache = 
cache.computeIfAbsent(topoId, MAKE_MAP);
    +        LOG.debug("Computing alive executors for {}\nExecutors: 
{}\nAssignment: {}\nHeartbeat cache: {}",
    +            topoId, allExecutors, assignment, topoCache);
    +
    +        Set<List<Integer>> ret = new HashSet<>();
    +        Map<List<Long>, Long> execToStartTimes = 
assignment.get_executor_start_time_secs();
    +
    +        for (List<Integer> exec : allExecutors) {
    +            List<Long> longExec = new ArrayList<>(exec.size());
    +            for (Integer num : exec) {
    +                longExec.add(num.longValue());
    +            }
    +
    +            Long startTime = execToStartTimes.get(longExec);
    +            ExecutorCache executorCache = topoCache.get(exec);
    +            //null isTimedOut means worker never reported any heartbeat
    +            Boolean isTimedOut = executorCache == null ? null : 
executorCache.isTimedOut();
    --- End diff --
    
    Yes I agree.  Will do it.  Again this was code that was already there, and 
I just refactored it, but I am happy to fix it.


---

Reply via email to