Repository: helix Updated Branches: refs/heads/helix-provisioning cb6aa4fa0 -> 57b4b180e
Made container states more consistent, changed yarn target provider logic Project: http://git-wip-us.apache.org/repos/asf/helix/repo Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/57b4b180 Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/57b4b180 Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/57b4b180 Branch: refs/heads/helix-provisioning Commit: 57b4b180e0c0b7f3ae0c21191af1f72bca61732f Parents: cb6aa4f Author: Kanak Biscuitwala <[email protected]> Authored: Wed Feb 19 18:58:00 2014 -0800 Committer: Kanak Biscuitwala <[email protected]> Committed: Wed Feb 19 18:58:00 2014 -0800 ---------------------------------------------------------------------- .../controller/provisioner/ContainerState.java | 5 +- .../stages/ContainerProvisioningStage.java | 9 ++- .../integration/TestLocalContainerProvider.java | 4 +- .../provisioning/yarn/YarnProvisioner.java | 83 ++++++++++++-------- 4 files changed, 61 insertions(+), 40 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/helix/blob/57b4b180/helix-core/src/main/java/org/apache/helix/controller/provisioner/ContainerState.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/provisioner/ContainerState.java b/helix-core/src/main/java/org/apache/helix/controller/provisioner/ContainerState.java index cf4b736..449f636 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/provisioner/ContainerState.java +++ b/helix-core/src/main/java/org/apache/helix/controller/provisioner/ContainerState.java @@ -23,8 +23,9 @@ public enum ContainerState { ACQUIRING, ACQUIRED, CONNECTING, - ACTIVE, - TEARDOWN, + CONNECTED, + DISCONNECTED, + HALTING, HALTED, FINALIZING, FINALIZED, http://git-wip-us.apache.org/repos/asf/helix/blob/57b4b180/helix-core/src/main/java/org/apache/helix/controller/stages/ContainerProvisioningStage.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ContainerProvisioningStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ContainerProvisioningStage.java index 48166bf..42c8218 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/stages/ContainerProvisioningStage.java +++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ContainerProvisioningStage.java @@ -22,7 +22,6 @@ package org.apache.helix.controller.stages; import java.util.Collection; import java.util.HashMap; import java.util.Map; -import java.util.UUID; import org.apache.helix.HelixAdmin; import org.apache.helix.HelixDataAccessor; @@ -166,12 +165,13 @@ public class ContainerProvisioningStage extends AbstractBaseStage { accessor.updateProperty(keyBuilder.instanceConfig(participant.getId().toString()), existingInstance); // create the helix participant and add it to cluster - ListenableFuture<Boolean> future = containerProvider.startContainer(containerId, participant); + ListenableFuture<Boolean> future = + containerProvider.startContainer(containerId, participant); FutureCallback<Boolean> callback = new FutureCallback<Boolean>() { @Override public void onSuccess(Boolean result) { updateContainerState(helixAdmin, accessor, keyBuilder, cluster, participant.getId(), - ContainerState.ACTIVE); + ContainerState.CONNECTED); } @Override @@ -225,7 +225,7 @@ public class ContainerProvisioningStage extends AbstractBaseStage { .toString()); final ContainerId containerId = existingInstance.getContainerId(); existingInstance.setInstanceEnabled(false); - existingInstance.setContainerState(ContainerState.TEARDOWN); + existingInstance.setContainerState(ContainerState.HALTING); accessor.updateProperty(keyBuilder.instanceConfig(participant.getId().toString()), existingInstance); // stop the container @@ -267,6 +267,7 @@ public class ContainerProvisioningStage extends AbstractBaseStage { InstanceConfig existingInstance = helixAdmin.getInstanceConfig(cluster.getId().toString(), participantId.toString()); existingInstance.setContainerState(state); + existingInstance.setInstanceEnabled(state.equals(ContainerState.CONNECTED)); accessor.updateProperty(keyBuilder.instanceConfig(participantId.toString()), existingInstance); } http://git-wip-us.apache.org/repos/asf/helix/blob/57b4b180/helix-core/src/test/java/org/apache/helix/integration/TestLocalContainerProvider.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestLocalContainerProvider.java b/helix-core/src/test/java/org/apache/helix/integration/TestLocalContainerProvider.java index 0e4c803..0f7be64 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/TestLocalContainerProvider.java +++ b/helix-core/src/test/java/org/apache/helix/integration/TestLocalContainerProvider.java @@ -250,7 +250,7 @@ public class TestLocalContainerProvider extends ZkUnitTestBase { participantService.startAsync(); participantService.awaitRunning(); _participants.put(containerId, participantService); - _states.put(containerId, ContainerState.ACTIVE); + _states.put(containerId, ContainerState.CONNECTED); started++; SettableFuture<Boolean> future = SettableFuture.create(); future.set(true); @@ -294,7 +294,7 @@ public class TestLocalContainerProvider extends ZkUnitTestBase { // acquired containers are ready to start containersToStart.add(participant); break; - case ACTIVE: + case CONNECTED: // stop at most two active at a time, wait for everything to be up first if (stopCount < 2 && _askCount >= MAX_PARTICIPANTS) { containersToStop.add(participant); http://git-wip-us.apache.org/repos/asf/helix/blob/57b4b180/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/YarnProvisioner.java ---------------------------------------------------------------------- diff --git a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/YarnProvisioner.java b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/YarnProvisioner.java index 4fcc219..daac87b 100644 --- a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/YarnProvisioner.java +++ b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/YarnProvisioner.java @@ -1,10 +1,6 @@ package org.apache.helix.provisioning.yarn; import java.io.File; -import java.io.FileInputStream; -import java.io.IOException; -import java.io.InputStream; -import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; @@ -13,22 +9,13 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.Vector; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; import java.util.concurrent.Executors; -import org.apache.commons.compress.archivers.ArchiveStreamFactory; -import org.apache.commons.compress.archivers.tar.TarArchiveEntry; -import org.apache.commons.compress.archivers.tar.TarArchiveInputStream; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.DataOutputBuffer; -import org.apache.hadoop.security.Credentials; -import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.security.token.Token; import org.apache.hadoop.yarn.api.ApplicationConstants; import org.apache.hadoop.yarn.api.ApplicationConstants.Environment; import org.apache.hadoop.yarn.api.records.Container; @@ -55,13 +42,13 @@ import org.apache.helix.controller.provisioner.ContainerProvider; import org.apache.helix.controller.provisioner.ContainerSpec; import org.apache.helix.controller.provisioner.ContainerState; import org.apache.helix.controller.provisioner.Provisioner; -import org.apache.helix.controller.provisioner.ProvisionerConfig; import org.apache.helix.controller.provisioner.TargetProvider; import org.apache.helix.controller.provisioner.TargetProviderResponse; import org.apache.helix.model.InstanceConfig; -import com.google.common.collect.Lists; import com.google.common.base.Function; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningExecutorService; @@ -274,39 +261,57 @@ public class YarnProvisioner implements Provisioner, TargetProvider, ContainerPr .getProvisionerConfig(); int targetNumContainers = provisionerConfig.getNumContainers(); + // Any container that is in a state should be put in this set Set<ContainerId> existingContainersIdSet = new HashSet<ContainerId>(); - + + // Cache halted containers to determine which to restart and which to release + Map<ContainerId, Participant> excessHaltedContainers = Maps.newHashMap(); + + // Cache participants to ensure that excess participants are stopped + Map<ContainerId, Participant> excessActiveContainers = Maps.newHashMap(); for (Participant participant : participants) { ContainerConfig containerConfig = participant.getContainerConfig(); if (containerConfig != null && containerConfig.getState() != null) { ContainerState state = containerConfig.getState(); switch (state) { + case ACQUIRING: + existingContainersIdSet.add(containerConfig.getId()); + break; case ACQUIRED: // acquired containers are ready to start + existingContainersIdSet.add(containerConfig.getId()); containersToStart.add(participant); break; - case ACTIVE: + case CONNECTING: existingContainersIdSet.add(containerConfig.getId()); break; - case HALTED: - // halted containers can be released - containersToRelease.add(participant); + case CONNECTED: + // active containers can be stopped or kept active + existingContainersIdSet.add(containerConfig.getId()); + excessActiveContainers.put(containerConfig.getId(), participant); break; - case ACQUIRING: + case DISCONNECTED: + // disconnected containers must be stopped + existingContainersIdSet.add(containerConfig.getId()); + containersToStop.add(participant); + case HALTING: existingContainersIdSet.add(containerConfig.getId()); break; - case CONNECTING: + case HALTED: + // halted containers can be released or restarted + existingContainersIdSet.add(containerConfig.getId()); + excessHaltedContainers.put(containerConfig.getId(), participant); break; - case FAILED: - //remove the failed instance - _helixManager.getClusterManagmentTool().dropInstance(cluster.getId().toString(), new InstanceConfig(participant.getId())); + case FINALIZING: + existingContainersIdSet.add(containerConfig.getId()); break; case FINALIZED: break; - case FINALIZING: - break; - case TEARDOWN: + case FAILED: + // remove the failed instance + _helixManager.getClusterManagmentTool().dropInstance(cluster.getId().toString(), + new InstanceConfig(participant.getId())); break; default: break; @@ -318,18 +323,32 @@ public class YarnProvisioner implements Provisioner, TargetProvider, ContainerPr } } } - + for (int i = 0; i < targetNumContainers; i++) { ContainerId containerId = ContainerId.from(resourceId + "_container_" + (i)); - if(!existingContainersIdSet.contains(containerId)){ + excessActiveContainers.remove(containerId); // don't stop this container if active + if (excessHaltedContainers.containsKey(containerId)) { + // Halted containers can be restarted if necessary + Participant participant = excessHaltedContainers.get(containerId); + containersToStart.add(participant); + excessHaltedContainers.remove(containerId); // don't release this container + } else if (!existingContainersIdSet.contains(containerId)) { + // Unallocated containers must be allocated ContainerSpec containerSpec = new ContainerSpec(containerId); ParticipantId participantId = ParticipantId.from(containerId.stringify()); - ParticipantConfig participantConfig = applicationSpec.getParticipantConfig(resourceId.stringify(), participantId); + ParticipantConfig participantConfig = + applicationSpec.getParticipantConfig(resourceId.stringify(), participantId); containerSpec.setMemory(participantConfig.getUserConfig().getIntField("memory", 1024)); containersToAcquire.add(containerSpec); } } - + + // Add all the containers that should be stopped because they fall outside the target range + containersToStop.addAll(excessActiveContainers.values()); + + // Add halted containers that should not be restarted + containersToRelease.addAll(excessHaltedContainers.values()); + response.setContainersToAcquire(containersToAcquire); response.setContainersToStart(containersToStart); response.setContainersToRelease(containersToRelease);
