Repository: incubator-apex-core
Updated Branches:
  refs/heads/release-3.2 75b716801 -> 8b4699d22


APEXCORE-362 - NPE in StreamingContainerManager. Fixed race condition between
the thread that inserts into endWindowStatsOperatorMap and the thread that
removes entries when endWindowStatsOperatorMap exceeds 1000 entries.


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/0649255a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/0649255a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/0649255a

Branch: refs/heads/release-3.2
Commit: 0649255ac69ca368e58d42d14de1602c2c13e674
Parents: be34b5e
Author: Vlad Rozov <[email protected]>
Authored: Sun Feb 28 09:07:29 2016 -0800
Committer: Vlad Rozov <[email protected]>
Committed: Sun Feb 28 09:07:29 2016 -0800

----------------------------------------------------------------------
 .../com/datatorrent/stram/StreamingContainerManager.java     | 8 ++++++--
 1 file changed, 6 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/0649255a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
----------------------------------------------------------------------
diff --git 
a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java 
b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
index 29c6a2c..1c17987 100644
--- a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
+++ b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
@@ -1684,8 +1684,12 @@ public class StreamingContainerManager implements 
PlanContext
           if (stats.windowId > currentEndWindowStatsWindowId) {
             Map<Integer, EndWindowStats> endWindowStatsMap = 
endWindowStatsOperatorMap.get(stats.windowId);
             if (endWindowStatsMap == null) {
-              endWindowStatsOperatorMap.putIfAbsent(stats.windowId, new 
ConcurrentSkipListMap<Integer, EndWindowStats>());
-              endWindowStatsMap = 
endWindowStatsOperatorMap.get(stats.windowId);
+              endWindowStatsMap = new ConcurrentSkipListMap<Integer, 
EndWindowStats>();
+              Map<Integer, EndWindowStats> endWindowStatsMapPrevious =
+                  endWindowStatsOperatorMap.putIfAbsent(stats.windowId, 
endWindowStatsMap);
+              if (endWindowStatsMapPrevious != null) {
+                endWindowStatsMap = endWindowStatsMapPrevious;
+              }
             }
             endWindowStatsMap.put(shb.getNodeId(), endWindowStats);
 

Reply via email to