GitHub user srdo commented on the issue:
https://github.com/apache/storm/pull/2800
I'm sorry if this turns out a bit verbose, but I'm going to write down what I
see as the issue here, so we can hopefully come to a common understanding (and
so I don't forget and have to look at this again).
As far as I can tell, the uses of `heartbeatsCache` in Nimbus are thread
safe, because the values are never modified, just overwritten. That is, we
don't do `heartbeatsCache.get(topoId).put(foo, bar)`, instead we do
`heartbeatsCache.getAndUpdate(func)`, which replaces the value entirely. I
don't believe we need further synchronization here, since the `AtomicReference`
ensures that the value changes are propagated to all threads, and two threads
reading from an effectively immutable map at the same time should be fine(?)
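
To make the distinction concrete, here is a minimal sketch of the "replace, don't modify" pattern. The class name, the `setTopologyCache` helper and the simplified map types are made up for illustration; they are not the actual Nimbus fields or signatures.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

public class ReplaceNotMutate {
    // Hypothetical stand-in for the Nimbus heartbeatsCache (topology id -> per-topology data).
    static final AtomicReference<Map<String, Map<String, Integer>>> heartbeatsCache =
        new AtomicReference<>(new HashMap<>());

    // Hypothetical helper: build a new outer map and swap it in atomically.
    // The map that was previously published is never written to.
    static void setTopologyCache(String topoId, Map<String, Integer> value) {
        heartbeatsCache.getAndUpdate(old -> {
            Map<String, Map<String, Integer>> copy = new HashMap<>(old);
            copy.put(topoId, value);
            return copy;
        });
    }

    public static void main(String[] args) {
        Map<String, Map<String, Integer>> before = heartbeatsCache.get();
        setTopologyCache("topo-1", new HashMap<>());
        // A reader still holding the old snapshot does not see the new entry...
        System.out.println(before.containsKey("topo-1"));
        // ...while anyone reading the reference afterwards does.
        System.out.println(heartbeatsCache.get().containsKey("topo-1"));
    }
}
```

Note that this only protects the outer map. The inner maps are still shared between snapshots, so they have to be treated as immutable too.
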
However, in the `updateHeartbeatCache` method in StatsUtil
https://github.com/apache/storm/blob/4c42ee3d259d5d90a4e7d3445d1c119601eec6c7/storm-client/src/jvm/org/apache/storm/stats/StatsUtil.java#L1565
we take one of the values from `heartbeatsCache` and modify it.
There are a couple of problems here. First, the `cache` value is a regular
`HashMap` and not a `ConcurrentHashMap`, so modifying it from two threads at
once isn't safe. Second, in the branch in `updateHeartbeatCache` where
`executorBeats` is null, we iterate over the `cache` parameter. If one thread
is in the iteration, and another thread is in the other branch in
`updateHeartbeatCache`, we get the exception.
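
The interleaving can be reproduced deterministically on a single thread. This sketch assumes the exception in question is a `ConcurrentModificationException` from the `HashMap` iterator; the class, method and key names are made up, and the two "threads" are simulated by ordering the calls by hand.

```java
import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

public class IterateWhileModified {
    // Returns true if adding an entry mid-iteration makes the iterator fail.
    static boolean iterationFails() {
        Map<String, Integer> cache = new HashMap<>(); // hypothetical stand-in for the `cache` value
        cache.put("exec-1", 1);
        cache.put("exec-2", 2);

        Iterator<String> it = cache.keySet().iterator(); // "thread A" starts iterating
        it.next();
        cache.put("exec-3", 3);                          // "thread B" adds a new key
        try {
            it.next();                                   // "thread A" continues iterating
            return false;
        } catch (ConcurrentModificationException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(iterationFails());
    }
}
```

With real threads the fail-fast check is only best effort, so the exception shows up intermittently rather than every time, which would fit with it only appearing in some test runs.
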
The reason this exception isn't thrown in a real cluster is that the
`executorBeats` parameter is only null when called from
https://github.com/apache/storm/blob/4c42ee3d259d5d90a4e7d3445d1c119601eec6c7/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java#L1681
This only happens when Nimbus is booting up as part of `launchServer`, or
when someone triggers a rebalance in the topology. We see it in the tests,
because Nimbus and the supervisors are started concurrently, so Nimbus can be
in one branch in `StatsUtil.updateHeartbeatCache` while one of the supervisors
is in the other branch. It can technically happen in a real cluster, but
someone would have to get extremely unlucky with rebalance timing.
I think the fix here should be making sure that
`StatsUtil.updateHeartbeatCache` is thread safe. One option is to make the
`cache` value a `ConcurrentHashMap`. Another option would be to make
`updateHeartbeatCache` create and return a new map, instead of modifying the
existing one.
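
A rough sketch of the second option follows. The `updated` method is a hypothetical, heavily simplified stand-in for `updateHeartbeatCache`: the real method has a different signature and richer value types, and the `0` written in the null branch is just a placeholder for whatever the real code records.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class CopyOnUpdate {
    // Hypothetical simplified version: returns a new map and never writes to `cache`.
    static Map<String, Integer> updated(Map<String, Integer> cache,
                                        Map<String, Integer> executorBeats) {
        Map<String, Integer> result = new HashMap<>(cache);
        if (executorBeats == null) {
            // Iterate over the caller's snapshot; all writes go to the copy.
            for (String executor : cache.keySet()) {
                result.put(executor, 0); // placeholder for "no beat received"
            }
        } else {
            result.putAll(executorBeats);
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Integer> cache = new HashMap<>();
        cache.put("exec-1", 5);
        Map<String, Integer> next = updated(cache, Collections.singletonMap("exec-2", 7));
        System.out.println(cache); // the input map is unchanged
        System.out.println(next);  // the returned map holds both entries
    }
}
```

This is only safe if every writer goes through the copying path and publishes the result via `heartbeatsCache.getAndUpdate`, so that no map is modified after it becomes visible to other threads. The `ConcurrentHashMap` option is the smaller change, since its iterators are weakly consistent and don't throw `ConcurrentModificationException`.
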
---