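Fix for review comment: always add the shut-down operator to the per-window set in shutdownOperators, including when a set for that window ID already exists.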
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/064edf08 Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/064edf08 Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/064edf08 Branch: refs/heads/devel-3 Commit: 064edf08447d88ad9147bfab460705dfbaf4b4f3 Parents: d19fa66 Author: thomas <[email protected]> Authored: Fri Aug 21 12:43:33 2015 -0700 Committer: thomas <[email protected]> Committed: Fri Aug 21 12:43:33 2015 -0700 ---------------------------------------------------------------------- .../com/datatorrent/stram/StreamingContainerManager.java | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/064edf08/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java index eed2948..7002c1d 100644 --- a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java +++ b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java @@ -1004,7 +1004,7 @@ public class StreamingContainerManager implements PlanContext } reportStats.remove(o); } - + if (!this.shutdownOperators.isEmpty()) { synchronized (this.shutdownOperators) { Iterator<Map.Entry<Long, Set<PTOperator>>> it = shutdownOperators.entrySet().iterator(); @@ -1023,7 +1023,7 @@ public class StreamingContainerManager implements PlanContext } } } - + if (!eventQueue.isEmpty()) { for (PTOperator oper : plan.getAllOperators().values()) { if (oper.getState() != PTOperator.State.ACTIVE) { @@ -1297,7 +1297,7 @@ public class StreamingContainerManager implements PlanContext 
case SHUTDOWN: // schedule operator deactivation against the windowId // will be processed once window is committed and all dependent operators completed processing - long windowId = oper.stats.currentWindowId.get(); + long windowId = oper.stats.currentWindowId.get(); if (ohb.windowStats != null && !ohb.windowStats.isEmpty()) { windowId = ohb.windowStats.get(ohb.windowStats.size()-1).windowId; } @@ -1305,8 +1305,9 @@ public class StreamingContainerManager implements PlanContext synchronized (this.shutdownOperators) { Set<PTOperator> deactivatedOpers = this.shutdownOperators.get(windowId); if (deactivatedOpers == null) { - this.shutdownOperators.put(windowId, deactivatedOpers = Sets.newHashSet(oper)); + this.shutdownOperators.put(windowId, deactivatedOpers = new HashSet<>()); } + deactivatedOpers.add(oper); } sca.undeployOpers.add(oper.getId()); // record operator stop event @@ -2264,7 +2265,7 @@ public class StreamingContainerManager implements PlanContext oi.currentWindowId = toWsWindowId(os.currentWindowId.get()); if (os.lastHeartbeat != null) { oi.lastHeartbeat = os.lastHeartbeat.getGeneratedTms(); - } + } if (os.checkpointStats != null) { oi.checkpointTime = os.checkpointStats.checkpointTime; oi.checkpointStartTime = os.checkpointStats.checkpointStartTime;
