Github user HeartSaVioR commented on a diff in the pull request:
https://github.com/apache/storm/pull/2433#discussion_r152707672
--- Diff:
storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java ---
@@ -1395,10 +1374,59 @@ private void updateAllHeartbeats(Map<String,
Assignment> existingAssignments, Ma
updateHeartbeats(topoId, topologyToExecutors.get(topoId),
entry.getValue());
}
}
+
+ private void
updateCachedHeartbeatsFromWorker(SupervisorWorkerHeartbeat workerHeartbeat) {
+ Map<List<Integer>, Map<String, Object>> executorBeats =
StatsUtil.convertWorkerBeats(workerHeartbeat);
+ String topoId = workerHeartbeat.get_storm_id();
+ Map<List<Integer>, Map<String, Object>> cache =
heartbeatsCache.get().get(topoId);
+ if(cache == null) {
+ cache = new HashMap<>();
+ heartbeatsCache.getAndUpdate(new Assoc<>(topoId, cache));
+ }
+ Set<List<Integer>> executors = new HashSet<>();
+ for(ExecutorInfo executorInfo : workerHeartbeat.get_executors()) {
+ executors.add(Arrays.asList(executorInfo.get_task_start(),
executorInfo.get_task_end()));
+ }
+
+ StatsUtil.updateHeartbeatCache(heartbeatsCache.get().get(topoId),
executorBeats, executors,
+
ObjectReader.getInt(conf.get(DaemonConfig.NIMBUS_TASK_TIMEOUT_SECS)));
+
+ }
+
+ private void
updateCachedHeartbeatsFromSupervisor(SupervisorWorkerHeartbeats
workerHeartbeats) {
+
workerHeartbeats.get_worker_heartbeats().forEach(this::updateCachedHeartbeatsFromWorker);
+ if(!heartbeatsReadyFlag.get() &&
!Strings.isNullOrEmpty(workerHeartbeats.get_supervisor_id())) {
+
heartbeatsRecoveryStrategy.reportNodeId(workerHeartbeats.get_supervisor_id());
+ }
+ }
+
+ /**
+ * decide if the heartbeats is recovered for a master, will wait for
all the assignments nodes to recovery,
+ * every node will take care its node heartbeats reporting
+ * @return
+ */
+ private boolean isHeartbeatsRecovered() {
+ if(heartbeatsReadyFlag.get()) {
+ return true;
+ }
+ Set<String> allNodes = new HashSet<>();
+ for(Map.Entry<String, Assignment> assignmentEntry:
stormClusterState.assignmentsInfo().entrySet()) {
+
allNodes.addAll(assignmentEntry.getValue().get_node_host().keySet());
+ }
+ boolean isReady = heartbeatsRecoveryStrategy.isReady(allNodes);
+ if(isReady) {
+ heartbeatsReadyFlag.getAndSet(true);
+ }
+ return isReady;
+ }
private Set<List<Integer>> aliveExecutors(TopologyDetails td,
Set<List<Integer>> allExecutors, Assignment assignment) {
String topoId = td.getId();
Map<List<Integer>, Map<String, Object>> hbCache =
heartbeatsCache.get().get(topoId);
+ //in case that no workers report any heartbeats yet.
+ if(null == hbCache) {
--- End diff --
Nit: please add a space between `if` and `(`.
---