Repository: helix Updated Branches: refs/heads/helix-provisioning ae77f9149 -> 15f080cb5
Intermediate changes to AppSpec class Project: http://git-wip-us.apache.org/repos/asf/helix/repo Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/15f080cb Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/15f080cb Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/15f080cb Branch: refs/heads/helix-provisioning Commit: 15f080cb5052df746dbc4bddf0ed075ae44c6fbb Parents: ae77f91 Author: Kishore Gopalakrishna <[email protected]> Authored: Sun Feb 16 17:09:05 2014 -0800 Committer: Kishore Gopalakrishna <[email protected]> Committed: Sun Feb 16 17:09:05 2014 -0800 ---------------------------------------------------------------------- .../provisioner/ParticipantService.java | 10 - .../controller/provisioner/ServiceConfig.java | 5 + .../helix/controller/provisioner/Task.java | 5 + .../controller/provisioner/TaskConfig.java | 6 + .../helix/provisioning/yarn/AppLauncher.java | 320 +++++++++++++++++++ .../provisioning/yarn/ApplicationSpec.java | 26 +- .../yarn/ApplicationSpecFactory.java | 9 + .../apache/helix/provisioning/yarn/Client.java | 90 +++--- .../provisioning/yarn/ContainerParticipant.java | 27 -- .../provisioning/yarn/ParticipantLauncher.java | 19 +- .../helix/provisioning/yarn/TaskConfig.java | 13 + .../yarn/YamlApplicationSpecFactory.java | 31 +- .../yarn/example/HelloWorldService.java | 9 +- .../src/main/resources/sample_application.yaml | 26 ++ 14 files changed, 490 insertions(+), 106 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/helix/blob/15f080cb/helix-core/src/main/java/org/apache/helix/controller/provisioner/ParticipantService.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/provisioner/ParticipantService.java b/helix-core/src/main/java/org/apache/helix/controller/provisioner/ParticipantService.java deleted file mode 100644 index bfcce06..0000000 --- a/helix-core/src/main/java/org/apache/helix/controller/provisioner/ParticipantService.java +++ /dev/null @@ -1,10 +0,0 @@ -package org.apache.helix.controller.provisioner; - -public interface ParticipantService { - - // boolean init(ServiceConfig serviceConfig); - - boolean start(); - - boolean stop(); -} http://git-wip-us.apache.org/repos/asf/helix/blob/15f080cb/helix-core/src/main/java/org/apache/helix/controller/provisioner/ServiceConfig.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/provisioner/ServiceConfig.java b/helix-core/src/main/java/org/apache/helix/controller/provisioner/ServiceConfig.java new file mode 100644 index 0000000..adccb2c --- /dev/null +++ b/helix-core/src/main/java/org/apache/helix/controller/provisioner/ServiceConfig.java @@ -0,0 +1,5 @@ +package org.apache.helix.controller.provisioner; + +public class ServiceConfig { + +} http://git-wip-us.apache.org/repos/asf/helix/blob/15f080cb/helix-core/src/main/java/org/apache/helix/controller/provisioner/Task.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/provisioner/Task.java b/helix-core/src/main/java/org/apache/helix/controller/provisioner/Task.java new file mode 100644 index 0000000..ed3c762 --- /dev/null +++ b/helix-core/src/main/java/org/apache/helix/controller/provisioner/Task.java @@ -0,0 +1,5 @@ +package org.apache.helix.controller.provisioner; + +public class Task { + +} http://git-wip-us.apache.org/repos/asf/helix/blob/15f080cb/helix-core/src/main/java/org/apache/helix/controller/provisioner/TaskConfig.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/provisioner/TaskConfig.java b/helix-core/src/main/java/org/apache/helix/controller/provisioner/TaskConfig.java new file mode 100644 index 0000000..9b47b9b --- /dev/null +++ b/helix-core/src/main/java/org/apache/helix/controller/provisioner/TaskConfig.java @@ -0,0 +1,6 @@ +package org.apache.helix.controller.provisioner; + +public class TaskConfig { + + +} http://git-wip-us.apache.org/repos/asf/helix/blob/15f080cb/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/AppLauncher.java ---------------------------------------------------------------------- diff --git a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/AppLauncher.java b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/AppLauncher.java new file mode 100644 index 0000000..b90bf8e --- /dev/null +++ b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/AppLauncher.java @@ -0,0 +1,320 @@ +package org.apache.helix.provisioning.yarn; + +import java.io.File; +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Vector; + +import org.apache.commons.compress.archivers.ArchiveStreamFactory; +import org.apache.commons.compress.archivers.tar.TarArchiveEntry; +import org.apache.commons.compress.archivers.tar.TarArchiveInputStream; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.DataOutputBuffer; +import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.yarn.api.ApplicationConstants; +import org.apache.hadoop.yarn.api.ApplicationConstants.Environment; +import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; +import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; +import org.apache.hadoop.yarn.api.records.LocalResource; +import org.apache.hadoop.yarn.api.records.LocalResourceType; +import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.client.api.YarnClient; +import org.apache.hadoop.yarn.client.api.YarnClientApplication; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.util.ConverterUtils; +import org.apache.hadoop.yarn.util.Records; + +/** + * Main class to launch the job. + * Gets the yaml file as the input. + * Converts yaml file into ApplicationSpec. + */ +public class AppLauncher { + private static final Log LOG = LogFactory.getLog(Client.class); + + private ApplicationSpec _applicationSpec; + private YarnClient yarnClient; + private ApplicationSpecFactory _applicationSpecFactory; + private File _yamlConfigFile; + + private YarnConfiguration _conf; + + private File appMasterArchive; + + public AppLauncher(File appMasterArchive, ApplicationSpecFactory applicationSpecFactory, + File yamlConfigFile) throws Exception { + _applicationSpecFactory = applicationSpecFactory; + _yamlConfigFile = yamlConfigFile; + init(); + } + + private void init() throws Exception { + _applicationSpec = _applicationSpecFactory.fromYaml(new FileInputStream(_yamlConfigFile)); + yarnClient = YarnClient.createYarnClient(); + _conf = new YarnConfiguration(); + yarnClient.init(_conf); + } + + public boolean launch() throws Exception { + LOG.info("Running Client"); + yarnClient.start(); + + // Get a new application id + YarnClientApplication app = yarnClient.createApplication(); + GetNewApplicationResponse appResponse = app.getNewApplicationResponse(); + // TODO get min/max resource capabilities from RM and change memory ask if needed + // If we do not have min/max, we may not be able to correctly request + // the required resources from the RM for the app master + // Memory ask has to be a multiple of min and less than max. + // Dump out information about cluster capability as seen by the resource manager + int maxMem = appResponse.getMaximumResourceCapability().getMemory(); + LOG.info("Max mem capabililty of resources in this cluster " + maxMem); + + // set the application name + ApplicationSubmissionContext appContext = app.getApplicationSubmissionContext(); + ApplicationId appId = appContext.getApplicationId(); + String appName = _applicationSpec.getAppName(); + appContext.setApplicationName(appName); + + // Set up the container launch context for the application master + ContainerLaunchContext amContainer = Records.newRecord(ContainerLaunchContext.class); + + // set local resources for the application master + // local files or archives as needed + // In this scenario, the jar file for the application master is part of the local resources + Map<String, LocalResource> localResources = new HashMap<String, LocalResource>(); + + LOG.info("Copy App archive file from local filesystem and add to local environment"); + // Copy the application master jar to the filesystem + // Create a local resource to point to the destination jar path + FileSystem fs = FileSystem.get(_conf); + + // copy App Master package + Path src = new Path(appMasterArchive.getAbsolutePath()); + String pathSuffix = appName + "/" + appId.getId() + "/" + "appmaster-pkg" + ".tar"; + Path dst = new Path(fs.getHomeDirectory(), pathSuffix); + fs.copyFromLocalFile(false, true, src, dst); + FileStatus destStatus = fs.getFileStatus(dst); + LocalResource amJarRsrc = setupLocalResource(dst, destStatus); + localResources.put("appmaster-pkg", amJarRsrc); + + // copy component packages + for (String serviceName : _applicationSpec.getServices()) { + src = new Path(_applicationSpec.getServicePackage(serviceName)); + pathSuffix = appName + "/" + appId.getId() + "/" + serviceName + ".tar"; + dst = new Path(fs.getHomeDirectory(), pathSuffix); + fs.copyFromLocalFile(false, true, src, dst); + destStatus = fs.getFileStatus(dst); + amJarRsrc = setupLocalResource(dst, destStatus); + localResources.put(serviceName + "-pkg", amJarRsrc); + } + // Set local resource info into app master container launch context + amContainer.setLocalResources(localResources); + + // Set the necessary security tokens as needed + // amContainer.setContainerTokens(containerToken); + + // Add AppMaster.jar location to classpath + // At some point we should not be required to add + // the hadoop specific classpaths to the env. + // It should be provided out of the box. + // For now setting all required classpaths including + // the classpath to "." for the application jar + StringBuilder classPathEnv = + new StringBuilder(Environment.CLASSPATH.$()).append(File.pathSeparatorChar).append("./*"); + StringBuilder appClassPathEnv = new StringBuilder(); + // put the jar files under the archive in the classpath + try { + final InputStream is = new FileInputStream(appMasterArchive); + final TarArchiveInputStream debInputStream = + (TarArchiveInputStream) new ArchiveStreamFactory().createArchiveInputStream("tar", is); + TarArchiveEntry entry = null; + while ((entry = (TarArchiveEntry) debInputStream.getNextEntry()) != null) { + if (entry.isFile()) { + appClassPathEnv.append(File.pathSeparatorChar); + appClassPathEnv.append("./app-pkg/" + entry.getName()); + } + } + debInputStream.close(); + + } catch (Exception e) { + LOG.error("Unable to read archive file:" + appMasterArchive, e); + } + classPathEnv.append(appClassPathEnv); + for (String c : _conf.getStrings(YarnConfiguration.YARN_APPLICATION_CLASSPATH, + YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH)) { + classPathEnv.append(File.pathSeparatorChar); + classPathEnv.append(c.trim()); + } + classPathEnv.append(File.pathSeparatorChar).append("./log4j.properties"); + + // add the runtime classpath needed for tests to work + if (_conf.getBoolean(YarnConfiguration.IS_MINI_YARN_CLUSTER, false)) { + classPathEnv.append(':'); + classPathEnv.append(System.getProperty("java.class.path")); + } + System.out.println("classpath" + classPathEnv.toString()); + // Set the env variables to be setup in the env where the application master will be run + LOG.info("Set the environment for the application master"); + Map<String, String> env = new HashMap<String, String>(); + env.put("app_pkg_path", fs.getHomeDirectory() + "/" + appName + "/" + appId.getId() + + "/app-pkg.tar"); + env.put("appName", appName); + env.put("appId", "" + appId.getId()); + env.put("CLASSPATH", classPathEnv.toString()); + env.put("appClasspath", appClassPathEnv.toString()); + env.put("containerParticipantMainClass", + "org.apache.helix.provisioning.yarn.ParticipantLauncher"); + amContainer.setEnvironment(env); + + // Set the necessary command to execute the application master + Vector<CharSequence> vargs = new Vector<CharSequence>(30); + + // Set java executable command + LOG.info("Setting up app master command"); + vargs.add(Environment.JAVA_HOME.$() + "/bin/java"); + int amMemory = 1024; + // Set Xmx based on am memory size + vargs.add("-Xmx" + amMemory + "m"); + // Set class name + vargs.add(HelixYarnApplicationMasterMain.class.getCanonicalName()); + // Set params for Application Master + //vargs.add("--num_containers " + String.valueOf(numContainers)); + + vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/AppMaster.stdout"); + vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/AppMaster.stderr"); + + // Get final commmand + StringBuilder command = new StringBuilder(); + for (CharSequence str : vargs) { + command.append(str).append(" "); + } + + LOG.info("Completed setting up app master command " + command.toString()); + List<String> commands = new ArrayList<String>(); + commands.add(command.toString()); + amContainer.setCommands(commands); + + // Set up resource type requirements + // For now, only memory is supported so we set memory requirements + Resource capability = Records.newRecord(Resource.class); + capability.setMemory(amMemory); + appContext.setResource(capability); + + // Service data is a binary blob that can be passed to the application + // Not needed in this scenario + // amContainer.setServiceData(serviceData); + + // Setup security tokens + if (UserGroupInformation.isSecurityEnabled()) { + Credentials credentials = new Credentials(); + String tokenRenewer = _conf.get(YarnConfiguration.RM_PRINCIPAL); + if (tokenRenewer == null || tokenRenewer.length() == 0) { + throw new IOException("Can't get Master Kerberos principal for the RM to use as renewer"); + } + + // For now, only getting tokens for the default file-system. + final Token<?> tokens[] = fs.addDelegationTokens(tokenRenewer, credentials); + if (tokens != null) { + for (Token<?> token : tokens) { + LOG.info("Got dt for " + fs.getUri() + "; " + token); + } + } + DataOutputBuffer dob = new DataOutputBuffer(); + credentials.writeTokenStorageToStream(dob); + ByteBuffer fsTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength()); + amContainer.setTokens(fsTokens); + } + + appContext.setAMContainerSpec(amContainer); + + // Set the priority for the application master + Priority pri = Records.newRecord(Priority.class); + int amPriority= 0; + // TODO - what is the range for priority? how to decide? + pri.setPriority(amPriority); + appContext.setPriority(pri); + + String amQueue=""; + // Set the queue to which this application is to be submitted in the RM + appContext.setQueue(amQueue); + + // Submit the application to the applications manager + // SubmitApplicationResponse submitResp = applicationsManager.submitApplication(appRequest); + // Ignore the response as either a valid response object is returned on success + // or an exception thrown to denote some form of a failure + LOG.info("Submitting application to ASM"); + + yarnClient.submitApplication(appContext); + + return true; + } + + private LocalResource setupLocalResource(Path dst, FileStatus destStatus) { + LocalResource amJarRsrc = Records.newRecord(LocalResource.class); + // Set the type of resource - file or archive + // archives are untarred at destination + // we don't need the jar file to be untarred for now + amJarRsrc.setType(LocalResourceType.ARCHIVE); + // Set visibility of the resource + // Setting to most private option + amJarRsrc.setVisibility(LocalResourceVisibility.APPLICATION); + // Set the resource to be copied over + amJarRsrc.setResource(ConverterUtils.getYarnUrlFromPath(dst)); + // Set timestamp and length of file so that the framework + // can do basic sanity checks for the local resource + // after it has been copied over to ensure it is the same + // resource the client intended to use with the application + amJarRsrc.setTimestamp(destStatus.getModificationTime()); + amJarRsrc.setSize(destStatus.getLen()); + return amJarRsrc; + } + + /** + * @return true if successfully completed, it will print status every X seconds + */ + public boolean waitUntilDone() { + while (true) { + try { + Thread.sleep(10000); + } catch (InterruptedException e) { + break; + } + } + return true; + } + + /** + * will take the input file and AppSpecFactory class name as input + * @param args + * @throws Exception + */ + public static void main(String[] args) throws Exception { + File appMasterArchive = new File (args[0]); + ApplicationSpecFactory applicationSpecFactory = + (ApplicationSpecFactory) Class.forName(args[1]).newInstance(); + File yamlConfigFile = new File(args[2]); + AppLauncher launcher = new AppLauncher(appMasterArchive, applicationSpecFactory, yamlConfigFile); + launcher.launch(); + launcher.waitUntilDone(); + + } + +} http://git-wip-us.apache.org/repos/asf/helix/blob/15f080cb/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/ApplicationSpec.java ---------------------------------------------------------------------- diff --git a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/ApplicationSpec.java b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/ApplicationSpec.java index 16b23fa..6c41542 100644 --- a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/ApplicationSpec.java +++ b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/ApplicationSpec.java @@ -1,12 +1,26 @@ package org.apache.helix.provisioning.yarn; +import java.net.URI; import java.util.List; +import org.apache.helix.api.config.ParticipantConfig; +import org.apache.helix.api.id.ParticipantId; + public interface ApplicationSpec { - public String getAppName(); - public int getMinContainers(); - public int getMaxContainers(); - public AppConfig getConfig(); - public List<String> getServices(); - public ServiceConfig getServiceConfig(String name); + /** + * Returns the name of the application + * @return + */ + String getAppName(); + + AppConfig getConfig(); + + List<String> getServices(); + + URI getServicePackage(String serviceName); + + ParticipantConfig getParticipantConfig(String serviceName, ParticipantId participantId); + + List<TaskConfig> getTaskConfigs(); + } http://git-wip-us.apache.org/repos/asf/helix/blob/15f080cb/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/ApplicationSpecFactory.java ---------------------------------------------------------------------- diff --git a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/ApplicationSpecFactory.java b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/ApplicationSpecFactory.java new file mode 100644 index 0000000..352dc0c --- /dev/null +++ b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/ApplicationSpecFactory.java @@ -0,0 +1,9 @@ +package org.apache.helix.provisioning.yarn; + +import java.io.InputStream; + +public interface ApplicationSpecFactory { + + ApplicationSpec fromYaml(InputStream yamlFile); + +} http://git-wip-us.apache.org/repos/asf/helix/blob/15f080cb/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/Client.java ---------------------------------------------------------------------- diff --git a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/Client.java b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/Client.java index 6611ec6..500df9c 100644 --- a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/Client.java +++ b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/Client.java @@ -137,7 +137,7 @@ public class Client { private String appMasterArchive = ""; // Main class to invoke application master private final String appMasterMainClass; - + private String appSpecFile = ""; // No. of containers in which helix participants will be started @@ -159,37 +159,6 @@ public class Client { private Options opts; /** - * @param args Command line arguments - */ - public static void main(String[] args) { - boolean result = false; - try { - Client client = new Client(); - LOG.info("Initializing Client"); - try { - boolean doRun = client.init(args); - if (!doRun) { - System.exit(0); - } - } catch (IllegalArgumentException e) { - System.err.println(e.getLocalizedMessage()); - client.printUsage(); - System.exit(-1); - } - result = client.run(); - } catch (Throwable t) { - LOG.fatal("Error running CLient", t); - System.exit(1); - } - if (result) { - LOG.info("Application completed successfully"); - System.exit(0); - } - LOG.error("Application failed to complete successfully"); - System.exit(2); - } - - /** */ public Client(Configuration conf) throws Exception { this("org.apache.helix.provisioning.yarn.HelixYarnApplicationMasterMain", conf); @@ -270,7 +239,7 @@ public class Client { appMasterArchive = cliParser.getOptionValue("archive"); numContainers = Integer.parseInt(cliParser.getOptionValue("num_containers", "4")); - + log4jPropFile = cliParser.getOptionValue("log_properties", ""); return true; @@ -371,7 +340,7 @@ public class Client { amJarRsrc.setTimestamp(destStatus.getModificationTime()); amJarRsrc.setSize(destStatus.getLen()); localResources.put("app-pkg", amJarRsrc); - + Path localAppSpec = new Path(appSpecFile); pathSuffix = appName + "/" + appId.getId() + "/app-spec.yaml"; Path dstAppSpec = new Path(fs.getHomeDirectory(), pathSuffix); @@ -386,7 +355,6 @@ public class Client { appSpecResource.setSize(destStatus.getLen()); localResources.put("app-spec", appSpecResource); - // Set the log4j properties if needed if (!log4jPropFile.isEmpty()) { Path log4jSrc = new Path(log4jPropFile); @@ -408,7 +376,6 @@ public class Client { // Set the necessary security tokens as needed // amContainer.setContainerTokens(containerToken); - // Add AppMaster.jar location to classpath // At some point we should not be required to add // the hadoop specific classpaths to the env. @@ -452,12 +419,14 @@ public class Client { // Set the env variables to be setup in the env where the application master will be run LOG.info("Set the environment for the application master"); Map<String, String> env = new HashMap<String, String>(); - env.put("app_pkg_path", fs.getHomeDirectory() + "/" + appName + "/" + appId.getId() + "/app-pkg.tar"); + env.put("app_pkg_path", fs.getHomeDirectory() + "/" + appName + "/" + appId.getId() + + "/app-pkg.tar"); env.put("appName", appName); env.put("appId", "" + appId.getId()); env.put("CLASSPATH", classPathEnv.toString()); env.put("appClasspath", appClassPathEnv.toString()); - env.put("containerParticipantMainClass", "org.apache.helix.provisioning.yarn.ParticipantLauncher"); + env.put("containerParticipantMainClass", + "org.apache.helix.provisioning.yarn.ParticipantLauncher"); amContainer.setEnvironment(env); // Set the necessary command to execute the application master @@ -597,11 +566,13 @@ public class Client { return false; } - /*if (System.currentTimeMillis() > (clientStartTime + clientTimeout)) { - LOG.info("Reached client specified timeout for application. Killing application"); - forceKillApplication(appId); - return false; - }*/ + /* + * if (System.currentTimeMillis() > (clientStartTime + clientTimeout)) { + * LOG.info("Reached client specified timeout for application. Killing application"); + * forceKillApplication(appId); + * return false; + * } + */ } } @@ -619,7 +590,38 @@ public class Client { // Response can be ignored as it is non-null on success or // throws an exception in case of failures - //yarnClient.killApplication(appId); + // yarnClient.killApplication(appId); + } + + /** + * @param args Command line arguments + */ + public static void main(String[] args) { + boolean result = false; + try { + Client client = new Client(); + LOG.info("Initializing Client"); + try { + boolean doRun = client.init(args); + if (!doRun) { + System.exit(0); + } + } catch (IllegalArgumentException e) { + System.err.println(e.getLocalizedMessage()); + client.printUsage(); + System.exit(-1); + } + result = client.run(); + } catch (Throwable t) { + LOG.fatal("Error running CLient", t); + System.exit(1); + } + if (result) { + LOG.info("Application completed successfully"); + System.exit(0); + } + LOG.error("Application failed to complete successfully"); + System.exit(2); } } http://git-wip-us.apache.org/repos/asf/helix/blob/15f080cb/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/ContainerParticipant.java ---------------------------------------------------------------------- diff --git a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/ContainerParticipant.java b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/ContainerParticipant.java deleted file mode 100644 index 81a9c56..0000000 --- a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/ContainerParticipant.java +++ /dev/null @@ -1,27 +0,0 @@ -package org.apache.helix.provisioning.yarn; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.helix.HelixConnection; -import org.apache.helix.api.id.ClusterId; -import org.apache.helix.api.id.ParticipantId; -import org.apache.helix.manager.zk.AbstractParticipantService; - -public class ContainerParticipant extends AbstractParticipantService { - private static final Log LOG = LogFactory.getLog(ContainerParticipant.class); - - public ContainerParticipant(HelixConnection connection, ClusterId clusterId, - ParticipantId participantId) { - super(connection, clusterId, participantId); - } - - @Override - public void init() { - // register a state model - } - - @Override - public void onPreJoinCluster() { - // do tasks that require a connection - } -} http://git-wip-us.apache.org/repos/asf/helix/blob/15f080cb/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/ParticipantLauncher.java ---------------------------------------------------------------------- diff --git a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/ParticipantLauncher.java b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/ParticipantLauncher.java index 58e7a4f..8a3f19f 100644 --- a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/ParticipantLauncher.java +++ b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/ParticipantLauncher.java @@ -9,9 +9,16 @@ import org.apache.commons.cli.Options; import org.apache.helix.HelixConnection; import org.apache.helix.api.id.ClusterId; import org.apache.helix.api.id.ParticipantId; +import org.apache.helix.manager.zk.AbstractParticipantService; import org.apache.helix.manager.zk.ZkHelixConnection; - +import org.apache.log4j.Logger; +/** + * + * Main class that invokes the Participant Api + */ public class ParticipantLauncher { + private static Logger LOG = Logger.getLogger(ParticipantLauncher.class); + public static void main(String[] args) { System.out.println("Starting Helix Participant: " + Arrays.toString(args)); @@ -20,6 +27,7 @@ public class ParticipantLauncher { opts.addOption("cluster", true, "Cluster name, default app name"); opts.addOption("participantId", true, "Participant Id"); opts.addOption("zkAddress", true, "Zookeeper address"); + opts.addOption("ParticipantClass", true, "ParticipantClass"); try { CommandLine cliParser = new GnuParser().parse(opts, args); String zkAddress = cliParser.getOptionValue("zkAddress"); @@ -27,8 +35,13 @@ public class ParticipantLauncher { connection.connect(); ClusterId clusterId = ClusterId.from(cliParser.getOptionValue("cluster")); ParticipantId participantId = ParticipantId.from(cliParser.getOptionValue("participantId")); - ContainerParticipant containerParticipant = - new ContainerParticipant(connection, clusterId, participantId); + String participantClass = cliParser.getOptionValue("ParticipantClass"); + @SuppressWarnings("unchecked") + Class<? extends AbstractParticipantService> clazz = + (Class<? extends AbstractParticipantService>) Class.forName(participantClass); + AbstractParticipantService containerParticipant = + clazz.getConstructor(HelixConnection.class, ClusterId.class, ParticipantId.class) + .newInstance(connection, clusterId, participantId); containerParticipant.startAsync(); containerParticipant.awaitRunning(60, TimeUnit.SECONDS); Thread.currentThread().join(); http://git-wip-us.apache.org/repos/asf/helix/blob/15f080cb/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/TaskConfig.java ---------------------------------------------------------------------- diff --git a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/TaskConfig.java b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/TaskConfig.java new file mode 100644 index 0000000..0b500a9 --- /dev/null +++ b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/TaskConfig.java @@ -0,0 +1,13 @@ +package org.apache.helix.provisioning.yarn; + +import java.util.HashMap; +import java.util.Map; + + +public class TaskConfig { + public Map<String, String> config = new HashMap<String, String>(); + + public String getValue(String key) { + return (config != null ? config.get(key) : null); + } +} http://git-wip-us.apache.org/repos/asf/helix/blob/15f080cb/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/YamlApplicationSpecFactory.java ---------------------------------------------------------------------- diff --git a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/YamlApplicationSpecFactory.java b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/YamlApplicationSpecFactory.java index d5ac2c0..5a17755 100644 --- a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/YamlApplicationSpecFactory.java +++ b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/YamlApplicationSpecFactory.java @@ -1,9 +1,12 @@ package org.apache.helix.provisioning.yarn; import java.io.InputStream; +import java.net.URI; import java.util.List; import java.util.Map; +import org.apache.helix.api.config.ParticipantConfig; +import org.apache.helix.api.id.ParticipantId; import org.yaml.snakeyaml.Yaml; class DefaultApplicationSpec implements ApplicationSpec { @@ -22,16 +25,6 @@ class DefaultApplicationSpec implements ApplicationSpec { } @Override - public int getMinContainers() { - return minContainers; - } - - @Override - public int getMaxContainers() { - return maxContainers; - } - - @Override public AppConfig getConfig() { return appConfig; } @@ -41,10 +34,20 @@ class DefaultApplicationSpec implements ApplicationSpec { return services; } - @Override - public ServiceConfig getServiceConfig(String name) { - return serviceConfigMap.get(name); - } + @Override + public URI getServicePackage(String serviceName) { + return null; + } + + @Override + public ParticipantConfig getParticipantConfig(String serviceName, ParticipantId participantId) { + return null; + } + + @Override + public List<TaskConfig> getTaskConfigs() { + return null; + } } public class YamlApplicationSpecFactory { http://git-wip-us.apache.org/repos/asf/helix/blob/15f080cb/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/example/HelloWorldService.java ---------------------------------------------------------------------- diff --git a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/example/HelloWorldService.java b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/example/HelloWorldService.java index 055bfd7..db9e524 100644 --- a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/example/HelloWorldService.java +++ b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/example/HelloWorldService.java @@ -4,10 +4,10 @@ import org.apache.helix.HelixConnection; import org.apache.helix.api.id.ClusterId; import org.apache.helix.api.id.ParticipantId; import org.apache.helix.api.id.StateModelDefId; -import org.apache.helix.provisioning.yarn.ContainerParticipant; +import org.apache.helix.manager.zk.AbstractParticipantService; -public class HelloWorldService extends ContainerParticipant { +public class HelloWorldService extends AbstractParticipantService { public HelloWorldService(HelixConnection connection, ClusterId clusterId, ParticipantId participantId) { @@ -19,5 +19,10 @@ public class HelloWorldService extends ContainerParticipant { HelloWorldStateModelFactory stateModelFactory = new HelloWorldStateModelFactory(); getParticipant().getStateMachineEngine().registerStateModelFactory(StateModelDefId.from("OnlineOffline"), stateModelFactory); } + + @Override + public void onPreJoinCluster() { + //this will be invoked prior to + } } http://git-wip-us.apache.org/repos/asf/helix/blob/15f080cb/helix-provisioning/src/main/resources/sample_application.yaml ---------------------------------------------------------------------- diff --git a/helix-provisioning/src/main/resources/sample_application.yaml b/helix-provisioning/src/main/resources/sample_application.yaml new file mode 100644 index 0000000..f45faa3 --- /dev/null +++ b/helix-provisioning/src/main/resources/sample_application.yaml @@ -0,0 +1,26 @@ +=== +appName: test +configs: + k1: v1 +services: + - name: myservice + participantClass: org.apache.helix.myApp.SimpleWebserver + minContainers:3 + maxContainers:3 + configs: + - participantId: myservice_0 + port: 9500 + - participantId: myservice_1 + port: 9501 + - participantId: myservice_2 + port: 9502 +resources: + - name: distributedLock + numPartitions: 6 + numReplicas: 2 + rebalanceMode: FULL_AUTO + stateModel: OnlineOffline + tag: myservice + configs: + k1: v1 + \ No newline at end of file
