This is an automated email from the ASF dual-hosted git repository.
hutran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 56930d7 [GOBBLIN-762] Add automatic scaling for Gobblin on YARN
56930d7 is described below
commit 56930d78947a23baf6884839baee36aea33486e9
Author: Hung Tran <[email protected]>
AuthorDate: Tue May 14 09:06:14 2019 -0700
[GOBBLIN-762] Add automatic scaling for Gobblin on YARN
Closes #2626 from htran1/yarn_auto_scale
---
.../gobblin/cluster/GobblinClusterManager.java | 1 +
gobblin-yarn/build.gradle | 5 +
.../gobblin/yarn/GobblinApplicationMaster.java | 22 +-
.../gobblin/yarn/GobblinYarnConfigurationKeys.java | 3 +
.../gobblin/yarn/YarnAutoScalingManager.java | 187 +++++++++++
.../java/org/apache/gobblin/yarn/YarnService.java | 126 +++++++-
.../yarn/event/ContainerReleaseRequest.java | 46 +++
.../gobblin/yarn/YarnAutoScalingManagerTest.java | 357 +++++++++++++++++++++
.../org/apache/gobblin/yarn/YarnServiceTest.java | 296 +++++++++++++++++
.../src/test/resources/YarnServiceTest.conf | 72 +++++
10 files changed, 1109 insertions(+), 6 deletions(-)
diff --git
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java
index a4658cc..571e5ba 100644
---
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java
+++
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java
@@ -136,6 +136,7 @@ public class GobblinClusterManager implements
ApplicationLauncher, StandardMetri
private JobConfigurationManager jobConfigurationManager;
private final String clusterName;
+ @Getter
protected final Config config;
public GobblinClusterManager(String clusterName, String applicationId,
Config config,
diff --git a/gobblin-yarn/build.gradle b/gobblin-yarn/build.gradle
index 02959bf..4d28eec 100644
--- a/gobblin-yarn/build.gradle
+++ b/gobblin-yarn/build.gradle
@@ -65,6 +65,10 @@ dependencies {
testCompile externalDependency.hadoopYarnMiniCluster
testCompile externalDependency.curatorFramework
testCompile externalDependency.curatorTest
+
+ testCompile ('com.google.inject:guice:3.0') {
+ force = true
+ }
}
task testJar(type: Jar, dependsOn: testClasses) {
@@ -124,6 +128,7 @@ test {
dependsOn shadowJar
workingDir rootProject.rootDir
maxParallelForks = 1
+ forkEvery = 1
}
clean {
diff --git
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinApplicationMaster.java
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinApplicationMaster.java
index c4ee07a..f0a7b48 100644
---
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinApplicationMaster.java
+++
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinApplicationMaster.java
@@ -44,15 +44,20 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Optional;
+import com.google.common.util.concurrent.Service;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
+import lombok.Getter;
+
import org.apache.gobblin.annotation.Alpha;
import org.apache.gobblin.cluster.GobblinClusterConfigurationKeys;
import org.apache.gobblin.cluster.GobblinClusterManager;
import org.apache.gobblin.cluster.GobblinClusterUtils;
+import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.util.JvmUtils;
import org.apache.gobblin.util.logs.Log4jConfigurationHelper;
+import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
import org.apache.gobblin.yarn.event.DelegationTokenUpdatedEvent;
@@ -68,9 +73,11 @@ import
org.apache.gobblin.yarn.event.DelegationTokenUpdatedEvent;
*/
@Alpha
public class GobblinApplicationMaster extends GobblinClusterManager {
-
private static final Logger LOGGER =
LoggerFactory.getLogger(GobblinApplicationMaster.class);
+ @Getter
+ private final YarnService yarnService;
+
public GobblinApplicationMaster(String applicationName, ContainerId
containerId, Config config,
YarnConfiguration yarnConfiguration) throws Exception {
super(applicationName,
containerId.getApplicationAttemptId().getApplicationId().toString(),
@@ -82,13 +89,22 @@ public class GobblinApplicationMaster extends
GobblinClusterManager {
.addService(gobblinYarnLogSource.buildLogCopier(this.config,
containerId, this.fs, this.appWorkDir));
}
- this.applicationLauncher
- .addService(buildYarnService(this.config, applicationName,
this.applicationId, yarnConfiguration, this.fs));
+ this.yarnService = buildYarnService(this.config, applicationName,
this.applicationId, yarnConfiguration, this.fs);
+ this.applicationLauncher.addService(this.yarnService);
if (UserGroupInformation.isSecurityEnabled()) {
LOGGER.info("Adding YarnContainerSecurityManager since security is
enabled");
this.applicationLauncher.addService(buildYarnContainerSecurityManager(this.config,
this.fs));
}
+
+ // Add additional services
+ List<String> serviceClassNames = ConfigUtils.getStringList(this.config,
+ GobblinYarnConfigurationKeys.APP_MASTER_SERVICE_CLASSES);
+
+ for (String serviceClassName : serviceClassNames) {
+ Class<?> serviceClass = Class.forName(serviceClassName);
+ this.applicationLauncher.addService((Service)
GobblinConstructorUtils.invokeLongestConstructor(serviceClass, this));
+ }
}
/**
diff --git
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnConfigurationKeys.java
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnConfigurationKeys.java
index e0b8e31..6098eae 100644
---
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnConfigurationKeys.java
+++
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnConfigurationKeys.java
@@ -33,6 +33,8 @@ public class GobblinYarnConfigurationKeys {
public static final String MAX_GET_APP_REPORT_FAILURES_KEY =
GOBBLIN_YARN_PREFIX + "max.get.app.report.failures";
public static final String EMAIL_NOTIFICATION_ON_SHUTDOWN_KEY =
GOBBLIN_YARN_PREFIX + "email.notification.on.shutdown";
+ public static final String RELEASED_CONTAINERS_CACHE_EXPIRY_SECS =
GOBBLIN_YARN_PREFIX + "releasedContainersCacheExpirySecs";
+ public static final int DEFAULT_RELEASED_CONTAINERS_CACHE_EXPIRY_SECS = 300;
// Gobblin Yarn ApplicationMaster configuration properties.
public static final String APP_MASTER_MEMORY_MBS_KEY = GOBBLIN_YARN_PREFIX +
"app.master.memory.mbs";
@@ -42,6 +44,7 @@ public class GobblinYarnConfigurationKeys {
public static final String APP_MASTER_FILES_REMOTE_KEY = GOBBLIN_YARN_PREFIX
+ "app.master.files.remote";
public static final String APP_MASTER_WORK_DIR_NAME = "appmaster";
public static final String APP_MASTER_JVM_ARGS_KEY = GOBBLIN_YARN_PREFIX +
"app.master.jvm.args";
+ public static final String APP_MASTER_SERVICE_CLASSES = GOBBLIN_YARN_PREFIX
+ "app.master.serviceClasses";
// Gobblin Yarn container configuration properties.
public static final String INITIAL_CONTAINERS_KEY = GOBBLIN_YARN_PREFIX +
"initial.containers";
diff --git
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAutoScalingManager.java
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAutoScalingManager.java
new file mode 100644
index 0000000..e9d934b
--- /dev/null
+++
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAutoScalingManager.java
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.yarn;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import org.apache.helix.HelixManager;
+import org.apache.helix.task.JobContext;
+import org.apache.helix.task.JobDag;
+import org.apache.helix.task.TaskDriver;
+import org.apache.helix.task.TaskState;
+import org.apache.helix.task.WorkflowConfig;
+import org.apache.helix.task.WorkflowContext;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.AbstractIdleService;
+import com.typesafe.config.Config;
+
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.ExecutorsUtils;
+
+
+/**
+ * The autoscaling manager is responsible for figuring out how many containers
are required for the workload and
+ * requesting the {@link YarnService} to request that many containers.
+ */
+@Slf4j
+public class YarnAutoScalingManager extends AbstractIdleService {
+ private final String AUTO_SCALING_PREFIX =
GobblinYarnConfigurationKeys.GOBBLIN_YARN_PREFIX + "autoScaling.";
+ private final String AUTO_SCALING_POLLING_INTERVAL_SECS =
+ AUTO_SCALING_PREFIX + "pollingIntervalSeconds";
+ private final int DEFAULT_AUTO_SCALING_POLLING_INTERVAL_SECS = 60;
+ // Only one container will be requested for each N partitions of work
+ private final String AUTO_SCALING_PARTITIONS_PER_CONTAINER =
AUTO_SCALING_PREFIX + "partitionsPerContainer";
+ private final int DEFAULT_AUTO_SCALING_PARTITIONS_PER_CONTAINER = 1;
+ private final String AUTO_SCALING_MIN_CONTAINERS = AUTO_SCALING_PREFIX +
"minContainers";
+ private final int DEFAULT_AUTO_SCALING_MIN_CONTAINERS = 0;
+ private final String AUTO_SCALING_MAX_CONTAINERS = AUTO_SCALING_PREFIX +
"maxContainers";
+ private final int DEFAULT_AUTO_SCALING_MAX_CONTAINERS = Integer.MAX_VALUE;
+
+ private final Config config;
+ private final HelixManager helixManager;
+ private final ScheduledExecutorService autoScalingExecutor;
+ private final YarnService yarnService;
+ private final int partitionsPerContainer;
+ private final int minContainers;
+ private final int maxContainers;
+
+ public YarnAutoScalingManager(GobblinApplicationMaster appMaster) {
+ this.config = appMaster.getConfig();
+ this.helixManager =
appMaster.getMultiManager().getJobClusterHelixManager();
+ this.yarnService = appMaster.getYarnService();
+ this.partitionsPerContainer = ConfigUtils.getInt(this.config,
AUTO_SCALING_PARTITIONS_PER_CONTAINER,
+ DEFAULT_AUTO_SCALING_PARTITIONS_PER_CONTAINER);
+
+ Preconditions.checkArgument(this.partitionsPerContainer > 0,
+ AUTO_SCALING_PARTITIONS_PER_CONTAINER + " needs to be greater than 0");
+
+ this.minContainers = ConfigUtils.getInt(this.config,
AUTO_SCALING_MIN_CONTAINERS,
+ DEFAULT_AUTO_SCALING_MIN_CONTAINERS);
+
+ Preconditions.checkArgument(this.minContainers >= 0,
+ AUTO_SCALING_MIN_CONTAINERS + " needs to be greater than or
equal to 0");
+
+ this.maxContainers = ConfigUtils.getInt(this.config,
AUTO_SCALING_MAX_CONTAINERS,
+ DEFAULT_AUTO_SCALING_MAX_CONTAINERS);
+
+ Preconditions.checkArgument(this.maxContainers > 0,
+ AUTO_SCALING_MAX_CONTAINERS + " needs to be greater than 0");
+
+ Preconditions.checkArgument(this.maxContainers >= this.minContainers,
+ AUTO_SCALING_MAX_CONTAINERS + " needs to be greater than or
equal to "
+ + AUTO_SCALING_MIN_CONTAINERS);
+
+ this.autoScalingExecutor = Executors.newSingleThreadScheduledExecutor(
+ ExecutorsUtils.newThreadFactory(Optional.of(log),
Optional.of("AutoScalingExecutor")));
+ }
+
+ @Override
+ protected void startUp() throws Exception {
+ int scheduleInterval = ConfigUtils.getInt(this.config,
AUTO_SCALING_POLLING_INTERVAL_SECS,
+ DEFAULT_AUTO_SCALING_POLLING_INTERVAL_SECS);
+ log.info("Starting the " + YarnAutoScalingManager.class.getSimpleName());
+ log.info("Scheduling the auto scaling task with an interval of {}
seconds", scheduleInterval);
+
+ this.autoScalingExecutor.scheduleAtFixedRate(new
YarnAutoScalingRunnable(new TaskDriver(this.helixManager),
+ this.yarnService, this.partitionsPerContainer, this.minContainers,
this.maxContainers), 0,
+ scheduleInterval, TimeUnit.SECONDS);
+ }
+
+ @Override
+ protected void shutDown() throws Exception {
+ log.info("Stopping the " + YarnAutoScalingManager.class.getSimpleName());
+
+ ExecutorsUtils.shutdownExecutorService(this.autoScalingExecutor,
Optional.of(log));
+ }
+
+ /**
+ * A {@link Runnable} that figures out the number of containers required for
the workload
+ * and requests those containers.
+ */
+ @VisibleForTesting
+ @AllArgsConstructor
+ static class YarnAutoScalingRunnable implements Runnable {
+ private final TaskDriver taskDriver;
+ private final YarnService yarnService;
+ private final int partitionsPerContainer;
+ private final int minContainers;
+ private final int maxContainers;
+
+ /**
+ * Iterate through the workflows configured in Helix to figure out the
number of required partitions
+ * and request the {@link YarnService} to scale to the desired number of
containers.
+ */
+ @Override
+ public void run() {
+ Set<String> inUseInstances = new HashSet<>();
+
+ int numPartitions = 0;
+ for (Map.Entry<String, WorkflowConfig> workFlowEntry :
taskDriver.getWorkflows().entrySet()) {
+ WorkflowContext workflowContext =
taskDriver.getWorkflowContext(workFlowEntry.getKey());
+
+ // Only allocate for active workflows
+ if (workflowContext == null ||
!workflowContext.getWorkflowState().equals(TaskState.IN_PROGRESS)) {
+ continue;
+ }
+
+ log.debug("Workflow name {} config {} context {}",
workFlowEntry.getKey(), workFlowEntry.getValue(),
+ workflowContext);
+
+ WorkflowConfig workflowConfig = workFlowEntry.getValue();
+ JobDag jobDag = workflowConfig.getJobDag();
+
+ Set<String> jobs = jobDag.getAllNodes();
+
+ // sum up the number of partitions
+ for (String jobName : jobs) {
+ JobContext jobContext = taskDriver.getJobContext(jobName);
+
+ if (jobContext != null) {
+ log.debug("JobContext {} num partitions {}", jobContext,
jobContext.getPartitionSet().size());
+
+
inUseInstances.addAll(jobContext.getPartitionSet().stream().map(jobContext::getAssignedParticipant)
+ .filter(e -> e != null).collect(Collectors.toSet()));
+
+ numPartitions += jobContext.getPartitionSet().size();
+ }
+ }
+ }
+
+ // compute the target containers as a ceiling of number of partitions
divided by the number of partitions
+ // per container.
+ int numTargetContainers = (int) Math.ceil((double)numPartitions /
this.partitionsPerContainer);
+
+ // adjust the number of target containers based on the configured min
and max container values.
+ numTargetContainers = Math.max(this.minContainers,
Math.min(this.maxContainers, numTargetContainers));
+
+ this.yarnService.requestTargetNumberOfContainers(numTargetContainers,
inUseInstances);
+ }
+ }
+}
diff --git
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java
index d4043f1..614ac1f 100644
--- a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java
+++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java
@@ -21,13 +21,17 @@ import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.AbstractMap;
+import java.util.ArrayList;
+import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
@@ -62,11 +66,14 @@ import org.apache.hadoop.yarn.util.Records;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.base.Optional;
import com.google.common.base.Splitter;
import com.google.common.base.Strings;
import com.google.common.base.Throwables;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
@@ -79,6 +86,9 @@ import com.google.common.util.concurrent.AbstractIdleService;
import com.typesafe.config.Config;
+import lombok.AccessLevel;
+import lombok.Getter;
+
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.cluster.GobblinClusterConfigurationKeys;
@@ -92,6 +102,7 @@ import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.util.ExecutorsUtils;
import org.apache.gobblin.util.JvmUtils;
import org.apache.gobblin.cluster.event.ClusterManagerShutdownRequest;
+import org.apache.gobblin.yarn.event.ContainerReleaseRequest;
import org.apache.gobblin.yarn.event.ContainerShutdownRequest;
import org.apache.gobblin.yarn.event.NewContainerRequest;
@@ -144,8 +155,17 @@ public class YarnService extends AbstractIdleService {
private final Object allContainersStopped = new Object();
// A map from container IDs to pairs of Container instances and Helix
participant IDs of the containers
+ @VisibleForTesting
+ @Getter(AccessLevel.PROTECTED)
private final ConcurrentMap<ContainerId, Map.Entry<Container, String>>
containerMap = Maps.newConcurrentMap();
+ // A cache of the containers with an outstanding container release request.
+ // This is a cache instead of a set to get the automatic cleanup in case a
container completes before the requested
+ // release.
+ @VisibleForTesting
+ @Getter(AccessLevel.PROTECTED)
+ private final Cache<ContainerId, String> releasedContainerCache;
+
// A generator for an integer ID of a Helix instance (participant)
private final AtomicInteger helixInstanceIdGenerator = new AtomicInteger(0);
@@ -159,6 +179,15 @@ public class YarnService extends AbstractIdleService {
private volatile boolean shutdownInProgress = false;
+ // The number of containers requested based on the desired target number of
containers. This is used to determine
+ how many additional containers to request since the currently
allocated amount may be less than this amount if we
+ // are waiting for containers to be allocated.
+ // The currently allocated amount may also be higher than this amount if
YARN returned more than the requested number
+ // of containers.
+ @VisibleForTesting
+ @Getter(AccessLevel.PROTECTED)
+ private int numRequestedContainers = 0;
+
public YarnService(Config config, String applicationName, String
applicationId, YarnConfiguration yarnConfiguration,
FileSystem fs, EventBus eventBus) throws Exception {
this.applicationName = applicationName;
@@ -198,6 +227,10 @@ public class YarnService extends AbstractIdleService {
ExecutorsUtils.newThreadFactory(Optional.of(LOGGER),
Optional.of("ContainerLaunchExecutor")));
this.tokens = getSecurityTokens();
+
+ this.releasedContainerCache =
CacheBuilder.newBuilder().expireAfterAccess(ConfigUtils.getInt(config,
+ GobblinYarnConfigurationKeys.RELEASED_CONTAINERS_CACHE_EXPIRY_SECS,
+
GobblinYarnConfigurationKeys.DEFAULT_RELEASED_CONTAINERS_CACHE_EXPIRY_SECS),
TimeUnit.SECONDS).build();
}
@SuppressWarnings("unused")
@@ -229,6 +262,26 @@ public class YarnService extends AbstractIdleService {
}
}
+ /**
+ * Request the Resource Manager to release the container
+ * @param containerReleaseRequest containers to release
+ */
+ @Subscribe
+ public void handleContainerReleaseRequest(ContainerReleaseRequest
containerReleaseRequest) {
+ for (Container container : containerReleaseRequest.getContainers()) {
+ LOGGER.info(String.format("Releasing container %s running on %s",
container.getId(), container.getNodeId()));
+
+ // Record that this container was explicitly released so that a new one
is not spawned to replace it
+ // Put the container id in the releasedContainerCache before releasing
it so that handleContainerCompletion()
+ // can check for the container id and skip spawning a replacement
container.
+ // Note that this is best effort since these are asynchronous operations
and a container may abort concurrently
+ // with the release call. So in some cases a replacement container may
have already been spawned before
+ // the container is put into the black list.
+ this.releasedContainerCache.put(container.getId(), "");
+ this.amrmClientAsync.releaseAssignedContainer(container.getId());
+ }
+ }
+
@Override
protected void startUp() throws Exception {
LOGGER.info("Starting the YarnService");
@@ -310,10 +363,71 @@ public class YarnService extends AbstractIdleService {
.build();
}
- private void requestInitialContainers(int containersRequested) {
- for (int i = 0; i < containersRequested; i++) {
+ /**
+ * Request an allocation of containers. If numTargetContainers is larger
than the max of current and expected number
+ * of containers then additional containers are requested.
+ *
+ * If numTargetContainers is less than the current number of allocated
containers then release free containers.
+ * Shrinking is relative to the number of currently allocated containers
since it takes time for containers
+ * to be allocated and assigned work and we want to avoid releasing a
container prematurely before it is assigned
+ * work. This means that a container may not be released even though
numTargetContainers is less than the requested
+ * number of containers. The intended usage is for the caller of this method
to make periodic calls to attempt to
+ * adjust the cluster towards the desired number of containers.
+ *
+ * @param numTargetContainers the desired number of containers
+ * @param inUseInstances a set of in use instances
+ */
+ public synchronized void requestTargetNumberOfContainers(int
numTargetContainers, Set<String> inUseInstances) {
+ LOGGER.debug("Requesting numTargetContainers {} current
numRequestedContainers {} in use instances {} map size {}",
+ numTargetContainers, this.numRequestedContainers, inUseInstances,
this.containerMap.size());
+
+ // YARN can allocate more than the requested number of containers, compute
additional allocations and deallocations
+ // based on the max of the requested and actual allocated counts
+ int numAllocatedContainers = this.containerMap.size();
+
+ // The number of allocated containers may be higher than the previously
requested amount
+ // and there may be outstanding allocation requests, so the max of both
counts is computed here
+ // and used to decide whether to allocate containers.
+ int numContainers = Math.max(numRequestedContainers,
numAllocatedContainers);
+
+ // Request additional containers if the desired count is higher than the
max of the current allocation or previously
+ // requested amount. Note that there may be in-flight or additional
allocations after numContainers has been computed
+ // so overshooting can occur, but periodic calls to this method will make
adjustments towards the target.
+ for (int i = numContainers; i < numTargetContainers; i++) {
requestContainer(Optional.<String>absent());
}
+
+ // If the total desired is lower than the currently allocated amount then
release free containers.
+ // This is based on the currently allocated amount since containers may
still be in the process of being allocated
+ // and assigned work. Resizing based on numRequestedContainers at this
point may release a container right before
+ // or soon after it is assigned work.
+ if (numTargetContainers < numAllocatedContainers) {
+ LOGGER.debug("Shrinking number of containers by {}",
(numAllocatedContainers - numTargetContainers));
+
+ List<Container> containersToRelease = new ArrayList<>();
+ int numToShutdown = numContainers - numTargetContainers;
+
+ // Look for eligible containers to release. If a container is in use
then it is not released.
+ for (Map.Entry<ContainerId, Map.Entry<Container, String>> entry :
this.containerMap.entrySet()) {
+ if (!inUseInstances.contains(entry.getValue().getValue())) {
+ containersToRelease.add(entry.getValue().getKey());
+ }
+
+ if (containersToRelease.size() == numToShutdown) {
+ break;
+ }
+ }
+
+ LOGGER.debug("Shutting down containers {}", containersToRelease);
+
+ this.eventBus.post(new ContainerReleaseRequest(containersToRelease));
+ }
+
+ this.numRequestedContainers = numTargetContainers;
+ }
+
+ private void requestInitialContainers(int containersRequested) {
+ requestTargetNumberOfContainers(containersRequested,
Collections.EMPTY_SET);
}
private void requestContainer(Optional<String> preferredNode) {
@@ -333,7 +447,7 @@ public class YarnService extends AbstractIdleService {
new AMRMClient.ContainerRequest(capability, preferredNodes, null,
priority));
}
- private ContainerLaunchContext newContainerLaunchContext(Container
container, String helixInstanceName)
+ protected ContainerLaunchContext newContainerLaunchContext(Container
container, String helixInstanceName)
throws IOException {
Path appWorkDir = GobblinClusterUtils.getAppWorkDirPath(this.fs,
this.applicationName, this.applicationId);
Path containerWorkDir = new Path(appWorkDir,
GobblinYarnConfigurationKeys.CONTAINER_WORK_DIR_NAME);
@@ -468,6 +582,12 @@ public class YarnService extends AbstractIdleService {
containerStatus.getContainerId(), containerStatus.getDiagnostics()));
}
+ if
(this.releasedContainerCache.getIfPresent(containerStatus.getContainerId()) !=
null) {
+ LOGGER.info("Container release requested, so not spawning a replacement
for containerId {}",
+ containerStatus.getContainerId());
+ return;
+ }
+
if (this.shutdownInProgress) {
return;
}
diff --git
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/event/ContainerReleaseRequest.java
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/event/ContainerReleaseRequest.java
new file mode 100644
index 0000000..151ac22
--- /dev/null
+++
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/event/ContainerReleaseRequest.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.yarn.event;
+
+import java.util.Collection;
+
+import org.apache.hadoop.yarn.api.records.Container;
+
+
+/**
+ * A type of event for container release requests to be used with a {@link
com.google.common.eventbus.EventBus}.
+ * This event is different than {@link ContainerShutdownRequest} because it
releases the container through
+ * the Resource Manager, while {@link ContainerShutdownRequest} shuts down a
container through the
+ * Node Manager
+ */
+public class ContainerReleaseRequest {
+ private final Collection<Container> containers;
+
+ public ContainerReleaseRequest(Collection<Container> containers) {
+ this.containers = containers;
+ }
+
+ /**
+ * Get the containers to release.
+ *
+ * @return the containers to release
+ */
+ public Collection<Container> getContainers() {
+ return this.containers;
+ }
+}
diff --git
a/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnAutoScalingManagerTest.java
b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnAutoScalingManagerTest.java
new file mode 100644
index 0000000..2a36646
--- /dev/null
+++
b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnAutoScalingManagerTest.java
@@ -0,0 +1,357 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.yarn;
+
+import java.io.IOException;
+
+import org.apache.helix.task.JobContext;
+import org.apache.helix.task.JobDag;
+import org.apache.helix.task.TaskDriver;
+import org.apache.helix.task.TaskState;
+import org.apache.helix.task.WorkflowConfig;
+import org.apache.helix.task.WorkflowContext;
+import org.mockito.Mockito;
+import org.testng.annotations.Test;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+
+
+/**
+ * Unit tests for {@link YarnAutoScalingManager}
+ */
+@Test(groups = { "gobblin.yarn" })
+public class YarnAutoScalingManagerTest {
+ /**
+ * Test for one workflow with one job
+ */
+ @Test
+ public void testOneJob() throws IOException {
+ YarnService mockYarnService = mock(YarnService.class);
+ TaskDriver mockTaskDriver = mock(TaskDriver.class);
+ WorkflowConfig mockWorkflowConfig = mock(WorkflowConfig.class);
+ JobDag mockJobDag = mock(JobDag.class);
+
+ Mockito.when(mockJobDag.getAllNodes()).thenReturn(ImmutableSet.of("job1"));
+ Mockito.when(mockWorkflowConfig.getJobDag()).thenReturn(mockJobDag);
+
+ Mockito.when(mockTaskDriver.getWorkflows())
+ .thenReturn(ImmutableMap.of("workflow1", mockWorkflowConfig));
+
+ WorkflowContext mockWorkflowContext = mock(WorkflowContext.class);
+
Mockito.when(mockWorkflowContext.getWorkflowState()).thenReturn(TaskState.IN_PROGRESS);
+
+
Mockito.when(mockTaskDriver.getWorkflowContext("workflow1")).thenReturn(mockWorkflowContext);
+
+ JobContext mockJobContext = mock(JobContext.class);
+ Mockito.when(mockJobContext.getPartitionSet())
+ .thenReturn(ImmutableSet.of(Integer.valueOf(1), Integer.valueOf(2)));
+
Mockito.when(mockJobContext.getAssignedParticipant(2)).thenReturn("worker1");
+
+
Mockito.when(mockTaskDriver.getJobContext("job1")).thenReturn(mockJobContext);
+
+ YarnAutoScalingManager.YarnAutoScalingRunnable runnable =
+ new YarnAutoScalingManager.YarnAutoScalingRunnable(mockTaskDriver,
mockYarnService, 1, 1, 10);
+
+ runnable.run();
+
+ // 2 containers requested and one worker in use
+ Mockito.verify(mockYarnService,
times(1)).requestTargetNumberOfContainers(2, ImmutableSet.of("worker1"));
+ }
+
+ /**
+ * Test for one workflow with two jobs
+ */
+ @Test
+ public void testTwoJobs() throws IOException {
+ YarnService mockYarnService = mock(YarnService.class);
+ TaskDriver mockTaskDriver = mock(TaskDriver.class);
+ WorkflowConfig mockWorkflowConfig = mock(WorkflowConfig.class);
+ JobDag mockJobDag = mock(JobDag.class);
+
+ Mockito.when(mockJobDag.getAllNodes()).thenReturn(ImmutableSet.of("job1",
"job2"));
+ Mockito.when(mockWorkflowConfig.getJobDag()).thenReturn(mockJobDag);
+
+ Mockito.when(mockTaskDriver.getWorkflows())
+ .thenReturn(ImmutableMap.of("workflow1", mockWorkflowConfig));
+
+ WorkflowContext mockWorkflowContext = mock(WorkflowContext.class);
+
Mockito.when(mockWorkflowContext.getWorkflowState()).thenReturn(TaskState.IN_PROGRESS);
+
+
Mockito.when(mockTaskDriver.getWorkflowContext("workflow1")).thenReturn(mockWorkflowContext);
+
+ JobContext mockJobContext1 = mock(JobContext.class);
+ Mockito.when(mockJobContext1.getPartitionSet())
+ .thenReturn(ImmutableSet.of(Integer.valueOf(1), Integer.valueOf(2)));
+
Mockito.when(mockJobContext1.getAssignedParticipant(2)).thenReturn("worker1");
+
Mockito.when(mockTaskDriver.getJobContext("job1")).thenReturn(mockJobContext1);
+
+ JobContext mockJobContext2 = mock(JobContext.class);
+ Mockito.when(mockJobContext2.getPartitionSet())
+ .thenReturn(ImmutableSet.of(Integer.valueOf(3)));
+
Mockito.when(mockJobContext2.getAssignedParticipant(3)).thenReturn("worker2");
+
Mockito.when(mockTaskDriver.getJobContext("job2")).thenReturn(mockJobContext2);
+
+ YarnAutoScalingManager.YarnAutoScalingRunnable runnable =
+ new YarnAutoScalingManager.YarnAutoScalingRunnable(mockTaskDriver,
mockYarnService, 1, 1, 10);
+
+ runnable.run();
+
+ // 3 containers requested and 2 workers in use
+ Mockito.verify(mockYarnService,
times(1)).requestTargetNumberOfContainers(3, ImmutableSet.of("worker1",
"worker2"));
+ }
+
+ /**
+ * Test for two workflows
+ */
+ @Test
+ public void testTwoWorkflows() throws IOException {
+ YarnService mockYarnService = mock(YarnService.class);
+ TaskDriver mockTaskDriver = mock(TaskDriver.class);
+
+ WorkflowConfig mockWorkflowConfig1 = mock(WorkflowConfig.class);
+ JobDag mockJobDag1 = mock(JobDag.class);
+
+ Mockito.when(mockJobDag1.getAllNodes()).thenReturn(ImmutableSet.of("job1",
"job2"));
+ Mockito.when(mockWorkflowConfig1.getJobDag()).thenReturn(mockJobDag1);
+
+ WorkflowContext mockWorkflowContext1 = mock(WorkflowContext.class);
+
Mockito.when(mockWorkflowContext1.getWorkflowState()).thenReturn(TaskState.IN_PROGRESS);
+
+
Mockito.when(mockTaskDriver.getWorkflowContext("workflow1")).thenReturn(mockWorkflowContext1);
+
+ JobContext mockJobContext1 = mock(JobContext.class);
+ Mockito.when(mockJobContext1.getPartitionSet())
+ .thenReturn(ImmutableSet.of(Integer.valueOf(1), Integer.valueOf(2)));
+
Mockito.when(mockJobContext1.getAssignedParticipant(2)).thenReturn("worker1");
+
Mockito.when(mockTaskDriver.getJobContext("job1")).thenReturn(mockJobContext1);
+
+ JobContext mockJobContext2 = mock(JobContext.class);
+ Mockito.when(mockJobContext2.getPartitionSet())
+ .thenReturn(ImmutableSet.of(Integer.valueOf(3)));
+
Mockito.when(mockJobContext2.getAssignedParticipant(3)).thenReturn("worker2");
+
Mockito.when(mockTaskDriver.getJobContext("job2")).thenReturn(mockJobContext2);
+
+ WorkflowConfig mockWorkflowConfig2 = mock(WorkflowConfig.class);
+ JobDag mockJobDag2 = mock(JobDag.class);
+
+
Mockito.when(mockJobDag2.getAllNodes()).thenReturn(ImmutableSet.of("job3"));
+ Mockito.when(mockWorkflowConfig2.getJobDag()).thenReturn(mockJobDag2);
+
+ WorkflowContext mockWorkflowContext2 = mock(WorkflowContext.class);
+
Mockito.when(mockWorkflowContext2.getWorkflowState()).thenReturn(TaskState.IN_PROGRESS);
+
+
Mockito.when(mockTaskDriver.getWorkflowContext("workflow2")).thenReturn(mockWorkflowContext2);
+
+ JobContext mockJobContext3 = mock(JobContext.class);
+ Mockito.when(mockJobContext3.getPartitionSet())
+ .thenReturn(ImmutableSet.of(Integer.valueOf(4), Integer.valueOf(5)));
+
Mockito.when(mockJobContext3.getAssignedParticipant(4)).thenReturn("worker3");
+
Mockito.when(mockTaskDriver.getJobContext("job3")).thenReturn(mockJobContext3);
+
+ Mockito.when(mockTaskDriver.getWorkflows())
+ .thenReturn(ImmutableMap.of("workflow1", mockWorkflowConfig1,
"workflow2", mockWorkflowConfig2));
+
+ YarnAutoScalingManager.YarnAutoScalingRunnable runnable =
+ new YarnAutoScalingManager.YarnAutoScalingRunnable(mockTaskDriver,
mockYarnService, 1, 1, 10);
+
+ runnable.run();
+
+ // 5 containers requested and 3 workers in use
+ Mockito.verify(mockYarnService,
times(1)).requestTargetNumberOfContainers(5,
+ ImmutableSet.of("worker1", "worker2", "worker3"));
+ }
+
+ /**
+ * Test for two workflows with one not in progress.
+ * The partitions for the workflow that is not in progress should not be
counted.
+ */
+ @Test
+ public void testNotInProgress() throws IOException {
+ YarnService mockYarnService = mock(YarnService.class);
+ TaskDriver mockTaskDriver = mock(TaskDriver.class);
+
+ WorkflowConfig mockWorkflowConfig1 = mock(WorkflowConfig.class);
+ JobDag mockJobDag1 = mock(JobDag.class);
+
+ Mockito.when(mockJobDag1.getAllNodes()).thenReturn(ImmutableSet.of("job1",
"job2"));
+ Mockito.when(mockWorkflowConfig1.getJobDag()).thenReturn(mockJobDag1);
+
+ WorkflowContext mockWorkflowContext1 = mock(WorkflowContext.class);
+
Mockito.when(mockWorkflowContext1.getWorkflowState()).thenReturn(TaskState.IN_PROGRESS);
+
+
Mockito.when(mockTaskDriver.getWorkflowContext("workflow1")).thenReturn(mockWorkflowContext1);
+
+ JobContext mockJobContext1 = mock(JobContext.class);
+ Mockito.when(mockJobContext1.getPartitionSet())
+ .thenReturn(ImmutableSet.of(Integer.valueOf(1), Integer.valueOf(2)));
+
Mockito.when(mockJobContext1.getAssignedParticipant(2)).thenReturn("worker1");
+
Mockito.when(mockTaskDriver.getJobContext("job1")).thenReturn(mockJobContext1);
+
+ JobContext mockJobContext2 = mock(JobContext.class);
+ Mockito.when(mockJobContext2.getPartitionSet())
+ .thenReturn(ImmutableSet.of(Integer.valueOf(3)));
+
Mockito.when(mockJobContext2.getAssignedParticipant(3)).thenReturn("worker2");
+
Mockito.when(mockTaskDriver.getJobContext("job2")).thenReturn(mockJobContext2);
+
+ WorkflowConfig mockWorkflowConfig2 = mock(WorkflowConfig.class);
+ JobDag mockJobDag2 = mock(JobDag.class);
+
+
Mockito.when(mockJobDag2.getAllNodes()).thenReturn(ImmutableSet.of("job3"));
+ Mockito.when(mockWorkflowConfig2.getJobDag()).thenReturn(mockJobDag2);
+
+ WorkflowContext mockWorkflowContext2 = mock(WorkflowContext.class);
+
Mockito.when(mockWorkflowContext2.getWorkflowState()).thenReturn(TaskState.COMPLETED);
+
+
Mockito.when(mockTaskDriver.getWorkflowContext("workflow2")).thenReturn(mockWorkflowContext2);
+
+ JobContext mockJobContext3 = mock(JobContext.class);
+ Mockito.when(mockJobContext3.getPartitionSet())
+ .thenReturn(ImmutableSet.of(Integer.valueOf(4), Integer.valueOf(5)));
+
Mockito.when(mockJobContext3.getAssignedParticipant(4)).thenReturn("worker3");
+
Mockito.when(mockTaskDriver.getJobContext("job3")).thenReturn(mockJobContext3);
+
+ Mockito.when(mockTaskDriver.getWorkflows())
+ .thenReturn(ImmutableMap.of("workflow1", mockWorkflowConfig1,
"workflow2", mockWorkflowConfig2));
+
+ YarnAutoScalingManager.YarnAutoScalingRunnable runnable =
+ new YarnAutoScalingManager.YarnAutoScalingRunnable(mockTaskDriver,
mockYarnService, 1, 1, 10);
+
+ runnable.run();
+
+ // 3 containers requested and 2 workers in use
+ Mockito.verify(mockYarnService,
times(1)).requestTargetNumberOfContainers(3,
+ ImmutableSet.of("worker1", "worker2"));
+ }
+
+ /**
+ * Test multiple partitions to one container
+ */
+ @Test
+ public void testMultiplePartitionsPerContainer() throws IOException {
+ YarnService mockYarnService = mock(YarnService.class);
+ TaskDriver mockTaskDriver = mock(TaskDriver.class);
+ WorkflowConfig mockWorkflowConfig = mock(WorkflowConfig.class);
+ JobDag mockJobDag = mock(JobDag.class);
+
+ Mockito.when(mockJobDag.getAllNodes()).thenReturn(ImmutableSet.of("job1"));
+ Mockito.when(mockWorkflowConfig.getJobDag()).thenReturn(mockJobDag);
+
+ Mockito.when(mockTaskDriver.getWorkflows())
+ .thenReturn(ImmutableMap.of("workflow1", mockWorkflowConfig));
+
+ WorkflowContext mockWorkflowContext = mock(WorkflowContext.class);
+
Mockito.when(mockWorkflowContext.getWorkflowState()).thenReturn(TaskState.IN_PROGRESS);
+
+
Mockito.when(mockTaskDriver.getWorkflowContext("workflow1")).thenReturn(mockWorkflowContext);
+
+ JobContext mockJobContext = mock(JobContext.class);
+ Mockito.when(mockJobContext.getPartitionSet())
+ .thenReturn(ImmutableSet.of(Integer.valueOf(1), Integer.valueOf(2)));
+
Mockito.when(mockJobContext.getAssignedParticipant(2)).thenReturn("worker1");
+
+
Mockito.when(mockTaskDriver.getJobContext("job1")).thenReturn(mockJobContext);
+
+ YarnAutoScalingManager.YarnAutoScalingRunnable runnable =
+ new YarnAutoScalingManager.YarnAutoScalingRunnable(mockTaskDriver,
mockYarnService, 2, 1, 10);
+
+ runnable.run();
+
+ // 1 container requested since 2 partitions and limit is 2 partitions per
container. One worker in use.
+ Mockito.verify(mockYarnService,
times(1)).requestTargetNumberOfContainers(1, ImmutableSet.of("worker1"));
+ }
+
+
+ /**
+ * Test min containers
+ */
+ @Test
+ public void testMinContainers() throws IOException {
+ YarnService mockYarnService = mock(YarnService.class);
+ TaskDriver mockTaskDriver = mock(TaskDriver.class);
+ WorkflowConfig mockWorkflowConfig = mock(WorkflowConfig.class);
+ JobDag mockJobDag = mock(JobDag.class);
+
+ Mockito.when(mockJobDag.getAllNodes()).thenReturn(ImmutableSet.of("job1"));
+ Mockito.when(mockWorkflowConfig.getJobDag()).thenReturn(mockJobDag);
+
+ Mockito.when(mockTaskDriver.getWorkflows())
+ .thenReturn(ImmutableMap.of("workflow1", mockWorkflowConfig));
+
+ WorkflowContext mockWorkflowContext = mock(WorkflowContext.class);
+
Mockito.when(mockWorkflowContext.getWorkflowState()).thenReturn(TaskState.IN_PROGRESS);
+
+
Mockito.when(mockTaskDriver.getWorkflowContext("workflow1")).thenReturn(mockWorkflowContext);
+
+ JobContext mockJobContext = mock(JobContext.class);
+ Mockito.when(mockJobContext.getPartitionSet())
+ .thenReturn(ImmutableSet.of(Integer.valueOf(1), Integer.valueOf(2)));
+
Mockito.when(mockJobContext.getAssignedParticipant(2)).thenReturn("worker1");
+
+
Mockito.when(mockTaskDriver.getJobContext("job1")).thenReturn(mockJobContext);
+
+ YarnAutoScalingManager.YarnAutoScalingRunnable runnable =
+ new YarnAutoScalingManager.YarnAutoScalingRunnable(mockTaskDriver,
mockYarnService, 1, 5, 10);
+
+ runnable.run();
+
+ // 5 containers requested due to min and one worker in use
+ Mockito.verify(mockYarnService,
times(1)).requestTargetNumberOfContainers(5, ImmutableSet.of("worker1"));
+ }
+
+
+ /**
+ * Test max containers
+ */
+ @Test
+ public void testMaxContainers() throws IOException {
+ YarnService mockYarnService = mock(YarnService.class);
+ TaskDriver mockTaskDriver = mock(TaskDriver.class);
+ WorkflowConfig mockWorkflowConfig = mock(WorkflowConfig.class);
+ JobDag mockJobDag = mock(JobDag.class);
+
+ Mockito.when(mockJobDag.getAllNodes()).thenReturn(ImmutableSet.of("job1"));
+ Mockito.when(mockWorkflowConfig.getJobDag()).thenReturn(mockJobDag);
+
+ Mockito.when(mockTaskDriver.getWorkflows())
+ .thenReturn(ImmutableMap.of("workflow1", mockWorkflowConfig));
+
+ WorkflowContext mockWorkflowContext = mock(WorkflowContext.class);
+
Mockito.when(mockWorkflowContext.getWorkflowState()).thenReturn(TaskState.IN_PROGRESS);
+
+
Mockito.when(mockTaskDriver.getWorkflowContext("workflow1")).thenReturn(mockWorkflowContext);
+
+ JobContext mockJobContext = mock(JobContext.class);
+ Mockito.when(mockJobContext.getPartitionSet())
+ .thenReturn(ImmutableSet.of(Integer.valueOf(1), Integer.valueOf(2)));
+
Mockito.when(mockJobContext.getAssignedParticipant(2)).thenReturn("worker1");
+
+
Mockito.when(mockTaskDriver.getJobContext("job1")).thenReturn(mockJobContext);
+
+ YarnAutoScalingManager.YarnAutoScalingRunnable runnable =
+ new YarnAutoScalingManager.YarnAutoScalingRunnable(mockTaskDriver,
mockYarnService, 1, 1, 1);
+
+ runnable.run();
+
+ // 1 containers requested to max and one worker in use
+ Mockito.verify(mockYarnService,
times(1)).requestTargetNumberOfContainers(1, ImmutableSet.of("worker1"));
+ }
+}
diff --git
a/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnServiceTest.java
b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnServiceTest.java
new file mode 100644
index 0000000..60aad14
--- /dev/null
+++ b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnServiceTest.java
@@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.yarn;
+
+import com.google.common.base.Predicate;
+import com.google.common.eventbus.EventBus;
+import com.google.common.io.Closer;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigValueFactory;
+
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.net.URL;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeoutException;
+import org.apache.gobblin.testing.AssertWithBackoff;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.MiniYARNCluster;
+import
org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
+import org.apache.hadoop.yarn.server.utils.BuilderUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+
+/**
+ * Tests for {@link YarnService}.
+ */
+@Test(groups = {"gobblin.yarn", "disabledOnTravis"}, singleThreaded=true)
+public class YarnServiceTest {
+ final Logger LOG = LoggerFactory.getLogger(YarnServiceTest.class);
+
+ private YarnClient yarnClient;
+ private MiniYARNCluster yarnCluster;
+ private TestYarnService yarnService;
+ private Config config;
+ private YarnConfiguration clusterConf;
+ private ApplicationId applicationId;
+ private ApplicationAttemptId applicationAttemptId;
+ private final EventBus eventBus = new EventBus("YarnServiceTest");
+
+ private final Closer closer = Closer.create();
+
+ private static void setEnv(String key, String value) {
+ try {
+ Map<String, String> env = System.getenv();
+ Class<?> cl = env.getClass();
+ Field field = cl.getDeclaredField("m");
+ field.setAccessible(true);
+ Map<String, String> writableEnv = (Map<String, String>) field.get(env);
+ writableEnv.put(key, value);
+ } catch (Exception e) {
+ throw new IllegalStateException("Failed to set environment variable", e);
+ }
+ }
+
+ @BeforeClass
+ public void setUp() throws Exception {
+ // Set java home in environment since it isn't set on some systems
+ String javaHome = System.getProperty("java.home");
+ setEnv("JAVA_HOME", javaHome);
+
+ this.clusterConf = new YarnConfiguration();
+ this.clusterConf.set(YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_MS, "100");
+
this.clusterConf.set(YarnConfiguration.RESOURCEMANAGER_CONNECT_MAX_WAIT_MS,
"10000");
+
this.clusterConf.set(YarnConfiguration.YARN_CLIENT_APPLICATION_CLIENT_PROTOCOL_POLL_TIMEOUT_MS,
"60000");
+
+ this.yarnCluster =
+ this.closer.register(new MiniYARNCluster("YarnServiceTestCluster", 4,
1,
+ 1));
+ this.yarnCluster.init(this.clusterConf);
+ this.yarnCluster.start();
+
+ // YARN client should not be started before the Resource Manager is up
+ AssertWithBackoff.create().logger(LOG).timeoutMs(10000)
+ .assertTrue(new Predicate<Void>() {
+ @Override public boolean apply(Void input) {
+ return
!clusterConf.get(YarnConfiguration.RM_ADDRESS).contains(":0");
+ }
+ }, "Waiting for RM");
+
+ this.yarnClient = this.closer.register(YarnClient.createYarnClient());
+ this.yarnClient.init(this.clusterConf);
+ this.yarnClient.start();
+
+ URL url = YarnServiceTest.class.getClassLoader()
+ .getResource(YarnServiceTest.class.getSimpleName() + ".conf");
+ Assert.assertNotNull(url, "Could not find resource " + url);
+
+ this.config = ConfigFactory.parseURL(url).resolve();
+
+ // Start a dummy application manager so that the YarnService can use the
AM-RM token.
+ startApp();
+
+ // create and start the test yarn service
+ this.yarnService = new TestYarnService(this.config, "testApp", "appId",
+ this.clusterConf,
+ FileSystem.getLocal(new Configuration()), this.eventBus);
+
+ this.yarnService.startUp();
+ }
+
+ private void startApp() throws Exception {
+ // submit a dummy app
+ ApplicationSubmissionContext appSubmissionContext =
+ yarnClient.createApplication().getApplicationSubmissionContext();
+ this.applicationId = appSubmissionContext.getApplicationId();
+
+ ContainerLaunchContext containerLaunchContext =
+ BuilderUtils.newContainerLaunchContext(Collections.emptyMap(),
Collections.emptyMap(),
+ Arrays.asList("sleep", "100"), Collections.emptyMap(), null,
Collections.emptyMap());
+
+ // Setup the application submission context
+ appSubmissionContext.setApplicationName("TestApp");
+ appSubmissionContext.setResource(Resource.newInstance(128, 1));
+ appSubmissionContext.setPriority(Priority.newInstance(0));
+ appSubmissionContext.setAMContainerSpec(containerLaunchContext);
+
+ this.yarnClient.submitApplication(appSubmissionContext);
+
+ // wait for application to be accepted
+ int i;
+ RMAppAttempt attempt = null;
+ for (i = 0; i < 120; i++) {
+ ApplicationReport appReport =
yarnClient.getApplicationReport(applicationId);
+
+ if (appReport.getYarnApplicationState() ==
YarnApplicationState.ACCEPTED) {
+ this.applicationAttemptId = appReport.getCurrentApplicationAttemptId();
+ attempt = yarnCluster.getResourceManager().getRMContext().getRMApps()
+
.get(appReport.getCurrentApplicationAttemptId().getApplicationId()).getCurrentAppAttempt();
+ break;
+ }
+ Thread.sleep(1000);
+ }
+
+ Assert.assertTrue(i < 120, "timed out waiting for ACCEPTED state");
+
+ // Set the AM-RM token in the UGI for access during testing
+
UserGroupInformation.setLoginUser(UserGroupInformation.createRemoteUser(UserGroupInformation.getCurrentUser()
+ .getUserName()));
+ UserGroupInformation.getCurrentUser().addToken(attempt.getAMRMToken());
+ }
+
+ @AfterClass
+ public void tearDown() throws IOException, TimeoutException, YarnException {
+ try {
+
this.yarnClient.killApplication(this.applicationAttemptId.getApplicationId());
+ this.yarnService.shutDown();
+ } finally {
+ this.closer.close();
+ }
+ }
+
+ /**
+ * Test that the dynamic config is added to the config specified when the
{@link GobblinApplicationMaster}
+ * is instantiated.
+ */
+ @Test(groups = {"gobblin.yarn", "disabledOnTravis"})
+ public void testScaleUp() {
+ this.yarnService.requestTargetNumberOfContainers(10,
Collections.EMPTY_SET);
+
+ Assert.assertEquals(this.yarnService.getNumRequestedContainers(), 10);
+ Assert.assertTrue(this.yarnService.waitForContainerCount(10, 60000));
+ }
+
+ @Test(groups = {"gobblin.yarn", "disabledOnTravis"}, dependsOnMethods =
"testScaleUp")
+ public void testScaleDownWithInUseInstances() {
+ Set<String> inUseInstances = new HashSet<>();
+
+ for (int i = 1; i <= 8; i++) {
+ inUseInstances.add("GobblinYarnTaskRunner_" + i);
+ }
+
+ this.yarnService.requestTargetNumberOfContainers(6, inUseInstances);
+
+ Assert.assertEquals(this.yarnService.getNumRequestedContainers(), 6);
+
+ // will only be able to shrink to 8
+ Assert.assertTrue(this.yarnService.waitForContainerCount(8, 60000));
+
+ // will not be able to shrink to 6 due to 8 in-use instances
+ Assert.assertFalse(this.yarnService.waitForContainerCount(6, 10000));
+
+ }
+
+ @Test(groups = {"gobblin.yarn", "disabledOnTravis"}, dependsOnMethods =
"testScaleDownWithInUseInstances")
+ public void testScaleDown() throws Exception {
+ this.yarnService.requestTargetNumberOfContainers(4, Collections.EMPTY_SET);
+
+ Assert.assertEquals(this.yarnService.getNumRequestedContainers(), 4);
+ Assert.assertTrue(this.yarnService.waitForContainerCount(4, 60000));
+ }
+
+ // Keep this test last since it interferes with the container counts in the
prior tests.
+ @Test(groups = {"gobblin.yarn", "disabledOnTravis"}, dependsOnMethods =
"testScaleDown")
+ public void testReleasedContainerCache() throws Exception {
+ Config modifiedConfig = this.config
+
.withValue(GobblinYarnConfigurationKeys.RELEASED_CONTAINERS_CACHE_EXPIRY_SECS,
ConfigValueFactory.fromAnyRef("2"));
+ TestYarnService yarnService =
+ new TestYarnService(modifiedConfig, "testApp1", "appId1",
+ this.clusterConf, FileSystem.getLocal(new Configuration()),
this.eventBus);
+
+ ContainerId containerId1 =
ContainerId.newInstance(ApplicationAttemptId.newInstance(ApplicationId.newInstance(1,
0),
+ 0), 0);
+
+ yarnService.getReleasedContainerCache().put(containerId1, "");
+
+
Assert.assertTrue(yarnService.getReleasedContainerCache().getIfPresent(containerId1)
!= null);
+
+ // give some time for element to expire
+ Thread.sleep(4000);
+
Assert.assertTrue(yarnService.getReleasedContainerCache().getIfPresent(containerId1)
== null);
+ }
+
+
+ private static class TestYarnService extends YarnService {
+ public TestYarnService(Config config, String applicationName, String
applicationId, YarnConfiguration yarnConfiguration,
+ FileSystem fs, EventBus eventBus) throws Exception {
+ super(config, applicationName, applicationId, yarnConfiguration, fs,
eventBus);
+ }
+
+ protected ContainerLaunchContext newContainerLaunchContext(Container
container, String helixInstanceName)
+ throws IOException {
+ return BuilderUtils.newContainerLaunchContext(Collections.emptyMap(),
Collections.emptyMap(),
+ Arrays.asList("sleep", "60000"), Collections.emptyMap(), null,
Collections.emptyMap());
+ }
+
+ /**
+ * Wait to reach the expected count.
+ *
+ * @param expectedCount the expected count
+ * @param waitMillis amount of time in milliseconds to wait
+ * @return true if the count was reached within the allowed wait time
+ */
+ public boolean waitForContainerCount(int expectedCount, int waitMillis) {
+ final int waitInterval = 1000;
+ int waitedMillis = 0;
+ boolean success = false;
+
+ while (waitedMillis < waitMillis) {
+ try {
+ Thread.sleep(waitInterval);
+ waitedMillis += waitInterval;
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ break;
+ }
+
+ if (expectedCount == getContainerMap().size()) {
+ success = true;
+ break;
+ }
+ }
+
+ return success;
+ }
+ }
+}
diff --git a/gobblin-yarn/src/test/resources/YarnServiceTest.conf
b/gobblin-yarn/src/test/resources/YarnServiceTest.conf
new file mode 100644
index 0000000..d2dc49c
--- /dev/null
+++ b/gobblin-yarn/src/test/resources/YarnServiceTest.conf
@@ -0,0 +1,72 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Yarn/Helix configuration properties
+gobblin.cluster.helix.cluster.name=YarnServiceTest
+gobblin.yarn.app.name=YarnServiceTest
+gobblin.yarn.work.dir=YarnServiceTest
+
+gobblin.yarn.lib.jars.dir="build/gobblin-yarn/libs"
+gobblin.yarn.conf.dir="gobblin-yarn/src/test/resources"
+gobblin.yarn.app.master.files.local=${gobblin.yarn.conf.dir}"/log4j-yarn.properties,"${gobblin.yarn.conf.dir}"/application.conf,yarn-site.xml,dynamic.conf"
+gobblin.yarn.container.files.local=${gobblin.yarn.app.master.files.local}
+gobblin.yarn.app.queue=default
+gobblin.yarn.app.master.memory.mbs=64
+gobblin.yarn.app.master.cores=1
+gobblin.yarn.app.report.interval.minutes=1
+gobblin.yarn.max.get.app.report.failures=4
+gobblin.yarn.email.notification.on.shutdown=false
+gobblin.yarn.initial.containers=1
+gobblin.yarn.container.memory.mbs=64
+gobblin.yarn.container.cores=1
+gobblin.yarn.container.affinity.enabled=true
+gobblin.yarn.helix.instance.max.retries=2
+gobblin.yarn.logs.sink.root.dir=${gobblin.yarn.work.dir}/applogs
+
+# File system URIs
+fs.uri="file:///"
+writer.fs.uri=${fs.uri}
+state.store.fs.uri=${fs.uri}
+
+# Writer related configuration properties
+writer.destination.type=HDFS
+writer.output.format=AVRO
+writer.staging.dir=${gobblin.yarn.work.dir}/task-staging
+writer.output.dir=${gobblin.yarn.work.dir}/task-output
+
+# Data publisher related configuration properties
+data.publisher.type=org.apache.gobblin.publisher.BaseDataPublisher
+data.publisher.final.dir=${gobblin.yarn.work.dir}/job-output
+data.publisher.replace.final.dir=false
+
+# Directory where job/task state files are stored
+state.store.dir=${gobblin.yarn.work.dir}/state-store
+
+# Directory where error files from the quality checkers are stored
+qualitychecker.row.err.file=${gobblin.yarn.work.dir}/err
+
+# Disable job locking for now
+job.lock.enabled=false
+
+# Interval of task state reporting in milliseconds
+task.status.reportintervalinms=1000
+
+# If the job execution history server should be enabled
+job.execinfo.server.enabled=false
+
+# Disable metrics / events reporting for this test
+metrics.enabled=false
\ No newline at end of file