Github user danny0405 commented on a diff in the pull request:
https://github.com/apache/storm/pull/2800#discussion_r212800719
--- Diff: storm-client/src/jvm/org/apache/storm/stats/StatsUtil.java ---
@@ -1525,27 +1528,24 @@ public static ComponentPageInfo aggCompExecsStats(
* @param timeout timeout
* @return a HashMap of updated executor heart beats
*/
-    public static Map<List<Integer>, Map<String, Object>> updateHeartbeatCacheFromZkHeartbeat(Map<List<Integer>, Map<String, Object>> cache,
-            Map<List<Integer>, Map<String, Object>> executorBeats,
-            Set<List<Integer>> executors,
-            Integer timeout) {
-        Map<List<Integer>, Map<String, Object>> ret = new HashMap<>();
-        if (cache == null && executorBeats == null) {
-            return ret;
-        }
-
+    public static ConcurrentMap<List<Integer>, Map<String, Object>> updateHeartbeatCacheFromZkHeartbeat(Map<List<Integer>, Map<String, Object>> cache,
+            Map<List<Integer>, Map<String, Object>> executorBeats,
+            Set<List<Integer>> executors,
+            Integer timeout) {
         if (cache == null) {
+            if (executorBeats == null) {
--- End diff --
Sorry, I didn't see any reason why this method is not thread safe, because it
is almost a utility method, used only to initialize a Map cache which is then
stored into the `Nimbus` heartbeatsCache through `heartbeatsCache.getAndUpdate(new
Assoc<>(topoId, cache))`. A `ConcurrentModificationException` happens when we
iterate over a collection through an iterator and also modify it, but here we
only iterate over the executor list and do not modify any of its entries.
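
To illustrate the distinction I mean, here is a minimal, self-contained sketch. It is not code from `StatsUtil`; the class `CmeDemo` and its method names are hypothetical. It only shows the single-threaded behaviour of a fail-fast `ArrayList` iterator and says nothing about how Nimbus actually calls the method.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.ConcurrentModificationException;
import java.util.List;

// Hypothetical demo class, not part of Storm.
class CmeDemo {

    // Iterates over a list and structurally modifies it in the same loop.
    // The fail-fast iterator detects the change on the next call to next().
    static boolean throwsWhenModifiedDuringIteration() {
        List<Integer> executors = new ArrayList<>(Arrays.asList(1, 2, 3));
        try {
            for (Integer e : executors) {
                if (e == 1) {
                    executors.add(4); // structural modification mid-iteration
                }
            }
            return false;
        } catch (ConcurrentModificationException ex) {
            return true;
        }
    }

    // Iterates over a list without modifying it: no exception is raised.
    static int sumReadOnly() {
        List<Integer> executors = new ArrayList<>(Arrays.asList(1, 2, 3));
        int sum = 0;
        for (Integer e : executors) {
            sum += e;
        }
        return sum;
    }

    public static void main(String[] args) {
        System.out.println("modify while iterating throws: "
                + throwsWhenModifiedDuringIteration());
        System.out.println("read-only iteration sum: " + sumReadOnly());
    }
}
```

The read-only loop corresponds to what I believe the method does with the executor list. If another thread could modify that same collection while it is being iterated, the picture would be different, and that would be the case to point out.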
---