Fix test failures due to reuse of previous checkpoints.
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/d19fa66e Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/d19fa66e Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/d19fa66e Branch: refs/heads/devel-3 Commit: d19fa66edd31e8b8cb481415fcda86bb2c32f6fc Parents: 76faf86 Author: thomas <[email protected]> Authored: Thu Aug 20 18:59:01 2015 -0700 Committer: thomas <[email protected]> Committed: Thu Aug 20 18:59:01 2015 -0700 ---------------------------------------------------------------------- .../common/util/AsyncFSStorageAgent.java | 1 + .../java/com/datatorrent/stram/StramClient.java | 1 - .../stram/StreamingContainerManagerTest.java | 2 ++ .../datatorrent/stram/engine/AtMostOnceTest.java | 2 +- .../stram/engine/ProcessingModeTests.java | 16 +++++++--------- .../stram/engine/RecoverableInputOperator.java | 10 +++++----- 6 files changed, 16 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/d19fa66e/common/src/main/java/com/datatorrent/common/util/AsyncFSStorageAgent.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/com/datatorrent/common/util/AsyncFSStorageAgent.java b/common/src/main/java/com/datatorrent/common/util/AsyncFSStorageAgent.java index d5de61c..2ab6771 100644 --- a/common/src/main/java/com/datatorrent/common/util/AsyncFSStorageAgent.java +++ b/common/src/main/java/com/datatorrent/common/util/AsyncFSStorageAgent.java @@ -36,6 +36,7 @@ public class AsyncFSStorageAgent extends FSStorageAgent private boolean syncCheckpoint = false; + @SuppressWarnings("unused") private AsyncFSStorageAgent() { super(); http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/d19fa66e/engine/src/main/java/com/datatorrent/stram/StramClient.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/StramClient.java b/engine/src/main/java/com/datatorrent/stram/StramClient.java index 8a8baf3..db36ef6 100644 --- a/engine/src/main/java/com/datatorrent/stram/StramClient.java +++ b/engine/src/main/java/com/datatorrent/stram/StramClient.java @@ -54,7 +54,6 @@ import org.apache.log4j.DTLoggerFactory; import com.datatorrent.api.Context.OperatorContext; import com.datatorrent.common.util.AsyncFSStorageAgent; import com.datatorrent.common.util.BasicContainerOptConfigurator; -import com.datatorrent.common.util.FSStorageAgent; import com.datatorrent.stram.client.StramClientUtils; import com.datatorrent.stram.client.StramClientUtils.ClientRMHelper; import com.datatorrent.stram.engine.StreamingContainer; http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/d19fa66e/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java ---------------------------------------------------------------------- diff --git a/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java b/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java index 89f2878..bd9699c 100644 --- a/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java +++ b/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java @@ -727,6 +727,8 @@ public class StreamingContainerManagerTest // deploy all containers for (Map.Entry<PTContainer, MockContainer> ce : mockContainers.entrySet()) { ce.getValue().deploy(); + } + for (Map.Entry<PTContainer, MockContainer> ce : mockContainers.entrySet()) { // skip buffer server purge in monitorHeartbeat ce.getKey().bufferServerAddress = null; } http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/d19fa66e/engine/src/test/java/com/datatorrent/stram/engine/AtMostOnceTest.java ---------------------------------------------------------------------- diff --git a/engine/src/test/java/com/datatorrent/stram/engine/AtMostOnceTest.java b/engine/src/test/java/com/datatorrent/stram/engine/AtMostOnceTest.java index 41e0bd9..1205f30 100644 --- a/engine/src/test/java/com/datatorrent/stram/engine/AtMostOnceTest.java +++ b/engine/src/test/java/com/datatorrent/stram/engine/AtMostOnceTest.java @@ -51,7 +51,7 @@ public class AtMostOnceTest extends ProcessingModeTests Assert.assertTrue("No Duplicates", CollectorOperator.duplicates.isEmpty()); } - //@Test + @Test @Override public void testLinearOperatorRecovery() throws Exception { http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/d19fa66e/engine/src/test/java/com/datatorrent/stram/engine/ProcessingModeTests.java ---------------------------------------------------------------------- diff --git a/engine/src/test/java/com/datatorrent/stram/engine/ProcessingModeTests.java b/engine/src/test/java/com/datatorrent/stram/engine/ProcessingModeTests.java index 0393394..92c057d 100644 --- a/engine/src/test/java/com/datatorrent/stram/engine/ProcessingModeTests.java +++ b/engine/src/test/java/com/datatorrent/stram/engine/ProcessingModeTests.java @@ -18,16 +18,17 @@ package com.datatorrent.stram.engine; import com.datatorrent.common.util.AsyncFSStorageAgent; import com.datatorrent.common.util.BaseOperator; -import java.io.File; import java.io.IOException; import java.util.ArrayList; import java.util.HashSet; import java.util.concurrent.atomic.AtomicBoolean; + import static java.lang.Thread.sleep; import org.junit.After; import org.junit.Assert; import org.junit.Before; +import org.junit.Rule; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -35,11 +36,11 @@ import com.datatorrent.api.*; import com.datatorrent.api.Context.OperatorContext; import com.datatorrent.api.DAG.Locality; import com.datatorrent.api.Operator.ProcessingMode; - import com.datatorrent.bufferserver.packet.MessageType; import com.datatorrent.bufferserver.util.Codec; import com.datatorrent.stram.StramLocalCluster; import com.datatorrent.stram.plan.logical.LogicalPlan; +import com.datatorrent.stram.support.StramTestSupport.TestMeta; import com.datatorrent.stram.tuple.EndWindowTuple; import com.datatorrent.stram.tuple.Tuple; @@ -48,6 +49,7 @@ import com.datatorrent.stram.tuple.Tuple; */ public class ProcessingModeTests { + @Rule public TestMeta testMeta = new TestMeta(); ProcessingMode processingMode; int maxTuples = 30; @@ -78,8 +80,7 @@ public class ProcessingModeTests dag.getAttributes().put(LogicalPlan.CHECKPOINT_WINDOW_COUNT, 2); dag.getAttributes().put(LogicalPlan.STREAMING_WINDOW_SIZE_MILLIS, 300); dag.getAttributes().put(LogicalPlan.CONTAINERS_MAX_COUNT, 1); - String workingDir = new File("target/testLinearInputOperatorRecovery").getAbsolutePath(); - dag.setAttribute(OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(workingDir + "/localPath", workingDir, null)); + dag.setAttribute(OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(testMeta.dir + "/localPath", testMeta.dir, null)); RecoverableInputOperator rip = dag.addOperator("LongGenerator", RecoverableInputOperator.class); rip.setMaximumTuples(maxTuples); rip.setSimulateFailure(true); @@ -102,8 +103,7 @@ public class ProcessingModeTests CollectorOperator.duplicates.clear(); LogicalPlan dag = new LogicalPlan(); - String workingDir = new File("target/testLinearOperatorRecovery").getAbsolutePath(); - dag.setAttribute(OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(workingDir + "/localPath", workingDir, null)); + dag.setAttribute(OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(testMeta.dir + "/localPath", testMeta.dir, null)); dag.getAttributes().put(LogicalPlan.CHECKPOINT_WINDOW_COUNT, 2); dag.getAttributes().put(LogicalPlan.STREAMING_WINDOW_SIZE_MILLIS, 300); dag.getAttributes().put(LogicalPlan.CONTAINERS_MAX_COUNT, 1); @@ -128,8 +128,7 @@ public class ProcessingModeTests CollectorOperator.duplicates.clear(); LogicalPlan dag = new LogicalPlan(); - String workingDir = new File("target/testLinearInlineOperatorsRecovery").getAbsolutePath(); - dag.setAttribute(OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(workingDir + "/localPath", workingDir, null)); + dag.setAttribute(OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(testMeta.dir + "/localPath", testMeta.dir, null)); dag.getAttributes().put(LogicalPlan.CHECKPOINT_WINDOW_COUNT, 2); dag.getAttributes().put(LogicalPlan.STREAMING_WINDOW_SIZE_MILLIS, 300); dag.getAttributes().put(LogicalPlan.CONTAINERS_MAX_COUNT, 1); @@ -203,7 +202,6 @@ public class ProcessingModeTests } } - private static final long serialVersionUID = 201404161447L; } public static class MultiInputOperator implements Operator http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/d19fa66e/engine/src/test/java/com/datatorrent/stram/engine/RecoverableInputOperator.java ---------------------------------------------------------------------- diff --git a/engine/src/test/java/com/datatorrent/stram/engine/RecoverableInputOperator.java b/engine/src/test/java/com/datatorrent/stram/engine/RecoverableInputOperator.java index 510fbd5..4cf8274 100644 --- a/engine/src/test/java/com/datatorrent/stram/engine/RecoverableInputOperator.java +++ b/engine/src/test/java/com/datatorrent/stram/engine/RecoverableInputOperator.java @@ -27,7 +27,6 @@ import org.slf4j.LoggerFactory; import com.datatorrent.api.Context.OperatorContext; import com.datatorrent.api.DefaultOutputPort; import com.datatorrent.api.InputOperator; -import com.datatorrent.api.Operator; import com.datatorrent.bufferserver.util.Codec; @@ -37,7 +36,7 @@ import com.datatorrent.bufferserver.util.Codec; public class RecoverableInputOperator implements InputOperator, com.datatorrent.api.Operator.CheckpointListener { public final transient DefaultOutputPort<Long> output = new DefaultOutputPort<Long>(); - long checkpointedWindowId; + private long checkpointedWindowId; boolean firstRun = true; transient boolean first; transient long windowId; @@ -92,7 +91,8 @@ public class RecoverableInputOperator implements InputOperator, com.datatorrent. @Override public void setup(OperatorContext context) { - firstRun &= checkpointedWindowId == 0; + firstRun = (checkpointedWindowId == 0); + logger.debug("firstRun={} checkpointedWindowId={}", firstRun, Codec.getStringWindowId(checkpointedWindowId)); } @Override @@ -105,6 +105,7 @@ public class RecoverableInputOperator implements InputOperator, com.datatorrent. { if (checkpointedWindowId == 0) { checkpointedWindowId = windowId; + logger.debug("firstRun={} checkpointedWindowId={}", firstRun, Codec.getStringWindowId(checkpointedWindowId)); } logger.debug("{} checkpointed at {}", this, Codec.getStringWindowId(windowId)); @@ -113,8 +114,7 @@ public class RecoverableInputOperator implements InputOperator, com.datatorrent. @Override public void committed(long windowId) { - logger.debug("{} committed at {}", this, Codec.getStringWindowId(windowId)); - + logger.debug("{} committed at {} firstRun {}, checkpointedWindowId {}", this, Codec.getStringWindowId(windowId), firstRun, Codec.getStringWindowId(checkpointedWindowId)); if (simulateFailure && firstRun && checkpointedWindowId > 0 && windowId > checkpointedWindowId) { throw new RuntimeException("Failure Simulation from " + this); }
