fixed tests
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/1617ca39 Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/1617ca39 Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/1617ca39 Branch: refs/heads/master Commit: 1617ca393c1e066349282dfb6d9778aea8c67177 Parents: c5d819b Author: Gaurav <[email protected]> Authored: Thu Aug 6 17:31:21 2015 -0700 Committer: Gaurav <[email protected]> Committed: Thu Aug 6 21:12:51 2015 -0700 ---------------------------------------------------------------------- .../stram/StreamingContainerManagerTest.java | 7 +++---- .../datatorrent/stram/engine/AtLeastOnceTest.java | 16 ++++++++++++++++ .../stram/engine/StreamingContainerTest.java | 6 ++++++ .../datatorrent/stram/stream/OiOEndWindowTest.java | 5 +++++ 4 files changed, 30 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/1617ca39/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java ---------------------------------------------------------------------- diff --git a/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java b/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java index 38a54f0..a238e3e 100644 --- a/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java +++ b/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java @@ -740,8 +740,7 @@ public class StreamingContainerManagerTest { @Test public void testPhysicalPropertyUpdate() throws Exception{ LogicalPlan dag = new LogicalPlan(); - String workingDir = new File("target/testPhysicalPropertyUpdate").getAbsolutePath(); - dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(workingDir + "/localPath", workingDir, null)); + dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(testMeta.dir + "/localPath", testMeta.dir, null)); TestGeneratorInputOperator o1 = dag.addOperator("o1", TestGeneratorInputOperator.class); GenericTestOperator o2 = dag.addOperator("o2", GenericTestOperator.class); dag.addStream("o1.outport", o1.outport, o2.inport1); @@ -784,8 +783,7 @@ public class StreamingContainerManagerTest { private void testAppDataSources(LogicalPlan dag, boolean appendQIDToTopic) throws Exception { - String workingDir = new File("target/testAppDataSources").getAbsolutePath(); - dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(workingDir + "/localPath", workingDir, null)); + dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(testMeta.dir + "/localPath", testMeta.dir, null)); StramLocalCluster lc = new StramLocalCluster(dag); lc.runAsync(); StreamingContainerManager dnmgr = lc.dnmgr; @@ -859,6 +857,7 @@ public class StreamingContainerManagerTest { try { server.start(); LogicalPlan dag = new LogicalPlan(); + dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(testMeta.dir + "/localPath", testMeta.dir, null)); TestGeneratorInputOperator o1 = dag.addOperator("o1", TestGeneratorInputOperator.class); GenericTestOperator o2 = dag.addOperator("o2", GenericTestOperator.class); dag.addStream("o1.outport", o1.outport, o2.inport1); http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/1617ca39/engine/src/test/java/com/datatorrent/stram/engine/AtLeastOnceTest.java ---------------------------------------------------------------------- diff --git a/engine/src/test/java/com/datatorrent/stram/engine/AtLeastOnceTest.java b/engine/src/test/java/com/datatorrent/stram/engine/AtLeastOnceTest.java index 01cc675..f32be13 100644 --- a/engine/src/test/java/com/datatorrent/stram/engine/AtLeastOnceTest.java +++ b/engine/src/test/java/com/datatorrent/stram/engine/AtLeastOnceTest.java @@ -15,6 +15,7 @@ */ package com.datatorrent.stram.engine; +import java.io.File; import java.io.IOException; import org.junit.After; @@ -24,7 +25,10 @@ import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.datatorrent.api.Context; import com.datatorrent.api.DAG.Locality; + +import com.datatorrent.common.util.AsyncFSStorageAgent; import com.datatorrent.stram.StramLocalCluster; import com.datatorrent.stram.engine.ProcessingModeTests.CollectorOperator; import com.datatorrent.stram.plan.logical.LogicalPlan; @@ -56,6 +60,10 @@ public class AtLeastOnceTest CollectorOperator.collection.clear(); int maxTuples = 30; LogicalPlan dag = new LogicalPlan(); + String workingDir = new File("target/testInputOperatorRecovery").getAbsolutePath(); + AsyncFSStorageAgent asyncFSStorageAgent = new AsyncFSStorageAgent(workingDir + "/localPath", workingDir, null); + asyncFSStorageAgent.setSyncCheckpoint(true); + dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, asyncFSStorageAgent); dag.getAttributes().put(LogicalPlan.CHECKPOINT_WINDOW_COUNT, 2); dag.getAttributes().put(LogicalPlan.STREAMING_WINDOW_SIZE_MILLIS, 300); dag.getAttributes().put(LogicalPlan.CONTAINERS_MAX_COUNT, 1); @@ -79,6 +87,10 @@ public class AtLeastOnceTest CollectorOperator.collection.clear(); int maxTuples = 30; LogicalPlan dag = new LogicalPlan(); + String workingDir = new File("target/testOperatorRecovery").getAbsolutePath(); + AsyncFSStorageAgent asyncFSStorageAgent = new AsyncFSStorageAgent(workingDir + "/localPath", workingDir, null); + asyncFSStorageAgent.setSyncCheckpoint(true); + dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, asyncFSStorageAgent); dag.getAttributes().put(LogicalPlan.CHECKPOINT_WINDOW_COUNT, 2); dag.getAttributes().put(LogicalPlan.STREAMING_WINDOW_SIZE_MILLIS, 300); dag.getAttributes().put(LogicalPlan.CONTAINERS_MAX_COUNT, 1); @@ -103,6 +115,10 @@ public class AtLeastOnceTest CollectorOperator.collection.clear(); int maxTuples = 30; LogicalPlan dag = new LogicalPlan(); + String workingDir = new File("target/testOperatorRecovery").getAbsolutePath(); + AsyncFSStorageAgent asyncFSStorageAgent = new AsyncFSStorageAgent(workingDir + "/localPath", workingDir, null); + asyncFSStorageAgent.setSyncCheckpoint(true); + dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, asyncFSStorageAgent); //dag.getAttributes().get(DAG.HEARTBEAT_INTERVAL_MILLIS, 400); dag.getAttributes().put(LogicalPlan.CHECKPOINT_WINDOW_COUNT, 2); dag.getAttributes().put(LogicalPlan.STREAMING_WINDOW_SIZE_MILLIS, 300); http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/1617ca39/engine/src/test/java/com/datatorrent/stram/engine/StreamingContainerTest.java ---------------------------------------------------------------------- diff --git a/engine/src/test/java/com/datatorrent/stram/engine/StreamingContainerTest.java b/engine/src/test/java/com/datatorrent/stram/engine/StreamingContainerTest.java index 911f69a..7d37429 100644 --- a/engine/src/test/java/com/datatorrent/stram/engine/StreamingContainerTest.java +++ b/engine/src/test/java/com/datatorrent/stram/engine/StreamingContainerTest.java @@ -15,6 +15,7 @@ */ package com.datatorrent.stram.engine; +import java.io.File; import java.io.IOException; import java.util.ArrayList; import java.util.List; @@ -24,7 +25,10 @@ import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.datatorrent.common.util.AsyncFSStorageAgent; import com.datatorrent.common.util.BaseOperator; + +import com.datatorrent.api.Context; import com.datatorrent.api.Context.DAGContext; import com.datatorrent.api.InputOperator; import com.datatorrent.api.Operator.CheckpointListener; @@ -42,6 +46,8 @@ public class StreamingContainerTest public void testCommitted() throws IOException, ClassNotFoundException { LogicalPlan lp = new LogicalPlan(); + String workingDir = new File("target/testCommitted").getAbsolutePath(); + lp.setAttribute(Context.OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(workingDir + "/localPath", workingDir, null)); lp.setAttribute(DAGContext.CHECKPOINT_WINDOW_COUNT, 1); CommitAwareOperator operator = lp.addOperator("CommitAwareOperator", new CommitAwareOperator()); http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/1617ca39/engine/src/test/java/com/datatorrent/stram/stream/OiOEndWindowTest.java ---------------------------------------------------------------------- diff --git a/engine/src/test/java/com/datatorrent/stram/stream/OiOEndWindowTest.java b/engine/src/test/java/com/datatorrent/stram/stream/OiOEndWindowTest.java index 38f7a0b..a4e9c43 100644 --- a/engine/src/test/java/com/datatorrent/stram/stream/OiOEndWindowTest.java +++ b/engine/src/test/java/com/datatorrent/stram/stream/OiOEndWindowTest.java @@ -15,6 +15,9 @@ */ package com.datatorrent.stram.stream; +import java.io.File; + +import com.datatorrent.common.util.AsyncFSStorageAgent; import com.datatorrent.common.util.BaseOperator; import com.datatorrent.api.*; import org.junit.Assert; @@ -93,6 +96,8 @@ public class OiOEndWindowTest public void validateOiOImplementation() throws Exception { LogicalPlan lp = new LogicalPlan(); + String workingDir = new File("target/validateOiOImplementation").getAbsolutePath(); + lp.setAttribute(Context.OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(workingDir + "/localPath", workingDir, null)); TestInputOperator io = lp.addOperator("Input Operator", new TestInputOperator()); FirstGenericOperator go = lp.addOperator("First Generic Operator", new FirstGenericOperator()); SecondGenericOperator out = lp.addOperator("Second Generic Operator", new SecondGenericOperator());
