Repository: helix
Updated Branches:
  refs/heads/helix-provisioning ae77f9149 -> 15f080cb5


Intermediate changes to AppSpec class


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/15f080cb
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/15f080cb
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/15f080cb

Branch: refs/heads/helix-provisioning
Commit: 15f080cb5052df746dbc4bddf0ed075ae44c6fbb
Parents: ae77f91
Author: Kishore Gopalakrishna <[email protected]>
Authored: Sun Feb 16 17:09:05 2014 -0800
Committer: Kishore Gopalakrishna <[email protected]>
Committed: Sun Feb 16 17:09:05 2014 -0800

----------------------------------------------------------------------
 .../provisioner/ParticipantService.java         |  10 -
 .../controller/provisioner/ServiceConfig.java   |   5 +
 .../helix/controller/provisioner/Task.java      |   5 +
 .../controller/provisioner/TaskConfig.java      |   6 +
 .../helix/provisioning/yarn/AppLauncher.java    | 320 +++++++++++++++++++
 .../provisioning/yarn/ApplicationSpec.java      |  26 +-
 .../yarn/ApplicationSpecFactory.java            |   9 +
 .../apache/helix/provisioning/yarn/Client.java  |  90 +++---
 .../provisioning/yarn/ContainerParticipant.java |  27 --
 .../provisioning/yarn/ParticipantLauncher.java  |  19 +-
 .../helix/provisioning/yarn/TaskConfig.java     |  13 +
 .../yarn/YamlApplicationSpecFactory.java        |  31 +-
 .../yarn/example/HelloWorldService.java         |   9 +-
 .../src/main/resources/sample_application.yaml  |  26 ++
 14 files changed, 490 insertions(+), 106 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/15f080cb/helix-core/src/main/java/org/apache/helix/controller/provisioner/ParticipantService.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/provisioner/ParticipantService.java b/helix-core/src/main/java/org/apache/helix/controller/provisioner/ParticipantService.java
deleted file mode 100644
index bfcce06..0000000
--- a/helix-core/src/main/java/org/apache/helix/controller/provisioner/ParticipantService.java
+++ /dev/null
@@ -1,10 +0,0 @@
-package org.apache.helix.controller.provisioner;
-
-public interface ParticipantService {
-
-  // boolean init(ServiceConfig serviceConfig);
-
-  boolean start();
-
-  boolean stop();
-}

http://git-wip-us.apache.org/repos/asf/helix/blob/15f080cb/helix-core/src/main/java/org/apache/helix/controller/provisioner/ServiceConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/provisioner/ServiceConfig.java b/helix-core/src/main/java/org/apache/helix/controller/provisioner/ServiceConfig.java
new file mode 100644
index 0000000..adccb2c
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/provisioner/ServiceConfig.java
@@ -0,0 +1,5 @@
+package org.apache.helix.controller.provisioner;
+
+public class ServiceConfig {
+
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/15f080cb/helix-core/src/main/java/org/apache/helix/controller/provisioner/Task.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/provisioner/Task.java b/helix-core/src/main/java/org/apache/helix/controller/provisioner/Task.java
new file mode 100644
index 0000000..ed3c762
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/provisioner/Task.java
@@ -0,0 +1,5 @@
+package org.apache.helix.controller.provisioner;
+
+public class Task {
+
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/15f080cb/helix-core/src/main/java/org/apache/helix/controller/provisioner/TaskConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/provisioner/TaskConfig.java b/helix-core/src/main/java/org/apache/helix/controller/provisioner/TaskConfig.java
new file mode 100644
index 0000000..9b47b9b
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/provisioner/TaskConfig.java
@@ -0,0 +1,6 @@
+package org.apache.helix.controller.provisioner;
+
+public class TaskConfig {
+
+  
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/15f080cb/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/AppLauncher.java
----------------------------------------------------------------------
diff --git a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/AppLauncher.java b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/AppLauncher.java
new file mode 100644
index 0000000..b90bf8e
--- /dev/null
+++ b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/AppLauncher.java
@@ -0,0 +1,320 @@
+package org.apache.helix.provisioning.yarn;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Vector;
+
+import org.apache.commons.compress.archivers.ArchiveStreamFactory;
+import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
+import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.client.api.YarnClientApplication;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.hadoop.yarn.util.Records;
+
+/**
+ * Main class to launch the job.
+ * Gets the yaml file as the input.
+ * Converts yaml file into ApplicationSpec.
+ */
+public class AppLauncher {
+  private static final Log LOG = LogFactory.getLog(Client.class);
+
+  private ApplicationSpec _applicationSpec;
+  private YarnClient yarnClient;
+  private ApplicationSpecFactory _applicationSpecFactory;
+  private File _yamlConfigFile;
+
+  private YarnConfiguration _conf;
+
+  private File appMasterArchive;
+
+  public AppLauncher(File appMasterArchive, ApplicationSpecFactory 
applicationSpecFactory,
+      File yamlConfigFile) throws Exception {
+    _applicationSpecFactory = applicationSpecFactory;
+    _yamlConfigFile = yamlConfigFile;
+    init();
+  }
+
+  private void init() throws Exception {
+    _applicationSpec = _applicationSpecFactory.fromYaml(new 
FileInputStream(_yamlConfigFile));
+    yarnClient = YarnClient.createYarnClient();
+    _conf = new YarnConfiguration();
+    yarnClient.init(_conf);
+  }
+
+  public boolean launch() throws Exception {
+    LOG.info("Running Client");
+    yarnClient.start();
+
+    // Get a new application id
+    YarnClientApplication app = yarnClient.createApplication();
+    GetNewApplicationResponse appResponse = app.getNewApplicationResponse();
+    // TODO get min/max resource capabilities from RM and change memory ask if needed
+    // If we do not have min/max, we may not be able to correctly request
+    // the required resources from the RM for the app master
+    // Memory ask has to be a multiple of min and less than max.
+    // Dump out information about cluster capability as seen by the resource manager
+    int maxMem = appResponse.getMaximumResourceCapability().getMemory();
+    LOG.info("Max mem capabililty of resources in this cluster " + maxMem);
+
+    // set the application name
+    ApplicationSubmissionContext appContext = 
app.getApplicationSubmissionContext();
+    ApplicationId appId = appContext.getApplicationId();
+    String appName = _applicationSpec.getAppName();
+    appContext.setApplicationName(appName);
+
+    // Set up the container launch context for the application master
+    ContainerLaunchContext amContainer = 
Records.newRecord(ContainerLaunchContext.class);
+
+    // set local resources for the application master
+    // local files or archives as needed
+    // In this scenario, the jar file for the application master is part of the local resources
+    Map<String, LocalResource> localResources = new HashMap<String, 
LocalResource>();
+
+    LOG.info("Copy App archive file from local filesystem and add to local 
environment");
+    // Copy the application master jar to the filesystem
+    // Create a local resource to point to the destination jar path
+    FileSystem fs = FileSystem.get(_conf);
+
+    // copy App Master package
+    Path src = new Path(appMasterArchive.getAbsolutePath());
+    String pathSuffix = appName + "/" + appId.getId() + "/" + "appmaster-pkg" 
+ ".tar";
+    Path dst = new Path(fs.getHomeDirectory(), pathSuffix);
+    fs.copyFromLocalFile(false, true, src, dst);
+    FileStatus destStatus = fs.getFileStatus(dst);
+    LocalResource amJarRsrc = setupLocalResource(dst, destStatus);
+    localResources.put("appmaster-pkg", amJarRsrc);
+
+    // copy component packages
+    for (String serviceName : _applicationSpec.getServices()) {
+      src = new Path(_applicationSpec.getServicePackage(serviceName));
+      pathSuffix = appName + "/" + appId.getId() + "/" + serviceName + ".tar";
+      dst = new Path(fs.getHomeDirectory(), pathSuffix);
+      fs.copyFromLocalFile(false, true, src, dst);
+      destStatus = fs.getFileStatus(dst);
+      amJarRsrc = setupLocalResource(dst, destStatus);
+      localResources.put(serviceName + "-pkg", amJarRsrc);
+    }
+    // Set local resource info into app master container launch context
+    amContainer.setLocalResources(localResources);
+
+    // Set the necessary security tokens as needed
+    // amContainer.setContainerTokens(containerToken);
+
+    // Add AppMaster.jar location to classpath
+    // At some point we should not be required to add
+    // the hadoop specific classpaths to the env.
+    // It should be provided out of the box.
+    // For now setting all required classpaths including
+    // the classpath to "." for the application jar
+    StringBuilder classPathEnv =
+        new 
StringBuilder(Environment.CLASSPATH.$()).append(File.pathSeparatorChar).append("./*");
+    StringBuilder appClassPathEnv = new StringBuilder();
+    // put the jar files under the archive in the classpath
+    try {
+      final InputStream is = new FileInputStream(appMasterArchive);
+      final TarArchiveInputStream debInputStream =
+          (TarArchiveInputStream) new 
ArchiveStreamFactory().createArchiveInputStream("tar", is);
+      TarArchiveEntry entry = null;
+      while ((entry = (TarArchiveEntry) debInputStream.getNextEntry()) != 
null) {
+        if (entry.isFile()) {
+          appClassPathEnv.append(File.pathSeparatorChar);
+          appClassPathEnv.append("./app-pkg/" + entry.getName());
+        }
+      }
+      debInputStream.close();
+
+    } catch (Exception e) {
+      LOG.error("Unable to read archive file:" + appMasterArchive, e);
+    }
+    classPathEnv.append(appClassPathEnv);
+    for (String c : 
_conf.getStrings(YarnConfiguration.YARN_APPLICATION_CLASSPATH,
+        YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH)) {
+      classPathEnv.append(File.pathSeparatorChar);
+      classPathEnv.append(c.trim());
+    }
+    classPathEnv.append(File.pathSeparatorChar).append("./log4j.properties");
+
+    // add the runtime classpath needed for tests to work
+    if (_conf.getBoolean(YarnConfiguration.IS_MINI_YARN_CLUSTER, false)) {
+      classPathEnv.append(':');
+      classPathEnv.append(System.getProperty("java.class.path"));
+    }
+    System.out.println("classpath" + classPathEnv.toString());
+    // Set the env variables to be setup in the env where the application master will be run
+    LOG.info("Set the environment for the application master");
+    Map<String, String> env = new HashMap<String, String>();
+    env.put("app_pkg_path", fs.getHomeDirectory() + "/" + appName + "/" + 
appId.getId()
+        + "/app-pkg.tar");
+    env.put("appName", appName);
+    env.put("appId", "" + appId.getId());
+    env.put("CLASSPATH", classPathEnv.toString());
+    env.put("appClasspath", appClassPathEnv.toString());
+    env.put("containerParticipantMainClass",
+        "org.apache.helix.provisioning.yarn.ParticipantLauncher");
+    amContainer.setEnvironment(env);
+
+    // Set the necessary command to execute the application master
+    Vector<CharSequence> vargs = new Vector<CharSequence>(30);
+
+    // Set java executable command
+    LOG.info("Setting up app master command");
+    vargs.add(Environment.JAVA_HOME.$() + "/bin/java");
+    int amMemory = 1024;
+    // Set Xmx based on am memory size
+    vargs.add("-Xmx" + amMemory + "m");
+    // Set class name
+    vargs.add(HelixYarnApplicationMasterMain.class.getCanonicalName());
+    // Set params for Application Master
+    //vargs.add("--num_containers " + String.valueOf(numContainers));
+
+    vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + 
"/AppMaster.stdout");
+    vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + 
"/AppMaster.stderr");
+
+    // Get final command
+    StringBuilder command = new StringBuilder();
+    for (CharSequence str : vargs) {
+      command.append(str).append(" ");
+    }
+
+    LOG.info("Completed setting up app master command " + command.toString());
+    List<String> commands = new ArrayList<String>();
+    commands.add(command.toString());
+    amContainer.setCommands(commands);
+
+    // Set up resource type requirements
+    // For now, only memory is supported so we set memory requirements
+    Resource capability = Records.newRecord(Resource.class);
+    capability.setMemory(amMemory);
+    appContext.setResource(capability);
+
+    // Service data is a binary blob that can be passed to the application
+    // Not needed in this scenario
+    // amContainer.setServiceData(serviceData);
+
+    // Setup security tokens
+    if (UserGroupInformation.isSecurityEnabled()) {
+      Credentials credentials = new Credentials();
+      String tokenRenewer = _conf.get(YarnConfiguration.RM_PRINCIPAL);
+      if (tokenRenewer == null || tokenRenewer.length() == 0) {
+        throw new IOException("Can't get Master Kerberos principal for the RM 
to use as renewer");
+      }
+
+      // For now, only getting tokens for the default file-system.
+      final Token<?> tokens[] = fs.addDelegationTokens(tokenRenewer, 
credentials);
+      if (tokens != null) {
+        for (Token<?> token : tokens) {
+          LOG.info("Got dt for " + fs.getUri() + "; " + token);
+        }
+      }
+      DataOutputBuffer dob = new DataOutputBuffer();
+      credentials.writeTokenStorageToStream(dob);
+      ByteBuffer fsTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
+      amContainer.setTokens(fsTokens);
+    }
+
+    appContext.setAMContainerSpec(amContainer);
+
+    // Set the priority for the application master
+    Priority pri = Records.newRecord(Priority.class);
+    int amPriority= 0;
+    // TODO - what is the range for priority? how to decide?
+    pri.setPriority(amPriority);
+    appContext.setPriority(pri);
+
+    String amQueue="";
+    // Set the queue to which this application is to be submitted in the RM
+    appContext.setQueue(amQueue);
+
+    // Submit the application to the applications manager
+    // SubmitApplicationResponse submitResp = applicationsManager.submitApplication(appRequest);
+    // Ignore the response as either a valid response object is returned on success
+    // or an exception thrown to denote some form of a failure
+    LOG.info("Submitting application to ASM");
+
+    yarnClient.submitApplication(appContext);
+
+    return true;
+  }
+
+  private LocalResource setupLocalResource(Path dst, FileStatus destStatus) {
+    LocalResource amJarRsrc = Records.newRecord(LocalResource.class);
+    // Set the type of resource - file or archive
+    // archives are untarred at destination
+    // we don't need the jar file to be untarred for now
+    amJarRsrc.setType(LocalResourceType.ARCHIVE);
+    // Set visibility of the resource
+    // Setting to most private option
+    amJarRsrc.setVisibility(LocalResourceVisibility.APPLICATION);
+    // Set the resource to be copied over
+    amJarRsrc.setResource(ConverterUtils.getYarnUrlFromPath(dst));
+    // Set timestamp and length of file so that the framework
+    // can do basic sanity checks for the local resource
+    // after it has been copied over to ensure it is the same
+    // resource the client intended to use with the application
+    amJarRsrc.setTimestamp(destStatus.getModificationTime());
+    amJarRsrc.setSize(destStatus.getLen());
+    return amJarRsrc;
+  }
+
+  /**
+   * @return true if successfully completed, it will print status every X seconds
+   */
+  public boolean waitUntilDone() {
+    while (true) {
+      try {
+        Thread.sleep(10000);
+      } catch (InterruptedException e) {
+        break;
+      }
+    }
+    return true;
+  }
+
+  /**
+   * will take the input file and AppSpecFactory class name as input
+   * @param args
+   * @throws Exception
+   */
+  public static void main(String[] args) throws Exception {
+    File appMasterArchive = new File (args[0]);
+    ApplicationSpecFactory applicationSpecFactory =
+        (ApplicationSpecFactory) Class.forName(args[1]).newInstance();
+    File yamlConfigFile = new File(args[2]);
+    AppLauncher launcher = new AppLauncher(appMasterArchive, 
applicationSpecFactory, yamlConfigFile);
+    launcher.launch();
+    launcher.waitUntilDone();
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/15f080cb/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/ApplicationSpec.java
----------------------------------------------------------------------
diff --git a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/ApplicationSpec.java b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/ApplicationSpec.java
index 16b23fa..6c41542 100644
--- a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/ApplicationSpec.java
+++ b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/ApplicationSpec.java
@@ -1,12 +1,26 @@
 package org.apache.helix.provisioning.yarn;
 
+import java.net.URI;
 import java.util.List;
 
+import org.apache.helix.api.config.ParticipantConfig;
+import org.apache.helix.api.id.ParticipantId;
+
 public interface ApplicationSpec {
-       public String getAppName();
-       public int getMinContainers();
-       public int getMaxContainers();
-       public AppConfig getConfig();
-       public List<String> getServices();
-       public ServiceConfig getServiceConfig(String name);
+  /**
+   * Returns the name of the application
+   * @return
+   */
+  String getAppName();
+
+  AppConfig getConfig();
+
+  List<String> getServices();
+
+  URI getServicePackage(String serviceName);
+
+  ParticipantConfig getParticipantConfig(String serviceName, ParticipantId 
participantId);
+
+  List<TaskConfig> getTaskConfigs();
+
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/15f080cb/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/ApplicationSpecFactory.java
----------------------------------------------------------------------
diff --git a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/ApplicationSpecFactory.java b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/ApplicationSpecFactory.java
new file mode 100644
index 0000000..352dc0c
--- /dev/null
+++ b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/ApplicationSpecFactory.java
@@ -0,0 +1,9 @@
+package org.apache.helix.provisioning.yarn;
+
+import java.io.InputStream;
+
+public interface ApplicationSpecFactory {
+  
+  ApplicationSpec fromYaml(InputStream yamlFile);
+
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/15f080cb/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/Client.java
----------------------------------------------------------------------
diff --git a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/Client.java b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/Client.java
index 6611ec6..500df9c 100644
--- a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/Client.java
+++ b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/Client.java
@@ -137,7 +137,7 @@ public class Client {
   private String appMasterArchive = "";
   // Main class to invoke application master
   private final String appMasterMainClass;
-  
+
   private String appSpecFile = "";
 
   // No. of containers in which helix participants will be started
@@ -159,37 +159,6 @@ public class Client {
   private Options opts;
 
   /**
-   * @param args Command line arguments
-   */
-  public static void main(String[] args) {
-    boolean result = false;
-    try {
-      Client client = new Client();
-      LOG.info("Initializing Client");
-      try {
-        boolean doRun = client.init(args);
-        if (!doRun) {
-          System.exit(0);
-        }
-      } catch (IllegalArgumentException e) {
-        System.err.println(e.getLocalizedMessage());
-        client.printUsage();
-        System.exit(-1);
-      }
-      result = client.run();
-    } catch (Throwable t) {
-      LOG.fatal("Error running CLient", t);
-      System.exit(1);
-    }
-    if (result) {
-      LOG.info("Application completed successfully");
-      System.exit(0);
-    }
-    LOG.error("Application failed to complete successfully");
-    System.exit(2);
-  }
-
-  /**
    */
   public Client(Configuration conf) throws Exception {
     this("org.apache.helix.provisioning.yarn.HelixYarnApplicationMasterMain", 
conf);
@@ -270,7 +239,7 @@ public class Client {
     appMasterArchive = cliParser.getOptionValue("archive");
 
     numContainers = 
Integer.parseInt(cliParser.getOptionValue("num_containers", "4"));
-    
+
     log4jPropFile = cliParser.getOptionValue("log_properties", "");
 
     return true;
@@ -371,7 +340,7 @@ public class Client {
     amJarRsrc.setTimestamp(destStatus.getModificationTime());
     amJarRsrc.setSize(destStatus.getLen());
     localResources.put("app-pkg", amJarRsrc);
-    
+
     Path localAppSpec = new Path(appSpecFile);
     pathSuffix = appName + "/" + appId.getId() + "/app-spec.yaml";
     Path dstAppSpec = new Path(fs.getHomeDirectory(), pathSuffix);
@@ -386,7 +355,6 @@ public class Client {
     appSpecResource.setSize(destStatus.getLen());
     localResources.put("app-spec", appSpecResource);
 
-
     // Set the log4j properties if needed
     if (!log4jPropFile.isEmpty()) {
       Path log4jSrc = new Path(log4jPropFile);
@@ -408,7 +376,6 @@ public class Client {
     // Set the necessary security tokens as needed
     // amContainer.setContainerTokens(containerToken);
 
-
     // Add AppMaster.jar location to classpath
     // At some point we should not be required to add
     // the hadoop specific classpaths to the env.
@@ -452,12 +419,14 @@ public class Client {
     // Set the env variables to be setup in the env where the application master will be run
     LOG.info("Set the environment for the application master");
     Map<String, String> env = new HashMap<String, String>();
-    env.put("app_pkg_path", fs.getHomeDirectory() + "/" + appName + "/" + 
appId.getId() + "/app-pkg.tar");
+    env.put("app_pkg_path", fs.getHomeDirectory() + "/" + appName + "/" + 
appId.getId()
+        + "/app-pkg.tar");
     env.put("appName", appName);
     env.put("appId", "" + appId.getId());
     env.put("CLASSPATH", classPathEnv.toString());
     env.put("appClasspath", appClassPathEnv.toString());
-    env.put("containerParticipantMainClass", 
"org.apache.helix.provisioning.yarn.ParticipantLauncher");
+    env.put("containerParticipantMainClass",
+        "org.apache.helix.provisioning.yarn.ParticipantLauncher");
     amContainer.setEnvironment(env);
 
     // Set the necessary command to execute the application master
@@ -597,11 +566,13 @@ public class Client {
         return false;
       }
 
-      /*if (System.currentTimeMillis() > (clientStartTime + clientTimeout)) {
-        LOG.info("Reached client specified timeout for application. Killing 
application");
-        forceKillApplication(appId);
-        return false;
-      }*/
+      /*
+       * if (System.currentTimeMillis() > (clientStartTime + clientTimeout)) {
+       * LOG.info("Reached client specified timeout for application. Killing 
application");
+       * forceKillApplication(appId);
+       * return false;
+       * }
+       */
     }
 
   }
@@ -619,7 +590,38 @@ public class Client {
 
     // Response can be ignored as it is non-null on success or
     // throws an exception in case of failures
-    //yarnClient.killApplication(appId);
+    // yarnClient.killApplication(appId);
+  }
+
+  /**
+   * @param args Command line arguments
+   */
+  public static void main(String[] args) {
+    boolean result = false;
+    try {
+      Client client = new Client();
+      LOG.info("Initializing Client");
+      try {
+        boolean doRun = client.init(args);
+        if (!doRun) {
+          System.exit(0);
+        }
+      } catch (IllegalArgumentException e) {
+        System.err.println(e.getLocalizedMessage());
+        client.printUsage();
+        System.exit(-1);
+      }
+      result = client.run();
+    } catch (Throwable t) {
+      LOG.fatal("Error running CLient", t);
+      System.exit(1);
+    }
+    if (result) {
+      LOG.info("Application completed successfully");
+      System.exit(0);
+    }
+    LOG.error("Application failed to complete successfully");
+    System.exit(2);
   }
 
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/15f080cb/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/ContainerParticipant.java
----------------------------------------------------------------------
diff --git a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/ContainerParticipant.java b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/ContainerParticipant.java
deleted file mode 100644
index 81a9c56..0000000
--- a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/ContainerParticipant.java
+++ /dev/null
@@ -1,27 +0,0 @@
-package org.apache.helix.provisioning.yarn;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.helix.HelixConnection;
-import org.apache.helix.api.id.ClusterId;
-import org.apache.helix.api.id.ParticipantId;
-import org.apache.helix.manager.zk.AbstractParticipantService;
-
-public class ContainerParticipant extends AbstractParticipantService {
-  private static final Log LOG = LogFactory.getLog(ContainerParticipant.class);
-
-  public ContainerParticipant(HelixConnection connection, ClusterId clusterId,
-      ParticipantId participantId) {
-    super(connection, clusterId, participantId);
-  }
-
-  @Override
-  public void init() {
-    // register a state model
-  }
-
-  @Override
-  public void onPreJoinCluster() {
-    // do tasks that require a connection
-  }
-}

http://git-wip-us.apache.org/repos/asf/helix/blob/15f080cb/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/ParticipantLauncher.java
----------------------------------------------------------------------
diff --git a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/ParticipantLauncher.java b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/ParticipantLauncher.java
index 58e7a4f..8a3f19f 100644
--- a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/ParticipantLauncher.java
+++ b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/ParticipantLauncher.java
@@ -9,9 +9,16 @@ import org.apache.commons.cli.Options;
 import org.apache.helix.HelixConnection;
 import org.apache.helix.api.id.ClusterId;
 import org.apache.helix.api.id.ParticipantId;
+import org.apache.helix.manager.zk.AbstractParticipantService;
 import org.apache.helix.manager.zk.ZkHelixConnection;
-
+import org.apache.log4j.Logger;
+/**
+ * 
+ * Main class that invokes the Participant Api
+ */
 public class ParticipantLauncher {
+  private static Logger LOG = Logger.getLogger(ParticipantLauncher.class);
+
   public static void main(String[] args) {
 
     System.out.println("Starting Helix Participant: " + Arrays.toString(args));
@@ -20,6 +27,7 @@ public class ParticipantLauncher {
     opts.addOption("cluster", true, "Cluster name, default app name");
     opts.addOption("participantId", true, "Participant Id");
     opts.addOption("zkAddress", true, "Zookeeper address");
+    opts.addOption("ParticipantClass", true, "ParticipantClass");
     try {
       CommandLine cliParser = new GnuParser().parse(opts, args);
       String zkAddress = cliParser.getOptionValue("zkAddress");
@@ -27,8 +35,13 @@ public class ParticipantLauncher {
       connection.connect();
       ClusterId clusterId = 
ClusterId.from(cliParser.getOptionValue("cluster"));
       ParticipantId participantId = 
ParticipantId.from(cliParser.getOptionValue("participantId"));
-      ContainerParticipant containerParticipant =
-          new ContainerParticipant(connection, clusterId, participantId);
+      String participantClass = cliParser.getOptionValue("ParticipantClass");
+      @SuppressWarnings("unchecked")
+      Class<? extends AbstractParticipantService> clazz =
+          (Class<? extends AbstractParticipantService>) 
Class.forName(participantClass);
+      AbstractParticipantService containerParticipant =
+          clazz.getConstructor(HelixConnection.class, ClusterId.class, 
ParticipantId.class)
+              .newInstance(connection, clusterId, participantId);
       containerParticipant.startAsync();
       containerParticipant.awaitRunning(60, TimeUnit.SECONDS);
       Thread.currentThread().join();

http://git-wip-us.apache.org/repos/asf/helix/blob/15f080cb/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/TaskConfig.java
----------------------------------------------------------------------
diff --git a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/TaskConfig.java b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/TaskConfig.java
new file mode 100644
index 0000000..0b500a9
--- /dev/null
+++ b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/TaskConfig.java
@@ -0,0 +1,13 @@
+package org.apache.helix.provisioning.yarn;
+
+import java.util.HashMap;
+import java.util.Map;
+
+
+public class TaskConfig {
+       public Map<String, String> config = new HashMap<String, String>();
+       
+       public String getValue(String key) {
+               return (config != null ? config.get(key) : null);
+       }
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/15f080cb/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/YamlApplicationSpecFactory.java
----------------------------------------------------------------------
diff --git a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/YamlApplicationSpecFactory.java b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/YamlApplicationSpecFactory.java
index d5ac2c0..5a17755 100644
--- a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/YamlApplicationSpecFactory.java
+++ b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/YamlApplicationSpecFactory.java
@@ -1,9 +1,12 @@
 package org.apache.helix.provisioning.yarn;
 
 import java.io.InputStream;
+import java.net.URI;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.helix.api.config.ParticipantConfig;
+import org.apache.helix.api.id.ParticipantId;
 import org.yaml.snakeyaml.Yaml;
 
 class DefaultApplicationSpec implements ApplicationSpec {
@@ -22,16 +25,6 @@ class DefaultApplicationSpec implements ApplicationSpec {
        }
 
        @Override
-       public int getMinContainers() {
-               return minContainers;
-       }
-
-       @Override
-       public int getMaxContainers() {
-               return maxContainers;
-       }
-
-       @Override
        public AppConfig getConfig() {
                return appConfig;
        }
@@ -41,10 +34,20 @@ class DefaultApplicationSpec implements ApplicationSpec {
                return services;
        }
 
-       @Override
-       public ServiceConfig getServiceConfig(String name) {
-               return serviceConfigMap.get(name);
-       }
+  @Override
+  public URI getServicePackage(String serviceName) {
+    return null;
+  }
+
+  @Override
+  public ParticipantConfig getParticipantConfig(String serviceName, ParticipantId participantId) {
+    return null;
+  }
+
+  @Override
+  public List<TaskConfig> getTaskConfigs() {
+    return null;
+  }
 }
 
 public class YamlApplicationSpecFactory {

http://git-wip-us.apache.org/repos/asf/helix/blob/15f080cb/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/example/HelloWorldService.java
----------------------------------------------------------------------
diff --git a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/example/HelloWorldService.java b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/example/HelloWorldService.java
index 055bfd7..db9e524 100644
--- a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/example/HelloWorldService.java
+++ b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/example/HelloWorldService.java
@@ -4,10 +4,10 @@ import org.apache.helix.HelixConnection;
 import org.apache.helix.api.id.ClusterId;
 import org.apache.helix.api.id.ParticipantId;
 import org.apache.helix.api.id.StateModelDefId;
-import org.apache.helix.provisioning.yarn.ContainerParticipant;
+import org.apache.helix.manager.zk.AbstractParticipantService;
 
 
-public class HelloWorldService extends ContainerParticipant {
+public class HelloWorldService extends AbstractParticipantService {
 
        public HelloWorldService(HelixConnection connection, ClusterId clusterId,
                        ParticipantId participantId) {
@@ -19,5 +19,10 @@ public class HelloWorldService extends ContainerParticipant {
                HelloWorldStateModelFactory stateModelFactory = new HelloWorldStateModelFactory();
                getParticipant().getStateMachineEngine().registerStateModelFactory(StateModelDefId.from("OnlineOffline"), stateModelFactory);
        }
+
+  @Override
+  public void onPreJoinCluster() {
+    // This will be invoked prior to joining the cluster.
+  }
 }
 

http://git-wip-us.apache.org/repos/asf/helix/blob/15f080cb/helix-provisioning/src/main/resources/sample_application.yaml
----------------------------------------------------------------------
diff --git a/helix-provisioning/src/main/resources/sample_application.yaml b/helix-provisioning/src/main/resources/sample_application.yaml
new file mode 100644
index 0000000..f45faa3
--- /dev/null
+++ b/helix-provisioning/src/main/resources/sample_application.yaml
@@ -0,0 +1,26 @@
+---
+appName: test
+configs:
+    k1: v1
+services:
+  - name: myservice
+    participantClass: org.apache.helix.myApp.SimpleWebserver 
+    minContainers: 3
+    maxContainers: 3
+    configs:
+        - participantId: myservice_0         
+          port: 9500
+        - participantId: myservice_1        
+          port: 9501 
+        - participantId: myservice_2         
+          port: 9502
+resources:
+  - name: distributedLock
+    numPartitions: 6
+    numReplicas: 2
+    rebalanceMode: FULL_AUTO
+    stateModel: OnlineOffline
+    tag: myservice 
+    configs: 
+        k1: v1
+                     
\ No newline at end of file

Reply via email to