resolving conflicts
Project: http://git-wip-us.apache.org/repos/asf/helix/repo Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/f282a300 Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/f282a300 Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/f282a300 Branch: refs/heads/helix-provisioning Commit: f282a30039223a2068350cac12596b55c55dedca Parents: 9a1ba91 27f6272 Author: Kishore Gopalakrishna <[email protected]> Authored: Fri Jan 10 07:26:58 2014 -0800 Committer: Kishore Gopalakrishna <[email protected]> Committed: Fri Jan 10 07:26:58 2014 -0800 ---------------------------------------------------------------------- .../provisioner/ContainerProvider.java | 10 +- .../controller/provisioner/ContainerState.java | 4 +- .../stages/ContainerProvisioningStage.java | 150 +++++++++++++------ .../integration/TestLocalContainerProvider.java | 28 +++- .../provisioning/yarn/ContainerParticipant.java | 15 ++ .../yarn/GenericApplicationMaster.java | 27 ++-- .../provisioning/yarn/YarnProvisioner.java | 89 ++++++----- .../yarn/YarnProvisionerConfig.java | 46 ++++++ 8 files changed, 254 insertions(+), 115 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/helix/blob/f282a300/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/ContainerParticipant.java ---------------------------------------------------------------------- diff --cc helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/ContainerParticipant.java index 0000000,0000000..b0272e8 new file mode 100644 --- /dev/null +++ b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/ContainerParticipant.java @@@ -1,0 -1,0 +1,15 @@@ ++package org.apache.helix.provisioning.yarn; ++ ++import java.util.Arrays; ++ ++import org.apache.commons.logging.Log; ++import org.apache.commons.logging.LogFactory; ++ ++public class ContainerParticipant { ++ private static final Log LOG = LogFactory.getLog(ContainerParticipant.class); ++ ++ public static void main(String[] args) throws InterruptedException { ++ LOG.info("Starting participant: "+ Arrays.toString(args)); ++ Thread.currentThread().join(); ++ } ++} http://git-wip-us.apache.org/repos/asf/helix/blob/f282a300/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/GenericApplicationMaster.java ---------------------------------------------------------------------- diff --cc helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/GenericApplicationMaster.java index 1b27378,3b4e937..3adffd6 --- a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/GenericApplicationMaster.java +++ b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/GenericApplicationMaster.java @@@ -98,17 -93,20 +94,15 @@@ public class GenericApplicationMaster // Tracking url to which app master publishes info for clients to monitor private String appMasterTrackingUrl = ""; - - // Counter for completed containers ( complete denotes successful or failed ) - AtomicInteger numCompletedContainers = new AtomicInteger(); - // Allocated container count so that we know how many containers has the RM - // allocated to us - AtomicInteger numAllocatedContainers = new AtomicInteger(); - // Count of failed containers - AtomicInteger numFailedContainers = new AtomicInteger(); - // Count of containers already requested from the RM - // Needed as once requested, we should not request for containers again. - // Only request for more if the original requirement changes. - AtomicInteger numRequestedContainers = new AtomicInteger(); Map<ContainerRequest, SettableFuture<ContainerAskResponse>> containerRequestMap = new LinkedHashMap<AMRMClient.ContainerRequest, SettableFuture<ContainerAskResponse>>(); + Map<ContainerId, SettableFuture<ContainerReleaseResponse>> containerReleaseMap = + new LinkedHashMap<ContainerId, SettableFuture<ContainerReleaseResponse>>(); + Map<ContainerId, SettableFuture<ContainerStopResponse>> containerStopMap = + new LinkedHashMap<ContainerId, SettableFuture<ContainerStopResponse>>(); + Map<ContainerId, SettableFuture<ContainerLaunchResponse>> containerLaunchResponseMap = + new LinkedHashMap<ContainerId, SettableFuture<ContainerLaunchResponse>>(); - ByteBuffer allTokens; // Launch threads @@@ -120,7 -117,7 +113,7 @@@ // Set up the configuration conf = new YarnConfiguration(); } -- ++ /** * Dump out contents of $CWD and the environment to stdout for debugging */ @@@ -154,15 -151,15 +147,13 @@@ } } -- -- /** * Parse command line options * @param args Command line args * @return Whether init successful and run should be invoked * @throws ParseException * @throws IOException -- * @throws YarnException ++ * @throws YarnException */ public boolean start() throws ParseException, IOException, YarnException { @@@ -234,33 -231,30 +225,35 @@@ return true; } -- - public Future<ContainerAskResponse> acquireContainer(ContainerRequest containerAsk) { + public ListenableFuture<ContainerAskResponse> acquireContainer(ContainerRequest containerAsk) { + amRMClient.addContainerRequest(containerAsk); - numRequestedContainers.incrementAndGet(); SettableFuture<ContainerAskResponse> future = SettableFuture.create(); + containerRequestMap.put(containerAsk, future); + amRMClient.addContainerRequest(containerAsk); return future; } - public Future<ContainerStopResponse> stopContainer(Container container) { + public ListenableFuture<ContainerStopResponse> stopContainer(Container container) { + nmClientAsync.stopContainerAsync(container.getId(), container.getNodeId()); SettableFuture<ContainerStopResponse> future = SettableFuture.create(); + containerStopMap.put(container.getId(), future); + nmClientAsync.stopContainerAsync(container.getId(), container.getNodeId()); return future; } - public Future<ContainerReleaseResponse> releaseContainer(Container container) { + public ListenableFuture<ContainerReleaseResponse> releaseContainer(Container container) { + amRMClient.releaseAssignedContainer(container.getId()); SettableFuture<ContainerReleaseResponse> future = SettableFuture.create(); + containerReleaseMap.put(container.getId(), future); + amRMClient.releaseAssignedContainer(container.getId()); return future; } - public Future<ContainerLaunchResponse> launchContainer(Container container, + public ListenableFuture<ContainerLaunchResponse> launchContainer(Container container, ContainerLaunchContext containerLaunchContext) { - nmClientAsync.startContainerAsync(container, containerLaunchContext); SettableFuture<ContainerLaunchResponse> future = SettableFuture.create(); + containerLaunchResponseMap.put(container.getId(), future); + nmClientAsync.startContainerAsync(container, containerLaunchContext); return future; } http://git-wip-us.apache.org/repos/asf/helix/blob/f282a300/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/YarnProvisioner.java ---------------------------------------------------------------------- diff --cc helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/YarnProvisioner.java index f983255,e921c87..f74e312 --- a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/YarnProvisioner.java +++ b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/YarnProvisioner.java @@@ -1,38 -1,14 +1,39 @@@ package org.apache.helix.provisioning.yarn; +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; +import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; +import java.util.List; import java.util.Map; +import java.util.Vector; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; + import java.util.concurrent.Executors; +import org.apache.commons.compress.archivers.ArchiveStreamFactory; +import org.apache.commons.compress.archivers.tar.TarArchiveEntry; +import org.apache.commons.compress.archivers.tar.TarArchiveInputStream; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.DataOutputBuffer; +import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.yarn.api.ApplicationConstants; +import org.apache.hadoop.yarn.api.ApplicationConstants.Environment; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; +import org.apache.hadoop.yarn.api.records.LocalResource; +import org.apache.hadoop.yarn.api.records.LocalResourceType; +import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest; @@@ -50,178 -23,70 +51,190 @@@ import org.apache.helix.controller.prov import org.apache.helix.controller.provisioner.Provisioner; import org.apache.helix.controller.provisioner.TargetProviderResponse; +import com.google.common.collect.Lists; + import com.google.common.base.Function; + import com.google.common.util.concurrent.Futures; + import com.google.common.util.concurrent.ListenableFuture; + import com.google.common.util.concurrent.ListeningExecutorService; + import com.google.common.util.concurrent.MoreExecutors; public class YarnProvisioner implements Provisioner { private static final Log LOG = LogFactory.getLog(YarnProvisioner.class); static GenericApplicationMaster applicationMaster; - static ListeningExecutorService service = MoreExecutors.listeningDecorator(Executors.newCachedThreadPool()); ++ static ListeningExecutorService service = MoreExecutors.listeningDecorator(Executors ++ .newCachedThreadPool()); Map<ContainerId, Container> allocatedContainersMap = new HashMap<ContainerId, Container>(); + int DEFAULT_CONTAINER = 4; @Override - public ContainerId allocateContainer(ContainerSpec spec) { + public ListenableFuture<ContainerId> allocateContainer(ContainerSpec spec) { ContainerRequest containerAsk = setupContainerAskForRM(spec); - Future<ContainerAskResponse> requestNewContainer = + ListenableFuture<ContainerAskResponse> requestNewContainer = applicationMaster.acquireContainer(containerAsk); - ContainerAskResponse containerAskResponse; - try { - containerAskResponse = requestNewContainer.get(); - ContainerId helixContainerId = - ContainerId.from(containerAskResponse.getContainer().getId().toString()); - allocatedContainersMap.put(helixContainerId, containerAskResponse.getContainer()); - return helixContainerId; - } catch (Exception e) { - LOG.error("Exception in allocateContainer for spec:" + spec, e); - } - return null; - return Futures.transform(requestNewContainer, new Function<ContainerAskResponse, ContainerId>() { - @Override - public ContainerId apply(ContainerAskResponse containerAskResponse) { - ContainerId helixContainerId = - ContainerId.from(containerAskResponse.getContainer().getId().toString()); - allocatedContainersMap.put(helixContainerId, containerAskResponse.getContainer()); - return helixContainerId; - } - }); ++ return Futures.transform(requestNewContainer, ++ new Function<ContainerAskResponse, ContainerId>() { ++ @Override ++ public ContainerId apply(ContainerAskResponse containerAskResponse) { ++ ContainerId helixContainerId = ++ ContainerId.from(containerAskResponse.getContainer().getId().toString()); ++ allocatedContainersMap.put(helixContainerId, containerAskResponse.getContainer()); ++ return helixContainerId; ++ } ++ }); ++ } @Override - public boolean deallocateContainer(ContainerId containerId) { - Future<ContainerReleaseResponse> releaseContainer = + public ListenableFuture<Boolean> deallocateContainer(ContainerId containerId) { + ListenableFuture<ContainerReleaseResponse> releaseContainer = applicationMaster.releaseContainer(allocatedContainersMap.get(containerId)); - try { - releaseContainer.get(); - return true; - } catch (Exception e) { - LOG.error("Exception in deallocateContainer containerId:" + containerId, e); - } - return false; + return Futures.transform(releaseContainer, new Function<ContainerReleaseResponse, Boolean>() { + @Override + public Boolean apply(ContainerReleaseResponse response) { + return response != null; + } + }, service); ++ } @Override - public boolean startContainer(ContainerId containerId) { + public ListenableFuture<Boolean> startContainer(final ContainerId containerId) { Container container = allocatedContainersMap.get(containerId); - ContainerLaunchContext containerLaunchContext = Records.newRecord(ContainerLaunchContext.class); - ListenableFuture<ContainerLaunchResponse> future = applicationMaster.launchContainer(container, containerLaunchContext); ++ ContainerLaunchContext launchContext; + try { - Future<ContainerLaunchResponse> launchContainer = - applicationMaster.launchContainer(container, createLaunchContext(containerId)); - ContainerLaunchResponse containerLaunchResponse = launchContainer.get(); - return true; ++ launchContext = createLaunchContext(containerId); + } catch (Exception e) { - LOG.error("Exception while starting container containerId:" + containerId, e); ++ LOG.error("Exception while creating context to launch container:" + containerId, e); ++ return null; + } - return false; ++ ListenableFuture<ContainerLaunchResponse> future = ++ applicationMaster.launchContainer(container, launchContext); + return Futures.transform(future, new Function<ContainerLaunchResponse, Boolean>() { + @Override + public Boolean apply(ContainerLaunchResponse response) { + return response != null; + } + }, service); } + private ContainerLaunchContext createLaunchContext(ContainerId containerId) throws Exception { + + ContainerLaunchContext amContainer = Records.newRecord(ContainerLaunchContext.class); + + Map<String, String> envs = System.getenv(); + String appName = envs.get("appName"); + String appId = envs.get("appId"); + String appClasspath = envs.get("appClasspath"); + String containerParticipantMainClass = envs.get("containerParticipantMainClass"); + String zkAddress = envs.get(Environment.NM_HOST.name()) + ":2181"; + + // set the localresources needed to launch container + Map<String, LocalResource> localResources = new HashMap<String, LocalResource>(); + + LocalResource amJarRsrc = Records.newRecord(LocalResource.class); + YarnConfiguration conf = new YarnConfiguration(); + FileSystem fs; + fs = FileSystem.get(conf); + String pathSuffix = appName + "/" + appId + "/app-pkg.tar"; + Path dst = new Path(fs.getHomeDirectory(), pathSuffix); + FileStatus destStatus = fs.getFileStatus(dst); + + // Set the type of resource - file or archive + // archives are untarred at destination + // we don't need the jar file to be untarred for now + amJarRsrc.setType(LocalResourceType.ARCHIVE); + // Set visibility of the resource + // Setting to most private option + amJarRsrc.setVisibility(LocalResourceVisibility.APPLICATION); + // Set the resource to be copied over + amJarRsrc.setResource(ConverterUtils.getYarnUrlFromPath(dst)); + // Set timestamp and length of file so that the framework + // can do basic sanity checks for the local resource + // after it has been copied over to ensure it is the same + // resource the client intended to use with the application + amJarRsrc.setTimestamp(destStatus.getModificationTime()); + amJarRsrc.setSize(destStatus.getLen()); + localResources.put("app-pkg", amJarRsrc); + + // Set local resource info into app master container launch context + amContainer.setLocalResources(localResources); + + // Set the necessary security tokens as needed + // amContainer.setContainerTokens(containerToken); + + // Set the env variables to be setup in the env where the application master will be run + LOG.info("Set the environment for the application master"); + Map<String, String> env = new HashMap<String, String>(); + env.put("app-pkg-path", dst.getName()); + // Add AppMaster.jar location to classpath + // At some point we should not be required to add + // the hadoop specific classpaths to the env. + // It should be provided out of the box. + // For now setting all required classpaths including + // the classpath to "." for the application jar + StringBuilder classPathEnv = + new StringBuilder(Environment.CLASSPATH.$()).append(File.pathSeparatorChar).append("./*"); + classPathEnv.append(File.pathSeparatorChar); + classPathEnv.append(appClasspath); + + for (String c : conf.getStrings(YarnConfiguration.YARN_APPLICATION_CLASSPATH, + YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH)) { + classPathEnv.append(File.pathSeparatorChar); + classPathEnv.append(c.trim()); + } + classPathEnv.append(File.pathSeparatorChar).append("./log4j.properties"); + + // add the runtime classpath needed for tests to work + if (conf.getBoolean(YarnConfiguration.IS_MINI_YARN_CLUSTER, false)) { + classPathEnv.append(':'); + classPathEnv.append(System.getProperty("java.class.path")); + } + System.out.println("classoath" + classPathEnv.toString()); + env.put("CLASSPATH", classPathEnv.toString()); + + amContainer.setEnvironment(env); + + // Set the necessary command to execute the application master + Vector<CharSequence> vargs = new Vector<CharSequence>(30); + + // Set java executable command + LOG.info("Setting up app master command"); + vargs.add(Environment.JAVA_HOME.$() + "/bin/java"); + // Set Xmx based on am memory size + vargs.add("-Xmx" + 1024 + "m"); + // Set class name + vargs.add(containerParticipantMainClass); + // Set params for container participant + vargs.add("--zk_address " + zkAddress); + vargs.add("--participantId " + containerId.stringify()); + + vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/ContainerParticipant.stdout"); + vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/ContainerParticipant.stderr"); + + // Get final commmand + StringBuilder command = new StringBuilder(); + for (CharSequence str : vargs) { + command.append(str).append(" "); + } + + LOG.info("Completed setting up app master command " + command.toString()); + List<String> commands = new ArrayList<String>(); + commands.add(command.toString()); + amContainer.setCommands(commands); + return amContainer; + } + @Override - public boolean stopContainer(ContainerId containerId) { + public ListenableFuture<Boolean> stopContainer(final ContainerId containerId) { Container container = allocatedContainersMap.get(containerId); - Future<ContainerStopResponse> stopContainer = applicationMaster.stopContainer(container); - try { - ContainerStopResponse containerStopResponse = stopContainer.get(); - return true; - } catch (Exception e) { - LOG.error("Exception while stopping container containerId:" + containerId, e); - } - return false; + ListenableFuture<ContainerStopResponse> future = applicationMaster.stopContainer(container); + return Futures.transform(future, new Function<ContainerStopResponse, Boolean>() { + @Override + public Boolean apply(ContainerStopResponse response) { + return response != null; + } + }, service); } @Override http://git-wip-us.apache.org/repos/asf/helix/blob/f282a300/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/YarnProvisionerConfig.java ---------------------------------------------------------------------- diff --cc helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/YarnProvisionerConfig.java index 0000000,0000000..8427c14 new file mode 100644 --- /dev/null +++ b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/YarnProvisionerConfig.java @@@ -1,0 -1,0 +1,46 @@@ ++package org.apache.helix.provisioning.yarn; ++ ++import org.apache.helix.api.id.ResourceId; ++import org.apache.helix.controller.provisioner.ProvisionerConfig; ++import org.apache.helix.controller.provisioner.ProvisionerRef; ++import org.apache.helix.controller.serializer.DefaultStringSerializer; ++import org.apache.helix.controller.serializer.StringSerializer; ++import org.apache.helix.integration.TestLocalContainerProvider.LocalProvisioner; ++import org.codehaus.jackson.annotate.JsonProperty; ++ ++public class YarnProvisionerConfig implements ProvisionerConfig { ++ ++ private ResourceId _resourceId; ++ private Class<? extends StringSerializer> _serializerClass; ++ private ProvisionerRef _provisionerRef; ++ ++ public YarnProvisionerConfig(@JsonProperty("resourceId") ResourceId resourceId) { ++ _resourceId = resourceId; ++ _serializerClass = DefaultStringSerializer.class; ++ _provisionerRef = ProvisionerRef.from(YarnProvisioner.class.getName()); ++ } ++ ++ @Override ++ public ResourceId getResourceId() { ++ return _resourceId; ++ } ++ ++ @Override ++ public ProvisionerRef getProvisionerRef() { ++ return _provisionerRef; ++ } ++ ++ public void setProvisionerRef(ProvisionerRef provisionerRef) { ++ _provisionerRef = provisionerRef; ++ } ++ ++ @Override ++ public Class<? extends StringSerializer> getSerializerClass() { ++ return _serializerClass; ++ } ++ ++ public void setSerializerClass(Class<? extends StringSerializer> serializerClass) { ++ _serializerClass = serializerClass; ++ } ++ ++}
