Repository: helix Updated Branches: refs/heads/helix-provisioning 15f080cb5 -> 7387834e6
Helloworld example Project: http://git-wip-us.apache.org/repos/asf/helix/repo Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/7387834e Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/7387834e Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/7387834e Branch: refs/heads/helix-provisioning Commit: 7387834e60aa508fecec09fe8b412f18d7f3fdc4 Parents: 15f080c Author: Kishore Gopalakrishna <[email protected]> Authored: Mon Feb 17 13:08:24 2014 -0800 Committer: Kishore Gopalakrishna <[email protected]> Committed: Mon Feb 17 13:08:24 2014 -0800 ---------------------------------------------------------------------- .../controller/provisioner/Provisioner.java | 20 ++- .../controller/provisioner/TargetProvider.java | 2 +- .../stages/ContainerProvisioningStage.java | 22 ++- .../helix/tools/StateModelConfigGenerator.java | 54 +++++++ .../integration/TestLocalContainerProvider.java | 16 +- .../helix/provisioning/yarn/AppLauncher.java | 157 ++++++++++++------- .../provisioning/yarn/AppMasterConfig.java | 77 +++++++++ .../provisioning/yarn/ApplicationSpec.java | 4 + .../yarn/HelixYarnApplicationMasterMain.java | 32 ++-- .../yarn/YamlApplicationSpecFactory.java | 12 ++ .../provisioning/yarn/YarnProvisioner.java | 44 ++++-- .../yarn/example/HelloWordAppSpecFactory.java | 32 ++++ .../yarn/example/HelloworldAppSpec.java | 86 ++++++++++ .../main/resources/hello_world_app_spec.yaml | 0 14 files changed, 459 insertions(+), 99 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/helix/blob/7387834e/helix-core/src/main/java/org/apache/helix/controller/provisioner/Provisioner.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/provisioner/Provisioner.java b/helix-core/src/main/java/org/apache/helix/controller/provisioner/Provisioner.java index 0a7a175..f9e6341 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/provisioner/Provisioner.java +++ b/helix-core/src/main/java/org/apache/helix/controller/provisioner/Provisioner.java @@ -1,5 +1,8 @@ package org.apache.helix.controller.provisioner; +import org.apache.helix.HelixManager; +import org.apache.helix.api.config.ResourceConfig; + /* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file @@ -19,6 +22,21 @@ package org.apache.helix.controller.provisioner; * under the License. */ -public interface Provisioner extends ContainerProvider, TargetProvider { +public interface Provisioner { + /** + * @param helixManager + * @param resourceConfig + */ + void init(HelixManager helixManager, ResourceConfig resourceConfig); + + /** + * @return + */ + ContainerProvider getContainerProvider(); + + /** + * @return + */ + TargetProvider getTargetProvider(); } http://git-wip-us.apache.org/repos/asf/helix/blob/7387834e/helix-core/src/main/java/org/apache/helix/controller/provisioner/TargetProvider.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/provisioner/TargetProvider.java b/helix-core/src/main/java/org/apache/helix/controller/provisioner/TargetProvider.java index 4204da4..063d008 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/provisioner/TargetProvider.java +++ b/helix-core/src/main/java/org/apache/helix/controller/provisioner/TargetProvider.java @@ -24,11 +24,11 @@ import java.util.Collection; import org.apache.helix.HelixManager; import org.apache.helix.api.Cluster; import org.apache.helix.api.Participant; +import org.apache.helix.api.config.ResourceConfig; import org.apache.helix.api.id.ResourceId; public interface TargetProvider { - public void init(HelixManager helixManager); /** * @param cluster http://git-wip-us.apache.org/repos/asf/helix/blob/7387834e/helix-core/src/main/java/org/apache/helix/controller/stages/ContainerProvisioningStage.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ContainerProvisioningStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ContainerProvisioningStage.java index fdd6d4e..2f97c5a 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/stages/ContainerProvisioningStage.java +++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ContainerProvisioningStage.java @@ -35,11 +35,13 @@ import org.apache.helix.api.id.ParticipantId; import org.apache.helix.api.id.ResourceId; import org.apache.helix.controller.pipeline.AbstractBaseStage; import org.apache.helix.controller.provisioner.ContainerId; +import org.apache.helix.controller.provisioner.ContainerProvider; import org.apache.helix.controller.provisioner.ContainerSpec; import org.apache.helix.controller.provisioner.ContainerState; import org.apache.helix.controller.provisioner.Provisioner; import org.apache.helix.controller.provisioner.ProvisionerConfig; import org.apache.helix.controller.provisioner.ProvisionerRef; +import org.apache.helix.controller.provisioner.TargetProvider; import org.apache.helix.controller.provisioner.TargetProviderResponse; import org.apache.helix.model.InstanceConfig; import org.apache.log4j.Logger; @@ -62,6 +64,9 @@ public class ContainerProvisioningStage extends AbstractBaseStage { private static final Logger LOG = Logger.getLogger(ContainerProvisioningStage.class); Map<ResourceId, Provisioner> _provisionerMap = new HashMap<ResourceId, Provisioner>(); + Map<ResourceId, TargetProvider> _targetProviderMap = new HashMap<ResourceId, TargetProvider>(); + Map<ResourceId, ContainerProvider> _containerProviderMap = + new HashMap<ResourceId, ContainerProvider>(); @Override public void process(ClusterEvent event) throws Exception { @@ -85,14 +90,17 @@ public class ContainerProvisioningStage extends AbstractBaseStage { provisioner = provisionerRef.getProvisioner(); } if (provisioner != null) { - provisioner.init(helixManager); + provisioner.init(helixManager, resourceConfig); + _containerProviderMap.put(resourceId, provisioner.getContainerProvider()); + _targetProviderMap.put(resourceId, provisioner.getTargetProvider()); _provisionerMap.put(resourceId, provisioner); } else { LOG.error("Resource " + resourceId + " does not have a valid provisioner class!"); break; } } - + TargetProvider targetProvider = _targetProviderMap.get(resourceId); + ContainerProvider containerProvider = _containerProviderMap.get(resourceId); final Cluster cluster = event.getAttribute("ClusterDataCache"); final Collection<Participant> participants = cluster.getParticipantMap().values(); @@ -110,7 +118,7 @@ public class ContainerProvisioningStage extends AbstractBaseStage { // TargetProvider should be stateless, given the state of cluster and existing participants // it should return the same result final TargetProviderResponse response = - provisioner.evaluateExistingContainers(cluster, resourceId, participants); + targetProvider.evaluateExistingContainers(cluster, resourceId, participants); // allocate new containers for (final ContainerSpec spec : response.getContainersToAcquire()) { @@ -124,7 +132,7 @@ public class ContainerProvisioningStage extends AbstractBaseStage { // create the helix participant and add it to cluster helixAdmin.addInstance(cluster.getId().toString(), instanceConfig); - ListenableFuture<ContainerId> future = provisioner.allocateContainer(spec); + ListenableFuture<ContainerId> future = containerProvider.allocateContainer(spec); FutureCallback<ContainerId> callback = new FutureCallback<ContainerId>() { @Override public void onSuccess(ContainerId containerId) { @@ -158,7 +166,7 @@ public class ContainerProvisioningStage extends AbstractBaseStage { accessor.updateProperty(keyBuilder.instanceConfig(participant.getId().toString()), existingInstance); // create the helix participant and add it to cluster - ListenableFuture<Boolean> future = provisioner.startContainer(containerId, participant); + ListenableFuture<Boolean> future = containerProvider.startContainer(containerId, participant); FutureCallback<Boolean> callback = new FutureCallback<Boolean>() { @Override public void onSuccess(Boolean result) { @@ -188,7 +196,7 @@ public class ContainerProvisioningStage extends AbstractBaseStage { accessor.updateProperty(keyBuilder.instanceConfig(participant.getId().toString()), existingInstance); // remove the participant - ListenableFuture<Boolean> future = provisioner.deallocateContainer(containerId); + ListenableFuture<Boolean> future = containerProvider.deallocateContainer(containerId); FutureCallback<Boolean> callback = new FutureCallback<Boolean>() { @Override public void onSuccess(Boolean result) { @@ -221,7 +229,7 @@ public class ContainerProvisioningStage extends AbstractBaseStage { accessor.updateProperty(keyBuilder.instanceConfig(participant.getId().toString()), existingInstance); // stop the container - ListenableFuture<Boolean> future = provisioner.stopContainer(containerId); + ListenableFuture<Boolean> future = containerProvider.stopContainer(containerId); FutureCallback<Boolean> callback = new FutureCallback<Boolean>() { @Override public void onSuccess(Boolean result) { http://git-wip-us.apache.org/repos/asf/helix/blob/7387834e/helix-core/src/main/java/org/apache/helix/tools/StateModelConfigGenerator.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/tools/StateModelConfigGenerator.java b/helix-core/src/main/java/org/apache/helix/tools/StateModelConfigGenerator.java index e970f6f..3eed4fb 100644 --- a/helix-core/src/main/java/org/apache/helix/tools/StateModelConfigGenerator.java +++ b/helix-core/src/main/java/org/apache/helix/tools/StateModelConfigGenerator.java @@ -105,6 +105,60 @@ public class StateModelConfigGenerator { return record; } + public static ZNRecord generateConfigForStatelessService() { + ZNRecord record = new ZNRecord("StatelessService"); + record.setSimpleField(StateModelDefinitionProperty.INITIAL_STATE.toString(), "OFFLINE"); + List<String> statePriorityList = new ArrayList<String>(); + statePriorityList.add("ONLINE"); + statePriorityList.add("OFFLINE"); + statePriorityList.add("DROPPED"); + statePriorityList.add("ERROR"); + record.setListField(StateModelDefinitionProperty.STATE_PRIORITY_LIST.toString(), + statePriorityList); + for (String state : statePriorityList) { + String key = state + ".meta"; + Map<String, String> metadata = new HashMap<String, String>(); + if (state.equals("ONLINE")) { + metadata.put("count", "N"); + record.setMapField(key, metadata); + } else if (state.equals("OFFLINE")) { + metadata.put("count", "-1"); + record.setMapField(key, metadata); + } else if (state.equals("DROPPED")) { + metadata.put("count", "-1"); + record.setMapField(key, metadata); + } else if (state.equals("ERROR")) { + metadata.put("count", "-1"); + record.setMapField(key, metadata); + } + } + for (String state : statePriorityList) { + String key = state + ".next"; + if (state.equals("ONLINE")) { + Map<String, String> metadata = new HashMap<String, String>(); + metadata.put("OFFLINE", "OFFLINE"); + metadata.put("DROPPED", "OFFLINE"); + record.setMapField(key, metadata); + } + if (state.equals("OFFLINE")) { + Map<String, String> metadata = new HashMap<String, String>(); + metadata.put("ONLINE", "ONLINE"); + metadata.put("DROPPED", "DROPPED"); + record.setMapField(key, metadata); + } + if (state.equals("ERROR")) { + Map<String, String> metadata = new HashMap<String, String>(); + metadata.put("OFFLINE", "OFFLINE"); + record.setMapField(key, metadata); + } + } + List<String> stateTransitionPriorityList = new ArrayList<String>(); + stateTransitionPriorityList.add("ONLINE-OFFLINE"); + stateTransitionPriorityList.add("OFFLINE-ONLINE"); + record.setListField(StateModelDefinitionProperty.STATE_TRANSITION_PRIORITYLIST.toString(), + stateTransitionPriorityList); + return record; + } public static ZNRecord generateConfigForMasterSlave() { ZNRecord record = new ZNRecord("MasterSlave"); record.setSimpleField(StateModelDefinitionProperty.INITIAL_STATE.toString(), "OFFLINE"); http://git-wip-us.apache.org/repos/asf/helix/blob/7387834e/helix-core/src/test/java/org/apache/helix/integration/TestLocalContainerProvider.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestLocalContainerProvider.java b/helix-core/src/test/java/org/apache/helix/integration/TestLocalContainerProvider.java index 7df03f3..8eb5f56 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/TestLocalContainerProvider.java +++ b/helix-core/src/test/java/org/apache/helix/integration/TestLocalContainerProvider.java @@ -42,11 +42,13 @@ import org.apache.helix.api.id.ParticipantId; import org.apache.helix.api.id.ResourceId; import org.apache.helix.api.id.StateModelDefId; import org.apache.helix.controller.provisioner.ContainerId; +import org.apache.helix.controller.provisioner.ContainerProvider; import org.apache.helix.controller.provisioner.ContainerSpec; import org.apache.helix.controller.provisioner.ContainerState; import org.apache.helix.controller.provisioner.Provisioner; import org.apache.helix.controller.provisioner.ProvisionerConfig; import org.apache.helix.controller.provisioner.ProvisionerRef; +import org.apache.helix.controller.provisioner.TargetProvider; import org.apache.helix.controller.provisioner.TargetProviderResponse; import org.apache.helix.controller.rebalancer.config.FullAutoRebalancerConfig; import org.apache.helix.controller.rebalancer.config.RebalancerConfig; @@ -201,7 +203,7 @@ public class TestLocalContainerProvider extends ZkUnitTestBase { /** * Provisioner that will start and stop participants locally */ - public static class LocalProvisioner implements Provisioner { + public static class LocalProvisioner implements Provisioner, TargetProvider, ContainerProvider { private HelixManager _helixManager; private ClusterId _clusterId; private int _askCount; @@ -210,7 +212,7 @@ public class TestLocalContainerProvider extends ZkUnitTestBase { private Map<ContainerId, ParticipantService> _participants; @Override - public void init(HelixManager helixManager) { + public void init(HelixManager helixManager, ResourceConfig resourceConfig) { // TODO: would be nice to have a HelixConnection instead of a HelixManager _helixManager = helixManager; _clusterId = ClusterId.from(_helixManager.getClusterName()); @@ -329,5 +331,15 @@ public class TestLocalContainerProvider extends ZkUnitTestBase { response.setContainersToRelease(containersToRelease); return response; } + + @Override + public ContainerProvider getContainerProvider() { + return this; + } + + @Override + public TargetProvider getTargetProvider() { + return this; + } } } http://git-wip-us.apache.org/repos/asf/helix/blob/7387834e/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/AppLauncher.java ---------------------------------------------------------------------- diff --git a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/AppLauncher.java b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/AppLauncher.java index b90bf8e..c744062 100644 --- a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/AppLauncher.java +++ b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/AppLauncher.java @@ -5,6 +5,7 @@ import java.io.FileInputStream; import java.io.FileNotFoundException; import java.io.IOException; import java.io.InputStream; +import java.net.URI; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.HashMap; @@ -47,8 +48,11 @@ import org.apache.hadoop.yarn.util.Records; * Converts yaml file into ApplicationSpec. */ public class AppLauncher { + private static final Log LOG = LogFactory.getLog(Client.class); + + private ApplicationSpec _applicationSpec; private YarnClient yarnClient; private ApplicationSpecFactory _applicationSpecFactory; @@ -58,6 +62,10 @@ public class AppLauncher { private File appMasterArchive; + private ApplicationId _appId; + + private AppMasterConfig _appMasterConfig; + public AppLauncher(File appMasterArchive, ApplicationSpecFactory applicationSpecFactory, File yamlConfigFile) throws Exception { _applicationSpecFactory = applicationSpecFactory; @@ -67,6 +75,7 @@ public class AppLauncher { private void init() throws Exception { _applicationSpec = _applicationSpecFactory.fromYaml(new FileInputStream(_yamlConfigFile)); + _appMasterConfig = new AppMasterConfig(); yarnClient = YarnClient.createYarnClient(); _conf = new YarnConfiguration(); yarnClient.init(_conf); @@ -89,45 +98,51 @@ public class AppLauncher { // set the application name ApplicationSubmissionContext appContext = app.getApplicationSubmissionContext(); - ApplicationId appId = appContext.getApplicationId(); + _appId = appContext.getApplicationId(); + _appMasterConfig.setAppId(_appId.getId()); String appName = _applicationSpec.getAppName(); + _appMasterConfig.setAppName(appName); appContext.setApplicationName(appName); // Set up the container launch context for the application master ContainerLaunchContext amContainer = Records.newRecord(ContainerLaunchContext.class); - // set local resources for the application master - // local files or archives as needed - // In this scenario, the jar file for the application master is part of the local resources - Map<String, LocalResource> localResources = new HashMap<String, LocalResource>(); + LOG.info("Copy App archive file from local filesystem and add to local environment"); // Copy the application master jar to the filesystem // Create a local resource to point to the destination jar path FileSystem fs = FileSystem.get(_conf); - // copy App Master package - Path src = new Path(appMasterArchive.getAbsolutePath()); - String pathSuffix = appName + "/" + appId.getId() + "/" + "appmaster-pkg" + ".tar"; - Path dst = new Path(fs.getHomeDirectory(), pathSuffix); - fs.copyFromLocalFile(false, true, src, dst); - FileStatus destStatus = fs.getFileStatus(dst); - LocalResource amJarRsrc = setupLocalResource(dst, destStatus); - localResources.put("appmaster-pkg", amJarRsrc); - - // copy component packages + // get packages for each component packages + Map<String, URI> packages = new HashMap<String, URI>(); + packages.put(AppMasterConfig.AppEnvironment.APP_MASTER_PKG.toString(), appMasterArchive.toURI()); + packages.put(AppMasterConfig.AppEnvironment.APP_SPEC_FILE.toString(), _yamlConfigFile.toURI()); for (String serviceName : _applicationSpec.getServices()) { - src = new Path(_applicationSpec.getServicePackage(serviceName)); - pathSuffix = appName + "/" + appId.getId() + "/" + serviceName + ".tar"; - dst = new Path(fs.getHomeDirectory(), pathSuffix); - fs.copyFromLocalFile(false, true, src, dst); - destStatus = fs.getFileStatus(dst); - amJarRsrc = setupLocalResource(dst, destStatus); - localResources.put(serviceName + "-pkg", amJarRsrc); + packages.put(serviceName, _applicationSpec.getServicePackage(serviceName)); + } + Map<String, Path> hdfsDest = new HashMap<String, Path>(); + Map<String, String> classpathMap = new HashMap<String, String>(); + for (String name : packages.keySet()) { + URI uri = packages.get(name); + Path dst = copyToHDFS(fs, name, uri); + hdfsDest.put(name, dst); + String classpath = generateClasspathAfterExtraction(name, new File(uri)); + classpathMap.put(name, classpath); + _appMasterConfig.setClasspath(name, classpath); } + // set local resources for the application master + // local files or archives as needed + // In this scenario, the jar file for the application master is part of the local resources + Map<String, LocalResource> localResources = new HashMap<String, LocalResource>(); + setupLocalResource(fs, hdfsDest.get(AppMasterConfig.AppEnvironment.APP_MASTER_PKG.toString())); + setupLocalResource(fs, hdfsDest.get(AppMasterConfig.AppEnvironment.APP_SPEC_FILE.toString())); + // Set local resource info into app master container launch context amContainer.setLocalResources(localResources); + + // Set the necessary security tokens as needed // amContainer.setContainerTokens(containerToken); @@ -139,25 +154,7 @@ public class AppLauncher { // the classpath to "." for the application jar StringBuilder classPathEnv = new StringBuilder(Environment.CLASSPATH.$()).append(File.pathSeparatorChar).append("./*"); - StringBuilder appClassPathEnv = new StringBuilder(); - // put the jar files under the archive in the classpath - try { - final InputStream is = new FileInputStream(appMasterArchive); - final TarArchiveInputStream debInputStream = - (TarArchiveInputStream) new ArchiveStreamFactory().createArchiveInputStream("tar", is); - TarArchiveEntry entry = null; - while ((entry = (TarArchiveEntry) debInputStream.getNextEntry()) != null) { - if (entry.isFile()) { - appClassPathEnv.append(File.pathSeparatorChar); - appClassPathEnv.append("./app-pkg/" + entry.getName()); - } - } - debInputStream.close(); - - } catch (Exception e) { - LOG.error("Unable to read archive file:" + appMasterArchive, e); - } - classPathEnv.append(appClassPathEnv); + classPathEnv.append(classpathMap.get(AppMasterConfig.AppEnvironment.APP_MASTER_PKG.toString())); for (String c : _conf.getStrings(YarnConfiguration.YARN_APPLICATION_CLASSPATH, YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH)) { classPathEnv.append(File.pathSeparatorChar); @@ -173,16 +170,7 @@ public class AppLauncher { System.out.println("classpath" + classPathEnv.toString()); // Set the env variables to be setup in the env where the application master will be run LOG.info("Set the environment for the application master"); - Map<String, String> env = new HashMap<String, String>(); - env.put("app_pkg_path", fs.getHomeDirectory() + "/" + appName + "/" + appId.getId() - + "/app-pkg.tar"); - env.put("appName", appName); - env.put("appId", "" + appId.getId()); - env.put("CLASSPATH", classPathEnv.toString()); - env.put("appClasspath", appClassPathEnv.toString()); - env.put("containerParticipantMainClass", - "org.apache.helix.provisioning.yarn.ParticipantLauncher"); - amContainer.setEnvironment(env); + amContainer.setEnvironment(_appMasterConfig.getEnv()); // Set the necessary command to execute the application master Vector<CharSequence> vargs = new Vector<CharSequence>(30); @@ -196,7 +184,7 @@ public class AppLauncher { // Set class name vargs.add(HelixYarnApplicationMasterMain.class.getCanonicalName()); // Set params for Application Master - //vargs.add("--num_containers " + String.valueOf(numContainers)); + // vargs.add("--num_containers " + String.valueOf(numContainers)); vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/AppMaster.stdout"); vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/AppMaster.stderr"); @@ -247,12 +235,12 @@ public class AppLauncher { // Set the priority for the application master Priority pri = Records.newRecord(Priority.class); - int amPriority= 0; + int amPriority = 0; // TODO - what is the range for priority? how to decide? pri.setPriority(amPriority); appContext.setPriority(pri); - String amQueue=""; + String amQueue = ""; // Set the queue to which this application is to be submitted in the RM appContext.setQueue(amQueue); @@ -267,12 +255,59 @@ public class AppLauncher { return true; } - private LocalResource setupLocalResource(Path dst, FileStatus destStatus) { + /** + * Generates the classpath after the archive file gets extracted under 'serviceName' folder + * @param serviceName + * @param archiveFile + * @return + */ + private String generateClasspathAfterExtraction(String serviceName, File archiveFile) { + if (!isArchive(archiveFile.getAbsolutePath())) { + return "./"; + } + StringBuilder classpath = new StringBuilder(); + // put the jar files under the archive in the classpath + try { + final InputStream is = new FileInputStream(archiveFile); + final TarArchiveInputStream debInputStream = + (TarArchiveInputStream) new ArchiveStreamFactory().createArchiveInputStream("tar", is); + TarArchiveEntry entry = null; + while ((entry = (TarArchiveEntry) debInputStream.getNextEntry()) != null) { + if (entry.isFile()) { + classpath.append(File.pathSeparatorChar); + classpath.append("./" + serviceName + "/" + entry.getName()); + } + } + debInputStream.close(); + + } catch (Exception e) { + LOG.error("Unable to read archive file:" + archiveFile, e); + } + return classpath.toString(); + } + + private Path copyToHDFS(FileSystem fs, String name, URI uri) throws Exception { + // will throw exception if the file name is without extension + String extension = uri.getPath().substring(uri.getPath().lastIndexOf(".") + 1); + String pathSuffix = + _applicationSpec.getAppName() + "/" + _appId.getId() + "/" + name + "." + extension; + Path dst = new Path(fs.getHomeDirectory(), pathSuffix); + Path src = new Path(uri); + fs.copyFromLocalFile(false, true, src, dst); + return dst; + } + + private LocalResource setupLocalResource(FileSystem fs, Path dst) throws Exception { + URI uri = dst.toUri(); + String extension = uri.getPath().substring(uri.getPath().lastIndexOf(".") + 1); + FileStatus destStatus = fs.getFileStatus(dst); LocalResource amJarRsrc = Records.newRecord(LocalResource.class); // Set the type of resource - file or archive // archives are untarred at destination // we don't need the jar file to be untarred for now - amJarRsrc.setType(LocalResourceType.ARCHIVE); + if (isArchive(extension)) { + amJarRsrc.setType(LocalResourceType.ARCHIVE); + } // Set visibility of the resource // Setting to most private option amJarRsrc.setVisibility(LocalResourceVisibility.APPLICATION); @@ -287,6 +322,11 @@ public class AppLauncher { return amJarRsrc; } + private boolean isArchive(String path) { + return path.endsWith("tar") || path.endsWith("gz") || path.endsWith("tar.gz") + || path.endsWith("zip"); + } + /** * @return true if successfully completed, it will print status every X seconds */ @@ -307,11 +347,12 @@ public class AppLauncher { * @throws Exception */ public static void main(String[] args) throws Exception { - File appMasterArchive = new File (args[0]); + File appMasterArchive = new File(args[0]); ApplicationSpecFactory applicationSpecFactory = (ApplicationSpecFactory) Class.forName(args[1]).newInstance(); File yamlConfigFile = new File(args[2]); - AppLauncher launcher = new AppLauncher(appMasterArchive, applicationSpecFactory, yamlConfigFile); + AppLauncher launcher = + new AppLauncher(appMasterArchive, applicationSpecFactory, yamlConfigFile); launcher.launch(); launcher.waitUntilDone(); http://git-wip-us.apache.org/repos/asf/helix/blob/7387834e/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/AppMasterConfig.java ---------------------------------------------------------------------- diff --git a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/AppMasterConfig.java b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/AppMasterConfig.java new file mode 100644 index 0000000..2b8836e --- /dev/null +++ b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/AppMasterConfig.java @@ -0,0 +1,77 @@ +package org.apache.helix.provisioning.yarn; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.hadoop.yarn.api.ApplicationConstants.Environment; + +/** + * Convenient method to pass information to containers + * The methods simply sets up environment variables + */ +public class AppMasterConfig { + + private Map<String, String> _envs; + + public enum AppEnvironment { + APP_MASTER_PKG, + APP_SPEC_FILE, + APP_NAME, + APP_ID; + } + + public AppMasterConfig() { + _envs = new HashMap<String, String>(System.getenv()); + } + + public void setAppId(int id) { + _envs.put(AppEnvironment.APP_ID.toString(), "" + id); + } + + public String getAppName() { + return _envs.get(AppEnvironment.APP_NAME.toString()); + } + + public int getAppId() { + return Integer.parseInt(_envs.get(AppEnvironment.APP_ID.toString())); + } + + public String getClassPath(String serviceName) { + return _envs.get(serviceName + ".classPath"); + } + + public String getMainClass(String serviceName) { + return _envs.get(serviceName + ".mainClass"); + } + + public String getZKAddress() { + return _envs.get(Environment.NM_HOST.name()) + ":2181"; + } + + public String getContainerId() { + return _envs.get(Environment.CONTAINER_ID.name()); + } + + public Map<String, String> getEnv() { + return _envs; + } + + public void setAppName(String appName) { + _envs.put(AppEnvironment.APP_NAME.toString(), appName); + + } + + public void setClasspath(String serviceName, String classpath) { + _envs.put(serviceName + ".classpath", classpath); + } + + public String getApplicationSpecConfigFile() { + // TODO Auto-generated method stub + return null; + } + + public String getApplicationSpecProvider() { + // TODO Auto-generated method stub + return null; + } +} http://git-wip-us.apache.org/repos/asf/helix/blob/7387834e/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/ApplicationSpec.java ---------------------------------------------------------------------- diff --git a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/ApplicationSpec.java b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/ApplicationSpec.java index 6c41542..e104578 100644 --- a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/ApplicationSpec.java +++ b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/ApplicationSpec.java @@ -17,7 +17,11 @@ public interface ApplicationSpec { List<String> getServices(); + URI getAppMasterPackage(); + URI getServicePackage(String serviceName); + + String getServiceMainClass(String service); ParticipantConfig getParticipantConfig(String serviceName, ParticipantId participantId); http://git-wip-us.apache.org/repos/asf/helix/blob/7387834e/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/HelixYarnApplicationMasterMain.java ---------------------------------------------------------------------- diff --git a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/HelixYarnApplicationMasterMain.java b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/HelixYarnApplicationMasterMain.java index c84b627..604b228 100644 --- a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/HelixYarnApplicationMasterMain.java +++ b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/HelixYarnApplicationMasterMain.java @@ -1,6 +1,7 @@ package org.apache.helix.provisioning.yarn; import java.io.File; +import java.io.FileInputStream; import java.util.Arrays; import java.util.Map; @@ -72,23 +73,26 @@ public class HelixYarnApplicationMasterMain { server.start(); // start - Map<String, String> envs = System.getenv(); - ContainerId containerId = - ConverterUtils.toContainerId(envs.get(Environment.CONTAINER_ID.name())); + AppMasterConfig appMasterConfig = new AppMasterConfig(); + String containerIdStr = appMasterConfig.getContainerId(); + ContainerId containerId = ConverterUtils.toContainerId(containerIdStr); ApplicationAttemptId appAttemptID = containerId.getApplicationAttemptId(); - // GenericApplicationMaster genAppMaster = new GenericApplicationMaster(appAttemptID); + String configFile = AppMasterConfig.AppEnvironment.APP_SPEC_FILE.toString(); + ApplicationSpecFactory factory = + (ApplicationSpecFactory) Class.forName(appMasterConfig.getApplicationSpecProvider()) + .newInstance(); GenericApplicationMaster genericApplicationMaster = new GenericApplicationMaster(appAttemptID); genericApplicationMaster.start(); YarnProvisioner.applicationMaster = genericApplicationMaster; + YarnProvisioner.applicationMasterConfig = appMasterConfig; + YarnProvisioner.applicationSpec = factory.fromYaml(new FileInputStream(configFile)); + String zkAddress = appMasterConfig.getZKAddress(); + String clusterName = appMasterConfig.getAppName(); - String zkAddress = envs.get(Environment.NM_HOST.name()) + ":2181"; - String clusterName = envs.get("appName"); - String resourceName = "testResource"; - int NUM_PARTITIONS = 6; - int NUM_REPLICAS = 2; + String resourceName = "HelloWorld"; // CREATE CLUSTER and setup the resources // connect ZkHelixConnection connection = new ZkHelixConnection(zkAddress); @@ -97,18 +101,18 @@ public class HelixYarnApplicationMasterMain { // create the cluster ClusterId clusterId = ClusterId.from(clusterName); ClusterAccessor clusterAccessor = connection.createClusterAccessor(clusterId); - StateModelDefinition masterSlave = - new StateModelDefinition(StateModelConfigGenerator.generateConfigForMasterSlave()); + StateModelDefinition statelessService = + new StateModelDefinition(StateModelConfigGenerator.generateConfigForStatelessService()); clusterAccessor.createCluster(new ClusterConfig.Builder(clusterId).addStateModelDefinition( - masterSlave).build()); + statelessService).build()); // add the resource with the local provisioner ResourceId resourceId = ResourceId.from(resourceName); YarnProvisionerConfig provisionerConfig = new YarnProvisionerConfig(resourceId); provisionerConfig.setNumContainers(numContainers); RebalancerConfig rebalancerConfig = - new FullAutoRebalancerConfig.Builder(resourceId).addPartitions(NUM_PARTITIONS) - .replicaCount(NUM_REPLICAS).stateModelDefId(masterSlave.getStateModelDefId()).build(); + new FullAutoRebalancerConfig.Builder(resourceId).stateModelDefId( + statelessService.getStateModelDefId()).build(); clusterAccessor.addResourceToCluster(new ResourceConfig.Builder(ResourceId.from(resourceName)) .provisionerConfig(provisionerConfig).rebalancerConfig(rebalancerConfig).build()); http://git-wip-us.apache.org/repos/asf/helix/blob/7387834e/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/YamlApplicationSpecFactory.java ---------------------------------------------------------------------- diff --git a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/YamlApplicationSpecFactory.java b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/YamlApplicationSpecFactory.java index 5a17755..e87a5c2 100644 --- a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/YamlApplicationSpecFactory.java +++ b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/YamlApplicationSpecFactory.java @@ -48,6 +48,18 @@ class DefaultApplicationSpec implements ApplicationSpec { public List<TaskConfig> getTaskConfigs() { return null; } + + @Override + public URI getAppMasterPackage() { + // TODO Auto-generated method stub + return null; + } + + @Override + public String getServiceMainClass(String service) { + // TODO Auto-generated method stub + return null; + } } public class YamlApplicationSpecFactory { http://git-wip-us.apache.org/repos/asf/helix/blob/7387834e/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/YarnProvisioner.java ---------------------------------------------------------------------- diff --git a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/YarnProvisioner.java b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/YarnProvisioner.java index 1d0c078..bf4d832 100644 --- a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/YarnProvisioner.java +++ b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/YarnProvisioner.java @@ -44,12 +44,15 @@ import org.apache.helix.HelixManager; import org.apache.helix.api.Cluster; import org.apache.helix.api.Participant; import org.apache.helix.api.config.ContainerConfig; +import org.apache.helix.api.config.ResourceConfig; import org.apache.helix.api.id.ResourceId; import org.apache.helix.controller.provisioner.ContainerId; +import org.apache.helix.controller.provisioner.ContainerProvider; import org.apache.helix.controller.provisioner.ContainerSpec; import org.apache.helix.controller.provisioner.ContainerState; import org.apache.helix.controller.provisioner.Provisioner; import org.apache.helix.controller.provisioner.ProvisionerConfig; +import org.apache.helix.controller.provisioner.TargetProvider; import org.apache.helix.controller.provisioner.TargetProviderResponse; import com.google.common.collect.Lists; @@ -59,14 +62,17 @@ import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; -public class YarnProvisioner implements Provisioner { +public class YarnProvisioner implements Provisioner, TargetProvider, ContainerProvider { private static final Log LOG = LogFactory.getLog(YarnProvisioner.class); static GenericApplicationMaster applicationMaster; static ListeningExecutorService service = MoreExecutors.listeningDecorator(Executors .newCachedThreadPool()); + public static AppMasterConfig applicationMasterConfig; + public static ApplicationSpec applicationSpec; Map<ContainerId, Container> allocatedContainersMap = new HashMap<ContainerId, Container>(); private HelixManager _helixManager; + private ResourceConfig _resourceConfig; @Override public ListenableFuture<ContainerId> allocateContainer(ContainerSpec spec) { @@ -123,12 +129,13 @@ public class YarnProvisioner implements Provisioner { ContainerLaunchContext participantContainer = Records.newRecord(ContainerLaunchContext.class); - Map<String, String> envs = System.getenv(); - String appName = envs.get("appName"); - String appId = envs.get("appId"); - String appClasspath = envs.get("appClasspath"); - String containerParticipantMainClass = envs.get("containerParticipantMainClass"); - String zkAddress = envs.get(Environment.NM_HOST.name()) + ":2181"; +// Map<String, String> envs = System.getenv(); + String appName = applicationMasterConfig.getAppName(); + int appId = applicationMasterConfig.getAppId(); + String serviceName = _resourceConfig.getId().stringify(); + String classpath = applicationMasterConfig.getClassPath(serviceName); + String mainClass = applicationMasterConfig.getMainClass(serviceName); + String zkAddress = applicationMasterConfig.getZKAddress(); // set the localresources needed to launch container Map<String, LocalResource> localResources = new HashMap<String, LocalResource>(); @@ -177,7 +184,7 @@ public class YarnProvisioner implements Provisioner { StringBuilder classPathEnv = new StringBuilder(Environment.CLASSPATH.$()).append(File.pathSeparatorChar).append("./*"); classPathEnv.append(File.pathSeparatorChar); - classPathEnv.append(appClasspath); + classPathEnv.append(classpath); for (String c : conf.getStrings(YarnConfiguration.YARN_APPLICATION_CLASSPATH, YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH)) { @@ -186,11 +193,6 @@ public class YarnProvisioner implements Provisioner { } classPathEnv.append(File.pathSeparatorChar).append("./log4j.properties"); - // add the runtime classpath needed for tests to work - if (conf.getBoolean(YarnConfiguration.IS_MINI_YARN_CLUSTER, false)) { - classPathEnv.append(':'); - classPathEnv.append(System.getProperty("java.class.path")); - } env.put("CLASSPATH", classPathEnv.toString()); participantContainer.setEnvironment(env); @@ -204,7 +206,7 @@ public class YarnProvisioner implements Provisioner { // Set Xmx based on am memory size vargs.add("-Xmx" + 1024 + "m"); // Set class name - vargs.add(containerParticipantMainClass); + vargs.add(mainClass); // Set params for container participant vargs.add("--zkAddress " + zkAddress); vargs.add("--cluster " + appName); @@ -244,9 +246,9 @@ public class YarnProvisioner implements Provisioner { } @Override - public void init(HelixManager helixManager) { + public void init(HelixManager helixManager, ResourceConfig resourceConfig) { _helixManager = helixManager; - + _resourceConfig = resourceConfig; } @Override @@ -332,4 +334,14 @@ public class YarnProvisioner implements Provisioner { return request; } + @Override + public ContainerProvider getContainerProvider() { + return this; + } + + @Override + public TargetProvider getTargetProvider() { + return this; + } + } http://git-wip-us.apache.org/repos/asf/helix/blob/7387834e/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/example/HelloWordAppSpecFactory.java ---------------------------------------------------------------------- diff --git a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/example/HelloWordAppSpecFactory.java b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/example/HelloWordAppSpecFactory.java new file mode 100644 index 0000000..dc0406e --- /dev/null +++ b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/example/HelloWordAppSpecFactory.java @@ -0,0 +1,32 @@ +package org.apache.helix.provisioning.yarn.example; + +import java.io.InputStream; +import java.util.HashMap; +import java.util.Map; + +import org.apache.helix.provisioning.yarn.AppConfig; +import org.apache.helix.provisioning.yarn.ApplicationSpec; +import org.apache.helix.provisioning.yarn.ApplicationSpecFactory; +import org.yaml.snakeyaml.Yaml; + +public class HelloWordAppSpecFactory implements ApplicationSpecFactory{ + + @Override + public ApplicationSpec fromYaml(InputStream yamlFile) { + return null; + } + + public static void main(String[] args) { + Yaml yaml = new Yaml(); + HelloworldAppSpec data = new HelloworldAppSpec(); + data._appConfig = new AppConfig(); + data._appName="testApp"; + data._serviceConfigMap = new HashMap<String, Map<String,String>>(); + data._serviceConfigMap.put("HelloWorld", new HashMap<String, String>()); + data._serviceConfigMap.get("HelloWorld").put("k1", "v1"); + + String dump = yaml.dump(data); + System.out.println(dump); + } + +} http://git-wip-us.apache.org/repos/asf/helix/blob/7387834e/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/example/HelloworldAppSpec.java ---------------------------------------------------------------------- diff --git a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/example/HelloworldAppSpec.java b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/example/HelloworldAppSpec.java new file mode 100644 index 0000000..4bd3caa --- /dev/null +++ b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/example/HelloworldAppSpec.java @@ -0,0 +1,86 @@ +package org.apache.helix.provisioning.yarn.example; + +import java.net.URI; +import java.net.URISyntaxException; +import java.util.List; +import java.util.Map; + +import org.apache.helix.api.Scope; +import org.apache.helix.api.config.ParticipantConfig; +import org.apache.helix.api.config.UserConfig; +import org.apache.helix.api.id.ParticipantId; +import org.apache.helix.provisioning.yarn.AppConfig; +import org.apache.helix.provisioning.yarn.ApplicationSpec; +import org.apache.helix.provisioning.yarn.TaskConfig; + +public class HelloworldAppSpec implements ApplicationSpec { + + public String _appName; + + public AppConfig _appConfig; + + public List<String> _services; + + public String _appMasterPackageUri; + + public Map<String, String> _servicePackageURIMap; + + public Map<String, String> _serviceMainClassMap; + + public Map<String,Map<String,String>> _serviceConfigMap; + + public List<TaskConfig> _taskConfigs; + @Override + public String getAppName() { + return _appName; + } + + @Override + public AppConfig getConfig() { + return _appConfig; + } + + @Override + public List<String> getServices() { + return _services; + } + + @Override + public URI getAppMasterPackage() { + try { + return new URI(_appMasterPackageUri); + } catch (URISyntaxException e) { + return null; + } + } + + @Override + public URI getServicePackage(String serviceName) { + try { + return new URI(_servicePackageURIMap.get(serviceName)); + } catch (URISyntaxException e) { + return null; + } + } + + @Override + public String getServiceMainClass(String service) { + return _serviceMainClassMap.get(service); + } + + @Override + public ParticipantConfig getParticipantConfig(String serviceName, ParticipantId participantId) { + ParticipantConfig.Builder builder = new ParticipantConfig.Builder(participantId); + Scope<ParticipantId> scope = Scope.participant(participantId); + UserConfig userConfig = new UserConfig(scope); + Map<String, String> map = _serviceConfigMap.get(serviceName); + userConfig.setSimpleFields(map); + return builder.addTag(serviceName).userConfig(userConfig ).build(); + } + + @Override + public List<TaskConfig> getTaskConfigs() { + return _taskConfigs; + } + +} http://git-wip-us.apache.org/repos/asf/helix/blob/7387834e/helix-provisioning/src/main/resources/hello_world_app_spec.yaml ---------------------------------------------------------------------- diff --git a/helix-provisioning/src/main/resources/hello_world_app_spec.yaml b/helix-provisioning/src/main/resources/hello_world_app_spec.yaml new file mode 100644 index 0000000..e69de29
