This is an automated email from the ASF dual-hosted git repository.

zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 288b9e271 [GOBBLIN-1646] Revert yarn container / helix tag group 
changes (#3507)
288b9e271 is described below

commit 288b9e271ac561745c3b7e49c381c75ea9f7c84b
Author: Matthew Ho <[email protected]>
AuthorDate: Thu May 12 15:13:06 2022 -0700

    [GOBBLIN-1646] Revert yarn container / helix tag group changes (#3507)
    
    Revert "Fix bug when shrinking the container in Yarn service (#3504)"
    This reverts commit dd6d910a7e7a90d15258c6c77ebe626ae6d573f9.
    
    Revert "[GOBBLIN-1620]Make yarn container allocation group by helix tag 
(#3487)"
    This reverts commit 3e877951c284ccd68be3634522f9fc2c3d39f81a.
---
 .../cluster/GobblinClusterConfigurationKeys.java   |   8 -
 .../gobblin/cluster/GobblinHelixJobLauncher.java   |  14 -
 .../apache/gobblin/cluster/GobblinTaskRunner.java  |  25 +-
 .../gobblin/yarn/YarnAutoScalingManager.java       | 120 ++++----
 .../gobblin/yarn/YarnContainerRequestBundle.java   |  76 -----
 .../org/apache/gobblin/yarn/YarnHelixUtils.java    |  25 --
 .../java/org/apache/gobblin/yarn/YarnService.java  | 189 +++++-------
 .../gobblin/yarn/event/NewContainerRequest.java    |  10 -
 .../apache/gobblin/yarn/GobblinYarnTestUtils.java  |   7 -
 .../gobblin/yarn/YarnAutoScalingManagerTest.java   | 338 +++++++++------------
 .../org/apache/gobblin/yarn/YarnServiceTest.java   |  51 ++--
 .../yarn/YarnServiceTestWithExpiration.java        |  10 +-
 .../src/test/resources/YarnServiceTest.conf        |   3 +-
 13 files changed, 314 insertions(+), 562 deletions(-)

diff --git 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
index 3454b4d48..8c9513f50 100644
--- 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
+++ 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
@@ -84,7 +84,6 @@ public class GobblinClusterConfigurationKeys {
   public static final String HELIX_JOB_TAG_KEY = GOBBLIN_CLUSTER_PREFIX + 
"helixJobTag";
   public static final String HELIX_PLANNING_JOB_TAG_KEY = 
GOBBLIN_CLUSTER_PREFIX + "helixPlanningJobTag";
   public static final String HELIX_INSTANCE_TAGS_KEY = GOBBLIN_CLUSTER_PREFIX 
+ "helixInstanceTags";
-  public static final String HELIX_DEFAULT_TAG = "GobblinHelixDefaultTag";
 
   // Helix job quota
   public static final String HELIX_JOB_TYPE_KEY = GOBBLIN_CLUSTER_PREFIX + 
"helixJobType";
@@ -185,13 +184,6 @@ public class GobblinClusterConfigurationKeys {
   public static final String CONTAINER_EXIT_ON_HEALTH_CHECK_FAILURE_ENABLED = 
GOBBLIN_CLUSTER_PREFIX + "container.exitOnHealthCheckFailure";
   public static final boolean 
DEFAULT_CONTAINER_EXIT_ON_HEALTH_CHECK_FAILURE_ENABLED = false;
 
-  // Config to specify the resource requirement for each Gobblin job run, so 
that helix tasks within this job will
-  // be assigned to containers with desired resource. This config need to 
cooperate with helix job tag, so that helix
-  // cluster knows how to distribute tasks to correct containers.
-  public static final String HELIX_JOB_CONTAINER_MEMORY_MBS = 
GOBBLIN_CLUSTER_PREFIX + "job.container.memory.mbs";
-  public static final String HELIX_JOB_CONTAINER_CORES = 
GOBBLIN_CLUSTER_PREFIX + "job.container.cores";
-
-
 
   //Config to enable/disable reuse of existing Helix Cluster
   public static final String HELIX_CLUSTER_OVERWRITE_KEY = 
GOBBLIN_CLUSTER_PREFIX + "helix.overwrite";
diff --git 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
index e7e305ad5..df15ee385 100644
--- 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
+++ 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
@@ -413,20 +413,6 @@ public class GobblinHelixJobLauncher extends 
AbstractJobLauncher {
         
gobblinJobState.getPropAsLong(GobblinClusterConfigurationKeys.HELIX_WORKFLOW_EXPIRY_TIME_SECONDS,
             
GobblinClusterConfigurationKeys.DEFAULT_HELIX_WORKFLOW_EXPIRY_TIME_SECONDS));
 
-    Map<String, String> jobConfigMap = new HashMap<>();
-    if 
(this.jobConfig.hasPath(GobblinClusterConfigurationKeys.HELIX_JOB_CONTAINER_MEMORY_MBS))
 {
-      
jobConfigMap.put(GobblinClusterConfigurationKeys.HELIX_JOB_CONTAINER_MEMORY_MBS,
-          
jobConfig.getString(GobblinClusterConfigurationKeys.HELIX_JOB_CONTAINER_MEMORY_MBS));
-      log.info("Job {} has specific memory requirement:{}, add this config to 
command config map",
-          this.jobContext.getJobId(), 
jobConfig.getString(GobblinClusterConfigurationKeys.HELIX_JOB_CONTAINER_MEMORY_MBS));
-    }
-    if 
(this.jobConfig.hasPath(GobblinClusterConfigurationKeys.HELIX_JOB_CONTAINER_CORES))
 {
-      
jobConfigMap.put(GobblinClusterConfigurationKeys.HELIX_JOB_CONTAINER_CORES,
-          
jobConfig.getString(GobblinClusterConfigurationKeys.HELIX_JOB_CONTAINER_CORES));
-      log.info("Job {} has specific Vcore requirement:{}, add this config to 
command config map",
-          this.jobContext.getJobId(), 
jobConfig.getString(GobblinClusterConfigurationKeys.HELIX_JOB_CONTAINER_CORES));
-    }
-    jobConfigBuilder.setJobCommandConfigMap(jobConfigMap);
     return jobConfigBuilder;
   }
 
diff --git 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
index 4878d1031..cad583908 100644
--- 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
+++ 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
@@ -24,11 +24,9 @@ import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
-import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
@@ -542,25 +540,16 @@ public class GobblinTaskRunner implements 
StandardMetricsBridge {
    * the job with EXAMPLE_INSTANCE_TAG will remain in the ZK until an instance 
with EXAMPLE_INSTANCE_TAG was found.
    */
   private void addInstanceTags() {
+    List<String> tags = ConfigUtils.getStringList(this.clusterConfig, 
GobblinClusterConfigurationKeys.HELIX_INSTANCE_TAGS_KEY);
     HelixManager receiverManager = getReceiverManager();
     if (receiverManager.isConnected()) {
-      // The helix instance associated with this container should be 
consistent on helix tag
-      List<String> existedTags = receiverManager.getClusterManagmentTool()
-          .getInstanceConfig(this.clusterName, 
this.helixInstanceName).getTags();
-      Set<String> desiredTags = new HashSet<>(
-          ConfigUtils.getStringList(this.clusterConfig, 
GobblinClusterConfigurationKeys.HELIX_INSTANCE_TAGS_KEY));
-      if (!desiredTags.isEmpty()) {
-        // Remove tag assignments for the current Helix instance from a 
previous run
-        for (String tag : existedTags) {
-          if (!desiredTags.contains(tag))
-            
receiverManager.getClusterManagmentTool().removeInstanceTag(this.clusterName, 
this.helixInstanceName, tag);
-          logger.info("Removed unrelated helix tag {} for instance {}", tag, 
this.helixInstanceName);
-        }
-        desiredTags.forEach(desiredTag -> 
receiverManager.getClusterManagmentTool()
-            .addInstanceTag(this.clusterName, this.helixInstanceName, 
desiredTag));
+      if (!tags.isEmpty()) {
+        logger.info("Adding tags binding " + tags);
+        tags.forEach(tag -> receiverManager.getClusterManagmentTool()
+            .addInstanceTag(this.clusterName, this.helixInstanceName, tag));
+        logger.info("Actual tags binding " + 
receiverManager.getClusterManagmentTool()
+            .getInstanceConfig(this.clusterName, 
this.helixInstanceName).getTags());
       }
-      logger.info("Actual tags binding " + 
receiverManager.getClusterManagmentTool()
-          .getInstanceConfig(this.clusterName, 
this.helixInstanceName).getTags());
     }
   }
 
diff --git 
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAutoScalingManager.java
 
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAutoScalingManager.java
index 7c4da8fd8..458527abc 100644
--- 
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAutoScalingManager.java
+++ 
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAutoScalingManager.java
@@ -17,13 +17,11 @@
 
 package org.apache.gobblin.yarn;
 
-import com.google.common.base.Strings;
 import java.util.ArrayDeque;
 import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
-import java.util.Objects;
 import java.util.PriorityQueue;
 import java.util.Set;
 import java.util.concurrent.Executors;
@@ -31,12 +29,9 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
-import org.apache.gobblin.cluster.GobblinClusterConfigurationKeys;
-import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixManager;
 import org.apache.helix.PropertyKey;
-import org.apache.helix.task.JobConfig;
 import org.apache.helix.task.JobContext;
 import org.apache.helix.task.JobDag;
 import org.apache.helix.task.TaskDriver;
@@ -72,13 +67,14 @@ public class YarnAutoScalingManager extends 
AbstractIdleService {
   // Only one container will be requested for each N partitions of work
   private final String AUTO_SCALING_PARTITIONS_PER_CONTAINER = 
AUTO_SCALING_PREFIX + "partitionsPerContainer";
   private final int DEFAULT_AUTO_SCALING_PARTITIONS_PER_CONTAINER = 1;
+  private final String AUTO_SCALING_MIN_CONTAINERS = AUTO_SCALING_PREFIX + 
"minContainers";
+  private final int DEFAULT_AUTO_SCALING_MIN_CONTAINERS = 1;
+  private final String AUTO_SCALING_MAX_CONTAINERS = AUTO_SCALING_PREFIX + 
"maxContainers";
   private final String AUTO_SCALING_CONTAINER_OVERPROVISION_FACTOR = 
AUTO_SCALING_PREFIX + "overProvisionFactor";
   private final double DEFAULT_AUTO_SCALING_CONTAINER_OVERPROVISION_FACTOR = 
1.0;
-  // The cluster level default tags for Helix instances
-  private final String defaultHelixInstanceTags;
-  private final int defaultContainerMemoryMbs;
-  private final int defaultContainerCores;
 
+  // A rough value of how many containers should be considered an intolerable number.
+  private final int DEFAULT_AUTO_SCALING_MAX_CONTAINERS = Integer.MAX_VALUE;
   private final String AUTO_SCALING_INITIAL_DELAY = AUTO_SCALING_PREFIX + 
"initialDelay";
   private final int DEFAULT_AUTO_SCALING_INITIAL_DELAY_SECS = 60;
 
@@ -91,6 +87,8 @@ public class YarnAutoScalingManager extends 
AbstractIdleService {
   private final ScheduledExecutorService autoScalingExecutor;
   private final YarnService yarnService;
   private final int partitionsPerContainer;
+  private final int minContainers;
+  private final int maxContainers;
   private final double overProvisionFactor;
   private final SlidingWindowReservoir slidingFixedSizeWindow;
   private static int maxIdleTimeInMinutesBeforeScalingDown = 
DEFAULT_MAX_IDLE_TIME_BEFORE_SCALING_DOWN_MINUTES;
@@ -105,20 +103,31 @@ public class YarnAutoScalingManager extends 
AbstractIdleService {
     Preconditions.checkArgument(this.partitionsPerContainer > 0,
         AUTO_SCALING_PARTITIONS_PER_CONTAINER + " needs to be greater than 0");
 
+    this.minContainers = ConfigUtils.getInt(this.config, 
AUTO_SCALING_MIN_CONTAINERS,
+        DEFAULT_AUTO_SCALING_MIN_CONTAINERS);
+
+    Preconditions.checkArgument(this.minContainers > 0,
+        DEFAULT_AUTO_SCALING_MIN_CONTAINERS + " needs to be greater than 0");
+
+    this.maxContainers = ConfigUtils.getInt(this.config, 
AUTO_SCALING_MAX_CONTAINERS,
+        DEFAULT_AUTO_SCALING_MAX_CONTAINERS);
+
     this.overProvisionFactor = ConfigUtils.getDouble(this.config, 
AUTO_SCALING_CONTAINER_OVERPROVISION_FACTOR,
         DEFAULT_AUTO_SCALING_CONTAINER_OVERPROVISION_FACTOR);
 
+    Preconditions.checkArgument(this.maxContainers > 0,
+        DEFAULT_AUTO_SCALING_MAX_CONTAINERS + " needs to be greater than 0");
+
+    Preconditions.checkArgument(this.maxContainers >= this.minContainers,
+        DEFAULT_AUTO_SCALING_MAX_CONTAINERS + " needs to be greater than or 
equal to "
+            + DEFAULT_AUTO_SCALING_MIN_CONTAINERS);
+
     this.slidingFixedSizeWindow = config.hasPath(AUTO_SCALING_WINDOW_SIZE)
-        ? new SlidingWindowReservoir(config.getInt(AUTO_SCALING_WINDOW_SIZE), 
Integer.MAX_VALUE)
-        : new SlidingWindowReservoir(Integer.MAX_VALUE);
+        ? new SlidingWindowReservoir(maxContainers, 
config.getInt(AUTO_SCALING_WINDOW_SIZE))
+        : new SlidingWindowReservoir(maxContainers);
 
     this.autoScalingExecutor = Executors.newSingleThreadScheduledExecutor(
         ExecutorsUtils.newThreadFactory(Optional.of(log), 
Optional.of("AutoScalingExecutor")));
-
-    this.defaultHelixInstanceTags = ConfigUtils.getString(config,
-        GobblinClusterConfigurationKeys.HELIX_INSTANCE_TAGS_KEY, 
GobblinClusterConfigurationKeys.HELIX_DEFAULT_TAG);
-    this.defaultContainerMemoryMbs = 
config.getInt(GobblinYarnConfigurationKeys.CONTAINER_MEMORY_MBS_KEY);
-    this.defaultContainerCores = 
config.getInt(GobblinYarnConfigurationKeys.CONTAINER_CORES_KEY);
   }
 
   @Override
@@ -131,10 +140,9 @@ public class YarnAutoScalingManager extends 
AbstractIdleService {
     log.info("Scheduling the auto scaling task with an interval of {} 
seconds", scheduleInterval);
 
     this.autoScalingExecutor.scheduleAtFixedRate(new 
YarnAutoScalingRunnable(new TaskDriver(this.helixManager),
-            this.yarnService, this.partitionsPerContainer, 
this.overProvisionFactor,
-            this.slidingFixedSizeWindow, 
this.helixManager.getHelixDataAccessor(), this.defaultHelixInstanceTags,
-            this.defaultContainerMemoryMbs, this.defaultContainerCores),
-        initialDelay, scheduleInterval, TimeUnit.SECONDS);
+            this.yarnService, this.partitionsPerContainer, this.minContainers, 
this.maxContainers, this.overProvisionFactor,
+            this.slidingFixedSizeWindow, 
this.helixManager.getHelixDataAccessor()), initialDelay, scheduleInterval,
+        TimeUnit.SECONDS);
   }
 
   @Override
@@ -154,13 +162,11 @@ public class YarnAutoScalingManager extends 
AbstractIdleService {
     private final TaskDriver taskDriver;
     private final YarnService yarnService;
     private final int partitionsPerContainer;
+    private final int minContainers;
+    private final int maxContainers;
     private final double overProvisionFactor;
     private final SlidingWindowReservoir slidingWindowReservoir;
     private final HelixDataAccessor helixDataAccessor;
-    private final String defaultHelixInstanceTags;
-    private final int defaultContainerMemoryMbs;
-    private final int defaultContainerCores;
-
     /**
      * A static map that keep track of an idle instance and its latest 
beginning idle time.
      * If an instance is no longer idle when inspected, it will be dropped 
from this map.
@@ -196,7 +202,8 @@ public class YarnAutoScalingManager extends 
AbstractIdleService {
     @VisibleForTesting
     void runInternal() {
       Set<String> inUseInstances = new HashSet<>();
-      YarnContainerRequestBundle yarnContainerRequestBundle = new 
YarnContainerRequestBundle();
+
+      int numPartitions = 0;
       for (Map.Entry<String, WorkflowConfig> workFlowEntry : 
taskDriver.getWorkflows().entrySet()) {
         WorkflowContext workflowContext = 
taskDriver.getWorkflowContext(workFlowEntry.getKey());
 
@@ -210,42 +217,24 @@ public class YarnAutoScalingManager extends 
AbstractIdleService {
 
         WorkflowConfig workflowConfig = workFlowEntry.getValue();
         JobDag jobDag = workflowConfig.getJobDag();
+
         Set<String> jobs = jobDag.getAllNodes();
 
         // sum up the number of partitions
         for (String jobName : jobs) {
           JobContext jobContext = taskDriver.getJobContext(jobName);
-          JobConfig jobConfig = taskDriver.getJobConfig(jobName);
-          Resource resource = 
Resource.newInstance(this.defaultContainerMemoryMbs, 
this.defaultContainerCores);
-          int numPartitions = 0;
-          String jobTag = defaultHelixInstanceTags;
+
           if (jobContext != null) {
             log.debug("JobContext {} num partitions {}", jobContext, 
jobContext.getPartitionSet().size());
 
             
inUseInstances.addAll(jobContext.getPartitionSet().stream().map(jobContext::getAssignedParticipant)
-                .filter(Objects::nonNull).collect(Collectors.toSet()));
-
-            numPartitions = jobContext.getPartitionSet().size();
-            // Job level config for helix instance tags takes precedence over 
other tag configurations
-            if (jobConfig != null) {
-              if (!Strings.isNullOrEmpty(jobConfig.getInstanceGroupTag())) {
-                jobTag = jobConfig.getInstanceGroupTag();
-              }
-              Map<String, String> jobCommandConfigMap = 
jobConfig.getJobCommandConfigMap();
-              
if(jobCommandConfigMap.containsKey(GobblinClusterConfigurationKeys.HELIX_JOB_CONTAINER_MEMORY_MBS)){
-                
resource.setMemory(Integer.parseInt(jobCommandConfigMap.get(GobblinClusterConfigurationKeys.HELIX_JOB_CONTAINER_MEMORY_MBS)));
-              }
-              
if(jobCommandConfigMap.containsKey(GobblinClusterConfigurationKeys.HELIX_JOB_CONTAINER_CORES)){
-                
resource.setVirtualCores(Integer.parseInt(jobCommandConfigMap.get(GobblinClusterConfigurationKeys.HELIX_JOB_CONTAINER_CORES)));
-              }
-            }
+                .filter(e -> e != null).collect(Collectors.toSet()));
+
+            numPartitions += jobContext.getPartitionSet().size();
           }
-          // compute the container count as a ceiling of number of partitions 
divided by the number of containers
-          // per partition. Scale the result by a constant overprovision 
factor.
-          int containerCount = (int) Math.ceil(((double)numPartitions / 
this.partitionsPerContainer) * this.overProvisionFactor);
-          yarnContainerRequestBundle.add(jobTag, containerCount, resource);
         }
       }
+
       // Find all participants appearing in this cluster. Note that Helix 
instances can contain cluster-manager
       // and potentially replanner-instance.
       Set<String> allParticipants = 
getParticipants(HELIX_YARN_INSTANCE_NAME_PREFIX);
@@ -264,11 +253,17 @@ public class YarnAutoScalingManager extends 
AbstractIdleService {
           instanceIdleSince.remove(participant);
         }
       }
-      slidingWindowReservoir.add(yarnContainerRequestBundle);
 
-      log.debug("There are {} containers being requested in total, tag-count 
map {}, tag-resource map {}",
-          yarnContainerRequestBundle.getTotalContainers(), 
yarnContainerRequestBundle.getHelixTagContainerCountMap(),
-          yarnContainerRequestBundle.getHelixTagResourceMap());
+      // compute the target containers as a ceiling of number of partitions 
divided by the number of containers
+      // per partition. Scale the result by a constant overprovision factor.
+      int numTargetContainers = (int) Math.ceil(((double)numPartitions / 
this.partitionsPerContainer) * this.overProvisionFactor);
+
+      // adjust the number of target containers based on the configured min 
and max container values.
+      numTargetContainers = Math.max(this.minContainers, 
Math.min(this.maxContainers, numTargetContainers));
+
+      slidingWindowReservoir.add(numTargetContainers);
+
+      log.info("There are {} containers being requested", numTargetContainers);
 
       
this.yarnService.requestTargetNumberOfContainers(slidingWindowReservoir.getMax(),
 inUseInstances);
     }
@@ -295,8 +290,8 @@ public class YarnAutoScalingManager extends 
AbstractIdleService {
    * which captures max value. It is NOT built for general purpose.
    */
   static class SlidingWindowReservoir {
-    private ArrayDeque<YarnContainerRequestBundle> fifoQueue;
-    private PriorityQueue<YarnContainerRequestBundle> priorityQueue;
+    private ArrayDeque<Integer> fifoQueue;
+    private PriorityQueue<Integer> priorityQueue;
 
     // Queue Size
     private int maxSize;
@@ -311,11 +306,10 @@ public class YarnAutoScalingManager extends 
AbstractIdleService {
       this.maxSize = maxSize;
       this.upperBound = upperBound;
       this.fifoQueue = new ArrayDeque<>(maxSize);
-      this.priorityQueue = new PriorityQueue<>(maxSize, new 
Comparator<YarnContainerRequestBundle>() {
+      this.priorityQueue = new PriorityQueue<>(maxSize, new 
Comparator<Integer>() {
         @Override
-        public int compare(YarnContainerRequestBundle o1, 
YarnContainerRequestBundle o2) {
-          Integer i2 = o2.getTotalContainers();
-          return i2.compareTo(o1.getTotalContainers());
+        public int compare(Integer o1, Integer o2) {
+          return o2.compareTo(o1);
         }
       });
     }
@@ -329,14 +323,14 @@ public class YarnAutoScalingManager extends 
AbstractIdleService {
      * When a new element is larger than upperbound, reject the value since we 
may request too many Yarn containers.
      * When queue is full, evict head of FIFO-queue (In FIFO queue, elements 
are inserted from tail).
      */
-    public void add(YarnContainerRequestBundle e) {
-      if (e.getTotalContainers() > upperBound) {
+    public void add(int e) {
+      if (e > upperBound) {
         log.error(String.format("Request of getting %s containers seems to be 
excessive, rejected", e));
         return;
       }
 
       if (fifoQueue.size() == maxSize) {
-        YarnContainerRequestBundle removedElement = fifoQueue.remove();
+        Integer removedElement = fifoQueue.remove();
         priorityQueue.remove(removedElement);
       }
 
@@ -351,7 +345,7 @@ public class YarnAutoScalingManager extends 
AbstractIdleService {
     /**
      * If queue is empty, throw {@link IllegalStateException}.
      */
-    public YarnContainerRequestBundle getMax() {
+    public int getMax() {
       if (priorityQueue.size() > 0) {
         return this.priorityQueue.peek();
       } else {
diff --git 
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnContainerRequestBundle.java
 
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnContainerRequestBundle.java
deleted file mode 100644
index 63e6fcb23..000000000
--- 
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnContainerRequestBundle.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gobblin.yarn;
-
-import com.google.common.base.Preconditions;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-import lombok.Getter;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.hadoop.yarn.api.records.Resource;
-
-/**
- * The class that represents current Yarn container request that will be used 
by {link @YarnService}.
- * Yarn container allocation should associate with helix tag, as workflows can 
have specific helix tag setup
- * and specific resource requirement.
- */
-@Slf4j
-@Getter
-public class YarnContainerRequestBundle {
-  int totalContainers;
-  private final Map<String, Integer> helixTagContainerCountMap;
-  private final Map<String, Resource> helixTagResourceMap;
-  private final Map<String, Set<String>> resourceHelixTagMap;
-
-  public YarnContainerRequestBundle() {
-    this.totalContainers = 0;
-    this.helixTagContainerCountMap = new HashMap<>();
-    this.helixTagResourceMap = new HashMap<>();
-    this.resourceHelixTagMap = new HashMap<>();
-  }
-
-  public void add(String helixTag, int containerCount, Resource resource) {
-    helixTagContainerCountMap.put(helixTag, 
helixTagContainerCountMap.getOrDefault(helixTag, 0) + containerCount);
-    if(helixTagResourceMap.containsKey(helixTag)) {
-      Resource existedResource = helixTagResourceMap.get(helixTag);
-      Preconditions.checkArgument(resource.getMemory() == 
existedResource.getMemory() &&
-              resource.getVirtualCores() == existedResource.getVirtualCores(),
-          "Helix tag need to have consistent resource requirement. Tag " + 
helixTag
-              + " has existed resource require " + existedResource.toString() 
+ " and different require " + resource.toString());
-    } else {
-      helixTagResourceMap.put(helixTag, resource);
-      Set<String> tagSet = 
resourceHelixTagMap.getOrDefault(resource.toString(), new HashSet<>());
-      tagSet.add(helixTag);
-      resourceHelixTagMap.put(resource.toString(), tagSet);
-    }
-    totalContainers += containerCount;
-  }
-
-  // This method assumes the resource requirement for the helix tag is already 
stored in the map
-  public void add(String helixTag, int containerCount) {
-    if (!helixTagContainerCountMap.containsKey(helixTag) && 
!helixTagResourceMap.containsKey(helixTag)) {
-      log.error("Helix tag {} is not present in the request bundle yet, can't 
process the request to add {} "
-          + "container for it without specifying the resource requirement", 
helixTag, containerCount);
-      return;
-    }
-    helixTagContainerCountMap.put(helixTag, 
helixTagContainerCountMap.get(helixTag) + containerCount);
-    this.totalContainers += containerCount;
-  }
-}
diff --git 
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnHelixUtils.java 
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnHelixUtils.java
index 18068895c..5309a13ac 100644
--- a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnHelixUtils.java
+++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnHelixUtils.java
@@ -34,7 +34,6 @@ import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.yarn.api.ApplicationConstants;
-import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.hadoop.yarn.api.records.LocalResourceType;
 import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
@@ -230,28 +229,4 @@ public class YarnHelixUtils {
   public static String getContainerNum(String containerId) {
     return "container-" + containerId.substring(containerId.lastIndexOf("_") + 
1);
   }
-
-  /**
-   * Find the helix tag for the newly allocated container. The tag should 
align with {@link YarnContainerRequestBundle},
-   * so that the correct resource can be allocated to helix workflow that has 
specific resource requirement.
-   * @param container newly allocated container
-   * @param helixTagAllocatedContainerCount current container count for each 
helix tag
-   * @param requestedYarnContainer yarn container request specify the desired 
state
-   * @return helix tag that this container should be assigned with, if null 
means need to use the default
-   */
-  public static String findHelixTagForContainer(Container container,
-      Map<String, Integer> helixTagAllocatedContainerCount, 
YarnContainerRequestBundle requestedYarnContainer) {
-    String foundTag = null;
-    if(requestedYarnContainer != null && 
requestedYarnContainer.getResourceHelixTagMap().containsKey(container.getResource().toString()))
 {
-      for (String tag : 
requestedYarnContainer.getResourceHelixTagMap().get(container.getResource().toString()))
 {
-        int desiredCount = 
requestedYarnContainer.getHelixTagContainerCountMap().get(tag);
-        int allocatedCount = helixTagAllocatedContainerCount.getOrDefault(tag, 
0);
-        foundTag = tag;
-        if(allocatedCount < desiredCount) {
-          return foundTag;
-        }
-      }
-    }
-    return foundTag;
-  }
 }
diff --git 
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java 
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java
index feaa564ab..9fadc9485 100644
--- a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java
+++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java
@@ -20,6 +20,7 @@ package org.apache.gobblin.yarn;
 import java.io.File;
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.AbstractMap;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -34,7 +35,6 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import lombok.AllArgsConstructor;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -70,6 +70,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Function;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
@@ -126,7 +127,6 @@ public class YarnService extends AbstractIdleService {
   private final String applicationName;
   private final String applicationId;
   private final String appViewAcl;
-  //Default helix instance tag derived from cluster level config
   private final String helixInstanceTags;
 
   private final Config config;
@@ -158,7 +158,6 @@ public class YarnService extends AbstractIdleService {
   private final String containerTimezone;
   private final HelixManager helixManager;
 
-  @Getter(AccessLevel.PROTECTED)
   private volatile Optional<Resource> maxResourceCapacity = Optional.absent();
 
   // Security tokens for accessing HDFS
@@ -168,10 +167,10 @@ public class YarnService extends AbstractIdleService {
 
   private final Object allContainersStopped = new Object();
 
-  // A map from container IDs to Container instances, Helix participant IDs of 
the containers and Helix Tag
+  // A map from container IDs to pairs of Container instances and Helix 
participant IDs of the containers
   @VisibleForTesting
   @Getter(AccessLevel.PROTECTED)
-  private final ConcurrentMap<ContainerId, ContainerInfo> containerMap = 
Maps.newConcurrentMap();
+  private final ConcurrentMap<ContainerId, Map.Entry<Container, String>> 
containerMap = Maps.newConcurrentMap();
 
   // A cache of the containers with an outstanding container release request.
   // This is a cache instead of a set to get the automatic cleanup in case a 
container completes before the requested
@@ -191,17 +190,17 @@ public class YarnService extends AbstractIdleService {
   // instance names get picked up when replacement containers get allocated.
   private final Set<String> unusedHelixInstanceNames = 
ConcurrentHashMap.newKeySet();
 
-  // The map from helix tag to requested container count
-  private final Map<String, Integer> requestedContainerCountMap = 
Maps.newConcurrentMap();
-  // The map from helix tag to allocated container count
-  private final Map<String, Integer> allocatedContainerCountMap = 
Maps.newConcurrentMap();
-
-  private volatile YarnContainerRequestBundle yarnContainerRequest;
-  private final AtomicInteger priorityNumGenerator = new AtomicInteger(0);
-  private final Map<String, Integer> resourcePriorityMap = new HashMap<>();
-
   private volatile boolean shutdownInProgress = false;
 
+  // The number of containers requested based on the desired target number of 
containers. This is used to determine
+  // how many additional containers to request since the currently 
allocated amount may be less than this amount if we
+  // are waiting for containers to be allocated.
+  // The currently allocated amount may also be higher than this amount if 
YARN returned more than the requested number
+  // of containers.
+  @VisibleForTesting
+  @Getter(AccessLevel.PROTECTED)
+  private int numRequestedContainers = 0;
+
   public YarnService(Config config, String applicationName, String 
applicationId, YarnConfiguration yarnConfiguration,
       FileSystem fs, EventBus eventBus, HelixManager helixManager) throws 
Exception {
     this.applicationName = applicationName;
@@ -237,8 +236,7 @@ public class YarnService extends AbstractIdleService {
     this.containerHostAffinityEnabled = 
config.getBoolean(GobblinYarnConfigurationKeys.CONTAINER_HOST_AFFINITY_ENABLED);
 
     this.helixInstanceMaxRetries = 
config.getInt(GobblinYarnConfigurationKeys.HELIX_INSTANCE_MAX_RETRIES);
-    this.helixInstanceTags = ConfigUtils.getString(config,
-        GobblinClusterConfigurationKeys.HELIX_INSTANCE_TAGS_KEY, 
GobblinClusterConfigurationKeys.HELIX_DEFAULT_TAG);
+    this.helixInstanceTags = ConfigUtils.getString(config, 
GobblinClusterConfigurationKeys.HELIX_INSTANCE_TAGS_KEY, null);
 
     this.containerJvmArgs = 
config.hasPath(GobblinYarnConfigurationKeys.CONTAINER_JVM_ARGS_KEY) ?
         
Optional.of(config.getString(GobblinYarnConfigurationKeys.CONTAINER_JVM_ARGS_KEY))
 :
@@ -288,8 +286,14 @@ public class YarnService extends AbstractIdleService {
           this.requestedContainerCores));
       return;
     }
-    
requestContainer(newContainerRequest.getReplacedContainer().transform(container 
-> container.getNodeId().getHost()),
-        newContainerRequest.getResource());
+
+    requestContainer(newContainerRequest.getReplacedContainer().transform(new 
Function<Container, String>() {
+
+      @Override
+      public String apply(Container container) {
+        return container.getNodeId().getHost();
+      }
+    }));
   }
 
   protected NMClientCallbackHandler getNMClientCallbackHandler() {
@@ -355,10 +359,10 @@ public class YarnService extends AbstractIdleService {
       ExecutorsUtils.shutdownExecutorService(this.containerLaunchExecutor, 
Optional.of(LOGGER));
 
       // Stop the running containers
-      for (ContainerInfo containerInfo : this.containerMap.values()) {
-        LOGGER.info("Stopping container {} running participant {}", 
containerInfo.getContainer().getId(),
-            containerInfo.getHelixParticipantId());
-        
this.nmClientAsync.stopContainerAsync(containerInfo.getContainer().getId(), 
containerInfo.getContainer().getNodeId());
+      for (Map.Entry<Container, String> entry : this.containerMap.values()) {
+        LOGGER.info(String.format("Stopping container %s running participant 
%s", entry.getKey().getId(),
+            entry.getValue()));
+        this.nmClientAsync.stopContainerAsync(entry.getKey().getId(), 
entry.getKey().getNodeId());
       }
 
       if (!this.containerMap.isEmpty()) {
@@ -427,28 +431,27 @@ public class YarnService extends AbstractIdleService {
    * number of containers. The intended usage is for the caller of this method 
to make periodic calls to attempt to
    * adjust the cluster towards the desired number of containers.
    *
-   * @param yarnContainerRequestBundle the desired containers information, 
including numbers, resource and helix tag
+   * @param numTargetContainers the desired number of containers
    * @param inUseInstances  a set of in use instances
    */
-  public synchronized void 
requestTargetNumberOfContainers(YarnContainerRequestBundle 
yarnContainerRequestBundle, Set<String> inUseInstances) {
-    LOGGER.debug("Requesting numTargetContainers {}, in use instances count is 
{}, container map size is {}",
-        yarnContainerRequestBundle.getTotalContainers(), inUseInstances, 
this.containerMap.size());
-    int numTargetContainers = yarnContainerRequestBundle.getTotalContainers();
+  public synchronized void requestTargetNumberOfContainers(int 
numTargetContainers, Set<String> inUseInstances) {
+    LOGGER.debug("Requesting numTargetContainers {} current 
numRequestedContainers {} in use instances {} map size {}",
+        numTargetContainers, this.numRequestedContainers, inUseInstances, 
this.containerMap.size());
+
     // YARN can allocate more than the requested number of containers, compute 
additional allocations and deallocations
     // based on the max of the requested and actual allocated counts
     int numAllocatedContainers = this.containerMap.size();
 
+    // The number of allocated containers may be higher than the previously 
requested amount
+    // and there may be outstanding allocation requests, so the max of both 
counts is computed here
+    // and used to decide whether to allocate containers.
+    int numContainers = Math.max(numRequestedContainers, 
numAllocatedContainers);
+
     // Request additional containers if the desired count is higher than the 
max of the current allocation or previously
     // requested amount. Note that there may be in-flight or additional 
allocations after numContainers has been computed
     // so overshooting can occur, but periodic calls to this method will make 
adjustments towards the target.
-    for (Map.Entry<String, Integer> entry : 
yarnContainerRequestBundle.getHelixTagContainerCountMap().entrySet()) {
-      String currentHelixTag = entry.getKey();
-      int desiredContainerCount = entry.getValue();
-      int requestedContainerCount = 
requestedContainerCountMap.getOrDefault(currentHelixTag, 0);
-      for(; requestedContainerCount < desiredContainerCount; 
requestedContainerCount++) {
-        requestContainer(Optional.absent(), 
yarnContainerRequestBundle.getHelixTagResourceMap().get(currentHelixTag));
-      }
-      requestedContainerCountMap.put(currentHelixTag, desiredContainerCount);
+    for (int i = numContainers; i < numTargetContainers; i++) {
+      requestContainer(Optional.<String>absent());
     }
 
     // If the total desired is lower than the currently allocated amount then 
release free containers.
@@ -459,13 +462,12 @@ public class YarnService extends AbstractIdleService {
       LOGGER.debug("Shrinking number of containers by {}", 
(numAllocatedContainers - numTargetContainers));
 
       List<Container> containersToRelease = new ArrayList<>();
-      int numToShutdown = numAllocatedContainers - numTargetContainers;
+      int numToShutdown = numContainers - numTargetContainers;
 
       // Look for eligible containers to release. If a container is in use 
then it is not released.
-      for (Map.Entry<ContainerId, ContainerInfo> entry : 
this.containerMap.entrySet()) {
-        ContainerInfo containerInfo = entry.getValue();
-        if (!inUseInstances.contains(containerInfo.getHelixParticipantId())) {
-          containersToRelease.add(containerInfo.getContainer());
+      for (Map.Entry<ContainerId, Map.Entry<Container, String>> entry : 
this.containerMap.entrySet()) {
+        if (!inUseInstances.contains(entry.getValue().getValue())) {
+          containersToRelease.add(entry.getValue().getKey());
         }
 
         if (containersToRelease.size() == numToShutdown) {
@@ -477,47 +479,32 @@ public class YarnService extends AbstractIdleService {
 
       this.eventBus.post(new ContainerReleaseRequest(containersToRelease));
     }
-    this.yarnContainerRequest = yarnContainerRequestBundle;
-    LOGGER.info("Current tag-container being requested:{}, tag-container 
allocated: {}",
-        this.requestedContainerCountMap, this.allocatedContainerCountMap);
-  }
 
-  // Request initial containers with default resource and helix tag
-  private void requestInitialContainers(int containersRequested) {
-    YarnContainerRequestBundle initialYarnContainerRequest = new 
YarnContainerRequestBundle();
-    Resource capability = 
Resource.newInstance(this.requestedContainerMemoryMbs, 
this.requestedContainerCores);
-    initialYarnContainerRequest.add(this.helixInstanceTags, 
containersRequested, capability);
-    requestTargetNumberOfContainers(initialYarnContainerRequest, 
Collections.EMPTY_SET);
+    this.numRequestedContainers = numTargetContainers;
   }
 
-  private void requestContainer(Optional<String> preferredNode, 
Optional<Resource> resourceOptional) {
-    Resource desiredResource = resourceOptional.or(Resource.newInstance(
-        this.requestedContainerMemoryMbs, this.requestedContainerCores));
-    requestContainer(preferredNode, desiredResource);
+  private void requestInitialContainers(int containersRequested) {
+    requestTargetNumberOfContainers(containersRequested, 
Collections.EMPTY_SET);
   }
 
-  // Request containers with specific resource requirement
-  private void requestContainer(Optional<String> preferredNode, Resource 
resource) {
-    // Fail if Yarn cannot meet container resource requirements
-    Preconditions.checkArgument(resource.getMemory() <= 
this.maxResourceCapacity.get().getMemory() &&
-            resource.getVirtualCores() <= 
this.maxResourceCapacity.get().getVirtualCores(),
-        "Resource requirement must less than the max resource capacity. 
Requested resource" + resource.toString()
-            + " exceed the max resource limit " + 
this.maxResourceCapacity.get().toString());
-
-    // Due to YARN-314, different resource capacity needs different priority, 
otherwise Yarn will not allocate container
+  private void requestContainer(Optional<String> preferredNode) {
     Priority priority = Records.newRecord(Priority.class);
-    if(!resourcePriorityMap.containsKey(resource.toString())) {
-      resourcePriorityMap.put(resource.toString(), 
priorityNumGenerator.getAndIncrement());
-    }
-    int priorityNum = resourcePriorityMap.get(resource.toString());
-    priority.setPriority(priorityNum);
+    priority.setPriority(0);
+
+    Resource capability = Records.newRecord(Resource.class);
+    int maxMemoryCapacity = this.maxResourceCapacity.get().getMemory();
+    capability.setMemory(this.requestedContainerMemoryMbs <= maxMemoryCapacity 
?
+        this.requestedContainerMemoryMbs : maxMemoryCapacity);
+    int maxCoreCapacity = this.maxResourceCapacity.get().getVirtualCores();
+    capability.setVirtualCores(this.requestedContainerCores <= maxCoreCapacity 
?
+        this.requestedContainerCores : maxCoreCapacity);
 
     String[] preferredNodes = preferredNode.isPresent() ? new String[] 
{preferredNode.get()} : null;
     this.amrmClientAsync.addContainerRequest(
-        new AMRMClient.ContainerRequest(resource, preferredNodes, null, 
priority));
+        new AMRMClient.ContainerRequest(capability, preferredNodes, null, 
priority));
   }
 
-  protected ContainerLaunchContext newContainerLaunchContext(ContainerInfo 
containerInfo)
+  protected ContainerLaunchContext newContainerLaunchContext(Container 
container, String helixInstanceName)
       throws IOException {
     Path appWorkDir = 
GobblinClusterUtils.getAppWorkDirPathFromConfig(this.config, this.fs, 
this.applicationName, this.applicationId);
     Path containerWorkDir = new Path(appWorkDir, 
GobblinYarnConfigurationKeys.CONTAINER_WORK_DIR_NAME);
@@ -540,7 +527,7 @@ public class YarnService extends AbstractIdleService {
     ContainerLaunchContext containerLaunchContext = 
Records.newRecord(ContainerLaunchContext.class);
     containerLaunchContext.setLocalResources(resourceMap);
     
containerLaunchContext.setEnvironment(YarnHelixUtils.getEnvironmentVariables(this.yarnConfiguration));
-    
containerLaunchContext.setCommands(Lists.newArrayList(buildContainerCommand(containerInfo)));
+    
containerLaunchContext.setCommands(Lists.newArrayList(buildContainerCommand(container,
 helixInstanceName)));
 
     Map<ApplicationAccessType, String> acls = new HashMap<>(1);
     acls.put(ApplicationAccessType.VIEW_APP, this.appViewAcl);
@@ -593,11 +580,11 @@ public class YarnService extends AbstractIdleService {
   }
 
   @VisibleForTesting
-  protected String buildContainerCommand(ContainerInfo containerInfo) {
+  protected String buildContainerCommand(Container container, String 
helixInstanceName) {
     String containerProcessName = GobblinYarnTaskRunner.class.getSimpleName();
     StringBuilder containerCommand = new StringBuilder()
         
.append(ApplicationConstants.Environment.JAVA_HOME.$()).append("/bin/java")
-        .append(" -Xmx").append((int) 
(containerInfo.getContainer().getResource().getMemory() * 
this.jvmMemoryXmxRatio) -
+        .append(" -Xmx").append((int) (container.getResource().getMemory() * 
this.jvmMemoryXmxRatio) -
             this.jvmMemoryOverheadMbs).append("M")
         .append(" 
-D").append(GobblinYarnConfigurationKeys.JVM_USER_TIMEZONE_CONFIG).append("=").append(this.containerTimezone)
         .append(" 
-D").append(GobblinYarnConfigurationKeys.GOBBLIN_YARN_CONTAINER_LOG_DIR_NAME).append("=").append(ApplicationConstants.LOG_DIR_EXPANSION_VAR)
@@ -609,11 +596,11 @@ public class YarnService extends AbstractIdleService {
         .append(" 
--").append(GobblinClusterConfigurationKeys.APPLICATION_ID_OPTION_NAME)
         .append(" ").append(this.applicationId)
         .append(" 
--").append(GobblinClusterConfigurationKeys.HELIX_INSTANCE_NAME_OPTION_NAME)
-        .append(" ").append(containerInfo.getHelixParticipantId());
+        .append(" ").append(helixInstanceName);
 
-    if (!Strings.isNullOrEmpty(containerInfo.getHelixTag())) {
+    if (!Strings.isNullOrEmpty(this.helixInstanceTags)) {
       containerCommand.append(" 
--").append(GobblinClusterConfigurationKeys.HELIX_INSTANCE_TAGS_OPTION_NAME)
-          .append(" ").append(containerInfo.getHelixTag());
+          .append(" ").append(helixInstanceTags);
     }
     return containerCommand.append(" 
1>").append(ApplicationConstants.LOG_DIR_EXPANSION_VAR).append(File.separator).append(
           containerProcessName).append(".").append(ApplicationConstants.STDOUT)
@@ -653,13 +640,11 @@ public class YarnService extends AbstractIdleService {
    * A replacement container is needed in all but the last case.
    */
   protected void handleContainerCompletion(ContainerStatus containerStatus) {
-    ContainerInfo completedContainerInfo = 
this.containerMap.remove(containerStatus.getContainerId());
+    Map.Entry<Container, String> completedContainerEntry = 
this.containerMap.remove(containerStatus.getContainerId());
     //Get the Helix instance name for the completed container. Because 
callbacks are processed asynchronously, we might
     //encounter situations where handleContainerCompletion() is called before 
onContainersAllocated(), resulting in the
     //containerId missing from the containersMap.
-    String completedInstanceName = completedContainerInfo == null?  
UNKNOWN_HELIX_INSTANCE : completedContainerInfo.getHelixParticipantId();
-    String helixTag = completedContainerInfo == null ? helixInstanceTags : 
completedContainerInfo.getHelixTag();
-    allocatedContainerCountMap.put(helixTag, 
allocatedContainerCountMap.get(helixTag) - 1);
+    String completedInstanceName = completedContainerEntry == null?  
UNKNOWN_HELIX_INSTANCE : completedContainerEntry.getValue();
 
     LOGGER.info(String.format("Container %s running Helix instance %s has 
completed with exit status %d",
         containerStatus.getContainerId(), completedInstanceName, 
containerStatus.getExitStatus()));
@@ -672,7 +657,7 @@ public class YarnService extends AbstractIdleService {
     if (containerStatus.getExitStatus() == ContainerExitStatus.ABORTED) {
       if 
(this.releasedContainerCache.getIfPresent(containerStatus.getContainerId()) != 
null) {
         LOGGER.info("Container release requested, so not spawning a 
replacement for containerId {}", containerStatus.getContainerId());
-        if (completedContainerInfo != null) {
+        if (completedContainerEntry != null) {
           LOGGER.info("Adding instance {} to the pool of unused instances", 
completedInstanceName);
           this.unusedHelixInstanceNames.add(completedInstanceName);
         }
@@ -698,7 +683,7 @@ public class YarnService extends AbstractIdleService {
     if (this.shutdownInProgress) {
       return;
     }
-    if(completedContainerInfo != null) {
+    if(completedContainerEntry != null) {
       this.helixInstanceRetryCount.putIfAbsent(completedInstanceName, new 
AtomicInteger(0));
       int retryCount = 
this.helixInstanceRetryCount.get(completedInstanceName).incrementAndGet();
 
@@ -730,13 +715,10 @@ public class YarnService extends AbstractIdleService {
             
.submit(GobblinYarnEventConstants.EventNames.HELIX_INSTANCE_COMPLETION, 
eventMetadataBuilder.get().build());
       }
     }
-    Optional<Resource> newContainerResource = completedContainerInfo != null ?
-        Optional.of(completedContainerInfo.getContainer().getResource()) : 
Optional.absent();
-    LOGGER.info("Requesting a new container to replace {} to run Helix 
instance {} with helix tag {} and resource {}",
-        containerStatus.getContainerId(), completedInstanceName, helixTag, 
newContainerResource.orNull());
+    LOGGER.info(String.format("Requesting a new container to replace %s to run 
Helix instance %s", containerStatus.getContainerId(), completedInstanceName));
     this.eventBus.post(new NewContainerRequest(
-        shouldStickToTheSameNode(containerStatus.getExitStatus()) && 
completedContainerInfo != null ?
-            Optional.of(completedContainerInfo.getContainer()) : 
Optional.absent(), newContainerResource));
+        shouldStickToTheSameNode(containerStatus.getExitStatus()) && 
completedContainerEntry != null ?
+            Optional.of(completedContainerEntry.getKey()) : 
Optional.<Container>absent()));
   }
 
   private ImmutableMap.Builder<String, String> 
buildContainerStatusEventMetadata(ContainerStatus containerStatus) {
@@ -773,18 +755,12 @@ public class YarnService extends AbstractIdleService {
     @Override
     public void onContainersAllocated(List<Container> containers) {
       for (final Container container : containers) {
-        String containerId = container.getId().toString();
-        String containerHelixTag = 
YarnHelixUtils.findHelixTagForContainer(container, allocatedContainerCountMap, 
yarnContainerRequest);
-        if (Strings.isNullOrEmpty(containerHelixTag)) {
-          containerHelixTag = helixInstanceTags;
-        }
         if (eventSubmitter.isPresent()) {
           
eventSubmitter.get().submit(GobblinYarnEventConstants.EventNames.CONTAINER_ALLOCATION,
-              GobblinYarnMetricTagNames.CONTAINER_ID, containerId);
+              GobblinYarnMetricTagNames.CONTAINER_ID, 
container.getId().toString());
         }
 
-        LOGGER.info("Container {} has been allocated with resource {} for 
helix tag {}",
-            container.getId(), container.getResource(), containerHelixTag);
+        LOGGER.info(String.format("Container %s has been allocated", 
container.getId()));
 
         //Iterate over the (thread-safe) set of unused instances to find the 
first instance that is not currently live.
         //Once we find a candidate instance, it is removed from the set.
@@ -805,8 +781,6 @@ public class YarnService extends AbstractIdleService {
               instanceName = null;
             }
           }
-          allocatedContainerCountMap.put(containerHelixTag,
-              allocatedContainerCountMap.getOrDefault(containerHelixTag, 0) + 
1);
         }
 
         if (Strings.isNullOrEmpty(instanceName)) {
@@ -815,8 +789,8 @@ public class YarnService extends AbstractIdleService {
               .getHelixInstanceName(HELIX_YARN_INSTANCE_NAME_PREFIX, 
helixInstanceIdGenerator.incrementAndGet());
         }
 
-        ContainerInfo containerInfo = new ContainerInfo(container, 
instanceName, containerHelixTag);
-        containerMap.put(container.getId(), containerInfo);
+        final String finalInstanceName = instanceName;
+        containerMap.put(container.getId(), new 
AbstractMap.SimpleImmutableEntry<>(container, finalInstanceName));
 
         // Find matching requests and remove the request to reduce the chance 
that a subsequent request
         // will request extra containers. YARN does not have a delta request 
API and the requests are not
@@ -845,11 +819,11 @@ public class YarnService extends AbstractIdleService {
           @Override
           public void run() {
             try {
-              LOGGER.info("Starting container " + containerId);
+              LOGGER.info("Starting container " + container.getId());
 
-              nmClientAsync.startContainerAsync(container, 
newContainerLaunchContext(containerInfo));
+              nmClientAsync.startContainerAsync(container, 
newContainerLaunchContext(container, finalInstanceName));
             } catch (IOException ioe) {
-              LOGGER.error("Failed to start container " + containerId, ioe);
+              LOGGER.error("Failed to start container " + container.getId(), 
ioe);
             }
           }
         });
@@ -965,13 +939,4 @@ public class YarnService extends AbstractIdleService {
       LOGGER.error(String.format("Failed to stop container %s due to error 
%s", containerId, t));
     }
   }
-
-  //A class encapsulates Container instances, Helix participant IDs of the 
containers and Helix Tag
-  @AllArgsConstructor
-  @Getter
-  static class ContainerInfo {
-     private final Container container;
-     private final String helixParticipantId;
-     private final String helixTag;
-  }
 }
diff --git 
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/event/NewContainerRequest.java
 
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/event/NewContainerRequest.java
index 5cb529ed0..7ee4b3223 100644
--- 
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/event/NewContainerRequest.java
+++ 
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/event/NewContainerRequest.java
@@ -17,11 +17,9 @@
 
 package org.apache.gobblin.yarn.event;
 
-import lombok.Getter;
 import org.apache.hadoop.yarn.api.records.Container;
 
 import com.google.common.base.Optional;
-import org.apache.hadoop.yarn.api.records.Resource;
 
 
 /**
@@ -32,17 +30,9 @@ import org.apache.hadoop.yarn.api.records.Resource;
 public class NewContainerRequest {
 
   private final Optional<Container> replacedContainer;
-  @Getter
-  private final Optional<Resource> resource;
 
   public NewContainerRequest(Optional<Container> replacedContainer) {
     this.replacedContainer = replacedContainer;
-    this.resource = Optional.absent();
-  }
-
-  public NewContainerRequest(Optional<Container> replacedContainer, 
Optional<Resource> resource) {
-    this.replacedContainer = replacedContainer;
-    this.resource = resource;
   }
 
   /**
diff --git 
a/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/GobblinYarnTestUtils.java 
b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/GobblinYarnTestUtils.java
index 21f3df0d5..9bb5b747e 100644
--- 
a/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/GobblinYarnTestUtils.java
+++ 
b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/GobblinYarnTestUtils.java
@@ -26,7 +26,6 @@ import org.apache.hadoop.io.Text;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
-import org.apache.hadoop.yarn.api.records.Resource;
 import org.mockito.Mockito;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
@@ -76,10 +75,4 @@ public class GobblinYarnTestUtils {
     credentials.addToken(token.getService(), token);
     credentials.writeTokenStorageFile(path, new Configuration());
   }
-
-  public static YarnContainerRequestBundle createYarnContainerRequest(int n, 
Resource resource) {
-    YarnContainerRequestBundle yarnContainerRequestBundle = new 
YarnContainerRequestBundle();
-    yarnContainerRequestBundle.add("GobblinKafkaStreaming", n, resource);
-    return yarnContainerRequestBundle;
-  }
 }
diff --git 
a/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnAutoScalingManagerTest.java
 
b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnAutoScalingManagerTest.java
index 003bd9ccb..6c4047147 100644
--- 
a/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnAutoScalingManagerTest.java
+++ 
b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnAutoScalingManagerTest.java
@@ -19,22 +19,15 @@ package org.apache.gobblin.yarn;
 
 import java.io.IOException;
 
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Set;
-import org.apache.gobblin.cluster.GobblinClusterConfigurationKeys;
-import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixProperty;
 import org.apache.helix.PropertyKey;
-import org.apache.helix.task.JobConfig;
 import org.apache.helix.task.JobContext;
 import org.apache.helix.task.JobDag;
 import org.apache.helix.task.TaskDriver;
 import org.apache.helix.task.TaskState;
 import org.apache.helix.task.WorkflowConfig;
 import org.apache.helix.task.WorkflowContext;
-import org.mockito.ArgumentCaptor;
 import org.mockito.Mockito;
 import org.testng.Assert;
 import org.testng.annotations.Test;
@@ -42,7 +35,6 @@ import org.testng.annotations.Test;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 
-import static org.mockito.Matchers.*;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 
@@ -55,9 +47,6 @@ public class YarnAutoScalingManagerTest {
   // A queue within size == 1 and upperBound == "infinite" should not impact 
on the execution.
   private final static YarnAutoScalingManager.SlidingWindowReservoir noopQueue 
=
       new YarnAutoScalingManager.SlidingWindowReservoir(1, Integer.MAX_VALUE);
-  private final static int defaultContainerMemory = 1024;
-  private final static int defaultContainerCores = 2;
-  private final static String defaultHelixTag = "DefaultHelixTag";
   /**
    * Test for one workflow with one job
    */
@@ -93,15 +82,13 @@ public class YarnAutoScalingManagerTest {
 
     YarnAutoScalingManager.YarnAutoScalingRunnable runnable =
         new YarnAutoScalingManager.YarnAutoScalingRunnable(mockTaskDriver, 
mockYarnService, 1,
-            1.0, noopQueue, helixDataAccessor, defaultHelixTag, 
defaultContainerMemory, defaultContainerCores);
+            1, 10, 1.0, noopQueue, helixDataAccessor);
 
     runnable.run();
-    ArgumentCaptor<YarnContainerRequestBundle> argument = 
ArgumentCaptor.forClass(YarnContainerRequestBundle.class);
+
     // 2 containers requested and one worker in use
     Mockito.verify(mockYarnService, times(1)).
-        requestTargetNumberOfContainers(argument.capture(),
-            eq(ImmutableSet.of("GobblinYarnTaskRunner-1")));
-    Assert.assertEquals(argument.getValue().getTotalContainers(), 2);
+        requestTargetNumberOfContainers(2, 
ImmutableSet.of("GobblinYarnTaskRunner-1"));
   }
 
   /**
@@ -144,17 +131,14 @@ public class YarnAutoScalingManagerTest {
             "GobblinYarnTaskRunner-2", new HelixProperty("")));
 
     YarnAutoScalingManager.YarnAutoScalingRunnable runnable =
-        new YarnAutoScalingManager.YarnAutoScalingRunnable(mockTaskDriver, 
mockYarnService, 1,
-            1.0, noopQueue, helixDataAccessor, defaultHelixTag, 
defaultContainerMemory, defaultContainerCores);
+        new YarnAutoScalingManager.YarnAutoScalingRunnable(mockTaskDriver, 
mockYarnService,
+            1, 1, 10, 1.0, noopQueue, helixDataAccessor);
 
     runnable.run();
 
     // 3 containers requested and 2 workers in use
-    ArgumentCaptor<YarnContainerRequestBundle> argument = 
ArgumentCaptor.forClass(YarnContainerRequestBundle.class);
-    Mockito.verify(mockYarnService, times(1)).
-        requestTargetNumberOfContainers(argument.capture(),
-            eq(ImmutableSet.of("GobblinYarnTaskRunner-1", 
"GobblinYarnTaskRunner-2")));
-    Assert.assertEquals(argument.getValue().getTotalContainers(), 3);
+    Mockito.verify(mockYarnService, times(1))
+        .requestTargetNumberOfContainers(3, 
ImmutableSet.of("GobblinYarnTaskRunner-1", "GobblinYarnTaskRunner-2"));
   }
 
   /**
@@ -216,17 +200,14 @@ public class YarnAutoScalingManagerTest {
             "GobblinYarnTaskRunner-3", new HelixProperty("")));
 
     YarnAutoScalingManager.YarnAutoScalingRunnable runnable =
-        new YarnAutoScalingManager.YarnAutoScalingRunnable(mockTaskDriver, 
mockYarnService, 1,
-            1.0, noopQueue, helixDataAccessor, defaultHelixTag, 
defaultContainerMemory, defaultContainerCores);
+        new YarnAutoScalingManager.YarnAutoScalingRunnable(mockTaskDriver, 
mockYarnService,
+            1, 1, 10, 1.0, noopQueue, helixDataAccessor);
 
     runnable.run();
 
     // 5 containers requested and 3 workers in use
-    ArgumentCaptor<YarnContainerRequestBundle> argument = 
ArgumentCaptor.forClass(YarnContainerRequestBundle.class);
-    Mockito.verify(mockYarnService, times(1)).
-        requestTargetNumberOfContainers(argument.capture(),
-            eq(ImmutableSet.of("GobblinYarnTaskRunner-1", 
"GobblinYarnTaskRunner-2", "GobblinYarnTaskRunner-3")));
-    Assert.assertEquals(argument.getValue().getTotalContainers(), 5);
+    Mockito.verify(mockYarnService, 
times(1)).requestTargetNumberOfContainers(5,
+        ImmutableSet.of("GobblinYarnTaskRunner-1", "GobblinYarnTaskRunner-2", 
"GobblinYarnTaskRunner-3"));
   }
 
   /**
@@ -288,17 +269,14 @@ public class YarnAutoScalingManagerTest {
             "GobblinYarnTaskRunner-2", new HelixProperty("")));
 
     YarnAutoScalingManager.YarnAutoScalingRunnable runnable =
-        new YarnAutoScalingManager.YarnAutoScalingRunnable(mockTaskDriver, 
mockYarnService, 1,
-            1.0, noopQueue, helixDataAccessor, defaultHelixTag, 
defaultContainerMemory, defaultContainerCores);
+        new YarnAutoScalingManager.YarnAutoScalingRunnable(mockTaskDriver, 
mockYarnService,
+            1, 1, 10, 1.0, noopQueue, helixDataAccessor);
 
     runnable.run();
 
     // 3 containers requested and 2 workers in use
-    ArgumentCaptor<YarnContainerRequestBundle> argument = 
ArgumentCaptor.forClass(YarnContainerRequestBundle.class);
-    Mockito.verify(mockYarnService, times(1)).
-        requestTargetNumberOfContainers(argument.capture(),
-            eq(ImmutableSet.of("GobblinYarnTaskRunner-1", 
"GobblinYarnTaskRunner-2")));
-    Assert.assertEquals(argument.getValue().getTotalContainers(), 3);
+    Mockito.verify(mockYarnService, 
times(1)).requestTargetNumberOfContainers(3,
+        ImmutableSet.of("GobblinYarnTaskRunner-1", "GobblinYarnTaskRunner-2"));
   }
 
   /**
@@ -335,17 +313,103 @@ public class YarnAutoScalingManagerTest {
         .thenReturn(ImmutableMap.of("GobblinYarnTaskRunner-1", new 
HelixProperty("")));
 
     YarnAutoScalingManager.YarnAutoScalingRunnable runnable =
-        new YarnAutoScalingManager.YarnAutoScalingRunnable(mockTaskDriver, 
mockYarnService, 2,
-            1.0, noopQueue, helixDataAccessor, defaultHelixTag, 
defaultContainerMemory, defaultContainerCores);
+        new YarnAutoScalingManager.YarnAutoScalingRunnable(mockTaskDriver, 
mockYarnService,
+            2, 1, 10, 1.0, noopQueue, helixDataAccessor);
 
     runnable.run();
 
     // 1 container requested since 2 partitions and limit is 2 partitions per 
container. One worker in use.
-    ArgumentCaptor<YarnContainerRequestBundle> argument = 
ArgumentCaptor.forClass(YarnContainerRequestBundle.class);
-    Mockito.verify(mockYarnService, times(1)).
-        requestTargetNumberOfContainers(argument.capture(),
-            eq(ImmutableSet.of("GobblinYarnTaskRunner-1")));
-    Assert.assertEquals(argument.getValue().getTotalContainers(), 1);
+    Mockito.verify(mockYarnService, times(1))
+        .requestTargetNumberOfContainers(1, 
ImmutableSet.of("GobblinYarnTaskRunner-1"));
+  }
+
+
+  /**
+   * Test min containers
+   */
+  @Test
+  public void testMinContainers() throws IOException {
+    YarnService mockYarnService = mock(YarnService.class);
+    TaskDriver mockTaskDriver = mock(TaskDriver.class);
+    WorkflowConfig mockWorkflowConfig = mock(WorkflowConfig.class);
+    JobDag mockJobDag = mock(JobDag.class);
+
+    Mockito.when(mockJobDag.getAllNodes()).thenReturn(ImmutableSet.of("job1"));
+    Mockito.when(mockWorkflowConfig.getJobDag()).thenReturn(mockJobDag);
+
+    Mockito.when(mockTaskDriver.getWorkflows())
+        .thenReturn(ImmutableMap.of("workflow1", mockWorkflowConfig));
+
+    WorkflowContext mockWorkflowContext = mock(WorkflowContext.class);
+    
Mockito.when(mockWorkflowContext.getWorkflowState()).thenReturn(TaskState.IN_PROGRESS);
+
+    
Mockito.when(mockTaskDriver.getWorkflowContext("workflow1")).thenReturn(mockWorkflowContext);
+
+    JobContext mockJobContext = mock(JobContext.class);
+    Mockito.when(mockJobContext.getPartitionSet())
+        .thenReturn(ImmutableSet.of(Integer.valueOf(1), Integer.valueOf(2)));
+    
Mockito.when(mockJobContext.getAssignedParticipant(2)).thenReturn("GobblinYarnTaskRunner-1");
+
+    
Mockito.when(mockTaskDriver.getJobContext("job1")).thenReturn(mockJobContext);
+
+    HelixDataAccessor helixDataAccessor = mock(HelixDataAccessor.class);
+    Mockito.when(helixDataAccessor.keyBuilder()).thenReturn(new 
PropertyKey.Builder("cluster"));
+    Mockito.when(helixDataAccessor.getChildValuesMap(Mockito.any()))
+        .thenReturn(ImmutableMap.of("GobblinYarnTaskRunner-1", new 
HelixProperty("")));
+
+    YarnAutoScalingManager.YarnAutoScalingRunnable runnable =
+        new YarnAutoScalingManager.YarnAutoScalingRunnable(mockTaskDriver, 
mockYarnService,
+            1, 5, 10, 1.0, noopQueue, helixDataAccessor);
+
+    runnable.run();
+
+    // 5 containers requested due to min and one worker in use
+    Mockito.verify(mockYarnService, times(1))
+        .requestTargetNumberOfContainers(5, 
ImmutableSet.of("GobblinYarnTaskRunner-1"));
+  }
+
+  /**
+   * Test max containers
+   */
+  @Test
+  public void testMaxContainers() throws IOException {
+    YarnService mockYarnService = mock(YarnService.class);
+    TaskDriver mockTaskDriver = mock(TaskDriver.class);
+    WorkflowConfig mockWorkflowConfig = mock(WorkflowConfig.class);
+    JobDag mockJobDag = mock(JobDag.class);
+
+    Mockito.when(mockJobDag.getAllNodes()).thenReturn(ImmutableSet.of("job1"));
+    Mockito.when(mockWorkflowConfig.getJobDag()).thenReturn(mockJobDag);
+
+    Mockito.when(mockTaskDriver.getWorkflows())
+        .thenReturn(ImmutableMap.of("workflow1", mockWorkflowConfig));
+
+    WorkflowContext mockWorkflowContext = mock(WorkflowContext.class);
+    
Mockito.when(mockWorkflowContext.getWorkflowState()).thenReturn(TaskState.IN_PROGRESS);
+
+    
Mockito.when(mockTaskDriver.getWorkflowContext("workflow1")).thenReturn(mockWorkflowContext);
+
+    JobContext mockJobContext = mock(JobContext.class);
+    Mockito.when(mockJobContext.getPartitionSet())
+        .thenReturn(ImmutableSet.of(Integer.valueOf(1), Integer.valueOf(2)));
+    
Mockito.when(mockJobContext.getAssignedParticipant(2)).thenReturn("GobblinYarnTaskRunner-1");
+
+    
Mockito.when(mockTaskDriver.getJobContext("job1")).thenReturn(mockJobContext);
+
+    HelixDataAccessor helixDataAccessor = mock(HelixDataAccessor.class);
+    Mockito.when(helixDataAccessor.keyBuilder()).thenReturn(new 
PropertyKey.Builder("cluster"));
+    Mockito.when(helixDataAccessor.getChildValuesMap(Mockito.any()))
+        .thenReturn(ImmutableMap.of("GobblinYarnTaskRunner-1", new 
HelixProperty("")));
+
+    YarnAutoScalingManager.YarnAutoScalingRunnable runnable =
+        new YarnAutoScalingManager.YarnAutoScalingRunnable(mockTaskDriver, 
mockYarnService,
+            1, 1, 1, 1.0, noopQueue, helixDataAccessor);
+
+    runnable.run();
+
+    // 1 container requested to max and one worker in use
+    Mockito.verify(mockYarnService, times(1))
+        .requestTargetNumberOfContainers(1, 
ImmutableSet.of("GobblinYarnTaskRunner-1"));
   }
 
   @Test
@@ -379,49 +443,41 @@ public class YarnAutoScalingManagerTest {
         .thenReturn(ImmutableMap.of("GobblinYarnTaskRunner-1", new 
HelixProperty("")));
 
     YarnAutoScalingManager.YarnAutoScalingRunnable runnable1 =
-        new YarnAutoScalingManager.YarnAutoScalingRunnable(mockTaskDriver, 
mockYarnService, 1,
-            1.2, noopQueue, helixDataAccessor, defaultHelixTag, 
defaultContainerMemory, defaultContainerCores);
+        new YarnAutoScalingManager.YarnAutoScalingRunnable(mockTaskDriver, 
mockYarnService,
+            1, 1, 10, 1.2, noopQueue, helixDataAccessor);
 
     runnable1.run();
 
     // 3 containers requested to max and one worker in use
-    // NumPartitions = 2, Partitions per container = 1 and overprovision = 1.2
-    // so targetNumContainers = Ceil((2/1) * 1.2)) = 3.
-    ArgumentCaptor<YarnContainerRequestBundle> argument = 
ArgumentCaptor.forClass(YarnContainerRequestBundle.class);
-    Mockito.verify(mockYarnService, times(1)).
-        requestTargetNumberOfContainers(argument.capture(),
-            eq(ImmutableSet.of("GobblinYarnTaskRunner-1")));
-    Assert.assertEquals(argument.getValue().getTotalContainers(), 3);
+    // NumPartitions = 2, Partitions per container = 1 and overprovision = 
1.2, Min containers = 1, Max = 10
+    // so targetNumContainers = Max (1, Min(10, Ceil((2/1) * 1.2))) = 3.
+    Mockito.verify(mockYarnService, times(1))
+        .requestTargetNumberOfContainers(3, 
ImmutableSet.of("GobblinYarnTaskRunner-1"));
+
 
-    Mockito.reset(mockYarnService);
     YarnAutoScalingManager.YarnAutoScalingRunnable runnable2 =
-        new YarnAutoScalingManager.YarnAutoScalingRunnable(mockTaskDriver, 
mockYarnService, 1,
-            0.1, noopQueue, helixDataAccessor, defaultHelixTag, 
defaultContainerMemory, defaultContainerCores);
+        new YarnAutoScalingManager.YarnAutoScalingRunnable(mockTaskDriver, 
mockYarnService,
+            1, 1, 10, 0.1, noopQueue, helixDataAccessor);
 
     runnable2.run();
 
     // 3 containers requested to max and one worker in use
-    // NumPartitions = 2, Partitions per container = 1 and overprovision = 1.2
-    // so targetNumContainers = Ceil((2/1) * 0.1)) = 1.
-    Mockito.verify(mockYarnService, times(1)).
-        requestTargetNumberOfContainers(argument.capture(),
-            eq(ImmutableSet.of("GobblinYarnTaskRunner-1")));
-    Assert.assertEquals(argument.getValue().getTotalContainers(), 1);
+    // NumPartitions = 2, Partitions per container = 1 and overprovision = 
0.1, Min containers = 1, Max = 10
+    // so targetNumContainers = Max (1, Min(10, Ceil((2/1) * 0.1))) = 1.
+    Mockito.verify(mockYarnService, times(1))
+        .requestTargetNumberOfContainers(1, 
ImmutableSet.of("GobblinYarnTaskRunner-1"));
 
-    Mockito.reset(mockYarnService);
     YarnAutoScalingManager.YarnAutoScalingRunnable runnable3 =
-        new YarnAutoScalingManager.YarnAutoScalingRunnable(mockTaskDriver, 
mockYarnService, 1,
-            6.0, noopQueue, helixDataAccessor, defaultHelixTag, 
defaultContainerMemory, defaultContainerCores);
+        new YarnAutoScalingManager.YarnAutoScalingRunnable(mockTaskDriver, 
mockYarnService,
+            1, 1, 10, 6.0, noopQueue, helixDataAccessor);
 
     runnable3.run();
 
     // 3 containers requested to max and one worker in use
     // NumPartitions = 2, Partitions per container = 1 and overprovision = 6.0,
-    // so targetNumContainers = Ceil((2/1) * 6.0)) = 12.
-    Mockito.verify(mockYarnService, times(1)).
-        requestTargetNumberOfContainers(argument.capture(),
-            eq(ImmutableSet.of("GobblinYarnTaskRunner-1")));
-    Assert.assertEquals(argument.getValue().getTotalContainers(), 12);
+    // so targetNumContainers = Max (1, Min(10, Ceil((2/1) * 6.0))) = 10 (capped by max).
+    Mockito.verify(mockYarnService, times(1))
+        .requestTargetNumberOfContainers(10, 
ImmutableSet.of("GobblinYarnTaskRunner-1"));
   }
 
   /**
@@ -458,43 +514,37 @@ public class YarnAutoScalingManagerTest {
         .thenReturn(ImmutableMap.of("GobblinYarnTaskRunner-1", new 
HelixProperty("")));
 
     TestYarnAutoScalingRunnable runnable =
-        new TestYarnAutoScalingRunnable(mockTaskDriver, mockYarnService, 1, 
helixDataAccessor);
+        new TestYarnAutoScalingRunnable(mockTaskDriver, mockYarnService, 1, 1, 
1, helixDataAccessor);
 
     runnable.setRaiseException(true);
     runnable.run();
-    ArgumentCaptor<YarnContainerRequestBundle> argument = 
ArgumentCaptor.forClass(YarnContainerRequestBundle.class);
-    Mockito.verify(mockYarnService, times(0)).
-        requestTargetNumberOfContainers(argument.capture(),
-            eq(ImmutableSet.of("GobblinYarnTaskRunner-1")));
+    Mockito.verify(mockYarnService, 
times(0)).requestTargetNumberOfContainers(1, 
ImmutableSet.of("GobblinYarnTaskRunner-1"));
 
     Mockito.reset(mockYarnService);
     runnable.setRaiseException(false);
     runnable.run();
-    // 2 container requested
-    Mockito.verify(mockYarnService, times(1)).
-        requestTargetNumberOfContainers(argument.capture(),
-            eq(ImmutableSet.of("GobblinYarnTaskRunner-1")));
-    Assert.assertEquals(argument.getValue().getTotalContainers(), 2);
+    // 1 container requested to max and one worker in use
+    Mockito.verify(mockYarnService, 
times(1)).requestTargetNumberOfContainers(1, 
ImmutableSet.of("GobblinYarnTaskRunner-1"));
   }
 
   public void testMaxValueEvictingQueue() throws Exception {
-    Resource resource = Resource.newInstance(16, 1);
     YarnAutoScalingManager.SlidingWindowReservoir window = new 
YarnAutoScalingManager.SlidingWindowReservoir(3, 10);
+
     // Normal insertion with eviction of originally largest value
-    window.add(GobblinYarnTestUtils.createYarnContainerRequest(3, resource));
-    window.add(GobblinYarnTestUtils.createYarnContainerRequest(1, resource));
-    window.add(GobblinYarnTestUtils.createYarnContainerRequest(2, resource));
+    window.add(3);
+    window.add(1);
+    window.add(2);
     // Now it contains [3,1,2]
-    Assert.assertEquals(window.getMax().getTotalContainers(), 3);
-    window.add(GobblinYarnTestUtils.createYarnContainerRequest(1, resource));
+    Assert.assertEquals(window.getMax(), 3);
+    window.add(1);
     // Now it contains [1,2,1]
-    Assert.assertEquals(window.getMax().getTotalContainers(), 2);
-    window.add(GobblinYarnTestUtils.createYarnContainerRequest(5, resource));
-    Assert.assertEquals(window.getMax().getTotalContainers(), 5);
+    Assert.assertEquals(window.getMax(), 2);
+    window.add(5);
+    Assert.assertEquals(window.getMax(), 5);
     // Now it contains [2,1,5]
-    window.add(GobblinYarnTestUtils.createYarnContainerRequest(11, resource));
+    window.add(11);
     // Still [2,1,5] as 11 > 10 thereby being rejected.
-    Assert.assertEquals(window.getMax().getTotalContainers(), 5);
+    Assert.assertEquals(window.getMax(), 5);
   }
 
   /**
@@ -507,6 +557,7 @@ public class YarnAutoScalingManagerTest {
     TaskDriver mockTaskDriver = mock(TaskDriver.class);
     WorkflowConfig mockWorkflowConfig = mock(WorkflowConfig.class);
     JobDag mockJobDag = mock(JobDag.class);
+
     Mockito.when(mockJobDag.getAllNodes()).thenReturn(ImmutableSet.of("job1"));
     Mockito.when(mockWorkflowConfig.getJobDag()).thenReturn(mockJobDag);
 
@@ -534,17 +585,14 @@ public class YarnAutoScalingManagerTest {
             "GobblinYarnTaskRunner-2", new HelixProperty("")));
 
     TestYarnAutoScalingRunnable runnable = new 
TestYarnAutoScalingRunnable(mockTaskDriver, mockYarnService,
-            1, helixDataAccessor);
+            1, 1, 10, helixDataAccessor);
 
     runnable.run();
 
     // 2 containers requested and one worker in use, while the evaluation will 
hold for true if not set externally,
     // still tell YarnService there are two instances being used.
-    ArgumentCaptor<YarnContainerRequestBundle> argument = 
ArgumentCaptor.forClass(YarnContainerRequestBundle.class);
     Mockito.verify(mockYarnService, times(1)).
-        requestTargetNumberOfContainers(argument.capture(),
-            eq(ImmutableSet.of("GobblinYarnTaskRunner-1", 
"GobblinYarnTaskRunner-2")));
-    Assert.assertEquals(argument.getValue().getTotalContainers(), 2);
+        requestTargetNumberOfContainers(2, 
ImmutableSet.of("GobblinYarnTaskRunner-1", "GobblinYarnTaskRunner-2"));
 
     // Set failEvaluation which simulates the "beyond tolerance" case.
     Mockito.reset(mockYarnService);
@@ -552,98 +600,7 @@ public class YarnAutoScalingManagerTest {
     runnable.run();
 
     Mockito.verify(mockYarnService, times(1)).
-        requestTargetNumberOfContainers(argument.capture(),
-            eq(ImmutableSet.of("GobblinYarnTaskRunner-2")));
-    Assert.assertEquals(argument.getValue().getTotalContainers(), 2);
-  }
-
-  @Test
-  public void testFlowsWithHelixTags() {
-    YarnService mockYarnService = mock(YarnService.class);
-    TaskDriver mockTaskDriver = mock(TaskDriver.class);
-
-    WorkflowConfig mockWorkflowConfig1 = mock(WorkflowConfig.class);
-    JobDag mockJobDag1 = mock(JobDag.class);
-
-    Mockito.when(mockJobDag1.getAllNodes()).thenReturn(ImmutableSet.of("job1", 
"job2"));
-    Mockito.when(mockWorkflowConfig1.getJobDag()).thenReturn(mockJobDag1);
-
-    WorkflowContext mockWorkflowContext1 = mock(WorkflowContext.class);
-    
Mockito.when(mockWorkflowContext1.getWorkflowState()).thenReturn(TaskState.IN_PROGRESS);
-
-    
Mockito.when(mockTaskDriver.getWorkflowContext("workflow1")).thenReturn(mockWorkflowContext1);
-
-    JobContext mockJobContext1 = mock(JobContext.class);
-    Mockito.when(mockJobContext1.getPartitionSet())
-        .thenReturn(ImmutableSet.of(Integer.valueOf(1), Integer.valueOf(2)));
-    
Mockito.when(mockJobContext1.getAssignedParticipant(2)).thenReturn("GobblinYarnTaskRunner-1");
-    
Mockito.when(mockTaskDriver.getJobContext("job1")).thenReturn(mockJobContext1);
-
-    JobContext mockJobContext2 = mock(JobContext.class);
-    Mockito.when(mockJobContext2.getPartitionSet())
-        .thenReturn(ImmutableSet.of(Integer.valueOf(3)));
-    
Mockito.when(mockJobContext2.getAssignedParticipant(3)).thenReturn("GobblinYarnTaskRunner-2");
-    
Mockito.when(mockTaskDriver.getJobContext("job2")).thenReturn(mockJobContext2);
-
-    WorkflowConfig mockWorkflowConfig2 = mock(WorkflowConfig.class);
-    JobDag mockJobDag2 = mock(JobDag.class);
-
-    
Mockito.when(mockJobDag2.getAllNodes()).thenReturn(ImmutableSet.of("job3"));
-    Mockito.when(mockWorkflowConfig2.getJobDag()).thenReturn(mockJobDag2);
-
-    WorkflowContext mockWorkflowContext2 = mock(WorkflowContext.class);
-    
Mockito.when(mockWorkflowContext2.getWorkflowState()).thenReturn(TaskState.IN_PROGRESS);
-
-    
Mockito.when(mockTaskDriver.getWorkflowContext("workflow2")).thenReturn(mockWorkflowContext2);
-
-    JobContext mockJobContext3 = mock(JobContext.class);
-    Mockito.when(mockJobContext3.getPartitionSet())
-        .thenReturn(ImmutableSet.of(Integer.valueOf(4), Integer.valueOf(5)));
-    
Mockito.when(mockJobContext3.getAssignedParticipant(4)).thenReturn("GobblinYarnTaskRunner-3");
-    
Mockito.when(mockTaskDriver.getJobContext("job3")).thenReturn(mockJobContext3);
-    JobConfig mockJobConfig3 = mock(JobConfig.class);
-    String helixTag = "test-Tag1";
-    Map<String, String> resourceMap = new HashMap<>();
-    
resourceMap.put(GobblinClusterConfigurationKeys.HELIX_JOB_CONTAINER_MEMORY_MBS, 
"512");
-    resourceMap.put(GobblinClusterConfigurationKeys.HELIX_JOB_CONTAINER_CORES, 
"8");
-    Mockito.when(mockJobConfig3.getInstanceGroupTag()).thenReturn(helixTag);
-    
Mockito.when(mockJobConfig3.getJobCommandConfigMap()).thenReturn(resourceMap);
-    
Mockito.when(mockTaskDriver.getJobContext("job3")).thenReturn(mockJobContext3);
-    
Mockito.when(mockTaskDriver.getJobConfig("job3")).thenReturn(mockJobConfig3);
-    Mockito.when(mockTaskDriver.getWorkflows())
-        .thenReturn(ImmutableMap.of("workflow1", mockWorkflowConfig1, 
"workflow2", mockWorkflowConfig2));
-
-    HelixDataAccessor helixDataAccessor = mock(HelixDataAccessor.class);
-    Mockito.when(helixDataAccessor.keyBuilder()).thenReturn(new 
PropertyKey.Builder("cluster"));
-    Mockito.when(helixDataAccessor.getChildValuesMap(Mockito.any()))
-        .thenReturn(ImmutableMap.of("GobblinYarnTaskRunner-1", new 
HelixProperty(""),
-            "GobblinYarnTaskRunner-2", new HelixProperty(""),
-            "GobblinYarnTaskRunner-3", new HelixProperty("")));
-
-    YarnAutoScalingManager.YarnAutoScalingRunnable runnable =
-        new YarnAutoScalingManager.YarnAutoScalingRunnable(mockTaskDriver, 
mockYarnService, 1,
-            1.0, noopQueue, helixDataAccessor, defaultHelixTag, 
defaultContainerMemory, defaultContainerCores);
-
-    runnable.run();
-
-    // 5 containers requested and 3 workers in use
-    ArgumentCaptor<YarnContainerRequestBundle> argument = 
ArgumentCaptor.forClass(YarnContainerRequestBundle.class);
-    Mockito.verify(mockYarnService, times(1)).
-        requestTargetNumberOfContainers(argument.capture(),
-            eq(ImmutableSet.of("GobblinYarnTaskRunner-1", 
"GobblinYarnTaskRunner-2", "GobblinYarnTaskRunner-3")));
-    Assert.assertEquals(argument.getValue().getTotalContainers(), 5);
-    Map<String, Set<String>> resourceHelixTagMap = 
argument.getValue().getResourceHelixTagMap();
-    Map<String, Resource> helixTagResourceMap = 
argument.getValue().getHelixTagResourceMap();
-    Map<String, Integer> helixTagContainerCountMap = 
argument.getValue().getHelixTagContainerCountMap();
-
-    // Verify that 3 containers requested with default tag and resource 
setting,
-    // while 2 with specific helix tag and resource requirement
-    Assert.assertEquals(resourceHelixTagMap.size(), 2);
-    Assert.assertEquals(helixTagResourceMap.get(helixTag), 
Resource.newInstance(512, 8));
-    Assert.assertEquals(helixTagResourceMap.get(defaultHelixTag), 
Resource.newInstance(defaultContainerMemory, defaultContainerCores));
-    Assert.assertEquals((int) helixTagContainerCountMap.get(helixTag), 2);
-    Assert.assertEquals((int) helixTagContainerCountMap.get(defaultHelixTag), 
3);
-
+        requestTargetNumberOfContainers(2, 
ImmutableSet.of("GobblinYarnTaskRunner-2"));
   }
 
   private static class TestYarnAutoScalingRunnable extends 
YarnAutoScalingManager.YarnAutoScalingRunnable {
@@ -651,9 +608,8 @@ public class YarnAutoScalingManagerTest {
     boolean alwaysUnused = false;
 
     public TestYarnAutoScalingRunnable(TaskDriver taskDriver, YarnService 
yarnService, int partitionsPerContainer,
-        HelixDataAccessor helixDataAccessor) {
-      super(taskDriver, yarnService, partitionsPerContainer, 1.0,
-          noopQueue, helixDataAccessor, defaultHelixTag, 
defaultContainerMemory, defaultContainerCores);
+        int minContainers, int maxContainers, HelixDataAccessor 
helixDataAccessor) {
+      super(taskDriver, yarnService, partitionsPerContainer, minContainers, 
maxContainers, 1.0, noopQueue, helixDataAccessor);
     }
 
     @Override
diff --git 
a/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnServiceTest.java 
b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnServiceTest.java
index ac8edd16b..215e1bd03 100644
--- a/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnServiceTest.java
+++ b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnServiceTest.java
@@ -27,7 +27,6 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.TimeoutException;
 
 import org.apache.hadoop.conf.Configuration;
@@ -208,43 +207,42 @@ public class YarnServiceTest {
    */
   @Test(groups = {"gobblin.yarn", "disabledOnCI"})
   public void testScaleUp() {
-    Resource resource = Resource.newInstance(64, 1);
-    this.yarnService.requestTargetNumberOfContainers(
-        GobblinYarnTestUtils.createYarnContainerRequest(10, resource), 
Collections.EMPTY_SET);
+    this.yarnService.requestTargetNumberOfContainers(10, 
Collections.EMPTY_SET);
 
-    
Assert.assertFalse(this.yarnService.getMatchingRequestsList(resource).isEmpty());
+    Assert.assertFalse(this.yarnService.getMatchingRequestsList(64, 
1).isEmpty());
+    Assert.assertEquals(this.yarnService.getNumRequestedContainers(), 10);
     Assert.assertTrue(this.yarnService.waitForContainerCount(10, 60000));
-    Assert.assertEquals(this.yarnService.getContainerMap().size(), 10);
+
     // container request list that had entries earlier should now be empty
-    
Assert.assertEquals(this.yarnService.getMatchingRequestsList(resource).size(), 
0);
+    Assert.assertEquals(this.yarnService.getMatchingRequestsList(64, 
1).size(), 0);
   }
 
   @Test(groups = {"gobblin.yarn", "disabledOnCI"}, dependsOnMethods = 
"testScaleUp")
   public void testScaleDownWithInUseInstances() {
     Set<String> inUseInstances = new HashSet<>();
+
     for (int i = 1; i <= 8; i++) {
       inUseInstances.add("GobblinYarnTaskRunner_" + i);
     }
-    Resource resource = Resource.newInstance(64, 1);
-    this.yarnService.requestTargetNumberOfContainers(
-        GobblinYarnTestUtils.createYarnContainerRequest(6, resource), 
inUseInstances);
+
+    this.yarnService.requestTargetNumberOfContainers(6, inUseInstances);
+
+    Assert.assertEquals(this.yarnService.getNumRequestedContainers(), 6);
 
     // will only be able to shrink to 8
     Assert.assertTrue(this.yarnService.waitForContainerCount(8, 60000));
 
     // will not be able to shrink to 6 due to 8 in-use instances
     Assert.assertFalse(this.yarnService.waitForContainerCount(6, 10000));
-    Assert.assertEquals(this.yarnService.getContainerMap().size(), 8);
+
   }
 
   @Test(groups = {"gobblin.yarn", "disabledOnCI"}, dependsOnMethods = 
"testScaleDownWithInUseInstances")
   public void testScaleDown() throws Exception {
-    Resource resource = Resource.newInstance(64, 1);
-    this.yarnService.requestTargetNumberOfContainers(
-        GobblinYarnTestUtils.createYarnContainerRequest(4, resource), 
Collections.EMPTY_SET);
+    this.yarnService.requestTargetNumberOfContainers(4, Collections.EMPTY_SET);
 
+    Assert.assertEquals(this.yarnService.getNumRequestedContainers(), 4);
     Assert.assertTrue(this.yarnService.waitForContainerCount(4, 60000));
-    Assert.assertEquals(this.yarnService.getContainerMap().size(), 4);
   }
 
   // Keep this test last since it interferes with the container counts in the 
prior tests.
@@ -281,25 +279,13 @@ public class YarnServiceTest {
         0), 0);
     Resource resource = Resource.newInstance(2048, 1);
     Container container = Container.newInstance(containerId, null, null, 
resource, null, null);
-    YarnService.ContainerInfo
-        containerInfo = new YarnService.ContainerInfo(container, 
"helixInstance1", "helixTag");
 
-    String command = yarnService.buildContainerCommand(containerInfo);
+    String command = yarnService.buildContainerCommand(container, 
"helixInstance1");
 
     // 1628 is from 2048 * 0.8 - 10
     Assert.assertTrue(command.contains("-Xmx1628"));
   }
 
-  /**
-   * Test if requested resource exceed the resource limit, yarnService should 
fail.
-   */
-  @Test(groups = {"gobblin.yarn", "disabledOnCI"}, expectedExceptions = 
IllegalArgumentException.class)
-  public void testExceedResourceLimit() {
-    Resource resource = Resource.newInstance(204800, 10240);
-    this.yarnService.requestTargetNumberOfContainers(
-        GobblinYarnTestUtils.createYarnContainerRequest(10, resource), 
Collections.EMPTY_SET);
-  }
-
    static class TestYarnService extends YarnService {
     public TestYarnService(Config config, String applicationName, String 
applicationId, YarnConfiguration yarnConfiguration,
         FileSystem fs, EventBus eventBus) throws Exception {
@@ -324,7 +310,7 @@ public class YarnServiceTest {
       return helixManager;
     }
 
-    protected ContainerLaunchContext newContainerLaunchContext(ContainerInfo 
containerInfo)
+    protected ContainerLaunchContext newContainerLaunchContext(Container 
container, String helixInstanceName)
         throws IOException {
       return BuilderUtils.newContainerLaunchContext(Collections.emptyMap(), 
Collections.emptyMap(),
               Arrays.asList("sleep", "60000"), Collections.emptyMap(), null, 
Collections.emptyMap());
@@ -333,8 +319,10 @@ public class YarnServiceTest {
     /**
      * Get the list of matching container requests for the specified resource 
memory and cores.
      */
-    public List<? extends Collection<AMRMClient.ContainerRequest>> 
getMatchingRequestsList(Resource resource) {
+    public List<? extends Collection<AMRMClient.ContainerRequest>> 
getMatchingRequestsList(int memory, int cores) {
+      Resource resource = Resource.newInstance(memory, cores);
       Priority priority = Priority.newInstance(0);
+
       return getAmrmClientAsync().getMatchingRequests(priority, 
ResourceRequest.ANY, resource);
     }
 
@@ -358,12 +346,13 @@ public class YarnServiceTest {
           Thread.currentThread().interrupt();
           break;
         }
-        ConcurrentMap<ContainerId, ContainerInfo> containerMap = 
getContainerMap();
+
         if (expectedCount == getContainerMap().size()) {
           success = true;
           break;
         }
       }
+
       return success;
     }
   }
diff --git 
a/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnServiceTestWithExpiration.java
 
b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnServiceTestWithExpiration.java
index f2f2a1cfb..1934ecebb 100644
--- 
a/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnServiceTestWithExpiration.java
+++ 
b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnServiceTestWithExpiration.java
@@ -33,6 +33,7 @@ import 
org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationReport;
 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
@@ -197,11 +198,10 @@ public class YarnServiceTestWithExpiration {
 
   @Test(groups = {"gobblin.yarn", "disabledOnCI"})
   public void testStartError() throws Exception{
-    Resource resource = Resource.newInstance(16, 1);
-    this.expiredYarnService.requestTargetNumberOfContainers(
-        GobblinYarnTestUtils.createYarnContainerRequest(10, resource), 
Collections.EMPTY_SET);
+    this.expiredYarnService.requestTargetNumberOfContainers(10, 
Collections.EMPTY_SET);
 
-    
Assert.assertFalse(this.expiredYarnService.getMatchingRequestsList(resource).isEmpty());
+    Assert.assertFalse(this.expiredYarnService.getMatchingRequestsList(64, 
1).isEmpty());
+    Assert.assertEquals(this.expiredYarnService.getNumRequestedContainers(), 
10);
 
     
AssertWithBackoff.create().logger(LOG).timeoutMs(60000).maxSleepMs(2000).backoffFactor(1.5)
         .assertTrue(new Predicate<Void>() {
@@ -234,7 +234,7 @@ public class YarnServiceTestWithExpiration {
       completedContainers.add(containerStatus);
     }
 
-    protected ContainerLaunchContext newContainerLaunchContext(ContainerInfo 
containerInfo)
+    protected ContainerLaunchContext newContainerLaunchContext(Container 
container, String helixInstanceName)
         throws IOException {
       try {
         Thread.sleep(1000);
diff --git a/gobblin-yarn/src/test/resources/YarnServiceTest.conf 
b/gobblin-yarn/src/test/resources/YarnServiceTest.conf
index 73ecf85cd..d2dc49c46 100644
--- a/gobblin-yarn/src/test/resources/YarnServiceTest.conf
+++ b/gobblin-yarn/src/test/resources/YarnServiceTest.conf
@@ -17,7 +17,6 @@
 
 # Yarn/Helix configuration properties
 gobblin.cluster.helix.cluster.name=YarnServiceTest
-gobblin.cluster.helixInstanceTags=GobblinKafkaStreaming
 gobblin.yarn.app.name=YarnServiceTest
 gobblin.yarn.work.dir=YarnServiceTest
 
@@ -31,7 +30,7 @@ gobblin.yarn.app.master.cores=1
 gobblin.yarn.app.report.interval.minutes=1
 gobblin.yarn.max.get.app.report.failures=4
 gobblin.yarn.email.notification.on.shutdown=false
-gobblin.yarn.initial.containers=0
+gobblin.yarn.initial.containers=1
 gobblin.yarn.container.memory.mbs=64
 gobblin.yarn.container.cores=1
 gobblin.yarn.container.affinity.enabled=true

Reply via email to