Repository: incubator-apex-core Updated Branches: refs/heads/devel-3 876737792 -> 265e9088d
APEX-162 #resolve Enhance StramTestSupport.TestMeta API. Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/3c35cccb Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/3c35cccb Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/3c35cccb Branch: refs/heads/devel-3 Commit: 3c35cccbd574d2a28e543b644deeb9f7c8a886e5 Parents: 809e6f6 Author: Vlad Rozov <[email protected]> Authored: Mon Oct 12 16:39:32 2015 -0700 Committer: Vlad Rozov <[email protected]> Committed: Mon Oct 12 16:39:32 2015 -0700 ---------------------------------------------------------------------- engine/pom.xml | 2 +- .../com/datatorrent/stram/CheckpointTest.java | 55 ++++-------- .../stram/LogicalPlanModificationTest.java | 34 ++++---- .../datatorrent/stram/OutputUnifiedTest.java | 29 ++++--- .../stram/StramLocalClusterTest.java | 17 ++-- .../datatorrent/stram/StramMiniClusterTest.java | 6 +- .../datatorrent/stram/StramRecoveryTest.java | 48 +++++------ .../com/datatorrent/stram/StreamCodecTest.java | 51 ++--------- .../stram/StreamingContainerManagerTest.java | 90 +++++++------------- .../stram/engine/AutoMetricTest.java | 22 ++--- .../stram/engine/ProcessingModeTests.java | 17 ++-- .../stram/plan/StreamPersistanceTests.java | 27 ++---- .../stram/support/StramTestSupport.java | 64 ++++++++++++-- 13 files changed, 212 insertions(+), 250 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/3c35cccb/engine/pom.xml ---------------------------------------------------------------------- diff --git a/engine/pom.xml b/engine/pom.xml index 7974313..2165500 100644 --- a/engine/pom.xml +++ b/engine/pom.xml @@ -145,7 +145,7 @@ <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-checkstyle-plugin</artifactId> <configuration> - <maxAllowedViolations>2322</maxAllowedViolations> + <maxAllowedViolations>2320</maxAllowedViolations> </configuration> </plugin> </plugins> http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/3c35cccb/engine/src/test/java/com/datatorrent/stram/CheckpointTest.java ---------------------------------------------------------------------- diff --git a/engine/src/test/java/com/datatorrent/stram/CheckpointTest.java b/engine/src/test/java/com/datatorrent/stram/CheckpointTest.java index ae28ebd..5d11b86 100644 --- a/engine/src/test/java/com/datatorrent/stram/CheckpointTest.java +++ b/engine/src/test/java/com/datatorrent/stram/CheckpointTest.java @@ -19,8 +19,7 @@ package com.datatorrent.stram; import com.datatorrent.common.util.BaseOperator; -import java.io.File; -import java.io.IOException; + import java.util.*; import com.google.common.collect.Maps; @@ -30,8 +29,6 @@ import org.junit.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.hadoop.fs.FileContext; -import org.apache.hadoop.fs.Path; import org.apache.hadoop.yarn.util.Clock; import org.apache.hadoop.yarn.util.SystemClock; @@ -51,6 +48,7 @@ import com.datatorrent.stram.plan.logical.LogicalPlan; import com.datatorrent.stram.plan.physical.PTContainer; import com.datatorrent.stram.plan.physical.PTOperator; import com.datatorrent.stram.plan.physical.PhysicalPlan; +import com.datatorrent.stram.support.StramTestSupport; import com.datatorrent.stram.support.StramTestSupport.MemoryStorageAgent; import com.datatorrent.stram.support.StramTestSupport.TestMeta; @@ -61,30 +59,9 @@ public class CheckpointTest { @SuppressWarnings("unused") private static final Logger LOG = LoggerFactory.getLogger(CheckpointTest.class); - @Rule public TestMeta testMeta = new TestMeta(); - - /** - * - * @throws IOException - */ - @Before - public void setupEachTest() throws IOException - { - try { - FileContext.getLocalFSFileContext().delete( - new Path(new File(testMeta.dir).getAbsolutePath()), true); - } - catch (Exception e) { - throw new RuntimeException("could not cleanup test dir", e); - } - //StramChild.eventloop.start(); - } - @After - public void teardown() - { - //StramChild.eventloop.stop(); - } + @Rule + public TestMeta testMeta = new TestMeta(); private static class MockInputOperator extends BaseOperator implements InputOperator { @@ -106,6 +83,14 @@ public class CheckpointTest } } + private LogicalPlan dag; + + @Before + public void setup() + { + dag = StramTestSupport.createDAG(testMeta); + } + /** * Test saving of operator state at window boundary. * @throws Exception @@ -113,9 +98,7 @@ public class CheckpointTest @Test public void testBackup() throws Exception { - LogicalPlan dag = new LogicalPlan(); - dag.setAttribute(LogicalPlan.APPLICATION_PATH, testMeta.dir); - AsyncFSStorageAgent storageAgent = new AsyncFSStorageAgent(testMeta.dir, null); + AsyncFSStorageAgent storageAgent = new AsyncFSStorageAgent(testMeta.getPath(), null); storageAgent.setSyncCheckpoint(true); dag.setAttribute(OperatorContext.STORAGE_AGENT, storageAgent); dag.setAttribute(LogicalPlan.CHECKPOINT_WINDOW_COUNT, 1); @@ -172,8 +155,7 @@ public class CheckpointTest public void testUpdateRecoveryCheckpoint() throws Exception { Clock clock = new SystemClock(); - LogicalPlan dag = new LogicalPlan(); - dag.setAttribute(LogicalPlan.APPLICATION_PATH, testMeta.dir); + dag.setAttribute(com.datatorrent.api.Context.OperatorContext.STORAGE_AGENT, new MemoryStorageAgent()); GenericTestOperator o1 = dag.addOperator("o1", GenericTestOperator.class); @@ -273,8 +255,7 @@ public class CheckpointTest public void testUpdateCheckpointsRecovery() { MockClock clock = new MockClock(); - LogicalPlan dag = new LogicalPlan(); - dag.setAttribute(LogicalPlan.APPLICATION_PATH, testMeta.dir); + dag.setAttribute(com.datatorrent.api.Context.OperatorContext.STORAGE_AGENT, new MemoryStorageAgent()); dag.setAttribute(LogicalPlan.STREAMING_WINDOW_SIZE_MILLIS, 1); @@ -338,8 +319,7 @@ public class CheckpointTest public void testUpdateCheckpointsProcessingTimeout() { MockClock clock = new MockClock(); - LogicalPlan dag = new LogicalPlan(); - dag.setAttribute(LogicalPlan.APPLICATION_PATH, testMeta.dir); + dag.setAttribute(com.datatorrent.api.Context.OperatorContext.STORAGE_AGENT, new MemoryStorageAgent()); GenericTestOperator o1 = dag.addOperator("o1", GenericTestOperator.class); @@ -419,8 +399,7 @@ public class CheckpointTest public void testBlockedOperatorContainerRestart() { MockClock clock = new MockClock(); - LogicalPlan dag = new LogicalPlan(); - dag.setAttribute(LogicalPlan.APPLICATION_PATH, testMeta.dir); + dag.setAttribute(com.datatorrent.api.Context.OperatorContext.STORAGE_AGENT, new MemoryStorageAgent()); GenericTestOperator o1 = dag.addOperator("o1", GenericTestOperator.class); http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/3c35cccb/engine/src/test/java/com/datatorrent/stram/LogicalPlanModificationTest.java ---------------------------------------------------------------------- diff --git a/engine/src/test/java/com/datatorrent/stram/LogicalPlanModificationTest.java b/engine/src/test/java/com/datatorrent/stram/LogicalPlanModificationTest.java index 847f3fd..8a50124 100644 --- a/engine/src/test/java/com/datatorrent/stram/LogicalPlanModificationTest.java +++ b/engine/src/test/java/com/datatorrent/stram/LogicalPlanModificationTest.java @@ -26,6 +26,7 @@ import java.util.concurrent.FutureTask; import javax.validation.ValidationException; import org.junit.Assert; +import org.junit.Before; import org.junit.Rule; import org.junit.Test; @@ -46,18 +47,26 @@ import com.datatorrent.stram.plan.physical.PTContainer; import com.datatorrent.stram.plan.physical.PTOperator; import com.datatorrent.stram.plan.physical.PhysicalPlan; import com.datatorrent.stram.plan.physical.PlanModifier; +import com.datatorrent.stram.support.StramTestSupport; import com.datatorrent.stram.support.StramTestSupport.TestMeta; import com.google.common.collect.Sets; public class LogicalPlanModificationTest { - @Rule public TestMeta testMeta = new TestMeta(); + private LogicalPlan dag; + + @Rule + public TestMeta testMeta = new TestMeta(); + + @Before + public void setup() + { + dag = StramTestSupport.createDAG(testMeta); + } @Test public void testAddOperator() { - LogicalPlan dag = new LogicalPlan(); - GenericTestOperator o1 = dag.addOperator("o1", GenericTestOperator.class); GenericTestOperator o2 = dag.addOperator("o2", GenericTestOperator.class); GenericTestOperator o3 = dag.addOperator("o3", GenericTestOperator.class); @@ -94,7 +103,6 @@ public class LogicalPlanModificationTest @Test public void testSetOperatorProperty() { - LogicalPlan dag = new LogicalPlan(); GenericTestOperator o1 = dag.addOperator("o1", GenericTestOperator.class); OperatorMeta o1Meta = dag.getMeta(o1); @@ -121,8 +129,6 @@ public class LogicalPlanModificationTest @Test public void testRemoveOperator() { - LogicalPlan dag = new LogicalPlan(); - GenericTestOperator o1 = dag.addOperator("o1", GenericTestOperator.class); OperatorMeta o1Meta = dag.getMeta(o1); GenericTestOperator o12 = dag.addOperator("o12", GenericTestOperator.class); @@ -191,8 +197,6 @@ public class LogicalPlanModificationTest @Test public void testRemoveOperator2() { - LogicalPlan dag = new LogicalPlan(); - GenericTestOperator o1 = dag.addOperator("o1", GenericTestOperator.class); OperatorMeta o1Meta = dag.getMeta(o1); GenericTestOperator o2 = dag.addOperator("o2", GenericTestOperator.class); @@ -233,8 +237,6 @@ public class LogicalPlanModificationTest @Test public void testRemoveStream() { - LogicalPlan dag = new LogicalPlan(); - GenericTestOperator o1 = dag.addOperator("o1", GenericTestOperator.class); GenericTestOperator o2 = dag.addOperator("o2", GenericTestOperator.class); @@ -255,8 +257,6 @@ public class LogicalPlanModificationTest @Test public void testAddStream() { - LogicalPlan dag = new LogicalPlan(); - GenericTestOperator o1 = dag.addOperator("o1", GenericTestOperator.class); GenericTestOperator o2 = dag.addOperator("o2", GenericTestOperator.class); @@ -295,10 +295,8 @@ public class LogicalPlanModificationTest } - private void testExecutionManager(StorageAgent agent) throws Exception { - - LogicalPlan dag = new LogicalPlan(); - dag.setAttribute(com.datatorrent.api.Context.DAGContext.APPLICATION_PATH, testMeta.dir); + private void testExecutionManager(StorageAgent agent) throws Exception + { dag.setAttribute(OperatorContext.STORAGE_AGENT, agent); StreamingContainerManager dnm = new StreamingContainerManager(dag); @@ -337,13 +335,13 @@ public class LogicalPlanModificationTest @Test public void testExecutionManagerWithSyncStorageAgent() throws Exception { - testExecutionManager(new FSStorageAgent(testMeta.dir, null)); + testExecutionManager(new FSStorageAgent(testMeta.getPath(), null)); } @Test public void testExecutionManagerWithAsyncStorageAgent() throws Exception { - testExecutionManager(new AsyncFSStorageAgent(testMeta.dir, null)); + testExecutionManager(new AsyncFSStorageAgent(testMeta.getPath(), null)); } } http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/3c35cccb/engine/src/test/java/com/datatorrent/stram/OutputUnifiedTest.java ---------------------------------------------------------------------- diff --git a/engine/src/test/java/com/datatorrent/stram/OutputUnifiedTest.java b/engine/src/test/java/com/datatorrent/stram/OutputUnifiedTest.java index f0461ba..7581cc3 100644 --- a/engine/src/test/java/com/datatorrent/stram/OutputUnifiedTest.java +++ b/engine/src/test/java/com/datatorrent/stram/OutputUnifiedTest.java @@ -22,6 +22,7 @@ import java.net.InetSocketAddress; import java.util.List; import org.junit.Assert; +import org.junit.Before; import org.junit.Rule; import org.junit.Test; @@ -50,12 +51,18 @@ public class OutputUnifiedTest @Rule public StramTestSupport.TestMeta testMeta = new StramTestSupport.TestMeta(); - @Test - public void testManyToOnePartition() throws Exception { - LogicalPlan dag = new LogicalPlan(); - dag.setAttribute(com.datatorrent.api.Context.DAGContext.APPLICATION_PATH, testMeta.dir); + private LogicalPlan dag; + + @Before + public void setup() + { + dag = StramTestSupport.createDAG(testMeta); dag.setAttribute(OperatorContext.STORAGE_AGENT, new MemoryStorageAgent()); + } + @Test + public void testManyToOnePartition() throws Exception + { TestInputOperator i1 = new TestInputOperator(); dag.addOperator("i1", i1); @@ -83,11 +90,8 @@ public class OutputUnifiedTest } @Test - public void testMxNPartition() throws Exception { - LogicalPlan dag = new LogicalPlan(); - dag.setAttribute(com.datatorrent.api.Context.DAGContext.APPLICATION_PATH, testMeta.dir); - dag.setAttribute(OperatorContext.STORAGE_AGENT, new MemoryStorageAgent()); - + public void testMxNPartition() throws Exception + { TestInputOperator i1 = new TestInputOperator(); dag.addOperator("i1", i1); @@ -117,11 +121,8 @@ public class OutputUnifiedTest } @Test - public void testParallelPartition() throws Exception { - LogicalPlan dag = new LogicalPlan(); - dag.setAttribute(com.datatorrent.api.Context.DAGContext.APPLICATION_PATH, testMeta.dir); - dag.setAttribute(OperatorContext.STORAGE_AGENT, new MemoryStorageAgent()); - + public void testParallelPartition() throws Exception + { TestInputOperator i1 = new TestInputOperator(); dag.addOperator("i1", i1); http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/3c35cccb/engine/src/test/java/com/datatorrent/stram/StramLocalClusterTest.java ---------------------------------------------------------------------- diff --git a/engine/src/test/java/com/datatorrent/stram/StramLocalClusterTest.java b/engine/src/test/java/com/datatorrent/stram/StramLocalClusterTest.java index c784fd1..aaf92b8 100644 --- a/engine/src/test/java/com/datatorrent/stram/StramLocalClusterTest.java +++ b/engine/src/test/java/com/datatorrent/stram/StramLocalClusterTest.java @@ -29,8 +29,6 @@ import org.junit.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.datatorrent.api.Context; - import com.datatorrent.common.util.AsyncFSStorageAgent; import com.datatorrent.stram.StramLocalCluster.LocalStreamingContainer; import com.datatorrent.stram.StramLocalCluster.MockComponentFactory; @@ -48,11 +46,12 @@ public class StramLocalClusterTest @Rule public StramTestSupport.TestMeta testMeta = new StramTestSupport.TestMeta(); + private LogicalPlan dag; + @Before public void setup() throws IOException { -// StramChild.eventloop = new DefaultEventLoop("StramLocalClusterTestEventLoop"); -// StramChild.eventloop.start(); + dag = StramTestSupport.createDAG(testMeta); } @After @@ -70,9 +69,7 @@ public class StramLocalClusterTest @Test public void testLocalClusterInitShutdown() throws Exception { - LogicalPlan dag = new LogicalPlan(); - dag.setAttribute(LogicalPlan.APPLICATION_PATH, testMeta.dir); - dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(testMeta.dir, null)); + dag.setAttribute(OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(testMeta.getPath(), null)); TestGeneratorInputOperator genNode = dag.addOperator("genNode", TestGeneratorInputOperator.class); genNode.setMaxTuples(2); @@ -110,11 +107,9 @@ public class StramLocalClusterTest @SuppressWarnings("SleepWhileInLoop") public void testRecovery() throws Exception { - LogicalPlan dag = new LogicalPlan(); - dag.setAttribute(LogicalPlan.APPLICATION_PATH, testMeta.dir); - AsyncFSStorageAgent agent = new AsyncFSStorageAgent(testMeta.dir, null); + AsyncFSStorageAgent agent = new AsyncFSStorageAgent(testMeta.getPath(), null); agent.setSyncCheckpoint(true); - dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, agent); + dag.setAttribute(OperatorContext.STORAGE_AGENT, agent); TestGeneratorInputOperator node1 = dag.addOperator("o1", TestGeneratorInputOperator.class); // data will be added externally from test http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/3c35cccb/engine/src/test/java/com/datatorrent/stram/StramMiniClusterTest.java ---------------------------------------------------------------------- diff --git a/engine/src/test/java/com/datatorrent/stram/StramMiniClusterTest.java b/engine/src/test/java/com/datatorrent/stram/StramMiniClusterTest.java index f0fd325..d5cb14f 100644 --- a/engine/src/test/java/com/datatorrent/stram/StramMiniClusterTest.java +++ b/engine/src/test/java/com/datatorrent/stram/StramMiniClusterTest.java @@ -227,8 +227,8 @@ public class StramMiniClusterTest private LogicalPlan createDAG(LogicalPlanConfiguration lpc) throws Exception { LogicalPlan dag = new LogicalPlan(); - dag.setAttribute(LogicalPlan.APPLICATION_PATH, new File(testMeta.dir).toURI().toString()); - lpc.prepareDAG(dag,null,"testApp"); + dag.setAttribute(LogicalPlan.APPLICATION_PATH, testMeta.toURI().toString()); + lpc.prepareDAG(dag, null, "testApp"); dag.validate(); Assert.assertEquals("", Integer.valueOf(128), dag.getValue(DAG.MASTER_MEMORY_MB)); Assert.assertEquals("", "-Dlog4j.properties=custom_log4j.properties", dag.getValue(DAG.CONTAINER_JVM_OPTIONS)); @@ -360,7 +360,7 @@ public class StramMiniClusterTest { LogicalPlan dag = new LogicalPlan(); - dag.setAttribute(LogicalPlan.APPLICATION_PATH, new File(testMeta.dir).toURI().toString()); + dag.setAttribute(LogicalPlan.APPLICATION_PATH, testMeta.toURI().toString()); FailingOperator badOperator = dag.addOperator("badOperator", FailingOperator.class); dag.getContextAttributes(badOperator).put(OperatorContext.RECOVERY_ATTEMPTS, 1); http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/3c35cccb/engine/src/test/java/com/datatorrent/stram/StramRecoveryTest.java ---------------------------------------------------------------------- diff --git a/engine/src/test/java/com/datatorrent/stram/StramRecoveryTest.java b/engine/src/test/java/com/datatorrent/stram/StramRecoveryTest.java index 89ae3e7..6dbdcf0 100644 --- a/engine/src/test/java/com/datatorrent/stram/StramRecoveryTest.java +++ b/engine/src/test/java/com/datatorrent/stram/StramRecoveryTest.java @@ -36,6 +36,7 @@ import org.apache.hadoop.ipc.RPC.Server; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.test.MockitoUtil; import org.junit.Assert; +import org.junit.Before; import org.junit.Rule; import org.junit.Test; import org.mockito.Mockito; @@ -65,6 +66,7 @@ import com.datatorrent.stram.plan.physical.PTContainer; import com.datatorrent.stram.plan.physical.PTOperator; import com.datatorrent.stram.plan.physical.PhysicalPlan; import com.datatorrent.stram.plan.physical.PhysicalPlanTest.PartitioningTestOperator; +import com.datatorrent.stram.support.StramTestSupport; import com.datatorrent.stram.support.StramTestSupport.TestMeta; import static org.junit.Assert.assertEquals; @@ -72,12 +74,20 @@ import static org.junit.Assert.assertEquals; public class StramRecoveryTest { private static final Logger LOG = LoggerFactory.getLogger(StramRecoveryTest.class); - @Rule public final TestMeta testMeta = new TestMeta(); - private void testPhysicalPlanSerialization(StorageAgent agent) throws Exception + @Rule + public final TestMeta testMeta = new TestMeta(); + + private LogicalPlan dag; + + @Before + public void setup() { - LogicalPlan dag = new LogicalPlan(); + dag = StramTestSupport.createDAG(testMeta); + } + private void testPhysicalPlanSerialization(StorageAgent agent) throws Exception + { GenericTestOperator o1 = dag.addOperator("o1", GenericTestOperator.class); PartitioningTestOperator o2 = dag.addOperator("o2", PartitioningTestOperator.class); o2.setPartitionCount(3); @@ -127,13 +137,13 @@ public class StramRecoveryTest @Test public void testPhysicalPlanSerializationWithSyncAgent() throws Exception { - testPhysicalPlanSerialization(new FSStorageAgent(testMeta.dir, null)); + testPhysicalPlanSerialization(new FSStorageAgent(testMeta.getPath(), null)); } @Test public void testPhysicalPlanSerializationWithAsyncAgent() throws Exception { - testPhysicalPlanSerialization(new AsyncFSStorageAgent(testMeta.dir, null)); + testPhysicalPlanSerialization(new AsyncFSStorageAgent(testMeta.getPath(), null)); } public static class StatsListeningOperator extends TestGeneratorInputOperator implements StatsListener @@ -161,10 +171,6 @@ public class StramRecoveryTest */ private void testContainerManager(StorageAgent agent) throws Exception { - FileUtils.deleteDirectory(new File(testMeta.dir)); // clean any state from previous run - - LogicalPlan dag = new LogicalPlan(); - dag.setAttribute(LogicalPlan.APPLICATION_PATH, testMeta.dir); dag.setAttribute(OperatorContext.STORAGE_AGENT, agent); StatsListeningOperator o1 = dag.addOperator("o1", StatsListeningOperator.class); @@ -188,8 +194,7 @@ public class StramRecoveryTest assertEquals("state " + o1p1, PTOperator.State.PENDING_DEPLOY, o1p1.getState()); // test restore initial snapshot + log - dag = new LogicalPlan(); - dag.setAttribute(LogicalPlan.APPLICATION_PATH, testMeta.dir); + dag = StramTestSupport.createDAG(testMeta); scm = StreamingContainerManager.getInstance(new FSRecoveryHandler(dag.assertAppPath(), new Configuration(false)), dag, false); dag = scm.getLogicalPlan(); plan = scm.getPhysicalPlan(); @@ -245,8 +250,7 @@ public class StramRecoveryTest checkpoint(scm, o1p1, offlineCheckpoint); // test restore - dag = new LogicalPlan(); - dag.setAttribute(LogicalPlan.APPLICATION_PATH, testMeta.dir); + dag = StramTestSupport.createDAG(testMeta); scm = StreamingContainerManager.getInstance(new FSRecoveryHandler(dag.assertAppPath(), new Configuration(false)), dag, false); Assert.assertNotSame("dag references", dag, scm.getLogicalPlan()); @@ -270,13 +274,13 @@ public class StramRecoveryTest @Test public void testContainerManagerWithSyncAgent() throws Exception { - testPhysicalPlanSerialization(new FSStorageAgent(testMeta.dir, null)); + testPhysicalPlanSerialization(new FSStorageAgent(testMeta.getPath(), null)); } @Test public void testContainerManagerWithAsyncAgent() throws Exception { - testPhysicalPlanSerialization(new AsyncFSStorageAgent(testMeta.dir, null)); + testPhysicalPlanSerialization(new AsyncFSStorageAgent(testMeta.getPath(), null)); } @Test @@ -284,9 +288,7 @@ public class StramRecoveryTest { final MutableInt flushCount = new MutableInt(); final MutableBoolean isClosed = new MutableBoolean(false); - LogicalPlan dag = new LogicalPlan(); - dag.setAttribute(LogicalPlan.APPLICATION_PATH, testMeta.dir); - dag.setAttribute(OperatorContext.STORAGE_AGENT, new FSStorageAgent(testMeta.dir, null)); + dag.setAttribute(OperatorContext.STORAGE_AGENT, new FSStorageAgent(testMeta.getPath(), null)); TestGeneratorInputOperator o1 = dag.addOperator("o1", TestGeneratorInputOperator.class); StreamingContainerManager scm = new StreamingContainerManager(dag); @@ -386,12 +388,10 @@ public class StramRecoveryTest private void testRestartApp(StorageAgent agent, String appPath1) throws Exception { - FileUtils.deleteDirectory(new File(testMeta.dir)); // clean any state from previous run String appId1 = "app1"; String appId2 = "app2"; - String appPath2 = testMeta.dir + "/" + appId2; + String appPath2 = testMeta.getPath() + "/" + appId2; - LogicalPlan dag = new LogicalPlan(); dag.setAttribute(LogicalPlan.APPLICATION_ID, appId1); dag.setAttribute(LogicalPlan.APPLICATION_PATH, appPath1); dag.setAttribute(LogicalPlan.APPLICATION_ATTEMPT_ID, 1); @@ -445,21 +445,21 @@ public class StramRecoveryTest @Test public void testRestartAppWithSyncAgent() throws Exception { - String appPath1 = testMeta.dir + "/app1"; + final String appPath1 = testMeta.getPath() + "/app1"; testRestartApp(new FSStorageAgent(appPath1 + "/" + LogicalPlan.SUBDIR_CHECKPOINTS, null), appPath1); } @Test public void testRestartAppWithAsyncAgent() throws Exception { - String appPath1 = testMeta.dir + "/app1"; + final String appPath1 = testMeta.getPath() + "/app1"; testRestartApp(new AsyncFSStorageAgent(appPath1 + "/" + LogicalPlan.SUBDIR_CHECKPOINTS, null), appPath1); } @Test public void testRpcFailover() throws Exception { - String appPath = testMeta.dir; + String appPath = testMeta.getPath(); Configuration conf = new Configuration(false); final AtomicBoolean timedout = new AtomicBoolean(); http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/3c35cccb/engine/src/test/java/com/datatorrent/stram/StreamCodecTest.java ---------------------------------------------------------------------- diff --git a/engine/src/test/java/com/datatorrent/stram/StreamCodecTest.java b/engine/src/test/java/com/datatorrent/stram/StreamCodecTest.java index 6bfa591..ddf3448 100644 --- a/engine/src/test/java/com/datatorrent/stram/StreamCodecTest.java +++ b/engine/src/test/java/com/datatorrent/stram/StreamCodecTest.java @@ -34,6 +34,7 @@ import com.google.common.collect.Lists; import java.io.Serializable; import java.util.*; import org.junit.Assert; +import org.junit.Before; import org.junit.Rule; import org.junit.Test; @@ -41,15 +42,20 @@ import org.junit.Test; */ public class StreamCodecTest { + private LogicalPlan dag; + @Rule public StramTestSupport.TestMeta testMeta = new StramTestSupport.TestMeta(); + @Before + public void setup() + { + dag = StramTestSupport.createDAG(testMeta); + } + @Test public void testStreamCodec() { - LogicalPlan dag = new LogicalPlan(); - dag.setAttribute(com.datatorrent.api.Context.DAGContext.APPLICATION_PATH, testMeta.dir); - GenericTestOperator node1 = dag.addOperator("node1", GenericTestOperator.class); GenericTestOperator node2 = dag.addOperator("node2", GenericTestOperator.class); GenericTestOperator node3 = dag.addOperator("node3", GenericTestOperator.class); @@ -107,9 +113,6 @@ public class StreamCodecTest @Test public void testStreamCodecReuse() { - LogicalPlan dag = new LogicalPlan(); - dag.setAttribute(com.datatorrent.api.Context.DAGContext.APPLICATION_PATH, testMeta.dir); - GenericTestOperator node1 = dag.addOperator("node1", GenericTestOperator.class); GenericTestOperator node2 = dag.addOperator("node2", GenericTestOperator.class); GenericTestOperator node3 = dag.addOperator("node3", GenericTestOperator.class); @@ -154,9 +157,6 @@ public class StreamCodecTest @Test public void testDefaultStreamCodec() { - LogicalPlan dag = new LogicalPlan(); - dag.setAttribute(com.datatorrent.api.Context.DAGContext.APPLICATION_PATH, testMeta.dir); - GenericTestOperator node1 = dag.addOperator("node1", GenericTestOperator.class); DefaultCodecOperator node2 = dag.addOperator("node2", DefaultCodecOperator.class); DefaultCodecOperator node3 = dag.addOperator("node3", DefaultCodecOperator.class); @@ -213,9 +213,6 @@ public class StreamCodecTest @Test public void testPartitioningStreamCodec() { - LogicalPlan dag = new LogicalPlan(); - dag.setAttribute(com.datatorrent.api.Context.DAGContext.APPLICATION_PATH, testMeta.dir); - GenericTestOperator node1 = dag.addOperator("node1", GenericTestOperator.class); GenericTestOperator node2 = dag.addOperator("node2", GenericTestOperator.class); dag.setAttribute(node2, Context.OperatorContext.PARTITIONER, new StatelessPartitioner<GenericTestOperator>(3)); @@ -264,9 +261,6 @@ public class StreamCodecTest @Test public void testMxNPartitioningStreamCodec() { - LogicalPlan dag = new LogicalPlan(); - dag.setAttribute(com.datatorrent.api.Context.DAGContext.APPLICATION_PATH, testMeta.dir); - GenericTestOperator node1 = dag.addOperator("node1", GenericTestOperator.class); dag.setAttribute(node1, Context.OperatorContext.PARTITIONER, new StatelessPartitioner<GenericTestOperator>(2)); GenericTestOperator node2 = dag.addOperator("node2", GenericTestOperator.class); @@ -331,9 +325,6 @@ public class StreamCodecTest @Test public void testParallelPartitioningStreamCodec() { - LogicalPlan dag = new LogicalPlan(); - dag.setAttribute(com.datatorrent.api.Context.DAGContext.APPLICATION_PATH, testMeta.dir); - GenericTestOperator node1 = dag.addOperator("node1", GenericTestOperator.class); dag.setAttribute(node1, Context.OperatorContext.PARTITIONER, new StatelessPartitioner<GenericTestOperator>(2)); GenericTestOperator node2 = dag.addOperator("node2", GenericTestOperator.class); @@ -422,9 +413,6 @@ public class StreamCodecTest @Test public void testMultipleInputStreamCodec() { - LogicalPlan dag = new LogicalPlan(); - dag.setAttribute(com.datatorrent.api.Context.DAGContext.APPLICATION_PATH, testMeta.dir); - GenericTestOperator node1 = dag.addOperator("node1", GenericTestOperator.class); TestStreamCodec serDe = new TestStreamCodec(); GenericTestOperator node2 = dag.addOperator("node2", GenericTestOperator.class); @@ -477,9 +465,6 @@ public class StreamCodecTest @Test public void testPartitioningMultipleInputStreamCodec() { - LogicalPlan dag = new LogicalPlan(); - dag.setAttribute(com.datatorrent.api.Context.DAGContext.APPLICATION_PATH, testMeta.dir); - GenericTestOperator node1 = dag.addOperator("node1", GenericTestOperator.class); GenericTestOperator node2 = dag.addOperator("node2", GenericTestOperator.class); dag.setAttribute(node1, Context.OperatorContext.PARTITIONER, new StatelessPartitioner<GenericTestOperator>(2)); @@ -556,9 +541,6 @@ public class StreamCodecTest @Test public void testMultipleStreamCodecs() { - LogicalPlan dag = new LogicalPlan(); - dag.setAttribute(com.datatorrent.api.Context.DAGContext.APPLICATION_PATH, testMeta.dir); - GenericTestOperator node1 = dag.addOperator("node1", GenericTestOperator.class); GenericTestOperator node2 = dag.addOperator("node2", GenericTestOperator.class); GenericTestOperator node3 = dag.addOperator("node3", GenericTestOperator.class); @@ -613,9 +595,6 @@ public class StreamCodecTest @Test public void testPartitioningMultipleStreamCodecs() { - LogicalPlan dag = new LogicalPlan(); - dag.setAttribute(com.datatorrent.api.Context.DAGContext.APPLICATION_PATH, testMeta.dir); - GenericTestOperator node1 = dag.addOperator("node1", GenericTestOperator.class); GenericTestOperator node2 = dag.addOperator("node2", GenericTestOperator.class); GenericTestOperator node3 = dag.addOperator("node3", GenericTestOperator.class); @@ -699,9 +678,6 @@ public class StreamCodecTest @Test public void testMxNMultipleStreamCodecs() { - LogicalPlan dag = new LogicalPlan(); - dag.setAttribute(com.datatorrent.api.Context.DAGContext.APPLICATION_PATH, testMeta.dir); - GenericTestOperator node1 = dag.addOperator("node1", GenericTestOperator.class); dag.setAttribute(node1, Context.OperatorContext.PARTITIONER, new StatelessPartitioner<GenericTestOperator>(2)); GenericTestOperator node2 = dag.addOperator("node2", GenericTestOperator.class); @@ -803,9 +779,6 @@ public class StreamCodecTest @Test public void testInlineStreamCodec() { - LogicalPlan dag = new LogicalPlan(); - dag.setAttribute(com.datatorrent.api.Context.DAGContext.APPLICATION_PATH, testMeta.dir); - GenericTestOperator node1 = dag.addOperator("node1", GenericTestOperator.class); GenericTestOperator node2 = dag.addOperator("node2", GenericTestOperator.class); GenericTestOperator node3 = dag.addOperator("node3", GenericTestOperator.class); @@ -887,9 +860,6 @@ public class StreamCodecTest @Test public void testCascadingStreamCodec() { - LogicalPlan dag = new LogicalPlan(); - dag.setAttribute(com.datatorrent.api.Context.DAGContext.APPLICATION_PATH, testMeta.dir); - GenericTestOperator node1 = dag.addOperator("node1", GenericTestOperator.class); GenericTestOperator node2 = dag.addOperator("node2", GenericTestOperator.class); GenericTestOperator node3 = dag.addOperator("node3", GenericTestOperator.class); @@ -974,9 +944,6 @@ public class StreamCodecTest @Test public void testDynamicPartitioningStreamCodec() { - LogicalPlan dag = new LogicalPlan(); - dag.setAttribute(com.datatorrent.api.Context.DAGContext.APPLICATION_PATH, testMeta.dir); - GenericTestOperator node1 = dag.addOperator("node1", GenericTestOperator.class); dag.setAttribute(node1, Context.OperatorContext.PARTITIONER, new StatelessPartitioner<GenericTestOperator>(2)); dag.setAttribute(node1, Context.OperatorContext.STATS_LISTENERS, Lists.newArrayList((StatsListener) new PartitioningTest.PartitionLoadWatch())); http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/3c35cccb/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java ---------------------------------------------------------------------- diff --git a/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java b/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java index 710440d..b257632 100644 --- a/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java +++ b/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java @@ -29,6 +29,7 @@ import org.apache.commons.io.FileUtils; import org.apache.hadoop.io.DataInputByteBuffer; import org.apache.hadoop.io.DataOutputByteBuffer; import org.junit.Assert; +import org.junit.Before; import org.junit.Rule; import org.junit.Test; @@ -83,10 +84,20 @@ import org.eclipse.jetty.websocket.WebSocket; public class StreamingContainerManagerTest { - @Rule public TestMeta testMeta = new TestMeta(); + @Rule + public TestMeta testMeta = new TestMeta(); + + private LogicalPlan dag; + + @Before + public void setup() + { + dag = StramTestSupport.createDAG(testMeta); + } @Test - public void testDeployInfoSerialization() throws Exception { + public void testDeployInfoSerialization() throws Exception + { OperatorDeployInfo ndi = new OperatorDeployInfo(); ndi.name = "node1"; ndi.type = OperatorDeployInfo.OperatorType.GENERIC; @@ -136,11 +147,8 @@ public class StreamingContainerManagerTest } @Test - public void testGenerateDeployInfo() { - - LogicalPlan dag = new LogicalPlan(); - dag.setAttribute(com.datatorrent.api.Context.DAGContext.APPLICATION_PATH, testMeta.dir); - + public void testGenerateDeployInfo() + { TestGeneratorInputOperator o1 = dag.addOperator("o1", TestGeneratorInputOperator.class); GenericTestOperator o2 = dag.addOperator("o2", GenericTestOperator.class); GenericTestOperator o3 = dag.addOperator("o3", GenericTestOperator.class); @@ -238,9 +246,8 @@ public class StreamingContainerManagerTest } @Test - public void testStaticPartitioning() { - LogicalPlan dag = new LogicalPlan(); - dag.setAttribute(com.datatorrent.api.Context.DAGContext.APPLICATION_PATH, testMeta.dir); + public void testStaticPartitioning() + { // // ,---> node2----, // | | @@ -355,9 +362,6 @@ public class StreamingContainerManagerTest @Test public void testRecoveryOrder() throws Exception { - LogicalPlan dag = new LogicalPlan(); - dag.setAttribute(LogicalPlan.APPLICATION_PATH, testMeta.dir); - GenericTestOperator node1 = dag.addOperator("node1", GenericTestOperator.class); GenericTestOperator node2 = dag.addOperator("node2", GenericTestOperator.class); GenericTestOperator node3 = dag.addOperator("node3", GenericTestOperator.class); @@ -406,9 +410,6 @@ public class StreamingContainerManagerTest @Test public void testRecoveryUpstreamInline() throws Exception { - LogicalPlan dag = new LogicalPlan(); - dag.setAttribute(LogicalPlan.APPLICATION_PATH, testMeta.dir); - GenericTestOperator o1 = dag.addOperator("o1", GenericTestOperator.class); GenericTestOperator o2 = dag.addOperator("o2", GenericTestOperator.class); GenericTestOperator o3 = dag.addOperator("o3", GenericTestOperator.class); @@ -444,11 +445,9 @@ public class StreamingContainerManagerTest } @Test - public void testCheckpointWindowIds() throws Exception { - File path = new File(testMeta.dir); - FileUtils.deleteDirectory(path.getAbsoluteFile()); - - FSStorageAgent sa = new FSStorageAgent(path.getPath(), null); + public void testCheckpointWindowIds() throws Exception + { + FSStorageAgent sa = new FSStorageAgent(testMeta.getPath(), null); long[] windowIds = new long[]{123L, 345L, 234L}; for (long windowId : windowIds) { @@ -475,11 +474,7 @@ public class StreamingContainerManagerTest @Test public void testAsyncCheckpointWindowIds() throws Exception { - File path = new File(testMeta.dir); - FileUtils.deleteDirectory(path.getAbsoluteFile()); - FileUtils.forceMkdir(new File(path.getAbsoluteFile(), "/localPath")); - - AsyncFSStorageAgent sa = new AsyncFSStorageAgent(path.getPath(), null); + AsyncFSStorageAgent sa = new AsyncFSStorageAgent(testMeta.getPath(), null); long[] windowIds = new long[]{123L, 345L, 234L}; for (long windowId : windowIds) { @@ -506,13 +501,8 @@ public class StreamingContainerManagerTest @Test public void testProcessHeartbeat() throws Exception { - FileUtils.deleteDirectory(new File(testMeta.dir)); // clean any state from previous run - - LogicalPlan dag = new LogicalPlan(); - dag.setAttribute(LogicalPlan.APPLICATION_PATH, testMeta.dir); - TestGeneratorInputOperator o1 = dag.addOperator("o1", TestGeneratorInputOperator.class); - dag.setAttribute(o1, OperatorContext.STATS_LISTENERS, Arrays.asList(new StatsListener[]{new PartitioningTest.PartitionLoadWatch()})); + dag.setAttribute(o1, OperatorContext.STATS_LISTENERS, Arrays.asList(new StatsListener[]{new PartitioningTest.PartitionLoadWatch()})); dag.setAttribute(OperatorContext.STORAGE_AGENT, new MemoryStorageAgent()); StreamingContainerManager scm = new StreamingContainerManager(dag); @@ -655,9 +645,6 @@ public class StreamingContainerManagerTest @Test public void testValidGenericOperatorDeployInfoType() { - LogicalPlan dag = new LogicalPlan(); - dag.setAttribute(com.datatorrent.api.Context.DAGContext.APPLICATION_PATH, testMeta.dir); - GenericTestOperator o1 = dag.addOperator("o1", GenericTestOperator.class); TestGeneratorInputOperator.ValidGenericOperator o2 = dag.addOperator("o2", TestGeneratorInputOperator.ValidGenericOperator.class); @@ -683,9 +670,6 @@ public class StreamingContainerManagerTest @Test public void testValidInputOperatorDeployInfoType() { - LogicalPlan dag = new LogicalPlan(); - dag.setAttribute(com.datatorrent.api.Context.DAGContext.APPLICATION_PATH, testMeta.dir); - TestGeneratorInputOperator.ValidInputOperator o1 = dag.addOperator("o1", TestGeneratorInputOperator.ValidInputOperator.class); GenericTestOperator o2 = dag.addOperator("o2", GenericTestOperator.class); @@ -711,8 +695,6 @@ public class StreamingContainerManagerTest @Test public void testOperatorShutdown() { - LogicalPlan dag = new LogicalPlan(); - dag.setAttribute(com.datatorrent.api.Context.DAGContext.APPLICATION_PATH, testMeta.dir); dag.setAttribute(OperatorContext.STORAGE_AGENT, new MemoryStorageAgent()); GenericTestOperator o1 = dag.addOperator("o1", GenericTestOperator.class); @@ -781,7 +763,6 @@ public class StreamingContainerManagerTest private void testDownStreamPartition(Locality locality) throws Exception { - LogicalPlan dag = new LogicalPlan(); TestGeneratorInputOperator o1 = dag.addOperator("o1", TestGeneratorInputOperator.class); GenericTestOperator o2 = dag.addOperator("o2", GenericTestOperator.class); dag.setAttribute(o2, OperatorContext.PARTITIONER, new StatelessPartitioner<GenericTestOperator>(2)); @@ -815,8 +796,7 @@ public class StreamingContainerManagerTest @Test public void testPhysicalPropertyUpdate() throws Exception { - LogicalPlan dag = new LogicalPlan(); - dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(testMeta.dir, null)); + dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(testMeta.getPath(), null)); TestGeneratorInputOperator o1 = dag.addOperator("o1", TestGeneratorInputOperator.class); GenericTestOperator o2 = dag.addOperator("o2", GenericTestOperator.class); dag.addStream("o1.outport", o1.outport, o2.inport1); @@ -837,10 +817,9 @@ public class StreamingContainerManagerTest lc.shutdown(); } - private LogicalPlan getTestAppDataSourceLogicalPlan(Class<? extends TestAppDataQueryOperator> qClass, + private void setupAppDataSourceLogicalPlan(Class<? extends TestAppDataQueryOperator> qClass, Class<? extends TestAppDataSourceOperator> dsClass, Class<? extends TestAppDataResultOperator> rClass) { - LogicalPlan dag = new LogicalPlan(); TestGeneratorInputOperator o1 = dag.addOperator("o1", TestGeneratorInputOperator.class); TestAppDataQueryOperator q = dag.addOperator("q", qClass); TestAppDataResultOperator r = dag.addOperator("r", rClass); @@ -854,13 +833,11 @@ public class StreamingContainerManagerTest dag.addStream("o1-to-ds", o1.outport, ds.inport1); dag.addStream("q-to-ds", q.outport, ds.query); dag.addStream("ds-to-r", ds.result, r.inport); - - return dag; } - private void testAppDataSources(LogicalPlan dag, boolean appendQIDToTopic) throws Exception + private void testAppDataSources(boolean appendQIDToTopic) throws Exception { - dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(testMeta.dir, null)); + dag.setAttribute(OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(testMeta.getPath(), null)); StramLocalCluster lc = new StramLocalCluster(dag); lc.runAsync(); StreamingContainerManager dnmgr = lc.dnmgr; @@ -883,22 +860,22 @@ public class StreamingContainerManagerTest @Test public void testGetAppDataSources1() throws Exception { - LogicalPlan dag = getTestAppDataSourceLogicalPlan(TestAppDataQueryOperator.class, TestAppDataSourceOperator.class, TestAppDataResultOperator.ResultOperator1.class); - testAppDataSources(dag, true); + setupAppDataSourceLogicalPlan(TestAppDataQueryOperator.class, TestAppDataSourceOperator.class, TestAppDataResultOperator.ResultOperator1.class); + testAppDataSources(true); } @Test public void testGetAppDataSources2() throws Exception { - LogicalPlan dag = getTestAppDataSourceLogicalPlan(TestAppDataQueryOperator.class, TestAppDataSourceOperator.class, TestAppDataResultOperator.ResultOperator2.class); - testAppDataSources(dag, false); + setupAppDataSourceLogicalPlan(TestAppDataQueryOperator.class, TestAppDataSourceOperator.class, TestAppDataResultOperator.ResultOperator2.class); + testAppDataSources(false); } @Test public void testGetAppDataSources3() throws Exception { - LogicalPlan dag = getTestAppDataSourceLogicalPlan(TestAppDataQueryOperator.class, TestAppDataSourceOperator.class, TestAppDataResultOperator.ResultOperator3.class); - testAppDataSources(dag, false); + setupAppDataSourceLogicalPlan(TestAppDataQueryOperator.class, TestAppDataSourceOperator.class, TestAppDataResultOperator.ResultOperator3.class); + testAppDataSources(false); } @Test @@ -933,8 +910,7 @@ public class StreamingContainerManagerTest try { server.start(); int port = server.getPort(); - LogicalPlan dag = new LogicalPlan(); - dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(testMeta.dir, null)); + dag.setAttribute(OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(testMeta.getPath(), null)); TestGeneratorInputOperator o1 = dag.addOperator("o1", TestGeneratorInputOperator.class); GenericTestOperator o2 = dag.addOperator("o2", GenericTestOperator.class); dag.addStream("o1.outport", o1.outport, o2.inport1); http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/3c35cccb/engine/src/test/java/com/datatorrent/stram/engine/AutoMetricTest.java ---------------------------------------------------------------------- diff --git a/engine/src/test/java/com/datatorrent/stram/engine/AutoMetricTest.java b/engine/src/test/java/com/datatorrent/stram/engine/AutoMetricTest.java index a1312a5..f6451c9 100644 --- a/engine/src/test/java/com/datatorrent/stram/engine/AutoMetricTest.java +++ b/engine/src/test/java/com/datatorrent/stram/engine/AutoMetricTest.java @@ -27,6 +27,7 @@ import java.util.concurrent.CountDownLatch; import org.apache.hadoop.conf.Configuration; import org.junit.Assert; +import org.junit.Before; import org.junit.Ignore; import org.junit.Rule; import org.junit.Test; @@ -176,6 +177,14 @@ public class AutoMetricTest } + private LogicalPlan dag; + + @Before + public void setup() + { + dag = StramTestSupport.createDAG(testMeta); + } + /** * Verify custom stats generated by operator are propagated and trigger repartition. * @@ -185,8 +194,7 @@ public class AutoMetricTest @SuppressWarnings("SleepWhileInLoop") public void testMetricPropagation() throws Exception { - LogicalPlan dag = new LogicalPlan(); - dag.setAttribute(OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(testMeta.dir, null)); + dag.setAttribute(OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(testMeta.getPath(), null)); dag.getAttributes().put(LogicalPlan.STREAMING_WINDOW_SIZE_MILLIS, 300); dag.getAttributes().put(LogicalPlan.CONTAINERS_MAX_COUNT, 1); @@ -234,9 +242,7 @@ public class AutoMetricTest CountDownLatch latch = new CountDownLatch(1); LogicalPlanConfiguration lpc = new LogicalPlanConfiguration(new Configuration()); - LogicalPlan dag = new LogicalPlan(); - dag.setAttribute(LogicalPlan.APPLICATION_PATH, testMeta.dir); TestGeneratorInputOperator inputOperator = dag.addOperator("input", TestGeneratorInputOperator.class); OperatorWithMetrics o1 = dag.addOperator("o1", OperatorWithMetrics.class); @@ -264,8 +270,7 @@ public class AutoMetricTest CountDownLatch latch = new CountDownLatch(2); LogicalPlanConfiguration lpc = new LogicalPlanConfiguration(new Configuration()); - LogicalPlan dag = new LogicalPlan(); - dag.setAttribute(LogicalPlan.APPLICATION_PATH, testMeta.dir); + TestGeneratorInputOperator inputOperator = dag.addOperator("input", TestGeneratorInputOperator.class); OperatorWithMetrics o1 = dag.addOperator("o1", OperatorWithMetrics.class); @@ -289,8 +294,7 @@ public class AutoMetricTest public void testInjectionOfDefaultMetricsAggregator() throws Exception { LogicalPlanConfiguration lpc = new LogicalPlanConfiguration(new Configuration()); - LogicalPlan dag = new LogicalPlan(); - dag.setAttribute(LogicalPlan.APPLICATION_PATH, testMeta.dir); + TestGeneratorInputOperator inputOperator = dag.addOperator("input", TestGeneratorInputOperator.class); OperatorWithMetricMethod o1 = dag.addOperator("o1", OperatorWithMetricMethod.class); @@ -365,9 +369,7 @@ public class AutoMetricTest CountDownLatch latch = new CountDownLatch(1); LogicalPlanConfiguration lpc = new LogicalPlanConfiguration(new Configuration()); - LogicalPlan dag = new LogicalPlan(); - dag.setAttribute(LogicalPlan.APPLICATION_PATH, testMeta.dir); TestGeneratorInputOperator inputOperator = dag.addOperator("input", TestGeneratorInputOperator.class); OperatorWithMetricMethod o1 = dag.addOperator("o1", OperatorWithMetricMethod.class); http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/3c35cccb/engine/src/test/java/com/datatorrent/stram/engine/ProcessingModeTests.java ---------------------------------------------------------------------- diff --git a/engine/src/test/java/com/datatorrent/stram/engine/ProcessingModeTests.java b/engine/src/test/java/com/datatorrent/stram/engine/ProcessingModeTests.java index cef671e..6df4e94 100644 --- a/engine/src/test/java/com/datatorrent/stram/engine/ProcessingModeTests.java +++ b/engine/src/test/java/com/datatorrent/stram/engine/ProcessingModeTests.java @@ -43,6 +43,7 @@ import com.datatorrent.bufferserver.packet.MessageType; import com.datatorrent.bufferserver.util.Codec; import com.datatorrent.stram.StramLocalCluster; import com.datatorrent.stram.plan.logical.LogicalPlan; +import com.datatorrent.stram.support.StramTestSupport; import com.datatorrent.stram.support.StramTestSupport.TestMeta; import com.datatorrent.stram.tuple.EndWindowTuple; import com.datatorrent.stram.tuple.Tuple; @@ -52,7 +53,11 @@ import com.datatorrent.stram.tuple.Tuple; */ public class ProcessingModeTests { - @Rule public TestMeta testMeta = new TestMeta(); + @Rule + public TestMeta testMeta = new TestMeta(); + + private LogicalPlan dag; + ProcessingMode processingMode; int maxTuples = 30; @@ -64,6 +69,7 @@ public class ProcessingModeTests @Before public void setup() throws IOException { + dag = StramTestSupport.createDAG(testMeta); StreamingContainer.eventloop.start(); } @@ -79,11 +85,10 @@ public class ProcessingModeTests CollectorOperator.collection.clear(); CollectorOperator.duplicates.clear(); - LogicalPlan dag = new LogicalPlan(); dag.getAttributes().put(LogicalPlan.CHECKPOINT_WINDOW_COUNT, 2); dag.getAttributes().put(LogicalPlan.STREAMING_WINDOW_SIZE_MILLIS, 300); dag.getAttributes().put(LogicalPlan.CONTAINERS_MAX_COUNT, 1); - dag.setAttribute(OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(testMeta.dir, null)); + dag.setAttribute(OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(testMeta.getPath(), null)); RecoverableInputOperator rip = dag.addOperator("LongGenerator", RecoverableInputOperator.class); rip.setMaximumTuples(maxTuples); rip.setSimulateFailure(true); @@ -105,8 +110,7 @@ public class ProcessingModeTests CollectorOperator.collection.clear(); CollectorOperator.duplicates.clear(); - LogicalPlan dag = new LogicalPlan(); - dag.setAttribute(OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(testMeta.dir, null)); + dag.setAttribute(OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(testMeta.getPath(), null)); dag.getAttributes().put(LogicalPlan.CHECKPOINT_WINDOW_COUNT, 2); dag.getAttributes().put(LogicalPlan.STREAMING_WINDOW_SIZE_MILLIS, 300); dag.getAttributes().put(LogicalPlan.CONTAINERS_MAX_COUNT, 1); @@ -130,8 +134,7 @@ public class ProcessingModeTests CollectorOperator.collection.clear(); CollectorOperator.duplicates.clear(); - LogicalPlan dag = new LogicalPlan(); - dag.setAttribute(OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(testMeta.dir, null)); + dag.setAttribute(OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(testMeta.getPath(), null)); dag.getAttributes().put(LogicalPlan.CHECKPOINT_WINDOW_COUNT, 2); dag.getAttributes().put(LogicalPlan.STREAMING_WINDOW_SIZE_MILLIS, 300); dag.getAttributes().put(LogicalPlan.CONTAINERS_MAX_COUNT, 1); http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/3c35cccb/engine/src/test/java/com/datatorrent/stram/plan/StreamPersistanceTests.java ---------------------------------------------------------------------- diff --git a/engine/src/test/java/com/datatorrent/stram/plan/StreamPersistanceTests.java b/engine/src/test/java/com/datatorrent/stram/plan/StreamPersistanceTests.java index 1839c91..5dca8a4 100644 --- a/engine/src/test/java/com/datatorrent/stram/plan/StreamPersistanceTests.java +++ b/engine/src/test/java/com/datatorrent/stram/plan/StreamPersistanceTests.java @@ -31,6 +31,7 @@ import java.util.Map.Entry; import java.util.concurrent.atomic.AtomicInteger; import org.junit.Assert; +import org.junit.Before; import org.junit.Rule; import org.junit.Test; import org.slf4j.Logger; @@ -203,10 +204,17 @@ public class StreamPersistanceTests { } + private LogicalPlan dag; + + @Before + public void setup() + { + dag = StramTestSupport.createDAG(testMeta); + } + @Test public void testPersistStreamOperatorIsAdded() { - LogicalPlan dag = new LogicalPlan(); TestGeneratorInputOperator input1 = dag.addOperator("input1", TestGeneratorInputOperator.class); GenericTestOperator x = dag.addOperator("x", new GenericTestOperator()); TestRecieverOperator persister = new TestRecieverOperator(); @@ -222,7 +230,6 @@ public class StreamPersistanceTests @Test public void testPersistStreamOperatorIsAddedPerSink() { - LogicalPlan dag = new LogicalPlan(); TestGeneratorInputOperator input1 = dag.addOperator("input1", TestGeneratorInputOperator.class); GenericTestOperator x1 = dag.addOperator("x1", new GenericTestOperator()); GenericTestOperator x2 = dag.addOperator("x2", new GenericTestOperator()); @@ -255,7 +262,6 @@ public class StreamPersistanceTests public void testaddStreamThrowsExceptionOnInvalidLoggerType() { // Test Logger with non-optional output ports - LogicalPlan dag = new LogicalPlan(); TestGeneratorInputOperator input1 = dag.addOperator("input1", TestGeneratorInputOperator.class); GenericTestOperator x = dag.addOperator("x", new GenericTestOperator()); StreamMeta stream = dag.addStream("Stream1", input1.outport, x.inport1); @@ -302,7 +308,6 @@ public class StreamPersistanceTests public void testaddStreamThrowsExceptionOnInvalidInputPortForLoggerType() { // Test for input port belonging to different object - LogicalPlan dag = new LogicalPlan(); TestGeneratorInputOperator input1 = dag.addOperator("input1", TestGeneratorInputOperator.class); GenericTestOperator x = dag.addOperator("x", new GenericTestOperator()); TestRecieverOperator persister = new TestRecieverOperator(); @@ -322,7 +327,6 @@ public class StreamPersistanceTests public void testPersistStreamOperatorIsRemovedWhenStreamIsRemoved() { // Remove Stream and check if persist operator is removed - LogicalPlan dag = new LogicalPlan(); TestGeneratorInputOperator input1 = dag.addOperator("input1", TestGeneratorInputOperator.class); GenericTestOperator x = dag.addOperator("x", new GenericTestOperator()); TestRecieverOperator persister = new TestRecieverOperator(); @@ -340,7 +344,6 @@ public class StreamPersistanceTests public void testPersistStreamOperatorIsRemovedWhenSinkIsRemoved() { // Remove sink and check if corresponding persist operator is removed - LogicalPlan dag = new LogicalPlan(); TestGeneratorInputOperator input1 = dag.addOperator("input1", TestGeneratorInputOperator.class); GenericTestOperator x1 = dag.addOperator("x1", new GenericTestOperator()); GenericTestOperator x2 = dag.addOperator("x2", new GenericTestOperator()); @@ -383,7 +386,6 @@ public class StreamPersistanceTests @Test public void testPersistStreamOperatorIsRemovedWhenAllSinksAreRemoved() { - LogicalPlan dag = new LogicalPlan(); TestGeneratorInputOperator input1 = dag.addOperator("input1", TestGeneratorInputOperator.class); GenericTestOperator x1 = dag.addOperator("x1", new GenericTestOperator()); GenericTestOperator x2 = dag.addOperator("x2", new GenericTestOperator()); @@ -409,7 +411,6 @@ public class StreamPersistanceTests @Test public void testPersistStreamOperatorGeneratesIdenticalOutputAsSink() throws ClassNotFoundException, IOException, InterruptedException { - LogicalPlan dag = new LogicalPlan(); AscendingNumbersOperator input1 = dag.addOperator("input1", AscendingNumbersOperator.class); // Add PersistOperator directly to dag final TestRecieverOperator x = dag.addOperator("x", new TestRecieverOperator()); @@ -603,7 +604,6 @@ public class StreamPersistanceTests @Test public void testPersistStreamWithFiltering() throws ClassNotFoundException, IOException, InterruptedException { - LogicalPlan dag = new LogicalPlan(); AscendingNumbersOperator ascend = dag.addOperator("ascend", new AscendingNumbersOperator()); PassThruOperatorWithCodec passThru = dag.addOperator("PassThrough", new PassThruOperatorWithCodec(2)); TestRecieverOperator console = dag.addOperator("console", new TestRecieverOperator()); @@ -617,7 +617,6 @@ public class StreamPersistanceTests @Test public void testPersistStreamOnSingleSinkWithFiltering() throws ClassNotFoundException, IOException, InterruptedException { - LogicalPlan dag = new LogicalPlan(); AscendingNumbersOperator ascend = dag.addOperator("ascend", new AscendingNumbersOperator()); PassThruOperatorWithCodec passThru = dag.addOperator("PassThrough", new PassThruOperatorWithCodec(2)); final TestRecieverOperator console = dag.addOperator("console", new TestRecieverOperator()); @@ -632,7 +631,6 @@ public class StreamPersistanceTests @Test public void testPersistStreamOnSingleSinkWithFilteringContainerLocal() throws ClassNotFoundException, IOException, InterruptedException { - LogicalPlan dag = new LogicalPlan(); AscendingNumbersOperator ascend = dag.addOperator("ascend", new AscendingNumbersOperator()); PassThruOperatorWithCodec passThru = dag.addOperator("PassThrough", new PassThruOperatorWithCodec(2)); PassThruOperatorWithCodec passThru2 = dag.addOperator("Multiples_of_3", new PassThruOperatorWithCodec(3)); @@ -696,7 +694,6 @@ public class StreamPersistanceTests @Test public void testPersistStreamOperatorGeneratesUnionOfAllSinksOutput() throws ClassNotFoundException, IOException { - LogicalPlan dag = new LogicalPlan(); AscendingNumbersOperator ascend = dag.addOperator("ascend", new AscendingNumbersOperator()); PassThruOperatorWithCodec passThru1 = dag.addOperator("PassThrough1", new PassThruOperatorWithCodec(2)); PassThruOperatorWithCodec passThru2 = dag.addOperator("PassThrough2", new PassThruOperatorWithCodec(3)); @@ -829,7 +826,6 @@ public class StreamPersistanceTests @Test public void testPersistStreamOperatorMultiplePhysicalOperatorsForSink() throws ClassNotFoundException, IOException { - LogicalPlan dag = new LogicalPlan(); AscendingNumbersOperator ascend = dag.addOperator("ascend", new AscendingNumbersOperator()); PartitionedTestOperatorWithFiltering passThru = dag.addOperator("partition", new PartitionedTestOperatorWithFiltering()); final TestRecieverOperator console = dag.addOperator("console", new TestRecieverOperator()); @@ -883,7 +879,6 @@ public class StreamPersistanceTests @Test public void testPartitionedPersistOperator() throws ClassNotFoundException, IOException { - LogicalPlan dag = new LogicalPlan(); AscendingNumbersOperator ascend = dag.addOperator("ascend", new AscendingNumbersOperator()); PartitionedTestOperatorWithFiltering passThru = dag.addOperator("partition", new PartitionedTestOperatorWithFiltering()); final TestRecieverOperator console = dag.addOperator("console", new TestRecieverOperator()); @@ -942,10 +937,6 @@ public class StreamPersistanceTests @Test public void testDynamicPartitioning() throws ClassNotFoundException, IOException { - LogicalPlan dag = new LogicalPlan(); - - dag.setAttribute(com.datatorrent.api.Context.DAGContext.APPLICATION_PATH, testMeta.dir); - AscendingNumbersOperator ascend = dag.addOperator("ascend", new AscendingNumbersOperator()); final TestRecieverOperator console = dag.addOperator("console", new TestRecieverOperator()); http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/3c35cccb/engine/src/test/java/com/datatorrent/stram/support/StramTestSupport.java ---------------------------------------------------------------------- diff --git a/engine/src/test/java/com/datatorrent/stram/support/StramTestSupport.java b/engine/src/test/java/com/datatorrent/stram/support/StramTestSupport.java index 314fdfc..cf2a887 100644 --- a/engine/src/test/java/com/datatorrent/stram/support/StramTestSupport.java +++ b/engine/src/test/java/com/datatorrent/stram/support/StramTestSupport.java @@ -22,6 +22,10 @@ import java.io.File; import java.io.IOException; import java.io.Serializable; import java.lang.reflect.Field; +import java.net.URI; +import java.nio.file.FileAlreadyExistsException; +import java.nio.file.Files; +import java.nio.file.Path; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -50,6 +54,7 @@ import com.datatorrent.stram.api.AppDataSource; import com.datatorrent.stram.api.BaseContext; import com.datatorrent.stram.engine.OperatorContext; import com.datatorrent.stram.engine.WindowGenerator; +import com.datatorrent.stram.plan.logical.LogicalPlan; import com.datatorrent.stram.plan.physical.PTOperator; import com.datatorrent.stram.tuple.EndWindowTuple; import com.datatorrent.stram.tuple.Tuple; @@ -253,23 +258,54 @@ abstract public class StramTestSupport public static class TestMeta extends TestWatcher { - public String dir = null; + private File dir; @Override protected void starting(org.junit.runner.Description description) { - String methodName = description.getMethodName(); - String className = description.getClassName(); - //className = className.substring(className.lastIndexOf('.') + 1); - this.dir = "target/" + className + "/" + methodName; - new File(this.dir).mkdirs(); + final String methodName = description.getMethodName(); + final String className = description.getClassName(); + dir = new File("target/" + className + "/" + methodName); + try { + Files.createDirectories(dir.toPath()); + } catch (FileAlreadyExistsException e) { + try { + Files.delete(dir.toPath()); + Files.createDirectories(dir.toPath()); + } catch (IOException ioe) { + throw new RuntimeException("Fail to create test working directory " + dir.getAbsolutePath(), e); + } + } catch (IOException e) { + throw new RuntimeException("Fail to create test working directory " + dir.getAbsolutePath(), e); + } } @Override protected void finished(org.junit.runner.Description description) { - FileUtils.deleteQuietly(new File(this.dir)); + FileUtils.deleteQuietly(dir); + } + + public String getPath() + { + return dir.getPath(); + } + + public String getAbsolutePath() + { + return dir.getAbsolutePath(); + } + + public Path toPath() + { + return dir.toPath(); + } + + public URI toURI() + { + return dir.toURI(); } + } public static class TestHomeDirectory extends TestWatcher @@ -333,6 +369,20 @@ abstract public class StramTestSupport } } + public static LogicalPlan createDAG(final TestMeta testMeta, final String suffix) + { + if (suffix == null) { + throw new NullPointerException(); + } + LogicalPlan dag = new LogicalPlan(); + dag.setAttribute(LogicalPlan.APPLICATION_PATH, testMeta.getPath() + suffix); + return dag; + } + + public static LogicalPlan createDAG(final TestMeta testMeta) + { + return createDAG(testMeta, ""); + } public static class MemoryStorageAgent implements StorageAgent, Serializable {
