APEX-159 #resolve StramMiniClusterTest.testOperatorFailureRecovery succeeds with unexpected error condition
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/0fc22a09
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/0fc22a09
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/0fc22a09

Branch: refs/heads/master
Commit: 0fc22a094d5ae67e249f8dbc728c5e6a32eecfbc
Parents: 0a85586
Author: Vlad Rozov <[email protected]>
Authored: Sat Sep 26 19:59:02 2015 -0700
Committer: Vlad Rozov <[email protected]>
Committed: Sat Sep 26 19:59:02 2015 -0700

----------------------------------------------------------------------
 .../stram/LaunchContainerRunnable.java | 27 +++++++-------------
 .../java/com/datatorrent/stram/StramClient.java | 10 +-------
 .../datatorrent/stram/StramMiniClusterTest.java | 4 +--
 3 files changed, 12 insertions(+), 29 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/0fc22a09/engine/src/main/java/com/datatorrent/stram/LaunchContainerRunnable.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/LaunchContainerRunnable.java b/engine/src/main/java/com/datatorrent/stram/LaunchContainerRunnable.java
index 0a6c062..863808a 100644
--- a/engine/src/main/java/com/datatorrent/stram/LaunchContainerRunnable.java
+++ b/engine/src/main/java/com/datatorrent/stram/LaunchContainerRunnable.java
@@ -118,28 +118,19 @@ public class LaunchContainerRunnable implements Runnable
     LOG.info("CLASSPATH: {}", classPathEnv);
   }
+  public static void addFileToLocalResources(final String name, final FileStatus fileStatus, final LocalResourceType type, final Map<String, LocalResource> localResources)
+  {
+    final LocalResource localResource = LocalResource.newInstance(ConverterUtils.getYarnUrlFromPath(fileStatus.getPath()),
+        type, LocalResourceVisibility.APPLICATION, fileStatus.getLen(), fileStatus.getModificationTime());
+    localResources.put(name, localResource);
+  }
+
   public static void addFilesToLocalResources(LocalResourceType type, String commaSeparatedFileNames, Map<String, LocalResource> localResources, FileSystem fs) throws IOException
   {
     String[] files = StringUtils.splitByWholeSeparator(commaSeparatedFileNames, StramClient.LIB_JARS_SEP);
     for (String file : files) {
-      Path dst = new Path(file);
-      // Create a local resource to point to the destination jar path
-      FileStatus destStatus = fs.getFileStatus(dst);
-      LocalResource amJarRsrc = Records.newRecord(LocalResource.class);
-      // Set the type of resource - file or archive
-      amJarRsrc.setType(type);
-      // Set visibility of the resource
-      // Setting to most private option
-      amJarRsrc.setVisibility(LocalResourceVisibility.APPLICATION);
-      // Set the resource to be copied over
-      amJarRsrc.setResource(ConverterUtils.getYarnUrlFromPath(dst));
-      // Set timestamp and length of file so that the framework
-      // can do basic sanity checks for the local resource
-      // after it has been copied over to ensure it is the same
-      // resource the client intended to use with the application
-      amJarRsrc.setTimestamp(destStatus.getModificationTime());
-      amJarRsrc.setSize(destStatus.getLen());
-      localResources.put(dst.getName(), amJarRsrc);
+      final Path dst = new Path(file);
+      addFileToLocalResources(dst.getName(), fs.getFileStatus(dst), type, localResources);
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/0fc22a09/engine/src/main/java/com/datatorrent/stram/StramClient.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/StramClient.java b/engine/src/main/java/com/datatorrent/stram/StramClient.java
index 7edc628..7abfc82 100644
--- a/engine/src/main/java/com/datatorrent/stram/StramClient.java
+++ b/engine/src/main/java/com/datatorrent/stram/StramClient.java
@@ -497,15 +497,7 @@ public class StramClient
       outStream = fs.create(launchConfigDst, true);
       conf.writeXml(outStream);
       outStream.close();
-
-      FileStatus topologyFileStatus = fs.getFileStatus(cfgDst);
-      LocalResource topologyRsrc = Records.newRecord(LocalResource.class);
-      topologyRsrc.setType(LocalResourceType.FILE);
-      topologyRsrc.setVisibility(LocalResourceVisibility.APPLICATION);
-      topologyRsrc.setResource(ConverterUtils.getYarnUrlFromURI(cfgDst.toUri()));
-      topologyRsrc.setTimestamp(topologyFileStatus.getModificationTime());
-      topologyRsrc.setSize(topologyFileStatus.getLen());
-      localResources.put(LogicalPlan.SER_FILE_NAME, topologyRsrc);
+      LaunchContainerRunnable.addFileToLocalResources(LogicalPlan.SER_FILE_NAME, fs.getFileStatus(cfgDst), LocalResourceType.FILE, localResources);
       // Set local resource info into app master container launch context
       amContainer.setLocalResources(localResources);

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/0fc22a09/engine/src/test/java/com/datatorrent/stram/StramMiniClusterTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/StramMiniClusterTest.java b/engine/src/test/java/com/datatorrent/stram/StramMiniClusterTest.java
index 493156b..5d37f76 100644
--- a/engine/src/test/java/com/datatorrent/stram/StramMiniClusterTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/StramMiniClusterTest.java
@@ -364,8 +364,8 @@ public class StramMiniClusterTest
   {
     LogicalPlan dag = new LogicalPlan();
-    dag.setAttribute(LogicalPlan.APPLICATION_PATH, testMeta.dir);
-    AsyncFSStorageAgent agent = new AsyncFSStorageAgent(testMeta.dir, null);
+    dag.setAttribute(LogicalPlan.APPLICATION_PATH, "file:" + System.getProperty("user.dir") + '/' + testMeta.dir);
+    AsyncFSStorageAgent agent = new AsyncFSStorageAgent(dag.getAttributes().get(LogicalPlan.APPLICATION_PATH), null);
     agent.setSyncCheckpoint(true);
     dag.setAttribute(OperatorContext.STORAGE_AGENT, agent);
     FailingOperator badOperator = dag.addOperator("badOperator", FailingOperator.class);
