Repository: apex-core Updated Branches: refs/heads/master 58930cc57 -> 3b660c9c1
APEXCORE-426 Reuse the running container, when the Stram restarts. Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/3b660c9c Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/3b660c9c Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/3b660c9c Branch: refs/heads/master Commit: 3b660c9c1dd639c49ae300fd3f70397d3f794d12 Parents: 58930cc Author: Sandesh Hegde <[email protected]> Authored: Fri Nov 4 17:31:59 2016 -0700 Committer: Sandesh Hegde <[email protected]> Committed: Thu Mar 2 13:04:33 2017 -0800 ---------------------------------------------------------------------- .../java/com/datatorrent/stram/StramClient.java | 2 + .../datatorrent/stram/StramLocalCluster.java | 2 +- .../stram/StreamingAppMasterService.java | 38 +++++++------------ .../stram/StreamingContainerManager.java | 16 ++++---- .../com/datatorrent/stram/CheckpointTest.java | 39 +++++++++----------- .../stram/StramLocalClusterTest.java | 2 +- .../datatorrent/stram/StramRecoveryTest.java | 2 +- .../stram/StreamingContainerManagerTest.java | 8 ++-- .../stram/plan/logical/DelayOperatorTest.java | 2 +- 9 files changed, 50 insertions(+), 61 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/apex-core/blob/3b660c9c/engine/src/main/java/com/datatorrent/stram/StramClient.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/StramClient.java b/engine/src/main/java/com/datatorrent/stram/StramClient.java index 8b78c14..dad42e3 100644 --- a/engine/src/main/java/com/datatorrent/stram/StramClient.java +++ b/engine/src/main/java/com/datatorrent/stram/StramClient.java @@ -392,6 +392,8 @@ public class StramClient //appContext.setMaxAppAttempts(1); // no retries until Stram is HA } + appContext.setKeepContainersAcrossApplicationAttempts(true); + // Set up the container launch context for the application master ContainerLaunchContext amContainer = Records.newRecord(ContainerLaunchContext.class); http://git-wip-us.apache.org/repos/asf/apex-core/blob/3b660c9c/engine/src/main/java/com/datatorrent/stram/StramLocalCluster.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/StramLocalCluster.java b/engine/src/main/java/com/datatorrent/stram/StramLocalCluster.java index e5d855b..ff61868 100644 --- a/engine/src/main/java/com/datatorrent/stram/StramLocalCluster.java +++ b/engine/src/main/java/com/datatorrent/stram/StramLocalCluster.java @@ -501,7 +501,7 @@ public class StramLocalCluster implements Runnable, Controller if (heartbeatMonitoringEnabled) { // monitor child containers - dnmgr.monitorHeartbeat(); + dnmgr.monitorHeartbeat(false); } if (childContainers.isEmpty() && dnmgr.containerStartRequests.isEmpty()) { http://git-wip-us.apache.org/repos/asf/apex-core/blob/3b660c9c/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java b/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java index 3898dbc..c0e09ab 100644 --- a/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java +++ b/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java @@ -27,7 +27,6 @@ import java.net.InetSocketAddress; import java.net.URI; import java.nio.ByteBuffer; import java.util.ArrayList; -import java.util.Collection; import java.util.Iterator; import java.util.LinkedList; import java.util.List; @@ -63,9 +62,6 @@ import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; -import org.apache.hadoop.yarn.api.records.NodeId; -import org.apache.hadoop.yarn.api.records.Priority; -import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.client.api.AMRMClient; import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest; import org.apache.hadoop.yarn.client.api.YarnClient; @@ -75,7 +71,6 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.util.Clock; -import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.hadoop.yarn.util.Records; import org.apache.hadoop.yarn.util.SystemClock; import org.apache.hadoop.yarn.webapp.WebApp; @@ -740,9 +735,12 @@ public class StreamingAppMasterService extends CompositeService clientRMService.stop(); } - // check for previously allocated containers - // as of 2.2, containers won't survive AM restart, but this will change in the future - YARN-1490 - checkContainerStatus(); + List<Container> containers = response.getContainersFromPreviousAttempts(); + + // Running containers might take a while to register with the new app master and send the heartbeat signal. + int waitForRecovery = containers.size() > 0 ? dag.getValue(LogicalPlan.HEARTBEAT_TIMEOUT_MILLIS) / 1000 : 0; + + previouslyAllocatedContainers(containers); FinalApplicationStatus finalStatus = FinalApplicationStatus.SUCCEEDED; final InetSocketAddress rmAddress = conf.getSocketAddr(YarnConfiguration.RM_ADDRESS, YarnConfiguration.DEFAULT_RM_ADDRESS, @@ -1028,7 +1026,9 @@ public class StreamingAppMasterService extends CompositeService loopCounter, appDone, numRequestedContainers, numReleasedContainers, numCompletedContainers, numFailedContainers, allocatedContainers.size(), dnmgr.containerStartRequests); // monitor child containers - dnmgr.monitorHeartbeat(); + dnmgr.monitorHeartbeat(waitForRecovery > 0); + + waitForRecovery = Math.max(waitForRecovery - 1, 0); } finishApplication(finalStatus); @@ -1068,22 +1068,12 @@ public class StreamingAppMasterService extends CompositeService * Check for containers that were allocated in a previous attempt. * If the containers are still alive, wait for them to check in via heartbeat. */ - private void checkContainerStatus() + private void previouslyAllocatedContainers(List<Container> containers) { - Collection<StreamingContainerAgent> containers = this.dnmgr.getContainerAgents(); - for (StreamingContainerAgent ca : containers) { - ContainerId containerId = ConverterUtils.toContainerId(ca.container.getExternalId()); - NodeId nodeId = ConverterUtils.toNodeId(ca.container.host); - - // put container back into the allocated list - org.apache.hadoop.yarn.api.records.Token containerToken = null; - Resource resource = Resource.newInstance(ca.container.getAllocatedMemoryMB(), ca.container.getAllocatedVCores()); - Priority priority = Priority.newInstance(ca.container.getResourceRequestPriority()); - Container yarnContainer = Container.newInstance(containerId, nodeId, ca.container.nodeHttpAddress, resource, priority, containerToken); - this.allocatedContainers.put(containerId.toString(), new AllocatedContainer(yarnContainer)); - - // check the status - nmClient.getContainerStatusAsync(containerId, nodeId); + for (Container container : containers) { + this.allocatedContainers.put(container.getId().toString(), new AllocatedContainer(container)); + //check the status + nmClient.getContainerStatusAsync(container.getId(), container.getNodeId()); } } http://git-wip-us.apache.org/repos/asf/apex-core/blob/3b660c9c/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java index dfbc7d1..fe27be9 100644 --- a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java +++ b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java @@ -749,7 +749,7 @@ public class StreamingContainerManager implements PlanContext * Check periodically that deployed containers phone home. * Run from the master main loop (single threaded access). */ - public void monitorHeartbeat() + public void monitorHeartbeat(boolean waitForRecovery) { long currentTms = clock.getTime(); @@ -798,7 +798,7 @@ public class StreamingContainerManager implements PlanContext // events that may modify the plan processEvents(); - committedWindowId = updateCheckpoints(false); + committedWindowId = updateCheckpoints(waitForRecovery); calculateEndWindowStats(); if (this.vars.enableStatsRecording) { recordStats(currentTms); @@ -1138,7 +1138,7 @@ public class StreamingContainerManager implements PlanContext // resolve dependencies UpdateCheckpointsContext ctx = new UpdateCheckpointsContext(clock, false, getCheckpointGroups()); for (PTOperator oper : cs.container.getOperators()) { - updateRecoveryCheckpoints(oper, ctx); + updateRecoveryCheckpoints(oper, ctx, false); } includeLocalUpstreamOperators(ctx); @@ -1170,7 +1170,7 @@ public class StreamingContainerManager implements PlanContext } if (!newOperators.isEmpty()) { for (PTOperator oper : newOperators) { - updateRecoveryCheckpoints(oper, ctx); + updateRecoveryCheckpoints(oper, ctx, false); } } } while (!newOperators.isEmpty()); @@ -2022,7 +2022,7 @@ public class StreamingContainerManager implements PlanContext * @param operator Operator instance for which to find recovery checkpoint * @param ctx Context into which to collect traversal info */ - public void updateRecoveryCheckpoints(PTOperator operator, UpdateCheckpointsContext ctx) + public void updateRecoveryCheckpoints(PTOperator operator, UpdateCheckpointsContext ctx, boolean recovery) { if (operator.getRecoveryCheckpoint().windowId < ctx.committedWindowId.longValue()) { ctx.committedWindowId.setValue(operator.getRecoveryCheckpoint().windowId); @@ -2031,7 +2031,7 @@ public class StreamingContainerManager implements PlanContext if (operator.getState() == PTOperator.State.ACTIVE && (ctx.currentTms - operator.stats.lastWindowIdChangeTms) > operator.stats.windowProcessingTimeoutMillis) { // if the checkpoint is ahead, then it is not blocked but waiting for activation (state-less recovery, at-most-once) - if (ctx.committedWindowId.longValue() >= operator.getRecoveryCheckpoint().windowId) { + if (ctx.committedWindowId.longValue() >= operator.getRecoveryCheckpoint().windowId && !recovery) { LOG.warn("Marking operator {} blocked committed window {}, recovery window {}, current time {}, last window id change time {}, window processing timeout millis {}", operator, Codec.getStringWindowId(ctx.committedWindowId.longValue()), @@ -2096,7 +2096,7 @@ public class StreamingContainerManager implements PlanContext } if (!ctx.visited.contains(sinkOperator)) { // downstream traversal - updateRecoveryCheckpoints(sinkOperator, ctx); + updateRecoveryCheckpoints(sinkOperator, ctx, recovery); } // recovery window id cannot move backwards // when dynamically adding new operators @@ -2196,7 +2196,7 @@ public class StreamingContainerManager implements PlanContext if (operators != null) { for (PTOperator operator : operators) { operatorCount++; - updateRecoveryCheckpoints(operator, ctx); + updateRecoveryCheckpoints(operator, ctx, recovery); } } } http://git-wip-us.apache.org/repos/asf/apex-core/blob/3b660c9c/engine/src/test/java/com/datatorrent/stram/CheckpointTest.java ---------------------------------------------------------------------- diff --git a/engine/src/test/java/com/datatorrent/stram/CheckpointTest.java b/engine/src/test/java/com/datatorrent/stram/CheckpointTest.java index db939cd..d7f96d4 100644 --- a/engine/src/test/java/com/datatorrent/stram/CheckpointTest.java +++ b/engine/src/test/java/com/datatorrent/stram/CheckpointTest.java @@ -216,11 +216,11 @@ public class CheckpointTest Assert.assertEquals("", PTOperator.State.PENDING_DEPLOY, oper.getState()); } - dnm.updateRecoveryCheckpoints(o2p1, new UpdateCheckpointsContext(clock)); + dnm.updateRecoveryCheckpoints(o2p1, new UpdateCheckpointsContext(clock), false); Assert.assertEquals("no checkpoints " + o2p1, Checkpoint.INITIAL_CHECKPOINT, o2p1.getRecoveryCheckpoint()); UpdateCheckpointsContext ctx = new UpdateCheckpointsContext(clock); - dnm.updateRecoveryCheckpoints(o1p1, ctx); + dnm.updateRecoveryCheckpoints(o1p1, ctx, false); Assert.assertEquals("no checkpoints " + o1p1, Checkpoint.INITIAL_CHECKPOINT, o1p1.getRecoveryCheckpoint()); Assert.assertEquals("number dependencies " + ctx.visited, 3, ctx.visited.size()); @@ -231,17 +231,17 @@ public class CheckpointTest o1p1.checkpoints.add(cp3); o1p1.checkpoints.add(cp5); - dnm.updateRecoveryCheckpoints(o1p1, new UpdateCheckpointsContext(clock)); + dnm.updateRecoveryCheckpoints(o1p1, new UpdateCheckpointsContext(clock), false); Assert.assertEquals("checkpoint " + o1p1, Checkpoint.INITIAL_CHECKPOINT, o1p1.getRecoveryCheckpoint()); o2p1.checkpoints.add(new Checkpoint(3L, 0, 0)); - dnm.updateRecoveryCheckpoints(o1p1, new UpdateCheckpointsContext(clock)); + dnm.updateRecoveryCheckpoints(o1p1, new UpdateCheckpointsContext(clock), false); Assert.assertEquals("checkpoint " + o1p1, Checkpoint.INITIAL_CHECKPOINT, o1p1.getRecoveryCheckpoint()); Assert.assertEquals("checkpoint " + o2p1, Checkpoint.INITIAL_CHECKPOINT, o2p1.getRecoveryCheckpoint()); // set leaf operator checkpoint dnm.addCheckpoint(o3SLp1, cp5); - dnm.updateRecoveryCheckpoints(o1p1, new UpdateCheckpointsContext(clock)); + dnm.updateRecoveryCheckpoints(o1p1, new UpdateCheckpointsContext(clock), false); Assert.assertEquals("checkpoint " + o1p1, Checkpoint.INITIAL_CHECKPOINT, o1p1.getRecoveryCheckpoint()); Assert.assertEquals("checkpoint " + o2p1, Checkpoint.INITIAL_CHECKPOINT, o2p1.getRecoveryCheckpoint()); @@ -249,20 +249,20 @@ public class CheckpointTest for (PTOperator oper : plan.getAllOperators().values()) { oper.setState(PTOperator.State.ACTIVE); } - dnm.updateRecoveryCheckpoints(o1p1, new UpdateCheckpointsContext(clock)); + dnm.updateRecoveryCheckpoints(o1p1, new UpdateCheckpointsContext(clock), false); Assert.assertEquals("checkpoint " + o1p1, cp3, o1p1.getRecoveryCheckpoint()); Assert.assertEquals("checkpoint " + o2p1, cp3, o1p1.getRecoveryCheckpoint()); Assert.assertEquals("checkpoint " + o3SLp1, cp5, o3SLp1.getRecoveryCheckpoint()); Assert.assertNull("checkpoint null for stateless operator " + o3SLp1, o3SLp1.stats.checkpointStats); o2p1.checkpoints.add(cp4); - dnm.updateRecoveryCheckpoints(o1p1, new UpdateCheckpointsContext(clock)); + dnm.updateRecoveryCheckpoints(o1p1, new UpdateCheckpointsContext(clock), false); Assert.assertEquals("checkpoint " + o1p1, cp3, o1p1.getRecoveryCheckpoint()); Assert.assertEquals("checkpoint " + o2p1, cp4, o2p1.getRecoveryCheckpoint()); o1p1.checkpoints.add(1, cp4); Assert.assertEquals(o1p1.checkpoints, getCheckpoints(3L, 4L, 5L)); - dnm.updateRecoveryCheckpoints(o1p1, new UpdateCheckpointsContext(clock)); + dnm.updateRecoveryCheckpoints(o1p1, new UpdateCheckpointsContext(clock), false); Assert.assertEquals("checkpoint " + o1p1, cp4, o1p1.getRecoveryCheckpoint()); Assert.assertEquals(o1p1.checkpoints, getCheckpoints(4L, 5L)); @@ -315,7 +315,7 @@ public class CheckpointTest o4p1.checkpoints.add(leafCheckpoint); UpdateCheckpointsContext ctx; - dnm.updateRecoveryCheckpoints(o1p1, ctx = new UpdateCheckpointsContext(clock, true, Collections.<OperatorMeta, Set<OperatorMeta>>emptyMap())); + dnm.updateRecoveryCheckpoints(o1p1, ctx = new UpdateCheckpointsContext(clock, true, Collections.<OperatorMeta, Set<OperatorMeta>>emptyMap()), false); Assert.assertEquals("initial checkpoint " + o1p1, Checkpoint.INITIAL_CHECKPOINT, o1p1.getRecoveryCheckpoint()); Assert.assertEquals("initial checkpoint " + o2SLp1, leafCheckpoint, o2SLp1.getRecoveryCheckpoint()); Assert.assertEquals("initial checkpoint " + o3SLp1, new Checkpoint(clock.getTime(), 0, 0), o3SLp1.getRecoveryCheckpoint()); @@ -376,7 +376,7 @@ public class CheckpointTest PTOperator o2p1 = partitions.get(0); UpdateCheckpointsContext ctx = new UpdateCheckpointsContext(clock); - dnm.updateRecoveryCheckpoints(o1p1, ctx); + dnm.updateRecoveryCheckpoints(o1p1, ctx, false); Assert.assertTrue("no blocked operators", ctx.blocked.isEmpty()); o1p1.stats.statsRevs.checkout(); @@ -387,25 +387,25 @@ public class CheckpointTest clock.time = o1p1.stats.windowProcessingTimeoutMillis + 1; ctx = new UpdateCheckpointsContext(clock); - dnm.updateRecoveryCheckpoints(o1p1, ctx); + dnm.updateRecoveryCheckpoints(o1p1, ctx, false); Assert.assertEquals("o2 blocked", Sets.newHashSet(o2p1), ctx.blocked); // assign future activation window (state-less or at-most-once). Checkpoint cp2 = o2p1.getRecoveryCheckpoint(); o2p1.setRecoveryCheckpoint(new Checkpoint(o1p1.getRecoveryCheckpoint().windowId + 1, cp2.applicationWindowCount, cp2.checkpointWindowCount)); ctx = new UpdateCheckpointsContext(clock); - dnm.updateRecoveryCheckpoints(o1p1, ctx); + dnm.updateRecoveryCheckpoints(o1p1, ctx, false); Assert.assertEquals("no operators blocked (o2 activation window ahead)", Sets.newHashSet(), ctx.blocked); // reset to blocked o2p1.setRecoveryCheckpoint(cp2); ctx = new UpdateCheckpointsContext(clock); - dnm.updateRecoveryCheckpoints(o1p1, ctx); + dnm.updateRecoveryCheckpoints(o1p1, ctx, false); Assert.assertEquals("o2 blocked", Sets.newHashSet(o2p1), ctx.blocked); clock.time++; ctx = new UpdateCheckpointsContext(clock); - dnm.updateRecoveryCheckpoints(o1p1, ctx); + dnm.updateRecoveryCheckpoints(o1p1, ctx, false); Assert.assertEquals("operators blocked", Sets.newHashSet(o1p1, o2p1), ctx.blocked); o2p1.stats.statsRevs.checkout(); @@ -413,16 +413,13 @@ public class CheckpointTest o2p1.stats.statsRevs.commit(); ctx = new UpdateCheckpointsContext(clock); - dnm.updateRecoveryCheckpoints(o1p1, ctx); + dnm.updateRecoveryCheckpoints(o1p1, ctx, false); Assert.assertEquals("operators blocked", Sets.newHashSet(o1p1), ctx.blocked); clock.time--; ctx = new UpdateCheckpointsContext(clock); - dnm.updateRecoveryCheckpoints(o1p1, ctx); + dnm.updateRecoveryCheckpoints(o1p1, ctx, false); Assert.assertEquals("operators blocked", Sets.newHashSet(), ctx.blocked); - - - } @Test @@ -473,13 +470,13 @@ public class CheckpointTest mc1.sendHeartbeat(); Assert.assertEquals(PTOperator.State.ACTIVE, o1p1.getState()); Assert.assertEquals(10, o1p1.stats.lastWindowIdChangeTms); - scm.monitorHeartbeat(); + scm.monitorHeartbeat(false); Assert.assertTrue(scm.containerStopRequests.isEmpty()); clock.time++; mc1.sendHeartbeat(); Assert.assertEquals(PTOperator.State.ACTIVE, o1p1.getState()); - scm.monitorHeartbeat(); + scm.monitorHeartbeat(false); Assert.assertTrue(scm.containerStopRequests.containsKey(o1p1.getContainer().getExternalId())); } http://git-wip-us.apache.org/repos/asf/apex-core/blob/3b660c9c/engine/src/test/java/com/datatorrent/stram/StramLocalClusterTest.java ---------------------------------------------------------------------- diff --git a/engine/src/test/java/com/datatorrent/stram/StramLocalClusterTest.java b/engine/src/test/java/com/datatorrent/stram/StramLocalClusterTest.java index 5bea0b3..56641f8 100644 --- a/engine/src/test/java/com/datatorrent/stram/StramLocalClusterTest.java +++ b/engine/src/test/java/com/datatorrent/stram/StramLocalClusterTest.java @@ -280,7 +280,7 @@ public class StramLocalClusterTest c2.waitForHeartbeat(5000); // purge checkpoints - localCluster.dnmgr.monitorHeartbeat(); // checkpoint purging + localCluster.dnmgr.monitorHeartbeat(false); // checkpoint purging Assert.assertEquals("checkpoints " + ptNode1, Arrays.asList(new Checkpoint[] {new Checkpoint(3L, 0, 0)}), ptNode1.checkpoints); Assert.assertEquals("checkpoints " + ptNode2, Arrays.asList(new Checkpoint[] {new Checkpoint(3L, 0, 0)}), ptNode2.checkpoints); http://git-wip-us.apache.org/repos/asf/apex-core/blob/3b660c9c/engine/src/test/java/com/datatorrent/stram/StramRecoveryTest.java ---------------------------------------------------------------------- diff --git a/engine/src/test/java/com/datatorrent/stram/StramRecoveryTest.java b/engine/src/test/java/com/datatorrent/stram/StramRecoveryTest.java index e8ec26c..3c25096 100644 --- a/engine/src/test/java/com/datatorrent/stram/StramRecoveryTest.java +++ b/engine/src/test/java/com/datatorrent/stram/StramRecoveryTest.java @@ -241,7 +241,7 @@ public class StramRecoveryTest csr.setSinkOperatorPortName("inport1"); FutureTask<?> lpmf = scm.logicalPlanModification(Lists.newArrayList(cor, csr)); while (!lpmf.isDone()) { - scm.monitorHeartbeat(); + scm.monitorHeartbeat(false); } Assert.assertNull(lpmf.get()); // unmask exception, if any http://git-wip-us.apache.org/repos/asf/apex-core/blob/3b660c9c/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java ---------------------------------------------------------------------- diff --git a/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java b/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java index 2d86618..84622c4 100644 --- a/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java +++ b/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java @@ -769,9 +769,9 @@ public class StreamingContainerManagerTest o1p1mos.currentWindowId(2).deployState(DeployState.SHUTDOWN); mc1.sendHeartbeat(); - scm.monitorHeartbeat(); + scm.monitorHeartbeat(false); Assert.assertEquals("committedWindowId", -1, scm.getCommittedWindowId()); - scm.monitorHeartbeat(); // committedWindowId updated in next cycle + scm.monitorHeartbeat(false); // committedWindowId updated in next cycle Assert.assertEquals("committedWindowId", 1, scm.getCommittedWindowId()); scm.processEvents(); Assert.assertEquals("containers at committedWindowId=1", 4, physicalPlan.getContainers().size()); @@ -779,7 +779,7 @@ public class StreamingContainerManagerTest // checkpoint window 2 o1p1mos.checkpointWindowId(2); mc1.sendHeartbeat(); - scm.monitorHeartbeat(); + scm.monitorHeartbeat(false); Assert.assertEquals("committedWindowId", 1, scm.getCommittedWindowId()); @@ -791,7 +791,7 @@ public class StreamingContainerManagerTest mc2.sendHeartbeat(); mc3.sendHeartbeat(); mc4.sendHeartbeat(); - scm.monitorHeartbeat(); + scm.monitorHeartbeat(false); // Operators are shutdown when both operators reach window Id 2 Assert.assertEquals(0, o1p1.getContainer().getOperators().size()); http://git-wip-us.apache.org/repos/asf/apex-core/blob/3b660c9c/engine/src/test/java/com/datatorrent/stram/plan/logical/DelayOperatorTest.java ---------------------------------------------------------------------- diff --git a/engine/src/test/java/com/datatorrent/stram/plan/logical/DelayOperatorTest.java b/engine/src/test/java/com/datatorrent/stram/plan/logical/DelayOperatorTest.java index 821f4ea..285aba3 100644 --- a/engine/src/test/java/com/datatorrent/stram/plan/logical/DelayOperatorTest.java +++ b/engine/src/test/java/com/datatorrent/stram/plan/logical/DelayOperatorTest.java @@ -456,7 +456,7 @@ public class DelayOperatorTest } UpdateCheckpointsContext ctx = new UpdateCheckpointsContext(clock, false, groups); - scm.updateRecoveryCheckpoints(opB1, ctx); + scm.updateRecoveryCheckpoints(opB1, ctx, false); Assert.assertEquals("checkpoint " + opA1, Checkpoint.INITIAL_CHECKPOINT, opA1.getRecoveryCheckpoint()); Assert.assertEquals("checkpoint " + opB1, cp3, opC1.getRecoveryCheckpoint());
