Repository: apex-core Updated Branches: refs/heads/master 51de67e61 -> d9bc67d5a
APEXCORE-598 Write checkpoints to APPLICATION_PATH in embedded execution mode. Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/f6f6d5f5 Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/f6f6d5f5 Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/f6f6d5f5 Branch: refs/heads/master Commit: f6f6d5f541bb88301a4b401392e848ee0d2bc3c9 Parents: 05c798d Author: Thomas Weise <[email protected]> Authored: Tue Jan 3 09:59:52 2017 -0800 Committer: Thomas Weise <[email protected]> Committed: Mon Jan 9 10:23:09 2017 -0800 ---------------------------------------------------------------------- .../datatorrent/stram/StramLocalCluster.java | 48 +++++++++++++------- .../stram/StramLocalClusterTest.java | 17 +++++++ 2 files changed, 48 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/apex-core/blob/f6f6d5f5/engine/src/main/java/com/datatorrent/stram/StramLocalCluster.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/StramLocalCluster.java b/engine/src/main/java/com/datatorrent/stram/StramLocalCluster.java index 14a2827..e188b60 100644 --- a/engine/src/main/java/com/datatorrent/stram/StramLocalCluster.java +++ b/engine/src/main/java/com/datatorrent/stram/StramLocalCluster.java @@ -58,7 +58,7 @@ import com.datatorrent.stram.plan.logical.LogicalPlan.OperatorMeta; import com.datatorrent.stram.plan.physical.PTOperator; /** - * Launcher for topologies in local mode within a single process. + * Launcher for topologies in embedded mode within a single process. * Child containers are mapped to threads. * * @since 0.3.2 @@ -67,7 +67,7 @@ public class StramLocalCluster implements Runnable, Controller { private static final Logger LOG = LoggerFactory.getLogger(StramLocalCluster.class); // assumes execution as unit test - private static File CLUSTER_WORK_DIR = new File("target", StramLocalCluster.class.getName()); + private static final File DEFAULT_APP_DIR = new File("target", StramLocalCluster.class.getName()); private static final String LOCALHOST_PROPERTY_KEY = "org.apache.apex.stram.StramLocalCluster.hostname"; private static final String LOCALHOST = System.getProperty(LOCALHOST_PROPERTY_KEY, "localhost"); protected final StreamingContainerManager dnmgr; @@ -283,11 +283,24 @@ public class StramLocalCluster implements Runnable, Controller dag.validate(); // ensure plan can be serialized cloneLogicalPlan(dag); - // convert to URI so we always write to local file system, - // even when the environment has a default HDFS location. - String pathUri = CLUSTER_WORK_DIR.toURI().toString(); + final Path pathUri; + String appPath = dag.getAttributes().get(LogicalPlan.APPLICATION_PATH); + if (appPath == null) { + // convert to URI so we always write to local file system, + // even when the environment has a default HDFS location. + pathUri = new Path(DEFAULT_APP_DIR.toURI()); + dag.getAttributes().put(LogicalPlan.APPLICATION_PATH, pathUri.toString()); + } else { + // should accept any valid path URI (or relative path) provided by user + Path tmp = new Path(appPath); + if (!tmp.isAbsolute()) { + pathUri = new Path(new File(appPath).toURI()); + } else { + pathUri = tmp; + } + } try { - FileContext.getLocalFSFileContext().delete(new Path(pathUri/*CLUSTER_WORK_DIR.getAbsolutePath()*/), true); + FileContext.getLocalFSFileContext().delete(pathUri, true); } catch (IllegalArgumentException e) { throw e; } catch (IOException e) { @@ -295,22 +308,11 @@ public class StramLocalCluster implements Runnable, Controller } dag.getAttributes().put(LogicalPlan.APPLICATION_ID, "app_local_" + System.currentTimeMillis()); - if (dag.getAttributes().get(LogicalPlan.APPLICATION_PATH) == null) { - dag.getAttributes().put(LogicalPlan.APPLICATION_PATH, pathUri); - } if (dag.getAttributes().get(OperatorContext.STORAGE_AGENT) == null) { dag.setAttribute(OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(new Path(pathUri, LogicalPlan.SUBDIR_CHECKPOINTS).toString(), null)); } this.dnmgr = new StreamingContainerManager(dag); this.umbilical = new UmbilicalProtocolLocalImpl(); - - if (!perContainerBufferServer) { - StreamingContainer.eventloop.start(); - bufferServer = new Server(0, 1024 * 1024,8); - bufferServer.setSpoolStorage(new DiskStorage()); - bufferServerAddress = InetSocketAddress.createUnresolved(LOCALHOST, bufferServer.run(StreamingContainer.eventloop).getPort()); - LOG.info("Buffer server started: {}", bufferServerAddress); - } } public static LogicalPlan cloneLogicalPlan(LogicalPlan lp) throws IOException, ClassNotFoundException @@ -442,6 +444,18 @@ public class StramLocalCluster implements Runnable, Controller @SuppressWarnings({"SleepWhileInLoop", "ResultOfObjectAllocationIgnored"}) public void run(long runMillis) { + if (!perContainerBufferServer) { + StreamingContainer.eventloop.start(); + bufferServer = new Server(0, 1024 * 1024,8); + try { + bufferServer.setSpoolStorage(new DiskStorage()); + } catch (IOException e) { + throw new RuntimeException(e); + } + bufferServerAddress = InetSocketAddress.createUnresolved(LOCALHOST, bufferServer.run(StreamingContainer.eventloop).getPort()); + LOG.info("Buffer server started: {}", bufferServerAddress); + } + long endMillis = System.currentTimeMillis() + runMillis; List<Thread> containerThreads = new LinkedList<>(); http://git-wip-us.apache.org/repos/asf/apex-core/blob/f6f6d5f5/engine/src/test/java/com/datatorrent/stram/StramLocalClusterTest.java ---------------------------------------------------------------------- diff --git a/engine/src/test/java/com/datatorrent/stram/StramLocalClusterTest.java b/engine/src/test/java/com/datatorrent/stram/StramLocalClusterTest.java index 1a5046c..5bea0b3 100644 --- a/engine/src/test/java/com/datatorrent/stram/StramLocalClusterTest.java +++ b/engine/src/test/java/com/datatorrent/stram/StramLocalClusterTest.java @@ -40,6 +40,7 @@ import org.apache.commons.io.IOUtils; import org.apache.hadoop.conf.Configuration; import com.datatorrent.api.Context; +import com.datatorrent.api.Context.DAGContext; import com.datatorrent.api.DAG; import com.datatorrent.api.DefaultInputPort; import com.datatorrent.api.LocalMode; @@ -395,5 +396,21 @@ public class StramLocalClusterTest return new File(destDir, pojoClassName + ".jar").getAbsolutePath(); } + @Test + public void testAppPath() throws Exception + { + // add operator for initial checkpoint + TestGeneratorInputOperator o1 = dag.addOperator("o1", TestGeneratorInputOperator.class); + o1.setMaxTuples(1); + File relPath = new File(dag.getAttributes().get(DAGContext.APPLICATION_PATH)); + String uriPath = relPath.toURI().toString(); + dag.setAttribute(DAGContext.APPLICATION_PATH, uriPath); + StramLocalCluster cluster = new StramLocalCluster(dag); + // no need for run(), just need the initial checkpoint + Assert.assertFalse(cluster.isFinished()); + Assert.assertTrue("app path exists", relPath.exists() && relPath.isDirectory()); + File checkPointDir = new File(relPath, LogicalPlan.SUBDIR_CHECKPOINTS); + Assert.assertTrue("checkpoint path exists", checkPointDir.exists() && checkPointDir.isDirectory()); + } }
