This is an automated email from the ASF dual-hosted git repository.

hutran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 56930d7  [GOBBLIN-762] Add automatic scaling for Gobblin on YARN
56930d7 is described below

commit 56930d78947a23baf6884839baee36aea33486e9
Author: Hung Tran <[email protected]>
AuthorDate: Tue May 14 09:06:14 2019 -0700

    [GOBBLIN-762] Add automatic scaling for Gobblin on YARN
    
    Closes #2626 from htran1/yarn_auto_scale
---
 .../gobblin/cluster/GobblinClusterManager.java     |   1 +
 gobblin-yarn/build.gradle                          |   5 +
 .../gobblin/yarn/GobblinApplicationMaster.java     |  22 +-
 .../gobblin/yarn/GobblinYarnConfigurationKeys.java |   3 +
 .../gobblin/yarn/YarnAutoScalingManager.java       | 187 +++++++++++
 .../java/org/apache/gobblin/yarn/YarnService.java  | 126 +++++++-
 .../yarn/event/ContainerReleaseRequest.java        |  46 +++
 .../gobblin/yarn/YarnAutoScalingManagerTest.java   | 357 +++++++++++++++++++++
 .../org/apache/gobblin/yarn/YarnServiceTest.java   | 296 +++++++++++++++++
 .../src/test/resources/YarnServiceTest.conf        |  72 +++++
 10 files changed, 1109 insertions(+), 6 deletions(-)

diff --git 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java
 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java
index a4658cc..571e5ba 100644
--- 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java
+++ 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java
@@ -136,6 +136,7 @@ public class GobblinClusterManager implements 
ApplicationLauncher, StandardMetri
   private JobConfigurationManager jobConfigurationManager;
 
   private final String clusterName;
+  @Getter
   protected final Config config;
 
   public GobblinClusterManager(String clusterName, String applicationId, 
Config config,
diff --git a/gobblin-yarn/build.gradle b/gobblin-yarn/build.gradle
index 02959bf..4d28eec 100644
--- a/gobblin-yarn/build.gradle
+++ b/gobblin-yarn/build.gradle
@@ -65,6 +65,10 @@ dependencies {
   testCompile externalDependency.hadoopYarnMiniCluster
   testCompile externalDependency.curatorFramework
   testCompile externalDependency.curatorTest
+
+  testCompile ('com.google.inject:guice:3.0') {
+    force = true
+  }
 }
 
 task testJar(type: Jar, dependsOn: testClasses) {
@@ -124,6 +128,7 @@ test {
   dependsOn shadowJar
   workingDir rootProject.rootDir
   maxParallelForks = 1
+  forkEvery = 1
 }
 
 clean {
diff --git 
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinApplicationMaster.java
 
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinApplicationMaster.java
index c4ee07a..f0a7b48 100644
--- 
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinApplicationMaster.java
+++ 
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinApplicationMaster.java
@@ -44,15 +44,20 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Optional;
+import com.google.common.util.concurrent.Service;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
 
+import lombok.Getter;
+
 import org.apache.gobblin.annotation.Alpha;
 import org.apache.gobblin.cluster.GobblinClusterConfigurationKeys;
 import org.apache.gobblin.cluster.GobblinClusterManager;
 import org.apache.gobblin.cluster.GobblinClusterUtils;
+import org.apache.gobblin.util.ConfigUtils;
 import org.apache.gobblin.util.JvmUtils;
 import org.apache.gobblin.util.logs.Log4jConfigurationHelper;
+import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
 import org.apache.gobblin.yarn.event.DelegationTokenUpdatedEvent;
 
 
@@ -68,9 +73,11 @@ import 
org.apache.gobblin.yarn.event.DelegationTokenUpdatedEvent;
  */
 @Alpha
 public class GobblinApplicationMaster extends GobblinClusterManager {
-
   private static final Logger LOGGER = 
LoggerFactory.getLogger(GobblinApplicationMaster.class);
 
+  @Getter
+  private final YarnService yarnService;
+
   public GobblinApplicationMaster(String applicationName, ContainerId 
containerId, Config config,
       YarnConfiguration yarnConfiguration) throws Exception {
     super(applicationName, 
containerId.getApplicationAttemptId().getApplicationId().toString(),
@@ -82,13 +89,22 @@ public class GobblinApplicationMaster extends 
GobblinClusterManager {
           .addService(gobblinYarnLogSource.buildLogCopier(this.config, 
containerId, this.fs, this.appWorkDir));
     }
 
-    this.applicationLauncher
-        .addService(buildYarnService(this.config, applicationName, 
this.applicationId, yarnConfiguration, this.fs));
+    this.yarnService = buildYarnService(this.config, applicationName, 
this.applicationId, yarnConfiguration, this.fs);
+    this.applicationLauncher.addService(this.yarnService);
 
     if (UserGroupInformation.isSecurityEnabled()) {
       LOGGER.info("Adding YarnContainerSecurityManager since security is 
enabled");
       
this.applicationLauncher.addService(buildYarnContainerSecurityManager(this.config,
 this.fs));
     }
+
+    // Add additional services
+    List<String> serviceClassNames = ConfigUtils.getStringList(this.config,
+        GobblinYarnConfigurationKeys.APP_MASTER_SERVICE_CLASSES);
+
+    for (String serviceClassName : serviceClassNames) {
+      Class<?> serviceClass = Class.forName(serviceClassName);
+      this.applicationLauncher.addService((Service) 
GobblinConstructorUtils.invokeLongestConstructor(serviceClass, this));
+    }
   }
 
   /**
diff --git 
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnConfigurationKeys.java
 
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnConfigurationKeys.java
index e0b8e31..6098eae 100644
--- 
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnConfigurationKeys.java
+++ 
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnConfigurationKeys.java
@@ -33,6 +33,8 @@ public class GobblinYarnConfigurationKeys {
   public static final String MAX_GET_APP_REPORT_FAILURES_KEY = 
GOBBLIN_YARN_PREFIX + "max.get.app.report.failures";
   public static final String EMAIL_NOTIFICATION_ON_SHUTDOWN_KEY =
       GOBBLIN_YARN_PREFIX + "email.notification.on.shutdown";
+  public static final String RELEASED_CONTAINERS_CACHE_EXPIRY_SECS = 
GOBBLIN_YARN_PREFIX + "releasedContainersCacheExpirySecs";
+  public static final int DEFAULT_RELEASED_CONTAINERS_CACHE_EXPIRY_SECS = 300;
 
   // Gobblin Yarn ApplicationMaster configuration properties.
   public static final String APP_MASTER_MEMORY_MBS_KEY = GOBBLIN_YARN_PREFIX + 
"app.master.memory.mbs";
@@ -42,6 +44,7 @@ public class GobblinYarnConfigurationKeys {
   public static final String APP_MASTER_FILES_REMOTE_KEY = GOBBLIN_YARN_PREFIX 
+ "app.master.files.remote";
   public static final String APP_MASTER_WORK_DIR_NAME = "appmaster";
   public static final String APP_MASTER_JVM_ARGS_KEY = GOBBLIN_YARN_PREFIX + 
"app.master.jvm.args";
+  public static final String APP_MASTER_SERVICE_CLASSES = GOBBLIN_YARN_PREFIX 
+ "app.master.serviceClasses";
 
   // Gobblin Yarn container configuration properties.
   public static final String INITIAL_CONTAINERS_KEY = GOBBLIN_YARN_PREFIX + 
"initial.containers";
diff --git 
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAutoScalingManager.java
 
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAutoScalingManager.java
new file mode 100644
index 0000000..e9d934b
--- /dev/null
+++ 
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAutoScalingManager.java
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.yarn;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import org.apache.helix.HelixManager;
+import org.apache.helix.task.JobContext;
+import org.apache.helix.task.JobDag;
+import org.apache.helix.task.TaskDriver;
+import org.apache.helix.task.TaskState;
+import org.apache.helix.task.WorkflowConfig;
+import org.apache.helix.task.WorkflowContext;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.AbstractIdleService;
+import com.typesafe.config.Config;
+
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.ExecutorsUtils;
+
+
+/**
+ * The autoscaling manager is responsible for figuring out how many containers 
are required for the workload and
+ * requesting the {@link YarnService} to request that many containers.
+ */
+@Slf4j
+public class YarnAutoScalingManager extends AbstractIdleService {
+  private final String AUTO_SCALING_PREFIX = 
GobblinYarnConfigurationKeys.GOBBLIN_YARN_PREFIX + "autoScaling.";
+  private final String AUTO_SCALING_POLLING_INTERVAL_SECS =
+      AUTO_SCALING_PREFIX + "pollingIntervalSeconds";
+  private final int DEFAULT_AUTO_SCALING_POLLING_INTERVAL_SECS = 60;
+  // Only one container will be requested for each N partitions of work
+  private final String AUTO_SCALING_PARTITIONS_PER_CONTAINER = 
AUTO_SCALING_PREFIX + "partitionsPerContainer";
+  private final int DEFAULT_AUTO_SCALING_PARTITIONS_PER_CONTAINER = 1;
+  private final String AUTO_SCALING_MIN_CONTAINERS = AUTO_SCALING_PREFIX + 
"minContainers";
+  private final int DEFAULT_AUTO_SCALING_MIN_CONTAINERS = 0;
+  private final String AUTO_SCALING_MAX_CONTAINERS = AUTO_SCALING_PREFIX + 
"maxContainers";
+  private final int DEFAULT_AUTO_SCALING_MAX_CONTAINERS = Integer.MAX_VALUE;
+
+  private final Config config;
+  private final HelixManager helixManager;
+  private final ScheduledExecutorService autoScalingExecutor;
+  private final YarnService yarnService;
+  private final int partitionsPerContainer;
+  private final int minContainers;
+  private final int maxContainers;
+
+  public YarnAutoScalingManager(GobblinApplicationMaster appMaster) {
+    this.config = appMaster.getConfig();
+    this.helixManager = 
appMaster.getMultiManager().getJobClusterHelixManager();
+    this.yarnService = appMaster.getYarnService();
+    this.partitionsPerContainer = ConfigUtils.getInt(this.config, 
AUTO_SCALING_PARTITIONS_PER_CONTAINER,
+        DEFAULT_AUTO_SCALING_PARTITIONS_PER_CONTAINER);
+
+    Preconditions.checkArgument(this.partitionsPerContainer > 0,
+        AUTO_SCALING_PARTITIONS_PER_CONTAINER + " needs to be greater than 0");
+
+    this.minContainers = ConfigUtils.getInt(this.config, 
AUTO_SCALING_MIN_CONTAINERS,
+        DEFAULT_AUTO_SCALING_MIN_CONTAINERS);
+
+    Preconditions.checkArgument(this.minContainers >= 0,
+        DEFAULT_AUTO_SCALING_MIN_CONTAINERS + " needs to be greater than or 
equal to 0");
+
+    this.maxContainers = ConfigUtils.getInt(this.config, 
AUTO_SCALING_MAX_CONTAINERS,
+        DEFAULT_AUTO_SCALING_MAX_CONTAINERS);
+
+    Preconditions.checkArgument(this.maxContainers > 0,
+        DEFAULT_AUTO_SCALING_MAX_CONTAINERS + " needs to be greater than 0");
+
+    Preconditions.checkArgument(this.maxContainers >= this.minContainers,
+        DEFAULT_AUTO_SCALING_MAX_CONTAINERS + " needs to be greater than or 
equal to "
+            + DEFAULT_AUTO_SCALING_MIN_CONTAINERS);
+
+    this.autoScalingExecutor = Executors.newSingleThreadScheduledExecutor(
+        ExecutorsUtils.newThreadFactory(Optional.of(log), 
Optional.of("AutoScalingExecutor")));
+  }
+
+  @Override
+  protected void startUp() throws Exception {
+    int scheduleInterval = ConfigUtils.getInt(this.config, 
AUTO_SCALING_POLLING_INTERVAL_SECS,
+        DEFAULT_AUTO_SCALING_POLLING_INTERVAL_SECS);
+    log.info("Starting the " + YarnAutoScalingManager.class.getSimpleName());
+    log.info("Scheduling the auto scaling task with an interval of {} 
seconds", scheduleInterval);
+
+    this.autoScalingExecutor.scheduleAtFixedRate(new 
YarnAutoScalingRunnable(new TaskDriver(this.helixManager),
+            this.yarnService, this.partitionsPerContainer, this.minContainers, 
this.maxContainers), 0,
+        scheduleInterval, TimeUnit.SECONDS);
+  }
+
+  @Override
+  protected void shutDown() throws Exception {
+    log.info("Stopping the " + YarnAutoScalingManager.class.getSimpleName());
+
+    ExecutorsUtils.shutdownExecutorService(this.autoScalingExecutor, 
Optional.of(log));
+  }
+
+  /**
+   * A {@link Runnable} that figures out the number of containers required for 
the workload
+   * and requests those containers.
+   */
+  @VisibleForTesting
+  @AllArgsConstructor
+  static class YarnAutoScalingRunnable implements Runnable {
+    private final TaskDriver taskDriver;
+    private final YarnService yarnService;
+    private final int partitionsPerContainer;
+    private final int minContainers;
+    private final int maxContainers;
+
+    /**
+     * Iterate through the workflows configured in Helix to figure out the 
number of required partitions
+     * and request the {@link YarnService} to scale to the desired number of 
containers.
+     */
+    @Override
+    public void run() {
+      Set<String> inUseInstances = new HashSet<>();
+
+      int numPartitions = 0;
+      for (Map.Entry<String, WorkflowConfig> workFlowEntry : 
taskDriver.getWorkflows().entrySet()) {
+        WorkflowContext workflowContext = 
taskDriver.getWorkflowContext(workFlowEntry.getKey());
+
+        // Only allocate for active workflows
+        if (workflowContext == null || 
!workflowContext.getWorkflowState().equals(TaskState.IN_PROGRESS)) {
+          continue;
+        }
+
+        log.debug("Workflow name {} config {} context {}", 
workFlowEntry.getKey(), workFlowEntry.getValue(),
+            workflowContext);
+
+        WorkflowConfig workflowConfig = workFlowEntry.getValue();
+        JobDag jobDag = workflowConfig.getJobDag();
+
+        Set<String> jobs = jobDag.getAllNodes();
+
+        // sum up the number of partitions
+        for (String jobName : jobs) {
+          JobContext jobContext = taskDriver.getJobContext(jobName);
+
+          if (jobContext != null) {
+            log.debug("JobContext {} num partitions {}", jobContext, 
jobContext.getPartitionSet().size());
+
+            
inUseInstances.addAll(jobContext.getPartitionSet().stream().map(jobContext::getAssignedParticipant)
+                .filter(e -> e != null).collect(Collectors.toSet()));
+
+            numPartitions += jobContext.getPartitionSet().size();
+          }
+        }
+      }
+
+      // compute the target containers as a ceiling of the number of partitions 
divided by the number of partitions
+      // per container.
+      int numTargetContainers = (int) Math.ceil((double)numPartitions / 
this.partitionsPerContainer);
+
+      // adjust the number of target containers based on the configured min 
and max container values.
+      numTargetContainers = Math.max(this.minContainers, 
Math.min(this.maxContainers, numTargetContainers));
+
+      this.yarnService.requestTargetNumberOfContainers(numTargetContainers, 
inUseInstances);
+    }
+  }
+}
diff --git 
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java 
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java
index d4043f1..614ac1f 100644
--- a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java
+++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java
@@ -21,13 +21,17 @@ import java.io.File;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.AbstractMap;
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.hadoop.conf.Configuration;
@@ -62,11 +66,14 @@ import org.apache.hadoop.yarn.util.Records;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Function;
 import com.google.common.base.Optional;
 import com.google.common.base.Splitter;
 import com.google.common.base.Strings;
 import com.google.common.base.Throwables;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
@@ -79,6 +86,9 @@ import com.google.common.util.concurrent.AbstractIdleService;
 
 import com.typesafe.config.Config;
 
+import lombok.AccessLevel;
+import lombok.Getter;
+
 import org.apache.gobblin.configuration.ConfigurationKeys;
 
 import org.apache.gobblin.cluster.GobblinClusterConfigurationKeys;
@@ -92,6 +102,7 @@ import org.apache.gobblin.util.ConfigUtils;
 import org.apache.gobblin.util.ExecutorsUtils;
 import org.apache.gobblin.util.JvmUtils;
 import org.apache.gobblin.cluster.event.ClusterManagerShutdownRequest;
+import org.apache.gobblin.yarn.event.ContainerReleaseRequest;
 import org.apache.gobblin.yarn.event.ContainerShutdownRequest;
 import org.apache.gobblin.yarn.event.NewContainerRequest;
 
@@ -144,8 +155,17 @@ public class YarnService extends AbstractIdleService {
   private final Object allContainersStopped = new Object();
 
   // A map from container IDs to pairs of Container instances and Helix 
participant IDs of the containers
+  @VisibleForTesting
+  @Getter(AccessLevel.PROTECTED)
   private final ConcurrentMap<ContainerId, Map.Entry<Container, String>> 
containerMap = Maps.newConcurrentMap();
 
+  // A cache of the containers with an outstanding container release request.
+  // This is a cache instead of a set to get the automatic cleanup in case a 
container completes before the requested
+  // release.
+  @VisibleForTesting
+  @Getter(AccessLevel.PROTECTED)
+  private final Cache<ContainerId, String> releasedContainerCache;
+
   // A generator for an integer ID of a Helix instance (participant)
   private final AtomicInteger helixInstanceIdGenerator = new AtomicInteger(0);
 
@@ -159,6 +179,15 @@ public class YarnService extends AbstractIdleService {
 
   private volatile boolean shutdownInProgress = false;
 
+  // The number of containers requested based on the desired target number of 
containers. This is used to determine
+  // how many additional containers to request since the currently 
allocated amount may be less than this amount if we
+  // are waiting for containers to be allocated.
+  // The currently allocated amount may also be higher than this amount if 
YARN returned more than the requested number
+  // of containers.
+  @VisibleForTesting
+  @Getter(AccessLevel.PROTECTED)
+  private int numRequestedContainers = 0;
+
   public YarnService(Config config, String applicationName, String 
applicationId, YarnConfiguration yarnConfiguration,
       FileSystem fs, EventBus eventBus) throws Exception {
     this.applicationName = applicationName;
@@ -198,6 +227,10 @@ public class YarnService extends AbstractIdleService {
         ExecutorsUtils.newThreadFactory(Optional.of(LOGGER), 
Optional.of("ContainerLaunchExecutor")));
 
     this.tokens = getSecurityTokens();
+
+    this.releasedContainerCache = 
CacheBuilder.newBuilder().expireAfterAccess(ConfigUtils.getInt(config,
+        GobblinYarnConfigurationKeys.RELEASED_CONTAINERS_CACHE_EXPIRY_SECS,
+        
GobblinYarnConfigurationKeys.DEFAULT_RELEASED_CONTAINERS_CACHE_EXPIRY_SECS), 
TimeUnit.SECONDS).build();
   }
 
   @SuppressWarnings("unused")
@@ -229,6 +262,26 @@ public class YarnService extends AbstractIdleService {
     }
   }
 
+  /**
+   * Request the Resource Manager to release the container
+   * @param containerReleaseRequest containers to release
+   */
+  @Subscribe
+  public void handleContainerReleaseRequest(ContainerReleaseRequest 
containerReleaseRequest) {
+    for (Container container : containerReleaseRequest.getContainers()) {
+      LOGGER.info(String.format("Releasing container %s running on %s", 
container.getId(), container.getNodeId()));
+
+      // Record that this container was explicitly released so that a new one 
is not spawned to replace it
+      // Put the container id in the releasedContainerCache before releasing 
it so that handleContainerCompletion()
+      // can check for the container id and skip spawning a replacement 
container.
+      // Note that this is best effort since these are asynchronous operations 
and a container may abort concurrently
+      // with the release call. So in some cases a replacement container may 
have already been spawned before
+      // the container is put into the black list.
+      this.releasedContainerCache.put(container.getId(), "");
+      this.amrmClientAsync.releaseAssignedContainer(container.getId());
+    }
+  }
+
   @Override
   protected void startUp() throws Exception {
     LOGGER.info("Starting the YarnService");
@@ -310,10 +363,71 @@ public class YarnService extends AbstractIdleService {
         .build();
   }
 
-  private void requestInitialContainers(int containersRequested) {
-    for (int i = 0; i < containersRequested; i++) {
+  /**
+   * Request an allocation of containers. If numTargetContainers is larger 
than the max of current and expected number
+   * of containers then additional containers are requested.
+   *
+   * If numTargetContainers is less than the current number of allocated 
containers then release free containers.
+   * Shrinking is relative to the number of currently allocated containers 
since it takes time for containers
+   * to be allocated and assigned work and we want to avoid releasing a 
container prematurely before it is assigned
+   * work. This means that a container may not be released even though 
numTargetContainers is less than the requested
+   * number of containers. The intended usage is for the caller of this method 
to make periodic calls to attempt to
+   * adjust the cluster towards the desired number of containers.
+   *
+   * @param numTargetContainers the desired number of containers
+   * @param inUseInstances  a set of in use instances
+   */
+  public synchronized void requestTargetNumberOfContainers(int 
numTargetContainers, Set<String> inUseInstances) {
+    LOGGER.debug("Requesting numTargetContainers {} current 
numRequestedContainers {} in use instances {} map size {}",
+        numTargetContainers, this.numRequestedContainers, inUseInstances, 
this.containerMap.size());
+
+    // YARN can allocate more than the requested number of containers, compute 
additional allocations and deallocations
+    // based on the max of the requested and actual allocated counts
+    int numAllocatedContainers = this.containerMap.size();
+
+    // The number of allocated containers may be higher than the previously 
requested amount
+    // and there may be outstanding allocation requests, so the max of both 
counts is computed here
+    // and used to decide whether to allocate containers.
+    int numContainers = Math.max(numRequestedContainers, 
numAllocatedContainers);
+
+    // Request additional containers if the desired count is higher than the 
max of the current allocation or previously
+    // requested amount. Note that there may be in-flight or additional 
allocations after numContainers has been computed
+    // so overshooting can occur, but periodic calls to this method will make 
adjustments towards the target.
+    for (int i = numContainers; i < numTargetContainers; i++) {
       requestContainer(Optional.<String>absent());
     }
+
+    // If the total desired is lower than the currently allocated amount then 
release free containers.
+    // This is based on the currently allocated amount since containers may 
still be in the process of being allocated
+    // and assigned work. Resizing based on numRequestedContainers at this 
point may release a container right before
+    // or soon after it is assigned work.
+    if (numTargetContainers < numAllocatedContainers) {
+      LOGGER.debug("Shrinking number of containers by {}", 
(numAllocatedContainers - numTargetContainers));
+
+      List<Container> containersToRelease = new ArrayList<>();
+      int numToShutdown = numContainers - numTargetContainers;
+
+      // Look for eligible containers to release. If a container is in use 
then it is not released.
+      for (Map.Entry<ContainerId, Map.Entry<Container, String>> entry : 
this.containerMap.entrySet()) {
+        if (!inUseInstances.contains(entry.getValue().getValue())) {
+          containersToRelease.add(entry.getValue().getKey());
+        }
+
+        if (containersToRelease.size() == numToShutdown) {
+          break;
+        }
+      }
+
+      LOGGER.debug("Shutting down containers {}", containersToRelease);
+
+      this.eventBus.post(new ContainerReleaseRequest(containersToRelease));
+    }
+
+    this.numRequestedContainers = numTargetContainers;
+  }
+
+  private void requestInitialContainers(int containersRequested) {
+    requestTargetNumberOfContainers(containersRequested, 
Collections.EMPTY_SET);
   }
 
   private void requestContainer(Optional<String> preferredNode) {
@@ -333,7 +447,7 @@ public class YarnService extends AbstractIdleService {
         new AMRMClient.ContainerRequest(capability, preferredNodes, null, 
priority));
   }
 
-  private ContainerLaunchContext newContainerLaunchContext(Container 
container, String helixInstanceName)
+  protected ContainerLaunchContext newContainerLaunchContext(Container 
container, String helixInstanceName)
       throws IOException {
     Path appWorkDir = GobblinClusterUtils.getAppWorkDirPath(this.fs, 
this.applicationName, this.applicationId);
     Path containerWorkDir = new Path(appWorkDir, 
GobblinYarnConfigurationKeys.CONTAINER_WORK_DIR_NAME);
@@ -468,6 +582,12 @@ public class YarnService extends AbstractIdleService {
           containerStatus.getContainerId(), containerStatus.getDiagnostics()));
     }
 
+    if 
(this.releasedContainerCache.getIfPresent(containerStatus.getContainerId()) != 
null) {
+      LOGGER.info("Container release requested, so not spawning a replacement 
for containerId {}",
+          containerStatus.getContainerId());
+      return;
+    }
+
     if (this.shutdownInProgress) {
       return;
     }
diff --git 
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/event/ContainerReleaseRequest.java
 
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/event/ContainerReleaseRequest.java
new file mode 100644
index 0000000..151ac22
--- /dev/null
+++ 
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/event/ContainerReleaseRequest.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.yarn.event;
+
+import java.util.Collection;
+
+import org.apache.hadoop.yarn.api.records.Container;
+
+
+/**
+ * A type of event for container release requests to be used with a {@link 
com.google.common.eventbus.EventBus}.
+ * This event is different than {@link ContainerShutdownRequest} because it 
releases the container through
+ * the Resource Manager, while {@link ContainerShutdownRequest} shuts down a 
container through the
+ * Node Manager
+ */
+public class ContainerReleaseRequest {
+  private final Collection<Container> containers;
+
+  public ContainerReleaseRequest(Collection<Container> containers) {
+    this.containers = containers;
+  }
+
+  /**
+   * Get the containers to release.
+   *
+   * @return the containers to release
+   */
+  public Collection<Container> getContainers() {
+    return this.containers;
+  }
+}
diff --git 
a/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnAutoScalingManagerTest.java
 
b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnAutoScalingManagerTest.java
new file mode 100644
index 0000000..2a36646
--- /dev/null
+++ 
b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnAutoScalingManagerTest.java
@@ -0,0 +1,357 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.yarn;
+
+import java.io.IOException;
+
+import org.apache.helix.task.JobContext;
+import org.apache.helix.task.JobDag;
+import org.apache.helix.task.TaskDriver;
+import org.apache.helix.task.TaskState;
+import org.apache.helix.task.WorkflowConfig;
+import org.apache.helix.task.WorkflowContext;
+import org.mockito.Mockito;
+import org.testng.annotations.Test;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+
+
+/**
+ * Unit tests for {@link YarnAutoScalingManager}
+ */
+@Test(groups = { "gobblin.yarn" })
+public class YarnAutoScalingManagerTest {
+  /**
+   * Test for one workflow with one job
+   */
+  @Test
+  public void testOneJob() throws IOException {
+    YarnService mockYarnService = mock(YarnService.class);
+    TaskDriver mockTaskDriver = mock(TaskDriver.class);
+    WorkflowConfig mockWorkflowConfig = mock(WorkflowConfig.class);
+    JobDag mockJobDag = mock(JobDag.class);
+
+    Mockito.when(mockJobDag.getAllNodes()).thenReturn(ImmutableSet.of("job1"));
+    Mockito.when(mockWorkflowConfig.getJobDag()).thenReturn(mockJobDag);
+
+    Mockito.when(mockTaskDriver.getWorkflows())
+        .thenReturn(ImmutableMap.of("workflow1", mockWorkflowConfig));
+
+    WorkflowContext mockWorkflowContext = mock(WorkflowContext.class);
+    
Mockito.when(mockWorkflowContext.getWorkflowState()).thenReturn(TaskState.IN_PROGRESS);
+
+    
Mockito.when(mockTaskDriver.getWorkflowContext("workflow1")).thenReturn(mockWorkflowContext);
+
+    JobContext mockJobContext = mock(JobContext.class);
+    Mockito.when(mockJobContext.getPartitionSet())
+        .thenReturn(ImmutableSet.of(Integer.valueOf(1), Integer.valueOf(2)));
+    
Mockito.when(mockJobContext.getAssignedParticipant(2)).thenReturn("worker1");
+
+    
Mockito.when(mockTaskDriver.getJobContext("job1")).thenReturn(mockJobContext);
+
+    YarnAutoScalingManager.YarnAutoScalingRunnable runnable =
+        new YarnAutoScalingManager.YarnAutoScalingRunnable(mockTaskDriver, 
mockYarnService, 1, 1, 10);
+
+    runnable.run();
+
+    // 2 containers requested and one worker in use
+    Mockito.verify(mockYarnService, 
times(1)).requestTargetNumberOfContainers(2, ImmutableSet.of("worker1"));
+  }
+
+  /**
+   * Test for one workflow with two jobs
+   */
+  @Test
+  public void testTwoJobs() throws IOException {
+    YarnService mockYarnService = mock(YarnService.class);
+    TaskDriver mockTaskDriver = mock(TaskDriver.class);
+    WorkflowConfig mockWorkflowConfig = mock(WorkflowConfig.class);
+    JobDag mockJobDag = mock(JobDag.class);
+
+    Mockito.when(mockJobDag.getAllNodes()).thenReturn(ImmutableSet.of("job1", 
"job2"));
+    Mockito.when(mockWorkflowConfig.getJobDag()).thenReturn(mockJobDag);
+
+    Mockito.when(mockTaskDriver.getWorkflows())
+        .thenReturn(ImmutableMap.of("workflow1", mockWorkflowConfig));
+
+    WorkflowContext mockWorkflowContext = mock(WorkflowContext.class);
+    
Mockito.when(mockWorkflowContext.getWorkflowState()).thenReturn(TaskState.IN_PROGRESS);
+
+    
Mockito.when(mockTaskDriver.getWorkflowContext("workflow1")).thenReturn(mockWorkflowContext);
+
+    JobContext mockJobContext1 = mock(JobContext.class);
+    Mockito.when(mockJobContext1.getPartitionSet())
+        .thenReturn(ImmutableSet.of(Integer.valueOf(1), Integer.valueOf(2)));
+    
Mockito.when(mockJobContext1.getAssignedParticipant(2)).thenReturn("worker1");
+    
Mockito.when(mockTaskDriver.getJobContext("job1")).thenReturn(mockJobContext1);
+
+    JobContext mockJobContext2 = mock(JobContext.class);
+    Mockito.when(mockJobContext2.getPartitionSet())
+        .thenReturn(ImmutableSet.of(Integer.valueOf(3)));
+    
Mockito.when(mockJobContext2.getAssignedParticipant(3)).thenReturn("worker2");
+    
Mockito.when(mockTaskDriver.getJobContext("job2")).thenReturn(mockJobContext2);
+
+    YarnAutoScalingManager.YarnAutoScalingRunnable runnable =
+        new YarnAutoScalingManager.YarnAutoScalingRunnable(mockTaskDriver, 
mockYarnService, 1, 1, 10);
+
+    runnable.run();
+
+    // 3 containers requested and 2 workers in use
+    Mockito.verify(mockYarnService, 
times(1)).requestTargetNumberOfContainers(3, ImmutableSet.of("worker1", 
"worker2"));
+  }
+
+  /**
+   * Test for two workflows
+   */
+  @Test
+  public void testTwoWorkflows() throws IOException {
+    YarnService mockYarnService = mock(YarnService.class);
+    TaskDriver mockTaskDriver = mock(TaskDriver.class);
+
+    WorkflowConfig mockWorkflowConfig1 = mock(WorkflowConfig.class);
+    JobDag mockJobDag1 = mock(JobDag.class);
+
+    Mockito.when(mockJobDag1.getAllNodes()).thenReturn(ImmutableSet.of("job1", 
"job2"));
+    Mockito.when(mockWorkflowConfig1.getJobDag()).thenReturn(mockJobDag1);
+
+    WorkflowContext mockWorkflowContext1 = mock(WorkflowContext.class);
+    
Mockito.when(mockWorkflowContext1.getWorkflowState()).thenReturn(TaskState.IN_PROGRESS);
+
+    
Mockito.when(mockTaskDriver.getWorkflowContext("workflow1")).thenReturn(mockWorkflowContext1);
+
+    JobContext mockJobContext1 = mock(JobContext.class);
+    Mockito.when(mockJobContext1.getPartitionSet())
+        .thenReturn(ImmutableSet.of(Integer.valueOf(1), Integer.valueOf(2)));
+    
Mockito.when(mockJobContext1.getAssignedParticipant(2)).thenReturn("worker1");
+    
Mockito.when(mockTaskDriver.getJobContext("job1")).thenReturn(mockJobContext1);
+
+    JobContext mockJobContext2 = mock(JobContext.class);
+    Mockito.when(mockJobContext2.getPartitionSet())
+        .thenReturn(ImmutableSet.of(Integer.valueOf(3)));
+    
Mockito.when(mockJobContext2.getAssignedParticipant(3)).thenReturn("worker2");
+    
Mockito.when(mockTaskDriver.getJobContext("job2")).thenReturn(mockJobContext2);
+
+    WorkflowConfig mockWorkflowConfig2 = mock(WorkflowConfig.class);
+    JobDag mockJobDag2 = mock(JobDag.class);
+
+    
Mockito.when(mockJobDag2.getAllNodes()).thenReturn(ImmutableSet.of("job3"));
+    Mockito.when(mockWorkflowConfig2.getJobDag()).thenReturn(mockJobDag2);
+
+    WorkflowContext mockWorkflowContext2 = mock(WorkflowContext.class);
+    
Mockito.when(mockWorkflowContext2.getWorkflowState()).thenReturn(TaskState.IN_PROGRESS);
+
+    
Mockito.when(mockTaskDriver.getWorkflowContext("workflow2")).thenReturn(mockWorkflowContext2);
+
+    JobContext mockJobContext3 = mock(JobContext.class);
+    Mockito.when(mockJobContext3.getPartitionSet())
+        .thenReturn(ImmutableSet.of(Integer.valueOf(4), Integer.valueOf(5)));
+    
Mockito.when(mockJobContext3.getAssignedParticipant(4)).thenReturn("worker3");
+    
Mockito.when(mockTaskDriver.getJobContext("job3")).thenReturn(mockJobContext3);
+
+    Mockito.when(mockTaskDriver.getWorkflows())
+        .thenReturn(ImmutableMap.of("workflow1", mockWorkflowConfig1, 
"workflow2", mockWorkflowConfig2));
+
+    YarnAutoScalingManager.YarnAutoScalingRunnable runnable =
+        new YarnAutoScalingManager.YarnAutoScalingRunnable(mockTaskDriver, 
mockYarnService, 1, 1, 10);
+
+    runnable.run();
+
+    // 5 containers requested and 3 workers in use
+    Mockito.verify(mockYarnService, 
times(1)).requestTargetNumberOfContainers(5,
+        ImmutableSet.of("worker1", "worker2", "worker3"));
+  }
+
+  /**
+   * Test for two workflows with one not in progress.
+   * The partitions for the workflow that is not in progress should not be 
counted.
+   */
+  @Test
+  public void testNotInProgress() throws IOException {
+    YarnService mockYarnService = mock(YarnService.class);
+    TaskDriver mockTaskDriver = mock(TaskDriver.class);
+
+    WorkflowConfig mockWorkflowConfig1 = mock(WorkflowConfig.class);
+    JobDag mockJobDag1 = mock(JobDag.class);
+
+    Mockito.when(mockJobDag1.getAllNodes()).thenReturn(ImmutableSet.of("job1", 
"job2"));
+    Mockito.when(mockWorkflowConfig1.getJobDag()).thenReturn(mockJobDag1);
+
+    WorkflowContext mockWorkflowContext1 = mock(WorkflowContext.class);
+    
Mockito.when(mockWorkflowContext1.getWorkflowState()).thenReturn(TaskState.IN_PROGRESS);
+
+    
Mockito.when(mockTaskDriver.getWorkflowContext("workflow1")).thenReturn(mockWorkflowContext1);
+
+    JobContext mockJobContext1 = mock(JobContext.class);
+    Mockito.when(mockJobContext1.getPartitionSet())
+        .thenReturn(ImmutableSet.of(Integer.valueOf(1), Integer.valueOf(2)));
+    
Mockito.when(mockJobContext1.getAssignedParticipant(2)).thenReturn("worker1");
+    
Mockito.when(mockTaskDriver.getJobContext("job1")).thenReturn(mockJobContext1);
+
+    JobContext mockJobContext2 = mock(JobContext.class);
+    Mockito.when(mockJobContext2.getPartitionSet())
+        .thenReturn(ImmutableSet.of(Integer.valueOf(3)));
+    
Mockito.when(mockJobContext2.getAssignedParticipant(3)).thenReturn("worker2");
+    
Mockito.when(mockTaskDriver.getJobContext("job2")).thenReturn(mockJobContext2);
+
+    WorkflowConfig mockWorkflowConfig2 = mock(WorkflowConfig.class);
+    JobDag mockJobDag2 = mock(JobDag.class);
+
+    
Mockito.when(mockJobDag2.getAllNodes()).thenReturn(ImmutableSet.of("job3"));
+    Mockito.when(mockWorkflowConfig2.getJobDag()).thenReturn(mockJobDag2);
+
+    WorkflowContext mockWorkflowContext2 = mock(WorkflowContext.class);
+    
Mockito.when(mockWorkflowContext2.getWorkflowState()).thenReturn(TaskState.COMPLETED);
+
+    
Mockito.when(mockTaskDriver.getWorkflowContext("workflow2")).thenReturn(mockWorkflowContext2);
+
+    JobContext mockJobContext3 = mock(JobContext.class);
+    Mockito.when(mockJobContext3.getPartitionSet())
+        .thenReturn(ImmutableSet.of(Integer.valueOf(4), Integer.valueOf(5)));
+    
Mockito.when(mockJobContext3.getAssignedParticipant(4)).thenReturn("worker3");
+    
Mockito.when(mockTaskDriver.getJobContext("job3")).thenReturn(mockJobContext3);
+
+    Mockito.when(mockTaskDriver.getWorkflows())
+        .thenReturn(ImmutableMap.of("workflow1", mockWorkflowConfig1, 
"workflow2", mockWorkflowConfig2));
+
+    YarnAutoScalingManager.YarnAutoScalingRunnable runnable =
+        new YarnAutoScalingManager.YarnAutoScalingRunnable(mockTaskDriver, 
mockYarnService, 1, 1, 10);
+
+    runnable.run();
+
+    // 3 containers requested and 2 workers in use
+    Mockito.verify(mockYarnService, 
times(1)).requestTargetNumberOfContainers(3,
+        ImmutableSet.of("worker1", "worker2"));
+  }
+
+  /**
+   * Test multiple partitions to one container
+   */
+  @Test
+  public void testMultiplePartitionsPerContainer() throws IOException {
+    YarnService mockYarnService = mock(YarnService.class);
+    TaskDriver mockTaskDriver = mock(TaskDriver.class);
+    WorkflowConfig mockWorkflowConfig = mock(WorkflowConfig.class);
+    JobDag mockJobDag = mock(JobDag.class);
+
+    Mockito.when(mockJobDag.getAllNodes()).thenReturn(ImmutableSet.of("job1"));
+    Mockito.when(mockWorkflowConfig.getJobDag()).thenReturn(mockJobDag);
+
+    Mockito.when(mockTaskDriver.getWorkflows())
+        .thenReturn(ImmutableMap.of("workflow1", mockWorkflowConfig));
+
+    WorkflowContext mockWorkflowContext = mock(WorkflowContext.class);
+    
Mockito.when(mockWorkflowContext.getWorkflowState()).thenReturn(TaskState.IN_PROGRESS);
+
+    
Mockito.when(mockTaskDriver.getWorkflowContext("workflow1")).thenReturn(mockWorkflowContext);
+
+    JobContext mockJobContext = mock(JobContext.class);
+    Mockito.when(mockJobContext.getPartitionSet())
+        .thenReturn(ImmutableSet.of(Integer.valueOf(1), Integer.valueOf(2)));
+    
Mockito.when(mockJobContext.getAssignedParticipant(2)).thenReturn("worker1");
+
+    
Mockito.when(mockTaskDriver.getJobContext("job1")).thenReturn(mockJobContext);
+
+    YarnAutoScalingManager.YarnAutoScalingRunnable runnable =
+        new YarnAutoScalingManager.YarnAutoScalingRunnable(mockTaskDriver, 
mockYarnService, 2, 1, 10);
+
+    runnable.run();
+
+    // 1 container requested since 2 partitions and limit is 2 partitions per 
container. One worker in use.
+    Mockito.verify(mockYarnService, 
times(1)).requestTargetNumberOfContainers(1, ImmutableSet.of("worker1"));
+  }
+
+
+  /**
+   * Test min containers
+   */
+  @Test
+  public void testMinContainers() throws IOException {
+    YarnService mockYarnService = mock(YarnService.class);
+    TaskDriver mockTaskDriver = mock(TaskDriver.class);
+    WorkflowConfig mockWorkflowConfig = mock(WorkflowConfig.class);
+    JobDag mockJobDag = mock(JobDag.class);
+
+    Mockito.when(mockJobDag.getAllNodes()).thenReturn(ImmutableSet.of("job1"));
+    Mockito.when(mockWorkflowConfig.getJobDag()).thenReturn(mockJobDag);
+
+    Mockito.when(mockTaskDriver.getWorkflows())
+        .thenReturn(ImmutableMap.of("workflow1", mockWorkflowConfig));
+
+    WorkflowContext mockWorkflowContext = mock(WorkflowContext.class);
+    
Mockito.when(mockWorkflowContext.getWorkflowState()).thenReturn(TaskState.IN_PROGRESS);
+
+    
Mockito.when(mockTaskDriver.getWorkflowContext("workflow1")).thenReturn(mockWorkflowContext);
+
+    JobContext mockJobContext = mock(JobContext.class);
+    Mockito.when(mockJobContext.getPartitionSet())
+        .thenReturn(ImmutableSet.of(Integer.valueOf(1), Integer.valueOf(2)));
+    
Mockito.when(mockJobContext.getAssignedParticipant(2)).thenReturn("worker1");
+
+    
Mockito.when(mockTaskDriver.getJobContext("job1")).thenReturn(mockJobContext);
+
+    YarnAutoScalingManager.YarnAutoScalingRunnable runnable =
+        new YarnAutoScalingManager.YarnAutoScalingRunnable(mockTaskDriver, 
mockYarnService, 1, 5, 10);
+
+    runnable.run();
+
+    // 5 containers requested due to min and one worker in use
+    Mockito.verify(mockYarnService, 
times(1)).requestTargetNumberOfContainers(5, ImmutableSet.of("worker1"));
+  }
+
+
+  /**
+   * Test max containers
+   */
+  @Test
+  public void testMaxContainers() throws IOException {
+    YarnService mockYarnService = mock(YarnService.class);
+    TaskDriver mockTaskDriver = mock(TaskDriver.class);
+    WorkflowConfig mockWorkflowConfig = mock(WorkflowConfig.class);
+    JobDag mockJobDag = mock(JobDag.class);
+
+    Mockito.when(mockJobDag.getAllNodes()).thenReturn(ImmutableSet.of("job1"));
+    Mockito.when(mockWorkflowConfig.getJobDag()).thenReturn(mockJobDag);
+
+    Mockito.when(mockTaskDriver.getWorkflows())
+        .thenReturn(ImmutableMap.of("workflow1", mockWorkflowConfig));
+
+    WorkflowContext mockWorkflowContext = mock(WorkflowContext.class);
+    
Mockito.when(mockWorkflowContext.getWorkflowState()).thenReturn(TaskState.IN_PROGRESS);
+
+    
Mockito.when(mockTaskDriver.getWorkflowContext("workflow1")).thenReturn(mockWorkflowContext);
+
+    JobContext mockJobContext = mock(JobContext.class);
+    Mockito.when(mockJobContext.getPartitionSet())
+        .thenReturn(ImmutableSet.of(Integer.valueOf(1), Integer.valueOf(2)));
+    
Mockito.when(mockJobContext.getAssignedParticipant(2)).thenReturn("worker1");
+
+    
Mockito.when(mockTaskDriver.getJobContext("job1")).thenReturn(mockJobContext);
+
+    YarnAutoScalingManager.YarnAutoScalingRunnable runnable =
+        new YarnAutoScalingManager.YarnAutoScalingRunnable(mockTaskDriver, 
mockYarnService, 1, 1, 1);
+
+    runnable.run();
+
+    // 1 container requested due to max and one worker in use
+    Mockito.verify(mockYarnService, 
times(1)).requestTargetNumberOfContainers(1, ImmutableSet.of("worker1"));
+  }
+}
diff --git 
a/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnServiceTest.java 
b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnServiceTest.java
new file mode 100644
index 0000000..60aad14
--- /dev/null
+++ b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnServiceTest.java
@@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.yarn;
+
+import com.google.common.base.Predicate;
+import com.google.common.eventbus.EventBus;
+import com.google.common.io.Closer;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigValueFactory;
+
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.net.URL;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeoutException;
+import org.apache.gobblin.testing.AssertWithBackoff;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.MiniYARNCluster;
+import 
org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
+import org.apache.hadoop.yarn.server.utils.BuilderUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+
+/**
+ * Tests for {@link YarnService}.
+ */
+@Test(groups = {"gobblin.yarn", "disabledOnTravis"}, singleThreaded=true)
+public class YarnServiceTest {
+  final Logger LOG = LoggerFactory.getLogger(YarnServiceTest.class);
+
+  private YarnClient yarnClient;
+  private MiniYARNCluster yarnCluster;
+  private TestYarnService yarnService;
+  private Config config;
+  private YarnConfiguration clusterConf;
+  private ApplicationId applicationId;
+  private ApplicationAttemptId applicationAttemptId;
+  private final EventBus eventBus = new EventBus("YarnServiceTest");
+
+  private final Closer closer = Closer.create();
+
+  private static void setEnv(String key, String value) {
+    try {
+      Map<String, String> env = System.getenv();
+      Class<?> cl = env.getClass();
+      Field field = cl.getDeclaredField("m");
+      field.setAccessible(true);
+      Map<String, String> writableEnv = (Map<String, String>) field.get(env);
+      writableEnv.put(key, value);
+    } catch (Exception e) {
+      throw new IllegalStateException("Failed to set environment variable", e);
+    }
+  }
+
+  @BeforeClass
+  public void setUp() throws Exception {
+    // Set java home in environment since it isn't set on some systems
+    String javaHome = System.getProperty("java.home");
+    setEnv("JAVA_HOME", javaHome);
+
+    this.clusterConf = new YarnConfiguration();
+    this.clusterConf.set(YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_MS, "100");
+    
this.clusterConf.set(YarnConfiguration.RESOURCEMANAGER_CONNECT_MAX_WAIT_MS, 
"10000");
+    
this.clusterConf.set(YarnConfiguration.YARN_CLIENT_APPLICATION_CLIENT_PROTOCOL_POLL_TIMEOUT_MS,
 "60000");
+
+    this.yarnCluster =
+        this.closer.register(new MiniYARNCluster("YarnServiceTestCluster", 4, 
1,
+            1));
+    this.yarnCluster.init(this.clusterConf);
+    this.yarnCluster.start();
+
+    // YARN client should not be started before the Resource Manager is up
+    AssertWithBackoff.create().logger(LOG).timeoutMs(10000)
+        .assertTrue(new Predicate<Void>() {
+          @Override public boolean apply(Void input) {
+            return 
!clusterConf.get(YarnConfiguration.RM_ADDRESS).contains(":0");
+          }
+        }, "Waiting for RM");
+
+    this.yarnClient = this.closer.register(YarnClient.createYarnClient());
+    this.yarnClient.init(this.clusterConf);
+    this.yarnClient.start();
+
+    URL url = YarnServiceTest.class.getClassLoader()
+        .getResource(YarnServiceTest.class.getSimpleName() + ".conf");
+    Assert.assertNotNull(url, "Could not find resource " + url);
+
+    this.config = ConfigFactory.parseURL(url).resolve();
+
+    // Start a dummy application manager so that the YarnService can use the 
AM-RM token.
+    startApp();
+
+    // create and start the test yarn service
+    this.yarnService = new TestYarnService(this.config, "testApp", "appId",
+        this.clusterConf,
+        FileSystem.getLocal(new Configuration()), this.eventBus);
+
+   this.yarnService.startUp();
+  }
+
+  private void startApp() throws Exception {
+    // submit a dummy app
+    ApplicationSubmissionContext appSubmissionContext =
+        yarnClient.createApplication().getApplicationSubmissionContext();
+    this.applicationId = appSubmissionContext.getApplicationId();
+
+    ContainerLaunchContext containerLaunchContext =
+        BuilderUtils.newContainerLaunchContext(Collections.emptyMap(), 
Collections.emptyMap(),
+            Arrays.asList("sleep", "100"), Collections.emptyMap(), null, 
Collections.emptyMap());
+
+    // Setup the application submission context
+    appSubmissionContext.setApplicationName("TestApp");
+    appSubmissionContext.setResource(Resource.newInstance(128, 1));
+    appSubmissionContext.setPriority(Priority.newInstance(0));
+    appSubmissionContext.setAMContainerSpec(containerLaunchContext);
+
+    this.yarnClient.submitApplication(appSubmissionContext);
+
+    // wait for application to be accepted
+    int i;
+    RMAppAttempt attempt = null;
+    for (i = 0; i < 120; i++) {
+      ApplicationReport appReport = 
yarnClient.getApplicationReport(applicationId);
+
+      if (appReport.getYarnApplicationState() == 
YarnApplicationState.ACCEPTED) {
+        this.applicationAttemptId = appReport.getCurrentApplicationAttemptId();
+        attempt = yarnCluster.getResourceManager().getRMContext().getRMApps()
+            
.get(appReport.getCurrentApplicationAttemptId().getApplicationId()).getCurrentAppAttempt();
+        break;
+      }
+      Thread.sleep(1000);
+    }
+
+    Assert.assertTrue(i < 120, "timed out waiting for ACCEPTED state");
+
+    // Set the AM-RM token in the UGI for access during testing
+    
UserGroupInformation.setLoginUser(UserGroupInformation.createRemoteUser(UserGroupInformation.getCurrentUser()
+        .getUserName()));
+    UserGroupInformation.getCurrentUser().addToken(attempt.getAMRMToken());
+  }
+
+  @AfterClass
+  public void tearDown() throws IOException, TimeoutException, YarnException {
+    try {
+      
this.yarnClient.killApplication(this.applicationAttemptId.getApplicationId());
+      this.yarnService.shutDown();
+    } finally {
+      this.closer.close();
+    }
+  }
+
+  /**
+   * Test that requesting a higher target number of containers scales the
+   * cluster up to the requested container count.
+   */
+  @Test(groups = {"gobblin.yarn", "disabledOnTravis"})
+  public void testScaleUp() {
+    this.yarnService.requestTargetNumberOfContainers(10, 
Collections.EMPTY_SET);
+
+    Assert.assertEquals(this.yarnService.getNumRequestedContainers(), 10);
+    Assert.assertTrue(this.yarnService.waitForContainerCount(10, 60000));
+  }
+
+  @Test(groups = {"gobblin.yarn", "disabledOnTravis"}, dependsOnMethods = 
"testScaleUp")
+  public void testScaleDownWithInUseInstances() {
+    Set<String> inUseInstances = new HashSet<>();
+
+    for (int i = 1; i <= 8; i++) {
+      inUseInstances.add("GobblinYarnTaskRunner_" + i);
+    }
+
+    this.yarnService.requestTargetNumberOfContainers(6, inUseInstances);
+
+    Assert.assertEquals(this.yarnService.getNumRequestedContainers(), 6);
+
+    // will only be able to shrink to 8
+    Assert.assertTrue(this.yarnService.waitForContainerCount(8, 60000));
+
+    // will not be able to shrink to 6 due to 8 in-use instances
+    Assert.assertFalse(this.yarnService.waitForContainerCount(6, 10000));
+
+  }
+
+  @Test(groups = {"gobblin.yarn", "disabledOnTravis"}, dependsOnMethods = 
"testScaleDownWithInUseInstances")
+  public void testScaleDown() throws Exception {
+    this.yarnService.requestTargetNumberOfContainers(4, Collections.EMPTY_SET);
+
+    Assert.assertEquals(this.yarnService.getNumRequestedContainers(), 4);
+    Assert.assertTrue(this.yarnService.waitForContainerCount(4, 60000));
+  }
+
+  // Keep this test last since it interferes with the container counts in the 
prior tests.
+  @Test(groups = {"gobblin.yarn", "disabledOnTravis"}, dependsOnMethods = 
"testScaleDown")
+  public void testReleasedContainerCache() throws Exception {
+    Config modifiedConfig = this.config
+        
.withValue(GobblinYarnConfigurationKeys.RELEASED_CONTAINERS_CACHE_EXPIRY_SECS, 
ConfigValueFactory.fromAnyRef("2"));
+    TestYarnService yarnService =
+        new TestYarnService(modifiedConfig, "testApp1", "appId1",
+            this.clusterConf, FileSystem.getLocal(new Configuration()), 
this.eventBus);
+
+    ContainerId containerId1 = 
ContainerId.newInstance(ApplicationAttemptId.newInstance(ApplicationId.newInstance(1,
 0),
+        0), 0);
+
+    yarnService.getReleasedContainerCache().put(containerId1, "");
+
+    
Assert.assertTrue(yarnService.getReleasedContainerCache().getIfPresent(containerId1)
 != null);
+
+    // give some time for element to expire
+    Thread.sleep(4000);
+    
Assert.assertTrue(yarnService.getReleasedContainerCache().getIfPresent(containerId1)
 == null);
+  }
+
+
+  private static class TestYarnService extends YarnService {
+    public TestYarnService(Config config, String applicationName, String 
applicationId, YarnConfiguration yarnConfiguration,
+        FileSystem fs, EventBus eventBus) throws Exception {
+      super(config, applicationName, applicationId, yarnConfiguration, fs, 
eventBus);
+    }
+
+    protected ContainerLaunchContext newContainerLaunchContext(Container 
container, String helixInstanceName)
+        throws IOException {
+      return BuilderUtils.newContainerLaunchContext(Collections.emptyMap(), 
Collections.emptyMap(),
+              Arrays.asList("sleep", "60000"), Collections.emptyMap(), null, 
Collections.emptyMap());
+    }
+
+    /**
+     * Wait to reach the expected count.
+     *
+     * @param expectedCount the expected count
+     * @param waitMillis amount of time in milliseconds to wait
+     * @return true if the count was reached within the allowed wait time
+     */
+    public boolean waitForContainerCount(int expectedCount, int waitMillis) {
+      final int waitInterval = 1000;
+      int waitedMillis = 0;
+      boolean success = false;
+
+      while (waitedMillis < waitMillis) {
+        try {
+          Thread.sleep(waitInterval);
+          waitedMillis += waitInterval;
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+          break;
+        }
+
+        if (expectedCount == getContainerMap().size()) {
+          success = true;
+          break;
+        }
+      }
+
+      return success;
+    }
+  }
+}
diff --git a/gobblin-yarn/src/test/resources/YarnServiceTest.conf 
b/gobblin-yarn/src/test/resources/YarnServiceTest.conf
new file mode 100644
index 0000000..d2dc49c
--- /dev/null
+++ b/gobblin-yarn/src/test/resources/YarnServiceTest.conf
@@ -0,0 +1,72 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Yarn/Helix configuration properties
+gobblin.cluster.helix.cluster.name=YarnServiceTest
+gobblin.yarn.app.name=YarnServiceTest
+gobblin.yarn.work.dir=YarnServiceTest
+
+gobblin.yarn.lib.jars.dir="build/gobblin-yarn/libs"
+gobblin.yarn.conf.dir="gobblin-yarn/src/test/resources"
+gobblin.yarn.app.master.files.local=${gobblin.yarn.conf.dir}"/log4j-yarn.properties,"${gobblin.yarn.conf.dir}"/application.conf,yarn-site.xml,dynamic.conf"
+gobblin.yarn.container.files.local=${gobblin.yarn.app.master.files.local}
+gobblin.yarn.app.queue=default
+gobblin.yarn.app.master.memory.mbs=64
+gobblin.yarn.app.master.cores=1
+gobblin.yarn.app.report.interval.minutes=1
+gobblin.yarn.max.get.app.report.failures=4
+gobblin.yarn.email.notification.on.shutdown=false
+gobblin.yarn.initial.containers=1
+gobblin.yarn.container.memory.mbs=64
+gobblin.yarn.container.cores=1
+gobblin.yarn.container.affinity.enabled=true
+gobblin.yarn.helix.instance.max.retries=2
+gobblin.yarn.logs.sink.root.dir=${gobblin.yarn.work.dir}/applogs
+
+# File system URIs
+fs.uri="file:///"
+writer.fs.uri=${fs.uri}
+state.store.fs.uri=${fs.uri}
+
+# Writer related configuration properties
+writer.destination.type=HDFS
+writer.output.format=AVRO
+writer.staging.dir=${gobblin.yarn.work.dir}/task-staging
+writer.output.dir=${gobblin.yarn.work.dir}/task-output
+
+# Data publisher related configuration properties
+data.publisher.type=org.apache.gobblin.publisher.BaseDataPublisher
+data.publisher.final.dir=${gobblin.yarn.work.dir}/job-output
+data.publisher.replace.final.dir=false
+
+# Directory where job/task state files are stored
+state.store.dir=${gobblin.yarn.work.dir}/state-store
+
+# Directory where error files from the quality checkers are stored
+qualitychecker.row.err.file=${gobblin.yarn.work.dir}/err
+
+# Disable job locking for now
+job.lock.enabled=false
+
+# Interval of task state reporting in milliseconds
+task.status.reportintervalinms=1000
+
+# If the job execution history server should be enabled
+job.execinfo.server.enabled=false
+
+# Enable metrics / events
+metrics.enabled=false
\ No newline at end of file

Reply via email to