Github user revans2 commented on a diff in the pull request:
https://github.com/apache/storm/pull/2881#discussion_r226693412
--- Diff: storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java
---
@@ -348,14 +350,27 @@ public void doExecutorHeartbeats() {
if (null == executors) {
stats =
ClientStatsUtil.mkEmptyExecutorZkHbs(workerState.localExecutors);
} else {
- stats =
ClientStatsUtil.convertExecutorZkHbs(executors.stream().collect(Collectors
-
.toMap(IRunningExecutor::getExecutorId,
-
IRunningExecutor::renderStats)));
+ stats = ClientStatsUtil.convertExecutorZkHbs(
+ executors.stream().collect(
+ Collectors.toMap(
+ IRunningExecutor::getExecutorId,
+ IRunningExecutor::renderStats
+ )
+ )
+ );
}
- Map<String, Object> zkHB =
ClientStatsUtil.mkZkWorkerHb(workerState.topologyId, stats,
workerState.uptime.upTime());
+
+ Map<String, Object> zkHB = ClientStatsUtil.mkZkWorkerHb(
+ workerState.topologyId, stats, workerState.uptime.upTime()
+ );
+
try {
+ String assignmentId = workerState.assignmentId;
+ if (this.numaId != null) {
+ assignmentId += Constants.NUMA_ID_SEPARATOR + this.numaId;
--- End diff --
What would happen if we didn't add in the numaId to the heartbeat? This is
because we are not going to be able to guarantee during an upgrade that the
worker will be up to date with the latest code on nimbus.
---