Github user srdo commented on the issue:

    https://github.com/apache/storm/pull/2800
  
    I'm sorry if this turns out a bit verbose, but I'm going to write down what I 
see as the issue here, so we can hopefully come to a common understanding (and 
so I don't forget and have to look into this again).
    
    As far as I can tell, the uses of `heartbeatsCache` in Nimbus are thread-safe, 
because the values are never modified, only replaced. That is, we don't do 
`heartbeatsCache.get(topoId).put(foo, bar)`; instead we do 
`heartbeatsCache.getAndUpdate(func)`, which replaces the value entirely. I 
don't believe we need further synchronization here: the `AtomicReference` 
ensures that each new value is visible to all threads, and two threads 
reading the same effectively immutable map at the same time should be fine(?).
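    A minimal sketch of the copy-on-write pattern described above, assuming a simplified cache of topology id to per-executor heartbeat values. The class and method names (`CopyOnWriteCacheSketch`, `putTopology`) are hypothetical, not Nimbus code. Each update builds a fresh map and swaps it in with `AtomicReference.getAndUpdate`, so readers only ever see maps that nobody mutates afterwards.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

public class CopyOnWriteCacheSketch {
    // Hypothetical stand-in for heartbeatsCache: topology id -> (executor id -> heartbeat value).
    private final AtomicReference<Map<String, Map<String, Integer>>> heartbeatsCache =
        new AtomicReference<>(Collections.<String, Map<String, Integer>>emptyMap());

    // Replaces the whole outer map instead of mutating a map other threads may be reading.
    public void putTopology(String topoId, Map<String, Integer> beats) {
        // Defensive, read-only copy so the caller can't mutate the published value later.
        Map<String, Integer> beatsCopy = Collections.unmodifiableMap(new HashMap<>(beats));
        // The update function is side-effect free, since getAndUpdate may retry it under contention.
        heartbeatsCache.getAndUpdate(old -> {
            Map<String, Map<String, Integer>> next = new HashMap<>(old);
            next.put(topoId, beatsCopy);
            return Collections.unmodifiableMap(next);
        });
    }

    // Readers just dereference the current snapshot; no locking needed.
    public Map<String, Integer> get(String topoId) {
        return heartbeatsCache.get().get(topoId);
    }

    public static void main(String[] args) {
        CopyOnWriteCacheSketch cache = new CopyOnWriteCacheSketch();
        Map<String, Integer> beats = new HashMap<>();
        beats.put("executor-1", 42);
        cache.putTopology("topo-1", beats);
        System.out.println(cache.get("topo-1")); // prints {executor-1=42}
    }
}
```

    The pattern only holds as long as nobody mutates a map after it has been published, which is exactly the assumption that in-place modification of a cached value would break.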
    
    However, in the `updateHeartbeatCache` method in `StatsUtil` 
https://github.com/apache/storm/blob/4c42ee3d259d5d90a4e7d3445d1c119601eec6c7/storm-client/src/jvm/org/apache/storm/stats/StatsUtil.java#L1565
 we take one of the values from `heartbeatsCache` and modify it in place.
    
    There are a couple of problems here. First, the `cache` value is a plain 
`HashMap`, not a `ConcurrentHashMap`, so modifying it from two threads at 
once isn't safe. Second, in the branch of `updateHeartbeatCache` where 
`executorBeats` is null, we iterate over the `cache` parameter. If one thread 
is in that iteration while another thread is in the other branch of 
`updateHeartbeatCache` modifying the same map, we get the exception.
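    Assuming the exception in question is the `ConcurrentModificationException` that `HashMap`'s fail-fast iterators throw, the mechanism can be shown deterministically in a single thread. The hypothetical `FailFastSketch` below adds an entry while iterating, standing in for the other branch running on another thread.

```java
import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.Map;

public class FailFastSketch {
    // Returns true if modifying the map mid-iteration tripped HashMap's fail-fast check.
    public static boolean modifyDuringIteration() {
        Map<String, Integer> cache = new HashMap<>();
        cache.put("executor-1", 1);
        cache.put("executor-2", 2);
        try {
            for (Map.Entry<String, Integer> entry : cache.entrySet()) {
                // Stands in for "the other branch" adding a new key while this loop runs.
                cache.put("executor-3", 3);
            }
        } catch (ConcurrentModificationException ex) {
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(modifyDuringIteration()); // prints true
    }
}
```

    Across threads the check is only best-effort: the `HashMap` documentation states that fail-fast behavior cannot be guaranteed under unsynchronized concurrent modification, and concurrent `put`s can also lose entries without any exception, so not seeing the exception doesn't mean the code is safe.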
    
    The reason we don't normally see this exception in a real cluster is that 
the `executorBeats` parameter is only null when the method is called from 
https://github.com/apache/storm/blob/4c42ee3d259d5d90a4e7d3445d1c119601eec6c7/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java#L1681
    
    That call only happens when Nimbus is starting up as part of `launchServer`, 
or when someone triggers a rebalance of a topology. We see the exception in the 
tests because Nimbus and the supervisors are started concurrently, so Nimbus can 
be in one branch of `StatsUtil.updateHeartbeatCache` while one of the supervisors 
is in the other branch. It can technically happen in a real cluster too, but 
someone would have to get extremely unlucky with rebalance timing.
    
    I think the fix here should be making sure that 
`StatsUtil.updateHeartbeatCache` is thread-safe. One option is to make the 
`cache` value a `ConcurrentHashMap`. Another option would be to make 
`updateHeartbeatCache` create and return a new map, instead of modifying the 
existing one.
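    Both options can be sketched against a simplified, hypothetical model where the cache maps an executor id to its last heartbeat time in seconds, and the null-`executorBeats` branch expires stale entries. This is not the real `StatsUtil.updateHeartbeatCache` signature or logic; `HeartbeatCacheSketch`, `updateInPlace` and `updateCopy` are made-up names for illustration.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

public class HeartbeatCacheSketch {
    // Hypothetical model: executor id -> last heartbeat time in seconds.

    // Option 1: keep mutating in place, but on a ConcurrentHashMap. Its iterators are
    // weakly consistent, so iterating while another thread writes does not throw.
    public static void updateInPlace(ConcurrentHashMap<String, Long> cache,
                                     Map<String, Long> executorBeats,
                                     Set<String> executors,
                                     long nowSecs, long timeoutSecs) {
        if (executorBeats == null) {
            for (Map.Entry<String, Long> e : cache.entrySet()) {
                if (nowSecs - e.getValue() >= timeoutSecs) {
                    // Conditional remove: keeps a fresher beat written concurrently.
                    cache.remove(e.getKey(), e.getValue());
                }
            }
            return;
        }
        for (String executor : executors) {
            Long beat = executorBeats.get(executor);
            if (beat != null) {
                cache.put(executor, beat);
            }
        }
    }

    // Option 2: never touch the input; build and return a new read-only map.
    public static Map<String, Long> updateCopy(Map<String, Long> cache,
                                               Map<String, Long> executorBeats,
                                               Set<String> executors,
                                               long nowSecs, long timeoutSecs) {
        Map<String, Long> next = new HashMap<>(cache);
        if (executorBeats == null) {
            next.entrySet().removeIf(e -> nowSecs - e.getValue() >= timeoutSecs);
        } else {
            for (String executor : executors) {
                Long beat = executorBeats.get(executor);
                if (beat != null) {
                    next.put(executor, beat);
                }
            }
        }
        return Collections.unmodifiableMap(next);
    }

    public static void main(String[] args) {
        Map<String, Long> original = new HashMap<>();
        original.put("executor-1", 100L);
        original.put("executor-2", 10L);
        Map<String, Long> expired = updateCopy(original, null, Collections.emptySet(), 105L, 30L);
        System.out.println(expired);          // prints {executor-1=100}
        System.out.println(original.size());  // prints 2, the input is untouched
    }
}
```

    With the copy-returning variant, the caller still has to publish the result, e.g. by doing the whole read-update-replace inside `heartbeatsCache.getAndUpdate` so that concurrent updates for the same topology don't overwrite each other; `updateCopy` has no side effects, so a retried update function is harmless. With the `ConcurrentHashMap` variant, values that are themselves mutable maps would need the same care.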

