Repository: apex-core Updated Branches: refs/heads/master 130ce6ba8 -> a42c67bc2
APEXCORE-495 Config packages now support *apps* Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/a42c67bc Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/a42c67bc Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/a42c67bc Branch: refs/heads/master Commit: a42c67bc213d7b58c6924426cb07d71fe701ce1b Parents: 130ce6b Author: sandeshh <[email protected]> Authored: Mon Aug 1 18:27:29 2016 -0700 Committer: sandeshh <[email protected]> Committed: Mon Aug 15 18:05:40 2016 -0700 ---------------------------------------------------------------------- .../main/resources/archetype-resources/pom.xml | 1 + .../src/assemble/confPackage.xml | 4 + .../java/com/datatorrent/stram/cli/ApexCli.java | 61 ++++++++++++- .../datatorrent/stram/client/AppPackage.java | 29 +++--- .../datatorrent/stram/client/ConfigPackage.java | 40 +++++++++ .../stram/client/StramClientUtils.java | 25 ++++++ .../com/datatorrent/stram/cli/ApexCliTest.java | 94 ++++++++++++++++---- .../testConfigPackageSrc/app/testApp.json | 53 +++++++++++ 8 files changed, 270 insertions(+), 37 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/apex-core/blob/a42c67bc/apex-conf-archetype/src/main/resources/archetype-resources/pom.xml ---------------------------------------------------------------------- diff --git a/apex-conf-archetype/src/main/resources/archetype-resources/pom.xml b/apex-conf-archetype/src/main/resources/archetype-resources/pom.xml index 4645c77..81e403c 100644 --- a/apex-conf-archetype/src/main/resources/archetype-resources/pom.xml +++ b/apex-conf-archetype/src/main/resources/archetype-resources/pom.xml @@ -25,6 +25,7 @@ <apex.apppackage.maxversion>1.9999.9999</apex.apppackage.maxversion> <apex.appconf.classpath>classpath/*</apex.appconf.classpath> <apex.appconf.files>files/*</apex.appconf.files> + <apex.appconf.app>app/*</apex.appconf.app> </properties> <build> http://git-wip-us.apache.org/repos/asf/apex-core/blob/a42c67bc/apex-conf-archetype/src/main/resources/archetype-resources/src/assemble/confPackage.xml ---------------------------------------------------------------------- diff --git a/apex-conf-archetype/src/main/resources/archetype-resources/src/assemble/confPackage.xml b/apex-conf-archetype/src/main/resources/archetype-resources/src/assemble/confPackage.xml index 4d5d7c7..a03dce9 100644 --- a/apex-conf-archetype/src/main/resources/archetype-resources/src/assemble/confPackage.xml +++ b/apex-conf-archetype/src/main/resources/archetype-resources/src/assemble/confPackage.xml @@ -19,6 +19,10 @@ <directory>${basedir}/src/main/resources/files</directory> <outputDirectory>/files</outputDirectory> </fileSet> + <fileSet> + <directory>${basedir}/src/main/resources/app</directory> + <outputDirectory>/app</outputDirectory> + </fileSet> </fileSets> </assembly> http://git-wip-us.apache.org/repos/asf/apex-core/blob/a42c67bc/engine/src/main/java/com/datatorrent/stram/cli/ApexCli.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/cli/ApexCli.java b/engine/src/main/java/com/datatorrent/stram/cli/ApexCli.java index 4e1f201..91d29bd 100644 --- a/engine/src/main/java/com/datatorrent/stram/cli/ApexCli.java +++ b/engine/src/main/java/com/datatorrent/stram/cli/ApexCli.java @@ -185,6 +185,9 @@ public class ApexCli private String kerberosPrincipal; private String kerberosKeyTab; + private static String CONFIG_EXCLUSIVE = "exclusive"; + private static String CONFIG_INCLUSIVE = "inclusive"; + private static class FileLineReader extends ConsoleReader { private final BufferedReader br; @@ -978,7 +981,7 @@ public class ApexCli return null; } - private static class CliException extends RuntimeException + static class CliException extends RuntimeException { private static final long serialVersionUID = 1L; @@ -3453,7 +3456,7 @@ public class ApexCli matchAppName = commandLineInfo.args[1]; } - List<AppInfo> applications = new ArrayList<>(ap.getApplications()); + List<AppInfo> applications = getAppsFromPackageAndConfig(ap, cp, commandLineInfo.useConfigApps); if (matchAppName != null) { Iterator<AppInfo> it = applications.iterator(); @@ -3639,7 +3642,9 @@ public class ApexCli DTConfiguration getLaunchAppPackageProperties(AppPackage ap, ConfigPackage cp, LaunchCommandLineInfo commandLineInfo, String appName) throws Exception { DTConfiguration launchProperties = new DTConfiguration(); - List<AppInfo> applications = ap.getApplications(); + + List<AppInfo> applications = getAppsFromPackageAndConfig(ap, cp, commandLineInfo.useConfigApps); + AppInfo selectedApp = null; for (AppInfo app : applications) { if (app.name.equals(appName)) { @@ -3715,6 +3720,52 @@ public class ApexCli return launchProperties; } + private List<AppInfo> getAppsFromPackageAndConfig(AppPackage ap, ConfigPackage cp, String configApps) + { + if (cp == null || configApps == null || !(configApps.equals(CONFIG_INCLUSIVE) || configApps.equals(CONFIG_EXCLUSIVE))) { + return ap.getApplications(); + } + + File src = new File(cp.tempDirectory(), "app"); + File dest = new File(ap.tempDirectory(), "app"); + + if (!src.exists()) { + return ap.getApplications(); + } + + if (configApps.equals(CONFIG_EXCLUSIVE)) { + + for (File file : dest.listFiles()) { + + if (file.getName().endsWith(".json")) { + FileUtils.deleteQuietly(new File(dest, file.getName())); + } + } + } else { + for (File file : src.listFiles()) { + FileUtils.deleteQuietly(new File(dest, file.getName())); + } + } + + for (File file : src.listFiles()) { + try { + FileUtils.moveFileToDirectory(file, dest, true); + } catch (IOException e) { + LOG.warn("Application from the config file {} failed while processing.", file.getName()); + } + } + + try { + FileUtils.deleteDirectory(src); + } catch (IOException e) { + LOG.warn("Failed to delete the Config Apps folder"); + } + + ap.processAppDirectory(configApps.equals(CONFIG_EXCLUSIVE)); + + return ap.getApplications(); + } + private class GetAppPackageOperatorsCommand implements Command { @Override @@ -3850,6 +3901,7 @@ public class ApexCli final Option exactMatch = add(new Option("exactMatch", "Only consider applications with exact app name")); final Option queue = add(OptionBuilder.withArgName("queue name").hasArg().withDescription("Specify the queue to launch the application").create("queue")); final Option force = add(new Option("force", "Force launch the application. Do not check for compatibility")); + final Option useConfigApps = add(OptionBuilder.withArgName("inclusive or exclusive").hasArg().withDescription("\"inclusive\" - merge the apps in config and app package. \"exclusive\" - only show config package apps.").create("useConfigApps")); private Option add(Option opt) { @@ -3890,6 +3942,8 @@ public class ApexCli result.origAppId = line.getOptionValue(LAUNCH_OPTIONS.originalAppID.getOpt()); result.exactMatch = line.hasOption("exactMatch"); result.force = line.hasOption("force"); + result.useConfigApps = line.getOptionValue(LAUNCH_OPTIONS.useConfigApps.getOpt()); + return result; } @@ -3908,6 +3962,7 @@ public class ApexCli boolean exactMatch; boolean force; String[] args; + String useConfigApps; } @SuppressWarnings("static-access") http://git-wip-us.apache.org/repos/asf/apex-core/blob/a42c67bc/engine/src/main/java/com/datatorrent/stram/client/AppPackage.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/client/AppPackage.java b/engine/src/main/java/com/datatorrent/stram/client/AppPackage.java index 863b535..a260fc0 100644 --- a/engine/src/main/java/com/datatorrent/stram/client/AppPackage.java +++ b/engine/src/main/java/com/datatorrent/stram/client/AppPackage.java @@ -49,6 +49,7 @@ import net.lingala.zip4j.core.ZipFile; import net.lingala.zip4j.exception.ZipException; import net.lingala.zip4j.model.ZipParameters; + /** * <p> * AppPackage class.</p> @@ -159,7 +160,7 @@ public class AppPackage extends JarFile classPath.addAll(Arrays.asList(StringUtils.split(classPathString, " "))); extractToDirectory(directory, file); if (processAppDirectory) { - processAppDirectory(new File(directory, "app")); + processAppDirectory(false); } File confDirectory = new File(directory, "conf"); if (confDirectory.exists()) { @@ -318,8 +319,11 @@ public class AppPackage extends JarFile return Collections.unmodifiableMap(defaultProperties); } - private void processAppDirectory(File dir) + public void processAppDirectory(boolean skipJars) { + File dir = new File(directory, "app"); + applications.clear(); + Configuration config = new Configuration(); List<String> absClassPath = new ArrayList<>(classPath); @@ -333,7 +337,7 @@ public class AppPackage extends JarFile File[] files = dir.listFiles(); for (File entry : files) { - if (entry.getName().endsWith(".jar")) { + if (entry.getName().endsWith(".jar") && !skipJars) { appJars.add(entry.getName()); try { StramAppLauncher stramAppLauncher = new StramAppLauncher(entry, config); @@ -371,23 +375,12 @@ public class AppPackage extends JarFile for (File entry : files) { if (entry.getName().endsWith(".json")) { appJsonFiles.add(entry.getName()); - try { - AppFactory appFactory = new StramAppLauncher.JsonFileAppFactory(entry); - StramAppLauncher stramAppLauncher = new StramAppLauncher(entry.getName(), config); - stramAppLauncher.loadDependencies(); - AppInfo appInfo = new AppInfo(appFactory.getName(), entry.getName(), "json"); - appInfo.displayName = appFactory.getDisplayName(); - try { - appInfo.dag = appFactory.createApp(stramAppLauncher.getLogicalPlanConfiguration()); - appInfo.dag.validate(); - } catch (Exception ex) { - appInfo.error = ex.getMessage(); - appInfo.errorStackTrace = ExceptionUtils.getStackTrace(ex); - } + AppInfo appInfo = StramClientUtils.jsonFileToAppInfo(entry, config); + + if (appInfo != null) { applications.add(appInfo); - } catch (Exception ex) { - LOG.error("Caught exceptions trying to process {}", entry.getName(), ex); } + } else if (entry.getName().endsWith(".properties")) { appPropertiesFiles.add(entry.getName()); try { http://git-wip-us.apache.org/repos/asf/apex-core/blob/a42c67bc/engine/src/main/java/com/datatorrent/stram/client/ConfigPackage.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/client/ConfigPackage.java b/engine/src/main/java/com/datatorrent/stram/client/ConfigPackage.java index f81ba67..0d833a4 100644 --- a/engine/src/main/java/com/datatorrent/stram/client/ConfigPackage.java +++ b/engine/src/main/java/com/datatorrent/stram/client/ConfigPackage.java @@ -37,6 +37,7 @@ import org.slf4j.LoggerFactory; import org.apache.commons.io.FileUtils; import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.conf.Configuration; import net.lingala.zip4j.core.ZipFile; import net.lingala.zip4j.exception.ZipException; @@ -71,6 +72,7 @@ public class ConfigPackage extends JarFile implements Closeable private final Map<String, String> properties = new TreeMap<>(); private final Map<String, Map<String, String>> appProperties = new TreeMap<>(); + private final List<AppPackage.AppInfo> applications = new ArrayList<>(); /** * Creates an Config Package object. @@ -114,6 +116,12 @@ public class ConfigPackage extends JarFile implements Closeable directory = newDirectory.getAbsolutePath(); zipFile.extractAll(directory); processPropertiesXml(); + processAppDirectory(new File(directory, "app")); + } + + public List<AppPackage.AppInfo> getApplications() + { + return Collections.unmodifiableList(applications); } public String tempDirectory() @@ -177,6 +185,38 @@ public class ConfigPackage extends JarFile implements Closeable } } + private void processAppDirectory(File dir) + { + if (!dir.exists()) { + return; + } + + Configuration config = new Configuration(); + + List<String> absClassPath = new ArrayList<>(classPath); + for (int i = 0; i < absClassPath.size(); i++) { + String path = absClassPath.get(i); + if (!path.startsWith("/")) { + absClassPath.set(i, directory + "/" + path); + } + } + + config.set(StramAppLauncher.LIBJARS_CONF_KEY_NAME, StringUtils.join(absClassPath, ',')); + + File[] files = dir.listFiles(); + + for (File entry : files) { + if (entry.getName().endsWith(".json")) { + + AppPackage.AppInfo appInfo = StramClientUtils.jsonFileToAppInfo(entry, config); + + if (appInfo != null) { + applications.add(appInfo); + } + } + } + } + private void processPropertiesXml() { File dir = new File(directory, "META-INF"); http://git-wip-us.apache.org/repos/asf/apex-core/blob/a42c67bc/engine/src/main/java/com/datatorrent/stram/client/StramClientUtils.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/client/StramClientUtils.java b/engine/src/main/java/com/datatorrent/stram/client/StramClientUtils.java index 85d6b0c..fc60961 100644 --- a/engine/src/main/java/com/datatorrent/stram/client/StramClientUtils.java +++ b/engine/src/main/java/com/datatorrent/stram/client/StramClientUtils.java @@ -51,6 +51,7 @@ import org.slf4j.LoggerFactory; import org.apache.commons.io.FileUtils; import org.apache.commons.lang3.StringEscapeUtils; import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.exception.ExceptionUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.fs.FileSystem; @@ -843,4 +844,28 @@ public class StramClientUtils return result; } + public static AppPackage.AppInfo jsonFileToAppInfo(File file, Configuration config) + { + AppPackage.AppInfo appInfo = null; + + try { + StramAppLauncher.AppFactory appFactory = new StramAppLauncher.JsonFileAppFactory(file); + StramAppLauncher stramAppLauncher = new StramAppLauncher(file.getName(), config); + stramAppLauncher.loadDependencies(); + appInfo = new AppPackage.AppInfo(appFactory.getName(), file.getName(), "json"); + appInfo.displayName = appFactory.getDisplayName(); + try { + appInfo.dag = appFactory.createApp(stramAppLauncher.getLogicalPlanConfiguration()); + appInfo.dag.validate(); + } catch (Exception ex) { + appInfo.error = ex.getMessage(); + appInfo.errorStackTrace = ExceptionUtils.getStackTrace(ex); + } + } catch (Exception ex) { + LOG.error("Caught exceptions trying to process {}", file.getName(), ex); + } + + return appInfo; + } + } http://git-wip-us.apache.org/repos/asf/apex-core/blob/a42c67bc/engine/src/test/java/com/datatorrent/stram/cli/ApexCliTest.java ---------------------------------------------------------------------- diff --git a/engine/src/test/java/com/datatorrent/stram/cli/ApexCliTest.java b/engine/src/test/java/com/datatorrent/stram/cli/ApexCliTest.java index f26463d..c049e5b 100644 --- a/engine/src/test/java/com/datatorrent/stram/cli/ApexCliTest.java +++ b/engine/src/test/java/com/datatorrent/stram/cli/ApexCliTest.java @@ -22,8 +22,10 @@ import java.io.File; import java.util.HashMap; import java.util.Map; +import org.junit.After; import org.junit.AfterClass; import org.junit.Assert; +import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; import org.junit.rules.TemporaryFolder; @@ -35,7 +37,7 @@ import com.datatorrent.stram.client.ConfigPackage; import com.datatorrent.stram.client.DTConfiguration; import com.datatorrent.stram.support.StramTestSupport; -import static com.datatorrent.stram.support.StramTestSupport.setEnv; +import jline.console.ConsoleReader; /** * @@ -52,33 +54,31 @@ public class ApexCliTest private static File appFile; private static File configFile; - - private static AppPackage ap; - private static ConfigPackage cp; + private AppPackage ap; + private ConfigPackage cp; static TemporaryFolder testFolder = new TemporaryFolder(); - static ApexCli cli = new ApexCli(); + ApexCli cli; static Map<String, String> env = new HashMap<String, String>(); static String userHome; @BeforeClass - public static void starting() + public static void createPackages() { + userHome = System.getProperty("user.home"); + String newHome = System.getProperty("user.dir") + "/target"; try { - userHome = System.getProperty("user.home"); - String newHome = System.getProperty("user.dir") + "/target"; + FileUtils.forceMkdir(new File(newHome + "/.dt")); FileUtils.copyFile(new File(System.getProperty("user.dir") + "/src/test/resources/testAppPackage/dt-site.xml"), new File(newHome + "/.dt/dt-site.xml")); env.put("HOME", newHome); - setEnv(env); - - cli.init(); + StramTestSupport.setEnv(env); // Set up jar file to use with constructor testFolder.create(); + appFile = StramTestSupport.createAppPackageFile(); configFile = StramTestSupport.createConfigPackageFile(new File(testFolder.getRoot(), configJarPath)); - ap = new AppPackage(appFile, true); - cp = new ConfigPackage(configFile); + } catch (Exception e) { throw new RuntimeException(e); } @@ -88,9 +88,7 @@ public class ApexCliTest public static void finished() { try { - env.put("HOME", userHome); - setEnv(env); - + StramTestSupport.removeAppPackageFile(); FileUtils.forceDelete(configFile); testFolder.delete(); @@ -99,6 +97,29 @@ public class ApexCliTest } } + @Before + public void startingTest() + { + try { + + cli = new ApexCli(); + cli.init(); + + ap = new AppPackage(appFile, true); + cp = new ConfigPackage(configFile); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + @After + public void finishedTest() + { + ap = null; + cp = null; + cli = null; + } + @Test public void testLaunchAppPackagePropertyPrecedence() throws Exception { @@ -128,6 +149,9 @@ public class ApexCliTest { ApexCli.LaunchCommandLineInfo commandLineInfo = ApexCli .getLaunchCommandLineInfo(new String[]{"-exactMatch", "-conf", configFile.getAbsolutePath(), appFile.getAbsolutePath(), "MyFirstApplication"}); + + commandLineInfo.args = new String[] {"MyFirstApplication"}; + String[] args = cli.getLaunchAppPackageArgs(ap, cp, commandLineInfo, null); commandLineInfo = ApexCli.getLaunchCommandLineInfo(args); StringBuilder sb = new StringBuilder(); @@ -180,4 +204,42 @@ public class ApexCliTest Assert.assertEquals("app-default", props.get("dt.test.5")); Assert.assertEquals("package-default", props.get("dt.test.6")); } + + @Test + public void testAppFromOnlyConfigPackage() throws Exception + { + ApexCli.LaunchCommandLineInfo commandLineInfo = ApexCli + .getLaunchCommandLineInfo(new String[]{"-conf", configFile.getAbsolutePath(), appFile.getAbsolutePath(), "-useConfigApps", "exclusive"}); + + ApexCli apexCli = new ApexCli(); + apexCli.init(); + + Assert.assertEquals("configApps", "exclusive", commandLineInfo.useConfigApps); + + apexCli.getLaunchAppPackageArgs(ap, cp, commandLineInfo, new ConsoleReader()); + + Assert.assertEquals(ap.getApplications().size(), 1); + Assert.assertEquals(ap.getApplications().get(0).displayName, "testApp"); + Assert.assertEquals(ap.getApplications().get(0).type, "json"); + } + + @Test + public void testMergeAppFromConfigAndAppPackage() throws Exception + { + ApexCli.LaunchCommandLineInfo commandLineInfo = ApexCli + .getLaunchCommandLineInfo(new String[]{"-conf", configFile.getAbsolutePath(), appFile.getAbsolutePath(), "-useConfigApps", "inclusive"}); + + Assert.assertEquals("configApps", "inclusive", commandLineInfo.useConfigApps); + + ApexCli apexCli = new ApexCli(); + apexCli.init(); + + try { + apexCli.getLaunchAppPackageArgs(ap, cp, commandLineInfo, new ConsoleReader()); + } catch (ApexCli.CliException cliException) { + return; + } + + Assert.fail("Cli failed throw multiple apps exception."); + } } http://git-wip-us.apache.org/repos/asf/apex-core/blob/a42c67bc/engine/src/test/resources/testConfigPackage/testConfigPackageSrc/app/testApp.json ---------------------------------------------------------------------- diff --git a/engine/src/test/resources/testConfigPackage/testConfigPackageSrc/app/testApp.json b/engine/src/test/resources/testConfigPackage/testConfigPackageSrc/app/testApp.json new file mode 100644 index 0000000..2ef4871 --- /dev/null +++ b/engine/src/test/resources/testConfigPackage/testConfigPackageSrc/app/testApp.json @@ -0,0 +1,53 @@ +{ + "displayName":"testApp", + "attributes":{ + + }, + "operators":[ + { + "name":"Operator 1", + "attributes":{ + + }, + "class":"org.apache.apex.test.DevNull", + "ports":[ + { + "name":"inputPort", + "attributes":{ + + } + } + ] + }, + { + "name":"Operator 2", + "attributes":{ + + }, + "class":"org.apache.apex.test.RandomGen", + "ports":[ + { + "name":"out", + "attributes":{ + + } + } + ] + } + ], + "streams":[ + { + "name":"Stream 1", + "sinks":[ + { + "operatorName":"Operator 1", + "portName":"inputPort" + } + ], + "source":{ + "operatorName":"Operator 2", + "portName":"out" + } + } + ] +}
