Updated Branches: refs/heads/helix-provisioning c9ddde3e7 -> 9386a4cbc
Adding command line support to launch the cluster
Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/9386a4cb
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/9386a4cb
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/9386a4cb
Branch: refs/heads/helix-provisioning
Commit: 9386a4cbc3d8b5cd8e26ba3970fd77e259924351
Parents: c9ddde3
Author: Kishore Gopalakrishna <[email protected]>
Authored: Sun Jan 12 21:01:09 2014 -0800
Committer: Kishore Gopalakrishna <[email protected]>
Committed: Sun Jan 12 21:01:09 2014 -0800
----------------------------------------------------------------------
 helix-provisioning/pom.xml | 8 +++---
 .../apache/helix/provisioning/yarn/Client.java | 22 ++++++---------
 .../yarn/HelixYarnApplicationMasterMain.java | 29 ++++++++++++++++----
 .../provisioning/yarn/YarnProvisioner.java | 9 +++---
 .../yarn/YarnProvisionerConfig.java | 9 ++++++
 5 files changed, 51 insertions(+), 26 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/helix/blob/9386a4cb/helix-provisioning/pom.xml
----------------------------------------------------------------------
diff --git a/helix-provisioning/pom.xml b/helix-provisioning/pom.xml
index 254d420..e04b3ef 100644
--- a/helix-provisioning/pom.xml
+++ b/helix-provisioning/pom.xml
@@ -48,13 +48,13 @@ under the License.
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-client</artifactId>
  <version>${hadoop.version}</version>
- <scope>provided</scope>
+ <!-- <scope>provided</scope> -->
  </dependency>
  <dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-yarn-api</artifactId>
  <version>${hadoop.version}</version>
- <scope>provided</scope>
+ <!-- <scope>provided</scope> -->
  </dependency>
  <dependency>
  <groupId>org.testng</groupId>
@@ -82,8 +82,8 @@ under the License.
<configuration> <programs> <program> - <mainClass>org.apache.helix.provisioning.HelixAgentMain</mainClass> - <name>start-helix-provisioning</name> + <mainClass>org.apache.helix.provisioning.yarn.Client</mainClass> + <name>yarn-job-launcher</name> </program> </programs> </configuration> http://git-wip-us.apache.org/repos/asf/helix/blob/9386a4cb/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/Client.java ---------------------------------------------------------------------- diff --git a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/Client.java b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/Client.java index a7a119f..15b43a6 100644 --- a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/Client.java +++ b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/Client.java @@ -138,9 +138,7 @@ public class Client { // Main class to invoke application master private final String appMasterMainClass; - // Amt of memory to request for container in which shell script will be executed - private int containerMemory = 10; - // No. of containers in which the shell script needs to be executed + // No. of containers in which helix participants will be started private int numContainers = 1; // log4j.properties file @@ -204,14 +202,11 @@ public class Client { opts.addOption("appName", true, "Application Name."); opts.addOption("priority", true, "Application Priority. 
Default 0"); opts.addOption("queue", true, "RM Queue in which this application is to be submitted"); - opts.addOption("timeout", true, "Application timeout in milliseconds"); opts.addOption("master_memory", true, "Amount of memory in MB to be requested to run the application master"); - opts.addOption("archive", true, "Jar file containing the application master"); - opts.addOption("container_memory", true, - "Amount of memory in MB to be requested to run the shell command"); + opts.addOption("archive", true, "Archive file containing the app code"); opts.addOption("num_containers", true, - "No. of containers on which the shell command needs to be executed"); + "No. of containers on which Helix Participants will be launched"); opts.addOption("log_properties", true, "log4j.properties file"); opts.addOption("debug", false, "Dump out debug information"); opts.addOption("help", false, "Print usage"); @@ -271,6 +266,8 @@ public class Client { appMasterArchive = cliParser.getOptionValue("archive"); + numContainers = Integer.parseInt(cliParser.getOptionValue("num_containers", "4")); + log4jPropFile = cliParser.getOptionValue("log_properties", ""); return true; @@ -456,7 +453,6 @@ public class Client { // Set class name vargs.add(appMasterMainClass); // Set params for Application Master - vargs.add("--container_memory " + String.valueOf(containerMemory)); vargs.add("--num_containers " + String.valueOf(numContainers)); if (debugFlag) { @@ -548,9 +544,9 @@ public class Client { while (true) { - // Check app status every 1 second. + // Check app status every 10 second. try { - Thread.sleep(1000); + Thread.sleep(10000); } catch (InterruptedException e) { LOG.debug("Thread sleep in monitoring loop interrupted"); } @@ -583,11 +579,11 @@ public class Client { return false; } - if (System.currentTimeMillis() > (clientStartTime + clientTimeout)) { + /*if (System.currentTimeMillis() > (clientStartTime + clientTimeout)) { LOG.info("Reached client specified timeout for application. 
Killing application"); forceKillApplication(appId); return false; - } + }*/ } } http://git-wip-us.apache.org/repos/asf/helix/blob/9386a4cb/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/HelixYarnApplicationMasterMain.java ---------------------------------------------------------------------- diff --git a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/HelixYarnApplicationMasterMain.java b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/HelixYarnApplicationMasterMain.java index 8be4754..2356e91 100644 --- a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/HelixYarnApplicationMasterMain.java +++ b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/HelixYarnApplicationMasterMain.java @@ -1,11 +1,15 @@ package org.apache.helix.provisioning.yarn; import java.io.File; +import java.util.Arrays; import java.util.Map; import org.I0Itec.zkclient.IDefaultNameSpace; import org.I0Itec.zkclient.ZkClient; import org.I0Itec.zkclient.ZkServer; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.GnuParser; +import org.apache.commons.cli.Options; import org.apache.commons.io.FileUtils; import org.apache.hadoop.yarn.api.ApplicationConstants.Environment; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; @@ -24,6 +28,7 @@ import org.apache.helix.controller.rebalancer.config.RebalancerConfig; import org.apache.helix.manager.zk.ZkHelixConnection; import org.apache.helix.model.StateModelDefinition; import org.apache.helix.tools.StateModelConfigGenerator; +import org.apache.log4j.Logger; /** * This will <br/> @@ -35,7 +40,21 @@ import org.apache.helix.tools.StateModelConfigGenerator; * </ul> */ public class HelixYarnApplicationMasterMain { + public static Logger LOG = Logger.getLogger(HelixYarnApplicationMasterMain.class); + public static void main(String[] args) throws Exception { + int numContainers = 1; + + Options opts; + opts = new Options(); + 
opts.addOption("num_containers", true, "Number of containers"); + try { + CommandLine cliParser = new GnuParser().parse(opts, args); + numContainers = Integer.parseInt(cliParser.getOptionValue("num_containers")); + } catch (Exception e) { + LOG.error("Error parsing input arguments" + Arrays.toString(args), e); + } + // START ZOOKEEPER String dataDir = "dataDir"; String logDir = "logDir"; @@ -43,18 +62,17 @@ public class HelixYarnApplicationMasterMain { @Override public void createDefaultNameSpace(ZkClient zkClient) { - + } }; FileUtils.deleteDirectory(new File(dataDir)); FileUtils.deleteDirectory(new File(logDir)); - + final ZkServer server = new ZkServer(dataDir, logDir, defaultNameSpace); server.start(); // start Map<String, String> envs = System.getenv(); - ContainerId containerId = ConverterUtils.toContainerId(envs.get(Environment.CONTAINER_ID.name())); ApplicationAttemptId appAttemptID = containerId.getApplicationAttemptId(); @@ -86,7 +104,8 @@ public class HelixYarnApplicationMasterMain { // add the resource with the local provisioner ResourceId resourceId = ResourceId.from(resourceName); - ProvisionerConfig provisionerConfig = new YarnProvisionerConfig(resourceId); + YarnProvisionerConfig provisionerConfig = new YarnProvisionerConfig(resourceId); + provisionerConfig.setNumContainers(numContainers); RebalancerConfig rebalancerConfig = new FullAutoRebalancerConfig.Builder(resourceId).addPartitions(NUM_PARTITIONS) .replicaCount(NUM_REPLICAS).stateModelDefId(masterSlave.getStateModelDefId()).build(); @@ -96,7 +115,7 @@ public class HelixYarnApplicationMasterMain { // start controller ControllerId controllerId = ControllerId.from("controller1"); HelixController controller = connection.createController(clusterId, controllerId); - controller.startAsync(); + controller.startAsync(); Thread shutdownhook = new Thread(new Runnable() { @Override 
http://git-wip-us.apache.org/repos/asf/helix/blob/9386a4cb/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/YarnProvisioner.java ---------------------------------------------------------------------- diff --git a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/YarnProvisioner.java b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/YarnProvisioner.java index 1a903d4..1d0c078 100644 --- a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/YarnProvisioner.java +++ b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/YarnProvisioner.java @@ -49,6 +49,7 @@ import org.apache.helix.controller.provisioner.ContainerId; import org.apache.helix.controller.provisioner.ContainerSpec; import org.apache.helix.controller.provisioner.ContainerState; import org.apache.helix.controller.provisioner.Provisioner; +import org.apache.helix.controller.provisioner.ProvisionerConfig; import org.apache.helix.controller.provisioner.TargetProviderResponse; import com.google.common.collect.Lists; @@ -65,7 +66,6 @@ public class YarnProvisioner implements Provisioner { static ListeningExecutorService service = MoreExecutors.listeningDecorator(Executors .newCachedThreadPool()); Map<ContainerId, Container> allocatedContainersMap = new HashMap<ContainerId, Container>(); - int DEFAULT_CONTAINER = 1; private HelixManager _helixManager; @Override @@ -258,10 +258,11 @@ public class YarnProvisioner implements Provisioner { List<Participant> containersToStart = Lists.newArrayList(); List<Participant> containersToRelease = Lists.newArrayList(); List<Participant> containersToStop = Lists.newArrayList(); - - for (int i = 0; i < DEFAULT_CONTAINER - participants.size(); i++) { + YarnProvisionerConfig provisionerConfig = (YarnProvisionerConfig) cluster.getConfig().getResourceMap().get(resourceId).getProvisionerConfig(); + int targetNumContainers = provisionerConfig.getNumContainers(); + for (int i = 0; i < 
targetNumContainers - participants.size(); i++) { containersToAcquire.add(new ContainerSpec(ContainerId.from("container" - + (DEFAULT_CONTAINER - i)))); + + (targetNumContainers - i)))); } response.setContainersToAcquire(containersToAcquire); http://git-wip-us.apache.org/repos/asf/helix/blob/9386a4cb/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/YarnProvisionerConfig.java ---------------------------------------------------------------------- diff --git a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/YarnProvisionerConfig.java b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/YarnProvisionerConfig.java index 0c1dbda..67dd679 100644 --- a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/YarnProvisionerConfig.java +++ b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/YarnProvisionerConfig.java @@ -12,6 +12,7 @@ public class YarnProvisionerConfig implements ProvisionerConfig { private ResourceId _resourceId; private Class<? extends StringSerializer> _serializerClass; private ProvisionerRef _provisionerRef; + private Integer _numContainers; public YarnProvisionerConfig(@JsonProperty("resourceId") ResourceId resourceId) { _resourceId = resourceId; @@ -19,6 +20,14 @@ public class YarnProvisionerConfig implements ProvisionerConfig { _provisionerRef = ProvisionerRef.from(YarnProvisioner.class.getName()); } + public void setNumContainers(int numContainers) { + _numContainers = numContainers; + } + + public Integer getNumContainers() { + return _numContainers; + } + @Override public ResourceId getResourceId() { return _resourceId;
