Updated Branches: refs/heads/helix-provisioning f282a3003 -> c9ddde3e7
Participants now connect to Helix cluster Project: http://git-wip-us.apache.org/repos/asf/helix/repo Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/c9ddde3e Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/c9ddde3e Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/c9ddde3e Branch: refs/heads/helix-provisioning Commit: c9ddde3e719d73a3158a050ebdb4a38cb6c91c68 Parents: f282a30 Author: Kishore Gopalakrishna <[email protected]> Authored: Sat Jan 11 23:34:07 2014 -0800 Committer: Kishore Gopalakrishna <[email protected]> Committed: Sat Jan 11 23:34:07 2014 -0800 ---------------------------------------------------------------------- .../java/org/apache/helix/api/Participant.java | 1 + .../provisioner/ContainerProvider.java | 4 +- .../provisioner/ParticipantContainer.java | 12 ----- .../provisioner/ParticipantService.java | 10 +++++ .../stages/ContainerProvisioningStage.java | 2 +- .../integration/TestLocalContainerProvider.java | 10 ++--- helix-provisioning/pom.xml | 6 --- .../provisioning/yarn/ApplicationSpec.java | 19 -------- .../apache/helix/provisioning/yarn/Client.java | 30 +++---------- .../provisioning/yarn/ContainerParticipant.java | 36 ++++++++++++--- .../yarn/HelixYarnApplicationMasterMain.java | 11 +++-- .../provisioning/yarn/ParticipantLauncher.java | 47 ++++++++++++++++++++ .../provisioning/yarn/RMCallbackHandler.java | 5 ++- .../provisioning/yarn/YarnProvisioner.java | 30 +++++++------ .../yarn/YarnProvisionerConfig.java | 1 - 15 files changed, 130 insertions(+), 94 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/helix/blob/c9ddde3e/helix-core/src/main/java/org/apache/helix/api/Participant.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/api/Participant.java b/helix-core/src/main/java/org/apache/helix/api/Participant.java index fc20968..3ed395b 100644 --- a/helix-core/src/main/java/org/apache/helix/api/Participant.java +++ b/helix-core/src/main/java/org/apache/helix/api/Participant.java @@ -39,6 +39,7 @@ import com.google.common.collect.ImmutableMap; */ public class Participant { private final ParticipantConfig _config; + private final ContainerConfig _containerConfig; private final RunningInstance _runningInstance; http://git-wip-us.apache.org/repos/asf/helix/blob/c9ddde3e/helix-core/src/main/java/org/apache/helix/controller/provisioner/ContainerProvider.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/provisioner/ContainerProvider.java b/helix-core/src/main/java/org/apache/helix/controller/provisioner/ContainerProvider.java index c88733f..a95abe0 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/provisioner/ContainerProvider.java +++ b/helix-core/src/main/java/org/apache/helix/controller/provisioner/ContainerProvider.java @@ -1,5 +1,7 @@ package org.apache.helix.controller.provisioner; +import org.apache.helix.api.Participant; + import com.google.common.util.concurrent.ListenableFuture; /* @@ -27,7 +29,7 @@ public interface ContainerProvider { ListenableFuture<Boolean> deallocateContainer(ContainerId containerId); - ListenableFuture<Boolean> startContainer(ContainerId containerId); + ListenableFuture<Boolean> startContainer(ContainerId containerId, Participant participant); ListenableFuture<Boolean> stopContainer(ContainerId containerId); http://git-wip-us.apache.org/repos/asf/helix/blob/c9ddde3e/helix-core/src/main/java/org/apache/helix/controller/provisioner/ParticipantContainer.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/provisioner/ParticipantContainer.java b/helix-core/src/main/java/org/apache/helix/controller/provisioner/ParticipantContainer.java deleted file mode 100644 index 7b39aca..0000000 --- a/helix-core/src/main/java/org/apache/helix/controller/provisioner/ParticipantContainer.java +++ /dev/null @@ -1,12 +0,0 @@ -package org.apache.helix.controller.provisioner; - -public class ParticipantContainer { - - /** - * Id request by the target provider - */ - String requestId; - - String allocatedId; - -} http://git-wip-us.apache.org/repos/asf/helix/blob/c9ddde3e/helix-core/src/main/java/org/apache/helix/controller/provisioner/ParticipantService.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/provisioner/ParticipantService.java b/helix-core/src/main/java/org/apache/helix/controller/provisioner/ParticipantService.java new file mode 100644 index 0000000..92b5a24 --- /dev/null +++ b/helix-core/src/main/java/org/apache/helix/controller/provisioner/ParticipantService.java @@ -0,0 +1,10 @@ +package org.apache.helix.controller.provisioner; + +public interface ParticipantService { + + boolean init(ServiceConfig serviceConfig); + + boolean start(); + + boolean stop(); +} http://git-wip-us.apache.org/repos/asf/helix/blob/c9ddde3e/helix-core/src/main/java/org/apache/helix/controller/stages/ContainerProvisioningStage.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ContainerProvisioningStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ContainerProvisioningStage.java index 97b80b9..499f904 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/stages/ContainerProvisioningStage.java +++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ContainerProvisioningStage.java @@ -154,7 +154,7 @@ public class ContainerProvisioningStage extends AbstractBaseStage { accessor.setProperty(keyBuilder.instanceConfig(participant.getId().toString()), existingInstance); // create the helix participant and add it to cluster - ListenableFuture<Boolean> future = provisioner.startContainer(containerId); + ListenableFuture<Boolean> future = provisioner.startContainer(containerId, participant); Futures.addCallback(future, new FutureCallback<Boolean>() { @Override public void onSuccess(Boolean result) { http://git-wip-us.apache.org/repos/asf/helix/blob/c9ddde3e/helix-core/src/test/java/org/apache/helix/integration/TestLocalContainerProvider.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestLocalContainerProvider.java b/helix-core/src/test/java/org/apache/helix/integration/TestLocalContainerProvider.java index 7b8a580..2f1d397 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/TestLocalContainerProvider.java +++ b/helix-core/src/test/java/org/apache/helix/integration/TestLocalContainerProvider.java @@ -242,12 +242,12 @@ public class TestLocalContainerProvider extends ZkUnitTestBase { } @Override - public ListenableFuture<Boolean> startContainer(ContainerId containerId) { - ParticipantService participant = + public ListenableFuture<Boolean> startContainer(ContainerId containerId, Participant participant ) { + ParticipantService participantService = new ParticipantService(_clusterId, _containerParticipants.get(containerId)); - participant.startAsync(); - participant.awaitRunning(); - _participants.put(containerId, participant); + participantService.startAsync(); + participantService.awaitRunning(); + _participants.put(containerId, participantService); _states.put(containerId, ContainerState.ACTIVE); started++; SettableFuture<Boolean> future = SettableFuture.create(); http://git-wip-us.apache.org/repos/asf/helix/blob/c9ddde3e/helix-provisioning/pom.xml ---------------------------------------------------------------------- diff --git a/helix-provisioning/pom.xml b/helix-provisioning/pom.xml index d83bbf2..254d420 100644 --- a/helix-provisioning/pom.xml +++ b/helix-provisioning/pom.xml @@ -57,12 +57,6 @@ under the License. <scope>provided</scope> </dependency> <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-yarn-common</artifactId> - <version>${hadoop.version}</version> - <scope>provided</scope> - </dependency> - <dependency> <groupId>org.testng</groupId> <artifactId>testng</artifactId> <scope>test</scope> http://git-wip-us.apache.org/repos/asf/helix/blob/c9ddde3e/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/ApplicationSpec.java ---------------------------------------------------------------------- diff --git a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/ApplicationSpec.java b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/ApplicationSpec.java deleted file mode 100644 index 6671364..0000000 --- a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/ApplicationSpec.java +++ /dev/null @@ -1,19 +0,0 @@ -package org.apache.helix.provisioning.yarn; - -public class ApplicationSpec { - - int minContainers; - - int maxContainers; - - String serviceClass; - - String targetProvider; - - String stateModel; - - String taskClass; - - int numTasks; - -} http://git-wip-us.apache.org/repos/asf/helix/blob/c9ddde3e/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/Client.java ---------------------------------------------------------------------- diff --git a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/Client.java b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/Client.java index 3caf8f0..a7a119f 100644 --- a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/Client.java +++ b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/Client.java @@ -131,7 +131,7 @@ public class Client { // Queue for App master private String amQueue = ""; // Amt. of memory resource to request for to run the App Master - private int amMemory = 10; + private int amMemory = 1024; // Application master jar file private String appMasterArchive = ""; @@ -201,19 +201,13 @@ public class Client { yarnClient = YarnClient.createYarnClient(); yarnClient.init(conf); opts = new Options(); - opts.addOption("appname", true, "Application Name. Default value - DistributedShell"); + opts.addOption("appName", true, "Application Name."); opts.addOption("priority", true, "Application Priority. Default 0"); opts.addOption("queue", true, "RM Queue in which this application is to be submitted"); opts.addOption("timeout", true, "Application timeout in milliseconds"); opts.addOption("master_memory", true, "Amount of memory in MB to be requested to run the application master"); opts.addOption("archive", true, "Jar file containing the application master"); - opts.addOption("shell_command", true, "Shell command to be executed by the Application Master"); - opts.addOption("shell_script", true, "Location of the shell script to be executed"); - opts.addOption("shell_args", true, "Command line args for the shell script"); - opts.addOption("shell_env", true, - "Environment for shell script. Specified as env_key=env_val pairs"); - opts.addOption("shell_cmd_priority", true, "Priority for the shell command containers"); opts.addOption("container_memory", true, "Amount of memory in MB to be requested to run the shell command"); opts.addOption("num_containers", true, @@ -258,13 +252,12 @@ public class Client { if (cliParser.hasOption("debug")) { debugFlag = true; - } - appName = cliParser.getOptionValue("appname", "DistributedShell"); + appName = cliParser.getOptionValue("appName"); amPriority = Integer.parseInt(cliParser.getOptionValue("priority", "0")); amQueue = cliParser.getOptionValue("queue", "default"); - amMemory = Integer.parseInt(cliParser.getOptionValue("master_memory", "10")); + amMemory = Integer.parseInt(cliParser.getOptionValue("master_memory", "1024")); if (amMemory < 0) { throw new IllegalArgumentException( @@ -278,17 +271,6 @@ public class Client { appMasterArchive = cliParser.getOptionValue("archive"); - containerMemory = Integer.parseInt(cliParser.getOptionValue("container_memory", "10")); - numContainers = Integer.parseInt(cliParser.getOptionValue("num_containers", "1")); - - if (containerMemory < 0 || numContainers < 1) { - throw new IllegalArgumentException( - "Invalid no. of containers or container memory specified, exiting." - + " Specified containerMemory=" + containerMemory + ", numContainer=" + numContainers); - } - - clientTimeout = Integer.parseInt(cliParser.getOptionValue("timeout", "600000")); - log4jPropFile = cliParser.getOptionValue("log_properties", ""); return true; @@ -460,7 +442,7 @@ public class Client { env.put("appId", "" + appId.getId()); env.put("CLASSPATH", classPathEnv.toString()); env.put("appClasspath", appClassPathEnv.toString()); - env.put("containerParticipantMainClass", "org.apache.helix.provisioning.yarn.ContainerParticipant"); + env.put("containerParticipantMainClass", "org.apache.helix.provisioning.yarn.ParticipantLauncher"); amContainer.setEnvironment(env); // Set the necessary command to execute the application master @@ -623,7 +605,7 @@ public class Client { // Response can be ignored as it is non-null on success or // throws an exception in case of failures - yarnClient.killApplication(appId); + //yarnClient.killApplication(appId); } } http://git-wip-us.apache.org/repos/asf/helix/blob/c9ddde3e/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/ContainerParticipant.java ---------------------------------------------------------------------- diff --git a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/ContainerParticipant.java b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/ContainerParticipant.java index b0272e8..9c80d87 100644 --- a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/ContainerParticipant.java +++ b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/ContainerParticipant.java @@ -1,15 +1,39 @@ package org.apache.helix.provisioning.yarn; -import java.util.Arrays; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.helix.HelixConnection; +import org.apache.helix.HelixParticipant; +import org.apache.helix.api.id.ClusterId; +import org.apache.helix.api.id.ParticipantId; + +import com.google.common.util.concurrent.AbstractService; -public class ContainerParticipant { +public class ContainerParticipant extends AbstractService { private static final Log LOG = LogFactory.getLog(ContainerParticipant.class); + private final ClusterId _clusterId; + private final ParticipantId _participantId; + private HelixParticipant _participant; + private HelixConnection _connection; + + public ContainerParticipant(HelixConnection connection, ClusterId clusterId, + ParticipantId participantId) { + _connection = connection; + _clusterId = clusterId; + _participantId = participantId; + } + + @Override + protected void doStart() { + _participant = _connection.createParticipant(_clusterId, _participantId); + // register statemachine + _participant.startAsync(); + notifyStarted(); + } - public static void main(String[] args) throws InterruptedException { - LOG.info("Starting participant: "+ Arrays.toString(args)); - Thread.currentThread().join(); + @Override + protected void doStop() { + _participant.stopAsync(); + notifyStopped(); } } http://git-wip-us.apache.org/repos/asf/helix/blob/c9ddde3e/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/HelixYarnApplicationMasterMain.java ---------------------------------------------------------------------- diff --git a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/HelixYarnApplicationMasterMain.java b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/HelixYarnApplicationMasterMain.java index 92930ed..8be4754 100644 --- a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/HelixYarnApplicationMasterMain.java +++ b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/HelixYarnApplicationMasterMain.java @@ -1,10 +1,12 @@ package org.apache.helix.provisioning.yarn; +import java.io.File; import java.util.Map; import org.I0Itec.zkclient.IDefaultNameSpace; import org.I0Itec.zkclient.ZkClient; import org.I0Itec.zkclient.ZkServer; +import org.apache.commons.io.FileUtils; import org.apache.hadoop.yarn.api.ApplicationConstants.Environment; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ContainerId; @@ -41,9 +43,12 @@ public class HelixYarnApplicationMasterMain { @Override public void createDefaultNameSpace(ZkClient zkClient) { - + } }; + FileUtils.deleteDirectory(new File(dataDir)); + FileUtils.deleteDirectory(new File(logDir)); + final ZkServer server = new ZkServer(dataDir, logDir, defaultNameSpace); server.start(); @@ -62,7 +67,7 @@ public class HelixYarnApplicationMasterMain { YarnProvisioner.applicationMaster = genericApplicationMaster; String zkAddress = envs.get(Environment.NM_HOST.name()) + ":2181"; - String clusterName = "testCluster"; + String clusterName = envs.get("appName"); String resourceName = "testResource"; int NUM_PARTITIONS = 6; int NUM_REPLICAS = 2; @@ -91,7 +96,7 @@ public class HelixYarnApplicationMasterMain { // start controller ControllerId controllerId = ControllerId.from("controller1"); HelixController controller = connection.createController(clusterId, controllerId); - controller.startAsync(); // TODO: is this really async? + controller.startAsync(); Thread shutdownhook = new Thread(new Runnable() { @Override http://git-wip-us.apache.org/repos/asf/helix/blob/c9ddde3e/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/ParticipantLauncher.java ---------------------------------------------------------------------- diff --git a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/ParticipantLauncher.java b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/ParticipantLauncher.java new file mode 100644 index 0000000..58e7a4f --- /dev/null +++ b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/ParticipantLauncher.java @@ -0,0 +1,47 @@ +package org.apache.helix.provisioning.yarn; + +import java.util.Arrays; +import java.util.concurrent.TimeUnit; + +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.GnuParser; +import org.apache.commons.cli.Options; +import org.apache.helix.HelixConnection; +import org.apache.helix.api.id.ClusterId; +import org.apache.helix.api.id.ParticipantId; +import org.apache.helix.manager.zk.ZkHelixConnection; + +public class ParticipantLauncher { + public static void main(String[] args) { + + System.out.println("Starting Helix Participant: " + Arrays.toString(args)); + Options opts; + opts = new Options(); + opts.addOption("cluster", true, "Cluster name, default app name"); + opts.addOption("participantId", true, "Participant Id"); + opts.addOption("zkAddress", true, "Zookeeper address"); + try { + CommandLine cliParser = new GnuParser().parse(opts, args); + String zkAddress = cliParser.getOptionValue("zkAddress"); + HelixConnection connection = new ZkHelixConnection(zkAddress); + connection.connect(); + ClusterId clusterId = ClusterId.from(cliParser.getOptionValue("cluster")); + ParticipantId participantId = ParticipantId.from(cliParser.getOptionValue("participantId")); + ContainerParticipant containerParticipant = + new ContainerParticipant(connection, clusterId, participantId); + containerParticipant.startAsync(); + containerParticipant.awaitRunning(60, TimeUnit.SECONDS); + Thread.currentThread().join(); + } catch (Exception e) { + e.printStackTrace(); + System.out.println("Failed to start Helix participant" + e); + // System.exit(1); + } + try { + Thread.currentThread().join(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + + } +} http://git-wip-us.apache.org/repos/asf/helix/blob/c9ddde3e/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/RMCallbackHandler.java ---------------------------------------------------------------------- diff --git a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/RMCallbackHandler.java b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/RMCallbackHandler.java index 6c87bd2..50c38b5 100644 --- a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/RMCallbackHandler.java +++ b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/RMCallbackHandler.java @@ -16,7 +16,7 @@ import com.google.common.util.concurrent.SettableFuture; class RMCallbackHandler implements AMRMClientAsync.CallbackHandler { private static final Log LOG = LogFactory.getLog(RMCallbackHandler.class); - + long startTime; /** * */ @@ -27,6 +27,7 @@ class RMCallbackHandler implements AMRMClientAsync.CallbackHandler { */ RMCallbackHandler(GenericApplicationMaster genericApplicationMaster) { _genericApplicationMaster = genericApplicationMaster; + startTime = System.currentTimeMillis(); } @SuppressWarnings("unchecked") @@ -96,7 +97,7 @@ class RMCallbackHandler implements AMRMClientAsync.CallbackHandler { @Override public float getProgress() { // set progress to deliver to RM on next heartbeat - return 0.5f; + return (System.currentTimeMillis()-startTime) % Integer.MAX_VALUE; } @Override http://git-wip-us.apache.org/repos/asf/helix/blob/c9ddde3e/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/YarnProvisioner.java ---------------------------------------------------------------------- diff --git a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/YarnProvisioner.java b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/YarnProvisioner.java index f74e312..1a903d4 100644 --- a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/YarnProvisioner.java +++ b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/YarnProvisioner.java @@ -65,7 +65,8 @@ public class YarnProvisioner implements Provisioner { static ListeningExecutorService service = MoreExecutors.listeningDecorator(Executors .newCachedThreadPool()); Map<ContainerId, Container> allocatedContainersMap = new HashMap<ContainerId, Container>(); - int DEFAULT_CONTAINER = 4; + int DEFAULT_CONTAINER = 1; + private HelixManager _helixManager; @Override public ListenableFuture<ContainerId> allocateContainer(ContainerSpec spec) { @@ -99,11 +100,11 @@ public class YarnProvisioner implements Provisioner { } @Override - public ListenableFuture<Boolean> startContainer(final ContainerId containerId) { + public ListenableFuture<Boolean> startContainer(final ContainerId containerId, Participant participant) { Container container = allocatedContainersMap.get(containerId); ContainerLaunchContext launchContext; try { - launchContext = createLaunchContext(containerId); + launchContext = createLaunchContext(containerId, container, participant); } catch (Exception e) { LOG.error("Exception while creating context to launch container:" + containerId, e); return null; @@ -118,9 +119,9 @@ public class YarnProvisioner implements Provisioner { }, service); } - private ContainerLaunchContext createLaunchContext(ContainerId containerId) throws Exception { + private ContainerLaunchContext createLaunchContext(ContainerId containerId, Container container, Participant participant) throws Exception { - ContainerLaunchContext amContainer = Records.newRecord(ContainerLaunchContext.class); + ContainerLaunchContext participantContainer = Records.newRecord(ContainerLaunchContext.class); Map<String, String> envs = System.getenv(); String appName = envs.get("appName"); @@ -158,7 +159,7 @@ public class YarnProvisioner implements Provisioner { localResources.put("app-pkg", amJarRsrc); // Set local resource info into app master container launch context - amContainer.setLocalResources(localResources); + participantContainer.setLocalResources(localResources); // Set the necessary security tokens as needed // amContainer.setContainerTokens(containerToken); @@ -166,7 +167,7 @@ public class YarnProvisioner implements Provisioner { // Set the env variables to be setup in the env where the application master will be run LOG.info("Set the environment for the application master"); Map<String, String> env = new HashMap<String, String>(); - env.put("app-pkg-path", dst.getName()); + env.put("app_pkg_path", dst.getName()); // Add AppMaster.jar location to classpath // At some point we should not be required to add // the hadoop specific classpaths to the env. @@ -190,10 +191,9 @@ public class YarnProvisioner implements Provisioner { classPathEnv.append(':'); classPathEnv.append(System.getProperty("java.class.path")); } - System.out.println("classoath" + classPathEnv.toString()); env.put("CLASSPATH", classPathEnv.toString()); - amContainer.setEnvironment(env); + participantContainer.setEnvironment(env); // Set the necessary command to execute the application master Vector<CharSequence> vargs = new Vector<CharSequence>(30); @@ -206,8 +206,9 @@ public class YarnProvisioner implements Provisioner { // Set class name vargs.add(containerParticipantMainClass); // Set params for container participant - vargs.add("--zk_address " + zkAddress); - vargs.add("--participantId " + containerId.stringify()); + vargs.add("--zkAddress " + zkAddress); + vargs.add("--cluster " + appName); + vargs.add("--participantId " + participant.getId().stringify()); vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/ContainerParticipant.stdout"); vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/ContainerParticipant.stderr"); @@ -218,11 +219,11 @@ public class YarnProvisioner implements Provisioner { command.append(str).append(" "); } - LOG.info("Completed setting up app master command " + command.toString()); + LOG.info("Completed setting up container launch command " + command.toString() + " with arguments \n" + vargs); List<String> commands = new ArrayList<String>(); commands.add(command.toString()); - amContainer.setCommands(commands); - return amContainer; + participantContainer.setCommands(commands); + return participantContainer; } @Override @@ -244,6 +245,7 @@ public class YarnProvisioner implements Provisioner { @Override public void init(HelixManager helixManager) { + _helixManager = helixManager; } http://git-wip-us.apache.org/repos/asf/helix/blob/c9ddde3e/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/YarnProvisionerConfig.java ---------------------------------------------------------------------- diff --git a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/YarnProvisionerConfig.java b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/YarnProvisionerConfig.java index 8427c14..0c1dbda 100644 --- a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/YarnProvisionerConfig.java +++ b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/YarnProvisionerConfig.java @@ -5,7 +5,6 @@ import org.apache.helix.controller.provisioner.ProvisionerConfig; import org.apache.helix.controller.provisioner.ProvisionerRef; import org.apache.helix.controller.serializer.DefaultStringSerializer; import org.apache.helix.controller.serializer.StringSerializer; -import org.apache.helix.integration.TestLocalContainerProvider.LocalProvisioner; import org.codehaus.jackson.annotate.JsonProperty; public class YarnProvisionerConfig implements ProvisionerConfig {
