Repository: apex-core Updated Branches: refs/heads/master 59bdc81f8 -> 81b8c922c
APEXCORE-552 #resolve added support for application tags Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/81b8c922 Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/81b8c922 Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/81b8c922 Branch: refs/heads/master Commit: 81b8c922c6442cbf30104eba34ede826615ac7d4 Parents: 59bdc81 Author: David Yan <da...@datatorrent.com> Authored: Thu Oct 6 14:23:10 2016 -0700 Committer: David Yan <da...@datatorrent.com> Committed: Tue Oct 11 11:29:00 2016 -0700 ---------------------------------------------------------------------- .../java/com/datatorrent/stram/StramClient.java | 11 +++++++++++ .../java/com/datatorrent/stram/cli/ApexCli.java | 20 ++++++++++++++++++++ .../stram/client/StramAppLauncher.java | 17 ++++++++++++----- 3 files changed, 43 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/apex-core/blob/81b8c922/engine/src/main/java/com/datatorrent/stram/StramClient.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/StramClient.java b/engine/src/main/java/com/datatorrent/stram/StramClient.java index 89bca14..45e3fbd 100644 --- a/engine/src/main/java/com/datatorrent/stram/StramClient.java +++ b/engine/src/main/java/com/datatorrent/stram/StramClient.java @@ -27,9 +27,11 @@ import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; +import java.util.HashSet; import java.util.LinkedHashSet; import java.util.List; import java.util.Map; +import java.util.Set; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -121,6 +123,7 @@ public class StramClient private String archives; private String files; private LinkedHashSet<String> resources; + private Set<String> tags = new HashSet<>(); // platform dependencies that are not part of Hadoop and need to be deployed, // entry below will cause containing jar file from client to be copied to cluster @@ -605,6 +608,9 @@ public class StramClient // Set the queue to which this application is to be submitted in the RM appContext.setQueue(queueName); + // set the application tags + appContext.setApplicationTags(tags); + // Submit the application to the applications manager // SubmitApplicationResponse submitResp = rmClient.submitApplication(appRequest); // Ignore the response as either a valid response object is returned on success @@ -686,6 +692,11 @@ public class StramClient this.queueName = queueName; } + public void addTag(String tag) + { + this.tags.add(tag); + } + public void setResources(LinkedHashSet<String> resources) { this.resources = resources; http://git-wip-us.apache.org/repos/asf/apex-core/blob/81b8c922/engine/src/main/java/com/datatorrent/stram/cli/ApexCli.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/cli/ApexCli.java b/engine/src/main/java/com/datatorrent/stram/cli/ApexCli.java index 69af2e3..5cfde36 100644 --- a/engine/src/main/java/com/datatorrent/stram/cli/ApexCli.java +++ b/engine/src/main/java/com/datatorrent/stram/cli/ApexCli.java @@ -1854,6 +1854,9 @@ public class ApexCli config.set(StramAppLauncher.ORIGINAL_APP_ID, commandLineInfo.origAppId); } config.set(StramAppLauncher.QUEUE_NAME, commandLineInfo.queue != null ? commandLineInfo.queue : "default"); + if (commandLineInfo.tags != null) { + config.set(StramAppLauncher.TAGS, commandLineInfo.tags); + } } catch (Exception ex) { throw new CliException("Error opening the config XML file: " + configFile, ex); } @@ -2184,6 +2187,11 @@ public class ApexCli jsonObj.put("state", ar.getYarnApplicationState().name()); jsonObj.put("trackingUrl", ar.getTrackingUrl()); jsonObj.put("finalStatus", ar.getFinalApplicationStatus()); + JSONArray tags = new JSONArray(); + for (String tag : ar.getApplicationTags()) { + tags.put(tag); + } + jsonObj.put("tags", tags); totalCnt++; if (ar.getYarnApplicationState() == YarnApplicationState.RUNNING) { @@ -3371,6 +3379,11 @@ public class ApexCli response.put("state", appReport.getYarnApplicationState().name()); response.put("trackingUrl", appReport.getTrackingUrl()); response.put("finalStatus", appReport.getFinalApplicationStatus()); + JSONArray tags = new JSONArray(); + for (String tag : appReport.getApplicationTags()) { + tags.put(tag); + } + response.put("tags", tags); printJson(response); } @@ -3631,6 +3644,10 @@ public class ApexCli launchArgs.add("-queue"); launchArgs.add(commandLineInfo.queue); } + if (commandLineInfo.tags != null) { + launchArgs.add("-tags"); + launchArgs.add(commandLineInfo.tags); + } launchArgs.add(appFile); if (!appFile.endsWith(".json") && !appFile.endsWith(".properties")) { launchArgs.add(selectedApp.name); @@ -3902,6 +3919,7 @@ public class ApexCli final Option originalAppID = add(OptionBuilder.withArgName("application id").hasArg().withDescription("Specify original application identifier for restart.").create("originalAppId")); final Option exactMatch = add(new Option("exactMatch", "Only consider applications with exact app name")); final Option queue = add(OptionBuilder.withArgName("queue name").hasArg().withDescription("Specify the queue to launch the application").create("queue")); + final Option tags = add(OptionBuilder.withArgName("comma separated tags").hasArg().withDescription("Specify the tags for the application").create("tags")); final Option force = add(new Option("force", "Force launch the application. Do not check for compatibility")); final Option useConfigApps = add(OptionBuilder.withArgName("inclusive or exclusive").hasArg().withDescription("\"inclusive\" - merge the apps in config and app package. \"exclusive\" - only show config package apps.").create("useConfigApps")); @@ -3940,6 +3958,7 @@ public class ApexCli result.archives = line.getOptionValue(LAUNCH_OPTIONS.archives.getOpt()); result.files = line.getOptionValue(LAUNCH_OPTIONS.files.getOpt()); result.queue = line.getOptionValue(LAUNCH_OPTIONS.queue.getOpt()); + result.tags = line.getOptionValue(LAUNCH_OPTIONS.tags.getOpt()); result.args = line.getArgs(); result.origAppId = line.getOptionValue(LAUNCH_OPTIONS.originalAppID.getOpt()); result.exactMatch = line.hasOption("exactMatch"); @@ -3959,6 +3978,7 @@ public class ApexCli String libjars; String files; String queue; + String tags; String archives; String origAppId; boolean exactMatch; http://git-wip-us.apache.org/repos/asf/apex-core/blob/81b8c922/engine/src/main/java/com/datatorrent/stram/client/StramAppLauncher.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/client/StramAppLauncher.java b/engine/src/main/java/com/datatorrent/stram/client/StramAppLauncher.java index 619252f..961a97b 100644 --- a/engine/src/main/java/com/datatorrent/stram/client/StramAppLauncher.java +++ b/engine/src/main/java/com/datatorrent/stram/client/StramAppLauncher.java @@ -88,11 +88,12 @@ import com.datatorrent.stram.security.StramUserLogin; public class StramAppLauncher { public static final String CLASSPATH_RESOLVERS_KEY_NAME = StreamingApplication.DT_PREFIX + "classpath.resolvers"; - public static final String LIBJARS_CONF_KEY_NAME = "tmplibjars"; - public static final String FILES_CONF_KEY_NAME = "tmpfiles"; - public static final String ARCHIVES_CONF_KEY_NAME = "tmparchives"; - public static final String ORIGINAL_APP_ID = "tmpOriginalAppId"; - public static final String QUEUE_NAME = "queueName"; + public static final String LIBJARS_CONF_KEY_NAME = "_apex.libjars"; + public static final String FILES_CONF_KEY_NAME = "_apex.files"; + public static final String ARCHIVES_CONF_KEY_NAME = "_apex.archives"; + public static final String ORIGINAL_APP_ID = "_apex.originalAppId"; + public static final String QUEUE_NAME = "_apex.queueName"; + public static final String TAGS = "_apex.tags"; private static final Logger LOG = LoggerFactory.getLogger(StramAppLauncher.class); private File jarFile; @@ -630,6 +631,12 @@ public class StramAppLauncher client.setArchives(conf.get(ARCHIVES_CONF_KEY_NAME)); client.setOriginalAppId(conf.get(ORIGINAL_APP_ID)); client.setQueueName(conf.get(QUEUE_NAME)); + String tags = conf.get(TAGS); + if (tags != null) { + for (String tag : tags.split(",")) { + client.addTag(tag.trim()); + } + } client.startApplication(); return client.getApplicationReport().getApplicationId(); } finally {