Repository: helix Updated Branches: refs/heads/helix-provisioning 57b4b180e -> 8b19cfc77
Almost complete working example of Helloworld Project: http://git-wip-us.apache.org/repos/asf/helix/repo Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/8b19cfc7 Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/8b19cfc7 Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/8b19cfc7 Branch: refs/heads/helix-provisioning Commit: 8b19cfc77b0ddd6bc90dcb034cfbd9b983ff2932 Parents: 57b4b18 Author: Kishore Gopalakrishna <[email protected]> Authored: Thu Feb 20 22:08:18 2014 -0800 Committer: Kishore Gopalakrishna <[email protected]> Committed: Thu Feb 20 22:08:18 2014 -0800 ---------------------------------------------------------------------- .../controller/provisioner/ContainerSpec.java | 19 ++++-- .../stages/ContainerProvisioningStage.java | 23 ++++--- .../manager/zk/AbstractParticipantService.java | 68 ++++++++++++++----- .../integration/TestLocalContainerProvider.java | 4 +- .../provisioning/yarn/ApplicationSpec.java | 4 +- .../yarn/HelixYarnApplicationMasterMain.java | 40 ++++++----- .../helix/provisioning/yarn/ServiceConfig.java | 14 ++-- .../yarn/YamlApplicationSpecFactory.java | 70 -------------------- .../provisioning/yarn/YarnProvisioner.java | 53 ++++++--------- .../yarn/example/HelloWorldService.java | 40 +++++++---- .../yarn/example/HelloworldAppSpec.java | 23 +++---- .../main/resources/hello_world_app_spec.yaml | 3 +- 12 files changed, 177 insertions(+), 184 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/helix/blob/8b19cfc7/helix-core/src/main/java/org/apache/helix/controller/provisioner/ContainerSpec.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/provisioner/ContainerSpec.java b/helix-core/src/main/java/org/apache/helix/controller/provisioner/ContainerSpec.java index 4d3a521..ab3c46a 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/provisioner/ContainerSpec.java +++ b/helix-core/src/main/java/org/apache/helix/controller/provisioner/ContainerSpec.java @@ -1,5 +1,7 @@ package org.apache.helix.controller.provisioner; +import org.apache.helix.api.id.ParticipantId; + /* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file @@ -27,8 +29,10 @@ public class ContainerSpec { int _memory; - public ContainerSpec(ContainerId containerId) { - this._containerId = containerId; + private ParticipantId _participantId; + + public ContainerSpec(ParticipantId _participantId) { + this._participantId = _participantId; } public ContainerId getContainerId() { @@ -37,7 +41,7 @@ public class ContainerSpec { @Override public String toString() { - return _containerId.toString(); + return _participantId.toString(); } public void setMemory(int memory){ @@ -49,6 +53,13 @@ public class ContainerSpec { } public static ContainerSpec from(String serialized) { - return new ContainerSpec(ContainerId.from(serialized)); + //todo + return null; + //return new ContainerSpec(ContainerId.from(serialized)); } + + public ParticipantId getParticipantId() { + return _participantId; + } + } http://git-wip-us.apache.org/repos/asf/helix/blob/8b19cfc7/helix-core/src/main/java/org/apache/helix/controller/stages/ContainerProvisioningStage.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ContainerProvisioningStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ContainerProvisioningStage.java index 42c8218..f7105d1 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/stages/ContainerProvisioningStage.java +++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ContainerProvisioningStage.java @@ -21,6 +21,7 @@ package org.apache.helix.controller.stages; import java.util.Collection; import java.util.HashMap; +import java.util.List; import java.util.Map; import org.apache.helix.HelixAdmin; @@ -121,16 +122,17 @@ public class ContainerProvisioningStage extends AbstractBaseStage { // allocate new containers for (final ContainerSpec spec : response.getContainersToAcquire()) { - // random participant id - final ParticipantId participantId = ParticipantId.from(spec.getContainerId().stringify()); - // create a new Participant, attach the container spec - InstanceConfig instanceConfig = new InstanceConfig(participantId); - instanceConfig.setContainerSpec(spec); - // create a helix_participant in ACQUIRING state - instanceConfig.setContainerState(ContainerState.ACQUIRING); - // create the helix participant and add it to cluster - helixAdmin.addInstance(cluster.getId().toString(), instanceConfig); - + final ParticipantId participantId = spec.getParticipantId(); + List<String> instancesInCluster = helixAdmin.getInstancesInCluster(cluster.getId().stringify()); + if (!instancesInCluster.contains(participantId.stringify())) { + // create a new Participant, attach the container spec + InstanceConfig instanceConfig = new InstanceConfig(participantId); + instanceConfig.setContainerSpec(spec); + // create a helix_participant in ACQUIRING state + instanceConfig.setContainerState(ContainerState.ACQUIRING); + // create the helix participant and add it to cluster + helixAdmin.addInstance(cluster.getId().toString(), instanceConfig); + } ListenableFuture<ContainerId> future = containerProvider.allocateContainer(spec); FutureCallback<ContainerId> callback = new FutureCallback<ContainerId>() { @Override @@ -160,7 +162,6 @@ public class ContainerProvisioningStage extends AbstractBaseStage { helixAdmin.getInstanceConfig(cluster.getId().toString(), participant.getId() .toString()); final ContainerId containerId = existingInstance.getContainerId(); - existingInstance.setContainerId(containerId); existingInstance.setContainerState(ContainerState.CONNECTING); accessor.updateProperty(keyBuilder.instanceConfig(participant.getId().toString()), existingInstance); http://git-wip-us.apache.org/repos/asf/helix/blob/8b19cfc7/helix-core/src/main/java/org/apache/helix/manager/zk/AbstractParticipantService.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/AbstractParticipantService.java b/helix-core/src/main/java/org/apache/helix/manager/zk/AbstractParticipantService.java index 2e5eafa..f515092 100644 --- a/helix-core/src/main/java/org/apache/helix/manager/zk/AbstractParticipantService.java +++ b/helix-core/src/main/java/org/apache/helix/manager/zk/AbstractParticipantService.java @@ -35,6 +35,7 @@ public abstract class AbstractParticipantService extends AbstractService { private final ParticipantId _participantId; private HelixParticipant _participant; private HelixConnection _connection; + boolean initialized; /** * Initialize the service. @@ -50,20 +51,22 @@ public abstract class AbstractParticipantService extends AbstractService { } @Override - protected void doStart() { + protected final void doStart() { _participant = _connection.createParticipant(_clusterId, _participantId); // add a preconnect callback _participant.addPreConnectCallback(new PreConnectCallback() { @Override public void onPreConnect() { - onPreJoinCluster(); + if (initialized) { + onReconnect(); + } else { + init(); + initialized = true; + } } }); - // register state machine and other initialization - init(); - // start and notify if (!_connection.isConnected()) { _connection.connect(); @@ -73,34 +76,67 @@ public abstract class AbstractParticipantService extends AbstractService { } @Override - protected void doStop() { + protected final void doStop() { _participant.stop(); notifyStopped(); } /** - * Initialize the participant. For example, here is where you can register a state machine: <br/> + * Invoked when connection is re-established to zookeeper. Typical scenario this is invoked is + * when there is a long GC pause that causes the node to disconnect from the cluster and + * reconnects. NOTE: When the service disconnects all its states are reset to initial state. + */ + protected void onReconnect() { + // default implementation does nothing. + } + + /** + * Initialize the participant. For example, here is where you can + * <ul> + * <li>Read configuration of the cluster,resource, node</li> + * <li>Read configuration of the cluster,resource, node register a state machine: <br/> * <br/> * <code> * HelixParticipant participant = getParticipant(); * participant.getStateMachineEngine().registerStateModelFactory(stateModelDefId, factory); * </code><br/> * <br/> - * This code is called prior to starting the participant. + * </li> + * </ul> + * This code is called after connecting to zookeeper but before creating the liveinstance. */ - public abstract void init(); - - /** - * Complete any tasks that require a live Helix connection. This function is called before the - * participant declares itself ready to receive state transitions. - */ - public abstract void onPreJoinCluster(); + protected abstract void init(); /** * Get an instantiated participant instance. * @return HelixParticipant */ - public HelixParticipant getParticipant() { + protected HelixParticipant getParticipant() { return _participant; } + + /** + * @return ClusterId + * @see {@link ClusterId} + */ + public ClusterId getClusterId() { + return _clusterId; + } + + /** + * @see {@link ParticipantId} + * @return + */ + public ParticipantId getParticipantId() { + return _participantId; + } + + /** + * @see {@link HelixConnection} + * @return HelixConnection + */ + public HelixConnection getConnection() { + return _connection; + } + } http://git-wip-us.apache.org/repos/asf/helix/blob/8b19cfc7/helix-core/src/test/java/org/apache/helix/integration/TestLocalContainerProvider.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestLocalContainerProvider.java b/helix-core/src/test/java/org/apache/helix/integration/TestLocalContainerProvider.java index 0f7be64..f4153cc 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/TestLocalContainerProvider.java +++ b/helix-core/src/test/java/org/apache/helix/integration/TestLocalContainerProvider.java @@ -277,8 +277,8 @@ public class TestLocalContainerProvider extends ZkUnitTestBase { List<ContainerSpec> containersToAcquire = Lists.newArrayList(); boolean asked = false; if (_askCount < MAX_PARTICIPANTS) { - containersToAcquire.add(new ContainerSpec(ContainerId.from("container" + _askCount))); - containersToAcquire.add(new ContainerSpec(ContainerId.from("container" + (_askCount + 1)))); + containersToAcquire.add(new ContainerSpec(ParticipantId.from("container" + _askCount))); + containersToAcquire.add(new ContainerSpec(ParticipantId.from("container" + (_askCount + 1)))); asked = true; } List<Participant> containersToStart = Lists.newArrayList(); http://git-wip-us.apache.org/repos/asf/helix/blob/8b19cfc7/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/ApplicationSpec.java ---------------------------------------------------------------------- diff --git a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/ApplicationSpec.java b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/ApplicationSpec.java index e104578..285d036 100644 --- a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/ApplicationSpec.java +++ b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/ApplicationSpec.java @@ -3,8 +3,6 @@ package org.apache.helix.provisioning.yarn; import java.net.URI; import java.util.List; -import org.apache.helix.api.config.ParticipantConfig; -import org.apache.helix.api.id.ParticipantId; public interface ApplicationSpec { /** @@ -23,7 +21,7 @@ public interface ApplicationSpec { String getServiceMainClass(String service); - ParticipantConfig getParticipantConfig(String serviceName, ParticipantId participantId); + ServiceConfig getServiceConfig(String serviceName); List<TaskConfig> getTaskConfigs(); http://git-wip-us.apache.org/repos/asf/helix/blob/8b19cfc7/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/HelixYarnApplicationMasterMain.java ---------------------------------------------------------------------- diff --git a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/HelixYarnApplicationMasterMain.java b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/HelixYarnApplicationMasterMain.java index 058b384..33183c7 100644 --- a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/HelixYarnApplicationMasterMain.java +++ b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/HelixYarnApplicationMasterMain.java @@ -46,7 +46,7 @@ public class HelixYarnApplicationMasterMain { public static void main(String[] args) throws Exception { Map<String, String> env = System.getenv(); LOG.info("Starting app master with the following environment variables"); - for(String key: env.keySet()){ + for (String key : env.keySet()) { LOG.info(key + "\t\t=" + env.get(key)); } int numContainers = 1; @@ -93,11 +93,11 @@ public class HelixYarnApplicationMasterMain { YarnProvisioner.applicationMaster = genericApplicationMaster; YarnProvisioner.applicationMasterConfig = appMasterConfig; - YarnProvisioner.applicationSpec = factory.fromYaml(new FileInputStream(configFile)); + ApplicationSpec applicationSpec = factory.fromYaml(new FileInputStream(configFile)); + YarnProvisioner.applicationSpec = applicationSpec; String zkAddress = appMasterConfig.getZKAddress(); String clusterName = appMasterConfig.getAppName(); - - String resourceName = "HelloWorld"; + // CREATE CLUSTER and setup the resources // connect ZkHelixConnection connection = new ZkHelixConnection(zkAddress); @@ -110,17 +110,27 @@ public class HelixYarnApplicationMasterMain { new StateModelDefinition(StateModelConfigGenerator.generateConfigForStatelessService()); clusterAccessor.createCluster(new ClusterConfig.Builder(clusterId).addStateModelDefinition( statelessService).build()); - - // add the resource with the local provisioner - ResourceId resourceId = ResourceId.from(resourceName); - YarnProvisionerConfig provisionerConfig = new YarnProvisionerConfig(resourceId); - provisionerConfig.setNumContainers(numContainers); - RebalancerConfig rebalancerConfig = - new FullAutoRebalancerConfig.Builder(resourceId).stateModelDefId( - statelessService.getStateModelDefId()).build(); - clusterAccessor.addResourceToCluster(new ResourceConfig.Builder(ResourceId.from(resourceName)) - .provisionerConfig(provisionerConfig).rebalancerConfig(rebalancerConfig).build()); - + for (String service : applicationSpec.getServices()) { + String resourceName = service; + // add the resource with the local provisioner + ResourceId resourceId = ResourceId.from(resourceName); + YarnProvisionerConfig provisionerConfig = new YarnProvisionerConfig(resourceId); + ServiceConfig serviceConfig = applicationSpec.getServiceConfig(resourceName); + provisionerConfig.setNumContainers(serviceConfig.getIntField("num_containers", 1)); + serviceConfig.setSimpleField("service_name", service); + FullAutoRebalancerConfig.Builder rebalancerConfigBuilder = + new FullAutoRebalancerConfig.Builder(resourceId); + RebalancerConfig rebalancerConfig = + rebalancerConfigBuilder.stateModelDefId(statelessService.getStateModelDefId())// + .build(); + ResourceConfig.Builder resourceConfigBuilder = + new ResourceConfig.Builder(ResourceId.from(resourceName)); + ResourceConfig resourceConfig = resourceConfigBuilder.provisionerConfig(provisionerConfig) // + .rebalancerConfig(rebalancerConfig) // + .userConfig(serviceConfig) // + .build(); + clusterAccessor.addResourceToCluster(resourceConfig); + } // start controller ControllerId controllerId = ControllerId.from("controller1"); HelixController controller = connection.createController(clusterId, controllerId); http://git-wip-us.apache.org/repos/asf/helix/blob/8b19cfc7/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/ServiceConfig.java ---------------------------------------------------------------------- diff --git a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/ServiceConfig.java b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/ServiceConfig.java index 4d9173e..87b5f12 100644 --- a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/ServiceConfig.java +++ b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/ServiceConfig.java @@ -3,11 +3,15 @@ package org.apache.helix.provisioning.yarn; import java.util.HashMap; import java.util.Map; -public class ServiceConfig { +import org.apache.helix.api.Scope; +import org.apache.helix.api.config.UserConfig; +import org.apache.helix.api.id.ResourceId; + +public class ServiceConfig extends UserConfig{ public Map<String, String> config = new HashMap<String, String>(); - public String getValue(String key) { - return (config != null ? config.get(key) : null); - } - + public ServiceConfig(Scope<ResourceId> scope) { + super(scope); + } + } http://git-wip-us.apache.org/repos/asf/helix/blob/8b19cfc7/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/YamlApplicationSpecFactory.java ---------------------------------------------------------------------- diff --git a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/YamlApplicationSpecFactory.java b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/YamlApplicationSpecFactory.java deleted file mode 100644 index e87a5c2..0000000 --- a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/YamlApplicationSpecFactory.java +++ /dev/null @@ -1,70 +0,0 @@ -package org.apache.helix.provisioning.yarn; - -import java.io.InputStream; -import java.net.URI; -import java.util.List; -import java.util.Map; - -import org.apache.helix.api.config.ParticipantConfig; -import org.apache.helix.api.id.ParticipantId; -import org.yaml.snakeyaml.Yaml; - -class DefaultApplicationSpec implements ApplicationSpec { - public String appName; - public Integer minContainers; - public Integer maxContainers; - - public AppConfig appConfig; - - public List<String> services; - public Map<String, ServiceConfig> serviceConfigMap; - - @Override - public String getAppName() { - return appName; - } - - @Override - public AppConfig getConfig() { - return appConfig; - } - - @Override - public List<String> getServices() { - return services; - } - - @Override - public URI getServicePackage(String serviceName) { - return null; - } - - @Override - public ParticipantConfig getParticipantConfig(String serviceName, ParticipantId participantId) { - return null; - } - - @Override - public List<TaskConfig> getTaskConfigs() { - return null; - } - - @Override - public URI getAppMasterPackage() { - // TODO Auto-generated method stub - return null; - } - - @Override - public String getServiceMainClass(String service) { - // TODO Auto-generated method stub - return null; - } -} - -public class YamlApplicationSpecFactory { - ApplicationSpec fromYaml(InputStream input) { - Yaml yaml = new Yaml(); - return yaml.loadAs(input, DefaultApplicationSpec.class); - } -} http://git-wip-us.apache.org/repos/asf/helix/blob/8b19cfc7/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/YarnProvisioner.java ---------------------------------------------------------------------- diff --git a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/YarnProvisioner.java b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/YarnProvisioner.java index daac87b..8fd308e 100644 --- a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/YarnProvisioner.java +++ b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/YarnProvisioner.java @@ -210,8 +210,7 @@ public class YarnProvisioner implements Provisioner, TargetProvider, ContainerPr vargs.add("--cluster " + appName); vargs.add("--participantId " + participant.getId().stringify()); vargs.add("--participantClass " + mainClass); - ; - + vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/ContainerParticipant.stdout"); vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/ContainerParticipant.stderr"); @@ -262,13 +261,13 @@ public class YarnProvisioner implements Provisioner, TargetProvider, ContainerPr int targetNumContainers = provisionerConfig.getNumContainers(); // Any container that is in a state should be put in this set - Set<ContainerId> existingContainersIdSet = new HashSet<ContainerId>(); + Set<ParticipantId> existingContainersIdSet = new HashSet<ParticipantId>(); // Cache halted containers to determine which to restart and which to release - Map<ContainerId, Participant> excessHaltedContainers = Maps.newHashMap(); + Map<ParticipantId, Participant> excessHaltedContainers = Maps.newHashMap(); // Cache participants to ensure that excess participants are stopped - Map<ContainerId, Participant> excessActiveContainers = Maps.newHashMap(); + Map<ParticipantId, Participant> excessActiveContainers = Maps.newHashMap(); for (Participant participant : participants) { ContainerConfig containerConfig = participant.getContainerConfig(); @@ -276,35 +275,35 @@ public class YarnProvisioner implements Provisioner, TargetProvider, ContainerPr ContainerState state = containerConfig.getState(); switch (state) { case ACQUIRING: - existingContainersIdSet.add(containerConfig.getId()); + existingContainersIdSet.add(participant.getId()); break; case ACQUIRED: // acquired containers are ready to start - existingContainersIdSet.add(containerConfig.getId()); + existingContainersIdSet.add(participant.getId()); containersToStart.add(participant); break; case CONNECTING: - existingContainersIdSet.add(containerConfig.getId()); + existingContainersIdSet.add(participant.getId()); break; case CONNECTED: // active containers can be stopped or kept active - existingContainersIdSet.add(containerConfig.getId()); - excessActiveContainers.put(containerConfig.getId(), participant); + existingContainersIdSet.add(participant.getId()); + excessActiveContainers.put(participant.getId(), participant); break; case DISCONNECTED: // disconnected containers must be stopped - existingContainersIdSet.add(containerConfig.getId()); + existingContainersIdSet.add(participant.getId()); containersToStop.add(participant); case HALTING: - existingContainersIdSet.add(containerConfig.getId()); + existingContainersIdSet.add(participant.getId()); break; case HALTED: // halted containers can be released or restarted - existingContainersIdSet.add(containerConfig.getId()); - excessHaltedContainers.put(containerConfig.getId(), participant); + existingContainersIdSet.add(participant.getId()); + excessHaltedContainers.put(participant.getId(), participant); break; case FINALIZING: - existingContainersIdSet.add(containerConfig.getId()); + existingContainersIdSet.add(participant.getId()); break; case FINALIZED: break; @@ -316,29 +315,21 @@ public class YarnProvisioner implements Provisioner, TargetProvider, ContainerPr default: break; } - ContainerId containerId = containerConfig.getId(); - if (containerId != null) { - // _containerParticipants.put(containerId, participant.getId()); - // _states.put(containerId, state); - } } } for (int i = 0; i < targetNumContainers; i++) { - ContainerId containerId = ContainerId.from(resourceId + "_container_" + (i)); - excessActiveContainers.remove(containerId); // don't stop this container if active - if (excessHaltedContainers.containsKey(containerId)) { + ParticipantId participantId = ParticipantId.from(resourceId + "_container_" + (i)); + excessActiveContainers.remove(participantId); // don't stop this container if active + if (excessHaltedContainers.containsKey(participantId)) { // Halted containers can be restarted if necessary - Participant participant = excessHaltedContainers.get(containerId); + Participant participant = excessHaltedContainers.get(participantId); containersToStart.add(participant); - excessHaltedContainers.remove(containerId); // don't release this container - } else if (!existingContainersIdSet.contains(containerId)) { + excessHaltedContainers.remove(participantId); // don't release this container + } else if (!existingContainersIdSet.contains(participantId)) { // Unallocated containers must be allocated - ContainerSpec containerSpec = new ContainerSpec(containerId); - ParticipantId participantId = ParticipantId.from(containerId.stringify()); - ParticipantConfig participantConfig = - applicationSpec.getParticipantConfig(resourceId.stringify(), participantId); - containerSpec.setMemory(participantConfig.getUserConfig().getIntField("memory", 1024)); + ContainerSpec containerSpec = new ContainerSpec(participantId); + containerSpec.setMemory(_resourceConfig.getUserConfig().getIntField("memory", 1024)); containersToAcquire.add(containerSpec); } } http://git-wip-us.apache.org/repos/asf/helix/blob/8b19cfc7/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/example/HelloWorldService.java ---------------------------------------------------------------------- diff --git a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/example/HelloWorldService.java b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/example/HelloWorldService.java index 614be36..f65fd5d 100644 --- a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/example/HelloWorldService.java +++ b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/example/HelloWorldService.java @@ -1,28 +1,40 @@ package org.apache.helix.provisioning.yarn.example; import org.apache.helix.HelixConnection; +import org.apache.helix.api.accessor.ResourceAccessor; +import org.apache.helix.api.config.UserConfig; import org.apache.helix.api.id.ClusterId; import org.apache.helix.api.id.ParticipantId; +import org.apache.helix.api.id.ResourceId; import org.apache.helix.api.id.StateModelDefId; import org.apache.helix.manager.zk.AbstractParticipantService; - +import org.apache.log4j.Logger; public class HelloWorldService extends AbstractParticipantService { - public HelloWorldService(HelixConnection connection, ClusterId clusterId, - ParticipantId participantId) { - super(connection, clusterId, participantId); - } - - @Override - public void init() { - HelloWorldStateModelFactory stateModelFactory = new HelloWorldStateModelFactory(); - getParticipant().getStateMachineEngine().registerStateModelFactory(StateModelDefId.from("StatelessService"), stateModelFactory); - } + private static Logger LOG = Logger.getLogger(AbstractParticipantService.class); + + static String SERVICE_NAME = "HelloWorld"; + + public HelloWorldService(HelixConnection connection, ClusterId clusterId, + ParticipantId participantId) { + super(connection, clusterId, participantId); + } + /** + * init method to setup appropriate call back handlers. + */ @Override - public void onPreJoinCluster() { - //this will be invoked prior to + public void init() { + ClusterId clusterId = getClusterId(); + ResourceAccessor resourceAccessor = getConnection().createResourceAccessor(clusterId); + UserConfig serviceConfig = resourceAccessor.readUserConfig(ResourceId.from(SERVICE_NAME)); + LOG.info("Starting service:" + SERVICE_NAME + " with configuration:" + serviceConfig); + + HelloWorldStateModelFactory stateModelFactory = new HelloWorldStateModelFactory(); + getParticipant().getStateMachineEngine().registerStateModelFactory( + StateModelDefId.from("StatelessService"), stateModelFactory); + } -} +} http://git-wip-us.apache.org/repos/asf/helix/blob/8b19cfc7/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/example/HelloworldAppSpec.java ---------------------------------------------------------------------- diff --git a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/example/HelloworldAppSpec.java b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/example/HelloworldAppSpec.java index 2e4cd75..e22c7b2 100644 --- a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/example/HelloworldAppSpec.java +++ b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/example/HelloworldAppSpec.java @@ -7,27 +7,31 @@ import java.util.Map; import org.apache.helix.api.Scope; import org.apache.helix.api.config.ParticipantConfig; +import org.apache.helix.api.config.ResourceConfig; +import org.apache.helix.api.config.ResourceConfig.Builder; import org.apache.helix.api.config.UserConfig; import org.apache.helix.api.id.ParticipantId; +import org.apache.helix.api.id.ResourceId; import org.apache.helix.provisioning.yarn.AppConfig; import org.apache.helix.provisioning.yarn.ApplicationSpec; +import org.apache.helix.provisioning.yarn.ServiceConfig; import org.apache.helix.provisioning.yarn.TaskConfig; public class HelloworldAppSpec implements ApplicationSpec { - private String _appName; + public String _appName; - private AppConfig _appConfig; + public AppConfig _appConfig; - private List<String> _services; + public List<String> _services; private String _appMasterPackageUri; - + private Map<String, String> _servicePackageURIMap; private Map<String, String> _serviceMainClassMap; - private Map<String,Map<String,String>> _serviceConfigMap; + private Map<String, Map<String, String>> _serviceConfigMap; private List<TaskConfig> _taskConfigs; @@ -122,13 +126,8 @@ public class HelloworldAppSpec implements ApplicationSpec { } @Override - public ParticipantConfig getParticipantConfig(String serviceName, ParticipantId participantId) { - ParticipantConfig.Builder builder = new ParticipantConfig.Builder(participantId); - Scope<ParticipantId> scope = Scope.participant(participantId); - UserConfig userConfig = new UserConfig(scope); - Map<String, String> map = _serviceConfigMap.get(serviceName); - userConfig.setSimpleFields(map); - return builder.addTag(serviceName).userConfig(userConfig ).build(); + public ServiceConfig getServiceConfig(String serviceName) { + return new ServiceConfig(Scope.resource(ResourceId.from(serviceName))); } @Override http://git-wip-us.apache.org/repos/asf/helix/blob/8b19cfc7/helix-provisioning/src/main/resources/hello_world_app_spec.yaml ---------------------------------------------------------------------- diff --git a/helix-provisioning/src/main/resources/hello_world_app_spec.yaml b/helix-provisioning/src/main/resources/hello_world_app_spec.yaml index 648104a..1d4f1b7 100644 --- a/helix-provisioning/src/main/resources/hello_world_app_spec.yaml +++ b/helix-provisioning/src/main/resources/hello_world_app_spec.yaml @@ -7,7 +7,8 @@ appMasterPackageUri: 'file:///Users/kgopalak/Documents/projects/incubator-helix/ appName: testApp serviceConfigMap: HelloWorld: { - k1: v1 + num_containers: 3, + memory: 1024 } serviceMainClassMap: { HelloWorld: org.apache.helix.provisioning.yarn.example.HelloWorldService
