Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2433#discussion_r152707672
  
    --- Diff: 
storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java ---
    @@ -1395,10 +1374,59 @@ private void updateAllHeartbeats(Map<String, 
Assignment> existingAssignments, Ma
                 updateHeartbeats(topoId, topologyToExecutors.get(topoId), 
entry.getValue());
             }
         }
    +
    +    private void 
updateCachedHeartbeatsFromWorker(SupervisorWorkerHeartbeat workerHeartbeat) {
    +        Map<List<Integer>, Map<String, Object>> executorBeats = 
StatsUtil.convertWorkerBeats(workerHeartbeat);
    +        String topoId = workerHeartbeat.get_storm_id();
    +        Map<List<Integer>, Map<String, Object>> cache = 
heartbeatsCache.get().get(topoId);
    +        if(cache == null) {
    +            cache = new HashMap<>();
    +            heartbeatsCache.getAndUpdate(new Assoc<>(topoId, cache));
    +        }
    +        Set<List<Integer>> executors = new HashSet<>();
    +        for(ExecutorInfo executorInfo : workerHeartbeat.get_executors()) {
    +            executors.add(Arrays.asList(executorInfo.get_task_start(), 
executorInfo.get_task_end()));
    +        }
    +
    +        StatsUtil.updateHeartbeatCache(heartbeatsCache.get().get(topoId), 
executorBeats, executors,
    +                
ObjectReader.getInt(conf.get(DaemonConfig.NIMBUS_TASK_TIMEOUT_SECS)));
    +
    +    }
    +
    +    private void 
updateCachedHeartbeatsFromSupervisor(SupervisorWorkerHeartbeats 
workerHeartbeats) {
    +        
workerHeartbeats.get_worker_heartbeats().forEach(this::updateCachedHeartbeatsFromWorker);
    +        if(!heartbeatsReadyFlag.get() && 
!Strings.isNullOrEmpty(workerHeartbeats.get_supervisor_id())) {
    +            
heartbeatsRecoveryStrategy.reportNodeId(workerHeartbeats.get_supervisor_id());
    +        }
    +    }
    +
    +    /**
    +     * decide if the heartbeats is recovered for a master, will wait for 
all the assignments nodes to recovery,
    +     * every node will take care its node heartbeats reporting
    +     * @return
    +     */
    +    private boolean isHeartbeatsRecovered() {
    +        if(heartbeatsReadyFlag.get()) {
    +            return true;
    +        }
    +        Set<String> allNodes = new HashSet<>();
    +        for(Map.Entry<String, Assignment> assignmentEntry: 
stormClusterState.assignmentsInfo().entrySet()) {
    +            
allNodes.addAll(assignmentEntry.getValue().get_node_host().keySet());
    +        }
    +        boolean isReady = heartbeatsRecoveryStrategy.isReady(allNodes);
    +        if(isReady) {
    +            heartbeatsReadyFlag.getAndSet(true);
    +        }
    +        return isReady;
    +    }
         
         private Set<List<Integer>> aliveExecutors(TopologyDetails td, 
Set<List<Integer>> allExecutors, Assignment assignment) {
             String topoId = td.getId();
             Map<List<Integer>, Map<String, Object>> hbCache = 
heartbeatsCache.get().get(topoId);
    +        //in case that no workers report any heartbeats yet.
    +        if(null == hbCache) {
    --- End diff --
    
    space between f and (


---

Reply via email to