This is an automated email from the ASF dual-hosted git repository.

suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 2463eda69 Make Yarn container and helix instance allocation group by 
tag (#3519)
2463eda69 is described below

commit 2463eda692b97d6947b21154abd37997a9ac3a73
Author: Hanghang Nate Liu <[email protected]>
AuthorDate: Thu Jun 9 12:27:42 2022 -0700

    Make Yarn container and helix instance allocation group by tag (#3519)
---
 .../cluster/GobblinClusterConfigurationKeys.java   |  10 +-
 .../gobblin/cluster/GobblinHelixJobLauncher.java   |  16 +-
 .../apache/gobblin/cluster/GobblinTaskRunner.java  |  27 +-
 .../gobblin/yarn/YarnAutoScalingManager.java       | 122 ++++----
 .../gobblin/yarn/YarnContainerRequestBundle.java   |  76 +++++
 .../org/apache/gobblin/yarn/YarnHelixUtils.java    |  27 +-
 .../java/org/apache/gobblin/yarn/YarnService.java  | 208 ++++++++-----
 .../gobblin/yarn/event/NewContainerRequest.java    |  12 +-
 .../apache/gobblin/yarn/GobblinYarnTestUtils.java  |   9 +-
 .../gobblin/yarn/YarnAutoScalingManagerTest.java   | 340 ++++++++++++---------
 .../org/apache/gobblin/yarn/YarnServiceTest.java   |  59 ++--
 .../yarn/YarnServiceTestWithExpiration.java        |  12 +-
 .../src/test/resources/YarnServiceTest.conf        |   3 +-
 13 files changed, 588 insertions(+), 333 deletions(-)

diff --git 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
index 8c9513f50..4ae21c1d3 100644
--- 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
+++ 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
@@ -84,6 +84,7 @@ public class GobblinClusterConfigurationKeys {
   public static final String HELIX_JOB_TAG_KEY = GOBBLIN_CLUSTER_PREFIX + 
"helixJobTag";
   public static final String HELIX_PLANNING_JOB_TAG_KEY = 
GOBBLIN_CLUSTER_PREFIX + "helixPlanningJobTag";
   public static final String HELIX_INSTANCE_TAGS_KEY = GOBBLIN_CLUSTER_PREFIX 
+ "helixInstanceTags";
+  public static final String HELIX_DEFAULT_TAG = "GobblinHelixDefaultTag";
 
   // Helix job quota
   public static final String HELIX_JOB_TYPE_KEY = GOBBLIN_CLUSTER_PREFIX + 
"helixJobType";
@@ -184,6 +185,13 @@ public class GobblinClusterConfigurationKeys {
   public static final String CONTAINER_EXIT_ON_HEALTH_CHECK_FAILURE_ENABLED = 
GOBBLIN_CLUSTER_PREFIX + "container.exitOnHealthCheckFailure";
   public static final boolean 
DEFAULT_CONTAINER_EXIT_ON_HEALTH_CHECK_FAILURE_ENABLED = false;
 
+  // Config to specify the resource requirement for each Gobblin job run, so 
that helix tasks within this job will
+  // be assigned to containers with desired resource. This config needs to 
cooperate with helix job tag, so that helix
+  // cluster knows how to distribute tasks to correct containers.
+  public static final String HELIX_JOB_CONTAINER_MEMORY_MBS = 
GOBBLIN_CLUSTER_PREFIX + "job.container.memory.mbs";
+  public static final String HELIX_JOB_CONTAINER_CORES = 
GOBBLIN_CLUSTER_PREFIX + "job.container.cores";
+
+
 
   //Config to enable/disable reuse of existing Helix Cluster
   public static final String HELIX_CLUSTER_OVERWRITE_KEY = 
GOBBLIN_CLUSTER_PREFIX + "helix.overwrite";
@@ -205,4 +213,4 @@ public class GobblinClusterConfigurationKeys {
   public static final String CONTAINER_ID_KEY = GOBBLIN_HELIX_PREFIX + 
"containerId";
 
   public static final String GOBBLIN_CLUSTER_SYSTEM_PROPERTY_PREFIX = 
GOBBLIN_CLUSTER_PREFIX + "sysProps";
-}
+}
\ No newline at end of file
diff --git 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
index df15ee385..933ec64ce 100644
--- 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
+++ 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
@@ -413,6 +413,20 @@ public class GobblinHelixJobLauncher extends 
AbstractJobLauncher {
         
gobblinJobState.getPropAsLong(GobblinClusterConfigurationKeys.HELIX_WORKFLOW_EXPIRY_TIME_SECONDS,
             
GobblinClusterConfigurationKeys.DEFAULT_HELIX_WORKFLOW_EXPIRY_TIME_SECONDS));
 
+    Map<String, String> jobConfigMap = new HashMap<>();
+    if 
(this.jobConfig.hasPath(GobblinClusterConfigurationKeys.HELIX_JOB_CONTAINER_MEMORY_MBS))
 {
+      
jobConfigMap.put(GobblinClusterConfigurationKeys.HELIX_JOB_CONTAINER_MEMORY_MBS,
+          
jobConfig.getString(GobblinClusterConfigurationKeys.HELIX_JOB_CONTAINER_MEMORY_MBS));
+      log.info("Job {} has specific memory requirement:{}, add this config to 
command config map",
+          this.jobContext.getJobId(), 
jobConfig.getString(GobblinClusterConfigurationKeys.HELIX_JOB_CONTAINER_MEMORY_MBS));
+    }
+    if 
(this.jobConfig.hasPath(GobblinClusterConfigurationKeys.HELIX_JOB_CONTAINER_CORES))
 {
+      
jobConfigMap.put(GobblinClusterConfigurationKeys.HELIX_JOB_CONTAINER_CORES,
+          
jobConfig.getString(GobblinClusterConfigurationKeys.HELIX_JOB_CONTAINER_CORES));
+      log.info("Job {} has specific Vcore requirement:{}, add this config to 
command config map",
+          this.jobContext.getJobId(), 
jobConfig.getString(GobblinClusterConfigurationKeys.HELIX_JOB_CONTAINER_CORES));
+    }
+    jobConfigBuilder.setJobCommandConfigMap(jobConfigMap);
     return jobConfigBuilder;
   }
 
@@ -572,4 +586,4 @@ public class GobblinHelixJobLauncher extends 
AbstractJobLauncher {
       this.fs.delete(jobStateFilePath, false);
     }
   }
-}
+}
\ No newline at end of file
diff --git 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
index cad583908..610682aa8 100644
--- 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
+++ 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
@@ -24,9 +24,11 @@ import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
@@ -540,16 +542,25 @@ public class GobblinTaskRunner implements 
StandardMetricsBridge {
    * the job with EXAMPLE_INSTANCE_TAG will remain in the ZK until an instance 
with EXAMPLE_INSTANCE_TAG was found.
    */
   private void addInstanceTags() {
-    List<String> tags = ConfigUtils.getStringList(this.clusterConfig, 
GobblinClusterConfigurationKeys.HELIX_INSTANCE_TAGS_KEY);
     HelixManager receiverManager = getReceiverManager();
     if (receiverManager.isConnected()) {
-      if (!tags.isEmpty()) {
-        logger.info("Adding tags binding " + tags);
-        tags.forEach(tag -> receiverManager.getClusterManagmentTool()
-            .addInstanceTag(this.clusterName, this.helixInstanceName, tag));
-        logger.info("Actual tags binding " + 
receiverManager.getClusterManagmentTool()
-            .getInstanceConfig(this.clusterName, 
this.helixInstanceName).getTags());
+      // The helix instance associated with this container should be 
consistent on helix tag
+      List<String> existedTags = receiverManager.getClusterManagmentTool()
+          .getInstanceConfig(this.clusterName, 
this.helixInstanceName).getTags();
+      Set<String> desiredTags = new HashSet<>(
+          ConfigUtils.getStringList(this.clusterConfig, 
GobblinClusterConfigurationKeys.HELIX_INSTANCE_TAGS_KEY));
+      if (!desiredTags.isEmpty()) {
+        // Remove tag assignments for the current Helix instance from a 
previous run
+        for (String tag : existedTags) {
+          if (!desiredTags.contains(tag))
+            
receiverManager.getClusterManagmentTool().removeInstanceTag(this.clusterName, 
this.helixInstanceName, tag);
+          logger.info("Removed unrelated helix tag {} for instance {}", tag, 
this.helixInstanceName);
+        }
+        desiredTags.forEach(desiredTag -> 
receiverManager.getClusterManagmentTool()
+            .addInstanceTag(this.clusterName, this.helixInstanceName, 
desiredTag));
       }
+      logger.info("Actual tags binding " + 
receiverManager.getClusterManagmentTool()
+          .getInstanceConfig(this.clusterName, 
this.helixInstanceName).getTags());
     }
   }
 
@@ -819,4 +830,4 @@ public class GobblinTaskRunner implements 
StandardMetricsBridge {
       System.exit(1);
     }
   }
-}
+}
\ No newline at end of file
diff --git 
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAutoScalingManager.java
 
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAutoScalingManager.java
index 458527abc..e7ab082e2 100644
--- 
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAutoScalingManager.java
+++ 
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAutoScalingManager.java
@@ -17,11 +17,13 @@
 
 package org.apache.gobblin.yarn;
 
+import com.google.common.base.Strings;
 import java.util.ArrayDeque;
 import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
+import java.util.Objects;
 import java.util.PriorityQueue;
 import java.util.Set;
 import java.util.concurrent.Executors;
@@ -29,9 +31,12 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
+import org.apache.gobblin.cluster.GobblinClusterConfigurationKeys;
+import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixManager;
 import org.apache.helix.PropertyKey;
+import org.apache.helix.task.JobConfig;
 import org.apache.helix.task.JobContext;
 import org.apache.helix.task.JobDag;
 import org.apache.helix.task.TaskDriver;
@@ -67,14 +72,13 @@ public class YarnAutoScalingManager extends 
AbstractIdleService {
   // Only one container will be requested for each N partitions of work
   private final String AUTO_SCALING_PARTITIONS_PER_CONTAINER = 
AUTO_SCALING_PREFIX + "partitionsPerContainer";
   private final int DEFAULT_AUTO_SCALING_PARTITIONS_PER_CONTAINER = 1;
-  private final String AUTO_SCALING_MIN_CONTAINERS = AUTO_SCALING_PREFIX + 
"minContainers";
-  private final int DEFAULT_AUTO_SCALING_MIN_CONTAINERS = 1;
-  private final String AUTO_SCALING_MAX_CONTAINERS = AUTO_SCALING_PREFIX + 
"maxContainers";
   private final String AUTO_SCALING_CONTAINER_OVERPROVISION_FACTOR = 
AUTO_SCALING_PREFIX + "overProvisionFactor";
   private final double DEFAULT_AUTO_SCALING_CONTAINER_OVERPROVISION_FACTOR = 
1.0;
+  // The cluster level default tags for Helix instances
+  private final String defaultHelixInstanceTags;
+  private final int defaultContainerMemoryMbs;
+  private final int defaultContainerCores;
 
-  // A rough value of how much containers should be an intolerable number.
-  private final int DEFAULT_AUTO_SCALING_MAX_CONTAINERS = Integer.MAX_VALUE;
   private final String AUTO_SCALING_INITIAL_DELAY = AUTO_SCALING_PREFIX + 
"initialDelay";
   private final int DEFAULT_AUTO_SCALING_INITIAL_DELAY_SECS = 60;
 
@@ -87,8 +91,6 @@ public class YarnAutoScalingManager extends 
AbstractIdleService {
   private final ScheduledExecutorService autoScalingExecutor;
   private final YarnService yarnService;
   private final int partitionsPerContainer;
-  private final int minContainers;
-  private final int maxContainers;
   private final double overProvisionFactor;
   private final SlidingWindowReservoir slidingFixedSizeWindow;
   private static int maxIdleTimeInMinutesBeforeScalingDown = 
DEFAULT_MAX_IDLE_TIME_BEFORE_SCALING_DOWN_MINUTES;
@@ -103,31 +105,20 @@ public class YarnAutoScalingManager extends 
AbstractIdleService {
     Preconditions.checkArgument(this.partitionsPerContainer > 0,
         AUTO_SCALING_PARTITIONS_PER_CONTAINER + " needs to be greater than 0");
 
-    this.minContainers = ConfigUtils.getInt(this.config, 
AUTO_SCALING_MIN_CONTAINERS,
-        DEFAULT_AUTO_SCALING_MIN_CONTAINERS);
-
-    Preconditions.checkArgument(this.minContainers > 0,
-        DEFAULT_AUTO_SCALING_MIN_CONTAINERS + " needs to be greater than 0");
-
-    this.maxContainers = ConfigUtils.getInt(this.config, 
AUTO_SCALING_MAX_CONTAINERS,
-        DEFAULT_AUTO_SCALING_MAX_CONTAINERS);
-
     this.overProvisionFactor = ConfigUtils.getDouble(this.config, 
AUTO_SCALING_CONTAINER_OVERPROVISION_FACTOR,
         DEFAULT_AUTO_SCALING_CONTAINER_OVERPROVISION_FACTOR);
 
-    Preconditions.checkArgument(this.maxContainers > 0,
-        DEFAULT_AUTO_SCALING_MAX_CONTAINERS + " needs to be greater than 0");
-
-    Preconditions.checkArgument(this.maxContainers >= this.minContainers,
-        DEFAULT_AUTO_SCALING_MAX_CONTAINERS + " needs to be greater than or 
equal to "
-            + DEFAULT_AUTO_SCALING_MIN_CONTAINERS);
-
     this.slidingFixedSizeWindow = config.hasPath(AUTO_SCALING_WINDOW_SIZE)
-        ? new SlidingWindowReservoir(maxContainers, 
config.getInt(AUTO_SCALING_WINDOW_SIZE))
-        : new SlidingWindowReservoir(maxContainers);
+        ? new SlidingWindowReservoir(config.getInt(AUTO_SCALING_WINDOW_SIZE), 
Integer.MAX_VALUE)
+        : new SlidingWindowReservoir(Integer.MAX_VALUE);
 
     this.autoScalingExecutor = Executors.newSingleThreadScheduledExecutor(
         ExecutorsUtils.newThreadFactory(Optional.of(log), 
Optional.of("AutoScalingExecutor")));
+
+    this.defaultHelixInstanceTags = ConfigUtils.getString(config,
+        GobblinClusterConfigurationKeys.HELIX_INSTANCE_TAGS_KEY, 
GobblinClusterConfigurationKeys.HELIX_DEFAULT_TAG);
+    this.defaultContainerMemoryMbs = 
config.getInt(GobblinYarnConfigurationKeys.CONTAINER_MEMORY_MBS_KEY);
+    this.defaultContainerCores = 
config.getInt(GobblinYarnConfigurationKeys.CONTAINER_CORES_KEY);
   }
 
   @Override
@@ -140,9 +131,10 @@ public class YarnAutoScalingManager extends 
AbstractIdleService {
     log.info("Scheduling the auto scaling task with an interval of {} 
seconds", scheduleInterval);
 
     this.autoScalingExecutor.scheduleAtFixedRate(new 
YarnAutoScalingRunnable(new TaskDriver(this.helixManager),
-            this.yarnService, this.partitionsPerContainer, this.minContainers, 
this.maxContainers, this.overProvisionFactor,
-            this.slidingFixedSizeWindow, 
this.helixManager.getHelixDataAccessor()), initialDelay, scheduleInterval,
-        TimeUnit.SECONDS);
+            this.yarnService, this.partitionsPerContainer, 
this.overProvisionFactor,
+            this.slidingFixedSizeWindow, 
this.helixManager.getHelixDataAccessor(), this.defaultHelixInstanceTags,
+            this.defaultContainerMemoryMbs, this.defaultContainerCores),
+        initialDelay, scheduleInterval, TimeUnit.SECONDS);
   }
 
   @Override
@@ -162,11 +154,13 @@ public class YarnAutoScalingManager extends 
AbstractIdleService {
     private final TaskDriver taskDriver;
     private final YarnService yarnService;
     private final int partitionsPerContainer;
-    private final int minContainers;
-    private final int maxContainers;
     private final double overProvisionFactor;
     private final SlidingWindowReservoir slidingWindowReservoir;
     private final HelixDataAccessor helixDataAccessor;
+    private final String defaultHelixInstanceTags;
+    private final int defaultContainerMemoryMbs;
+    private final int defaultContainerCores;
+
     /**
      * A static map that keep track of an idle instance and its latest 
beginning idle time.
      * If an instance is no longer idle when inspected, it will be dropped 
from this map.
@@ -202,8 +196,7 @@ public class YarnAutoScalingManager extends 
AbstractIdleService {
     @VisibleForTesting
     void runInternal() {
       Set<String> inUseInstances = new HashSet<>();
-
-      int numPartitions = 0;
+      YarnContainerRequestBundle yarnContainerRequestBundle = new 
YarnContainerRequestBundle();
       for (Map.Entry<String, WorkflowConfig> workFlowEntry : 
taskDriver.getWorkflows().entrySet()) {
         WorkflowContext workflowContext = 
taskDriver.getWorkflowContext(workFlowEntry.getKey());
 
@@ -217,24 +210,42 @@ public class YarnAutoScalingManager extends 
AbstractIdleService {
 
         WorkflowConfig workflowConfig = workFlowEntry.getValue();
         JobDag jobDag = workflowConfig.getJobDag();
-
         Set<String> jobs = jobDag.getAllNodes();
 
         // sum up the number of partitions
         for (String jobName : jobs) {
           JobContext jobContext = taskDriver.getJobContext(jobName);
-
+          JobConfig jobConfig = taskDriver.getJobConfig(jobName);
+          Resource resource = 
Resource.newInstance(this.defaultContainerMemoryMbs, 
this.defaultContainerCores);
+          int numPartitions = 0;
+          String jobTag = defaultHelixInstanceTags;
           if (jobContext != null) {
             log.debug("JobContext {} num partitions {}", jobContext, 
jobContext.getPartitionSet().size());
 
             
inUseInstances.addAll(jobContext.getPartitionSet().stream().map(jobContext::getAssignedParticipant)
-                .filter(e -> e != null).collect(Collectors.toSet()));
-
-            numPartitions += jobContext.getPartitionSet().size();
+                .filter(Objects::nonNull).collect(Collectors.toSet()));
+
+            numPartitions = jobContext.getPartitionSet().size();
+            // Job level config for helix instance tags takes precedence over 
other tag configurations
+            if (jobConfig != null) {
+              if (!Strings.isNullOrEmpty(jobConfig.getInstanceGroupTag())) {
+                jobTag = jobConfig.getInstanceGroupTag();
+              }
+              Map<String, String> jobCommandConfigMap = 
jobConfig.getJobCommandConfigMap();
+              
if(jobCommandConfigMap.containsKey(GobblinClusterConfigurationKeys.HELIX_JOB_CONTAINER_MEMORY_MBS)){
+                
resource.setMemory(Integer.parseInt(jobCommandConfigMap.get(GobblinClusterConfigurationKeys.HELIX_JOB_CONTAINER_MEMORY_MBS)));
+              }
+              
if(jobCommandConfigMap.containsKey(GobblinClusterConfigurationKeys.HELIX_JOB_CONTAINER_CORES)){
+                
resource.setVirtualCores(Integer.parseInt(jobCommandConfigMap.get(GobblinClusterConfigurationKeys.HELIX_JOB_CONTAINER_CORES)));
+              }
+            }
           }
+          // compute the container count as a ceiling of number of partitions 
divided by the number of partitions
+          // per container. Scale the result by a constant overprovision 
factor.
+          int containerCount = (int) Math.ceil(((double)numPartitions / 
this.partitionsPerContainer) * this.overProvisionFactor);
+          yarnContainerRequestBundle.add(jobTag, containerCount, resource);
         }
       }
-
       // Find all participants appearing in this cluster. Note that Helix 
instances can contain cluster-manager
       // and potentially replanner-instance.
       Set<String> allParticipants = 
getParticipants(HELIX_YARN_INSTANCE_NAME_PREFIX);
@@ -253,17 +264,11 @@ public class YarnAutoScalingManager extends 
AbstractIdleService {
           instanceIdleSince.remove(participant);
         }
       }
+      slidingWindowReservoir.add(yarnContainerRequestBundle);
 
-      // compute the target containers as a ceiling of number of partitions 
divided by the number of containers
-      // per partition. Scale the result by a constant overprovision factor.
-      int numTargetContainers = (int) Math.ceil(((double)numPartitions / 
this.partitionsPerContainer) * this.overProvisionFactor);
-
-      // adjust the number of target containers based on the configured min 
and max container values.
-      numTargetContainers = Math.max(this.minContainers, 
Math.min(this.maxContainers, numTargetContainers));
-
-      slidingWindowReservoir.add(numTargetContainers);
-
-      log.info("There are {} containers being requested", numTargetContainers);
+      log.debug("There are {} containers being requested in total, tag-count 
map {}, tag-resource map {}",
+          yarnContainerRequestBundle.getTotalContainers(), 
yarnContainerRequestBundle.getHelixTagContainerCountMap(),
+          yarnContainerRequestBundle.getHelixTagResourceMap());
 
       
this.yarnService.requestTargetNumberOfContainers(slidingWindowReservoir.getMax(),
 inUseInstances);
     }
@@ -290,8 +295,8 @@ public class YarnAutoScalingManager extends 
AbstractIdleService {
    * which captures max value. It is NOT built for general purpose.
    */
   static class SlidingWindowReservoir {
-    private ArrayDeque<Integer> fifoQueue;
-    private PriorityQueue<Integer> priorityQueue;
+    private ArrayDeque<YarnContainerRequestBundle> fifoQueue;
+    private PriorityQueue<YarnContainerRequestBundle> priorityQueue;
 
     // Queue Size
     private int maxSize;
@@ -306,10 +311,11 @@ public class YarnAutoScalingManager extends 
AbstractIdleService {
       this.maxSize = maxSize;
       this.upperBound = upperBound;
       this.fifoQueue = new ArrayDeque<>(maxSize);
-      this.priorityQueue = new PriorityQueue<>(maxSize, new 
Comparator<Integer>() {
+      this.priorityQueue = new PriorityQueue<>(maxSize, new 
Comparator<YarnContainerRequestBundle>() {
         @Override
-        public int compare(Integer o1, Integer o2) {
-          return o2.compareTo(o1);
+        public int compare(YarnContainerRequestBundle o1, 
YarnContainerRequestBundle o2) {
+          Integer i2 = o2.getTotalContainers();
+          return i2.compareTo(o1.getTotalContainers());
         }
       });
     }
@@ -323,14 +329,14 @@ public class YarnAutoScalingManager extends 
AbstractIdleService {
      * When a new element is larger than upperbound, reject the value since we 
may request too many Yarn containers.
      * When queue is full, evict head of FIFO-queue (In FIFO queue, elements 
are inserted from tail).
      */
-    public void add(int e) {
-      if (e > upperBound) {
+    public void add(YarnContainerRequestBundle e) {
+      if (e.getTotalContainers() > upperBound) {
         log.error(String.format("Request of getting %s containers seems to be 
excessive, rejected", e));
         return;
       }
 
       if (fifoQueue.size() == maxSize) {
-        Integer removedElement = fifoQueue.remove();
+        YarnContainerRequestBundle removedElement = fifoQueue.remove();
         priorityQueue.remove(removedElement);
       }
 
@@ -345,7 +351,7 @@ public class YarnAutoScalingManager extends 
AbstractIdleService {
     /**
      * If queue is empty, throw {@link IllegalStateException}.
      */
-    public int getMax() {
+    public YarnContainerRequestBundle getMax() {
       if (priorityQueue.size() > 0) {
         return this.priorityQueue.peek();
       } else {
@@ -353,4 +359,4 @@ public class YarnAutoScalingManager extends 
AbstractIdleService {
       }
     }
   }
-}
+}
\ No newline at end of file
diff --git 
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnContainerRequestBundle.java
 
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnContainerRequestBundle.java
new file mode 100644
index 000000000..4353d588e
--- /dev/null
+++ 
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnContainerRequestBundle.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.yarn;
+
+import com.google.common.base.Preconditions;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.hadoop.yarn.api.records.Resource;
+
+/**
+ * The class that represents current Yarn container request that will be used 
by {@link YarnService}.
+ * Yarn container allocation should associate with helix tag, as workflows can 
have specific helix tag setup
+ * and specific resource requirement.
+ */
+@Slf4j
+@Getter
+public class YarnContainerRequestBundle {
+  int totalContainers;
+  private final Map<String, Integer> helixTagContainerCountMap;
+  private final Map<String, Resource> helixTagResourceMap;
+  private final Map<String, Set<String>> resourceHelixTagMap;
+
+  public YarnContainerRequestBundle() {
+    this.totalContainers = 0;
+    this.helixTagContainerCountMap = new HashMap<>();
+    this.helixTagResourceMap = new HashMap<>();
+    this.resourceHelixTagMap = new HashMap<>();
+  }
+
+  public void add(String helixTag, int containerCount, Resource resource) {
+    helixTagContainerCountMap.put(helixTag, 
helixTagContainerCountMap.getOrDefault(helixTag, 0) + containerCount);
+    if(helixTagResourceMap.containsKey(helixTag)) {
+      Resource existedResource = helixTagResourceMap.get(helixTag);
+      Preconditions.checkArgument(resource.getMemory() == 
existedResource.getMemory() &&
+              resource.getVirtualCores() == existedResource.getVirtualCores(),
+          "Helix tag need to have consistent resource requirement. Tag " + 
helixTag
+              + " has existed resource require " + existedResource.toString() 
+ " and different require " + resource.toString());
+    } else {
+      helixTagResourceMap.put(helixTag, resource);
+      Set<String> tagSet = 
resourceHelixTagMap.getOrDefault(resource.toString(), new HashSet<>());
+      tagSet.add(helixTag);
+      resourceHelixTagMap.put(resource.toString(), tagSet);
+    }
+    totalContainers += containerCount;
+  }
+
+  // This method assumes the resource requirement for the helix tag is already 
stored in the map
+  public void add(String helixTag, int containerCount) {
+    if (!helixTagContainerCountMap.containsKey(helixTag) && 
!helixTagResourceMap.containsKey(helixTag)) {
+      log.error("Helix tag {} is not present in the request bundle yet, can't 
process the request to add {} "
+          + "container for it without specifying the resource requirement", 
helixTag, containerCount);
+      return;
+    }
+    helixTagContainerCountMap.put(helixTag, 
helixTagContainerCountMap.get(helixTag) + containerCount);
+    this.totalContainers += containerCount;
+  }
+}
\ No newline at end of file
diff --git 
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnHelixUtils.java 
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnHelixUtils.java
index 5309a13ac..1ecbd4510 100644
--- a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnHelixUtils.java
+++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnHelixUtils.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.hadoop.yarn.api.records.LocalResourceType;
 import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
@@ -229,4 +230,28 @@ public class YarnHelixUtils {
   public static String getContainerNum(String containerId) {
     return "container-" + containerId.substring(containerId.lastIndexOf("_") + 
1);
   }
-}
+
+  /**
+   * Find the helix tag for the newly allocated container. The tag should 
align with {@link YarnContainerRequestBundle},
+   * so that the correct resource can be allocated to helix workflow that has 
specific resource requirement.
+   * @param container newly allocated container
+   * @param helixTagAllocatedContainerCount current container count for each 
helix tag
+   * @param requestedYarnContainer yarn container request specifying the desired 
state
+   * @return helix tag that this container should be assigned; null 
means the default tag needs to be used
+   */
+  public static String findHelixTagForContainer(Container container,
+      Map<String, Integer> helixTagAllocatedContainerCount, 
YarnContainerRequestBundle requestedYarnContainer) {
+    String foundTag = null;
+    if(requestedYarnContainer != null && 
requestedYarnContainer.getResourceHelixTagMap().containsKey(container.getResource().toString()))
 {
+      for (String tag : 
requestedYarnContainer.getResourceHelixTagMap().get(container.getResource().toString()))
 {
+        int desiredCount = 
requestedYarnContainer.getHelixTagContainerCountMap().get(tag);
+        int allocatedCount = helixTagAllocatedContainerCount.getOrDefault(tag, 
0);
+        foundTag = tag;
+        if(allocatedCount < desiredCount) {
+          return foundTag;
+        }
+      }
+    }
+    return foundTag;
+  }
+}
\ No newline at end of file
diff --git 
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java 
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java
index 9fadc9485..d265d0296 100644
--- a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java
+++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java
@@ -20,7 +20,6 @@ package org.apache.gobblin.yarn;
 import java.io.File;
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.util.AbstractMap;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -35,6 +34,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import lombok.AllArgsConstructor;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -70,7 +70,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Function;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
@@ -127,6 +126,7 @@ public class YarnService extends AbstractIdleService {
   private final String applicationName;
   private final String applicationId;
   private final String appViewAcl;
+  //Default helix instance tag derived from cluster level config
   private final String helixInstanceTags;
 
   private final Config config;
@@ -158,6 +158,7 @@ public class YarnService extends AbstractIdleService {
   private final String containerTimezone;
   private final HelixManager helixManager;
 
+  @Getter(AccessLevel.PROTECTED)
   private volatile Optional<Resource> maxResourceCapacity = Optional.absent();
 
   // Security tokens for accessing HDFS
@@ -167,10 +168,10 @@ public class YarnService extends AbstractIdleService {
 
   private final Object allContainersStopped = new Object();
 
-  // A map from container IDs to pairs of Container instances and Helix 
participant IDs of the containers
+  // A map from container IDs to Container instances, Helix participant IDs of 
the containers and Helix Tag
   @VisibleForTesting
   @Getter(AccessLevel.PROTECTED)
-  private final ConcurrentMap<ContainerId, Map.Entry<Container, String>> 
containerMap = Maps.newConcurrentMap();
+  private final ConcurrentMap<ContainerId, ContainerInfo> containerMap = 
Maps.newConcurrentMap();
 
   // A cache of the containers with an outstanding container release request.
   // This is a cache instead of a set to get the automatic cleanup in case a 
container completes before the requested
@@ -190,16 +191,16 @@ public class YarnService extends AbstractIdleService {
   // instance names get picked up when replacement containers get allocated.
   private final Set<String> unusedHelixInstanceNames = 
ConcurrentHashMap.newKeySet();
 
-  private volatile boolean shutdownInProgress = false;
+  // The map from helix tag to requested container count
+  private final Map<String, Integer> requestedContainerCountMap = 
Maps.newConcurrentMap();
+  // The map from helix tag to allocated container count
+  private final Map<String, Integer> allocatedContainerCountMap = 
Maps.newConcurrentMap();
 
-  // The number of containers requested based on the desired target number of 
containers. This is used to determine
-  // how may additional containers to request since the the currently 
allocated amount may be less than this amount if we
-  // are waiting for containers to be allocated.
-  // The currently allocated amount may also be higher than this amount if 
YARN returned more than the requested number
-  // of containers.
-  @VisibleForTesting
-  @Getter(AccessLevel.PROTECTED)
-  private int numRequestedContainers = 0;
+  private volatile YarnContainerRequestBundle yarnContainerRequest;
+  private final AtomicInteger priorityNumGenerator = new AtomicInteger(0);
+  private final Map<String, Integer> resourcePriorityMap = new HashMap<>();
+
+  private volatile boolean shutdownInProgress = false;
 
   public YarnService(Config config, String applicationName, String 
applicationId, YarnConfiguration yarnConfiguration,
       FileSystem fs, EventBus eventBus, HelixManager helixManager) throws 
Exception {
@@ -236,7 +237,8 @@ public class YarnService extends AbstractIdleService {
     this.containerHostAffinityEnabled = 
config.getBoolean(GobblinYarnConfigurationKeys.CONTAINER_HOST_AFFINITY_ENABLED);
 
     this.helixInstanceMaxRetries = 
config.getInt(GobblinYarnConfigurationKeys.HELIX_INSTANCE_MAX_RETRIES);
-    this.helixInstanceTags = ConfigUtils.getString(config, 
GobblinClusterConfigurationKeys.HELIX_INSTANCE_TAGS_KEY, null);
+    this.helixInstanceTags = ConfigUtils.getString(config,
+        GobblinClusterConfigurationKeys.HELIX_INSTANCE_TAGS_KEY, 
GobblinClusterConfigurationKeys.HELIX_DEFAULT_TAG);
 
     this.containerJvmArgs = 
config.hasPath(GobblinYarnConfigurationKeys.CONTAINER_JVM_ARGS_KEY) ?
         
Optional.of(config.getString(GobblinYarnConfigurationKeys.CONTAINER_JVM_ARGS_KEY))
 :
@@ -286,14 +288,8 @@ public class YarnService extends AbstractIdleService {
           this.requestedContainerCores));
       return;
     }
-
-    requestContainer(newContainerRequest.getReplacedContainer().transform(new 
Function<Container, String>() {
-
-      @Override
-      public String apply(Container container) {
-        return container.getNodeId().getHost();
-      }
-    }));
+    
requestContainer(newContainerRequest.getReplacedContainer().transform(container 
-> container.getNodeId().getHost()),
+        newContainerRequest.getResource());
   }
 
   protected NMClientCallbackHandler getNMClientCallbackHandler() {
@@ -359,10 +355,10 @@ public class YarnService extends AbstractIdleService {
       ExecutorsUtils.shutdownExecutorService(this.containerLaunchExecutor, 
Optional.of(LOGGER));
 
       // Stop the running containers
-      for (Map.Entry<Container, String> entry : this.containerMap.values()) {
-        LOGGER.info(String.format("Stopping container %s running participant 
%s", entry.getKey().getId(),
-            entry.getValue()));
-        this.nmClientAsync.stopContainerAsync(entry.getKey().getId(), 
entry.getKey().getNodeId());
+      for (ContainerInfo containerInfo : this.containerMap.values()) {
+        LOGGER.info("Stopping container {} running participant {}", 
containerInfo.getContainer().getId(),
+            containerInfo.getHelixParticipantId());
+        
this.nmClientAsync.stopContainerAsync(containerInfo.getContainer().getId(), 
containerInfo.getContainer().getNodeId());
       }
 
       if (!this.containerMap.isEmpty()) {
@@ -431,27 +427,35 @@ public class YarnService extends AbstractIdleService {
    * number of containers. The intended usage is for the caller of this method 
to make periodic calls to attempt to
    * adjust the cluster towards the desired number of containers.
    *
-   * @param numTargetContainers the desired number of containers
+   * @param yarnContainerRequestBundle the desired containers information, 
including numbers, resource and helix tag
    * @param inUseInstances  a set of in use instances
    */
-  public synchronized void requestTargetNumberOfContainers(int 
numTargetContainers, Set<String> inUseInstances) {
-    LOGGER.debug("Requesting numTargetContainers {} current 
numRequestedContainers {} in use instances {} map size {}",
-        numTargetContainers, this.numRequestedContainers, inUseInstances, 
this.containerMap.size());
-
+  public synchronized void 
requestTargetNumberOfContainers(YarnContainerRequestBundle 
yarnContainerRequestBundle, Set<String> inUseInstances) {
+    LOGGER.debug("Requesting numTargetContainers {}, in use instances count is 
{}, container map size is {}",
+        yarnContainerRequestBundle.getTotalContainers(), inUseInstances, 
this.containerMap.size());
+    int numTargetContainers = yarnContainerRequestBundle.getTotalContainers();
     // YARN can allocate more than the requested number of containers, compute 
additional allocations and deallocations
     // based on the max of the requested and actual allocated counts
     int numAllocatedContainers = this.containerMap.size();
 
-    // The number of allocated containers may be higher than the previously 
requested amount
-    // and there may be outstanding allocation requests, so the max of both 
counts is computed here
-    // and used to decide whether to allocate containers.
-    int numContainers = Math.max(numRequestedContainers, 
numAllocatedContainers);
-
     // Request additional containers if the desired count is higher than the 
max of the current allocation or previously
     // requested amount. Note that there may be in-flight or additional 
allocations after numContainers has been computed
     // so overshooting can occur, but periodic calls to this method will make 
adjustments towards the target.
-    for (int i = numContainers; i < numTargetContainers; i++) {
-      requestContainer(Optional.<String>absent());
+    for (Map.Entry<String, Integer> entry : 
yarnContainerRequestBundle.getHelixTagContainerCountMap().entrySet()) {
+      String currentHelixTag = entry.getKey();
+      int desiredContainerCount = entry.getValue();
+      int requestedContainerCount = 
requestedContainerCountMap.getOrDefault(currentHelixTag, 0);
+      for(; requestedContainerCount < desiredContainerCount; 
requestedContainerCount++) {
+        requestContainer(Optional.absent(), 
yarnContainerRequestBundle.getHelixTagResourceMap().get(currentHelixTag));
+      }
+      requestedContainerCountMap.put(currentHelixTag, desiredContainerCount);
+    }
+
+    // If a previously requested tag is absent from the new request, reset its 
requested count to 0 so that its containers can be released
+    for(String requestedHelixTag : requestedContainerCountMap.keySet()) {
+      
if(!yarnContainerRequestBundle.getHelixTagContainerCountMap().containsKey(requestedHelixTag))
 {
+        requestedContainerCountMap.put(requestedHelixTag, 0);
+      }
     }
 
     // If the total desired is lower than the currently allocated amount then 
release free containers.
@@ -462,12 +466,13 @@ public class YarnService extends AbstractIdleService {
       LOGGER.debug("Shrinking number of containers by {}", 
(numAllocatedContainers - numTargetContainers));
 
       List<Container> containersToRelease = new ArrayList<>();
-      int numToShutdown = numContainers - numTargetContainers;
+      int numToShutdown = numAllocatedContainers - numTargetContainers;
 
       // Look for eligible containers to release. If a container is in use 
then it is not released.
-      for (Map.Entry<ContainerId, Map.Entry<Container, String>> entry : 
this.containerMap.entrySet()) {
-        if (!inUseInstances.contains(entry.getValue().getValue())) {
-          containersToRelease.add(entry.getValue().getKey());
+      for (Map.Entry<ContainerId, ContainerInfo> entry : 
this.containerMap.entrySet()) {
+        ContainerInfo containerInfo = entry.getValue();
+        if (!inUseInstances.contains(containerInfo.getHelixParticipantId())) {
+          containersToRelease.add(containerInfo.getContainer());
         }
 
         if (containersToRelease.size() == numToShutdown) {
@@ -479,32 +484,47 @@ public class YarnService extends AbstractIdleService {
 
       this.eventBus.post(new ContainerReleaseRequest(containersToRelease));
     }
-
-    this.numRequestedContainers = numTargetContainers;
+    this.yarnContainerRequest = yarnContainerRequestBundle;
+    LOGGER.info("Current tag-container being requested:{}, tag-container 
allocated: {}",
+        this.requestedContainerCountMap, this.allocatedContainerCountMap);
   }
 
+  // Request initial containers with default resource and helix tag
   private void requestInitialContainers(int containersRequested) {
-    requestTargetNumberOfContainers(containersRequested, 
Collections.EMPTY_SET);
+    YarnContainerRequestBundle initialYarnContainerRequest = new 
YarnContainerRequestBundle();
+    Resource capability = 
Resource.newInstance(this.requestedContainerMemoryMbs, 
this.requestedContainerCores);
+    initialYarnContainerRequest.add(this.helixInstanceTags, 
containersRequested, capability);
+    requestTargetNumberOfContainers(initialYarnContainerRequest, 
Collections.EMPTY_SET);
   }
 
-  private void requestContainer(Optional<String> preferredNode) {
-    Priority priority = Records.newRecord(Priority.class);
-    priority.setPriority(0);
+  private void requestContainer(Optional<String> preferredNode, 
Optional<Resource> resourceOptional) {
+    Resource desiredResource = resourceOptional.or(Resource.newInstance(
+        this.requestedContainerMemoryMbs, this.requestedContainerCores));
+    requestContainer(preferredNode, desiredResource);
+  }
 
-    Resource capability = Records.newRecord(Resource.class);
-    int maxMemoryCapacity = this.maxResourceCapacity.get().getMemory();
-    capability.setMemory(this.requestedContainerMemoryMbs <= maxMemoryCapacity 
?
-        this.requestedContainerMemoryMbs : maxMemoryCapacity);
-    int maxCoreCapacity = this.maxResourceCapacity.get().getVirtualCores();
-    capability.setVirtualCores(this.requestedContainerCores <= maxCoreCapacity 
?
-        this.requestedContainerCores : maxCoreCapacity);
+  // Request containers with specific resource requirement
+  private void requestContainer(Optional<String> preferredNode, Resource 
resource) {
+    // Fail if Yarn cannot meet container resource requirements
+    Preconditions.checkArgument(resource.getMemory() <= 
this.maxResourceCapacity.get().getMemory() &&
+            resource.getVirtualCores() <= 
this.maxResourceCapacity.get().getVirtualCores(),
+        "Resource requirement must less than the max resource capacity. 
Requested resource" + resource.toString()
+            + " exceed the max resource limit " + 
this.maxResourceCapacity.get().toString());
+
+    // Due to YARN-314, requests with different resource capacities need different 
priorities; otherwise YARN will not allocate the containers
+    Priority priority = Records.newRecord(Priority.class);
+    if(!resourcePriorityMap.containsKey(resource.toString())) {
+      resourcePriorityMap.put(resource.toString(), 
priorityNumGenerator.getAndIncrement());
+    }
+    int priorityNum = resourcePriorityMap.get(resource.toString());
+    priority.setPriority(priorityNum);
 
     String[] preferredNodes = preferredNode.isPresent() ? new String[] 
{preferredNode.get()} : null;
     this.amrmClientAsync.addContainerRequest(
-        new AMRMClient.ContainerRequest(capability, preferredNodes, null, 
priority));
+        new AMRMClient.ContainerRequest(resource, preferredNodes, null, 
priority));
   }
 
-  protected ContainerLaunchContext newContainerLaunchContext(Container 
container, String helixInstanceName)
+  protected ContainerLaunchContext newContainerLaunchContext(ContainerInfo 
containerInfo)
       throws IOException {
     Path appWorkDir = 
GobblinClusterUtils.getAppWorkDirPathFromConfig(this.config, this.fs, 
this.applicationName, this.applicationId);
     Path containerWorkDir = new Path(appWorkDir, 
GobblinYarnConfigurationKeys.CONTAINER_WORK_DIR_NAME);
@@ -527,7 +547,7 @@ public class YarnService extends AbstractIdleService {
     ContainerLaunchContext containerLaunchContext = 
Records.newRecord(ContainerLaunchContext.class);
     containerLaunchContext.setLocalResources(resourceMap);
     
containerLaunchContext.setEnvironment(YarnHelixUtils.getEnvironmentVariables(this.yarnConfiguration));
-    
containerLaunchContext.setCommands(Lists.newArrayList(buildContainerCommand(container,
 helixInstanceName)));
+    
containerLaunchContext.setCommands(Lists.newArrayList(buildContainerCommand(containerInfo)));
 
     Map<ApplicationAccessType, String> acls = new HashMap<>(1);
     acls.put(ApplicationAccessType.VIEW_APP, this.appViewAcl);
@@ -580,11 +600,11 @@ public class YarnService extends AbstractIdleService {
   }
 
   @VisibleForTesting
-  protected String buildContainerCommand(Container container, String 
helixInstanceName) {
+  protected String buildContainerCommand(ContainerInfo containerInfo) {
     String containerProcessName = GobblinYarnTaskRunner.class.getSimpleName();
     StringBuilder containerCommand = new StringBuilder()
         
.append(ApplicationConstants.Environment.JAVA_HOME.$()).append("/bin/java")
-        .append(" -Xmx").append((int) (container.getResource().getMemory() * 
this.jvmMemoryXmxRatio) -
+        .append(" -Xmx").append((int) 
(containerInfo.getContainer().getResource().getMemory() * 
this.jvmMemoryXmxRatio) -
             this.jvmMemoryOverheadMbs).append("M")
         .append(" 
-D").append(GobblinYarnConfigurationKeys.JVM_USER_TIMEZONE_CONFIG).append("=").append(this.containerTimezone)
         .append(" 
-D").append(GobblinYarnConfigurationKeys.GOBBLIN_YARN_CONTAINER_LOG_DIR_NAME).append("=").append(ApplicationConstants.LOG_DIR_EXPANSION_VAR)
@@ -596,16 +616,16 @@ public class YarnService extends AbstractIdleService {
         .append(" 
--").append(GobblinClusterConfigurationKeys.APPLICATION_ID_OPTION_NAME)
         .append(" ").append(this.applicationId)
         .append(" 
--").append(GobblinClusterConfigurationKeys.HELIX_INSTANCE_NAME_OPTION_NAME)
-        .append(" ").append(helixInstanceName);
+        .append(" ").append(containerInfo.getHelixParticipantId());
 
-    if (!Strings.isNullOrEmpty(this.helixInstanceTags)) {
+    if (!Strings.isNullOrEmpty(containerInfo.getHelixTag())) {
       containerCommand.append(" 
--").append(GobblinClusterConfigurationKeys.HELIX_INSTANCE_TAGS_OPTION_NAME)
-          .append(" ").append(helixInstanceTags);
+          .append(" ").append(containerInfo.getHelixTag());
     }
     return containerCommand.append(" 
1>").append(ApplicationConstants.LOG_DIR_EXPANSION_VAR).append(File.separator).append(
-          containerProcessName).append(".").append(ApplicationConstants.STDOUT)
+        containerProcessName).append(".").append(ApplicationConstants.STDOUT)
         .append(" 
2>").append(ApplicationConstants.LOG_DIR_EXPANSION_VAR).append(File.separator).append(
-          
containerProcessName).append(".").append(ApplicationConstants.STDERR).toString();
+            
containerProcessName).append(".").append(ApplicationConstants.STDERR).toString();
   }
 
   /**
@@ -640,11 +660,13 @@ public class YarnService extends AbstractIdleService {
    * A replacement container is needed in all but the last case.
    */
   protected void handleContainerCompletion(ContainerStatus containerStatus) {
-    Map.Entry<Container, String> completedContainerEntry = 
this.containerMap.remove(containerStatus.getContainerId());
+    ContainerInfo completedContainerInfo = 
this.containerMap.remove(containerStatus.getContainerId());
     //Get the Helix instance name for the completed container. Because 
callbacks are processed asynchronously, we might
     //encounter situations where handleContainerCompletion() is called before 
onContainersAllocated(), resulting in the
     //containerId missing from the containersMap.
-    String completedInstanceName = completedContainerEntry == null?  
UNKNOWN_HELIX_INSTANCE : completedContainerEntry.getValue();
+    String completedInstanceName = completedContainerInfo == null?  
UNKNOWN_HELIX_INSTANCE : completedContainerInfo.getHelixParticipantId();
+    String helixTag = completedContainerInfo == null ? helixInstanceTags : 
completedContainerInfo.getHelixTag();
+    allocatedContainerCountMap.put(helixTag, 
allocatedContainerCountMap.get(helixTag) - 1);
 
     LOGGER.info(String.format("Container %s running Helix instance %s has 
completed with exit status %d",
         containerStatus.getContainerId(), completedInstanceName, 
containerStatus.getExitStatus()));
@@ -657,15 +679,15 @@ public class YarnService extends AbstractIdleService {
     if (containerStatus.getExitStatus() == ContainerExitStatus.ABORTED) {
       if 
(this.releasedContainerCache.getIfPresent(containerStatus.getContainerId()) != 
null) {
         LOGGER.info("Container release requested, so not spawning a 
replacement for containerId {}", containerStatus.getContainerId());
-        if (completedContainerEntry != null) {
+        if (completedContainerInfo != null) {
           LOGGER.info("Adding instance {} to the pool of unused instances", 
completedInstanceName);
           this.unusedHelixInstanceNames.add(completedInstanceName);
         }
         return;
       } else {
         LOGGER.info("Container {} aborted due to lost NM", 
containerStatus.getContainerId());
-       // Container release was not requested. Likely, the container was 
running on a node on which the NM died.
-       // In this case, RM assumes that the containers are "lost", even though 
the container process may still be
+        // Container release was not requested. Likely, the container was 
running on a node on which the NM died.
+        // In this case, RM assumes that the containers are "lost", even 
though the container process may still be
         // running on the node. We need to ensure that the Helix instances 
running on the orphaned containers
         // are fenced off from the Helix cluster to avoid double publishing 
and state being committed by the
         // instances.
@@ -683,7 +705,7 @@ public class YarnService extends AbstractIdleService {
     if (this.shutdownInProgress) {
       return;
     }
-    if(completedContainerEntry != null) {
+    if(completedContainerInfo != null) {
       this.helixInstanceRetryCount.putIfAbsent(completedInstanceName, new 
AtomicInteger(0));
       int retryCount = 
this.helixInstanceRetryCount.get(completedInstanceName).incrementAndGet();
 
@@ -715,10 +737,13 @@ public class YarnService extends AbstractIdleService {
             
.submit(GobblinYarnEventConstants.EventNames.HELIX_INSTANCE_COMPLETION, 
eventMetadataBuilder.get().build());
       }
     }
-    LOGGER.info(String.format("Requesting a new container to replace %s to run 
Helix instance %s", containerStatus.getContainerId(), completedInstanceName));
+    Optional<Resource> newContainerResource = completedContainerInfo != null ?
+        Optional.of(completedContainerInfo.getContainer().getResource()) : 
Optional.absent();
+    LOGGER.info("Requesting a new container to replace {} to run Helix 
instance {} with helix tag {} and resource {}",
+        containerStatus.getContainerId(), completedInstanceName, helixTag, 
newContainerResource.orNull());
     this.eventBus.post(new NewContainerRequest(
-        shouldStickToTheSameNode(containerStatus.getExitStatus()) && 
completedContainerEntry != null ?
-            Optional.of(completedContainerEntry.getKey()) : 
Optional.<Container>absent()));
+        shouldStickToTheSameNode(containerStatus.getExitStatus()) && 
completedContainerInfo != null ?
+            Optional.of(completedContainerInfo.getContainer()) : 
Optional.absent(), newContainerResource));
   }
 
   private ImmutableMap.Builder<String, String> 
buildContainerStatusEventMetadata(ContainerStatus containerStatus) {
@@ -755,12 +780,18 @@ public class YarnService extends AbstractIdleService {
     @Override
     public void onContainersAllocated(List<Container> containers) {
       for (final Container container : containers) {
+        String containerId = container.getId().toString();
+        String containerHelixTag = 
YarnHelixUtils.findHelixTagForContainer(container, allocatedContainerCountMap, 
yarnContainerRequest);
+        if (Strings.isNullOrEmpty(containerHelixTag)) {
+          containerHelixTag = helixInstanceTags;
+        }
         if (eventSubmitter.isPresent()) {
           
eventSubmitter.get().submit(GobblinYarnEventConstants.EventNames.CONTAINER_ALLOCATION,
-              GobblinYarnMetricTagNames.CONTAINER_ID, 
container.getId().toString());
+              GobblinYarnMetricTagNames.CONTAINER_ID, containerId);
         }
 
-        LOGGER.info(String.format("Container %s has been allocated", 
container.getId()));
+        LOGGER.info("Container {} has been allocated with resource {} for 
helix tag {}",
+            container.getId(), container.getResource(), containerHelixTag);
 
         //Iterate over the (thread-safe) set of unused instances to find the 
first instance that is not currently live.
         //Once we find a candidate instance, it is removed from the set.
@@ -781,6 +812,8 @@ public class YarnService extends AbstractIdleService {
               instanceName = null;
             }
           }
+          allocatedContainerCountMap.put(containerHelixTag,
+              allocatedContainerCountMap.getOrDefault(containerHelixTag, 0) + 
1);
         }
 
         if (Strings.isNullOrEmpty(instanceName)) {
@@ -789,8 +822,8 @@ public class YarnService extends AbstractIdleService {
               .getHelixInstanceName(HELIX_YARN_INSTANCE_NAME_PREFIX, 
helixInstanceIdGenerator.incrementAndGet());
         }
 
-        final String finalInstanceName = instanceName;
-        containerMap.put(container.getId(), new 
AbstractMap.SimpleImmutableEntry<>(container, finalInstanceName));
+        ContainerInfo containerInfo = new ContainerInfo(container, 
instanceName, containerHelixTag);
+        containerMap.put(container.getId(), containerInfo);
 
         // Find matching requests and remove the request to reduce the chance 
that a subsequent request
         // will request extra containers. YARN does not have a delta request 
API and the requests are not
@@ -819,11 +852,11 @@ public class YarnService extends AbstractIdleService {
           @Override
           public void run() {
             try {
-              LOGGER.info("Starting container " + container.getId());
+              LOGGER.info("Starting container " + containerId);
 
-              nmClientAsync.startContainerAsync(container, 
newContainerLaunchContext(container, finalInstanceName));
+              nmClientAsync.startContainerAsync(container, 
newContainerLaunchContext(containerInfo));
             } catch (IOException ioe) {
-              LOGGER.error("Failed to start container " + container.getId(), 
ioe);
+              LOGGER.error("Failed to start container " + containerId, ioe);
             }
           }
         });
@@ -869,7 +902,7 @@ public class YarnService extends AbstractIdleService {
   /**
    * A custom implementation of {@link NMClientAsync.CallbackHandler}.
    */
-   class NMClientCallbackHandler implements NMClientAsync.CallbackHandler {
+  class NMClientCallbackHandler implements NMClientAsync.CallbackHandler {
 
     @Override
     public void onContainerStarted(ContainerId containerId, Map<String, 
ByteBuffer> allServiceResponse) {
@@ -939,4 +972,13 @@ public class YarnService extends AbstractIdleService {
       LOGGER.error(String.format("Failed to stop container %s due to error 
%s", containerId, t));
     }
   }
-}
+
+  // Encapsulates a Container instance, the Helix participant ID of the 
container, and the container's Helix tag
+  @AllArgsConstructor
+  @Getter
+  static class ContainerInfo {
+    private final Container container;
+    private final String helixParticipantId;
+    private final String helixTag;
+  }
+}
\ No newline at end of file
diff --git 
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/event/NewContainerRequest.java
 
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/event/NewContainerRequest.java
index 7ee4b3223..6e272b7c0 100644
--- 
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/event/NewContainerRequest.java
+++ 
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/event/NewContainerRequest.java
@@ -17,9 +17,11 @@
 
 package org.apache.gobblin.yarn.event;
 
+import lombok.Getter;
 import org.apache.hadoop.yarn.api.records.Container;
 
 import com.google.common.base.Optional;
+import org.apache.hadoop.yarn.api.records.Resource;
 
 
 /**
@@ -30,9 +32,17 @@ import com.google.common.base.Optional;
 public class NewContainerRequest {
 
   private final Optional<Container> replacedContainer;
+  @Getter
+  private final Optional<Resource> resource;
 
   public NewContainerRequest(Optional<Container> replacedContainer) {
     this.replacedContainer = replacedContainer;
+    this.resource = Optional.absent();
+  }
+
+  public NewContainerRequest(Optional<Container> replacedContainer, 
Optional<Resource> resource) {
+    this.replacedContainer = replacedContainer;
+    this.resource = resource;
   }
 
   /**
@@ -43,4 +53,4 @@ public class NewContainerRequest {
   public Optional<Container> getReplacedContainer() {
     return this.replacedContainer;
   }
-}
+}
\ No newline at end of file
diff --git 
a/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/GobblinYarnTestUtils.java 
b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/GobblinYarnTestUtils.java
index 9bb5b747e..8dceb4420 100644
--- 
a/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/GobblinYarnTestUtils.java
+++ 
b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/GobblinYarnTestUtils.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.io.Text;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.yarn.api.records.Resource;
 import org.mockito.Mockito;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
@@ -75,4 +76,10 @@ public class GobblinYarnTestUtils {
     credentials.addToken(token.getService(), token);
     credentials.writeTokenStorageFile(path, new Configuration());
   }
-}
+
+  public static YarnContainerRequestBundle createYarnContainerRequest(int n, 
Resource resource) {
+    YarnContainerRequestBundle yarnContainerRequestBundle = new 
YarnContainerRequestBundle();
+    yarnContainerRequestBundle.add("GobblinKafkaStreaming", n, resource);
+    return yarnContainerRequestBundle;
+  }
+}
\ No newline at end of file
diff --git 
a/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnAutoScalingManagerTest.java
 
b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnAutoScalingManagerTest.java
index 6c4047147..259378378 100644
--- 
a/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnAutoScalingManagerTest.java
+++ 
b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnAutoScalingManagerTest.java
@@ -19,15 +19,22 @@ package org.apache.gobblin.yarn;
 
 import java.io.IOException;
 
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import org.apache.gobblin.cluster.GobblinClusterConfigurationKeys;
+import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixProperty;
 import org.apache.helix.PropertyKey;
+import org.apache.helix.task.JobConfig;
 import org.apache.helix.task.JobContext;
 import org.apache.helix.task.JobDag;
 import org.apache.helix.task.TaskDriver;
 import org.apache.helix.task.TaskState;
 import org.apache.helix.task.WorkflowConfig;
 import org.apache.helix.task.WorkflowContext;
+import org.mockito.ArgumentCaptor;
 import org.mockito.Mockito;
 import org.testng.Assert;
 import org.testng.annotations.Test;
@@ -35,6 +42,7 @@ import org.testng.annotations.Test;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 
+import static org.mockito.Matchers.*;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 
@@ -47,6 +55,9 @@ public class YarnAutoScalingManagerTest {
   // A queue within size == 1 and upperBound == "infinite" should not impact 
on the execution.
   private final static YarnAutoScalingManager.SlidingWindowReservoir noopQueue 
=
       new YarnAutoScalingManager.SlidingWindowReservoir(1, Integer.MAX_VALUE);
+  private final static int defaultContainerMemory = 1024;
+  private final static int defaultContainerCores = 2;
+  private final static String defaultHelixTag = "DefaultHelixTag";
   /**
    * Test for one workflow with one job
    */
@@ -82,13 +93,15 @@ public class YarnAutoScalingManagerTest {
 
     YarnAutoScalingManager.YarnAutoScalingRunnable runnable =
         new YarnAutoScalingManager.YarnAutoScalingRunnable(mockTaskDriver, 
mockYarnService, 1,
-            1, 10, 1.0, noopQueue, helixDataAccessor);
+            1.0, noopQueue, helixDataAccessor, defaultHelixTag, 
defaultContainerMemory, defaultContainerCores);
 
     runnable.run();
-
+    ArgumentCaptor<YarnContainerRequestBundle> argument = 
ArgumentCaptor.forClass(YarnContainerRequestBundle.class);
     // 2 containers requested and one worker in use
     Mockito.verify(mockYarnService, times(1)).
-        requestTargetNumberOfContainers(2, 
ImmutableSet.of("GobblinYarnTaskRunner-1"));
+        requestTargetNumberOfContainers(argument.capture(),
+            eq(ImmutableSet.of("GobblinYarnTaskRunner-1")));
+    Assert.assertEquals(argument.getValue().getTotalContainers(), 2);
   }
 
   /**
@@ -131,14 +144,17 @@ public class YarnAutoScalingManagerTest {
             "GobblinYarnTaskRunner-2", new HelixProperty("")));
 
     YarnAutoScalingManager.YarnAutoScalingRunnable runnable =
-        new YarnAutoScalingManager.YarnAutoScalingRunnable(mockTaskDriver, 
mockYarnService,
-            1, 1, 10, 1.0, noopQueue, helixDataAccessor);
+        new YarnAutoScalingManager.YarnAutoScalingRunnable(mockTaskDriver, 
mockYarnService, 1,
+            1.0, noopQueue, helixDataAccessor, defaultHelixTag, 
defaultContainerMemory, defaultContainerCores);
 
     runnable.run();
 
     // 3 containers requested and 2 workers in use
-    Mockito.verify(mockYarnService, times(1))
-        .requestTargetNumberOfContainers(3, 
ImmutableSet.of("GobblinYarnTaskRunner-1", "GobblinYarnTaskRunner-2"));
+    ArgumentCaptor<YarnContainerRequestBundle> argument = 
ArgumentCaptor.forClass(YarnContainerRequestBundle.class);
+    Mockito.verify(mockYarnService, times(1)).
+        requestTargetNumberOfContainers(argument.capture(),
+            eq(ImmutableSet.of("GobblinYarnTaskRunner-1", 
"GobblinYarnTaskRunner-2")));
+    Assert.assertEquals(argument.getValue().getTotalContainers(), 3);
   }
 
   /**
@@ -200,14 +216,17 @@ public class YarnAutoScalingManagerTest {
             "GobblinYarnTaskRunner-3", new HelixProperty("")));
 
     YarnAutoScalingManager.YarnAutoScalingRunnable runnable =
-        new YarnAutoScalingManager.YarnAutoScalingRunnable(mockTaskDriver, 
mockYarnService,
-            1, 1, 10, 1.0, noopQueue, helixDataAccessor);
+        new YarnAutoScalingManager.YarnAutoScalingRunnable(mockTaskDriver, 
mockYarnService, 1,
+            1.0, noopQueue, helixDataAccessor, defaultHelixTag, 
defaultContainerMemory, defaultContainerCores);
 
     runnable.run();
 
     // 5 containers requested and 3 workers in use
-    Mockito.verify(mockYarnService, 
times(1)).requestTargetNumberOfContainers(5,
-        ImmutableSet.of("GobblinYarnTaskRunner-1", "GobblinYarnTaskRunner-2", 
"GobblinYarnTaskRunner-3"));
+    ArgumentCaptor<YarnContainerRequestBundle> argument = 
ArgumentCaptor.forClass(YarnContainerRequestBundle.class);
+    Mockito.verify(mockYarnService, times(1)).
+        requestTargetNumberOfContainers(argument.capture(),
+            eq(ImmutableSet.of("GobblinYarnTaskRunner-1", 
"GobblinYarnTaskRunner-2", "GobblinYarnTaskRunner-3")));
+    Assert.assertEquals(argument.getValue().getTotalContainers(), 5);
   }
 
   /**
@@ -269,14 +288,17 @@ public class YarnAutoScalingManagerTest {
             "GobblinYarnTaskRunner-2", new HelixProperty("")));
 
     YarnAutoScalingManager.YarnAutoScalingRunnable runnable =
-        new YarnAutoScalingManager.YarnAutoScalingRunnable(mockTaskDriver, 
mockYarnService,
-            1, 1, 10, 1.0, noopQueue, helixDataAccessor);
+        new YarnAutoScalingManager.YarnAutoScalingRunnable(mockTaskDriver, 
mockYarnService, 1,
+            1.0, noopQueue, helixDataAccessor, defaultHelixTag, 
defaultContainerMemory, defaultContainerCores);
 
     runnable.run();
 
     // 3 containers requested and 2 workers in use
-    Mockito.verify(mockYarnService, 
times(1)).requestTargetNumberOfContainers(3,
-        ImmutableSet.of("GobblinYarnTaskRunner-1", "GobblinYarnTaskRunner-2"));
+    ArgumentCaptor<YarnContainerRequestBundle> argument = 
ArgumentCaptor.forClass(YarnContainerRequestBundle.class);
+    Mockito.verify(mockYarnService, times(1)).
+        requestTargetNumberOfContainers(argument.capture(),
+            eq(ImmutableSet.of("GobblinYarnTaskRunner-1", 
"GobblinYarnTaskRunner-2")));
+    Assert.assertEquals(argument.getValue().getTotalContainers(), 3);
   }
 
   /**
@@ -313,103 +335,17 @@ public class YarnAutoScalingManagerTest {
         .thenReturn(ImmutableMap.of("GobblinYarnTaskRunner-1", new 
HelixProperty("")));
 
     YarnAutoScalingManager.YarnAutoScalingRunnable runnable =
-        new YarnAutoScalingManager.YarnAutoScalingRunnable(mockTaskDriver, 
mockYarnService,
-            2, 1, 10, 1.0, noopQueue, helixDataAccessor);
+        new YarnAutoScalingManager.YarnAutoScalingRunnable(mockTaskDriver, 
mockYarnService, 2,
+            1.0, noopQueue, helixDataAccessor, defaultHelixTag, 
defaultContainerMemory, defaultContainerCores);
 
     runnable.run();
 
     // 1 container requested since 2 partitions and limit is 2 partitions per 
container. One worker in use.
-    Mockito.verify(mockYarnService, times(1))
-        .requestTargetNumberOfContainers(1, 
ImmutableSet.of("GobblinYarnTaskRunner-1"));
-  }
-
-
-  /**
-   * Test min containers
-   */
-  @Test
-  public void testMinContainers() throws IOException {
-    YarnService mockYarnService = mock(YarnService.class);
-    TaskDriver mockTaskDriver = mock(TaskDriver.class);
-    WorkflowConfig mockWorkflowConfig = mock(WorkflowConfig.class);
-    JobDag mockJobDag = mock(JobDag.class);
-
-    Mockito.when(mockJobDag.getAllNodes()).thenReturn(ImmutableSet.of("job1"));
-    Mockito.when(mockWorkflowConfig.getJobDag()).thenReturn(mockJobDag);
-
-    Mockito.when(mockTaskDriver.getWorkflows())
-        .thenReturn(ImmutableMap.of("workflow1", mockWorkflowConfig));
-
-    WorkflowContext mockWorkflowContext = mock(WorkflowContext.class);
-    
Mockito.when(mockWorkflowContext.getWorkflowState()).thenReturn(TaskState.IN_PROGRESS);
-
-    
Mockito.when(mockTaskDriver.getWorkflowContext("workflow1")).thenReturn(mockWorkflowContext);
-
-    JobContext mockJobContext = mock(JobContext.class);
-    Mockito.when(mockJobContext.getPartitionSet())
-        .thenReturn(ImmutableSet.of(Integer.valueOf(1), Integer.valueOf(2)));
-    
Mockito.when(mockJobContext.getAssignedParticipant(2)).thenReturn("GobblinYarnTaskRunner-1");
-
-    
Mockito.when(mockTaskDriver.getJobContext("job1")).thenReturn(mockJobContext);
-
-    HelixDataAccessor helixDataAccessor = mock(HelixDataAccessor.class);
-    Mockito.when(helixDataAccessor.keyBuilder()).thenReturn(new 
PropertyKey.Builder("cluster"));
-    Mockito.when(helixDataAccessor.getChildValuesMap(Mockito.any()))
-        .thenReturn(ImmutableMap.of("GobblinYarnTaskRunner-1", new 
HelixProperty("")));
-
-    YarnAutoScalingManager.YarnAutoScalingRunnable runnable =
-        new YarnAutoScalingManager.YarnAutoScalingRunnable(mockTaskDriver, 
mockYarnService,
-            1, 5, 10, 1.0, noopQueue, helixDataAccessor);
-
-    runnable.run();
-
-    // 5 containers requested due to min and one worker in use
-    Mockito.verify(mockYarnService, times(1))
-        .requestTargetNumberOfContainers(5, 
ImmutableSet.of("GobblinYarnTaskRunner-1"));
-  }
-
-  /**
-   * Test max containers
-   */
-  @Test
-  public void testMaxContainers() throws IOException {
-    YarnService mockYarnService = mock(YarnService.class);
-    TaskDriver mockTaskDriver = mock(TaskDriver.class);
-    WorkflowConfig mockWorkflowConfig = mock(WorkflowConfig.class);
-    JobDag mockJobDag = mock(JobDag.class);
-
-    Mockito.when(mockJobDag.getAllNodes()).thenReturn(ImmutableSet.of("job1"));
-    Mockito.when(mockWorkflowConfig.getJobDag()).thenReturn(mockJobDag);
-
-    Mockito.when(mockTaskDriver.getWorkflows())
-        .thenReturn(ImmutableMap.of("workflow1", mockWorkflowConfig));
-
-    WorkflowContext mockWorkflowContext = mock(WorkflowContext.class);
-    
Mockito.when(mockWorkflowContext.getWorkflowState()).thenReturn(TaskState.IN_PROGRESS);
-
-    
Mockito.when(mockTaskDriver.getWorkflowContext("workflow1")).thenReturn(mockWorkflowContext);
-
-    JobContext mockJobContext = mock(JobContext.class);
-    Mockito.when(mockJobContext.getPartitionSet())
-        .thenReturn(ImmutableSet.of(Integer.valueOf(1), Integer.valueOf(2)));
-    
Mockito.when(mockJobContext.getAssignedParticipant(2)).thenReturn("GobblinYarnTaskRunner-1");
-
-    
Mockito.when(mockTaskDriver.getJobContext("job1")).thenReturn(mockJobContext);
-
-    HelixDataAccessor helixDataAccessor = mock(HelixDataAccessor.class);
-    Mockito.when(helixDataAccessor.keyBuilder()).thenReturn(new 
PropertyKey.Builder("cluster"));
-    Mockito.when(helixDataAccessor.getChildValuesMap(Mockito.any()))
-        .thenReturn(ImmutableMap.of("GobblinYarnTaskRunner-1", new 
HelixProperty("")));
-
-    YarnAutoScalingManager.YarnAutoScalingRunnable runnable =
-        new YarnAutoScalingManager.YarnAutoScalingRunnable(mockTaskDriver, 
mockYarnService,
-            1, 1, 1, 1.0, noopQueue, helixDataAccessor);
-
-    runnable.run();
-
-    // 1 container requested to max and one worker in use
-    Mockito.verify(mockYarnService, times(1))
-        .requestTargetNumberOfContainers(1, 
ImmutableSet.of("GobblinYarnTaskRunner-1"));
+    ArgumentCaptor<YarnContainerRequestBundle> argument = 
ArgumentCaptor.forClass(YarnContainerRequestBundle.class);
+    Mockito.verify(mockYarnService, times(1)).
+        requestTargetNumberOfContainers(argument.capture(),
+            eq(ImmutableSet.of("GobblinYarnTaskRunner-1")));
+    Assert.assertEquals(argument.getValue().getTotalContainers(), 1);
   }
 
   @Test
@@ -443,41 +379,49 @@ public class YarnAutoScalingManagerTest {
         .thenReturn(ImmutableMap.of("GobblinYarnTaskRunner-1", new 
HelixProperty("")));
 
     YarnAutoScalingManager.YarnAutoScalingRunnable runnable1 =
-        new YarnAutoScalingManager.YarnAutoScalingRunnable(mockTaskDriver, 
mockYarnService,
-            1, 1, 10, 1.2, noopQueue, helixDataAccessor);
+        new YarnAutoScalingManager.YarnAutoScalingRunnable(mockTaskDriver, 
mockYarnService, 1,
+            1.2, noopQueue, helixDataAccessor, defaultHelixTag, 
defaultContainerMemory, defaultContainerCores);
 
     runnable1.run();
 
     // 3 containers requested to max and one worker in use
-    // NumPartitions = 2, Partitions per container = 1 and overprovision = 
1.2, Min containers = 1, Max = 10
-    // so targetNumContainers = Max (1, Min(10, Ceil((2/1) * 1.2))) = 3.
-    Mockito.verify(mockYarnService, times(1))
-        .requestTargetNumberOfContainers(3, 
ImmutableSet.of("GobblinYarnTaskRunner-1"));
-
+    // NumPartitions = 2, Partitions per container = 1 and overprovision = 1.2
+    // so targetNumContainers = Ceil((2/1) * 1.2) = 3.
+    ArgumentCaptor<YarnContainerRequestBundle> argument = 
ArgumentCaptor.forClass(YarnContainerRequestBundle.class);
+    Mockito.verify(mockYarnService, times(1)).
+        requestTargetNumberOfContainers(argument.capture(),
+            eq(ImmutableSet.of("GobblinYarnTaskRunner-1")));
+    Assert.assertEquals(argument.getValue().getTotalContainers(), 3);
 
+    Mockito.reset(mockYarnService);
     YarnAutoScalingManager.YarnAutoScalingRunnable runnable2 =
-        new YarnAutoScalingManager.YarnAutoScalingRunnable(mockTaskDriver, 
mockYarnService,
-            1, 1, 10, 0.1, noopQueue, helixDataAccessor);
+        new YarnAutoScalingManager.YarnAutoScalingRunnable(mockTaskDriver, 
mockYarnService, 1,
+            0.1, noopQueue, helixDataAccessor, defaultHelixTag, 
defaultContainerMemory, defaultContainerCores);
 
     runnable2.run();
 
     // 3 containers requested to max and one worker in use
-    // NumPartitions = 2, Partitions per container = 1 and overprovision = 
1.2, Min containers = 1, Max = 10
-    // so targetNumContainers = Max (1, Min(10, Ceil((2/1) * 0.1))) = 1.
-    Mockito.verify(mockYarnService, times(1))
-        .requestTargetNumberOfContainers(1, 
ImmutableSet.of("GobblinYarnTaskRunner-1"));
+    // NumPartitions = 2, Partitions per container = 1 and overprovision = 0.1
+    // so targetNumContainers = Ceil((2/1) * 0.1) = 1.
+    Mockito.verify(mockYarnService, times(1)).
+        requestTargetNumberOfContainers(argument.capture(),
+            eq(ImmutableSet.of("GobblinYarnTaskRunner-1")));
+    Assert.assertEquals(argument.getValue().getTotalContainers(), 1);
 
+    Mockito.reset(mockYarnService);
     YarnAutoScalingManager.YarnAutoScalingRunnable runnable3 =
-        new YarnAutoScalingManager.YarnAutoScalingRunnable(mockTaskDriver, 
mockYarnService,
-            1, 1, 10, 6.0, noopQueue, helixDataAccessor);
+        new YarnAutoScalingManager.YarnAutoScalingRunnable(mockTaskDriver, 
mockYarnService, 1,
+            6.0, noopQueue, helixDataAccessor, defaultHelixTag, 
defaultContainerMemory, defaultContainerCores);
 
     runnable3.run();
 
     // 3 containers requested to max and one worker in use
     // NumPartitions = 2, Partitions per container = 1 and overprovision = 6.0,
-    // so targetNumContainers = Max (1, Min(10, Ceil((2/1) * 6.0))) = 10.
-    Mockito.verify(mockYarnService, times(1))
-        .requestTargetNumberOfContainers(1, 
ImmutableSet.of("GobblinYarnTaskRunner-1"));
+    // so targetNumContainers = Ceil((2/1) * 6.0) = 12.
+    Mockito.verify(mockYarnService, times(1)).
+        requestTargetNumberOfContainers(argument.capture(),
+            eq(ImmutableSet.of("GobblinYarnTaskRunner-1")));
+    Assert.assertEquals(argument.getValue().getTotalContainers(), 12);
   }
 
   /**
@@ -514,37 +458,43 @@ public class YarnAutoScalingManagerTest {
         .thenReturn(ImmutableMap.of("GobblinYarnTaskRunner-1", new 
HelixProperty("")));
 
     TestYarnAutoScalingRunnable runnable =
-        new TestYarnAutoScalingRunnable(mockTaskDriver, mockYarnService, 1, 1, 
1, helixDataAccessor);
+        new TestYarnAutoScalingRunnable(mockTaskDriver, mockYarnService, 1, 
helixDataAccessor);
 
     runnable.setRaiseException(true);
     runnable.run();
-    Mockito.verify(mockYarnService, 
times(0)).requestTargetNumberOfContainers(1, 
ImmutableSet.of("GobblinYarnTaskRunner-1"));
+    ArgumentCaptor<YarnContainerRequestBundle> argument = 
ArgumentCaptor.forClass(YarnContainerRequestBundle.class);
+    Mockito.verify(mockYarnService, times(0)).
+        requestTargetNumberOfContainers(argument.capture(),
+            eq(ImmutableSet.of("GobblinYarnTaskRunner-1")));
 
     Mockito.reset(mockYarnService);
     runnable.setRaiseException(false);
     runnable.run();
-    // 1 container requested to max and one worker in use
-    Mockito.verify(mockYarnService, 
times(1)).requestTargetNumberOfContainers(1, 
ImmutableSet.of("GobblinYarnTaskRunner-1"));
+    // 2 containers requested
+    Mockito.verify(mockYarnService, times(1)).
+        requestTargetNumberOfContainers(argument.capture(),
+            eq(ImmutableSet.of("GobblinYarnTaskRunner-1")));
+    Assert.assertEquals(argument.getValue().getTotalContainers(), 2);
   }
 
   public void testMaxValueEvictingQueue() throws Exception {
+    Resource resource = Resource.newInstance(16, 1);
     YarnAutoScalingManager.SlidingWindowReservoir window = new 
YarnAutoScalingManager.SlidingWindowReservoir(3, 10);
-
     // Normal insertion with eviction of originally largest value
-    window.add(3);
-    window.add(1);
-    window.add(2);
+    window.add(GobblinYarnTestUtils.createYarnContainerRequest(3, resource));
+    window.add(GobblinYarnTestUtils.createYarnContainerRequest(1, resource));
+    window.add(GobblinYarnTestUtils.createYarnContainerRequest(2, resource));
     // Now it contains [3,1,2]
-    Assert.assertEquals(window.getMax(), 3);
-    window.add(1);
+    Assert.assertEquals(window.getMax().getTotalContainers(), 3);
+    window.add(GobblinYarnTestUtils.createYarnContainerRequest(1, resource));
     // Now it contains [1,2,1]
-    Assert.assertEquals(window.getMax(), 2);
-    window.add(5);
-    Assert.assertEquals(window.getMax(), 5);
+    Assert.assertEquals(window.getMax().getTotalContainers(), 2);
+    window.add(GobblinYarnTestUtils.createYarnContainerRequest(5, resource));
+    Assert.assertEquals(window.getMax().getTotalContainers(), 5);
     // Now it contains [2,1,5]
-    window.add(11);
+    window.add(GobblinYarnTestUtils.createYarnContainerRequest(11, resource));
     // Still [2,1,5] as 11 > 10 thereby being rejected.
-    Assert.assertEquals(window.getMax(), 5);
+    Assert.assertEquals(window.getMax().getTotalContainers(), 5);
   }
 
   /**
@@ -557,7 +507,6 @@ public class YarnAutoScalingManagerTest {
     TaskDriver mockTaskDriver = mock(TaskDriver.class);
     WorkflowConfig mockWorkflowConfig = mock(WorkflowConfig.class);
     JobDag mockJobDag = mock(JobDag.class);
-
     Mockito.when(mockJobDag.getAllNodes()).thenReturn(ImmutableSet.of("job1"));
     Mockito.when(mockWorkflowConfig.getJobDag()).thenReturn(mockJobDag);
 
@@ -585,14 +534,17 @@ public class YarnAutoScalingManagerTest {
             "GobblinYarnTaskRunner-2", new HelixProperty("")));
 
     TestYarnAutoScalingRunnable runnable = new 
TestYarnAutoScalingRunnable(mockTaskDriver, mockYarnService,
-            1, 1, 10, helixDataAccessor);
+        1, helixDataAccessor);
 
     runnable.run();
 
     // 2 containers requested and one worker in use, while the evaluation will 
hold for true if not set externally,
     // still tell YarnService there are two instances being used.
+    ArgumentCaptor<YarnContainerRequestBundle> argument = 
ArgumentCaptor.forClass(YarnContainerRequestBundle.class);
     Mockito.verify(mockYarnService, times(1)).
-        requestTargetNumberOfContainers(2, 
ImmutableSet.of("GobblinYarnTaskRunner-1", "GobblinYarnTaskRunner-2"));
+        requestTargetNumberOfContainers(argument.capture(),
+            eq(ImmutableSet.of("GobblinYarnTaskRunner-1", 
"GobblinYarnTaskRunner-2")));
+    Assert.assertEquals(argument.getValue().getTotalContainers(), 2);
 
     // Set failEvaluation which simulates the "beyond tolerance" case.
     Mockito.reset(mockYarnService);
@@ -600,7 +552,98 @@ public class YarnAutoScalingManagerTest {
     runnable.run();
 
     Mockito.verify(mockYarnService, times(1)).
-        requestTargetNumberOfContainers(2, 
ImmutableSet.of("GobblinYarnTaskRunner-2"));
+        requestTargetNumberOfContainers(argument.capture(),
+            eq(ImmutableSet.of("GobblinYarnTaskRunner-2")));
+    Assert.assertEquals(argument.getValue().getTotalContainers(), 2);
+  }
+
+  @Test
+  public void testFlowsWithHelixTags() {
+    YarnService mockYarnService = mock(YarnService.class);
+    TaskDriver mockTaskDriver = mock(TaskDriver.class);
+
+    WorkflowConfig mockWorkflowConfig1 = mock(WorkflowConfig.class);
+    JobDag mockJobDag1 = mock(JobDag.class);
+
+    Mockito.when(mockJobDag1.getAllNodes()).thenReturn(ImmutableSet.of("job1", 
"job2"));
+    Mockito.when(mockWorkflowConfig1.getJobDag()).thenReturn(mockJobDag1);
+
+    WorkflowContext mockWorkflowContext1 = mock(WorkflowContext.class);
+    
Mockito.when(mockWorkflowContext1.getWorkflowState()).thenReturn(TaskState.IN_PROGRESS);
+
+    
Mockito.when(mockTaskDriver.getWorkflowContext("workflow1")).thenReturn(mockWorkflowContext1);
+
+    JobContext mockJobContext1 = mock(JobContext.class);
+    Mockito.when(mockJobContext1.getPartitionSet())
+        .thenReturn(ImmutableSet.of(Integer.valueOf(1), Integer.valueOf(2)));
+    
Mockito.when(mockJobContext1.getAssignedParticipant(2)).thenReturn("GobblinYarnTaskRunner-1");
+    
Mockito.when(mockTaskDriver.getJobContext("job1")).thenReturn(mockJobContext1);
+
+    JobContext mockJobContext2 = mock(JobContext.class);
+    Mockito.when(mockJobContext2.getPartitionSet())
+        .thenReturn(ImmutableSet.of(Integer.valueOf(3)));
+    
Mockito.when(mockJobContext2.getAssignedParticipant(3)).thenReturn("GobblinYarnTaskRunner-2");
+    
Mockito.when(mockTaskDriver.getJobContext("job2")).thenReturn(mockJobContext2);
+
+    WorkflowConfig mockWorkflowConfig2 = mock(WorkflowConfig.class);
+    JobDag mockJobDag2 = mock(JobDag.class);
+
+    
Mockito.when(mockJobDag2.getAllNodes()).thenReturn(ImmutableSet.of("job3"));
+    Mockito.when(mockWorkflowConfig2.getJobDag()).thenReturn(mockJobDag2);
+
+    WorkflowContext mockWorkflowContext2 = mock(WorkflowContext.class);
+    
Mockito.when(mockWorkflowContext2.getWorkflowState()).thenReturn(TaskState.IN_PROGRESS);
+
+    
Mockito.when(mockTaskDriver.getWorkflowContext("workflow2")).thenReturn(mockWorkflowContext2);
+
+    JobContext mockJobContext3 = mock(JobContext.class);
+    Mockito.when(mockJobContext3.getPartitionSet())
+        .thenReturn(ImmutableSet.of(Integer.valueOf(4), Integer.valueOf(5)));
+    
Mockito.when(mockJobContext3.getAssignedParticipant(4)).thenReturn("GobblinYarnTaskRunner-3");
+    
Mockito.when(mockTaskDriver.getJobContext("job3")).thenReturn(mockJobContext3);
+    JobConfig mockJobConfig3 = mock(JobConfig.class);
+    String helixTag = "test-Tag1";
+    Map<String, String> resourceMap = new HashMap<>();
+    
resourceMap.put(GobblinClusterConfigurationKeys.HELIX_JOB_CONTAINER_MEMORY_MBS, 
"512");
+    resourceMap.put(GobblinClusterConfigurationKeys.HELIX_JOB_CONTAINER_CORES, 
"8");
+    Mockito.when(mockJobConfig3.getInstanceGroupTag()).thenReturn(helixTag);
+    
Mockito.when(mockJobConfig3.getJobCommandConfigMap()).thenReturn(resourceMap);
+    
Mockito.when(mockTaskDriver.getJobContext("job3")).thenReturn(mockJobContext3);
+    
Mockito.when(mockTaskDriver.getJobConfig("job3")).thenReturn(mockJobConfig3);
+    Mockito.when(mockTaskDriver.getWorkflows())
+        .thenReturn(ImmutableMap.of("workflow1", mockWorkflowConfig1, 
"workflow2", mockWorkflowConfig2));
+
+    HelixDataAccessor helixDataAccessor = mock(HelixDataAccessor.class);
+    Mockito.when(helixDataAccessor.keyBuilder()).thenReturn(new 
PropertyKey.Builder("cluster"));
+    Mockito.when(helixDataAccessor.getChildValuesMap(Mockito.any()))
+        .thenReturn(ImmutableMap.of("GobblinYarnTaskRunner-1", new 
HelixProperty(""),
+            "GobblinYarnTaskRunner-2", new HelixProperty(""),
+            "GobblinYarnTaskRunner-3", new HelixProperty("")));
+
+    YarnAutoScalingManager.YarnAutoScalingRunnable runnable =
+        new YarnAutoScalingManager.YarnAutoScalingRunnable(mockTaskDriver, 
mockYarnService, 1,
+            1.0, noopQueue, helixDataAccessor, defaultHelixTag, 
defaultContainerMemory, defaultContainerCores);
+
+    runnable.run();
+
+    // 5 containers requested and 3 workers in use
+    ArgumentCaptor<YarnContainerRequestBundle> argument = 
ArgumentCaptor.forClass(YarnContainerRequestBundle.class);
+    Mockito.verify(mockYarnService, times(1)).
+        requestTargetNumberOfContainers(argument.capture(),
+            eq(ImmutableSet.of("GobblinYarnTaskRunner-1", 
"GobblinYarnTaskRunner-2", "GobblinYarnTaskRunner-3")));
+    Assert.assertEquals(argument.getValue().getTotalContainers(), 5);
+    Map<String, Set<String>> resourceHelixTagMap = 
argument.getValue().getResourceHelixTagMap();
+    Map<String, Resource> helixTagResourceMap = 
argument.getValue().getHelixTagResourceMap();
+    Map<String, Integer> helixTagContainerCountMap = 
argument.getValue().getHelixTagContainerCountMap();
+
+    // Verify that 3 containers requested with default tag and resource 
setting,
+    // while 2 with specific helix tag and resource requirement
+    Assert.assertEquals(resourceHelixTagMap.size(), 2);
+    Assert.assertEquals(helixTagResourceMap.get(helixTag), 
Resource.newInstance(512, 8));
+    Assert.assertEquals(helixTagResourceMap.get(defaultHelixTag), 
Resource.newInstance(defaultContainerMemory, defaultContainerCores));
+    Assert.assertEquals((int) helixTagContainerCountMap.get(helixTag), 2);
+    Assert.assertEquals((int) helixTagContainerCountMap.get(defaultHelixTag), 
3);
+
   }
 
   private static class TestYarnAutoScalingRunnable extends 
YarnAutoScalingManager.YarnAutoScalingRunnable {
@@ -608,8 +651,9 @@ public class YarnAutoScalingManagerTest {
     boolean alwaysUnused = false;
 
     public TestYarnAutoScalingRunnable(TaskDriver taskDriver, YarnService 
yarnService, int partitionsPerContainer,
-        int minContainers, int maxContainers, HelixDataAccessor 
helixDataAccessor) {
-      super(taskDriver, yarnService, partitionsPerContainer, minContainers, 
maxContainers, 1.0, noopQueue, helixDataAccessor);
+        HelixDataAccessor helixDataAccessor) {
+      super(taskDriver, yarnService, partitionsPerContainer, 1.0,
+          noopQueue, helixDataAccessor, defaultHelixTag, 
defaultContainerMemory, defaultContainerCores);
     }
 
     @Override
@@ -634,4 +678,4 @@ public class YarnAutoScalingManagerTest {
       return alwaysUnused || super.isInstanceUnused(participant);
     }
   }
-}
+}
\ No newline at end of file
diff --git 
a/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnServiceTest.java 
b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnServiceTest.java
index 215e1bd03..76c53330b 100644
--- a/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnServiceTest.java
+++ b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnServiceTest.java
@@ -27,6 +27,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.TimeoutException;
 
 import org.apache.hadoop.conf.Configuration;
@@ -147,7 +148,7 @@ public class YarnServiceTest {
         this.clusterConf,
         FileSystem.getLocal(new Configuration()), this.eventBus);
 
-   this.yarnService.startUp();
+    this.yarnService.startUp();
   }
 
   private void startApp() throws Exception {
@@ -207,42 +208,43 @@ public class YarnServiceTest {
    */
   @Test(groups = {"gobblin.yarn", "disabledOnCI"})
   public void testScaleUp() {
-    this.yarnService.requestTargetNumberOfContainers(10, 
Collections.EMPTY_SET);
+    Resource resource = Resource.newInstance(64, 1);
+    this.yarnService.requestTargetNumberOfContainers(
+        GobblinYarnTestUtils.createYarnContainerRequest(10, resource), 
Collections.EMPTY_SET);
 
-    Assert.assertFalse(this.yarnService.getMatchingRequestsList(64, 
1).isEmpty());
-    Assert.assertEquals(this.yarnService.getNumRequestedContainers(), 10);
+    
Assert.assertFalse(this.yarnService.getMatchingRequestsList(resource).isEmpty());
     Assert.assertTrue(this.yarnService.waitForContainerCount(10, 60000));
-
+    Assert.assertEquals(this.yarnService.getContainerMap().size(), 10);
     // container request list that had entries earlier should now be empty
-    Assert.assertEquals(this.yarnService.getMatchingRequestsList(64, 
1).size(), 0);
+    
Assert.assertEquals(this.yarnService.getMatchingRequestsList(resource).size(), 
0);
   }
 
   @Test(groups = {"gobblin.yarn", "disabledOnCI"}, dependsOnMethods = 
"testScaleUp")
   public void testScaleDownWithInUseInstances() {
     Set<String> inUseInstances = new HashSet<>();
-
     for (int i = 1; i <= 8; i++) {
       inUseInstances.add("GobblinYarnTaskRunner_" + i);
     }
-
-    this.yarnService.requestTargetNumberOfContainers(6, inUseInstances);
-
-    Assert.assertEquals(this.yarnService.getNumRequestedContainers(), 6);
+    Resource resource = Resource.newInstance(64, 1);
+    this.yarnService.requestTargetNumberOfContainers(
+        GobblinYarnTestUtils.createYarnContainerRequest(6, resource), 
inUseInstances);
 
     // will only be able to shrink to 8
     Assert.assertTrue(this.yarnService.waitForContainerCount(8, 60000));
 
     // will not be able to shrink to 6 due to 8 in-use instances
     Assert.assertFalse(this.yarnService.waitForContainerCount(6, 10000));
-
+    Assert.assertEquals(this.yarnService.getContainerMap().size(), 8);
   }
 
   @Test(groups = {"gobblin.yarn", "disabledOnCI"}, dependsOnMethods = 
"testScaleDownWithInUseInstances")
   public void testScaleDown() throws Exception {
-    this.yarnService.requestTargetNumberOfContainers(4, Collections.EMPTY_SET);
+    Resource resource = Resource.newInstance(64, 1);
+    this.yarnService.requestTargetNumberOfContainers(
+        GobblinYarnTestUtils.createYarnContainerRequest(4, resource), 
Collections.EMPTY_SET);
 
-    Assert.assertEquals(this.yarnService.getNumRequestedContainers(), 4);
     Assert.assertTrue(this.yarnService.waitForContainerCount(4, 60000));
+    Assert.assertEquals(this.yarnService.getContainerMap().size(), 4);
   }
 
   // Keep this test last since it interferes with the container counts in the 
prior tests.
@@ -279,14 +281,26 @@ public class YarnServiceTest {
         0), 0);
     Resource resource = Resource.newInstance(2048, 1);
     Container container = Container.newInstance(containerId, null, null, 
resource, null, null);
+    YarnService.ContainerInfo
+        containerInfo = new YarnService.ContainerInfo(container, 
"helixInstance1", "helixTag");
 
-    String command = yarnService.buildContainerCommand(container, 
"helixInstance1");
+    String command = yarnService.buildContainerCommand(containerInfo);
 
     // 1628 is from 2048 * 0.8 - 10
     Assert.assertTrue(command.contains("-Xmx1628"));
   }
 
-   static class TestYarnService extends YarnService {
+  /**
+   * Test that if the requested resource exceeds the resource limit, yarnService should 
fail.
+   */
+  @Test(groups = {"gobblin.yarn", "disabledOnCI"}, expectedExceptions = 
IllegalArgumentException.class)
+  public void testExceedResourceLimit() {
+    Resource resource = Resource.newInstance(204800, 10240);
+    this.yarnService.requestTargetNumberOfContainers(
+        GobblinYarnTestUtils.createYarnContainerRequest(10, resource), 
Collections.EMPTY_SET);
+  }
+
+  static class TestYarnService extends YarnService {
     public TestYarnService(Config config, String applicationName, String 
applicationId, YarnConfiguration yarnConfiguration,
         FileSystem fs, EventBus eventBus) throws Exception {
       super(config, applicationName, applicationId, yarnConfiguration, fs, 
eventBus, getMockHelixManager(config));
@@ -310,19 +324,17 @@ public class YarnServiceTest {
       return helixManager;
     }
 
-    protected ContainerLaunchContext newContainerLaunchContext(Container 
container, String helixInstanceName)
+    protected ContainerLaunchContext newContainerLaunchContext(ContainerInfo 
containerInfo)
         throws IOException {
       return BuilderUtils.newContainerLaunchContext(Collections.emptyMap(), 
Collections.emptyMap(),
-              Arrays.asList("sleep", "60000"), Collections.emptyMap(), null, 
Collections.emptyMap());
+          Arrays.asList("sleep", "60000"), Collections.emptyMap(), null, 
Collections.emptyMap());
     }
 
     /**
      * Get the list of matching container requests for the specified resource 
memory and cores.
      */
-    public List<? extends Collection<AMRMClient.ContainerRequest>> 
getMatchingRequestsList(int memory, int cores) {
-      Resource resource = Resource.newInstance(memory, cores);
+    public List<? extends Collection<AMRMClient.ContainerRequest>> 
getMatchingRequestsList(Resource resource) {
       Priority priority = Priority.newInstance(0);
-
       return getAmrmClientAsync().getMatchingRequests(priority, 
ResourceRequest.ANY, resource);
     }
 
@@ -346,14 +358,13 @@ public class YarnServiceTest {
           Thread.currentThread().interrupt();
           break;
         }
-
+        ConcurrentMap<ContainerId, ContainerInfo> containerMap = 
getContainerMap();
         if (expectedCount == getContainerMap().size()) {
           success = true;
           break;
         }
       }
-
       return success;
     }
   }
-}
+}
\ No newline at end of file
diff --git 
a/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnServiceTestWithExpiration.java
 
b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnServiceTestWithExpiration.java
index 1934ecebb..1041cc31b 100644
--- 
a/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnServiceTestWithExpiration.java
+++ 
b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnServiceTestWithExpiration.java
@@ -33,7 +33,6 @@ import 
org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationReport;
 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
-import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
@@ -198,10 +197,11 @@ public class YarnServiceTestWithExpiration {
 
   @Test(groups = {"gobblin.yarn", "disabledOnCI"})
   public void testStartError() throws Exception{
-    this.expiredYarnService.requestTargetNumberOfContainers(10, 
Collections.EMPTY_SET);
+    Resource resource = Resource.newInstance(16, 1);
+    this.expiredYarnService.requestTargetNumberOfContainers(
+        GobblinYarnTestUtils.createYarnContainerRequest(10, resource), 
Collections.EMPTY_SET);
 
-    Assert.assertFalse(this.expiredYarnService.getMatchingRequestsList(64, 
1).isEmpty());
-    Assert.assertEquals(this.expiredYarnService.getNumRequestedContainers(), 
10);
+    
Assert.assertFalse(this.expiredYarnService.getMatchingRequestsList(resource).isEmpty());
 
     
AssertWithBackoff.create().logger(LOG).timeoutMs(60000).maxSleepMs(2000).backoffFactor(1.5)
         .assertTrue(new Predicate<Void>() {
@@ -234,7 +234,7 @@ public class YarnServiceTestWithExpiration {
       completedContainers.add(containerStatus);
     }
 
-    protected ContainerLaunchContext newContainerLaunchContext(Container 
container, String helixInstanceName)
+    protected ContainerLaunchContext newContainerLaunchContext(ContainerInfo 
containerInfo)
         throws IOException {
       try {
         Thread.sleep(1000);
@@ -251,4 +251,4 @@ public class YarnServiceTestWithExpiration {
       }
     }
   }
-}
+}
\ No newline at end of file
diff --git a/gobblin-yarn/src/test/resources/YarnServiceTest.conf 
b/gobblin-yarn/src/test/resources/YarnServiceTest.conf
index d2dc49c46..73ecf85cd 100644
--- a/gobblin-yarn/src/test/resources/YarnServiceTest.conf
+++ b/gobblin-yarn/src/test/resources/YarnServiceTest.conf
@@ -17,6 +17,7 @@
 
 # Yarn/Helix configuration properties
 gobblin.cluster.helix.cluster.name=YarnServiceTest
+gobblin.cluster.helixInstanceTags=GobblinKafkaStreaming
 gobblin.yarn.app.name=YarnServiceTest
 gobblin.yarn.work.dir=YarnServiceTest
 
@@ -30,7 +31,7 @@ gobblin.yarn.app.master.cores=1
 gobblin.yarn.app.report.interval.minutes=1
 gobblin.yarn.max.get.app.report.failures=4
 gobblin.yarn.email.notification.on.shutdown=false
-gobblin.yarn.initial.containers=1
+gobblin.yarn.initial.containers=0
 gobblin.yarn.container.memory.mbs=64
 gobblin.yarn.container.cores=1
 gobblin.yarn.container.affinity.enabled=true

Reply via email to