Repository: incubator-apex-core Updated Branches: refs/heads/devel-3 bff4c5bad -> 711fd0708
APEX-88 #resolve #comment fixed the async storage agent Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/711fd070 Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/711fd070 Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/711fd070 Branch: refs/heads/devel-3 Commit: 711fd070876c98da91a87664884938224ac73dad Parents: bff4c5b Author: Gaurav <[email protected]> Authored: Thu Sep 3 19:33:05 2015 -0700 Committer: Gaurav <[email protected]> Committed: Thu Sep 3 19:33:05 2015 -0700 ---------------------------------------------------------------------- .../common/util/AsyncFSStorageAgent.java | 34 +++++++++----------- .../common/util/AsyncFSStorageAgentTest.java | 4 +-- .../com/datatorrent/stram/CheckpointTest.java | 2 +- .../stram/LogicalPlanModificationTest.java | 2 +- .../com/datatorrent/stram/PartitioningTest.java | 8 ++--- .../stram/StramLocalClusterTest.java | 4 +-- .../datatorrent/stram/StramMiniClusterTest.java | 4 +-- .../datatorrent/stram/StramRecoveryTest.java | 7 ++-- .../stram/StreamingContainerManagerTest.java | 8 ++--- .../stram/debug/TupleRecorderTest.java | 2 +- .../stram/engine/AtLeastOnceTest.java | 6 ++-- .../stram/engine/AutoMetricTest.java | 2 +- .../stram/engine/InputOperatorTest.java | 2 +- .../stram/engine/ProcessingModeTests.java | 6 ++-- .../datatorrent/stram/engine/SliderTest.java | 2 +- .../com/datatorrent/stram/engine/StatsTest.java | 4 +-- .../stram/engine/StreamingContainerTest.java | 2 +- .../stram/engine/WindowGeneratorTest.java | 2 +- .../stram/stream/OiOEndWindowTest.java | 2 +- .../stram/webapp/StramWebServicesTest.java | 2 +- 20 files changed, 51 insertions(+), 54 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/711fd070/common/src/main/java/com/datatorrent/common/util/AsyncFSStorageAgent.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/com/datatorrent/common/util/AsyncFSStorageAgent.java b/common/src/main/java/com/datatorrent/common/util/AsyncFSStorageAgent.java index b565447..b89ae59 100644 --- a/common/src/main/java/com/datatorrent/common/util/AsyncFSStorageAgent.java +++ b/common/src/main/java/com/datatorrent/common/util/AsyncFSStorageAgent.java @@ -27,9 +27,8 @@ import org.slf4j.LoggerFactory; import com.datatorrent.netlet.util.DTThrowable; public class AsyncFSStorageAgent extends FSStorageAgent { - private final transient FileSystem fs; private final transient Configuration conf; - private final String localBasePath; + private final transient String localBasePath; private boolean syncCheckpoint = false; @@ -37,32 +36,31 @@ public class AsyncFSStorageAgent extends FSStorageAgent private AsyncFSStorageAgent() { super(); - fs = null; conf = null; localBasePath = null; } public AsyncFSStorageAgent(String path, Configuration conf) { - this(".", path, conf); - } - - public AsyncFSStorageAgent(String localBasePath, String path, Configuration conf) - { super(path, conf); - if (localBasePath == null) { - this.localBasePath = "/tmp"; - } - else { - this.localBasePath = localBasePath; - } - logger.debug("Initialize storage agent with {}.", this.localBasePath); - this.conf = conf == null ? new Configuration() : conf; try { - fs = FileSystem.newInstance(this.conf); + File tempFile = File.createTempFile("msp", "msp"); + this.localBasePath = new File(tempFile.getParent(), "localcheckpoint").getAbsolutePath(); + tempFile.delete(); } catch (IOException ex) { throw new RuntimeException(ex); } + logger.info("using {} as the basepath for checkpointing.", this.localBasePath); + this.conf = conf == null ? new Configuration() : conf; + } + + /* + * Storage Agent should internally manage localBasePath. It should not take it from user + */ + @Deprecated + public AsyncFSStorageAgent(String localBasePath, String path, Configuration conf) + { + this(path, conf); } @Override @@ -122,7 +120,7 @@ public class AsyncFSStorageAgent extends FSStorageAgent @Override public Object readResolve() throws ObjectStreamException { - return new AsyncFSStorageAgent(this.localBasePath, this.path, null); + return new AsyncFSStorageAgent(this.path, null); } public boolean isSyncCheckpoint() http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/711fd070/common/src/test/java/com/datatorrent/common/util/AsyncFSStorageAgentTest.java ---------------------------------------------------------------------- diff --git a/common/src/test/java/com/datatorrent/common/util/AsyncFSStorageAgentTest.java b/common/src/test/java/com/datatorrent/common/util/AsyncFSStorageAgentTest.java index e7f9f66..a1504e4 100644 --- a/common/src/test/java/com/datatorrent/common/util/AsyncFSStorageAgentTest.java +++ b/common/src/test/java/com/datatorrent/common/util/AsyncFSStorageAgentTest.java @@ -52,7 +52,7 @@ public class AsyncFSStorageAgentTest } catch (IOException e) { throw new RuntimeException(e); } - storageAgent = new AsyncFSStorageAgent(basePath, applicationPath, null); + storageAgent = new AsyncFSStorageAgent(applicationPath, null); Attribute.AttributeMap.DefaultAttributeMap attributes = new Attribute.AttributeMap.DefaultAttributeMap(); attributes.put(DAG.APPLICATION_PATH, applicationPath); @@ -116,7 +116,7 @@ public class AsyncFSStorageAgentTest public void testRecovery() throws IOException { testSave(); - testMeta.storageAgent = new AsyncFSStorageAgent(testMeta.basePath, testMeta.applicationPath, null); + testMeta.storageAgent = new AsyncFSStorageAgent(testMeta.applicationPath, null); testSave(); } http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/711fd070/engine/src/test/java/com/datatorrent/stram/CheckpointTest.java ---------------------------------------------------------------------- diff --git a/engine/src/test/java/com/datatorrent/stram/CheckpointTest.java b/engine/src/test/java/com/datatorrent/stram/CheckpointTest.java index 4072894..65929fd 100644 --- a/engine/src/test/java/com/datatorrent/stram/CheckpointTest.java +++ b/engine/src/test/java/com/datatorrent/stram/CheckpointTest.java @@ -112,7 +112,7 @@ public class CheckpointTest { LogicalPlan dag = new LogicalPlan(); dag.setAttribute(LogicalPlan.APPLICATION_PATH, testMeta.dir); - AsyncFSStorageAgent storageAgent = new AsyncFSStorageAgent(testMeta.dir + "/locaPath", testMeta.dir, null); + AsyncFSStorageAgent storageAgent = new AsyncFSStorageAgent(testMeta.dir, null); storageAgent.setSyncCheckpoint(true); dag.setAttribute(OperatorContext.STORAGE_AGENT, storageAgent); dag.setAttribute(LogicalPlan.CHECKPOINT_WINDOW_COUNT, 1); http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/711fd070/engine/src/test/java/com/datatorrent/stram/LogicalPlanModificationTest.java ---------------------------------------------------------------------- diff --git a/engine/src/test/java/com/datatorrent/stram/LogicalPlanModificationTest.java b/engine/src/test/java/com/datatorrent/stram/LogicalPlanModificationTest.java index 78a1bd8..efdbd35 100644 --- a/engine/src/test/java/com/datatorrent/stram/LogicalPlanModificationTest.java +++ b/engine/src/test/java/com/datatorrent/stram/LogicalPlanModificationTest.java @@ -340,7 +340,7 @@ public class LogicalPlanModificationTest @Test public void testExecutionManagerWithAsyncStorageAgent() throws Exception { - testExecutionManager(new AsyncFSStorageAgent(testMeta.dir + "/localPath", testMeta.dir, null)); + testExecutionManager(new AsyncFSStorageAgent(testMeta.dir, null)); } } http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/711fd070/engine/src/test/java/com/datatorrent/stram/PartitioningTest.java ---------------------------------------------------------------------- diff --git a/engine/src/test/java/com/datatorrent/stram/PartitioningTest.java b/engine/src/test/java/com/datatorrent/stram/PartitioningTest.java index 15ad76e..0b3692a 100644 --- a/engine/src/test/java/com/datatorrent/stram/PartitioningTest.java +++ b/engine/src/test/java/com/datatorrent/stram/PartitioningTest.java @@ -151,7 +151,7 @@ public class PartitioningTest { LogicalPlan dag = new LogicalPlan(); File checkpointDir = new File(TEST_OUTPUT_DIR, "testDefaultPartitioning"); - dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(checkpointDir.getPath() + "/localPath", checkpointDir.getPath(), null)); + dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(checkpointDir.getPath(), null)); Integer[][] testData = { {4, 5} @@ -252,7 +252,7 @@ public class PartitioningTest LogicalPlan dag = new LogicalPlan(); dag.setAttribute(LogicalPlan.CONTAINERS_MAX_COUNT, 5); File checkpointDir = new File(TEST_OUTPUT_DIR, "testDynamicDefaultPartitioning"); - dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(checkpointDir.getPath() + "/localPath", checkpointDir.getPath(), null)); + dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(checkpointDir.getPath(), null)); CollectorOperator.receivedTuples.clear(); @@ -401,7 +401,7 @@ public class PartitioningTest { File checkpointDir = new File(TEST_OUTPUT_DIR, "testInputOperatorPartitioning"); dag.getAttributes().put(LogicalPlan.APPLICATION_PATH, checkpointDir.getPath()); - dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(checkpointDir.getPath() + "/localPath", checkpointDir.getPath(), null)); + dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(checkpointDir.getPath(), null)); PartitionableInputOperator input = dag.addOperator("input", new PartitionableInputOperator()); dag.setAttribute(input, OperatorContext.STATS_LISTENERS, Arrays.asList(new StatsListener[]{new PartitionLoadWatch()})); @@ -423,7 +423,7 @@ public class PartitioningTest Checkpoint checkpoint = new Checkpoint(10L, 0, 0); p.checkpoints.add(checkpoint); p.setRecoveryCheckpoint(checkpoint); - AsyncFSStorageAgent agent = new AsyncFSStorageAgent(checkpointDir.getPath() + "/localPath", checkpointDir.getPath(), null); + AsyncFSStorageAgent agent = new AsyncFSStorageAgent(checkpointDir.getPath(), null); agent.save(inputDeployed, p.getId(), 10L); agent.copyToHDFS(p.getId(), 10l); http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/711fd070/engine/src/test/java/com/datatorrent/stram/StramLocalClusterTest.java ---------------------------------------------------------------------- diff --git a/engine/src/test/java/com/datatorrent/stram/StramLocalClusterTest.java b/engine/src/test/java/com/datatorrent/stram/StramLocalClusterTest.java index 1881566..6e9eb48 100644 --- a/engine/src/test/java/com/datatorrent/stram/StramLocalClusterTest.java +++ b/engine/src/test/java/com/datatorrent/stram/StramLocalClusterTest.java @@ -69,7 +69,7 @@ public class StramLocalClusterTest { LogicalPlan dag = new LogicalPlan(); dag.setAttribute(LogicalPlan.APPLICATION_PATH, testMeta.dir); - dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(testMeta.dir + "/localPath", testMeta.dir, null)); + dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(testMeta.dir, null)); TestGeneratorInputOperator genNode = dag.addOperator("genNode", TestGeneratorInputOperator.class); genNode.setMaxTuples(2); @@ -109,7 +109,7 @@ public class StramLocalClusterTest { LogicalPlan dag = new LogicalPlan(); dag.setAttribute(LogicalPlan.APPLICATION_PATH, testMeta.dir); - AsyncFSStorageAgent agent = new AsyncFSStorageAgent(testMeta.dir + "/localPath", testMeta.dir, null); + AsyncFSStorageAgent agent = new AsyncFSStorageAgent(testMeta.dir, null); agent.setSyncCheckpoint(true); dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, agent); http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/711fd070/engine/src/test/java/com/datatorrent/stram/StramMiniClusterTest.java ---------------------------------------------------------------------- diff --git a/engine/src/test/java/com/datatorrent/stram/StramMiniClusterTest.java b/engine/src/test/java/com/datatorrent/stram/StramMiniClusterTest.java index 99478f5..a377a72 100644 --- a/engine/src/test/java/com/datatorrent/stram/StramMiniClusterTest.java +++ b/engine/src/test/java/com/datatorrent/stram/StramMiniClusterTest.java @@ -203,7 +203,7 @@ public class StramMiniClusterTest LogicalPlanConfiguration tb = new LogicalPlanConfiguration(conf); tb.addFromProperties(dagProps, null); LogicalPlan dag = createDAG(tb); - AsyncFSStorageAgent agent = new AsyncFSStorageAgent(testMeta.dir + "/localPath", testMeta.dir, null); + AsyncFSStorageAgent agent = new AsyncFSStorageAgent(testMeta.dir, null); agent.setSyncCheckpoint(true); dag.setAttribute(OperatorContext.STORAGE_AGENT, agent); Configuration yarnConf = new Configuration(yarnCluster.getConfig()); @@ -362,7 +362,7 @@ public class StramMiniClusterTest LogicalPlan dag = new LogicalPlan(); dag.setAttribute(LogicalPlan.APPLICATION_PATH, testMeta.dir); - AsyncFSStorageAgent agent = new AsyncFSStorageAgent(testMeta.dir + "/localPath", testMeta.dir, null); + AsyncFSStorageAgent agent = new AsyncFSStorageAgent(testMeta.dir, null); agent.setSyncCheckpoint(true); dag.setAttribute(OperatorContext.STORAGE_AGENT, agent); FailingOperator badOperator = dag.addOperator("badOperator", FailingOperator.class); http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/711fd070/engine/src/test/java/com/datatorrent/stram/StramRecoveryTest.java ---------------------------------------------------------------------- diff --git a/engine/src/test/java/com/datatorrent/stram/StramRecoveryTest.java b/engine/src/test/java/com/datatorrent/stram/StramRecoveryTest.java index ab2092a..ebce32a 100644 --- a/engine/src/test/java/com/datatorrent/stram/StramRecoveryTest.java +++ b/engine/src/test/java/com/datatorrent/stram/StramRecoveryTest.java @@ -130,7 +130,7 @@ public class StramRecoveryTest @Test public void testPhysicalPlanSerializationWithAsyncAgent() throws Exception { - testPhysicalPlanSerialization(new AsyncFSStorageAgent(testMeta.dir + "/localPath", testMeta.dir, null)); + testPhysicalPlanSerialization(new AsyncFSStorageAgent(testMeta.dir, null)); } public static class StatsListeningOperator extends TestGeneratorInputOperator implements StatsListener @@ -273,7 +273,7 @@ public class StramRecoveryTest @Test public void testContainerManagerWithAsyncAgent() throws Exception { - testPhysicalPlanSerialization(new AsyncFSStorageAgent(testMeta.dir + "/localPath", testMeta.dir, null)); + testPhysicalPlanSerialization(new AsyncFSStorageAgent(testMeta.dir, null)); } @Test @@ -450,8 +450,7 @@ public class StramRecoveryTest public void testRestartAppWithAsyncAgent() throws Exception { String appPath1 = testMeta.dir + "/app1"; - String checkpointPath = testMeta.dir + "/localPath"; - testRestartApp(new AsyncFSStorageAgent(checkpointPath, appPath1 + "/" + LogicalPlan.SUBDIR_CHECKPOINTS, null), appPath1); + testRestartApp(new AsyncFSStorageAgent(appPath1 + "/" + LogicalPlan.SUBDIR_CHECKPOINTS, null), appPath1); } @Test http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/711fd070/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java ---------------------------------------------------------------------- diff --git a/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java b/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java index bd9699c..2656e8d 100644 --- a/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java +++ b/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java @@ -476,7 +476,7 @@ public class StreamingContainerManagerTest FileUtils.deleteDirectory(path.getAbsoluteFile()); FileUtils.forceMkdir(new File(path.getAbsoluteFile(), "/localPath")); - AsyncFSStorageAgent sa = new AsyncFSStorageAgent(path.getPath() + "/localPath", path.getPath(), null); + AsyncFSStorageAgent sa = new AsyncFSStorageAgent(path.getPath(), null); long[] windowIds = new long[]{123L, 345L, 234L}; for (long windowId : windowIds) { @@ -813,7 +813,7 @@ public class StreamingContainerManagerTest public void testPhysicalPropertyUpdate() throws Exception { LogicalPlan dag = new LogicalPlan(); - dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(testMeta.dir + "/localPath", testMeta.dir, null)); + dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(testMeta.dir, null)); TestGeneratorInputOperator o1 = dag.addOperator("o1", TestGeneratorInputOperator.class); GenericTestOperator o2 = dag.addOperator("o2", GenericTestOperator.class); dag.addStream("o1.outport", o1.outport, o2.inport1); @@ -857,7 +857,7 @@ public class StreamingContainerManagerTest private void testAppDataSources(LogicalPlan dag, boolean appendQIDToTopic) throws Exception { - dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(testMeta.dir + "/localPath", testMeta.dir, null)); + dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(testMeta.dir, null)); StramLocalCluster lc = new StramLocalCluster(dag); lc.runAsync(); StreamingContainerManager dnmgr = lc.dnmgr; @@ -931,7 +931,7 @@ public class StreamingContainerManagerTest try { server.start(); LogicalPlan dag = new LogicalPlan(); - dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(testMeta.dir + "/localPath", testMeta.dir, null)); + dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(testMeta.dir, null)); TestGeneratorInputOperator o1 = dag.addOperator("o1", TestGeneratorInputOperator.class); GenericTestOperator o2 = dag.addOperator("o2", GenericTestOperator.class); dag.addStream("o1.outport", o1.outport, o2.inport1); http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/711fd070/engine/src/test/java/com/datatorrent/stram/debug/TupleRecorderTest.java ---------------------------------------------------------------------- diff --git a/engine/src/test/java/com/datatorrent/stram/debug/TupleRecorderTest.java b/engine/src/test/java/com/datatorrent/stram/debug/TupleRecorderTest.java index 718bf1b..b7647a5 100644 --- a/engine/src/test/java/com/datatorrent/stram/debug/TupleRecorderTest.java +++ b/engine/src/test/java/com/datatorrent/stram/debug/TupleRecorderTest.java @@ -212,7 +212,7 @@ public class TupleRecorderTest public void testRecordingFlow() throws Exception { LogicalPlan dag = new LogicalPlan(); - dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(testWorkDir.getAbsolutePath() + "/localPath", testWorkDir.getAbsolutePath(), null)); + dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(testWorkDir.getAbsolutePath(), null)); dag.getAttributes().put(LogicalPlan.APPLICATION_PATH, "file://" + testWorkDir.getAbsolutePath()); dag.getAttributes().put(LogicalPlan.TUPLE_RECORDING_PART_FILE_SIZE, 1024); // 1KB per part http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/711fd070/engine/src/test/java/com/datatorrent/stram/engine/AtLeastOnceTest.java ---------------------------------------------------------------------- diff --git a/engine/src/test/java/com/datatorrent/stram/engine/AtLeastOnceTest.java b/engine/src/test/java/com/datatorrent/stram/engine/AtLeastOnceTest.java index f32be13..5108e03 100644 --- a/engine/src/test/java/com/datatorrent/stram/engine/AtLeastOnceTest.java +++ b/engine/src/test/java/com/datatorrent/stram/engine/AtLeastOnceTest.java @@ -61,7 +61,7 @@ public class AtLeastOnceTest int maxTuples = 30; LogicalPlan dag = new LogicalPlan(); String workingDir = new File("target/testInputOperatorRecovery").getAbsolutePath(); - AsyncFSStorageAgent asyncFSStorageAgent = new AsyncFSStorageAgent(workingDir + "/localPath", workingDir, null); + AsyncFSStorageAgent asyncFSStorageAgent = new AsyncFSStorageAgent(workingDir, null); asyncFSStorageAgent.setSyncCheckpoint(true); dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, asyncFSStorageAgent); dag.getAttributes().put(LogicalPlan.CHECKPOINT_WINDOW_COUNT, 2); @@ -88,7 +88,7 @@ public class AtLeastOnceTest int maxTuples = 30; LogicalPlan dag = new LogicalPlan(); String workingDir = new File("target/testOperatorRecovery").getAbsolutePath(); - AsyncFSStorageAgent asyncFSStorageAgent = new AsyncFSStorageAgent(workingDir + "/localPath", workingDir, null); + AsyncFSStorageAgent asyncFSStorageAgent = new AsyncFSStorageAgent(workingDir, null); asyncFSStorageAgent.setSyncCheckpoint(true); dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, asyncFSStorageAgent); dag.getAttributes().put(LogicalPlan.CHECKPOINT_WINDOW_COUNT, 2); @@ -116,7 +116,7 @@ public class AtLeastOnceTest int maxTuples = 30; LogicalPlan dag = new LogicalPlan(); String workingDir = new File("target/testOperatorRecovery").getAbsolutePath(); - AsyncFSStorageAgent asyncFSStorageAgent = new AsyncFSStorageAgent(workingDir + "/localPath", workingDir, null); + AsyncFSStorageAgent asyncFSStorageAgent = new AsyncFSStorageAgent(workingDir, null); asyncFSStorageAgent.setSyncCheckpoint(true); dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, asyncFSStorageAgent); //dag.getAttributes().get(DAG.HEARTBEAT_INTERVAL_MILLIS, 400); http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/711fd070/engine/src/test/java/com/datatorrent/stram/engine/AutoMetricTest.java ---------------------------------------------------------------------- diff --git a/engine/src/test/java/com/datatorrent/stram/engine/AutoMetricTest.java b/engine/src/test/java/com/datatorrent/stram/engine/AutoMetricTest.java index 3ca5221..e0bfc37 100644 --- a/engine/src/test/java/com/datatorrent/stram/engine/AutoMetricTest.java +++ b/engine/src/test/java/com/datatorrent/stram/engine/AutoMetricTest.java @@ -182,7 +182,7 @@ public class AutoMetricTest public void testMetricPropagation() throws Exception { LogicalPlan dag = new LogicalPlan(); - dag.setAttribute(OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(testMeta.dir + "/localPath", testMeta.dir, null)); + dag.setAttribute(OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(testMeta.dir, null)); dag.getAttributes().put(LogicalPlan.STREAMING_WINDOW_SIZE_MILLIS, 300); dag.getAttributes().put(LogicalPlan.CONTAINERS_MAX_COUNT, 1); http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/711fd070/engine/src/test/java/com/datatorrent/stram/engine/InputOperatorTest.java ---------------------------------------------------------------------- diff --git a/engine/src/test/java/com/datatorrent/stram/engine/InputOperatorTest.java b/engine/src/test/java/com/datatorrent/stram/engine/InputOperatorTest.java index 142f45f..6976dee 100644 --- a/engine/src/test/java/com/datatorrent/stram/engine/InputOperatorTest.java +++ b/engine/src/test/java/com/datatorrent/stram/engine/InputOperatorTest.java @@ -126,7 +126,7 @@ public class InputOperatorTest { LogicalPlan dag = new LogicalPlan(); String testWorkDir = new File("target").getAbsolutePath(); - dag.setAttribute(OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(testWorkDir + "/localBasePath", testWorkDir, null)); + dag.setAttribute(OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(testWorkDir, null)); EvenOddIntegerGeneratorInputOperator generator = dag.addOperator("NumberGenerator", EvenOddIntegerGeneratorInputOperator.class); final CollectorModule<Number> collector = dag.addOperator("NumberCollector", new CollectorModule<Number>()); http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/711fd070/engine/src/test/java/com/datatorrent/stram/engine/ProcessingModeTests.java ---------------------------------------------------------------------- diff --git a/engine/src/test/java/com/datatorrent/stram/engine/ProcessingModeTests.java b/engine/src/test/java/com/datatorrent/stram/engine/ProcessingModeTests.java index 92c057d..28e75fa 100644 --- a/engine/src/test/java/com/datatorrent/stram/engine/ProcessingModeTests.java +++ b/engine/src/test/java/com/datatorrent/stram/engine/ProcessingModeTests.java @@ -80,7 +80,7 @@ public class ProcessingModeTests dag.getAttributes().put(LogicalPlan.CHECKPOINT_WINDOW_COUNT, 2); dag.getAttributes().put(LogicalPlan.STREAMING_WINDOW_SIZE_MILLIS, 300); dag.getAttributes().put(LogicalPlan.CONTAINERS_MAX_COUNT, 1); - dag.setAttribute(OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(testMeta.dir + "/localPath", testMeta.dir, null)); + dag.setAttribute(OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(testMeta.dir, null)); RecoverableInputOperator rip = dag.addOperator("LongGenerator", RecoverableInputOperator.class); rip.setMaximumTuples(maxTuples); rip.setSimulateFailure(true); @@ -103,7 +103,7 @@ public class ProcessingModeTests CollectorOperator.duplicates.clear(); LogicalPlan dag = new LogicalPlan(); - dag.setAttribute(OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(testMeta.dir + "/localPath", testMeta.dir, null)); + dag.setAttribute(OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(testMeta.dir, null)); dag.getAttributes().put(LogicalPlan.CHECKPOINT_WINDOW_COUNT, 2); dag.getAttributes().put(LogicalPlan.STREAMING_WINDOW_SIZE_MILLIS, 300); dag.getAttributes().put(LogicalPlan.CONTAINERS_MAX_COUNT, 1); @@ -128,7 +128,7 @@ public class ProcessingModeTests CollectorOperator.duplicates.clear(); LogicalPlan dag = new LogicalPlan(); - dag.setAttribute(OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(testMeta.dir + "/localPath", testMeta.dir, null)); + dag.setAttribute(OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(testMeta.dir, null)); dag.getAttributes().put(LogicalPlan.CHECKPOINT_WINDOW_COUNT, 2); dag.getAttributes().put(LogicalPlan.STREAMING_WINDOW_SIZE_MILLIS, 300); dag.getAttributes().put(LogicalPlan.CONTAINERS_MAX_COUNT, 1); http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/711fd070/engine/src/test/java/com/datatorrent/stram/engine/SliderTest.java ---------------------------------------------------------------------- diff --git a/engine/src/test/java/com/datatorrent/stram/engine/SliderTest.java b/engine/src/test/java/com/datatorrent/stram/engine/SliderTest.java index 26515d4..d16cf19 100644 --- a/engine/src/test/java/com/datatorrent/stram/engine/SliderTest.java +++ b/engine/src/test/java/com/datatorrent/stram/engine/SliderTest.java @@ -137,7 +137,7 @@ public class SliderTest { LogicalPlan dag = new LogicalPlan(); String workingDir = new File("target/sliderTest").getAbsolutePath(); - dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(workingDir + "/localPath", workingDir, null)); + dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(workingDir, null)); dag.getAttributes().put(LogicalPlan.STREAMING_WINDOW_SIZE_MILLIS, 100); Input input = dag.addOperator("Input", new Input()); Sum sum = dag.addOperator("Sum", new Sum()); http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/711fd070/engine/src/test/java/com/datatorrent/stram/engine/StatsTest.java ---------------------------------------------------------------------- diff --git a/engine/src/test/java/com/datatorrent/stram/engine/StatsTest.java b/engine/src/test/java/com/datatorrent/stram/engine/StatsTest.java index 0ededd4..aa32bdc 100644 --- a/engine/src/test/java/com/datatorrent/stram/engine/StatsTest.java +++ b/engine/src/test/java/com/datatorrent/stram/engine/StatsTest.java @@ -174,7 +174,7 @@ public class StatsTest int tupleCount = 10; LogicalPlan dag = new LogicalPlan(); String workingDir = new File("target").getAbsolutePath(); - dag.setAttribute(OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(workingDir + "/localPath", workingDir, null)); + dag.setAttribute(OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(workingDir, null)); TestOperator testOper = dag.addOperator("TestOperator", TestOperator.class); TestInputStatsListener testInputStatsListener = new TestInputStatsListener(); dag.setAttribute(testOper, OperatorContext.STATS_LISTENERS, Arrays.asList(new StatsListener[]{testInputStatsListener})); @@ -230,7 +230,7 @@ public class StatsTest { LogicalPlan dag = new LogicalPlan(); String workingDir = new File("target/baseTestForQueueSize").getAbsolutePath(); - dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(workingDir + "/localPath", workingDir, null)); + dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(workingDir, null)); dag.getAttributes().put(LogicalPlan.STREAMING_WINDOW_SIZE_MILLIS, 200); TestOperator testOper = dag.addOperator("TestOperator", TestOperator.class); testOper.setMaxTuples(maxTuples); http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/711fd070/engine/src/test/java/com/datatorrent/stram/engine/StreamingContainerTest.java ---------------------------------------------------------------------- diff --git a/engine/src/test/java/com/datatorrent/stram/engine/StreamingContainerTest.java b/engine/src/test/java/com/datatorrent/stram/engine/StreamingContainerTest.java index 7d37429..70c896c 100644 --- a/engine/src/test/java/com/datatorrent/stram/engine/StreamingContainerTest.java +++ b/engine/src/test/java/com/datatorrent/stram/engine/StreamingContainerTest.java @@ -47,7 +47,7 @@ public class StreamingContainerTest { LogicalPlan lp = new LogicalPlan(); String workingDir = new File("target/testCommitted").getAbsolutePath(); - lp.setAttribute(Context.OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(workingDir + "/localPath", workingDir, null)); + lp.setAttribute(Context.OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(workingDir, null)); lp.setAttribute(DAGContext.CHECKPOINT_WINDOW_COUNT, 1); CommitAwareOperator operator = lp.addOperator("CommitAwareOperator", new CommitAwareOperator()); http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/711fd070/engine/src/test/java/com/datatorrent/stram/engine/WindowGeneratorTest.java ---------------------------------------------------------------------- diff --git a/engine/src/test/java/com/datatorrent/stram/engine/WindowGeneratorTest.java b/engine/src/test/java/com/datatorrent/stram/engine/WindowGeneratorTest.java index 4f7b842..4665d79 100644 --- a/engine/src/test/java/com/datatorrent/stram/engine/WindowGeneratorTest.java +++ b/engine/src/test/java/com/datatorrent/stram/engine/WindowGeneratorTest.java @@ -306,7 +306,7 @@ public class WindowGeneratorTest logger.info("Testing Out of Sequence Error"); LogicalPlan dag = new LogicalPlan(); String workingDir = new File("target/testOutofSequenceError").getAbsolutePath(); - dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(workingDir + "/localPath", workingDir, null)); + dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(workingDir, null)); RandomNumberGenerator rng = dag.addOperator("random", new RandomNumberGenerator()); MyLogger ml = dag.addOperator("logger", new MyLogger()); http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/711fd070/engine/src/test/java/com/datatorrent/stram/stream/OiOEndWindowTest.java ---------------------------------------------------------------------- diff --git a/engine/src/test/java/com/datatorrent/stram/stream/OiOEndWindowTest.java b/engine/src/test/java/com/datatorrent/stram/stream/OiOEndWindowTest.java index a4e9c43..365dd03 100644 --- a/engine/src/test/java/com/datatorrent/stram/stream/OiOEndWindowTest.java +++ b/engine/src/test/java/com/datatorrent/stram/stream/OiOEndWindowTest.java @@ -97,7 +97,7 @@ public class OiOEndWindowTest { LogicalPlan lp = new LogicalPlan(); String workingDir = new File("target/validateOiOImplementation").getAbsolutePath(); - lp.setAttribute(Context.OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(workingDir + "/localPath", workingDir, null)); + lp.setAttribute(Context.OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(workingDir, null)); TestInputOperator io = lp.addOperator("Input Operator", new TestInputOperator()); FirstGenericOperator go = lp.addOperator("First Generic Operator", new FirstGenericOperator()); SecondGenericOperator out = lp.addOperator("Second Generic Operator", new SecondGenericOperator()); http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/711fd070/engine/src/test/java/com/datatorrent/stram/webapp/StramWebServicesTest.java ---------------------------------------------------------------------- diff --git a/engine/src/test/java/com/datatorrent/stram/webapp/StramWebServicesTest.java b/engine/src/test/java/com/datatorrent/stram/webapp/StramWebServicesTest.java index 9b8f0b2..b0680b8 100644 --- a/engine/src/test/java/com/datatorrent/stram/webapp/StramWebServicesTest.java +++ b/engine/src/test/java/com/datatorrent/stram/webapp/StramWebServicesTest.java @@ -129,7 +129,7 @@ public class StramWebServicesTest extends JerseyTest LogicalPlan dag = new LogicalPlan(); String workingDir = new File("target", StramWebServicesTest.class.getName()).getAbsolutePath(); dag.setAttribute(LogicalPlan.APPLICATION_PATH, workingDir); - dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(workingDir + "/localPath", workingDir, null)); + dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(workingDir, null)); final DummyStreamingContainerManager streamingContainerManager = new DummyStreamingContainerManager(dag); appContext = new TestAppContext();
