This is an automated email from the ASF dual-hosted git repository.

abhijain pushed a commit to branch revert-4092-virai_add_yarn_container_replacement
in repository https://gitbox.apache.org/repos/asf/gobblin.git

commit d0d6d0dd77da92d9a74da2b6b17d32784fa56abd
Author: abhishekmjain <[email protected]>
AuthorDate: Fri Feb 21 11:20:57 2025 +0530

    Revert "[GOBBLIN-2189] Implement ContainerCompletion callback in DynamicScalingYarnService"
---
 .../AbstractDynamicScalingYarnServiceManager.java  |   6 +-
 .../temporal/yarn/DynamicScalingYarnService.java   | 185 +-----------
 .../apache/gobblin/temporal/yarn/YarnService.java  | 315 +++++++++++++++++----
 .../dynamic/DummyScalingDirectiveSource.java       |  12 -
 .../yarn/DynamicScalingYarnServiceManagerTest.java |  24 +-
 .../yarn/DynamicScalingYarnServiceTest.java        | 239 +---------------
 .../gobblin/temporal/yarn/YarnServiceTest.java     |  21 +-
 7 files changed, 285 insertions(+), 517 deletions(-)

diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/AbstractDynamicScalingYarnServiceManager.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/AbstractDynamicScalingYarnServiceManager.java
index e49ec66a22..ca6aa72064 100644
--- 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/AbstractDynamicScalingYarnServiceManager.java
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/AbstractDynamicScalingYarnServiceManager.java
@@ -110,13 +110,9 @@ public abstract class 
AbstractDynamicScalingYarnServiceManager extends AbstractI
         List<ScalingDirective> scalingDirectives = 
scalingDirectiveSource.getScalingDirectives();
         if (CollectionUtils.isNotEmpty(scalingDirectives)) {
           
dynamicScalingYarnService.reviseWorkforcePlanAndRequestNewContainers(scalingDirectives);
-        } else {
-          dynamicScalingYarnService.calcDeltasAndRequestContainers();
         }
       } catch (FileNotFoundException fnfe) {
-        // FNFE comes when scaling directives path is not yet created, so we 
should just calc delta & request containers if needed
-        log.debug("Scaling directives file not found(possibly not yet 
created). Falling back to delta calculation. - " + fnfe.getMessage());
-        dynamicScalingYarnService.calcDeltasAndRequestContainers();
+        log.warn("Failed to get scaling directives - " + fnfe.getMessage()); 
// important message, but no need for a stack trace
       } catch (IOException e) {
         log.error("Failed to get scaling directives", e);
       } catch (Throwable t) {
diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnService.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnService.java
index 0010a45ff8..0720017b85 100644
--- 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnService.java
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnService.java
@@ -17,20 +17,10 @@
 
 package org.apache.gobblin.temporal.yarn;
 
-import java.util.ArrayList;
-import java.util.Iterator;
 import java.util.List;
-import java.util.Optional;
-import java.util.Queue;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.commons.collections.CollectionUtils;
-import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 
 import com.google.common.eventbus.EventBus;
@@ -38,8 +28,6 @@ import com.typesafe.config.Config;
 
 import lombok.extern.slf4j.Slf4j;
 
-import org.apache.gobblin.temporal.dynamic.ProfileDerivation;
-import org.apache.gobblin.temporal.dynamic.ProfileOverlay;
 import org.apache.gobblin.temporal.dynamic.ScalingDirective;
 import org.apache.gobblin.temporal.dynamic.StaffingDeltas;
 import org.apache.gobblin.temporal.dynamic.WorkerProfile;
@@ -54,19 +42,11 @@ import org.apache.gobblin.yarn.GobblinYarnConfigurationKeys;
  */
 @Slf4j
 public class DynamicScalingYarnService extends YarnService {
-  private static final String 
DEFAULT_REPLACEMENT_CONTAINER_WORKER_PROFILE_NAME_PREFIX = 
"replacementWorkerProfile";
-  private static final int LAUNCH_CONTAINER_FAILED_EXIT_CODE = 1;
-  protected static final int GENERAL_OOM_EXIT_STATUS_CODE = 137;
-  protected static final int DEFAULT_REPLACEMENT_CONTAINER_MEMORY_MULTIPLIER = 
2;
-  private static final int MAX_REPLACEMENT_CONTAINER_MEMORY_MBS = 65536; // 
64GB
-  private static final int EPSILON_MIILIS = 1;
 
   /** this holds the current count of containers already requested for each 
worker profile */
   private final WorkforceStaffing actualWorkforceStaffing;
   /** this holds the current total workforce plan as per latest received 
scaling directives */
   private final WorkforcePlan workforcePlan;
-  protected final Queue<ContainerId> removedContainerIds;
-  private final AtomicLong profileNameSuffixGenerator;
 
   public DynamicScalingYarnService(Config config, String applicationName, 
String applicationId,
       YarnConfiguration yarnConfiguration, FileSystem fs, EventBus eventBus) 
throws Exception {
@@ -74,8 +54,6 @@ public class DynamicScalingYarnService extends YarnService {
 
     this.actualWorkforceStaffing = WorkforceStaffing.initialize(0);
     this.workforcePlan = new WorkforcePlan(this.config, 
this.config.getInt(GobblinYarnConfigurationKeys.INITIAL_CONTAINERS_KEY));
-    this.removedContainerIds = new ConcurrentLinkedQueue<>();
-    this.profileNameSuffixGenerator = new AtomicLong();
   }
 
   @Override
@@ -84,78 +62,6 @@ public class DynamicScalingYarnService extends YarnService {
     requestNewContainersForStaffingDeltas(deltas);
   }
 
-  /**
-   * Handle the completion of a container. A new container will be requested 
to replace the one
-   * that just exited depending on the exit status.
-   * <p>
-   * A container completes in either of the following conditions:
-   * <ol>
-   *   <li> The container gets stopped by the ApplicationMaster. </li>
-   *   <li> Some error happens in the container and caused the container to 
exit </li>
-   *   <li> The container gets preempted by the ResourceManager </li>
-   *   <li> The container gets killed due to some reason, for example, if it 
runs over the allowed amount of virtual or physical memory </li>
-   * </ol>
-   * A replacement container is needed in all except the first case.
-   * </p>
-   */
-  @Override
-  protected void handleContainerCompletion(ContainerStatus containerStatus) {
-    ContainerId completedContainerId = containerStatus.getContainerId();
-    ContainerInfo completedContainerInfo = 
this.containerMap.remove(completedContainerId);
-
-    // Because callbacks are processed asynchronously, we might encounter 
situations where handleContainerCompletion()
-    // is called before onContainersAllocated(), resulting in the containerId 
missing from the containersMap.
-    // We use removedContainerIds to remember these containers and remove them 
from containerMap later
-    // when we call reviseWorkforcePlanAndRequestNewContainers method
-    if (completedContainerInfo == null) {
-      log.warn("Container {} not found in containerMap. This container 
onContainersCompleted() likely called before onContainersAllocated()",
-          completedContainerId);
-      this.removedContainerIds.add(completedContainerId);
-      return;
-    }
-
-    log.info("Container {} running profile {} completed with exit status {}",
-        completedContainerId, completedContainerInfo.getWorkerProfileName(), 
containerStatus.getExitStatus());
-
-    if (StringUtils.isNotBlank(containerStatus.getDiagnostics())) {
-      log.info("Container {} running profile {} completed with diagnostics: 
{}",
-          completedContainerId, completedContainerInfo.getWorkerProfileName(), 
containerStatus.getDiagnostics());
-    }
-
-    if (this.shutdownInProgress) {
-      log.info("Ignoring container completion for container {} as shutdown is 
in progress", completedContainerId);
-      return;
-    }
-
-    WorkerProfile workerProfile = completedContainerInfo.getWorkerProfile();
-
-    switch (containerStatus.getExitStatus()) {
-      case(ContainerExitStatus.ABORTED):
-        handleAbortedContainer(completedContainerId, completedContainerInfo);
-        break;
-      case(ContainerExitStatus.PREEMPTED):
-        log.info("Container {} for profile {} preempted, starting to launching 
a replacement container",
-            completedContainerId, 
completedContainerInfo.getWorkerProfileName());
-        requestContainersForWorkerProfile(workerProfile, 1);
-        break;
-      case(GENERAL_OOM_EXIT_STATUS_CODE):
-      case(ContainerExitStatus.KILLED_EXCEEDED_VMEM):
-      case(ContainerExitStatus.KILLED_EXCEEDED_PMEM):
-        handleContainerExitedWithOOM(completedContainerId, 
completedContainerInfo);
-        break;
-      case(LAUNCH_CONTAINER_FAILED_EXIT_CODE):
-        log.info("Exit status 1.CompletedContainerInfo = {}", 
completedContainerInfo);
-        break;
-      case ContainerExitStatus.KILLED_AFTER_APP_COMPLETION:
-      case ContainerExitStatus.SUCCESS:
-        break;
-      default:
-        log.warn("Container {} exited with unhandled status code {}. 
ContainerInfo: {}",
-            completedContainerId, containerStatus.getExitStatus(), 
completedContainerInfo);
-        break;
-    }
-  }
-
   /**
    * Revises the workforce plan and requests new containers based on the given 
scaling directives.
    *
@@ -166,101 +72,28 @@ public class DynamicScalingYarnService extends 
YarnService {
       return;
     }
     this.workforcePlan.reviseWhenNewer(scalingDirectives);
-    calcDeltasAndRequestContainers();
-  }
-
-  public synchronized void calcDeltasAndRequestContainers() {
-    // Correct the actualWorkforceStaffing in case of 
handleContainerCompletion() getting called before onContainersAllocated()
-    Iterator<ContainerId> iterator = removedContainerIds.iterator();
-    while (iterator.hasNext()) {
-      ContainerId containerId = iterator.next();
-      ContainerInfo containerInfo = this.containerMap.remove(containerId);
-      if (containerInfo != null) {
-        WorkerProfile workerProfile = containerInfo.getWorkerProfile();
-        int currNumContainers = 
this.actualWorkforceStaffing.getStaffing(workerProfile.getName()).orElse(0);
-        if (currNumContainers > 0) {
-          this.actualWorkforceStaffing.reviseStaffing(workerProfile.getName(), 
currNumContainers - 1,
-              System.currentTimeMillis());
-        }
-        iterator.remove();
-      }
-    }
     StaffingDeltas deltas = 
this.workforcePlan.calcStaffingDeltas(this.actualWorkforceStaffing);
     requestNewContainersForStaffingDeltas(deltas);
   }
 
   private synchronized void 
requestNewContainersForStaffingDeltas(StaffingDeltas deltas) {
     deltas.getPerProfileDeltas().forEach(profileDelta -> {
-      WorkerProfile workerProfile = profileDelta.getProfile();
-      String profileName = workerProfile.getName();
-      int delta = profileDelta.getDelta();
-      int currNumContainers = 
this.actualWorkforceStaffing.getStaffing(profileName).orElse(0);
-      if (delta > 0) { // scale up!
+      if (profileDelta.getDelta() > 0) { // scale up!
+        WorkerProfile workerProfile = profileDelta.getProfile();
+        String profileName = workerProfile.getName();
+        int currNumContainers = 
this.actualWorkforceStaffing.getStaffing(profileName).orElse(0);
+        int delta = profileDelta.getDelta();
         log.info("Requesting {} new containers for profile {} having currently 
{} containers", delta,
             WorkforceProfiles.renderName(profileName), currNumContainers);
         requestContainersForWorkerProfile(workerProfile, delta);
         // update our staffing after requesting new containers
         this.actualWorkforceStaffing.reviseStaffing(profileName, 
currNumContainers + delta, System.currentTimeMillis());
-      } else if (delta < 0) { // scale down!
-        log.info("Releasing {} containers for profile {} having currently {} 
containers", -delta,
-            WorkforceProfiles.renderName(profileName), currNumContainers);
-        releaseContainersForWorkerProfile(profileName, delta);
-        // update our staffing after releasing containers
-        int numContainersAfterRelease = Math.max(currNumContainers + delta, 0);
-        this.actualWorkforceStaffing.reviseStaffing(profileName, 
numContainersAfterRelease, System.currentTimeMillis());
+      } else if (profileDelta.getDelta() < 0) { // scale down!
+        // TODO: Decide how to handle negative deltas
+        log.warn("Handling of Negative delta is not supported yet : Profile {} 
delta {} ",
+            profileDelta.getProfile().getName(), profileDelta.getDelta());
       } // else, already at staffing plan (or at least have requested, so 
in-progress)
     });
   }
 
-  private void handleAbortedContainer(ContainerId completedContainerId, 
ContainerInfo completedContainerInfo) {
-    // Case 1 : Container release requested while scaling down
-    if (this.releasedContainerCache.getIfPresent(completedContainerId) != 
null) {
-      log.info("Container {} was released while downscaling for profile {}", 
completedContainerId, completedContainerInfo.getWorkerProfileName());
-      this.releasedContainerCache.invalidate(completedContainerId);
-      return;
-    }
-
-    // Case 2 : Container release was not requested, we need to request a 
replacement container
-    log.info("Container {} aborted for profile {}, starting to launch a 
replacement container", completedContainerId, 
completedContainerInfo.getWorkerProfileName());
-    
requestContainersForWorkerProfile(completedContainerInfo.getWorkerProfile(), 1);
-  }
-
-  private synchronized void handleContainerExitedWithOOM(ContainerId 
completedContainerId, ContainerInfo completedContainerInfo) {
-    log.info("Container {} for profile {} exited with OOM, starting to launch 
a replacement container",
-        completedContainerId, completedContainerInfo.getWorkerProfileName());
-
-    List<ScalingDirective> scalingDirectives = new ArrayList<>();
-
-    WorkerProfile workerProfile = completedContainerInfo.getWorkerProfile();
-    long currTimeMillis = System.currentTimeMillis();
-    // Update the current staffing to reflect the container that exited with 
OOM
-    int currNumContainers = 
this.actualWorkforceStaffing.getStaffing(workerProfile.getName()).orElse(0);
-    if (currNumContainers > 0) {
-      this.actualWorkforceStaffing.reviseStaffing(workerProfile.getName(), 
currNumContainers - 1, currTimeMillis);
-      // Add a scaling directive so that workforcePlan have uptodate setPoints 
for the workerProfile,
-      // otherwise extra containers will be requested when calculating deltas
-      scalingDirectives.add(new ScalingDirective(workerProfile.getName(), 
currNumContainers - 1, currTimeMillis));
-    }
-
-    // Request a replacement container
-    int currContainerMemoryMbs = 
workerProfile.getConfig().getInt(GobblinYarnConfigurationKeys.CONTAINER_MEMORY_MBS_KEY);
-    if (currContainerMemoryMbs >= MAX_REPLACEMENT_CONTAINER_MEMORY_MBS) {
-      log.warn("Container {} already had max allowed memory {} MBs. Not 
requesting a replacement container.",
-          completedContainerId, currContainerMemoryMbs);
-      return;
-    }
-    int newContainerMemoryMbs = Math.min(currContainerMemoryMbs * 
DEFAULT_REPLACEMENT_CONTAINER_MEMORY_MULTIPLIER,
-        MAX_REPLACEMENT_CONTAINER_MEMORY_MBS);
-    Optional<ProfileDerivation> optProfileDerivation = Optional.of(new 
ProfileDerivation(workerProfile.getName(),
-        new ProfileOverlay.Adding(new 
ProfileOverlay.KVPair(GobblinYarnConfigurationKeys.CONTAINER_MEMORY_MBS_KEY, 
newContainerMemoryMbs + ""))
-    ));
-    scalingDirectives.add(new ScalingDirective(
-        DEFAULT_REPLACEMENT_CONTAINER_WORKER_PROFILE_NAME_PREFIX + "-" + 
profileNameSuffixGenerator.getAndIncrement(),
-        1,
-        currTimeMillis + EPSILON_MIILIS, // Each scaling directive should have 
a newer timestamp than the previous one
-        optProfileDerivation
-    ));
-    reviseWorkforcePlanAndRequestNewContainers(scalingDirectives);
-  }
-
 }
diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/YarnService.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/YarnService.java
index 2818982bab..ec4da215a6 100644
--- 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/YarnService.java
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/YarnService.java
@@ -70,6 +70,7 @@ import org.apache.hadoop.yarn.util.Records;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
@@ -80,6 +81,7 @@ import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Maps;
 import com.google.common.eventbus.EventBus;
+import com.google.common.eventbus.Subscribe;
 import com.google.common.io.Closer;
 import com.google.common.util.concurrent.AbstractIdleService;
 import com.typesafe.config.Config;
@@ -104,8 +106,10 @@ import 
org.apache.gobblin.yarn.GobblinYarnConfigurationKeys;
 import org.apache.gobblin.yarn.GobblinYarnEventConstants;
 import org.apache.gobblin.yarn.GobblinYarnMetricTagNames;
 import org.apache.gobblin.yarn.YarnHelixUtils;
+import org.apache.gobblin.yarn.event.ContainerReleaseRequest;
+import org.apache.gobblin.yarn.event.ContainerShutdownRequest;
+import org.apache.gobblin.yarn.event.NewContainerRequest;
 import org.apache.gobblin.temporal.dynamic.WorkerProfile;
-import org.apache.gobblin.temporal.dynamic.WorkforceProfiles;
 
 /**
  * This class is responsible for all Yarn-related stuffs including 
ApplicationMaster registration,
@@ -120,9 +124,13 @@ class YarnService extends AbstractIdleService {
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(YarnService.class);
 
+  private static final String UNKNOWN_HELIX_INSTANCE = "UNKNOWN";
+
   private final String applicationName;
   private final String applicationId;
   private final String appViewAcl;
+  //Default helix instance tag derived from cluster level config
+  private final String helixInstanceTags;
   protected final Config config;
 
   private final EventBus eventBus;
@@ -133,10 +141,16 @@ class YarnService extends AbstractIdleService {
   private final Optional<GobblinMetrics> gobblinMetrics;
   private final Optional<EventSubmitter> eventSubmitter;
 
+  @VisibleForTesting
   @Getter(AccessLevel.PROTECTED)
   private final AMRMClientAsync<AMRMClient.ContainerRequest> amrmClientAsync;
   private final NMClientAsync nmClientAsync;
   private final ExecutorService containerLaunchExecutor;
+  private final int requestedContainerMemoryMbs;
+  private final int requestedContainerCores;
+  private final boolean containerHostAffinityEnabled;
+
+  private final int helixInstanceMaxRetries;
   private final String containerTimezone;
   private final String proxyJvmArgs;
 
@@ -150,18 +164,34 @@ class YarnService extends AbstractIdleService {
 
   private final Object allContainersStopped = new Object();
 
-  // A map from container IDs to Container instances, WorkerProfile Name and 
WorkerProfile Object
-  protected final ConcurrentMap<ContainerId, ContainerInfo> containerMap = new 
ConcurrentHashMap<>();
+  // A map from container IDs to Container instances, Helix participant IDs of 
the containers and Helix Tag
+  @VisibleForTesting
+  @Getter(AccessLevel.PROTECTED)
+  private final ConcurrentMap<ContainerId, ContainerInfo> containerMap = 
Maps.newConcurrentMap();
 
   // A cache of the containers with an outstanding container release request.
   // This is a cache instead of a set to get the automatic cleanup in case a 
container completes before the requested
   // release.
-  protected final Cache<ContainerId, String> releasedContainerCache;
+  @VisibleForTesting
+  @Getter(AccessLevel.PROTECTED)
+  private final Cache<ContainerId, String> releasedContainerCache;
+
+  // A map from Helix instance names to the number times the instances are 
retried to be started
+  private final ConcurrentMap<String, AtomicInteger> helixInstanceRetryCount = 
Maps.newConcurrentMap();
+
+  // A concurrent HashSet of unused Helix instance names. An unused Helix 
instance name gets put
+  // into the set if the container running the instance completes. Unused Helix
+  // instance names get picked up when replacement containers get allocated.
+  private final Set<String> unusedHelixInstanceNames = 
ConcurrentHashMap.newKeySet();
+
+  // The map from helix tag to allocated container count
+  private final ConcurrentMap<String, AtomicInteger> 
allocatedContainerCountMap = Maps.newConcurrentMap();
+  private final ConcurrentMap<ContainerId, String> removedContainerID = 
Maps.newConcurrentMap();
 
   private final AtomicInteger priorityNumGenerator = new AtomicInteger(0);
   private final Map<String, Integer> resourcePriorityMap = new HashMap<>();
 
-  protected volatile boolean shutdownInProgress = false;
+  private volatile boolean shutdownInProgress = false;
 
   private final boolean jarCacheEnabled;
   private static final long DEFAULT_ALLOCATION_REQUEST_ID = 0L;
@@ -196,6 +226,14 @@ class YarnService extends AbstractIdleService {
     this.nmClientAsync = 
closer.register(NMClientAsync.createNMClientAsync(getNMClientCallbackHandler()));
     this.nmClientAsync.init(this.yarnConfiguration);
 
+    this.requestedContainerMemoryMbs = 
config.getInt(GobblinYarnConfigurationKeys.CONTAINER_MEMORY_MBS_KEY);
+    this.requestedContainerCores = 
config.getInt(GobblinYarnConfigurationKeys.CONTAINER_CORES_KEY);
+    this.containerHostAffinityEnabled = 
config.getBoolean(GobblinYarnConfigurationKeys.CONTAINER_HOST_AFFINITY_ENABLED);
+
+    this.helixInstanceMaxRetries = 
config.getInt(GobblinYarnConfigurationKeys.HELIX_INSTANCE_MAX_RETRIES);
+    this.helixInstanceTags = ConfigUtils.getString(config,
+        GobblinClusterConfigurationKeys.HELIX_INSTANCE_TAGS_KEY, 
GobblinClusterConfigurationKeys.HELIX_DEFAULT_TAG);
+
     this.proxyJvmArgs = 
config.hasPath(GobblinYarnConfigurationKeys.YARN_APPLICATION_PROXY_JVM_ARGS) ?
         
config.getString(GobblinYarnConfigurationKeys.YARN_APPLICATION_PROXY_JVM_ARGS) 
: StringUtils.EMPTY;
 
@@ -219,10 +257,53 @@ class YarnService extends AbstractIdleService {
 
   }
 
+  @SuppressWarnings("unused")
+  @Subscribe
+  public void handleNewContainerRequest(NewContainerRequest 
newContainerRequest) {
+    if (!this.maxResourceCapacity.isPresent()) {
+      LOGGER.error(String.format(
+          "Unable to handle new container request as maximum resource capacity 
is not available: "
+              + "[memory (MBs) requested = %d, vcores requested = %d]", 
this.requestedContainerMemoryMbs,
+          this.requestedContainerCores));
+      return;
+    }
+    
requestContainer(newContainerRequest.getReplacedContainer().transform(container 
-> container.getNodeId().getHost()),
+        newContainerRequest.getResource());
+  }
+
   protected NMClientCallbackHandler getNMClientCallbackHandler() {
     return new NMClientCallbackHandler();
   }
 
+  @SuppressWarnings("unused")
+  @Subscribe
+  public void handleContainerShutdownRequest(ContainerShutdownRequest 
containerShutdownRequest) {
+    for (Container container : containerShutdownRequest.getContainers()) {
+      LOGGER.info(String.format("Stopping container %s running on %s", 
container.getId(), container.getNodeId()));
+      this.nmClientAsync.stopContainerAsync(container.getId(), 
container.getNodeId());
+    }
+  }
+
+  /**
+   * Request the Resource Manager to release the container
+   * @param containerReleaseRequest containers to release
+   */
+  @Subscribe
+  public void handleContainerReleaseRequest(ContainerReleaseRequest 
containerReleaseRequest) {
+    for (Container container : containerReleaseRequest.getContainers()) {
+      LOGGER.info(String.format("Releasing container %s running on %s", 
container.getId(), container.getNodeId()));
+
+      // Record that this container was explicitly released so that a new one 
is not spawned to replace it
+      // Put the container id in the releasedContainerCache before releasing 
it so that handleContainerCompletion()
+      // can check for the container id and skip spawning a replacement 
container.
+      // Note that this is the best effort since these are asynchronous 
operations and a container may abort concurrently
+      // with the release call. So in some cases a replacement container may 
have already been spawned before
+      // the container is put into the black list.
+      this.releasedContainerCache.put(container.getId(), "");
+      this.amrmClientAsync.releaseAssignedContainer(container.getId());
+    }
+  }
+
   @Override
   protected synchronized void startUp() throws Exception {
     LOGGER.info("Starting the TemporalYarnService");
@@ -254,8 +335,8 @@ class YarnService extends AbstractIdleService {
 
       // Stop the running containers
       for (ContainerInfo containerInfo : this.containerMap.values()) {
-        LOGGER.info("Stopping container {} running worker profile {}", 
containerInfo.getContainer().getId(),
-            containerInfo.getWorkerProfileName());
+        LOGGER.info("Stopping container {} running participant {}", 
containerInfo.getContainer().getId(),
+            containerInfo.getHelixParticipantId());
         
this.nmClientAsync.stopContainerAsync(containerInfo.getContainer().getId(), 
containerInfo.getContainer().getNodeId());
       }
 
@@ -330,27 +411,10 @@ class YarnService extends AbstractIdleService {
     requestContainers(numContainers, Resource.newInstance(containerMemoryMbs, 
containerCores), Optional.of(allocationRequestId));
   }
 
-  protected synchronized void releaseContainersForWorkerProfile(String 
profileName, int numContainers) {
-    int numContainersToRelease = numContainers;
-    Iterator<Map.Entry<ContainerId, ContainerInfo>> containerMapIterator = 
this.containerMap.entrySet().iterator();
-    while (containerMapIterator.hasNext() && numContainers > 0) {
-      Map.Entry<ContainerId, ContainerInfo> entry = 
containerMapIterator.next();
-      if (entry.getValue().getWorkerProfile().getName().equals(profileName)) {
-        ContainerId containerId = entry.getKey();
-        LOGGER.info("Releasing container {} running profile {}", containerId, 
WorkforceProfiles.renderName(profileName));
-        // Record that this container was explicitly released so that a new 
one is not spawned to replace it
-        // Put the container id in the releasedContainerCache before releasing 
it so that handleContainerCompletion()
-        // can check for the container id and skip spawning a replacement 
container.
-        // Note that this is the best effort since these are asynchronous 
operations and a container may abort concurrently
-        // with the release call. So in some cases a replacement container may 
have already been spawned before
-        // the container is put into the black list.
-        this.releasedContainerCache.put(containerId, "");
-        this.amrmClientAsync.releaseAssignedContainer(containerId);
-        numContainers--;
-      }
-    }
-    LOGGER.info("Released {} containers out of {} requested for profile {}", 
numContainersToRelease - numContainers,
-        numContainersToRelease, profileName);
+  private void requestContainer(Optional<String> preferredNode, 
Optional<Resource> resourceOptional) {
+    Resource desiredResource = resourceOptional.or(Resource.newInstance(
+        this.requestedContainerMemoryMbs, this.requestedContainerCores));
+    requestContainer(preferredNode, desiredResource, Optional.absent());
   }
 
   /**
@@ -360,7 +424,7 @@ class YarnService extends AbstractIdleService {
    * @param resource
    */
   protected void requestContainers(int numContainers, Resource resource, 
Optional<Long> optAllocationRequestId) {
-    LOGGER.info("Requesting {} containers with resource = {} and allocation 
request id = {}", numContainers, resource, optAllocationRequestId);
+    LOGGER.info("Requesting {} containers with resource={} and allocation 
request id = {}", numContainers, resource, optAllocationRequestId);
     IntStream.range(0, numContainers)
         .forEach(i -> requestContainer(Optional.absent(), resource, 
optAllocationRequestId));
   }
@@ -488,7 +552,17 @@ class YarnService extends AbstractIdleService {
     }
   }
 
-  private String buildContainerCommand(Container container, String 
workerProfileName, WorkerProfile workerProfile) {
+  @VisibleForTesting
+  protected String buildContainerCommand(Container container, String 
helixParticipantId, String helixInstanceTag) {
+    long allocationRequestId = container.getAllocationRequestId();
+    WorkerProfile workerProfile = 
Optional.fromNullable(this.workerProfileByAllocationRequestId.get(allocationRequestId))
+        .or(() -> {
+          LOGGER.warn("No Worker Profile found for {}, so falling back to 
default", allocationRequestId);
+          return 
this.workerProfileByAllocationRequestId.computeIfAbsent(DEFAULT_ALLOCATION_REQUEST_ID,
 k -> {
+            LOGGER.warn("WARNING: (LIKELY) UNEXPECTED CONCURRENCY: No Worker 
Profile even yet mapped to the default allocation request ID {} - creating one 
now", DEFAULT_ALLOCATION_REQUEST_ID);
+            return new WorkerProfile(this.config);
+          });
+        });
     Config workerProfileConfig = workerProfile.getConfig();
 
     double workerJvmMemoryXmxRatio = ConfigUtils.getDouble(workerProfileConfig,
@@ -500,13 +574,13 @@ class YarnService extends AbstractIdleService {
         
GobblinYarnConfigurationKeys.DEFAULT_CONTAINER_JVM_MEMORY_OVERHEAD_MBS);
 
     Preconditions.checkArgument(workerJvmMemoryXmxRatio >= 0 && 
workerJvmMemoryXmxRatio <= 1,
-        workerProfileName + " : " + 
GobblinYarnConfigurationKeys.CONTAINER_JVM_MEMORY_XMX_RATIO_KEY +
+        workerProfile.getName() + " : " + 
GobblinYarnConfigurationKeys.CONTAINER_JVM_MEMORY_XMX_RATIO_KEY +
             " must be between 0 and 1 inclusive");
 
     long containerMemoryMbs = container.getResource().getMemorySize();
 
     Preconditions.checkArgument(workerJvmMemoryOverheadMbs < 
containerMemoryMbs * workerJvmMemoryXmxRatio,
-        workerProfileName + " : " + 
GobblinYarnConfigurationKeys.CONTAINER_JVM_MEMORY_OVERHEAD_MBS_KEY +
+        workerProfile.getName() + " : " + 
GobblinYarnConfigurationKeys.CONTAINER_JVM_MEMORY_OVERHEAD_MBS_KEY +
             " cannot be more than " + 
GobblinYarnConfigurationKeys.CONTAINER_MEMORY_MBS_KEY + " * " +
             GobblinYarnConfigurationKeys.CONTAINER_JVM_MEMORY_XMX_RATIO_KEY);
 
@@ -528,8 +602,14 @@ class YarnService extends AbstractIdleService {
         .append(" 
--").append(GobblinClusterConfigurationKeys.APPLICATION_NAME_OPTION_NAME)
         .append(" ").append(this.applicationName)
         .append(" 
--").append(GobblinClusterConfigurationKeys.APPLICATION_ID_OPTION_NAME)
-        .append(" ").append(this.applicationId);
+        .append(" ").append(this.applicationId)
+        .append(" 
--").append(GobblinClusterConfigurationKeys.HELIX_INSTANCE_NAME_OPTION_NAME)
+        .append(" ").append(helixParticipantId);
 
+    if (!Strings.isNullOrEmpty(helixInstanceTag)) {
+      containerCommand.append(" 
--").append(GobblinClusterConfigurationKeys.HELIX_INSTANCE_TAGS_OPTION_NAME)
+          .append(" ").append(helixInstanceTag);
+    }
     return containerCommand.append(" 
1>").append(ApplicationConstants.LOG_DIR_EXPANSION_VAR).append(File.separator).append(
             
containerProcessName).append(".").append(ApplicationConstants.STDOUT)
         .append(" 
2>").append(ApplicationConstants.LOG_DIR_EXPANSION_VAR).append(File.separator).append(
@@ -537,11 +617,123 @@ class YarnService extends AbstractIdleService {
   }
 
   /**
-   * Handle the completion of a container.
-   * Just removes the containerId from {@link #containerMap}
+   * Check the exit status of a completed container and see if the replacement 
container
+   * should try to be started on the same node. Some exit status indicates a 
disk or
+   * node failure and in such cases the replacement container should try to be 
started on
+   * a different node.
+   */
+  private boolean shouldStickToTheSameNode(int containerExitStatus) {
+    switch (containerExitStatus) {
+      case ContainerExitStatus.DISKS_FAILED:
+        return false;
+      case ContainerExitStatus.ABORTED:
+        // Most likely this exit status is due to node failures because the
+        // application itself will not release containers.
+        return false;
+      default:
+        // Stick to the same node for other cases if host affinity is enabled.
+        return this.containerHostAffinityEnabled;
+    }
+  }
+
+  /**
+   * Handle the completion of a container. A new container will be requested 
to replace the one
+   * that just exited. Depending on the exit status and if container host 
affinity is enabled,
+   * the new container may or may not try to be started on the same node.
+   * <p>
+   * A container completes in either of the following conditions: 1) some 
error happens in the
+   * container and caused the container to exit, 2) the container gets killed 
due to some reason,
+   * for example, if it runs over the allowed amount of virtual or physical 
memory, 3) the container gets
+   * preempted by the ResourceManager, or 4) the container gets stopped by the 
ApplicationMaster.
+   * A replacement container is needed in all but the last case.
    */
   protected void handleContainerCompletion(ContainerStatus containerStatus) {
-    this.containerMap.remove(containerStatus.getContainerId());
+    ContainerInfo completedContainerInfo = 
this.containerMap.remove(containerStatus.getContainerId());
+    //Get the Helix instance name for the completed container. Because 
callbacks are processed asynchronously, we might
+    //encounter situations where handleContainerCompletion() is called before 
onContainersAllocated(), resulting in the
+    //containerId missing from the containerMap.
+    // We use removedContainerID to remember these containers and remove them 
from containerMap later when we call requestTargetNumberOfContainers method
+    if (completedContainerInfo == null) {
+      removedContainerID.putIfAbsent(containerStatus.getContainerId(), "");
+    }
+    String completedInstanceName = UNKNOWN_HELIX_INSTANCE;
+
+    String helixTag = completedContainerInfo == null ? helixInstanceTags : 
completedContainerInfo.getHelixTag();
+    if (completedContainerInfo != null) {
+      allocatedContainerCountMap.get(helixTag).decrementAndGet();
+    }
+
+    LOGGER.info(String.format("Container %s running Helix instance %s with tag 
%s has completed with exit status %d",
+        containerStatus.getContainerId(), completedInstanceName, helixTag, 
containerStatus.getExitStatus()));
+
+    if (!Strings.isNullOrEmpty(containerStatus.getDiagnostics())) {
+      LOGGER.info(String.format("Received the following diagnostics 
information for container %s: %s",
+          containerStatus.getContainerId(), containerStatus.getDiagnostics()));
+    }
+
+    switch(containerStatus.getExitStatus()) {
+      case(ContainerExitStatus.ABORTED):
+        if (handleAbortedContainer(containerStatus, completedContainerInfo, 
completedInstanceName)) {
+          return;
+        }
+        break;
+      case(1): // Same as linux exit status 1 Often occurs when 
launch_container.sh failed
+        LOGGER.info("Exit status 1. CompletedContainerInfo={}", 
completedContainerInfo);
+        break;
+      default:
+        break;
+    }
+
+    if (this.shutdownInProgress) {
+      return;
+    }
+    if(completedContainerInfo != null) {
+      this.helixInstanceRetryCount.putIfAbsent(completedInstanceName, new 
AtomicInteger(0));
+      int retryCount = 
this.helixInstanceRetryCount.get(completedInstanceName).incrementAndGet();
+
+      // Populate event metadata
+      Optional<ImmutableMap.Builder<String, String>> eventMetadataBuilder = 
Optional.absent();
+      if (this.eventSubmitter.isPresent()) {
+        eventMetadataBuilder = 
Optional.of(buildContainerStatusEventMetadata(containerStatus));
+        
eventMetadataBuilder.get().put(GobblinYarnEventConstants.EventMetadata.HELIX_INSTANCE_ID,
 completedInstanceName);
+        
eventMetadataBuilder.get().put(GobblinYarnEventConstants.EventMetadata.CONTAINER_STATUS_RETRY_ATTEMPT,
 retryCount + "");
+      }
+
+      if (this.helixInstanceMaxRetries > 0 && retryCount > 
this.helixInstanceMaxRetries) {
+        if (this.eventSubmitter.isPresent()) {
+          this.eventSubmitter.get()
+              
.submit(GobblinYarnEventConstants.EventNames.HELIX_INSTANCE_COMPLETION, 
eventMetadataBuilder.get().build());
+        }
+
+        LOGGER.warn("Maximum number of retries has been achieved for Helix 
instance " + completedInstanceName);
+        return;
+      }
+
+      // Add the Helix instance name of the completed container to the set of 
unused
+      // instance names so they can be reused by a replacement container.
+      LOGGER.info("Adding instance {} to the pool of unused instances", 
completedInstanceName);
+      this.unusedHelixInstanceNames.add(completedInstanceName);
+
+      /**
+       * NOTE: logic for handling container failure is removed because {@link 
#YarnService} relies on the auto scaling manager
+       * to control the number of containers by polling helix for the current 
number of tasks
+       * Without that integration, that code requests too many containers when 
there are exceptions and overloads yarn
+       */
+    }
+  }
+
+  private boolean handleAbortedContainer(ContainerStatus containerStatus, 
ContainerInfo completedContainerInfo,
+      String completedInstanceName) {
+    if 
(this.releasedContainerCache.getIfPresent(containerStatus.getContainerId()) != 
null) {
+      LOGGER.info("Container release requested, so not spawning a replacement 
for containerId {}", containerStatus.getContainerId());
+      if (completedContainerInfo != null) {
+        LOGGER.info("Adding instance {} to the pool of unused instances", 
completedInstanceName);
+        this.unusedHelixInstanceNames.add(completedInstanceName);
+      }
+      return true;
+    }
+    LOGGER.info("Container {} aborted due to lost NM", 
containerStatus.getContainerId());
+    return false;
   }
 
   private ImmutableMap.Builder<String, String> 
buildContainerStatusEventMetadata(ContainerStatus containerStatus) {
@@ -590,28 +782,33 @@ class YarnService extends AbstractIdleService {
     @Override
     public void onContainersAllocated(List<Container> containers) {
       for (final Container container : containers) {
-        long allocationRequestId = container.getAllocationRequestId();
-        WorkerProfile workerProfile = 
Optional.fromNullable(workerProfileByAllocationRequestId.get(allocationRequestId))
-            .or(() -> {
-              LOGGER.warn("No Worker Profile found for {}, so falling back to 
default", allocationRequestId);
-              return 
workerProfileByAllocationRequestId.computeIfAbsent(DEFAULT_ALLOCATION_REQUEST_ID,
 k -> {
-                LOGGER.warn("WARNING: (LIKELY) UNEXPECTED CONCURRENCY: No 
Worker Profile even yet mapped to the default allocation request ID {} - 
creating one now", DEFAULT_ALLOCATION_REQUEST_ID);
-                return new WorkerProfile(config);
-              });
-            });
-
         String containerId = container.getId().toString();
+        String containerHelixTag = helixInstanceTags;
         if (eventSubmitter.isPresent()) {
           
eventSubmitter.get().submit(GobblinYarnEventConstants.EventNames.CONTAINER_ALLOCATION,
               GobblinYarnMetricTagNames.CONTAINER_ID, containerId);
         }
 
-        LOGGER.info("Container {} has been allocated with resource {} for 
Worker Profile {}",
-            container.getId(), container.getResource(), 
WorkforceProfiles.renderName(workerProfile.getName()));
+        LOGGER.info("Container {} has been allocated with resource {} for 
helix tag {}",
+            container.getId(), container.getResource(), containerHelixTag);
+
+        //Iterate over the (thread-safe) set of unused instances. The loop below keeps the 
last name returned by the iterator (null when the set is empty).
+        //NOTE: it performs no liveness check and does not remove the chosen name from the set.
+        String instanceName = null;
+
+        //Ensure that updates to unusedHelixInstanceNames are visible to other 
threads that might concurrently
+        //invoke the callback on container allocation.
+        synchronized (this) {
+          Iterator<String> iterator = unusedHelixInstanceNames.iterator();
+          while (iterator.hasNext()) {
+            instanceName = iterator.next();
+          }
+        }
 
-        ContainerInfo containerInfo = new ContainerInfo(container,
-            WorkforceProfiles.renderName(workerProfile.getName()), 
workerProfile);
+        ContainerInfo containerInfo = new ContainerInfo(container, 
instanceName, containerHelixTag);
         containerMap.put(container.getId(), containerInfo);
+        allocatedContainerCountMap.putIfAbsent(containerHelixTag, new 
AtomicInteger(0));
+        allocatedContainerCountMap.get(containerHelixTag).incrementAndGet();
 
         // Find matching requests and remove the request (YARN-660). We the 
scheduler are responsible
         // for cleaning up requests after allocation based on the design in 
the described ticket.
@@ -772,26 +969,26 @@ class YarnService extends AbstractIdleService {
     }
   }
 
-  // Class encapsulates Container instance, WorkerProfile name to print, 
WorkerProfile, and
+  // Class encapsulates Container instances, Helix participant IDs of the 
containers, Helix Tag, and
   // initial startup command
   @Getter
   class ContainerInfo {
     private final Container container;
-    private final String workerProfileName; // Storing this to avoid calling 
WorkforceProfiles.renderName(workerProfile.getName()) while logging
-    private final WorkerProfile workerProfile;
+    private final String helixParticipantId;
+    private final String helixTag;
     private final String startupCommand;
 
-    public ContainerInfo(Container container, String workerProfileName, 
WorkerProfile workerProfile) {
+    public ContainerInfo(Container container, String helixParticipantId, 
String helixTag) {
       this.container = container;
-      this.workerProfileName = workerProfileName;
-      this.workerProfile = workerProfile;
-      this.startupCommand = YarnService.this.buildContainerCommand(container, 
workerProfileName, workerProfile);
+      this.helixParticipantId = helixParticipantId;
+      this.helixTag = helixTag;
+      this.startupCommand = YarnService.this.buildContainerCommand(container, 
helixParticipantId, helixTag);
     }
 
     @Override
     public String toString() {
-      return String.format("ContainerInfo{ container=%s, workerProfileName=%s, 
startupCommand=%s }",
-          container.getId(), workerProfileName, startupCommand);
+      return String.format("ContainerInfo{ container=%s, 
helixParticipantId=%s, helixTag=%s, startupCommand=%s }",
+          container.getId(), helixParticipantId, helixTag, startupCommand);
     }
   }
 }
diff --git 
a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/dynamic/DummyScalingDirectiveSource.java
 
b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/dynamic/DummyScalingDirectiveSource.java
index 8b6d0bf270..6bdfe46276 100644
--- 
a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/dynamic/DummyScalingDirectiveSource.java
+++ 
b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/dynamic/DummyScalingDirectiveSource.java
@@ -70,18 +70,6 @@ public class DummyScalingDirectiveSource implements 
ScalingDirectiveSource {
           new ScalingDirective("firstProfile", 5, currentTime),
           new ScalingDirective("secondProfile", 3, currentTime + 1)
       );
-    } else if (currNumInvocations == 3) {
-      // changing set point to 0 for both profiles so that all containers 
should be released
-      return Arrays.asList(
-          new ScalingDirective("firstProfile", 0, currentTime),
-          new ScalingDirective("secondProfile", 0, currentTime + 1)
-      );
-    } else if (currNumInvocations == 4) {
-      // increasing containers count for both profiles so that new containers 
should be launched
-      return Arrays.asList(
-          new ScalingDirective("firstProfile", 5, currentTime),
-          new ScalingDirective("secondProfile", 5, currentTime + 1)
-      );
     }
     return new ArrayList<>();
   }
diff --git 
a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnServiceManagerTest.java
 
b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnServiceManagerTest.java
index 666e3c54c4..c43a27fa76 100644
--- 
a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnServiceManagerTest.java
+++ 
b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnServiceManagerTest.java
@@ -17,7 +17,6 @@
 
 package org.apache.gobblin.temporal.yarn;
 
-import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -65,33 +64,19 @@ public class DynamicScalingYarnServiceManagerTest {
     Thread.sleep(3000);
     testDynamicScalingYarnServiceManager.shutDown();
     Mockito.verify(mockDynamicScalingYarnService, 
Mockito.never()).reviseWorkforcePlanAndRequestNewContainers(Mockito.anyList());
-    Mockito.verify(mockDynamicScalingYarnService, 
Mockito.times(3)).calcDeltasAndRequestContainers();
-  }
-
-  @Test
-  public void testWhenScalingDirectivesThrowsFNFE() throws IOException, 
InterruptedException {
-    
Mockito.when(mockScalingDirectiveSource.getScalingDirectives()).thenThrow(FileNotFoundException.class);
-    TestDynamicScalingYarnServiceManager testDynamicScalingYarnServiceManager 
= new TestDynamicScalingYarnServiceManager(
-        mockGobblinTemporalApplicationMaster, mockScalingDirectiveSource);
-    testDynamicScalingYarnServiceManager.startUp();
-    Thread.sleep(2000);
-    testDynamicScalingYarnServiceManager.shutDown();
-    Mockito.verify(mockDynamicScalingYarnService, 
Mockito.never()).reviseWorkforcePlanAndRequestNewContainers(Mockito.anyList());
-    Mockito.verify(mockDynamicScalingYarnService, 
Mockito.times(2)).calcDeltasAndRequestContainers();
   }
 
   /** Note : this test uses {@link DummyScalingDirectiveSource}*/
   @Test
   public void testWithDummyScalingDirectiveSource() throws IOException, 
InterruptedException {
-    // DummyScalingDirectiveSource returns 2 scaling directives in first 5 
invocations and after that it returns empty list
-    // so the total number of invocations after five invocations should always 
be 5
+    // DummyScalingDirectiveSource returns 2 scaling directives in first 3 
invocations and after that it returns empty list
+    // so the total number of invocations after three invocations should 
always be 3
     TestDynamicScalingYarnServiceManager testDynamicScalingYarnServiceManager 
= new TestDynamicScalingYarnServiceManager(
         mockGobblinTemporalApplicationMaster, new 
DummyScalingDirectiveSource());
     testDynamicScalingYarnServiceManager.startUp();
-    Thread.sleep(7000); // 7 seconds sleep so that 
GetScalingDirectivesRunnable.run() is called for 7 times
+    Thread.sleep(5000); // 5 seconds sleep so that 
GetScalingDirectivesRunnable.run() is called 5 times
     testDynamicScalingYarnServiceManager.shutDown();
-    Mockito.verify(mockDynamicScalingYarnService, 
Mockito.times(5)).reviseWorkforcePlanAndRequestNewContainers(Mockito.anyList());
-    Mockito.verify(mockDynamicScalingYarnService, 
Mockito.times(2)).calcDeltasAndRequestContainers();
+    Mockito.verify(mockDynamicScalingYarnService, 
Mockito.times(3)).reviseWorkforcePlanAndRequestNewContainers(Mockito.anyList());
   }
 
   @Test
@@ -110,7 +95,6 @@ public class DynamicScalingYarnServiceManagerTest {
     Thread.sleep(5000);
     testDynamicScalingYarnServiceManager.shutDown();
     Mockito.verify(mockDynamicScalingYarnService, 
Mockito.times(2)).reviseWorkforcePlanAndRequestNewContainers(Mockito.anyList());
-    Mockito.verify(mockDynamicScalingYarnService, 
Mockito.times(3)).calcDeltasAndRequestContainers();
   }
 
   /** Test implementation of {@link AbstractDynamicScalingYarnServiceManager} 
which returns passed
diff --git 
a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnServiceTest.java
 
b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnServiceTest.java
index 9556bccd1b..6c0946aabb 100644
--- 
a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnServiceTest.java
+++ 
b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnServiceTest.java
@@ -17,109 +17,38 @@
 
 package org.apache.gobblin.temporal.yarn;
 
+import java.net.URL;
 import java.util.Collections;
-import java.util.List;
 
 import org.apache.hadoop.fs.FileSystem;
-import 
org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
-import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.mockito.ArgumentCaptor;
-import org.mockito.MockedStatic;
 import org.mockito.Mockito;
 import org.testng.Assert;
-import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeClass;
-import org.testng.annotations.BeforeMethod;
-import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
 import com.google.common.base.Optional;
 import com.google.common.eventbus.EventBus;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
-import com.typesafe.config.ConfigValueFactory;
 
 import org.apache.gobblin.temporal.dynamic.ScalingDirective;
-import org.apache.gobblin.temporal.dynamic.WorkerProfile;
 import org.apache.gobblin.temporal.dynamic.WorkforceProfiles;
-import org.apache.gobblin.yarn.GobblinYarnConfigurationKeys;
-
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyInt;
-import static org.mockito.ArgumentMatchers.anyString;
-
 
 /** Tests for {@link DynamicScalingYarnService} */
 public class DynamicScalingYarnServiceTest {
   private Config defaultConfigs;
-  private final int initNumContainers = 1;
-  private final int initMemoryMbs = 1024;
-  private final int initCores = 1;
-  private final Resource initResource = Resource.newInstance(initMemoryMbs, 
initCores);
   private final YarnConfiguration yarnConfiguration = new YarnConfiguration();
   private final FileSystem mockFileSystem = Mockito.mock(FileSystem.class);
   private final EventBus eventBus = new 
EventBus("TemporalDynamicScalingYarnServiceTest");
-  private AMRMClientAsync mockAMRMClient;
-  private RegisterApplicationMasterResponse 
mockRegisterApplicationMasterResponse;
-  private WorkerProfile testBaselineworkerProfile;
-  private DynamicScalingYarnService dynamicScalingYarnServiceSpy;
 
   @BeforeClass
-  public void setup() throws Exception {
-    this.defaultConfigs = ConfigFactory.empty()
-        .withValue(GobblinYarnConfigurationKeys.CONTAINER_CORES_KEY, 
ConfigValueFactory.fromAnyRef(initCores))
-        .withValue(GobblinYarnConfigurationKeys.CONTAINER_MEMORY_MBS_KEY, 
ConfigValueFactory.fromAnyRef(initMemoryMbs))
-        .withValue(GobblinYarnConfigurationKeys.INITIAL_CONTAINERS_KEY, 
ConfigValueFactory.fromAnyRef(initNumContainers));
-
-    this.testBaselineworkerProfile = new WorkerProfile(this.defaultConfigs);
-
-    mockAMRMClient = Mockito.mock(AMRMClientAsync.class);
-    mockRegisterApplicationMasterResponse = 
Mockito.mock(RegisterApplicationMasterResponse.class);
-
-    MockedStatic<AMRMClientAsync> amrmClientAsyncMockStatic = 
Mockito.mockStatic(AMRMClientAsync.class);
-
-    amrmClientAsyncMockStatic.when(() -> 
AMRMClientAsync.createAMRMClientAsync(anyInt(), 
any(AMRMClientAsync.CallbackHandler.class)))
-        .thenReturn(mockAMRMClient);
-    
Mockito.doNothing().when(mockAMRMClient).init(any(YarnConfiguration.class));
-
-    Mockito.when(mockAMRMClient.registerApplicationMaster(anyString(), 
anyInt(), anyString()))
-        .thenReturn(mockRegisterApplicationMasterResponse);
-    
Mockito.when(mockRegisterApplicationMasterResponse.getMaximumResourceCapability())
-        .thenReturn(Mockito.mock(Resource.class));
-  }
-
-  @BeforeMethod
-  public void setupMethod() throws Exception {
-    DynamicScalingYarnService dynamicScalingYarnService = new 
DynamicScalingYarnService(this.defaultConfigs, "testApp", "testAppId", 
yarnConfiguration, mockFileSystem, eventBus);
-    dynamicScalingYarnServiceSpy = Mockito.spy(dynamicScalingYarnService);
-    
Mockito.doNothing().when(dynamicScalingYarnServiceSpy).requestContainers(Mockito.anyInt(),
 Mockito.any(Resource.class), Mockito.any(Optional.class));
-    dynamicScalingYarnServiceSpy.containerMap.clear();
-  }
-
-  @AfterMethod
-  public void cleanupMethod() {
-    dynamicScalingYarnServiceSpy.containerMap.clear();
-    Mockito.reset(dynamicScalingYarnServiceSpy);
-  }
-
-  @Test
-  public void testDynamicScalingYarnServiceStartupWithInitialContainers() 
throws Exception {
-    dynamicScalingYarnServiceSpy.startUp();
-    ArgumentCaptor<Resource> resourceCaptor = 
ArgumentCaptor.forClass(Resource.class);
-    Mockito.verify(dynamicScalingYarnServiceSpy, 
Mockito.times(0)).reviseWorkforcePlanAndRequestNewContainers(Mockito.anyList());
-    Mockito.verify(dynamicScalingYarnServiceSpy, 
Mockito.times(1)).requestContainersForWorkerProfile(Mockito.any(WorkerProfile.class),
 Mockito.anyInt());
-    Mockito.verify(dynamicScalingYarnServiceSpy, 
Mockito.times(1)).requestContainers(Mockito.eq(initNumContainers), 
resourceCaptor.capture(), Mockito.any(Optional.class));
-    Resource capturedResource = resourceCaptor.getValue();
-    Assert.assertEquals(capturedResource.getMemorySize(), initMemoryMbs);
-    Assert.assertEquals(capturedResource.getVirtualCores(), initCores);
+  public void setup() {
+    URL url = DynamicScalingYarnServiceTest.class.getClassLoader()
+        .getResource(YarnServiceTest.class.getSimpleName() + ".conf"); // 
using same initial config as of YarnServiceTest
+    Assert.assertNotNull(url, "Could not find resource " + url);
+    this.defaultConfigs = ConfigFactory.parseURL(url).resolve();
   }
 
   @Test
@@ -132,160 +61,4 @@ public class DynamicScalingYarnServiceTest {
     
dynamicScalingYarnServiceSpy.reviseWorkforcePlanAndRequestNewContainers(Collections.singletonList(baseScalingDirective));
     Mockito.verify(dynamicScalingYarnServiceSpy, 
Mockito.times(1)).requestContainers(Mockito.eq(numNewContainers), 
Mockito.any(Resource.class), Mockito.any(Optional.class));
   }
-
-  @DataProvider(name = "OOMExitStatusProvider")
-  public Object[][] OOMExitStatusProvider() {
-    return new Object[][] {
-        {ContainerExitStatus.KILLED_EXCEEDED_PMEM},
-        {ContainerExitStatus.KILLED_EXCEEDED_VMEM},
-        {DynamicScalingYarnService.GENERAL_OOM_EXIT_STATUS_CODE}
-    };
-  }
-
-  @DataProvider(name = 
"NonOOMExitStatusProviderWhichRequestReplacementContainer")
-  public Object[][] NonOOMExitStatusProviderWhichRequestReplacementContainer() 
{
-    return new Object[][] {
-        {ContainerExitStatus.ABORTED},
-        {ContainerExitStatus.PREEMPTED}
-    };
-  }
-
-  @DataProvider(name = 
"ExitStatusProviderWhichDoesNotRequestReplacementContainer")
-  public Object[][] 
ExitStatusProviderWhichDoesNotRequestReplacementContainer() {
-    return new Object[][] {
-        {ContainerExitStatus.SUCCESS},
-        {ContainerExitStatus.INVALID},
-        {ContainerExitStatus.DISKS_FAILED},
-        {ContainerExitStatus.KILLED_BY_APPMASTER},
-        {ContainerExitStatus.KILLED_BY_RESOURCEMANAGER},
-        {ContainerExitStatus.KILLED_AFTER_APP_COMPLETION},
-        {ContainerExitStatus.KILLED_BY_CONTAINER_SCHEDULER}
-    };
-  }
-
-  @Test(dataProvider = "OOMExitStatusProvider")
-  public void testHandleContainerCompletionForStatusOOM(int 
containerExitStatusCode) throws Exception {
-    ContainerId containerId = generateRandomContainerId();
-    DynamicScalingYarnService.ContainerInfo containerInfo = 
createBaselineContainerInfo(containerId);
-    ContainerStatus containerStatus = Mockito.mock(ContainerStatus.class);
-    Mockito.when(containerStatus.getContainerId()).thenReturn(containerId);
-    
Mockito.when(containerStatus.getExitStatus()).thenReturn(containerExitStatusCode);
-    dynamicScalingYarnServiceSpy.containerMap.put(containerId, containerInfo); 
// Required to be done for test otherwise containerMap is always empty since it 
is updated after containers are allocated
-    dynamicScalingYarnServiceSpy.handleContainerCompletion(containerStatus);
-    Mockito.verify(dynamicScalingYarnServiceSpy, 
Mockito.times(1)).reviseWorkforcePlanAndRequestNewContainers(Mockito.anyList());
-    Mockito.verify(dynamicScalingYarnServiceSpy, 
Mockito.times(2)).requestContainersForWorkerProfile(Mockito.any(WorkerProfile.class),
 Mockito.anyInt());
-    ArgumentCaptor<Resource> resourceCaptor = 
ArgumentCaptor.forClass(Resource.class);
-    Mockito.verify(dynamicScalingYarnServiceSpy, 
Mockito.times(2)).requestContainers(Mockito.eq(1), resourceCaptor.capture(), 
Mockito.any(Optional.class));
-    Resource capturedResource = resourceCaptor.getValue();
-    Assert.assertEquals(capturedResource.getMemorySize(), (long) initMemoryMbs 
* DynamicScalingYarnService.DEFAULT_REPLACEMENT_CONTAINER_MEMORY_MULTIPLIER);
-    Assert.assertEquals(capturedResource.getVirtualCores(), initCores);
-  }
-
-  @Test(dataProvider = 
"NonOOMExitStatusProviderWhichRequestReplacementContainer")
-  public void 
testHandleContainerCompletionForNonOOMStatusWhichRequestReplacementContainer(int
 containerExitStatusCode) throws Exception {
-    ContainerId containerId = generateRandomContainerId();
-    DynamicScalingYarnService.ContainerInfo containerInfo = 
createBaselineContainerInfo(containerId);
-    ContainerStatus containerStatus = Mockito.mock(ContainerStatus.class);
-    Mockito.when(containerStatus.getContainerId()).thenReturn(containerId);
-    
Mockito.when(containerStatus.getExitStatus()).thenReturn(containerExitStatusCode);
-    dynamicScalingYarnServiceSpy.containerMap.put(containerId, containerInfo); 
// Required to be done for test otherwise containerMap is always empty since it 
is updated after containers are allocated
-    dynamicScalingYarnServiceSpy.handleContainerCompletion(containerStatus);
-    Mockito.verify(dynamicScalingYarnServiceSpy, 
Mockito.times(0)).reviseWorkforcePlanAndRequestNewContainers(Mockito.anyList());
-    Mockito.verify(dynamicScalingYarnServiceSpy, 
Mockito.times(1)).requestContainersForWorkerProfile(Mockito.any(WorkerProfile.class),
 Mockito.anyInt());
-    ArgumentCaptor<Resource> resourceCaptor = 
ArgumentCaptor.forClass(Resource.class);
-    Mockito.verify(dynamicScalingYarnServiceSpy, 
Mockito.times(1)).requestContainers(Mockito.eq(1), resourceCaptor.capture(), 
Mockito.any(Optional.class));
-    Resource capturedResource = resourceCaptor.getValue();
-    Assert.assertEquals(capturedResource.getMemorySize(), initMemoryMbs);
-    Assert.assertEquals(capturedResource.getVirtualCores(), initCores);
-  }
-
-  @Test
-  public void testHandleContainerCompletionForAllOOMStatus() throws Exception {
-    ContainerId containerId1 = generateRandomContainerId();
-    ContainerId containerId2 = generateRandomContainerId();
-    ContainerId containerId3 = generateRandomContainerId();
-
-    DynamicScalingYarnService.ContainerInfo containerInfo1 = 
createBaselineContainerInfo(containerId1);
-    DynamicScalingYarnService.ContainerInfo containerInfo2 = 
createBaselineContainerInfo(containerId2);
-    DynamicScalingYarnService.ContainerInfo containerInfo3 = 
createBaselineContainerInfo(containerId3);
-
-    ContainerStatus containerStatus1 = Mockito.mock(ContainerStatus.class);
-    Mockito.when(containerStatus1.getContainerId()).thenReturn(containerId1);
-    
Mockito.when(containerStatus1.getExitStatus()).thenReturn(ContainerExitStatus.KILLED_EXCEEDED_VMEM);
-
-    ContainerStatus containerStatus2 = Mockito.mock(ContainerStatus.class);
-    Mockito.when(containerStatus2.getContainerId()).thenReturn(containerId2);
-    
Mockito.when(containerStatus2.getExitStatus()).thenReturn(DynamicScalingYarnService.GENERAL_OOM_EXIT_STATUS_CODE);
-
-    ContainerStatus containerStatus3 = Mockito.mock(ContainerStatus.class);
-    Mockito.when(containerStatus3.getContainerId()).thenReturn(containerId3);
-    
Mockito.when(containerStatus3.getExitStatus()).thenReturn(ContainerExitStatus.KILLED_EXCEEDED_PMEM);
-
-    // Required to be done for test otherwise containerMap is always empty 
since it is updated after containers are allocated
-    dynamicScalingYarnServiceSpy.containerMap.put(containerId1, 
containerInfo1);
-    dynamicScalingYarnServiceSpy.containerMap.put(containerId2, 
containerInfo2);
-    dynamicScalingYarnServiceSpy.containerMap.put(containerId3, 
containerInfo3);
-
-    dynamicScalingYarnServiceSpy.handleContainerCompletion(containerStatus1);
-    dynamicScalingYarnServiceSpy.handleContainerCompletion(containerStatus2);
-    dynamicScalingYarnServiceSpy.handleContainerCompletion(containerStatus3);
-
-    Mockito.verify(dynamicScalingYarnServiceSpy, 
Mockito.times(3)).reviseWorkforcePlanAndRequestNewContainers(Mockito.anyList());
-    Mockito.verify(dynamicScalingYarnServiceSpy, 
Mockito.times(4)).requestContainersForWorkerProfile(Mockito.any(WorkerProfile.class),
 Mockito.anyInt());
-    ArgumentCaptor<Resource> resourceCaptor = 
ArgumentCaptor.forClass(Resource.class);
-    Mockito.verify(dynamicScalingYarnServiceSpy, 
Mockito.times(4)).requestContainers(Mockito.eq(1), resourceCaptor.capture(), 
Mockito.any(Optional.class));
-
-    List<Resource> capturedResources = resourceCaptor.getAllValues();
-    Assert.assertEquals(capturedResources.size(), 4);
-
-    Resource capturedResource = capturedResources.get(0);
-    Assert.assertEquals(capturedResource.getMemorySize(), initMemoryMbs);
-    Assert.assertEquals(capturedResource.getVirtualCores(), initCores);
-
-    for (int idx = 1 ; idx < 4 ; idx++) {
-      capturedResource = capturedResources.get(idx);
-      Assert.assertEquals(capturedResource.getMemorySize(), (long) 
initMemoryMbs * 
DynamicScalingYarnService.DEFAULT_REPLACEMENT_CONTAINER_MEMORY_MULTIPLIER);
-      Assert.assertEquals(capturedResource.getVirtualCores(), initCores);
-    }
-  }
-
-  @Test(dataProvider = 
"ExitStatusProviderWhichDoesNotRequestReplacementContainer")
-  public void 
testHandleContainerCompletionForExitStatusWhichDoesNotRequestReplacementContainer(int
 containerExitStatusCode) throws Exception {
-    ContainerId containerId = generateRandomContainerId();
-    DynamicScalingYarnService.ContainerInfo containerInfo = 
createBaselineContainerInfo(containerId);
-    ContainerStatus containerStatus = Mockito.mock(ContainerStatus.class);
-    Mockito.when(containerStatus.getContainerId()).thenReturn(containerId);
-    
Mockito.when(containerStatus.getExitStatus()).thenReturn(containerExitStatusCode);
-    dynamicScalingYarnServiceSpy.containerMap.put(containerId, containerInfo); 
// Required to be done for test otherwise containerMap is always empty since it 
is updated after containers are allocated
-    dynamicScalingYarnServiceSpy.handleContainerCompletion(containerStatus);
-    Mockito.verify(dynamicScalingYarnServiceSpy, 
Mockito.times(0)).reviseWorkforcePlanAndRequestNewContainers(Mockito.anyList());
-    Mockito.verify(dynamicScalingYarnServiceSpy, 
Mockito.times(0)).requestContainersForWorkerProfile(Mockito.any(WorkerProfile.class),
 Mockito.anyInt());
-    Mockito.verify(dynamicScalingYarnServiceSpy, 
Mockito.times(0)).requestContainers(Mockito.anyInt(), 
Mockito.any(Resource.class), Mockito.any(Optional.class));
-  }
-
-  @Test
-  public void testContainerRequestedWhenCompletionCalledBeforeAllocated() {
-    ContainerId containerId = generateRandomContainerId();
-    DynamicScalingYarnService.ContainerInfo containerInfo = 
createBaselineContainerInfo(containerId);
-    dynamicScalingYarnServiceSpy.removedContainerIds.add(containerId);
-    dynamicScalingYarnServiceSpy.containerMap.put(containerId, containerInfo);
-    dynamicScalingYarnServiceSpy.calcDeltasAndRequestContainers();
-    Mockito.verify(dynamicScalingYarnServiceSpy, 
Mockito.times(1)).requestContainersForWorkerProfile(Mockito.any(WorkerProfile.class),
 Mockito.anyInt());
-    ArgumentCaptor<Resource> resourceCaptor = 
ArgumentCaptor.forClass(Resource.class);
-    Mockito.verify(dynamicScalingYarnServiceSpy, 
Mockito.times(1)).requestContainers(Mockito.eq(1), resourceCaptor.capture(), 
Mockito.any(Optional.class));
-    Resource capturedResource = resourceCaptor.getValue();
-    Assert.assertEquals(capturedResource.getMemorySize(), initMemoryMbs);
-    Assert.assertEquals(capturedResource.getVirtualCores(), initCores);
-  }
-
-
-  private ContainerId generateRandomContainerId() {
-    return 
ContainerId.newContainerId(ApplicationAttemptId.newInstance(ApplicationId.newInstance(1,
 0),
-        0), (long) (Math.random() * 1000));
-  }
-
-  private DynamicScalingYarnService.ContainerInfo 
createBaselineContainerInfo(ContainerId containerId) {
-    Container container = Container.newInstance(containerId, null, null, 
initResource, null, null);
-    return dynamicScalingYarnServiceSpy.new ContainerInfo(container, 
WorkforceProfiles.BASELINE_NAME_RENDERING, testBaselineworkerProfile);
-  }
 }
diff --git 
a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/yarn/YarnServiceTest.java
 
b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/yarn/YarnServiceTest.java
index 8d216450a3..3c81316b85 100644
--- 
a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/yarn/YarnServiceTest.java
+++ 
b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/yarn/YarnServiceTest.java
@@ -20,13 +20,8 @@ package org.apache.gobblin.temporal.yarn;
 import java.io.IOException;
 import java.net.URL;
 
-import org.apache.gobblin.temporal.dynamic.WorkerProfile;
-import org.apache.gobblin.temporal.dynamic.WorkforceProfiles;
 import org.apache.hadoop.fs.FileSystem;
 import 
org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
-import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
@@ -108,6 +103,12 @@ public class YarnServiceTest {
         
.withValue(GobblinYarnConfigurationKeys.CONTAINER_JVM_MEMORY_XMX_RATIO_KEY, 
ConfigValueFactory.fromAnyRef(jvmMemoryXmxRatio))
         
.withValue(GobblinYarnConfigurationKeys.CONTAINER_JVM_MEMORY_OVERHEAD_MBS_KEY, 
ConfigValueFactory.fromAnyRef(jvmMemoryOverheadMbs));
 
+    Resource resource = Resource.newInstance(resourceMemoryMB, 2);
+
+    Container mockContainer = Mockito.mock(Container.class);
+    Mockito.when(mockContainer.getResource()).thenReturn(resource);
+    Mockito.when(mockContainer.getAllocationRequestId()).thenReturn(0L);
+
     YarnService yarnService = new YarnService(
         config,
         "testApplicationName",
@@ -117,13 +118,9 @@ public class YarnServiceTest {
         eventBus
     );
 
-    WorkerProfile workerProfile = new WorkerProfile(config);
-    ContainerId containerId = 
ContainerId.newContainerId(ApplicationAttemptId.newInstance(ApplicationId.newInstance(1,
 0),
-        0), 0);
-    Resource resource = Resource.newInstance(resourceMemoryMB, 2);
-    Container container = Container.newInstance(containerId, null, null, 
resource, null, null);
-    YarnService.ContainerInfo containerInfo = yarnService.new 
ContainerInfo(container, WorkforceProfiles.BASELINE_NAME_RENDERING, 
workerProfile);
-    String command = containerInfo.getStartupCommand();
+    yarnService.startUp();
+
+    String command = yarnService.buildContainerCommand(mockContainer, 
"testHelixParticipantId", "testHelixInstanceTag");
     Assert.assertTrue(command.contains("-Xmx" + expectedJvmMemory + "M"));
   }
 }

Reply via email to