Repository: incubator-apex-core Updated Branches: refs/heads/release-3.2 75b716801 -> 8b4699d22
APEXCORE-362 - NPE in StreamingContainerManager. Fixed race condition between the thread that insert into endWindowStatsOperatorMap and the thread that removes entries when endWindowStatsOperatorMap exceeds 1000 entries. Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/0649255a Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/0649255a Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/0649255a Branch: refs/heads/release-3.2 Commit: 0649255ac69ca368e58d42d14de1602c2c13e674 Parents: be34b5e Author: Vlad Rozov <[email protected]> Authored: Sun Feb 28 09:07:29 2016 -0800 Committer: Vlad Rozov <[email protected]> Committed: Sun Feb 28 09:07:29 2016 -0800 ---------------------------------------------------------------------- .../com/datatorrent/stram/StreamingContainerManager.java | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/0649255a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java index 29c6a2c..1c17987 100644 --- a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java +++ b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java @@ -1684,8 +1684,12 @@ public class StreamingContainerManager implements PlanContext if (stats.windowId > currentEndWindowStatsWindowId) { Map<Integer, EndWindowStats> endWindowStatsMap = endWindowStatsOperatorMap.get(stats.windowId); if (endWindowStatsMap == null) { - endWindowStatsOperatorMap.putIfAbsent(stats.windowId, new ConcurrentSkipListMap<Integer, EndWindowStats>()); - endWindowStatsMap = endWindowStatsOperatorMap.get(stats.windowId); + endWindowStatsMap = new ConcurrentSkipListMap<Integer, EndWindowStats>(); + Map<Integer, EndWindowStats> endWindowStatsMapPrevious = + endWindowStatsOperatorMap.putIfAbsent(stats.windowId, endWindowStatsMap); + if (endWindowStatsMapPrevious != null) { + endWindowStatsMap = endWindowStatsMapPrevious; + } } endWindowStatsMap.put(shb.getNodeId(), endWindowStats);
