Repository: apex-core
Updated Branches:
  refs/heads/master 7103e683f -> ffab17ac6
APEXCORE-595: Don't update committedWindowId when all partitions are removed.

Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/b5c8e4be
Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/b5c8e4be
Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/b5c8e4be

Branch: refs/heads/master
Commit: b5c8e4bee839018867738f11ffe18ffa26b695d7
Parents: 3f06ce7
Author: Tushar R. Gosavi <[email protected]>
Authored: Tue Jan 3 15:38:45 2017 +0530
Committer: Tushar R. Gosavi <[email protected]>
Committed: Tue Jan 3 15:38:45 2017 +0530

----------------------------------------------------------------------
 .../com/datatorrent/stram/StreamingContainerManager.java | 8 ++++++++
 1 file changed, 8 insertions(+)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/apex-core/blob/b5c8e4be/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
index 8da1ed8..45bfcdb 100644
--- a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
+++ b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
@@ -2188,16 +2188,24 @@ public class StreamingContainerManager implements PlanContext
    */
   private long updateCheckpoints(boolean recovery)
   {
+    int operatorCount = 0;
     UpdateCheckpointsContext ctx = new UpdateCheckpointsContext(clock, recovery, getCheckpointGroups());
     for (OperatorMeta logicalOperator : plan.getLogicalPlan().getRootOperators()) {
       //LOG.debug("Updating checkpoints for operator {}", logicalOperator.getName());
       List<PTOperator> operators = plan.getOperators(logicalOperator);
       if (operators != null) {
         for (PTOperator operator : operators) {
+          operatorCount++;
           updateRecoveryCheckpoints(operator, ctx);
         }
       }
     }
+
+    // if no physical operators are available, then don't update committedWindowId
+    if (operatorCount == 0) {
+      return committedWindowId;
+    }
+
     purgeCheckpoints();
 
     for (PTOperator oper : ctx.blocked) {
