This is an automated email from the ASF dual-hosted git repository. abhijain pushed a commit to branch revert-4092-virai_add_yarn_container_replacement in repository https://gitbox.apache.org/repos/asf/gobblin.git
commit d0d6d0dd77da92d9a74da2b6b17d32784fa56abd Author: abhishekmjain <[email protected]> AuthorDate: Fri Feb 21 11:20:57 2025 +0530 Revert "[GOBBLIN-2189] Implement ContainerCompletion callback in DynamicScalingYarnService" --- .../AbstractDynamicScalingYarnServiceManager.java | 6 +- .../temporal/yarn/DynamicScalingYarnService.java | 185 +----------- .../apache/gobblin/temporal/yarn/YarnService.java | 315 +++++++++++++++++---- .../dynamic/DummyScalingDirectiveSource.java | 12 - .../yarn/DynamicScalingYarnServiceManagerTest.java | 24 +- .../yarn/DynamicScalingYarnServiceTest.java | 239 +--------------- .../gobblin/temporal/yarn/YarnServiceTest.java | 21 +- 7 files changed, 285 insertions(+), 517 deletions(-) diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/AbstractDynamicScalingYarnServiceManager.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/AbstractDynamicScalingYarnServiceManager.java index e49ec66a22..ca6aa72064 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/AbstractDynamicScalingYarnServiceManager.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/AbstractDynamicScalingYarnServiceManager.java @@ -110,13 +110,9 @@ public abstract class AbstractDynamicScalingYarnServiceManager extends AbstractI List<ScalingDirective> scalingDirectives = scalingDirectiveSource.getScalingDirectives(); if (CollectionUtils.isNotEmpty(scalingDirectives)) { dynamicScalingYarnService.reviseWorkforcePlanAndRequestNewContainers(scalingDirectives); - } else { - dynamicScalingYarnService.calcDeltasAndRequestContainers(); } } catch (FileNotFoundException fnfe) { - // FNFE comes when scaling directives path is not yet created, so we should just calc delta & request containers if needed - log.debug("Scaling directives file not found(possibly not yet created). Falling back to delta calculation. - " + fnfe.getMessage()); - dynamicScalingYarnService.calcDeltasAndRequestContainers(); + log.warn("Failed to get scaling directives - " + fnfe.getMessage()); // important message, but no need for a stack trace } catch (IOException e) { log.error("Failed to get scaling directives", e); } catch (Throwable t) { diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnService.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnService.java index 0010a45ff8..0720017b85 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnService.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnService.java @@ -17,20 +17,10 @@ package org.apache.gobblin.temporal.yarn; -import java.util.ArrayList; -import java.util.Iterator; import java.util.List; -import java.util.Optional; -import java.util.Queue; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.atomic.AtomicLong; import org.apache.commons.collections.CollectionUtils; -import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.yarn.api.records.ContainerExitStatus; -import org.apache.hadoop.yarn.api.records.ContainerId; -import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.conf.YarnConfiguration; import com.google.common.eventbus.EventBus; @@ -38,8 +28,6 @@ import com.typesafe.config.Config; import lombok.extern.slf4j.Slf4j; -import org.apache.gobblin.temporal.dynamic.ProfileDerivation; -import org.apache.gobblin.temporal.dynamic.ProfileOverlay; import org.apache.gobblin.temporal.dynamic.ScalingDirective; import org.apache.gobblin.temporal.dynamic.StaffingDeltas; import org.apache.gobblin.temporal.dynamic.WorkerProfile; @@ -54,19 +42,11 @@ import org.apache.gobblin.yarn.GobblinYarnConfigurationKeys; */ @Slf4j public class DynamicScalingYarnService extends YarnService { - private static final String DEFAULT_REPLACEMENT_CONTAINER_WORKER_PROFILE_NAME_PREFIX = "replacementWorkerProfile"; - private static final int LAUNCH_CONTAINER_FAILED_EXIT_CODE = 1; - protected static final int GENERAL_OOM_EXIT_STATUS_CODE = 137; - protected static final int DEFAULT_REPLACEMENT_CONTAINER_MEMORY_MULTIPLIER = 2; - private static final int MAX_REPLACEMENT_CONTAINER_MEMORY_MBS = 65536; // 64GB - private static final int EPSILON_MIILIS = 1; /** this holds the current count of containers already requested for each worker profile */ private final WorkforceStaffing actualWorkforceStaffing; /** this holds the current total workforce plan as per latest received scaling directives */ private final WorkforcePlan workforcePlan; - protected final Queue<ContainerId> removedContainerIds; - private final AtomicLong profileNameSuffixGenerator; public DynamicScalingYarnService(Config config, String applicationName, String applicationId, YarnConfiguration yarnConfiguration, FileSystem fs, EventBus eventBus) throws Exception { @@ -74,8 +54,6 @@ public class DynamicScalingYarnService extends YarnService { this.actualWorkforceStaffing = WorkforceStaffing.initialize(0); this.workforcePlan = new WorkforcePlan(this.config, this.config.getInt(GobblinYarnConfigurationKeys.INITIAL_CONTAINERS_KEY)); - this.removedContainerIds = new ConcurrentLinkedQueue<>(); - this.profileNameSuffixGenerator = new AtomicLong(); } @Override @@ -84,78 +62,6 @@ public class DynamicScalingYarnService extends YarnService { requestNewContainersForStaffingDeltas(deltas); } - /** - * Handle the completion of a container. A new container will be requested to replace the one - * that just exited depending on the exit status. - * <p> - * A container completes in either of the following conditions: - * <ol> - * <li> The container gets stopped by the ApplicationMaster. </li> - * <li> Some error happens in the container and caused the container to exit </li> - * <li> The container gets preempted by the ResourceManager </li> - * <li> The container gets killed due to some reason, for example, if it runs over the allowed amount of virtual or physical memory </li> - * </ol> - * A replacement container is needed in all except the first case. - * </p> - */ - @Override - protected void handleContainerCompletion(ContainerStatus containerStatus) { - ContainerId completedContainerId = containerStatus.getContainerId(); - ContainerInfo completedContainerInfo = this.containerMap.remove(completedContainerId); - - // Because callbacks are processed asynchronously, we might encounter situations where handleContainerCompletion() - // is called before onContainersAllocated(), resulting in the containerId missing from the containersMap. - // We use removedContainerIds to remember these containers and remove them from containerMap later - // when we call reviseWorkforcePlanAndRequestNewContainers method - if (completedContainerInfo == null) { - log.warn("Container {} not found in containerMap. This container onContainersCompleted() likely called before onContainersAllocated()", - completedContainerId); - this.removedContainerIds.add(completedContainerId); - return; - } - - log.info("Container {} running profile {} completed with exit status {}", - completedContainerId, completedContainerInfo.getWorkerProfileName(), containerStatus.getExitStatus()); - - if (StringUtils.isNotBlank(containerStatus.getDiagnostics())) { - log.info("Container {} running profile {} completed with diagnostics: {}", - completedContainerId, completedContainerInfo.getWorkerProfileName(), containerStatus.getDiagnostics()); - } - - if (this.shutdownInProgress) { - log.info("Ignoring container completion for container {} as shutdown is in progress", completedContainerId); - return; - } - - WorkerProfile workerProfile = completedContainerInfo.getWorkerProfile(); - - switch (containerStatus.getExitStatus()) { - case(ContainerExitStatus.ABORTED): - handleAbortedContainer(completedContainerId, completedContainerInfo); - break; - case(ContainerExitStatus.PREEMPTED): - log.info("Container {} for profile {} preempted, starting to launching a replacement container", - completedContainerId, completedContainerInfo.getWorkerProfileName()); - requestContainersForWorkerProfile(workerProfile, 1); - break; - case(GENERAL_OOM_EXIT_STATUS_CODE): - case(ContainerExitStatus.KILLED_EXCEEDED_VMEM): - case(ContainerExitStatus.KILLED_EXCEEDED_PMEM): - handleContainerExitedWithOOM(completedContainerId, completedContainerInfo); - break; - case(LAUNCH_CONTAINER_FAILED_EXIT_CODE): - log.info("Exit status 1.CompletedContainerInfo = {}", completedContainerInfo); - break; - case ContainerExitStatus.KILLED_AFTER_APP_COMPLETION: - case ContainerExitStatus.SUCCESS: - break; - default: - log.warn("Container {} exited with unhandled status code {}. ContainerInfo: {}", - completedContainerId, containerStatus.getExitStatus(), completedContainerInfo); - break; - } - } - /** * Revises the workforce plan and requests new containers based on the given scaling directives. * @@ -166,101 +72,28 @@ public class DynamicScalingYarnService extends YarnService { return; } this.workforcePlan.reviseWhenNewer(scalingDirectives); - calcDeltasAndRequestContainers(); - } - - public synchronized void calcDeltasAndRequestContainers() { - // Correct the actualWorkforceStaffing in case of handleContainerCompletion() getting called before onContainersAllocated() - Iterator<ContainerId> iterator = removedContainerIds.iterator(); - while (iterator.hasNext()) { - ContainerId containerId = iterator.next(); - ContainerInfo containerInfo = this.containerMap.remove(containerId); - if (containerInfo != null) { - WorkerProfile workerProfile = containerInfo.getWorkerProfile(); - int currNumContainers = this.actualWorkforceStaffing.getStaffing(workerProfile.getName()).orElse(0); - if (currNumContainers > 0) { - this.actualWorkforceStaffing.reviseStaffing(workerProfile.getName(), currNumContainers - 1, - System.currentTimeMillis()); - } - iterator.remove(); - } - } StaffingDeltas deltas = this.workforcePlan.calcStaffingDeltas(this.actualWorkforceStaffing); requestNewContainersForStaffingDeltas(deltas); } private synchronized void requestNewContainersForStaffingDeltas(StaffingDeltas deltas) { deltas.getPerProfileDeltas().forEach(profileDelta -> { - WorkerProfile workerProfile = profileDelta.getProfile(); - String profileName = workerProfile.getName(); - int delta = profileDelta.getDelta(); - int currNumContainers = this.actualWorkforceStaffing.getStaffing(profileName).orElse(0); - if (delta > 0) { // scale up! + if (profileDelta.getDelta() > 0) { // scale up! + WorkerProfile workerProfile = profileDelta.getProfile(); + String profileName = workerProfile.getName(); + int currNumContainers = this.actualWorkforceStaffing.getStaffing(profileName).orElse(0); + int delta = profileDelta.getDelta(); log.info("Requesting {} new containers for profile {} having currently {} containers", delta, WorkforceProfiles.renderName(profileName), currNumContainers); requestContainersForWorkerProfile(workerProfile, delta); // update our staffing after requesting new containers this.actualWorkforceStaffing.reviseStaffing(profileName, currNumContainers + delta, System.currentTimeMillis()); - } else if (delta < 0) { // scale down! - log.info("Releasing {} containers for profile {} having currently {} containers", -delta, - WorkforceProfiles.renderName(profileName), currNumContainers); - releaseContainersForWorkerProfile(profileName, delta); - // update our staffing after releasing containers - int numContainersAfterRelease = Math.max(currNumContainers + delta, 0); - this.actualWorkforceStaffing.reviseStaffing(profileName, numContainersAfterRelease, System.currentTimeMillis()); + } else if (profileDelta.getDelta() < 0) { // scale down! + // TODO: Decide how to handle negative deltas + log.warn("Handling of Negative delta is not supported yet : Profile {} delta {} ", + profileDelta.getProfile().getName(), profileDelta.getDelta()); } // else, already at staffing plan (or at least have requested, so in-progress) }); } - private void handleAbortedContainer(ContainerId completedContainerId, ContainerInfo completedContainerInfo) { - // Case 1 : Container release requested while scaling down - if (this.releasedContainerCache.getIfPresent(completedContainerId) != null) { - log.info("Container {} was released while downscaling for profile {}", completedContainerId, completedContainerInfo.getWorkerProfileName()); - this.releasedContainerCache.invalidate(completedContainerId); - return; - } - - // Case 2 : Container release was not requested, we need to request a replacement container - log.info("Container {} aborted for profile {}, starting to launch a replacement container", completedContainerId, completedContainerInfo.getWorkerProfileName()); - requestContainersForWorkerProfile(completedContainerInfo.getWorkerProfile(), 1); - } - - private synchronized void handleContainerExitedWithOOM(ContainerId completedContainerId, ContainerInfo completedContainerInfo) { - log.info("Container {} for profile {} exited with OOM, starting to launch a replacement container", - completedContainerId, completedContainerInfo.getWorkerProfileName()); - - List<ScalingDirective> scalingDirectives = new ArrayList<>(); - - WorkerProfile workerProfile = completedContainerInfo.getWorkerProfile(); - long currTimeMillis = System.currentTimeMillis(); - // Update the current staffing to reflect the container that exited with OOM - int currNumContainers = this.actualWorkforceStaffing.getStaffing(workerProfile.getName()).orElse(0); - if (currNumContainers > 0) { - this.actualWorkforceStaffing.reviseStaffing(workerProfile.getName(), currNumContainers - 1, currTimeMillis); - // Add a scaling directive so that workforcePlan have uptodate setPoints for the workerProfile, - // otherwise extra containers will be requested when calculating deltas - scalingDirectives.add(new ScalingDirective(workerProfile.getName(), currNumContainers - 1, currTimeMillis)); - } - - // Request a replacement container - int currContainerMemoryMbs = workerProfile.getConfig().getInt(GobblinYarnConfigurationKeys.CONTAINER_MEMORY_MBS_KEY); - if (currContainerMemoryMbs >= MAX_REPLACEMENT_CONTAINER_MEMORY_MBS) { - log.warn("Container {} already had max allowed memory {} MBs. Not requesting a replacement container.", - completedContainerId, currContainerMemoryMbs); - return; - } - int newContainerMemoryMbs = Math.min(currContainerMemoryMbs * DEFAULT_REPLACEMENT_CONTAINER_MEMORY_MULTIPLIER, - MAX_REPLACEMENT_CONTAINER_MEMORY_MBS); - Optional<ProfileDerivation> optProfileDerivation = Optional.of(new ProfileDerivation(workerProfile.getName(), - new ProfileOverlay.Adding(new ProfileOverlay.KVPair(GobblinYarnConfigurationKeys.CONTAINER_MEMORY_MBS_KEY, newContainerMemoryMbs + "")) - )); - scalingDirectives.add(new ScalingDirective( - DEFAULT_REPLACEMENT_CONTAINER_WORKER_PROFILE_NAME_PREFIX + "-" + profileNameSuffixGenerator.getAndIncrement(), - 1, - currTimeMillis + EPSILON_MIILIS, // Each scaling directive should have a newer timestamp than the previous one - optProfileDerivation - )); - reviseWorkforcePlanAndRequestNewContainers(scalingDirectives); - } - } diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/YarnService.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/YarnService.java index 2818982bab..ec4da215a6 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/YarnService.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/YarnService.java @@ -70,6 +70,7 @@ import org.apache.hadoop.yarn.util.Records; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.common.base.Strings; @@ -80,6 +81,7 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Maps; import com.google.common.eventbus.EventBus; +import com.google.common.eventbus.Subscribe; import com.google.common.io.Closer; import com.google.common.util.concurrent.AbstractIdleService; import com.typesafe.config.Config; @@ -104,8 +106,10 @@ import org.apache.gobblin.yarn.GobblinYarnConfigurationKeys; import org.apache.gobblin.yarn.GobblinYarnEventConstants; import org.apache.gobblin.yarn.GobblinYarnMetricTagNames; import org.apache.gobblin.yarn.YarnHelixUtils; +import org.apache.gobblin.yarn.event.ContainerReleaseRequest; +import org.apache.gobblin.yarn.event.ContainerShutdownRequest; +import org.apache.gobblin.yarn.event.NewContainerRequest; import org.apache.gobblin.temporal.dynamic.WorkerProfile; -import org.apache.gobblin.temporal.dynamic.WorkforceProfiles; /** * This class is responsible for all Yarn-related stuffs including ApplicationMaster registration, @@ -120,9 +124,13 @@ class YarnService extends AbstractIdleService { private static final Logger LOGGER = LoggerFactory.getLogger(YarnService.class); + private static final String UNKNOWN_HELIX_INSTANCE = "UNKNOWN"; + private final String applicationName; private final String applicationId; private final String appViewAcl; + //Default helix instance tag derived from cluster level config + private final String helixInstanceTags; protected final Config config; private final EventBus eventBus; @@ -133,10 +141,16 @@ class YarnService extends AbstractIdleService { private final Optional<GobblinMetrics> gobblinMetrics; private final Optional<EventSubmitter> eventSubmitter; + @VisibleForTesting @Getter(AccessLevel.PROTECTED) private final AMRMClientAsync<AMRMClient.ContainerRequest> amrmClientAsync; private final NMClientAsync nmClientAsync; private final ExecutorService containerLaunchExecutor; + private final int requestedContainerMemoryMbs; + private final int requestedContainerCores; + private final boolean containerHostAffinityEnabled; + + private final int helixInstanceMaxRetries; private final String containerTimezone; private final String proxyJvmArgs; @@ -150,18 +164,34 @@ class YarnService extends AbstractIdleService { private final Object allContainersStopped = new Object(); - // A map from container IDs to Container instances, WorkerProfile Name and WorkerProfile Object - protected final ConcurrentMap<ContainerId, ContainerInfo> containerMap = new ConcurrentHashMap<>(); + // A map from container IDs to Container instances, Helix participant IDs of the containers and Helix Tag + @VisibleForTesting + @Getter(AccessLevel.PROTECTED) + private final ConcurrentMap<ContainerId, ContainerInfo> containerMap = Maps.newConcurrentMap(); // A cache of the containers with an outstanding container release request. // This is a cache instead of a set to get the automatic cleanup in case a container completes before the requested // release. - protected final Cache<ContainerId, String> releasedContainerCache; + @VisibleForTesting + @Getter(AccessLevel.PROTECTED) + private final Cache<ContainerId, String> releasedContainerCache; + + // A map from Helix instance names to the number times the instances are retried to be started + private final ConcurrentMap<String, AtomicInteger> helixInstanceRetryCount = Maps.newConcurrentMap(); + + // A concurrent HashSet of unused Helix instance names. An unused Helix instance name gets put + // into the set if the container running the instance completes. Unused Helix + // instance names get picked up when replacement containers get allocated. + private final Set<String> unusedHelixInstanceNames = ConcurrentHashMap.newKeySet(); + + // The map from helix tag to allocated container count + private final ConcurrentMap<String, AtomicInteger> allocatedContainerCountMap = Maps.newConcurrentMap(); + private final ConcurrentMap<ContainerId, String> removedContainerID = Maps.newConcurrentMap(); private final AtomicInteger priorityNumGenerator = new AtomicInteger(0); private final Map<String, Integer> resourcePriorityMap = new HashMap<>(); - protected volatile boolean shutdownInProgress = false; + private volatile boolean shutdownInProgress = false; private final boolean jarCacheEnabled; private static final long DEFAULT_ALLOCATION_REQUEST_ID = 0L; @@ -196,6 +226,14 @@ class YarnService extends AbstractIdleService { this.nmClientAsync = closer.register(NMClientAsync.createNMClientAsync(getNMClientCallbackHandler())); this.nmClientAsync.init(this.yarnConfiguration); + this.requestedContainerMemoryMbs = config.getInt(GobblinYarnConfigurationKeys.CONTAINER_MEMORY_MBS_KEY); + this.requestedContainerCores = config.getInt(GobblinYarnConfigurationKeys.CONTAINER_CORES_KEY); + this.containerHostAffinityEnabled = config.getBoolean(GobblinYarnConfigurationKeys.CONTAINER_HOST_AFFINITY_ENABLED); + + this.helixInstanceMaxRetries = config.getInt(GobblinYarnConfigurationKeys.HELIX_INSTANCE_MAX_RETRIES); + this.helixInstanceTags = ConfigUtils.getString(config, + GobblinClusterConfigurationKeys.HELIX_INSTANCE_TAGS_KEY, GobblinClusterConfigurationKeys.HELIX_DEFAULT_TAG); + this.proxyJvmArgs = config.hasPath(GobblinYarnConfigurationKeys.YARN_APPLICATION_PROXY_JVM_ARGS) ? config.getString(GobblinYarnConfigurationKeys.YARN_APPLICATION_PROXY_JVM_ARGS) : StringUtils.EMPTY; @@ -219,10 +257,53 @@ class YarnService extends AbstractIdleService { } + @SuppressWarnings("unused") + @Subscribe + public void handleNewContainerRequest(NewContainerRequest newContainerRequest) { + if (!this.maxResourceCapacity.isPresent()) { + LOGGER.error(String.format( + "Unable to handle new container request as maximum resource capacity is not available: " + + "[memory (MBs) requested = %d, vcores requested = %d]", this.requestedContainerMemoryMbs, + this.requestedContainerCores)); + return; + } + requestContainer(newContainerRequest.getReplacedContainer().transform(container -> container.getNodeId().getHost()), + newContainerRequest.getResource()); + } + protected NMClientCallbackHandler getNMClientCallbackHandler() { return new NMClientCallbackHandler(); } + @SuppressWarnings("unused") + @Subscribe + public void handleContainerShutdownRequest(ContainerShutdownRequest containerShutdownRequest) { + for (Container container : containerShutdownRequest.getContainers()) { + LOGGER.info(String.format("Stopping container %s running on %s", container.getId(), container.getNodeId())); + this.nmClientAsync.stopContainerAsync(container.getId(), container.getNodeId()); + } + } + + /** + * Request the Resource Manager to release the container + * @param containerReleaseRequest containers to release + */ + @Subscribe + public void handleContainerReleaseRequest(ContainerReleaseRequest containerReleaseRequest) { + for (Container container : containerReleaseRequest.getContainers()) { + LOGGER.info(String.format("Releasing container %s running on %s", container.getId(), container.getNodeId())); + + // Record that this container was explicitly released so that a new one is not spawned to replace it + // Put the container id in the releasedContainerCache before releasing it so that handleContainerCompletion() + // can check for the container id and skip spawning a replacement container. + // Note that this is the best effort since these are asynchronous operations and a container may abort concurrently + // with the release call. So in some cases a replacement container may have already been spawned before + // the container is put into the black list. + this.releasedContainerCache.put(container.getId(), ""); + this.amrmClientAsync.releaseAssignedContainer(container.getId()); + } + } + @Override protected synchronized void startUp() throws Exception { LOGGER.info("Starting the TemporalYarnService"); @@ -254,8 +335,8 @@ class YarnService extends AbstractIdleService { // Stop the running containers for (ContainerInfo containerInfo : this.containerMap.values()) { - LOGGER.info("Stopping container {} running worker profile {}", containerInfo.getContainer().getId(), - containerInfo.getWorkerProfileName()); + LOGGER.info("Stopping container {} running participant {}", containerInfo.getContainer().getId(), + containerInfo.getHelixParticipantId()); this.nmClientAsync.stopContainerAsync(containerInfo.getContainer().getId(), containerInfo.getContainer().getNodeId()); } @@ -330,27 +411,10 @@ class YarnService extends AbstractIdleService { requestContainers(numContainers, Resource.newInstance(containerMemoryMbs, containerCores), Optional.of(allocationRequestId)); } - protected synchronized void releaseContainersForWorkerProfile(String profileName, int numContainers) { - int numContainersToRelease = numContainers; - Iterator<Map.Entry<ContainerId, ContainerInfo>> containerMapIterator = this.containerMap.entrySet().iterator(); - while (containerMapIterator.hasNext() && numContainers > 0) { - Map.Entry<ContainerId, ContainerInfo> entry = containerMapIterator.next(); - if (entry.getValue().getWorkerProfile().getName().equals(profileName)) { - ContainerId containerId = entry.getKey(); - LOGGER.info("Releasing container {} running profile {}", containerId, WorkforceProfiles.renderName(profileName)); - // Record that this container was explicitly released so that a new one is not spawned to replace it - // Put the container id in the releasedContainerCache before releasing it so that handleContainerCompletion() - // can check for the container id and skip spawning a replacement container. - // Note that this is the best effort since these are asynchronous operations and a container may abort concurrently - // with the release call. So in some cases a replacement container may have already been spawned before - // the container is put into the black list. - this.releasedContainerCache.put(containerId, ""); - this.amrmClientAsync.releaseAssignedContainer(containerId); - numContainers--; - } - } - LOGGER.info("Released {} containers out of {} requested for profile {}", numContainersToRelease - numContainers, - numContainersToRelease, profileName); + private void requestContainer(Optional<String> preferredNode, Optional<Resource> resourceOptional) { + Resource desiredResource = resourceOptional.or(Resource.newInstance( + this.requestedContainerMemoryMbs, this.requestedContainerCores)); + requestContainer(preferredNode, desiredResource, Optional.absent()); } /** @@ -360,7 +424,7 @@ class YarnService extends AbstractIdleService { * @param resource */ protected void requestContainers(int numContainers, Resource resource, Optional<Long> optAllocationRequestId) { - LOGGER.info("Requesting {} containers with resource = {} and allocation request id = {}", numContainers, resource, optAllocationRequestId); + LOGGER.info("Requesting {} containers with resource={} and allocation request id = {}", numContainers, resource, optAllocationRequestId); IntStream.range(0, numContainers) .forEach(i -> requestContainer(Optional.absent(), resource, optAllocationRequestId)); } @@ -488,7 +552,17 @@ class YarnService extends AbstractIdleService { } } - private String buildContainerCommand(Container container, String workerProfileName, WorkerProfile workerProfile) { + @VisibleForTesting + protected String buildContainerCommand(Container container, String helixParticipantId, String helixInstanceTag) { + long allocationRequestId = container.getAllocationRequestId(); + WorkerProfile workerProfile = Optional.fromNullable(this.workerProfileByAllocationRequestId.get(allocationRequestId)) + .or(() -> { + LOGGER.warn("No Worker Profile found for {}, so falling back to default", allocationRequestId); + return this.workerProfileByAllocationRequestId.computeIfAbsent(DEFAULT_ALLOCATION_REQUEST_ID, k -> { + LOGGER.warn("WARNING: (LIKELY) UNEXPECTED CONCURRENCY: No Worker Profile even yet mapped to the default allocation request ID {} - creating one now", DEFAULT_ALLOCATION_REQUEST_ID); + return new WorkerProfile(this.config); + }); + }); Config workerProfileConfig = workerProfile.getConfig(); double workerJvmMemoryXmxRatio = ConfigUtils.getDouble(workerProfileConfig, @@ -500,13 +574,13 @@ class YarnService extends AbstractIdleService { GobblinYarnConfigurationKeys.DEFAULT_CONTAINER_JVM_MEMORY_OVERHEAD_MBS); Preconditions.checkArgument(workerJvmMemoryXmxRatio >= 0 && workerJvmMemoryXmxRatio <= 1, - workerProfileName + " : " + GobblinYarnConfigurationKeys.CONTAINER_JVM_MEMORY_XMX_RATIO_KEY + + workerProfile.getName() + " : " + GobblinYarnConfigurationKeys.CONTAINER_JVM_MEMORY_XMX_RATIO_KEY + " must be between 0 and 1 inclusive"); long containerMemoryMbs = container.getResource().getMemorySize(); Preconditions.checkArgument(workerJvmMemoryOverheadMbs < containerMemoryMbs * workerJvmMemoryXmxRatio, - workerProfileName + " : " + GobblinYarnConfigurationKeys.CONTAINER_JVM_MEMORY_OVERHEAD_MBS_KEY + + workerProfile.getName() + " : " + GobblinYarnConfigurationKeys.CONTAINER_JVM_MEMORY_OVERHEAD_MBS_KEY + " cannot be more than " + GobblinYarnConfigurationKeys.CONTAINER_MEMORY_MBS_KEY + " * " + GobblinYarnConfigurationKeys.CONTAINER_JVM_MEMORY_XMX_RATIO_KEY); @@ -528,8 +602,14 @@ class YarnService extends AbstractIdleService { .append(" --").append(GobblinClusterConfigurationKeys.APPLICATION_NAME_OPTION_NAME) .append(" ").append(this.applicationName) .append(" --").append(GobblinClusterConfigurationKeys.APPLICATION_ID_OPTION_NAME) - .append(" ").append(this.applicationId); + .append(" ").append(this.applicationId) + .append(" --").append(GobblinClusterConfigurationKeys.HELIX_INSTANCE_NAME_OPTION_NAME) + .append(" ").append(helixParticipantId); + if (!Strings.isNullOrEmpty(helixInstanceTag)) { + containerCommand.append(" --").append(GobblinClusterConfigurationKeys.HELIX_INSTANCE_TAGS_OPTION_NAME) + .append(" ").append(helixInstanceTag); + } return containerCommand.append(" 1>").append(ApplicationConstants.LOG_DIR_EXPANSION_VAR).append(File.separator).append( containerProcessName).append(".").append(ApplicationConstants.STDOUT) .append(" 2>").append(ApplicationConstants.LOG_DIR_EXPANSION_VAR).append(File.separator).append( @@ -537,11 +617,123 @@ class YarnService extends AbstractIdleService { } /** - * Handle the completion of a container. - * Just removes the containerId from {@link #containerMap} + * Check the exit status of a completed container and see if the replacement container + * should try to be started on the same node. Some exit status indicates a disk or + * node failure and in such cases the replacement container should try to be started on + * a different node. + */ + private boolean shouldStickToTheSameNode(int containerExitStatus) { + switch (containerExitStatus) { + case ContainerExitStatus.DISKS_FAILED: + return false; + case ContainerExitStatus.ABORTED: + // Mostly likely this exit status is due to node failures because the + // application itself will not release containers. + return false; + default: + // Stick to the same node for other cases if host affinity is enabled. + return this.containerHostAffinityEnabled; + } + } + + /** + * Handle the completion of a container. A new container will be requested to replace the one + * that just exited. Depending on the exit status and if container host affinity is enabled, + * the new container may or may not try to be started on the same node. + * <p> + * A container completes in either of the following conditions: 1) some error happens in the + * container and caused the container to exit, 2) the container gets killed due to some reason, + * for example, if it runs over the allowed amount of virtual or physical memory, 3) the gets + * preempted by the ResourceManager, or 4) the container gets stopped by the ApplicationMaster. + * A replacement container is needed in all but the last case. */ protected void handleContainerCompletion(ContainerStatus containerStatus) { - this.containerMap.remove(containerStatus.getContainerId()); + ContainerInfo completedContainerInfo = this.containerMap.remove(containerStatus.getContainerId()); + //Get the Helix instance name for the completed container. Because callbacks are processed asynchronously, we might + //encounter situations where handleContainerCompletion() is called before onContainersAllocated(), resulting in the + //containerId missing from the containersMap. + // We use removedContainerID to remember these containers and remove them from containerMap later when we call requestTargetNumberOfContainers method + if (completedContainerInfo == null) { + removedContainerID.putIfAbsent(containerStatus.getContainerId(), ""); + } + String completedInstanceName = UNKNOWN_HELIX_INSTANCE; + + String helixTag = completedContainerInfo == null ? helixInstanceTags : completedContainerInfo.getHelixTag(); + if (completedContainerInfo != null) { + allocatedContainerCountMap.get(helixTag).decrementAndGet(); + } + + LOGGER.info(String.format("Container %s running Helix instance %s with tag %s has completed with exit status %d", + containerStatus.getContainerId(), completedInstanceName, helixTag, containerStatus.getExitStatus())); + + if (!Strings.isNullOrEmpty(containerStatus.getDiagnostics())) { + LOGGER.info(String.format("Received the following diagnostics information for container %s: %s", + containerStatus.getContainerId(), containerStatus.getDiagnostics())); + } + + switch(containerStatus.getExitStatus()) { + case(ContainerExitStatus.ABORTED): + if (handleAbortedContainer(containerStatus, completedContainerInfo, completedInstanceName)) { + return; + } + break; + case(1): // Same as linux exit status 1 Often occurs when launch_container.sh failed + LOGGER.info("Exit status 1. CompletedContainerInfo={}", completedContainerInfo); + break; + default: + break; + } + + if (this.shutdownInProgress) { + return; + } + if(completedContainerInfo != null) { + this.helixInstanceRetryCount.putIfAbsent(completedInstanceName, new AtomicInteger(0)); + int retryCount = this.helixInstanceRetryCount.get(completedInstanceName).incrementAndGet(); + + // Populate event metadata + Optional<ImmutableMap.Builder<String, String>> eventMetadataBuilder = Optional.absent(); + if (this.eventSubmitter.isPresent()) { + eventMetadataBuilder = Optional.of(buildContainerStatusEventMetadata(containerStatus)); + eventMetadataBuilder.get().put(GobblinYarnEventConstants.EventMetadata.HELIX_INSTANCE_ID, completedInstanceName); + eventMetadataBuilder.get().put(GobblinYarnEventConstants.EventMetadata.CONTAINER_STATUS_RETRY_ATTEMPT, retryCount + ""); + } + + if (this.helixInstanceMaxRetries > 0 && retryCount > this.helixInstanceMaxRetries) { + if (this.eventSubmitter.isPresent()) { + this.eventSubmitter.get() + .submit(GobblinYarnEventConstants.EventNames.HELIX_INSTANCE_COMPLETION, eventMetadataBuilder.get().build()); + } + + LOGGER.warn("Maximum number of retries has been achieved for Helix instance " + completedInstanceName); + return; + } + + // Add the Helix instance name of the completed container to the set of unused + // instance names so they can be reused by a replacement container. + LOGGER.info("Adding instance {} to the pool of unused instances", completedInstanceName); + this.unusedHelixInstanceNames.add(completedInstanceName); + + /** + * NOTE: logic for handling container failure is removed because {@link #YarnService} relies on the auto scaling manager + * to control the number of containers by polling helix for the current number of tasks + * Without that integration, that code requests too many containers when there are exceptions and overloads yarn + */ + } + } + + private boolean handleAbortedContainer(ContainerStatus containerStatus, ContainerInfo completedContainerInfo, + String completedInstanceName) { + if (this.releasedContainerCache.getIfPresent(containerStatus.getContainerId()) != null) { + LOGGER.info("Container release requested, so not spawning a replacement for containerId {}", containerStatus.getContainerId()); + if (completedContainerInfo != null) { + LOGGER.info("Adding instance {} to the pool of unused instances", completedInstanceName); + this.unusedHelixInstanceNames.add(completedInstanceName); + } + return true; + } + LOGGER.info("Container {} aborted due to lost NM", containerStatus.getContainerId()); + return false; } private ImmutableMap.Builder<String, String> buildContainerStatusEventMetadata(ContainerStatus containerStatus) { @@ -590,28 +782,33 @@ class YarnService extends AbstractIdleService { @Override public void onContainersAllocated(List<Container> containers) { for (final Container container : containers) { - long allocationRequestId = container.getAllocationRequestId(); - WorkerProfile workerProfile = Optional.fromNullable(workerProfileByAllocationRequestId.get(allocationRequestId)) - .or(() -> { - LOGGER.warn("No Worker Profile found for {}, so falling back to default", allocationRequestId); - return workerProfileByAllocationRequestId.computeIfAbsent(DEFAULT_ALLOCATION_REQUEST_ID, k -> { - LOGGER.warn("WARNING: (LIKELY) UNEXPECTED CONCURRENCY: No Worker Profile even yet mapped to the default allocation request ID {} - creating one now", DEFAULT_ALLOCATION_REQUEST_ID); - return new WorkerProfile(config); - }); - }); - String containerId = container.getId().toString(); + String containerHelixTag = helixInstanceTags; if (eventSubmitter.isPresent()) { eventSubmitter.get().submit(GobblinYarnEventConstants.EventNames.CONTAINER_ALLOCATION, GobblinYarnMetricTagNames.CONTAINER_ID, containerId); } - LOGGER.info("Container {} has been allocated with resource {} for Worker Profile {}", - container.getId(), container.getResource(), WorkforceProfiles.renderName(workerProfile.getName())); + LOGGER.info("Container {} has been allocated with resource {} for helix tag {}", + container.getId(), container.getResource(), containerHelixTag); + + //Iterate over the (thread-safe) set of unused instances to find the first instance that is not currently live. + //Once we find a candidate instance, it is removed from the set. + String instanceName = null; + + //Ensure that updates to unusedHelixInstanceNames are visible to other threads that might concurrently + //invoke the callback on container allocation. + synchronized (this) { + Iterator<String> iterator = unusedHelixInstanceNames.iterator(); + while (iterator.hasNext()) { + instanceName = iterator.next(); + } + } - ContainerInfo containerInfo = new ContainerInfo(container, - WorkforceProfiles.renderName(workerProfile.getName()), workerProfile); + ContainerInfo containerInfo = new ContainerInfo(container, instanceName, containerHelixTag); containerMap.put(container.getId(), containerInfo); + allocatedContainerCountMap.putIfAbsent(containerHelixTag, new AtomicInteger(0)); + allocatedContainerCountMap.get(containerHelixTag).incrementAndGet(); // Find matching requests and remove the request (YARN-660). We the scheduler are responsible // for cleaning up requests after allocation based on the design in the described ticket. @@ -772,26 +969,26 @@ class YarnService extends AbstractIdleService { } } - // Class encapsulates Container instance, WorkerProfile name to print, WorkerProfile, and + // Class encapsulates Container instances, Helix participant IDs of the containers, Helix Tag, and // initial startup command @Getter class ContainerInfo { private final Container container; - private final String workerProfileName; // Storing this to avoid calling WorkforceProfiles.renderName(workerProfile.getName()) while logging - private final WorkerProfile workerProfile; + private final String helixParticipantId; + private final String helixTag; private final String startupCommand; - public ContainerInfo(Container container, String workerProfileName, WorkerProfile workerProfile) { + public ContainerInfo(Container container, String helixParticipantId, String helixTag) { this.container = container; - this.workerProfileName = workerProfileName; - this.workerProfile = workerProfile; - this.startupCommand = YarnService.this.buildContainerCommand(container, workerProfileName, workerProfile); + this.helixParticipantId = helixParticipantId; + this.helixTag = helixTag; + this.startupCommand = YarnService.this.buildContainerCommand(container, helixParticipantId, helixTag); } @Override public String toString() { - return String.format("ContainerInfo{ container=%s, workerProfileName=%s, startupCommand=%s }", - container.getId(), workerProfileName, startupCommand); + return String.format("ContainerInfo{ container=%s, helixParticipantId=%s, helixTag=%s, startupCommand=%s }", + container.getId(), helixParticipantId, helixTag, startupCommand); } } } diff --git a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/dynamic/DummyScalingDirectiveSource.java b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/dynamic/DummyScalingDirectiveSource.java index 8b6d0bf270..6bdfe46276 100644 --- a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/dynamic/DummyScalingDirectiveSource.java +++ b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/dynamic/DummyScalingDirectiveSource.java @@ -70,18 +70,6 @@ public class DummyScalingDirectiveSource implements ScalingDirectiveSource { new ScalingDirective("firstProfile", 5, currentTime), new ScalingDirective("secondProfile", 3, currentTime + 1) ); - } else if (currNumInvocations == 3) { - // changing set point to 0 for both profiles so that all containers should be released - return Arrays.asList( - new ScalingDirective("firstProfile", 0, currentTime), - new ScalingDirective("secondProfile", 0, currentTime + 1) - ); - } else if (currNumInvocations == 4) { - // increasing containers count for both profiles so that new containers should be launched - return Arrays.asList( - new ScalingDirective("firstProfile", 5, currentTime), - new ScalingDirective("secondProfile", 5, currentTime + 1) - ); } return new ArrayList<>(); } diff --git a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnServiceManagerTest.java b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnServiceManagerTest.java index 666e3c54c4..c43a27fa76 100644 --- a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnServiceManagerTest.java +++ b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnServiceManagerTest.java @@ -17,7 +17,6 @@ package org.apache.gobblin.temporal.yarn; -import java.io.FileNotFoundException; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; @@ -65,33 +64,19 @@ public class DynamicScalingYarnServiceManagerTest { Thread.sleep(3000); testDynamicScalingYarnServiceManager.shutDown(); Mockito.verify(mockDynamicScalingYarnService, Mockito.never()).reviseWorkforcePlanAndRequestNewContainers(Mockito.anyList()); - Mockito.verify(mockDynamicScalingYarnService, Mockito.times(3)).calcDeltasAndRequestContainers(); - } - - @Test - public void testWhenScalingDirectivesThrowsFNFE() throws IOException, InterruptedException { - Mockito.when(mockScalingDirectiveSource.getScalingDirectives()).thenThrow(FileNotFoundException.class); - TestDynamicScalingYarnServiceManager testDynamicScalingYarnServiceManager = new TestDynamicScalingYarnServiceManager( - mockGobblinTemporalApplicationMaster, mockScalingDirectiveSource); - testDynamicScalingYarnServiceManager.startUp(); - Thread.sleep(2000); - testDynamicScalingYarnServiceManager.shutDown(); - Mockito.verify(mockDynamicScalingYarnService, Mockito.never()).reviseWorkforcePlanAndRequestNewContainers(Mockito.anyList()); - Mockito.verify(mockDynamicScalingYarnService, Mockito.times(2)).calcDeltasAndRequestContainers(); } /** Note : this test uses {@link DummyScalingDirectiveSource}*/ @Test public void testWithDummyScalingDirectiveSource() throws IOException, InterruptedException { - // DummyScalingDirectiveSource returns 2 scaling directives in first 5 invocations and after that it returns empty list - // so the total number of invocations after five invocations should always be 5 + // DummyScalingDirectiveSource returns 2 scaling directives in first 3 invocations and after that it returns empty list + // so the total number of invocations after three invocations should always be 3 TestDynamicScalingYarnServiceManager testDynamicScalingYarnServiceManager = new TestDynamicScalingYarnServiceManager( mockGobblinTemporalApplicationMaster, new DummyScalingDirectiveSource()); testDynamicScalingYarnServiceManager.startUp(); - Thread.sleep(7000); // 7 seconds sleep so that GetScalingDirectivesRunnable.run() is called for 7 times + Thread.sleep(5000); // 5 seconds sleep so that GetScalingDirectivesRunnable.run() is called for 5 times testDynamicScalingYarnServiceManager.shutDown(); - Mockito.verify(mockDynamicScalingYarnService, Mockito.times(5)).reviseWorkforcePlanAndRequestNewContainers(Mockito.anyList()); - Mockito.verify(mockDynamicScalingYarnService, Mockito.times(2)).calcDeltasAndRequestContainers(); + Mockito.verify(mockDynamicScalingYarnService, Mockito.times(3)).reviseWorkforcePlanAndRequestNewContainers(Mockito.anyList()); } @Test @@ -110,7 +95,6 @@ public class DynamicScalingYarnServiceManagerTest { Thread.sleep(5000); testDynamicScalingYarnServiceManager.shutDown(); Mockito.verify(mockDynamicScalingYarnService, Mockito.times(2)).reviseWorkforcePlanAndRequestNewContainers(Mockito.anyList()); - Mockito.verify(mockDynamicScalingYarnService, Mockito.times(3)).calcDeltasAndRequestContainers(); } /** Test implementation of {@link AbstractDynamicScalingYarnServiceManager} which returns passed diff --git a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnServiceTest.java b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnServiceTest.java index 9556bccd1b..6c0946aabb 100644 --- a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnServiceTest.java +++ b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnServiceTest.java @@ -17,109 +17,38 @@ package org.apache.gobblin.temporal.yarn; +import java.net.URL; import java.util.Collections; -import java.util.List; import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; -import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; -import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.hadoop.yarn.api.records.Container; -import org.apache.hadoop.yarn.api.records.ContainerExitStatus; -import org.apache.hadoop.yarn.api.records.ContainerId; -import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.Resource; -import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync; import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.mockito.ArgumentCaptor; -import org.mockito.MockedStatic; import org.mockito.Mockito; import org.testng.Assert; -import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeClass; -import org.testng.annotations.BeforeMethod; -import org.testng.annotations.DataProvider; import org.testng.annotations.Test; import com.google.common.base.Optional; import com.google.common.eventbus.EventBus; import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; -import com.typesafe.config.ConfigValueFactory; import org.apache.gobblin.temporal.dynamic.ScalingDirective; -import org.apache.gobblin.temporal.dynamic.WorkerProfile; import org.apache.gobblin.temporal.dynamic.WorkforceProfiles; -import org.apache.gobblin.yarn.GobblinYarnConfigurationKeys; - -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyInt; -import static org.mockito.ArgumentMatchers.anyString; - /** Tests for {@link DynamicScalingYarnService} */ public class DynamicScalingYarnServiceTest { private Config defaultConfigs; - private final int initNumContainers = 1; - private final int initMemoryMbs = 1024; - private final int initCores = 1; - private final Resource initResource = Resource.newInstance(initMemoryMbs, initCores); private final YarnConfiguration yarnConfiguration = new YarnConfiguration(); private final FileSystem mockFileSystem = Mockito.mock(FileSystem.class); private final EventBus eventBus = new EventBus("TemporalDynamicScalingYarnServiceTest"); - private AMRMClientAsync mockAMRMClient; - private RegisterApplicationMasterResponse mockRegisterApplicationMasterResponse; - private WorkerProfile testBaselineworkerProfile; - private DynamicScalingYarnService dynamicScalingYarnServiceSpy; @BeforeClass - public void setup() throws Exception { - this.defaultConfigs = ConfigFactory.empty() - .withValue(GobblinYarnConfigurationKeys.CONTAINER_CORES_KEY, ConfigValueFactory.fromAnyRef(initCores)) - .withValue(GobblinYarnConfigurationKeys.CONTAINER_MEMORY_MBS_KEY, ConfigValueFactory.fromAnyRef(initMemoryMbs)) - .withValue(GobblinYarnConfigurationKeys.INITIAL_CONTAINERS_KEY, ConfigValueFactory.fromAnyRef(initNumContainers)); - - this.testBaselineworkerProfile = new WorkerProfile(this.defaultConfigs); - - mockAMRMClient = Mockito.mock(AMRMClientAsync.class); - mockRegisterApplicationMasterResponse = Mockito.mock(RegisterApplicationMasterResponse.class); - - MockedStatic<AMRMClientAsync> amrmClientAsyncMockStatic = Mockito.mockStatic(AMRMClientAsync.class); - - amrmClientAsyncMockStatic.when(() -> AMRMClientAsync.createAMRMClientAsync(anyInt(), any(AMRMClientAsync.CallbackHandler.class))) - .thenReturn(mockAMRMClient); - Mockito.doNothing().when(mockAMRMClient).init(any(YarnConfiguration.class)); - - Mockito.when(mockAMRMClient.registerApplicationMaster(anyString(), anyInt(), anyString())) - .thenReturn(mockRegisterApplicationMasterResponse); - Mockito.when(mockRegisterApplicationMasterResponse.getMaximumResourceCapability()) - .thenReturn(Mockito.mock(Resource.class)); - } - - @BeforeMethod - public void setupMethod() throws Exception { - DynamicScalingYarnService dynamicScalingYarnService = new DynamicScalingYarnService(this.defaultConfigs, "testApp", "testAppId", yarnConfiguration, mockFileSystem, eventBus); - dynamicScalingYarnServiceSpy = Mockito.spy(dynamicScalingYarnService); - Mockito.doNothing().when(dynamicScalingYarnServiceSpy).requestContainers(Mockito.anyInt(), Mockito.any(Resource.class), Mockito.any(Optional.class)); - dynamicScalingYarnServiceSpy.containerMap.clear(); - } - - @AfterMethod - public void cleanupMethod() { - dynamicScalingYarnServiceSpy.containerMap.clear(); - Mockito.reset(dynamicScalingYarnServiceSpy); - } - - @Test - public void testDynamicScalingYarnServiceStartupWithInitialContainers() throws Exception { - dynamicScalingYarnServiceSpy.startUp(); - ArgumentCaptor<Resource> resourceCaptor = ArgumentCaptor.forClass(Resource.class); - Mockito.verify(dynamicScalingYarnServiceSpy, Mockito.times(0)).reviseWorkforcePlanAndRequestNewContainers(Mockito.anyList()); - Mockito.verify(dynamicScalingYarnServiceSpy, Mockito.times(1)).requestContainersForWorkerProfile(Mockito.any(WorkerProfile.class), Mockito.anyInt()); - Mockito.verify(dynamicScalingYarnServiceSpy, Mockito.times(1)).requestContainers(Mockito.eq(initNumContainers), resourceCaptor.capture(), Mockito.any(Optional.class)); - Resource capturedResource = resourceCaptor.getValue(); - Assert.assertEquals(capturedResource.getMemorySize(), initMemoryMbs); - Assert.assertEquals(capturedResource.getVirtualCores(), initCores); + public void setup() { + URL url = DynamicScalingYarnServiceTest.class.getClassLoader() + .getResource(YarnServiceTest.class.getSimpleName() + ".conf"); // using same initial config as of YarnServiceTest + Assert.assertNotNull(url, "Could not find resource " + url); + this.defaultConfigs = ConfigFactory.parseURL(url).resolve(); } @Test @@ -132,160 +61,4 @@ public class DynamicScalingYarnServiceTest { dynamicScalingYarnServiceSpy.reviseWorkforcePlanAndRequestNewContainers(Collections.singletonList(baseScalingDirective)); Mockito.verify(dynamicScalingYarnServiceSpy, Mockito.times(1)).requestContainers(Mockito.eq(numNewContainers), Mockito.any(Resource.class), Mockito.any(Optional.class)); } - - @DataProvider(name = "OOMExitStatusProvider") - public Object[][] OOMExitStatusProvider() { - return new Object[][] { - {ContainerExitStatus.KILLED_EXCEEDED_PMEM}, - {ContainerExitStatus.KILLED_EXCEEDED_VMEM}, - {DynamicScalingYarnService.GENERAL_OOM_EXIT_STATUS_CODE} - }; - } - - @DataProvider(name = "NonOOMExitStatusProviderWhichRequestReplacementContainer") - public Object[][] NonOOMExitStatusProviderWhichRequestReplacementContainer() { - return new Object[][] { - {ContainerExitStatus.ABORTED}, - {ContainerExitStatus.PREEMPTED} - }; - } - - @DataProvider(name = "ExitStatusProviderWhichDoesNotRequestReplacementContainer") - public Object[][] ExitStatusProviderWhichDoesNotRequestReplacementContainer() { - return new Object[][] { - {ContainerExitStatus.SUCCESS}, - {ContainerExitStatus.INVALID}, - {ContainerExitStatus.DISKS_FAILED}, - {ContainerExitStatus.KILLED_BY_APPMASTER}, - {ContainerExitStatus.KILLED_BY_RESOURCEMANAGER}, - {ContainerExitStatus.KILLED_AFTER_APP_COMPLETION}, - {ContainerExitStatus.KILLED_BY_CONTAINER_SCHEDULER} - }; - } - - @Test(dataProvider = "OOMExitStatusProvider") - public void testHandleContainerCompletionForStatusOOM(int containerExitStatusCode) throws Exception { - ContainerId containerId = generateRandomContainerId(); - DynamicScalingYarnService.ContainerInfo containerInfo = createBaselineContainerInfo(containerId); - ContainerStatus containerStatus = Mockito.mock(ContainerStatus.class); - Mockito.when(containerStatus.getContainerId()).thenReturn(containerId); - Mockito.when(containerStatus.getExitStatus()).thenReturn(containerExitStatusCode); - dynamicScalingYarnServiceSpy.containerMap.put(containerId, containerInfo); // Required to be done for test otherwise containerMap is always empty since it is updated after containers are allocated - dynamicScalingYarnServiceSpy.handleContainerCompletion(containerStatus); - Mockito.verify(dynamicScalingYarnServiceSpy, Mockito.times(1)).reviseWorkforcePlanAndRequestNewContainers(Mockito.anyList()); - Mockito.verify(dynamicScalingYarnServiceSpy, Mockito.times(2)).requestContainersForWorkerProfile(Mockito.any(WorkerProfile.class), Mockito.anyInt()); - ArgumentCaptor<Resource> resourceCaptor = ArgumentCaptor.forClass(Resource.class); - Mockito.verify(dynamicScalingYarnServiceSpy, Mockito.times(2)).requestContainers(Mockito.eq(1), resourceCaptor.capture(), Mockito.any(Optional.class)); - Resource capturedResource = resourceCaptor.getValue(); - Assert.assertEquals(capturedResource.getMemorySize(), (long) initMemoryMbs * DynamicScalingYarnService.DEFAULT_REPLACEMENT_CONTAINER_MEMORY_MULTIPLIER); - Assert.assertEquals(capturedResource.getVirtualCores(), initCores); - } - - @Test(dataProvider = "NonOOMExitStatusProviderWhichRequestReplacementContainer") - public void testHandleContainerCompletionForNonOOMStatusWhichRequestReplacementContainer(int containerExitStatusCode) throws Exception { - ContainerId containerId = generateRandomContainerId(); - DynamicScalingYarnService.ContainerInfo containerInfo = createBaselineContainerInfo(containerId); - ContainerStatus containerStatus = Mockito.mock(ContainerStatus.class); - Mockito.when(containerStatus.getContainerId()).thenReturn(containerId); - Mockito.when(containerStatus.getExitStatus()).thenReturn(containerExitStatusCode); - dynamicScalingYarnServiceSpy.containerMap.put(containerId, containerInfo); // Required to be done for test otherwise containerMap is always empty since it is updated after containers are allocated - dynamicScalingYarnServiceSpy.handleContainerCompletion(containerStatus); - Mockito.verify(dynamicScalingYarnServiceSpy, Mockito.times(0)).reviseWorkforcePlanAndRequestNewContainers(Mockito.anyList()); - Mockito.verify(dynamicScalingYarnServiceSpy, Mockito.times(1)).requestContainersForWorkerProfile(Mockito.any(WorkerProfile.class), Mockito.anyInt()); - ArgumentCaptor<Resource> resourceCaptor = ArgumentCaptor.forClass(Resource.class); - Mockito.verify(dynamicScalingYarnServiceSpy, Mockito.times(1)).requestContainers(Mockito.eq(1), resourceCaptor.capture(), Mockito.any(Optional.class)); - Resource capturedResource = resourceCaptor.getValue(); - Assert.assertEquals(capturedResource.getMemorySize(), initMemoryMbs); - Assert.assertEquals(capturedResource.getVirtualCores(), initCores); - } - - @Test - public void testHandleContainerCompletionForAllOOMStatus() throws Exception { - ContainerId containerId1 = generateRandomContainerId(); - ContainerId containerId2 = generateRandomContainerId(); - ContainerId containerId3 = generateRandomContainerId(); - - DynamicScalingYarnService.ContainerInfo containerInfo1 = createBaselineContainerInfo(containerId1); - DynamicScalingYarnService.ContainerInfo containerInfo2 = createBaselineContainerInfo(containerId2); - DynamicScalingYarnService.ContainerInfo containerInfo3 = createBaselineContainerInfo(containerId3); - - ContainerStatus containerStatus1 = Mockito.mock(ContainerStatus.class); - Mockito.when(containerStatus1.getContainerId()).thenReturn(containerId1); - Mockito.when(containerStatus1.getExitStatus()).thenReturn(ContainerExitStatus.KILLED_EXCEEDED_VMEM); - - ContainerStatus containerStatus2 = Mockito.mock(ContainerStatus.class); - Mockito.when(containerStatus2.getContainerId()).thenReturn(containerId2); - Mockito.when(containerStatus2.getExitStatus()).thenReturn(DynamicScalingYarnService.GENERAL_OOM_EXIT_STATUS_CODE); - - ContainerStatus containerStatus3 = Mockito.mock(ContainerStatus.class); - Mockito.when(containerStatus3.getContainerId()).thenReturn(containerId3); - Mockito.when(containerStatus3.getExitStatus()).thenReturn(ContainerExitStatus.KILLED_EXCEEDED_PMEM); - - // Required to be done for test otherwise containerMap is always empty since it is updated after containers are allocated - dynamicScalingYarnServiceSpy.containerMap.put(containerId1, containerInfo1); - dynamicScalingYarnServiceSpy.containerMap.put(containerId2, containerInfo2); - dynamicScalingYarnServiceSpy.containerMap.put(containerId3, containerInfo3); - - dynamicScalingYarnServiceSpy.handleContainerCompletion(containerStatus1); - dynamicScalingYarnServiceSpy.handleContainerCompletion(containerStatus2); - dynamicScalingYarnServiceSpy.handleContainerCompletion(containerStatus3); - - Mockito.verify(dynamicScalingYarnServiceSpy, Mockito.times(3)).reviseWorkforcePlanAndRequestNewContainers(Mockito.anyList()); - Mockito.verify(dynamicScalingYarnServiceSpy, Mockito.times(4)).requestContainersForWorkerProfile(Mockito.any(WorkerProfile.class), Mockito.anyInt()); - ArgumentCaptor<Resource> resourceCaptor = ArgumentCaptor.forClass(Resource.class); - Mockito.verify(dynamicScalingYarnServiceSpy, Mockito.times(4)).requestContainers(Mockito.eq(1), resourceCaptor.capture(), Mockito.any(Optional.class)); - - List<Resource> capturedResources = resourceCaptor.getAllValues(); - Assert.assertEquals(capturedResources.size(), 4); - - Resource capturedResource = capturedResources.get(0); - Assert.assertEquals(capturedResource.getMemorySize(), initMemoryMbs); - Assert.assertEquals(capturedResource.getVirtualCores(), initCores); - - for (int idx = 1 ; idx < 4 ; idx++) { - capturedResource = capturedResources.get(idx); - Assert.assertEquals(capturedResource.getMemorySize(), (long) initMemoryMbs * DynamicScalingYarnService.DEFAULT_REPLACEMENT_CONTAINER_MEMORY_MULTIPLIER); - Assert.assertEquals(capturedResource.getVirtualCores(), initCores); - } - } - - @Test(dataProvider = "ExitStatusProviderWhichDoesNotRequestReplacementContainer") - public void testHandleContainerCompletionForExitStatusWhichDoesNotRequestReplacementContainer(int containerExitStatusCode) throws Exception { - ContainerId containerId = generateRandomContainerId(); - DynamicScalingYarnService.ContainerInfo containerInfo = createBaselineContainerInfo(containerId); - ContainerStatus containerStatus = Mockito.mock(ContainerStatus.class); - Mockito.when(containerStatus.getContainerId()).thenReturn(containerId); - Mockito.when(containerStatus.getExitStatus()).thenReturn(containerExitStatusCode); - dynamicScalingYarnServiceSpy.containerMap.put(containerId, containerInfo); // Required to be done for test otherwise containerMap is always empty since it is updated after containers are allocated - dynamicScalingYarnServiceSpy.handleContainerCompletion(containerStatus); - Mockito.verify(dynamicScalingYarnServiceSpy, Mockito.times(0)).reviseWorkforcePlanAndRequestNewContainers(Mockito.anyList()); - Mockito.verify(dynamicScalingYarnServiceSpy, Mockito.times(0)).requestContainersForWorkerProfile(Mockito.any(WorkerProfile.class), Mockito.anyInt()); - Mockito.verify(dynamicScalingYarnServiceSpy, Mockito.times(0)).requestContainers(Mockito.anyInt(), Mockito.any(Resource.class), Mockito.any(Optional.class)); - } - - @Test - public void testContainerRequestedWhenCompletionCalledBeforeAllocated() { - ContainerId containerId = generateRandomContainerId(); - DynamicScalingYarnService.ContainerInfo containerInfo = createBaselineContainerInfo(containerId); - dynamicScalingYarnServiceSpy.removedContainerIds.add(containerId); - dynamicScalingYarnServiceSpy.containerMap.put(containerId, containerInfo); - dynamicScalingYarnServiceSpy.calcDeltasAndRequestContainers(); - Mockito.verify(dynamicScalingYarnServiceSpy, Mockito.times(1)).requestContainersForWorkerProfile(Mockito.any(WorkerProfile.class), Mockito.anyInt()); - ArgumentCaptor<Resource> resourceCaptor = ArgumentCaptor.forClass(Resource.class); - Mockito.verify(dynamicScalingYarnServiceSpy, Mockito.times(1)).requestContainers(Mockito.eq(1), resourceCaptor.capture(), Mockito.any(Optional.class)); - Resource capturedResource = resourceCaptor.getValue(); - Assert.assertEquals(capturedResource.getMemorySize(), initMemoryMbs); - Assert.assertEquals(capturedResource.getVirtualCores(), initCores); - } - - - private ContainerId generateRandomContainerId() { - return ContainerId.newContainerId(ApplicationAttemptId.newInstance(ApplicationId.newInstance(1, 0), - 0), (long) (Math.random() * 1000)); - } - - private DynamicScalingYarnService.ContainerInfo createBaselineContainerInfo(ContainerId containerId) { - Container container = Container.newInstance(containerId, null, null, initResource, null, null); - return dynamicScalingYarnServiceSpy.new ContainerInfo(container, WorkforceProfiles.BASELINE_NAME_RENDERING, testBaselineworkerProfile); - } } diff --git a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/yarn/YarnServiceTest.java b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/yarn/YarnServiceTest.java index 8d216450a3..3c81316b85 100644 --- a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/yarn/YarnServiceTest.java +++ b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/yarn/YarnServiceTest.java @@ -20,13 +20,8 @@ package org.apache.gobblin.temporal.yarn; import java.io.IOException; import java.net.URL; -import org.apache.gobblin.temporal.dynamic.WorkerProfile; -import org.apache.gobblin.temporal.dynamic.WorkforceProfiles; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; -import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; -import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync; @@ -108,6 +103,12 @@ public class YarnServiceTest { .withValue(GobblinYarnConfigurationKeys.CONTAINER_JVM_MEMORY_XMX_RATIO_KEY, ConfigValueFactory.fromAnyRef(jvmMemoryXmxRatio)) .withValue(GobblinYarnConfigurationKeys.CONTAINER_JVM_MEMORY_OVERHEAD_MBS_KEY, ConfigValueFactory.fromAnyRef(jvmMemoryOverheadMbs)); + Resource resource = Resource.newInstance(resourceMemoryMB, 2); + + Container mockContainer = Mockito.mock(Container.class); + Mockito.when(mockContainer.getResource()).thenReturn(resource); + Mockito.when(mockContainer.getAllocationRequestId()).thenReturn(0L); + YarnService yarnService = new YarnService( config, "testApplicationName", @@ -117,13 +118,9 @@ public class YarnServiceTest { eventBus ); - WorkerProfile workerProfile = new WorkerProfile(config); - ContainerId containerId = ContainerId.newContainerId(ApplicationAttemptId.newInstance(ApplicationId.newInstance(1, 0), - 0), 0); - Resource resource = Resource.newInstance(resourceMemoryMB, 2); - Container container = Container.newInstance(containerId, null, null, resource, null, null); - YarnService.ContainerInfo containerInfo = yarnService.new ContainerInfo(container, WorkforceProfiles.BASELINE_NAME_RENDERING, workerProfile); - String command = containerInfo.getStartupCommand(); + yarnService.startUp(); + + String command = yarnService.buildContainerCommand(mockContainer, "testHelixParticipantId", "testHelixInstanceTag"); Assert.assertTrue(command.contains("-Xmx" + expectedJvmMemory + "M")); } }
