This is an automated email from the ASF dual-hosted git repository.
zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 288b9e271 [GOBBLIN-1646] Revert yarn container / helix tag group changes (#3507)
288b9e271 is described below
commit 288b9e271ac561745c3b7e49c381c75ea9f7c84b
Author: Matthew Ho <[email protected]>
AuthorDate: Thu May 12 15:13:06 2022 -0700
[GOBBLIN-1646] Revert yarn container / helix tag group changes (#3507)
Revert "Fix bug when shrinking the container in Yarn service (#3504)"
This reverts commit dd6d910a7e7a90d15258c6c77ebe626ae6d573f9.
Revert "[GOBBLIN-1620]Make yarn container allocation group by helix tag (#3487)"
This reverts commit 3e877951c284ccd68be3634522f9fc2c3d39f81a.
---
.../cluster/GobblinClusterConfigurationKeys.java | 8 -
.../gobblin/cluster/GobblinHelixJobLauncher.java | 14 -
.../apache/gobblin/cluster/GobblinTaskRunner.java | 25 +-
.../gobblin/yarn/YarnAutoScalingManager.java | 120 ++++----
.../gobblin/yarn/YarnContainerRequestBundle.java | 76 -----
.../org/apache/gobblin/yarn/YarnHelixUtils.java | 25 --
.../java/org/apache/gobblin/yarn/YarnService.java | 189 +++++-------
.../gobblin/yarn/event/NewContainerRequest.java | 10 -
.../apache/gobblin/yarn/GobblinYarnTestUtils.java | 7 -
.../gobblin/yarn/YarnAutoScalingManagerTest.java | 338 +++++++++------------
.../org/apache/gobblin/yarn/YarnServiceTest.java | 51 ++--
.../yarn/YarnServiceTestWithExpiration.java | 10 +-
.../src/test/resources/YarnServiceTest.conf | 3 +-
13 files changed, 314 insertions(+), 562 deletions(-)
diff --git
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
index 3454b4d48..8c9513f50 100644
---
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
+++
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
@@ -84,7 +84,6 @@ public class GobblinClusterConfigurationKeys {
public static final String HELIX_JOB_TAG_KEY = GOBBLIN_CLUSTER_PREFIX +
"helixJobTag";
public static final String HELIX_PLANNING_JOB_TAG_KEY =
GOBBLIN_CLUSTER_PREFIX + "helixPlanningJobTag";
public static final String HELIX_INSTANCE_TAGS_KEY = GOBBLIN_CLUSTER_PREFIX
+ "helixInstanceTags";
- public static final String HELIX_DEFAULT_TAG = "GobblinHelixDefaultTag";
// Helix job quota
public static final String HELIX_JOB_TYPE_KEY = GOBBLIN_CLUSTER_PREFIX +
"helixJobType";
@@ -185,13 +184,6 @@ public class GobblinClusterConfigurationKeys {
public static final String CONTAINER_EXIT_ON_HEALTH_CHECK_FAILURE_ENABLED =
GOBBLIN_CLUSTER_PREFIX + "container.exitOnHealthCheckFailure";
public static final boolean
DEFAULT_CONTAINER_EXIT_ON_HEALTH_CHECK_FAILURE_ENABLED = false;
- // Config to specify the resource requirement for each Gobblin job run, so
that helix tasks within this job will
- // be assigned to containers with desired resource. This config need to
cooperate with helix job tag, so that helix
- // cluster knows how to distribute tasks to correct containers.
- public static final String HELIX_JOB_CONTAINER_MEMORY_MBS =
GOBBLIN_CLUSTER_PREFIX + "job.container.memory.mbs";
- public static final String HELIX_JOB_CONTAINER_CORES =
GOBBLIN_CLUSTER_PREFIX + "job.container.cores";
-
-
//Config to enable/disable reuse of existing Helix Cluster
public static final String HELIX_CLUSTER_OVERWRITE_KEY =
GOBBLIN_CLUSTER_PREFIX + "helix.overwrite";
diff --git
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
index e7e305ad5..df15ee385 100644
---
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
+++
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
@@ -413,20 +413,6 @@ public class GobblinHelixJobLauncher extends
AbstractJobLauncher {
gobblinJobState.getPropAsLong(GobblinClusterConfigurationKeys.HELIX_WORKFLOW_EXPIRY_TIME_SECONDS,
GobblinClusterConfigurationKeys.DEFAULT_HELIX_WORKFLOW_EXPIRY_TIME_SECONDS));
- Map<String, String> jobConfigMap = new HashMap<>();
- if
(this.jobConfig.hasPath(GobblinClusterConfigurationKeys.HELIX_JOB_CONTAINER_MEMORY_MBS))
{
-
jobConfigMap.put(GobblinClusterConfigurationKeys.HELIX_JOB_CONTAINER_MEMORY_MBS,
-
jobConfig.getString(GobblinClusterConfigurationKeys.HELIX_JOB_CONTAINER_MEMORY_MBS));
- log.info("Job {} has specific memory requirement:{}, add this config to
command config map",
- this.jobContext.getJobId(),
jobConfig.getString(GobblinClusterConfigurationKeys.HELIX_JOB_CONTAINER_MEMORY_MBS));
- }
- if
(this.jobConfig.hasPath(GobblinClusterConfigurationKeys.HELIX_JOB_CONTAINER_CORES))
{
-
jobConfigMap.put(GobblinClusterConfigurationKeys.HELIX_JOB_CONTAINER_CORES,
-
jobConfig.getString(GobblinClusterConfigurationKeys.HELIX_JOB_CONTAINER_CORES));
- log.info("Job {} has specific Vcore requirement:{}, add this config to
command config map",
- this.jobContext.getJobId(),
jobConfig.getString(GobblinClusterConfigurationKeys.HELIX_JOB_CONTAINER_CORES));
- }
- jobConfigBuilder.setJobCommandConfigMap(jobConfigMap);
return jobConfigBuilder;
}
diff --git
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
index 4878d1031..cad583908 100644
---
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
+++
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
@@ -24,11 +24,9 @@ import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
-import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
-import java.util.Set;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
@@ -542,25 +540,16 @@ public class GobblinTaskRunner implements
StandardMetricsBridge {
* the job with EXAMPLE_INSTANCE_TAG will remain in the ZK until an instance
with EXAMPLE_INSTANCE_TAG was found.
*/
private void addInstanceTags() {
+ List<String> tags = ConfigUtils.getStringList(this.clusterConfig,
GobblinClusterConfigurationKeys.HELIX_INSTANCE_TAGS_KEY);
HelixManager receiverManager = getReceiverManager();
if (receiverManager.isConnected()) {
- // The helix instance associated with this container should be
consistent on helix tag
- List<String> existedTags = receiverManager.getClusterManagmentTool()
- .getInstanceConfig(this.clusterName,
this.helixInstanceName).getTags();
- Set<String> desiredTags = new HashSet<>(
- ConfigUtils.getStringList(this.clusterConfig,
GobblinClusterConfigurationKeys.HELIX_INSTANCE_TAGS_KEY));
- if (!desiredTags.isEmpty()) {
- // Remove tag assignments for the current Helix instance from a
previous run
- for (String tag : existedTags) {
- if (!desiredTags.contains(tag))
-
receiverManager.getClusterManagmentTool().removeInstanceTag(this.clusterName,
this.helixInstanceName, tag);
- logger.info("Removed unrelated helix tag {} for instance {}", tag,
this.helixInstanceName);
- }
- desiredTags.forEach(desiredTag ->
receiverManager.getClusterManagmentTool()
- .addInstanceTag(this.clusterName, this.helixInstanceName,
desiredTag));
+ if (!tags.isEmpty()) {
+ logger.info("Adding tags binding " + tags);
+ tags.forEach(tag -> receiverManager.getClusterManagmentTool()
+ .addInstanceTag(this.clusterName, this.helixInstanceName, tag));
+ logger.info("Actual tags binding " +
receiverManager.getClusterManagmentTool()
+ .getInstanceConfig(this.clusterName,
this.helixInstanceName).getTags());
}
- logger.info("Actual tags binding " +
receiverManager.getClusterManagmentTool()
- .getInstanceConfig(this.clusterName,
this.helixInstanceName).getTags());
}
}
diff --git
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAutoScalingManager.java
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAutoScalingManager.java
index 7c4da8fd8..458527abc 100644
---
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAutoScalingManager.java
+++
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAutoScalingManager.java
@@ -17,13 +17,11 @@
package org.apache.gobblin.yarn;
-import com.google.common.base.Strings;
import java.util.ArrayDeque;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
-import java.util.Objects;
import java.util.PriorityQueue;
import java.util.Set;
import java.util.concurrent.Executors;
@@ -31,12 +29,9 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
-import org.apache.gobblin.cluster.GobblinClusterConfigurationKeys;
-import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixManager;
import org.apache.helix.PropertyKey;
-import org.apache.helix.task.JobConfig;
import org.apache.helix.task.JobContext;
import org.apache.helix.task.JobDag;
import org.apache.helix.task.TaskDriver;
@@ -72,13 +67,14 @@ public class YarnAutoScalingManager extends
AbstractIdleService {
// Only one container will be requested for each N partitions of work
private final String AUTO_SCALING_PARTITIONS_PER_CONTAINER =
AUTO_SCALING_PREFIX + "partitionsPerContainer";
private final int DEFAULT_AUTO_SCALING_PARTITIONS_PER_CONTAINER = 1;
+ private final String AUTO_SCALING_MIN_CONTAINERS = AUTO_SCALING_PREFIX +
"minContainers";
+ private final int DEFAULT_AUTO_SCALING_MIN_CONTAINERS = 1;
+ private final String AUTO_SCALING_MAX_CONTAINERS = AUTO_SCALING_PREFIX +
"maxContainers";
private final String AUTO_SCALING_CONTAINER_OVERPROVISION_FACTOR =
AUTO_SCALING_PREFIX + "overProvisionFactor";
private final double DEFAULT_AUTO_SCALING_CONTAINER_OVERPROVISION_FACTOR =
1.0;
- // The cluster level default tags for Helix instances
- private final String defaultHelixInstanceTags;
- private final int defaultContainerMemoryMbs;
- private final int defaultContainerCores;
+ // A rough value of how much containers should be an intolerable number.
+ private final int DEFAULT_AUTO_SCALING_MAX_CONTAINERS = Integer.MAX_VALUE;
private final String AUTO_SCALING_INITIAL_DELAY = AUTO_SCALING_PREFIX +
"initialDelay";
private final int DEFAULT_AUTO_SCALING_INITIAL_DELAY_SECS = 60;
@@ -91,6 +87,8 @@ public class YarnAutoScalingManager extends
AbstractIdleService {
private final ScheduledExecutorService autoScalingExecutor;
private final YarnService yarnService;
private final int partitionsPerContainer;
+ private final int minContainers;
+ private final int maxContainers;
private final double overProvisionFactor;
private final SlidingWindowReservoir slidingFixedSizeWindow;
private static int maxIdleTimeInMinutesBeforeScalingDown =
DEFAULT_MAX_IDLE_TIME_BEFORE_SCALING_DOWN_MINUTES;
@@ -105,20 +103,31 @@ public class YarnAutoScalingManager extends
AbstractIdleService {
Preconditions.checkArgument(this.partitionsPerContainer > 0,
AUTO_SCALING_PARTITIONS_PER_CONTAINER + " needs to be greater than 0");
+ this.minContainers = ConfigUtils.getInt(this.config,
AUTO_SCALING_MIN_CONTAINERS,
+ DEFAULT_AUTO_SCALING_MIN_CONTAINERS);
+
+ Preconditions.checkArgument(this.minContainers > 0,
+ DEFAULT_AUTO_SCALING_MIN_CONTAINERS + " needs to be greater than 0");
+
+ this.maxContainers = ConfigUtils.getInt(this.config,
AUTO_SCALING_MAX_CONTAINERS,
+ DEFAULT_AUTO_SCALING_MAX_CONTAINERS);
+
this.overProvisionFactor = ConfigUtils.getDouble(this.config,
AUTO_SCALING_CONTAINER_OVERPROVISION_FACTOR,
DEFAULT_AUTO_SCALING_CONTAINER_OVERPROVISION_FACTOR);
+ Preconditions.checkArgument(this.maxContainers > 0,
+ DEFAULT_AUTO_SCALING_MAX_CONTAINERS + " needs to be greater than 0");
+
+ Preconditions.checkArgument(this.maxContainers >= this.minContainers,
+ DEFAULT_AUTO_SCALING_MAX_CONTAINERS + " needs to be greater than or
equal to "
+ + DEFAULT_AUTO_SCALING_MIN_CONTAINERS);
+
this.slidingFixedSizeWindow = config.hasPath(AUTO_SCALING_WINDOW_SIZE)
- ? new SlidingWindowReservoir(config.getInt(AUTO_SCALING_WINDOW_SIZE),
Integer.MAX_VALUE)
- : new SlidingWindowReservoir(Integer.MAX_VALUE);
+ ? new SlidingWindowReservoir(maxContainers,
config.getInt(AUTO_SCALING_WINDOW_SIZE))
+ : new SlidingWindowReservoir(maxContainers);
this.autoScalingExecutor = Executors.newSingleThreadScheduledExecutor(
ExecutorsUtils.newThreadFactory(Optional.of(log),
Optional.of("AutoScalingExecutor")));
-
- this.defaultHelixInstanceTags = ConfigUtils.getString(config,
- GobblinClusterConfigurationKeys.HELIX_INSTANCE_TAGS_KEY,
GobblinClusterConfigurationKeys.HELIX_DEFAULT_TAG);
- this.defaultContainerMemoryMbs =
config.getInt(GobblinYarnConfigurationKeys.CONTAINER_MEMORY_MBS_KEY);
- this.defaultContainerCores =
config.getInt(GobblinYarnConfigurationKeys.CONTAINER_CORES_KEY);
}
@Override
@@ -131,10 +140,9 @@ public class YarnAutoScalingManager extends
AbstractIdleService {
log.info("Scheduling the auto scaling task with an interval of {}
seconds", scheduleInterval);
this.autoScalingExecutor.scheduleAtFixedRate(new
YarnAutoScalingRunnable(new TaskDriver(this.helixManager),
- this.yarnService, this.partitionsPerContainer,
this.overProvisionFactor,
- this.slidingFixedSizeWindow,
this.helixManager.getHelixDataAccessor(), this.defaultHelixInstanceTags,
- this.defaultContainerMemoryMbs, this.defaultContainerCores),
- initialDelay, scheduleInterval, TimeUnit.SECONDS);
+ this.yarnService, this.partitionsPerContainer, this.minContainers,
this.maxContainers, this.overProvisionFactor,
+ this.slidingFixedSizeWindow,
this.helixManager.getHelixDataAccessor()), initialDelay, scheduleInterval,
+ TimeUnit.SECONDS);
}
@Override
@@ -154,13 +162,11 @@ public class YarnAutoScalingManager extends
AbstractIdleService {
private final TaskDriver taskDriver;
private final YarnService yarnService;
private final int partitionsPerContainer;
+ private final int minContainers;
+ private final int maxContainers;
private final double overProvisionFactor;
private final SlidingWindowReservoir slidingWindowReservoir;
private final HelixDataAccessor helixDataAccessor;
- private final String defaultHelixInstanceTags;
- private final int defaultContainerMemoryMbs;
- private final int defaultContainerCores;
-
/**
* A static map that keep track of an idle instance and its latest
beginning idle time.
* If an instance is no longer idle when inspected, it will be dropped
from this map.
@@ -196,7 +202,8 @@ public class YarnAutoScalingManager extends
AbstractIdleService {
@VisibleForTesting
void runInternal() {
Set<String> inUseInstances = new HashSet<>();
- YarnContainerRequestBundle yarnContainerRequestBundle = new
YarnContainerRequestBundle();
+
+ int numPartitions = 0;
for (Map.Entry<String, WorkflowConfig> workFlowEntry :
taskDriver.getWorkflows().entrySet()) {
WorkflowContext workflowContext =
taskDriver.getWorkflowContext(workFlowEntry.getKey());
@@ -210,42 +217,24 @@ public class YarnAutoScalingManager extends
AbstractIdleService {
WorkflowConfig workflowConfig = workFlowEntry.getValue();
JobDag jobDag = workflowConfig.getJobDag();
+
Set<String> jobs = jobDag.getAllNodes();
// sum up the number of partitions
for (String jobName : jobs) {
JobContext jobContext = taskDriver.getJobContext(jobName);
- JobConfig jobConfig = taskDriver.getJobConfig(jobName);
- Resource resource =
Resource.newInstance(this.defaultContainerMemoryMbs,
this.defaultContainerCores);
- int numPartitions = 0;
- String jobTag = defaultHelixInstanceTags;
+
if (jobContext != null) {
log.debug("JobContext {} num partitions {}", jobContext,
jobContext.getPartitionSet().size());
inUseInstances.addAll(jobContext.getPartitionSet().stream().map(jobContext::getAssignedParticipant)
- .filter(Objects::nonNull).collect(Collectors.toSet()));
-
- numPartitions = jobContext.getPartitionSet().size();
- // Job level config for helix instance tags takes precedence over
other tag configurations
- if (jobConfig != null) {
- if (!Strings.isNullOrEmpty(jobConfig.getInstanceGroupTag())) {
- jobTag = jobConfig.getInstanceGroupTag();
- }
- Map<String, String> jobCommandConfigMap =
jobConfig.getJobCommandConfigMap();
-
if(jobCommandConfigMap.containsKey(GobblinClusterConfigurationKeys.HELIX_JOB_CONTAINER_MEMORY_MBS)){
-
resource.setMemory(Integer.parseInt(jobCommandConfigMap.get(GobblinClusterConfigurationKeys.HELIX_JOB_CONTAINER_MEMORY_MBS)));
- }
-
if(jobCommandConfigMap.containsKey(GobblinClusterConfigurationKeys.HELIX_JOB_CONTAINER_CORES)){
-
resource.setVirtualCores(Integer.parseInt(jobCommandConfigMap.get(GobblinClusterConfigurationKeys.HELIX_JOB_CONTAINER_CORES)));
- }
- }
+ .filter(e -> e != null).collect(Collectors.toSet()));
+
+ numPartitions += jobContext.getPartitionSet().size();
}
- // compute the container count as a ceiling of number of partitions
divided by the number of containers
- // per partition. Scale the result by a constant overprovision
factor.
- int containerCount = (int) Math.ceil(((double)numPartitions /
this.partitionsPerContainer) * this.overProvisionFactor);
- yarnContainerRequestBundle.add(jobTag, containerCount, resource);
}
}
+
// Find all participants appearing in this cluster. Note that Helix
instances can contain cluster-manager
// and potentially replanner-instance.
Set<String> allParticipants =
getParticipants(HELIX_YARN_INSTANCE_NAME_PREFIX);
@@ -264,11 +253,17 @@ public class YarnAutoScalingManager extends
AbstractIdleService {
instanceIdleSince.remove(participant);
}
}
- slidingWindowReservoir.add(yarnContainerRequestBundle);
- log.debug("There are {} containers being requested in total, tag-count
map {}, tag-resource map {}",
- yarnContainerRequestBundle.getTotalContainers(),
yarnContainerRequestBundle.getHelixTagContainerCountMap(),
- yarnContainerRequestBundle.getHelixTagResourceMap());
+ // compute the target containers as a ceiling of number of partitions
divided by the number of containers
+ // per partition. Scale the result by a constant overprovision factor.
+ int numTargetContainers = (int) Math.ceil(((double)numPartitions /
this.partitionsPerContainer) * this.overProvisionFactor);
+
+ // adjust the number of target containers based on the configured min
and max container values.
+ numTargetContainers = Math.max(this.minContainers,
Math.min(this.maxContainers, numTargetContainers));
+
+ slidingWindowReservoir.add(numTargetContainers);
+
+ log.info("There are {} containers being requested", numTargetContainers);
this.yarnService.requestTargetNumberOfContainers(slidingWindowReservoir.getMax(),
inUseInstances);
}
@@ -295,8 +290,8 @@ public class YarnAutoScalingManager extends
AbstractIdleService {
* which captures max value. It is NOT built for general purpose.
*/
static class SlidingWindowReservoir {
- private ArrayDeque<YarnContainerRequestBundle> fifoQueue;
- private PriorityQueue<YarnContainerRequestBundle> priorityQueue;
+ private ArrayDeque<Integer> fifoQueue;
+ private PriorityQueue<Integer> priorityQueue;
// Queue Size
private int maxSize;
@@ -311,11 +306,10 @@ public class YarnAutoScalingManager extends
AbstractIdleService {
this.maxSize = maxSize;
this.upperBound = upperBound;
this.fifoQueue = new ArrayDeque<>(maxSize);
- this.priorityQueue = new PriorityQueue<>(maxSize, new
Comparator<YarnContainerRequestBundle>() {
+ this.priorityQueue = new PriorityQueue<>(maxSize, new
Comparator<Integer>() {
@Override
- public int compare(YarnContainerRequestBundle o1,
YarnContainerRequestBundle o2) {
- Integer i2 = o2.getTotalContainers();
- return i2.compareTo(o1.getTotalContainers());
+ public int compare(Integer o1, Integer o2) {
+ return o2.compareTo(o1);
}
});
}
@@ -329,14 +323,14 @@ public class YarnAutoScalingManager extends
AbstractIdleService {
* When a new element is larger than upperbound, reject the value since we
may request too many Yarn containers.
* When queue is full, evict head of FIFO-queue (In FIFO queue, elements
are inserted from tail).
*/
- public void add(YarnContainerRequestBundle e) {
- if (e.getTotalContainers() > upperBound) {
+ public void add(int e) {
+ if (e > upperBound) {
log.error(String.format("Request of getting %s containers seems to be
excessive, rejected", e));
return;
}
if (fifoQueue.size() == maxSize) {
- YarnContainerRequestBundle removedElement = fifoQueue.remove();
+ Integer removedElement = fifoQueue.remove();
priorityQueue.remove(removedElement);
}
@@ -351,7 +345,7 @@ public class YarnAutoScalingManager extends
AbstractIdleService {
/**
* If queue is empty, throw {@link IllegalStateException}.
*/
- public YarnContainerRequestBundle getMax() {
+ public int getMax() {
if (priorityQueue.size() > 0) {
return this.priorityQueue.peek();
} else {
diff --git
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnContainerRequestBundle.java
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnContainerRequestBundle.java
deleted file mode 100644
index 63e6fcb23..000000000
---
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnContainerRequestBundle.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gobblin.yarn;
-
-import com.google.common.base.Preconditions;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-import lombok.Getter;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.hadoop.yarn.api.records.Resource;
-
-/**
- * The class that represents current Yarn container request that will be used
by {link @YarnService}.
- * Yarn container allocation should associate with helix tag, as workflows can
have specific helix tag setup
- * and specific resource requirement.
- */
-@Slf4j
-@Getter
-public class YarnContainerRequestBundle {
- int totalContainers;
- private final Map<String, Integer> helixTagContainerCountMap;
- private final Map<String, Resource> helixTagResourceMap;
- private final Map<String, Set<String>> resourceHelixTagMap;
-
- public YarnContainerRequestBundle() {
- this.totalContainers = 0;
- this.helixTagContainerCountMap = new HashMap<>();
- this.helixTagResourceMap = new HashMap<>();
- this.resourceHelixTagMap = new HashMap<>();
- }
-
- public void add(String helixTag, int containerCount, Resource resource) {
- helixTagContainerCountMap.put(helixTag,
helixTagContainerCountMap.getOrDefault(helixTag, 0) + containerCount);
- if(helixTagResourceMap.containsKey(helixTag)) {
- Resource existedResource = helixTagResourceMap.get(helixTag);
- Preconditions.checkArgument(resource.getMemory() ==
existedResource.getMemory() &&
- resource.getVirtualCores() == existedResource.getVirtualCores(),
- "Helix tag need to have consistent resource requirement. Tag " +
helixTag
- + " has existed resource require " + existedResource.toString()
+ " and different require " + resource.toString());
- } else {
- helixTagResourceMap.put(helixTag, resource);
- Set<String> tagSet =
resourceHelixTagMap.getOrDefault(resource.toString(), new HashSet<>());
- tagSet.add(helixTag);
- resourceHelixTagMap.put(resource.toString(), tagSet);
- }
- totalContainers += containerCount;
- }
-
- // This method assumes the resource requirement for the helix tag is already
stored in the map
- public void add(String helixTag, int containerCount) {
- if (!helixTagContainerCountMap.containsKey(helixTag) &&
!helixTagResourceMap.containsKey(helixTag)) {
- log.error("Helix tag {} is not present in the request bundle yet, can't
process the request to add {} "
- + "container for it without specifying the resource requirement",
helixTag, containerCount);
- return;
- }
- helixTagContainerCountMap.put(helixTag,
helixTagContainerCountMap.get(helixTag) + containerCount);
- this.totalContainers += containerCount;
- }
-}
diff --git
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnHelixUtils.java
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnHelixUtils.java
index 18068895c..5309a13ac 100644
--- a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnHelixUtils.java
+++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnHelixUtils.java
@@ -34,7 +34,6 @@ import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.yarn.api.ApplicationConstants;
-import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
@@ -230,28 +229,4 @@ public class YarnHelixUtils {
public static String getContainerNum(String containerId) {
return "container-" + containerId.substring(containerId.lastIndexOf("_") +
1);
}
-
- /**
- * Find the helix tag for the newly allocated container. The tag should
align with {@link YarnContainerRequestBundle},
- * so that the correct resource can be allocated to helix workflow that has
specific resource requirement.
- * @param container newly allocated container
- * @param helixTagAllocatedContainerCount current container count for each
helix tag
- * @param requestedYarnContainer yarn container request specify the desired
state
- * @return helix tag that this container should be assigned with, if null
means need to use the default
- */
- public static String findHelixTagForContainer(Container container,
- Map<String, Integer> helixTagAllocatedContainerCount,
YarnContainerRequestBundle requestedYarnContainer) {
- String foundTag = null;
- if(requestedYarnContainer != null &&
requestedYarnContainer.getResourceHelixTagMap().containsKey(container.getResource().toString()))
{
- for (String tag :
requestedYarnContainer.getResourceHelixTagMap().get(container.getResource().toString()))
{
- int desiredCount =
requestedYarnContainer.getHelixTagContainerCountMap().get(tag);
- int allocatedCount = helixTagAllocatedContainerCount.getOrDefault(tag,
0);
- foundTag = tag;
- if(allocatedCount < desiredCount) {
- return foundTag;
- }
- }
- }
- return foundTag;
- }
}
diff --git
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java
index feaa564ab..9fadc9485 100644
--- a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java
+++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java
@@ -20,6 +20,7 @@ package org.apache.gobblin.yarn;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -34,7 +35,6 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
-import lombok.AllArgsConstructor;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@@ -70,6 +70,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Function;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
@@ -126,7 +127,6 @@ public class YarnService extends AbstractIdleService {
private final String applicationName;
private final String applicationId;
private final String appViewAcl;
- //Default helix instance tag derived from cluster level config
private final String helixInstanceTags;
private final Config config;
@@ -158,7 +158,6 @@ public class YarnService extends AbstractIdleService {
private final String containerTimezone;
private final HelixManager helixManager;
- @Getter(AccessLevel.PROTECTED)
private volatile Optional<Resource> maxResourceCapacity = Optional.absent();
// Security tokens for accessing HDFS
@@ -168,10 +167,10 @@ public class YarnService extends AbstractIdleService {
private final Object allContainersStopped = new Object();
- // A map from container IDs to Container instances, Helix participant IDs of
the containers and Helix Tag
+ // A map from container IDs to pairs of Container instances and Helix
participant IDs of the containers
@VisibleForTesting
@Getter(AccessLevel.PROTECTED)
- private final ConcurrentMap<ContainerId, ContainerInfo> containerMap =
Maps.newConcurrentMap();
+ private final ConcurrentMap<ContainerId, Map.Entry<Container, String>>
containerMap = Maps.newConcurrentMap();
// A cache of the containers with an outstanding container release request.
// This is a cache instead of a set to get the automatic cleanup in case a
container completes before the requested
@@ -191,17 +190,17 @@ public class YarnService extends AbstractIdleService {
// instance names get picked up when replacement containers get allocated.
private final Set<String> unusedHelixInstanceNames =
ConcurrentHashMap.newKeySet();
- // The map from helix tag to requested container count
- private final Map<String, Integer> requestedContainerCountMap =
Maps.newConcurrentMap();
- // The map from helix tag to allocated container count
- private final Map<String, Integer> allocatedContainerCountMap =
Maps.newConcurrentMap();
-
- private volatile YarnContainerRequestBundle yarnContainerRequest;
- private final AtomicInteger priorityNumGenerator = new AtomicInteger(0);
- private final Map<String, Integer> resourcePriorityMap = new HashMap<>();
-
private volatile boolean shutdownInProgress = false;
+ // The number of containers requested based on the desired target number of
containers. This is used to determine
+ // how may additional containers to request since the the currently
allocated amount may be less than this amount if we
+ // are waiting for containers to be allocated.
+ // The currently allocated amount may also be higher than this amount if
YARN returned more than the requested number
+ // of containers.
+ @VisibleForTesting
+ @Getter(AccessLevel.PROTECTED)
+ private int numRequestedContainers = 0;
+
public YarnService(Config config, String applicationName, String
applicationId, YarnConfiguration yarnConfiguration,
FileSystem fs, EventBus eventBus, HelixManager helixManager) throws
Exception {
this.applicationName = applicationName;
@@ -237,8 +236,7 @@ public class YarnService extends AbstractIdleService {
this.containerHostAffinityEnabled =
config.getBoolean(GobblinYarnConfigurationKeys.CONTAINER_HOST_AFFINITY_ENABLED);
this.helixInstanceMaxRetries =
config.getInt(GobblinYarnConfigurationKeys.HELIX_INSTANCE_MAX_RETRIES);
- this.helixInstanceTags = ConfigUtils.getString(config,
- GobblinClusterConfigurationKeys.HELIX_INSTANCE_TAGS_KEY,
GobblinClusterConfigurationKeys.HELIX_DEFAULT_TAG);
+ this.helixInstanceTags = ConfigUtils.getString(config,
GobblinClusterConfigurationKeys.HELIX_INSTANCE_TAGS_KEY, null);
this.containerJvmArgs =
config.hasPath(GobblinYarnConfigurationKeys.CONTAINER_JVM_ARGS_KEY) ?
Optional.of(config.getString(GobblinYarnConfigurationKeys.CONTAINER_JVM_ARGS_KEY))
:
@@ -288,8 +286,14 @@ public class YarnService extends AbstractIdleService {
this.requestedContainerCores));
return;
}
-
requestContainer(newContainerRequest.getReplacedContainer().transform(container
-> container.getNodeId().getHost()),
- newContainerRequest.getResource());
+
+ requestContainer(newContainerRequest.getReplacedContainer().transform(new
Function<Container, String>() {
+
+ @Override
+ public String apply(Container container) {
+ return container.getNodeId().getHost();
+ }
+ }));
}
protected NMClientCallbackHandler getNMClientCallbackHandler() {
@@ -355,10 +359,10 @@ public class YarnService extends AbstractIdleService {
ExecutorsUtils.shutdownExecutorService(this.containerLaunchExecutor,
Optional.of(LOGGER));
// Stop the running containers
- for (ContainerInfo containerInfo : this.containerMap.values()) {
- LOGGER.info("Stopping container {} running participant {}",
containerInfo.getContainer().getId(),
- containerInfo.getHelixParticipantId());
-
this.nmClientAsync.stopContainerAsync(containerInfo.getContainer().getId(),
containerInfo.getContainer().getNodeId());
+ for (Map.Entry<Container, String> entry : this.containerMap.values()) {
+ LOGGER.info(String.format("Stopping container %s running participant
%s", entry.getKey().getId(),
+ entry.getValue()));
+ this.nmClientAsync.stopContainerAsync(entry.getKey().getId(),
entry.getKey().getNodeId());
}
if (!this.containerMap.isEmpty()) {
@@ -427,28 +431,27 @@ public class YarnService extends AbstractIdleService {
* number of containers. The intended usage is for the caller of this method
to make periodic calls to attempt to
* adjust the cluster towards the desired number of containers.
*
- * @param yarnContainerRequestBundle the desired containers information,
including numbers, resource and helix tag
+ * @param numTargetContainers the desired number of containers
* @param inUseInstances a set of in use instances
*/
- public synchronized void
requestTargetNumberOfContainers(YarnContainerRequestBundle
yarnContainerRequestBundle, Set<String> inUseInstances) {
- LOGGER.debug("Requesting numTargetContainers {}, in use instances count is
{}, container map size is {}",
- yarnContainerRequestBundle.getTotalContainers(), inUseInstances,
this.containerMap.size());
- int numTargetContainers = yarnContainerRequestBundle.getTotalContainers();
+ public synchronized void requestTargetNumberOfContainers(int
numTargetContainers, Set<String> inUseInstances) {
+ LOGGER.debug("Requesting numTargetContainers {} current
numRequestedContainers {} in use instances {} map size {}",
+ numTargetContainers, this.numRequestedContainers, inUseInstances,
this.containerMap.size());
+
// YARN can allocate more than the requested number of containers, compute
additional allocations and deallocations
// based on the max of the requested and actual allocated counts
int numAllocatedContainers = this.containerMap.size();
+ // The number of allocated containers may be higher than the previously
requested amount
+ // and there may be outstanding allocation requests, so the max of both
counts is computed here
+ // and used to decide whether to allocate containers.
+ int numContainers = Math.max(numRequestedContainers,
numAllocatedContainers);
+
// Request additional containers if the desired count is higher than the
max of the current allocation or previously
// requested amount. Note that there may be in-flight or additional
allocations after numContainers has been computed
// so overshooting can occur, but periodic calls to this method will make
adjustments towards the target.
- for (Map.Entry<String, Integer> entry :
yarnContainerRequestBundle.getHelixTagContainerCountMap().entrySet()) {
- String currentHelixTag = entry.getKey();
- int desiredContainerCount = entry.getValue();
- int requestedContainerCount =
requestedContainerCountMap.getOrDefault(currentHelixTag, 0);
- for(; requestedContainerCount < desiredContainerCount;
requestedContainerCount++) {
- requestContainer(Optional.absent(),
yarnContainerRequestBundle.getHelixTagResourceMap().get(currentHelixTag));
- }
- requestedContainerCountMap.put(currentHelixTag, desiredContainerCount);
+ for (int i = numContainers; i < numTargetContainers; i++) {
+ requestContainer(Optional.<String>absent());
}
// If the total desired is lower than the currently allocated amount then
release free containers.
@@ -459,13 +462,12 @@ public class YarnService extends AbstractIdleService {
LOGGER.debug("Shrinking number of containers by {}",
(numAllocatedContainers - numTargetContainers));
List<Container> containersToRelease = new ArrayList<>();
- int numToShutdown = numAllocatedContainers - numTargetContainers;
+ int numToShutdown = numContainers - numTargetContainers;
// Look for eligible containers to release. If a container is in use
then it is not released.
- for (Map.Entry<ContainerId, ContainerInfo> entry :
this.containerMap.entrySet()) {
- ContainerInfo containerInfo = entry.getValue();
- if (!inUseInstances.contains(containerInfo.getHelixParticipantId())) {
- containersToRelease.add(containerInfo.getContainer());
+ for (Map.Entry<ContainerId, Map.Entry<Container, String>> entry :
this.containerMap.entrySet()) {
+ if (!inUseInstances.contains(entry.getValue().getValue())) {
+ containersToRelease.add(entry.getValue().getKey());
}
if (containersToRelease.size() == numToShutdown) {
@@ -477,47 +479,32 @@ public class YarnService extends AbstractIdleService {
this.eventBus.post(new ContainerReleaseRequest(containersToRelease));
}
- this.yarnContainerRequest = yarnContainerRequestBundle;
- LOGGER.info("Current tag-container being requested:{}, tag-container
allocated: {}",
- this.requestedContainerCountMap, this.allocatedContainerCountMap);
- }
- // Request initial containers with default resource and helix tag
- private void requestInitialContainers(int containersRequested) {
- YarnContainerRequestBundle initialYarnContainerRequest = new
YarnContainerRequestBundle();
- Resource capability =
Resource.newInstance(this.requestedContainerMemoryMbs,
this.requestedContainerCores);
- initialYarnContainerRequest.add(this.helixInstanceTags,
containersRequested, capability);
- requestTargetNumberOfContainers(initialYarnContainerRequest,
Collections.EMPTY_SET);
+ this.numRequestedContainers = numTargetContainers;
}
- private void requestContainer(Optional<String> preferredNode,
Optional<Resource> resourceOptional) {
- Resource desiredResource = resourceOptional.or(Resource.newInstance(
- this.requestedContainerMemoryMbs, this.requestedContainerCores));
- requestContainer(preferredNode, desiredResource);
+ private void requestInitialContainers(int containersRequested) {
+ requestTargetNumberOfContainers(containersRequested,
Collections.EMPTY_SET);
}
- // Request containers with specific resource requirement
- private void requestContainer(Optional<String> preferredNode, Resource
resource) {
- // Fail if Yarn cannot meet container resource requirements
- Preconditions.checkArgument(resource.getMemory() <=
this.maxResourceCapacity.get().getMemory() &&
- resource.getVirtualCores() <=
this.maxResourceCapacity.get().getVirtualCores(),
- "Resource requirement must less than the max resource capacity.
Requested resource" + resource.toString()
- + " exceed the max resource limit " +
this.maxResourceCapacity.get().toString());
-
- // Due to YARN-314, different resource capacity needs different priority,
otherwise Yarn will not allocate container
+ private void requestContainer(Optional<String> preferredNode) {
Priority priority = Records.newRecord(Priority.class);
- if(!resourcePriorityMap.containsKey(resource.toString())) {
- resourcePriorityMap.put(resource.toString(),
priorityNumGenerator.getAndIncrement());
- }
- int priorityNum = resourcePriorityMap.get(resource.toString());
- priority.setPriority(priorityNum);
+ priority.setPriority(0);
+
+ Resource capability = Records.newRecord(Resource.class);
+ int maxMemoryCapacity = this.maxResourceCapacity.get().getMemory();
+ capability.setMemory(this.requestedContainerMemoryMbs <= maxMemoryCapacity
?
+ this.requestedContainerMemoryMbs : maxMemoryCapacity);
+ int maxCoreCapacity = this.maxResourceCapacity.get().getVirtualCores();
+ capability.setVirtualCores(this.requestedContainerCores <= maxCoreCapacity
?
+ this.requestedContainerCores : maxCoreCapacity);
String[] preferredNodes = preferredNode.isPresent() ? new String[]
{preferredNode.get()} : null;
this.amrmClientAsync.addContainerRequest(
- new AMRMClient.ContainerRequest(resource, preferredNodes, null,
priority));
+ new AMRMClient.ContainerRequest(capability, preferredNodes, null,
priority));
}
- protected ContainerLaunchContext newContainerLaunchContext(ContainerInfo
containerInfo)
+ protected ContainerLaunchContext newContainerLaunchContext(Container
container, String helixInstanceName)
throws IOException {
Path appWorkDir =
GobblinClusterUtils.getAppWorkDirPathFromConfig(this.config, this.fs,
this.applicationName, this.applicationId);
Path containerWorkDir = new Path(appWorkDir,
GobblinYarnConfigurationKeys.CONTAINER_WORK_DIR_NAME);
@@ -540,7 +527,7 @@ public class YarnService extends AbstractIdleService {
ContainerLaunchContext containerLaunchContext =
Records.newRecord(ContainerLaunchContext.class);
containerLaunchContext.setLocalResources(resourceMap);
containerLaunchContext.setEnvironment(YarnHelixUtils.getEnvironmentVariables(this.yarnConfiguration));
-
containerLaunchContext.setCommands(Lists.newArrayList(buildContainerCommand(containerInfo)));
+
containerLaunchContext.setCommands(Lists.newArrayList(buildContainerCommand(container,
helixInstanceName)));
Map<ApplicationAccessType, String> acls = new HashMap<>(1);
acls.put(ApplicationAccessType.VIEW_APP, this.appViewAcl);
@@ -593,11 +580,11 @@ public class YarnService extends AbstractIdleService {
}
@VisibleForTesting
- protected String buildContainerCommand(ContainerInfo containerInfo) {
+ protected String buildContainerCommand(Container container, String
helixInstanceName) {
String containerProcessName = GobblinYarnTaskRunner.class.getSimpleName();
StringBuilder containerCommand = new StringBuilder()
.append(ApplicationConstants.Environment.JAVA_HOME.$()).append("/bin/java")
- .append(" -Xmx").append((int)
(containerInfo.getContainer().getResource().getMemory() *
this.jvmMemoryXmxRatio) -
+ .append(" -Xmx").append((int) (container.getResource().getMemory() *
this.jvmMemoryXmxRatio) -
this.jvmMemoryOverheadMbs).append("M")
.append("
-D").append(GobblinYarnConfigurationKeys.JVM_USER_TIMEZONE_CONFIG).append("=").append(this.containerTimezone)
.append("
-D").append(GobblinYarnConfigurationKeys.GOBBLIN_YARN_CONTAINER_LOG_DIR_NAME).append("=").append(ApplicationConstants.LOG_DIR_EXPANSION_VAR)
@@ -609,11 +596,11 @@ public class YarnService extends AbstractIdleService {
.append("
--").append(GobblinClusterConfigurationKeys.APPLICATION_ID_OPTION_NAME)
.append(" ").append(this.applicationId)
.append("
--").append(GobblinClusterConfigurationKeys.HELIX_INSTANCE_NAME_OPTION_NAME)
- .append(" ").append(containerInfo.getHelixParticipantId());
+ .append(" ").append(helixInstanceName);
- if (!Strings.isNullOrEmpty(containerInfo.getHelixTag())) {
+ if (!Strings.isNullOrEmpty(this.helixInstanceTags)) {
containerCommand.append("
--").append(GobblinClusterConfigurationKeys.HELIX_INSTANCE_TAGS_OPTION_NAME)
- .append(" ").append(containerInfo.getHelixTag());
+ .append(" ").append(helixInstanceTags);
}
return containerCommand.append("
1>").append(ApplicationConstants.LOG_DIR_EXPANSION_VAR).append(File.separator).append(
containerProcessName).append(".").append(ApplicationConstants.STDOUT)
@@ -653,13 +640,11 @@ public class YarnService extends AbstractIdleService {
* A replacement container is needed in all but the last case.
*/
protected void handleContainerCompletion(ContainerStatus containerStatus) {
- ContainerInfo completedContainerInfo =
this.containerMap.remove(containerStatus.getContainerId());
+ Map.Entry<Container, String> completedContainerEntry =
this.containerMap.remove(containerStatus.getContainerId());
//Get the Helix instance name for the completed container. Because
callbacks are processed asynchronously, we might
//encounter situations where handleContainerCompletion() is called before
onContainersAllocated(), resulting in the
//containerId missing from the containersMap.
- String completedInstanceName = completedContainerInfo == null?
UNKNOWN_HELIX_INSTANCE : completedContainerInfo.getHelixParticipantId();
- String helixTag = completedContainerInfo == null ? helixInstanceTags :
completedContainerInfo.getHelixTag();
- allocatedContainerCountMap.put(helixTag,
allocatedContainerCountMap.get(helixTag) - 1);
+ String completedInstanceName = completedContainerEntry == null?
UNKNOWN_HELIX_INSTANCE : completedContainerEntry.getValue();
LOGGER.info(String.format("Container %s running Helix instance %s has
completed with exit status %d",
containerStatus.getContainerId(), completedInstanceName,
containerStatus.getExitStatus()));
@@ -672,7 +657,7 @@ public class YarnService extends AbstractIdleService {
if (containerStatus.getExitStatus() == ContainerExitStatus.ABORTED) {
if
(this.releasedContainerCache.getIfPresent(containerStatus.getContainerId()) !=
null) {
LOGGER.info("Container release requested, so not spawning a
replacement for containerId {}", containerStatus.getContainerId());
- if (completedContainerInfo != null) {
+ if (completedContainerEntry != null) {
LOGGER.info("Adding instance {} to the pool of unused instances",
completedInstanceName);
this.unusedHelixInstanceNames.add(completedInstanceName);
}
@@ -698,7 +683,7 @@ public class YarnService extends AbstractIdleService {
if (this.shutdownInProgress) {
return;
}
- if(completedContainerInfo != null) {
+ if(completedContainerEntry != null) {
this.helixInstanceRetryCount.putIfAbsent(completedInstanceName, new
AtomicInteger(0));
int retryCount =
this.helixInstanceRetryCount.get(completedInstanceName).incrementAndGet();
@@ -730,13 +715,10 @@ public class YarnService extends AbstractIdleService {
.submit(GobblinYarnEventConstants.EventNames.HELIX_INSTANCE_COMPLETION,
eventMetadataBuilder.get().build());
}
}
- Optional<Resource> newContainerResource = completedContainerInfo != null ?
- Optional.of(completedContainerInfo.getContainer().getResource()) :
Optional.absent();
- LOGGER.info("Requesting a new container to replace {} to run Helix
instance {} with helix tag {} and resource {}",
- containerStatus.getContainerId(), completedInstanceName, helixTag,
newContainerResource.orNull());
+ LOGGER.info(String.format("Requesting a new container to replace %s to run
Helix instance %s", containerStatus.getContainerId(), completedInstanceName));
this.eventBus.post(new NewContainerRequest(
- shouldStickToTheSameNode(containerStatus.getExitStatus()) &&
completedContainerInfo != null ?
- Optional.of(completedContainerInfo.getContainer()) :
Optional.absent(), newContainerResource));
+ shouldStickToTheSameNode(containerStatus.getExitStatus()) &&
completedContainerEntry != null ?
+ Optional.of(completedContainerEntry.getKey()) :
Optional.<Container>absent()));
}
private ImmutableMap.Builder<String, String>
buildContainerStatusEventMetadata(ContainerStatus containerStatus) {
@@ -773,18 +755,12 @@ public class YarnService extends AbstractIdleService {
@Override
public void onContainersAllocated(List<Container> containers) {
for (final Container container : containers) {
- String containerId = container.getId().toString();
- String containerHelixTag =
YarnHelixUtils.findHelixTagForContainer(container, allocatedContainerCountMap,
yarnContainerRequest);
- if (Strings.isNullOrEmpty(containerHelixTag)) {
- containerHelixTag = helixInstanceTags;
- }
if (eventSubmitter.isPresent()) {
eventSubmitter.get().submit(GobblinYarnEventConstants.EventNames.CONTAINER_ALLOCATION,
- GobblinYarnMetricTagNames.CONTAINER_ID, containerId);
+ GobblinYarnMetricTagNames.CONTAINER_ID,
container.getId().toString());
}
- LOGGER.info("Container {} has been allocated with resource {} for
helix tag {}",
- container.getId(), container.getResource(), containerHelixTag);
+ LOGGER.info(String.format("Container %s has been allocated",
container.getId()));
//Iterate over the (thread-safe) set of unused instances to find the
first instance that is not currently live.
//Once we find a candidate instance, it is removed from the set.
@@ -805,8 +781,6 @@ public class YarnService extends AbstractIdleService {
instanceName = null;
}
}
- allocatedContainerCountMap.put(containerHelixTag,
- allocatedContainerCountMap.getOrDefault(containerHelixTag, 0) +
1);
}
if (Strings.isNullOrEmpty(instanceName)) {
@@ -815,8 +789,8 @@ public class YarnService extends AbstractIdleService {
.getHelixInstanceName(HELIX_YARN_INSTANCE_NAME_PREFIX,
helixInstanceIdGenerator.incrementAndGet());
}
- ContainerInfo containerInfo = new ContainerInfo(container,
instanceName, containerHelixTag);
- containerMap.put(container.getId(), containerInfo);
+ final String finalInstanceName = instanceName;
+ containerMap.put(container.getId(), new
AbstractMap.SimpleImmutableEntry<>(container, finalInstanceName));
// Find matching requests and remove the request to reduce the chance
that a subsequent request
// will request extra containers. YARN does not have a delta request
API and the requests are not
@@ -845,11 +819,11 @@ public class YarnService extends AbstractIdleService {
@Override
public void run() {
try {
- LOGGER.info("Starting container " + containerId);
+ LOGGER.info("Starting container " + container.getId());
- nmClientAsync.startContainerAsync(container,
newContainerLaunchContext(containerInfo));
+ nmClientAsync.startContainerAsync(container,
newContainerLaunchContext(container, finalInstanceName));
} catch (IOException ioe) {
- LOGGER.error("Failed to start container " + containerId, ioe);
+ LOGGER.error("Failed to start container " + container.getId(),
ioe);
}
}
});
@@ -965,13 +939,4 @@ public class YarnService extends AbstractIdleService {
LOGGER.error(String.format("Failed to stop container %s due to error
%s", containerId, t));
}
}
-
- //A class encapsulates Container instances, Helix participant IDs of the
containers and Helix Tag
- @AllArgsConstructor
- @Getter
- static class ContainerInfo {
- private final Container container;
- private final String helixParticipantId;
- private final String helixTag;
- }
}
diff --git
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/event/NewContainerRequest.java
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/event/NewContainerRequest.java
index 5cb529ed0..7ee4b3223 100644
---
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/event/NewContainerRequest.java
+++
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/event/NewContainerRequest.java
@@ -17,11 +17,9 @@
package org.apache.gobblin.yarn.event;
-import lombok.Getter;
import org.apache.hadoop.yarn.api.records.Container;
import com.google.common.base.Optional;
-import org.apache.hadoop.yarn.api.records.Resource;
/**
@@ -32,17 +30,9 @@ import org.apache.hadoop.yarn.api.records.Resource;
public class NewContainerRequest {
private final Optional<Container> replacedContainer;
- @Getter
- private final Optional<Resource> resource;
public NewContainerRequest(Optional<Container> replacedContainer) {
this.replacedContainer = replacedContainer;
- this.resource = Optional.absent();
- }
-
- public NewContainerRequest(Optional<Container> replacedContainer,
Optional<Resource> resource) {
- this.replacedContainer = replacedContainer;
- this.resource = resource;
}
/**
diff --git
a/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/GobblinYarnTestUtils.java
b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/GobblinYarnTestUtils.java
index 21f3df0d5..9bb5b747e 100644
---
a/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/GobblinYarnTestUtils.java
+++
b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/GobblinYarnTestUtils.java
@@ -26,7 +26,6 @@ import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
-import org.apache.hadoop.yarn.api.records.Resource;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
@@ -76,10 +75,4 @@ public class GobblinYarnTestUtils {
credentials.addToken(token.getService(), token);
credentials.writeTokenStorageFile(path, new Configuration());
}
-
- public static YarnContainerRequestBundle createYarnContainerRequest(int n,
Resource resource) {
- YarnContainerRequestBundle yarnContainerRequestBundle = new
YarnContainerRequestBundle();
- yarnContainerRequestBundle.add("GobblinKafkaStreaming", n, resource);
- return yarnContainerRequestBundle;
- }
}
diff --git
a/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnAutoScalingManagerTest.java
b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnAutoScalingManagerTest.java
index 003bd9ccb..6c4047147 100644
---
a/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnAutoScalingManagerTest.java
+++
b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnAutoScalingManagerTest.java
@@ -19,22 +19,15 @@ package org.apache.gobblin.yarn;
import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Set;
-import org.apache.gobblin.cluster.GobblinClusterConfigurationKeys;
-import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixProperty;
import org.apache.helix.PropertyKey;
-import org.apache.helix.task.JobConfig;
import org.apache.helix.task.JobContext;
import org.apache.helix.task.JobDag;
import org.apache.helix.task.TaskDriver;
import org.apache.helix.task.TaskState;
import org.apache.helix.task.WorkflowConfig;
import org.apache.helix.task.WorkflowContext;
-import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.Test;
@@ -42,7 +35,6 @@ import org.testng.annotations.Test;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
-import static org.mockito.Matchers.*;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
@@ -55,9 +47,6 @@ public class YarnAutoScalingManagerTest {
// A queue within size == 1 and upperBound == "infinite" should not impact
on the execution.
private final static YarnAutoScalingManager.SlidingWindowReservoir noopQueue
=
new YarnAutoScalingManager.SlidingWindowReservoir(1, Integer.MAX_VALUE);
- private final static int defaultContainerMemory = 1024;
- private final static int defaultContainerCores = 2;
- private final static String defaultHelixTag = "DefaultHelixTag";
/**
* Test for one workflow with one job
*/
@@ -93,15 +82,13 @@ public class YarnAutoScalingManagerTest {
YarnAutoScalingManager.YarnAutoScalingRunnable runnable =
new YarnAutoScalingManager.YarnAutoScalingRunnable(mockTaskDriver,
mockYarnService, 1,
- 1.0, noopQueue, helixDataAccessor, defaultHelixTag,
defaultContainerMemory, defaultContainerCores);
+ 1, 10, 1.0, noopQueue, helixDataAccessor);
runnable.run();
- ArgumentCaptor<YarnContainerRequestBundle> argument =
ArgumentCaptor.forClass(YarnContainerRequestBundle.class);
+
// 2 containers requested and one worker in use
Mockito.verify(mockYarnService, times(1)).
- requestTargetNumberOfContainers(argument.capture(),
- eq(ImmutableSet.of("GobblinYarnTaskRunner-1")));
- Assert.assertEquals(argument.getValue().getTotalContainers(), 2);
+ requestTargetNumberOfContainers(2,
ImmutableSet.of("GobblinYarnTaskRunner-1"));
}
/**
@@ -144,17 +131,14 @@ public class YarnAutoScalingManagerTest {
"GobblinYarnTaskRunner-2", new HelixProperty("")));
YarnAutoScalingManager.YarnAutoScalingRunnable runnable =
- new YarnAutoScalingManager.YarnAutoScalingRunnable(mockTaskDriver,
mockYarnService, 1,
- 1.0, noopQueue, helixDataAccessor, defaultHelixTag,
defaultContainerMemory, defaultContainerCores);
+ new YarnAutoScalingManager.YarnAutoScalingRunnable(mockTaskDriver,
mockYarnService,
+ 1, 1, 10, 1.0, noopQueue, helixDataAccessor);
runnable.run();
// 3 containers requested and 2 workers in use
- ArgumentCaptor<YarnContainerRequestBundle> argument =
ArgumentCaptor.forClass(YarnContainerRequestBundle.class);
- Mockito.verify(mockYarnService, times(1)).
- requestTargetNumberOfContainers(argument.capture(),
- eq(ImmutableSet.of("GobblinYarnTaskRunner-1",
"GobblinYarnTaskRunner-2")));
- Assert.assertEquals(argument.getValue().getTotalContainers(), 3);
+ Mockito.verify(mockYarnService, times(1))
+ .requestTargetNumberOfContainers(3,
ImmutableSet.of("GobblinYarnTaskRunner-1", "GobblinYarnTaskRunner-2"));
}
/**
@@ -216,17 +200,14 @@ public class YarnAutoScalingManagerTest {
"GobblinYarnTaskRunner-3", new HelixProperty("")));
YarnAutoScalingManager.YarnAutoScalingRunnable runnable =
- new YarnAutoScalingManager.YarnAutoScalingRunnable(mockTaskDriver,
mockYarnService, 1,
- 1.0, noopQueue, helixDataAccessor, defaultHelixTag,
defaultContainerMemory, defaultContainerCores);
+ new YarnAutoScalingManager.YarnAutoScalingRunnable(mockTaskDriver,
mockYarnService,
+ 1, 1, 10, 1.0, noopQueue, helixDataAccessor);
runnable.run();
// 5 containers requested and 3 workers in use
- ArgumentCaptor<YarnContainerRequestBundle> argument =
ArgumentCaptor.forClass(YarnContainerRequestBundle.class);
- Mockito.verify(mockYarnService, times(1)).
- requestTargetNumberOfContainers(argument.capture(),
- eq(ImmutableSet.of("GobblinYarnTaskRunner-1",
"GobblinYarnTaskRunner-2", "GobblinYarnTaskRunner-3")));
- Assert.assertEquals(argument.getValue().getTotalContainers(), 5);
+ Mockito.verify(mockYarnService,
times(1)).requestTargetNumberOfContainers(5,
+ ImmutableSet.of("GobblinYarnTaskRunner-1", "GobblinYarnTaskRunner-2",
"GobblinYarnTaskRunner-3"));
}
/**
@@ -288,17 +269,14 @@ public class YarnAutoScalingManagerTest {
"GobblinYarnTaskRunner-2", new HelixProperty("")));
YarnAutoScalingManager.YarnAutoScalingRunnable runnable =
- new YarnAutoScalingManager.YarnAutoScalingRunnable(mockTaskDriver,
mockYarnService, 1,
- 1.0, noopQueue, helixDataAccessor, defaultHelixTag,
defaultContainerMemory, defaultContainerCores);
+ new YarnAutoScalingManager.YarnAutoScalingRunnable(mockTaskDriver,
mockYarnService,
+ 1, 1, 10, 1.0, noopQueue, helixDataAccessor);
runnable.run();
// 3 containers requested and 2 workers in use
- ArgumentCaptor<YarnContainerRequestBundle> argument =
ArgumentCaptor.forClass(YarnContainerRequestBundle.class);
- Mockito.verify(mockYarnService, times(1)).
- requestTargetNumberOfContainers(argument.capture(),
- eq(ImmutableSet.of("GobblinYarnTaskRunner-1",
"GobblinYarnTaskRunner-2")));
- Assert.assertEquals(argument.getValue().getTotalContainers(), 3);
+ Mockito.verify(mockYarnService,
times(1)).requestTargetNumberOfContainers(3,
+ ImmutableSet.of("GobblinYarnTaskRunner-1", "GobblinYarnTaskRunner-2"));
}
/**
@@ -335,17 +313,103 @@ public class YarnAutoScalingManagerTest {
.thenReturn(ImmutableMap.of("GobblinYarnTaskRunner-1", new
HelixProperty("")));
YarnAutoScalingManager.YarnAutoScalingRunnable runnable =
- new YarnAutoScalingManager.YarnAutoScalingRunnable(mockTaskDriver,
mockYarnService, 2,
- 1.0, noopQueue, helixDataAccessor, defaultHelixTag,
defaultContainerMemory, defaultContainerCores);
+ new YarnAutoScalingManager.YarnAutoScalingRunnable(mockTaskDriver,
mockYarnService,
+ 2, 1, 10, 1.0, noopQueue, helixDataAccessor);
runnable.run();
// 1 container requested since 2 partitions and limit is 2 partitions per
container. One worker in use.
- ArgumentCaptor<YarnContainerRequestBundle> argument =
ArgumentCaptor.forClass(YarnContainerRequestBundle.class);
- Mockito.verify(mockYarnService, times(1)).
- requestTargetNumberOfContainers(argument.capture(),
- eq(ImmutableSet.of("GobblinYarnTaskRunner-1")));
- Assert.assertEquals(argument.getValue().getTotalContainers(), 1);
+ Mockito.verify(mockYarnService, times(1))
+ .requestTargetNumberOfContainers(1,
ImmutableSet.of("GobblinYarnTaskRunner-1"));
+ }
+
+
+ /**
+ * Test min containers
+ */
+ @Test
+ public void testMinContainers() throws IOException {
+ YarnService mockYarnService = mock(YarnService.class);
+ TaskDriver mockTaskDriver = mock(TaskDriver.class);
+ WorkflowConfig mockWorkflowConfig = mock(WorkflowConfig.class);
+ JobDag mockJobDag = mock(JobDag.class);
+
+ Mockito.when(mockJobDag.getAllNodes()).thenReturn(ImmutableSet.of("job1"));
+ Mockito.when(mockWorkflowConfig.getJobDag()).thenReturn(mockJobDag);
+
+ Mockito.when(mockTaskDriver.getWorkflows())
+ .thenReturn(ImmutableMap.of("workflow1", mockWorkflowConfig));
+
+ WorkflowContext mockWorkflowContext = mock(WorkflowContext.class);
+
Mockito.when(mockWorkflowContext.getWorkflowState()).thenReturn(TaskState.IN_PROGRESS);
+
+
Mockito.when(mockTaskDriver.getWorkflowContext("workflow1")).thenReturn(mockWorkflowContext);
+
+ JobContext mockJobContext = mock(JobContext.class);
+ Mockito.when(mockJobContext.getPartitionSet())
+ .thenReturn(ImmutableSet.of(Integer.valueOf(1), Integer.valueOf(2)));
+
Mockito.when(mockJobContext.getAssignedParticipant(2)).thenReturn("GobblinYarnTaskRunner-1");
+
+
Mockito.when(mockTaskDriver.getJobContext("job1")).thenReturn(mockJobContext);
+
+ HelixDataAccessor helixDataAccessor = mock(HelixDataAccessor.class);
+ Mockito.when(helixDataAccessor.keyBuilder()).thenReturn(new
PropertyKey.Builder("cluster"));
+ Mockito.when(helixDataAccessor.getChildValuesMap(Mockito.any()))
+ .thenReturn(ImmutableMap.of("GobblinYarnTaskRunner-1", new
HelixProperty("")));
+
+ YarnAutoScalingManager.YarnAutoScalingRunnable runnable =
+ new YarnAutoScalingManager.YarnAutoScalingRunnable(mockTaskDriver,
mockYarnService,
+ 1, 5, 10, 1.0, noopQueue, helixDataAccessor);
+
+ runnable.run();
+
+ // 5 containers requested due to min and one worker in use
+ Mockito.verify(mockYarnService, times(1))
+ .requestTargetNumberOfContainers(5,
ImmutableSet.of("GobblinYarnTaskRunner-1"));
+ }
+
+ /**
+ * Test max containers
+ */
+ @Test
+ public void testMaxContainers() throws IOException {
+ YarnService mockYarnService = mock(YarnService.class);
+ TaskDriver mockTaskDriver = mock(TaskDriver.class);
+ WorkflowConfig mockWorkflowConfig = mock(WorkflowConfig.class);
+ JobDag mockJobDag = mock(JobDag.class);
+
+ Mockito.when(mockJobDag.getAllNodes()).thenReturn(ImmutableSet.of("job1"));
+ Mockito.when(mockWorkflowConfig.getJobDag()).thenReturn(mockJobDag);
+
+ Mockito.when(mockTaskDriver.getWorkflows())
+ .thenReturn(ImmutableMap.of("workflow1", mockWorkflowConfig));
+
+ WorkflowContext mockWorkflowContext = mock(WorkflowContext.class);
+
Mockito.when(mockWorkflowContext.getWorkflowState()).thenReturn(TaskState.IN_PROGRESS);
+
+
Mockito.when(mockTaskDriver.getWorkflowContext("workflow1")).thenReturn(mockWorkflowContext);
+
+ JobContext mockJobContext = mock(JobContext.class);
+ Mockito.when(mockJobContext.getPartitionSet())
+ .thenReturn(ImmutableSet.of(Integer.valueOf(1), Integer.valueOf(2)));
+
Mockito.when(mockJobContext.getAssignedParticipant(2)).thenReturn("GobblinYarnTaskRunner-1");
+
+
Mockito.when(mockTaskDriver.getJobContext("job1")).thenReturn(mockJobContext);
+
+ HelixDataAccessor helixDataAccessor = mock(HelixDataAccessor.class);
+ Mockito.when(helixDataAccessor.keyBuilder()).thenReturn(new
PropertyKey.Builder("cluster"));
+ Mockito.when(helixDataAccessor.getChildValuesMap(Mockito.any()))
+ .thenReturn(ImmutableMap.of("GobblinYarnTaskRunner-1", new
HelixProperty("")));
+
+ YarnAutoScalingManager.YarnAutoScalingRunnable runnable =
+ new YarnAutoScalingManager.YarnAutoScalingRunnable(mockTaskDriver,
mockYarnService,
+ 1, 1, 1, 1.0, noopQueue, helixDataAccessor);
+
+ runnable.run();
+
+ // 1 container requested to max and one worker in use
+ Mockito.verify(mockYarnService, times(1))
+ .requestTargetNumberOfContainers(1,
ImmutableSet.of("GobblinYarnTaskRunner-1"));
}
@Test
@@ -379,49 +443,41 @@ public class YarnAutoScalingManagerTest {
.thenReturn(ImmutableMap.of("GobblinYarnTaskRunner-1", new
HelixProperty("")));
YarnAutoScalingManager.YarnAutoScalingRunnable runnable1 =
- new YarnAutoScalingManager.YarnAutoScalingRunnable(mockTaskDriver,
mockYarnService, 1,
- 1.2, noopQueue, helixDataAccessor, defaultHelixTag,
defaultContainerMemory, defaultContainerCores);
+ new YarnAutoScalingManager.YarnAutoScalingRunnable(mockTaskDriver,
mockYarnService,
+ 1, 1, 10, 1.2, noopQueue, helixDataAccessor);
runnable1.run();
// 3 containers requested to max and one worker in use
- // NumPartitions = 2, Partitions per container = 1 and overprovision = 1.2
- // so targetNumContainers = Ceil((2/1) * 1.2)) = 3.
- ArgumentCaptor<YarnContainerRequestBundle> argument =
ArgumentCaptor.forClass(YarnContainerRequestBundle.class);
- Mockito.verify(mockYarnService, times(1)).
- requestTargetNumberOfContainers(argument.capture(),
- eq(ImmutableSet.of("GobblinYarnTaskRunner-1")));
- Assert.assertEquals(argument.getValue().getTotalContainers(), 3);
+ // NumPartitions = 2, Partitions per container = 1 and overprovision =
1.2, Min containers = 1, Max = 10
+ // so targetNumContainers = Max (1, Min(10, Ceil((2/1) * 1.2))) = 3.
+ Mockito.verify(mockYarnService, times(1))
+ .requestTargetNumberOfContainers(3,
ImmutableSet.of("GobblinYarnTaskRunner-1"));
+
- Mockito.reset(mockYarnService);
YarnAutoScalingManager.YarnAutoScalingRunnable runnable2 =
- new YarnAutoScalingManager.YarnAutoScalingRunnable(mockTaskDriver,
mockYarnService, 1,
- 0.1, noopQueue, helixDataAccessor, defaultHelixTag,
defaultContainerMemory, defaultContainerCores);
+ new YarnAutoScalingManager.YarnAutoScalingRunnable(mockTaskDriver,
mockYarnService,
+ 1, 1, 10, 0.1, noopQueue, helixDataAccessor);
runnable2.run();
// 3 containers requested to max and one worker in use
- // NumPartitions = 2, Partitions per container = 1 and overprovision = 1.2
- // so targetNumContainers = Ceil((2/1) * 0.1)) = 1.
- Mockito.verify(mockYarnService, times(1)).
- requestTargetNumberOfContainers(argument.capture(),
- eq(ImmutableSet.of("GobblinYarnTaskRunner-1")));
- Assert.assertEquals(argument.getValue().getTotalContainers(), 1);
+ // NumPartitions = 2, Partitions per container = 1 and overprovision =
1.2, Min containers = 1, Max = 10
+ // so targetNumContainers = Max (1, Min(10, Ceil((2/1) * 0.1))) = 1.
+ Mockito.verify(mockYarnService, times(1))
+ .requestTargetNumberOfContainers(1,
ImmutableSet.of("GobblinYarnTaskRunner-1"));
- Mockito.reset(mockYarnService);
YarnAutoScalingManager.YarnAutoScalingRunnable runnable3 =
- new YarnAutoScalingManager.YarnAutoScalingRunnable(mockTaskDriver,
mockYarnService, 1,
- 6.0, noopQueue, helixDataAccessor, defaultHelixTag,
defaultContainerMemory, defaultContainerCores);
+ new YarnAutoScalingManager.YarnAutoScalingRunnable(mockTaskDriver,
mockYarnService,
+ 1, 1, 10, 6.0, noopQueue, helixDataAccessor);
runnable3.run();
// 3 containers requested to max and one worker in use
// NumPartitions = 2, Partitions per container = 1 and overprovision = 6.0,
- // so targetNumContainers = Ceil((2/1) * 6.0)) = 12.
- Mockito.verify(mockYarnService, times(1)).
- requestTargetNumberOfContainers(argument.capture(),
- eq(ImmutableSet.of("GobblinYarnTaskRunner-1")));
- Assert.assertEquals(argument.getValue().getTotalContainers(), 12);
+ // so targetNumContainers = Max (1, Min(10, Ceil((2/1) * 6.0))) = 10.
+ Mockito.verify(mockYarnService, times(1))
+ .requestTargetNumberOfContainers(1,
ImmutableSet.of("GobblinYarnTaskRunner-1"));
}
/**
@@ -458,43 +514,37 @@ public class YarnAutoScalingManagerTest {
.thenReturn(ImmutableMap.of("GobblinYarnTaskRunner-1", new
HelixProperty("")));
TestYarnAutoScalingRunnable runnable =
- new TestYarnAutoScalingRunnable(mockTaskDriver, mockYarnService, 1,
helixDataAccessor);
+ new TestYarnAutoScalingRunnable(mockTaskDriver, mockYarnService, 1, 1,
1, helixDataAccessor);
runnable.setRaiseException(true);
runnable.run();
- ArgumentCaptor<YarnContainerRequestBundle> argument =
ArgumentCaptor.forClass(YarnContainerRequestBundle.class);
- Mockito.verify(mockYarnService, times(0)).
- requestTargetNumberOfContainers(argument.capture(),
- eq(ImmutableSet.of("GobblinYarnTaskRunner-1")));
+ Mockito.verify(mockYarnService,
times(0)).requestTargetNumberOfContainers(1,
ImmutableSet.of("GobblinYarnTaskRunner-1"));
Mockito.reset(mockYarnService);
runnable.setRaiseException(false);
runnable.run();
- // 2 container requested
- Mockito.verify(mockYarnService, times(1)).
- requestTargetNumberOfContainers(argument.capture(),
- eq(ImmutableSet.of("GobblinYarnTaskRunner-1")));
- Assert.assertEquals(argument.getValue().getTotalContainers(), 2);
+ // 1 container requested to max and one worker in use
+ Mockito.verify(mockYarnService,
times(1)).requestTargetNumberOfContainers(1,
ImmutableSet.of("GobblinYarnTaskRunner-1"));
}
public void testMaxValueEvictingQueue() throws Exception {
- Resource resource = Resource.newInstance(16, 1);
YarnAutoScalingManager.SlidingWindowReservoir window = new
YarnAutoScalingManager.SlidingWindowReservoir(3, 10);
+
// Normal insertion with eviction of originally largest value
- window.add(GobblinYarnTestUtils.createYarnContainerRequest(3, resource));
- window.add(GobblinYarnTestUtils.createYarnContainerRequest(1, resource));
- window.add(GobblinYarnTestUtils.createYarnContainerRequest(2, resource));
+ window.add(3);
+ window.add(1);
+ window.add(2);
// Now it contains [3,1,2]
- Assert.assertEquals(window.getMax().getTotalContainers(), 3);
- window.add(GobblinYarnTestUtils.createYarnContainerRequest(1, resource));
+ Assert.assertEquals(window.getMax(), 3);
+ window.add(1);
// Now it contains [1,2,1]
- Assert.assertEquals(window.getMax().getTotalContainers(), 2);
- window.add(GobblinYarnTestUtils.createYarnContainerRequest(5, resource));
- Assert.assertEquals(window.getMax().getTotalContainers(), 5);
+ Assert.assertEquals(window.getMax(), 2);
+ window.add(5);
+ Assert.assertEquals(window.getMax(), 5);
// Now it contains [2,1,5]
- window.add(GobblinYarnTestUtils.createYarnContainerRequest(11, resource));
+ window.add(11);
// Still [2,1,5] as 11 > 10 thereby being rejected.
- Assert.assertEquals(window.getMax().getTotalContainers(), 5);
+ Assert.assertEquals(window.getMax(), 5);
}
/**
@@ -507,6 +557,7 @@ public class YarnAutoScalingManagerTest {
TaskDriver mockTaskDriver = mock(TaskDriver.class);
WorkflowConfig mockWorkflowConfig = mock(WorkflowConfig.class);
JobDag mockJobDag = mock(JobDag.class);
+
Mockito.when(mockJobDag.getAllNodes()).thenReturn(ImmutableSet.of("job1"));
Mockito.when(mockWorkflowConfig.getJobDag()).thenReturn(mockJobDag);
@@ -534,17 +585,14 @@ public class YarnAutoScalingManagerTest {
"GobblinYarnTaskRunner-2", new HelixProperty("")));
TestYarnAutoScalingRunnable runnable = new
TestYarnAutoScalingRunnable(mockTaskDriver, mockYarnService,
- 1, helixDataAccessor);
+ 1, 1, 10, helixDataAccessor);
runnable.run();
// 2 containers requested and one worker in use, while the evaluation will
hold for true if not set externally,
// still tell YarnService there are two instances being used.
- ArgumentCaptor<YarnContainerRequestBundle> argument =
ArgumentCaptor.forClass(YarnContainerRequestBundle.class);
Mockito.verify(mockYarnService, times(1)).
- requestTargetNumberOfContainers(argument.capture(),
- eq(ImmutableSet.of("GobblinYarnTaskRunner-1",
"GobblinYarnTaskRunner-2")));
- Assert.assertEquals(argument.getValue().getTotalContainers(), 2);
+ requestTargetNumberOfContainers(2,
ImmutableSet.of("GobblinYarnTaskRunner-1", "GobblinYarnTaskRunner-2"));
// Set failEvaluation which simulates the "beyond tolerance" case.
Mockito.reset(mockYarnService);
@@ -552,98 +600,7 @@ public class YarnAutoScalingManagerTest {
runnable.run();
Mockito.verify(mockYarnService, times(1)).
- requestTargetNumberOfContainers(argument.capture(),
- eq(ImmutableSet.of("GobblinYarnTaskRunner-2")));
- Assert.assertEquals(argument.getValue().getTotalContainers(), 2);
- }
-
- @Test
- public void testFlowsWithHelixTags() {
- YarnService mockYarnService = mock(YarnService.class);
- TaskDriver mockTaskDriver = mock(TaskDriver.class);
-
- WorkflowConfig mockWorkflowConfig1 = mock(WorkflowConfig.class);
- JobDag mockJobDag1 = mock(JobDag.class);
-
- Mockito.when(mockJobDag1.getAllNodes()).thenReturn(ImmutableSet.of("job1",
"job2"));
- Mockito.when(mockWorkflowConfig1.getJobDag()).thenReturn(mockJobDag1);
-
- WorkflowContext mockWorkflowContext1 = mock(WorkflowContext.class);
-
Mockito.when(mockWorkflowContext1.getWorkflowState()).thenReturn(TaskState.IN_PROGRESS);
-
-
Mockito.when(mockTaskDriver.getWorkflowContext("workflow1")).thenReturn(mockWorkflowContext1);
-
- JobContext mockJobContext1 = mock(JobContext.class);
- Mockito.when(mockJobContext1.getPartitionSet())
- .thenReturn(ImmutableSet.of(Integer.valueOf(1), Integer.valueOf(2)));
-
Mockito.when(mockJobContext1.getAssignedParticipant(2)).thenReturn("GobblinYarnTaskRunner-1");
-
Mockito.when(mockTaskDriver.getJobContext("job1")).thenReturn(mockJobContext1);
-
- JobContext mockJobContext2 = mock(JobContext.class);
- Mockito.when(mockJobContext2.getPartitionSet())
- .thenReturn(ImmutableSet.of(Integer.valueOf(3)));
-
Mockito.when(mockJobContext2.getAssignedParticipant(3)).thenReturn("GobblinYarnTaskRunner-2");
-
Mockito.when(mockTaskDriver.getJobContext("job2")).thenReturn(mockJobContext2);
-
- WorkflowConfig mockWorkflowConfig2 = mock(WorkflowConfig.class);
- JobDag mockJobDag2 = mock(JobDag.class);
-
-
Mockito.when(mockJobDag2.getAllNodes()).thenReturn(ImmutableSet.of("job3"));
- Mockito.when(mockWorkflowConfig2.getJobDag()).thenReturn(mockJobDag2);
-
- WorkflowContext mockWorkflowContext2 = mock(WorkflowContext.class);
-
Mockito.when(mockWorkflowContext2.getWorkflowState()).thenReturn(TaskState.IN_PROGRESS);
-
-
Mockito.when(mockTaskDriver.getWorkflowContext("workflow2")).thenReturn(mockWorkflowContext2);
-
- JobContext mockJobContext3 = mock(JobContext.class);
- Mockito.when(mockJobContext3.getPartitionSet())
- .thenReturn(ImmutableSet.of(Integer.valueOf(4), Integer.valueOf(5)));
-
Mockito.when(mockJobContext3.getAssignedParticipant(4)).thenReturn("GobblinYarnTaskRunner-3");
-
Mockito.when(mockTaskDriver.getJobContext("job3")).thenReturn(mockJobContext3);
- JobConfig mockJobConfig3 = mock(JobConfig.class);
- String helixTag = "test-Tag1";
- Map<String, String> resourceMap = new HashMap<>();
-
resourceMap.put(GobblinClusterConfigurationKeys.HELIX_JOB_CONTAINER_MEMORY_MBS,
"512");
- resourceMap.put(GobblinClusterConfigurationKeys.HELIX_JOB_CONTAINER_CORES,
"8");
- Mockito.when(mockJobConfig3.getInstanceGroupTag()).thenReturn(helixTag);
-
Mockito.when(mockJobConfig3.getJobCommandConfigMap()).thenReturn(resourceMap);
-
Mockito.when(mockTaskDriver.getJobContext("job3")).thenReturn(mockJobContext3);
-
Mockito.when(mockTaskDriver.getJobConfig("job3")).thenReturn(mockJobConfig3);
- Mockito.when(mockTaskDriver.getWorkflows())
- .thenReturn(ImmutableMap.of("workflow1", mockWorkflowConfig1,
"workflow2", mockWorkflowConfig2));
-
- HelixDataAccessor helixDataAccessor = mock(HelixDataAccessor.class);
- Mockito.when(helixDataAccessor.keyBuilder()).thenReturn(new
PropertyKey.Builder("cluster"));
- Mockito.when(helixDataAccessor.getChildValuesMap(Mockito.any()))
- .thenReturn(ImmutableMap.of("GobblinYarnTaskRunner-1", new
HelixProperty(""),
- "GobblinYarnTaskRunner-2", new HelixProperty(""),
- "GobblinYarnTaskRunner-3", new HelixProperty("")));
-
- YarnAutoScalingManager.YarnAutoScalingRunnable runnable =
- new YarnAutoScalingManager.YarnAutoScalingRunnable(mockTaskDriver,
mockYarnService, 1,
- 1.0, noopQueue, helixDataAccessor, defaultHelixTag,
defaultContainerMemory, defaultContainerCores);
-
- runnable.run();
-
- // 5 containers requested and 3 workers in use
- ArgumentCaptor<YarnContainerRequestBundle> argument =
ArgumentCaptor.forClass(YarnContainerRequestBundle.class);
- Mockito.verify(mockYarnService, times(1)).
- requestTargetNumberOfContainers(argument.capture(),
- eq(ImmutableSet.of("GobblinYarnTaskRunner-1",
"GobblinYarnTaskRunner-2", "GobblinYarnTaskRunner-3")));
- Assert.assertEquals(argument.getValue().getTotalContainers(), 5);
- Map<String, Set<String>> resourceHelixTagMap =
argument.getValue().getResourceHelixTagMap();
- Map<String, Resource> helixTagResourceMap =
argument.getValue().getHelixTagResourceMap();
- Map<String, Integer> helixTagContainerCountMap =
argument.getValue().getHelixTagContainerCountMap();
-
- // Verify that 3 containers requested with default tag and resource
setting,
- // while 2 with specific helix tag and resource requirement
- Assert.assertEquals(resourceHelixTagMap.size(), 2);
- Assert.assertEquals(helixTagResourceMap.get(helixTag),
Resource.newInstance(512, 8));
- Assert.assertEquals(helixTagResourceMap.get(defaultHelixTag),
Resource.newInstance(defaultContainerMemory, defaultContainerCores));
- Assert.assertEquals((int) helixTagContainerCountMap.get(helixTag), 2);
- Assert.assertEquals((int) helixTagContainerCountMap.get(defaultHelixTag),
3);
-
+ requestTargetNumberOfContainers(2,
ImmutableSet.of("GobblinYarnTaskRunner-2"));
}
private static class TestYarnAutoScalingRunnable extends
YarnAutoScalingManager.YarnAutoScalingRunnable {
@@ -651,9 +608,8 @@ public class YarnAutoScalingManagerTest {
boolean alwaysUnused = false;
public TestYarnAutoScalingRunnable(TaskDriver taskDriver, YarnService
yarnService, int partitionsPerContainer,
- HelixDataAccessor helixDataAccessor) {
- super(taskDriver, yarnService, partitionsPerContainer, 1.0,
- noopQueue, helixDataAccessor, defaultHelixTag,
defaultContainerMemory, defaultContainerCores);
+ int minContainers, int maxContainers, HelixDataAccessor
helixDataAccessor) {
+ super(taskDriver, yarnService, partitionsPerContainer, minContainers,
maxContainers, 1.0, noopQueue, helixDataAccessor);
}
@Override
diff --git
a/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnServiceTest.java
b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnServiceTest.java
index ac8edd16b..215e1bd03 100644
--- a/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnServiceTest.java
+++ b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnServiceTest.java
@@ -27,7 +27,6 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeoutException;
import org.apache.hadoop.conf.Configuration;
@@ -208,43 +207,42 @@ public class YarnServiceTest {
*/
@Test(groups = {"gobblin.yarn", "disabledOnCI"})
public void testScaleUp() {
- Resource resource = Resource.newInstance(64, 1);
- this.yarnService.requestTargetNumberOfContainers(
- GobblinYarnTestUtils.createYarnContainerRequest(10, resource),
Collections.EMPTY_SET);
+ this.yarnService.requestTargetNumberOfContainers(10,
Collections.EMPTY_SET);
-
Assert.assertFalse(this.yarnService.getMatchingRequestsList(resource).isEmpty());
+ Assert.assertFalse(this.yarnService.getMatchingRequestsList(64,
1).isEmpty());
+ Assert.assertEquals(this.yarnService.getNumRequestedContainers(), 10);
Assert.assertTrue(this.yarnService.waitForContainerCount(10, 60000));
- Assert.assertEquals(this.yarnService.getContainerMap().size(), 10);
+
// container request list that had entries earlier should now be empty
-
Assert.assertEquals(this.yarnService.getMatchingRequestsList(resource).size(),
0);
+ Assert.assertEquals(this.yarnService.getMatchingRequestsList(64,
1).size(), 0);
}
@Test(groups = {"gobblin.yarn", "disabledOnCI"}, dependsOnMethods =
"testScaleUp")
public void testScaleDownWithInUseInstances() {
Set<String> inUseInstances = new HashSet<>();
+
for (int i = 1; i <= 8; i++) {
inUseInstances.add("GobblinYarnTaskRunner_" + i);
}
- Resource resource = Resource.newInstance(64, 1);
- this.yarnService.requestTargetNumberOfContainers(
- GobblinYarnTestUtils.createYarnContainerRequest(6, resource),
inUseInstances);
+
+ this.yarnService.requestTargetNumberOfContainers(6, inUseInstances);
+
+ Assert.assertEquals(this.yarnService.getNumRequestedContainers(), 6);
// will only be able to shrink to 8
Assert.assertTrue(this.yarnService.waitForContainerCount(8, 60000));
// will not be able to shrink to 6 due to 8 in-use instances
Assert.assertFalse(this.yarnService.waitForContainerCount(6, 10000));
- Assert.assertEquals(this.yarnService.getContainerMap().size(), 8);
+
}
@Test(groups = {"gobblin.yarn", "disabledOnCI"}, dependsOnMethods =
"testScaleDownWithInUseInstances")
public void testScaleDown() throws Exception {
- Resource resource = Resource.newInstance(64, 1);
- this.yarnService.requestTargetNumberOfContainers(
- GobblinYarnTestUtils.createYarnContainerRequest(4, resource),
Collections.EMPTY_SET);
+ this.yarnService.requestTargetNumberOfContainers(4, Collections.EMPTY_SET);
+ Assert.assertEquals(this.yarnService.getNumRequestedContainers(), 4);
Assert.assertTrue(this.yarnService.waitForContainerCount(4, 60000));
- Assert.assertEquals(this.yarnService.getContainerMap().size(), 4);
}
// Keep this test last since it interferes with the container counts in the
prior tests.
@@ -281,25 +279,13 @@ public class YarnServiceTest {
0), 0);
Resource resource = Resource.newInstance(2048, 1);
Container container = Container.newInstance(containerId, null, null,
resource, null, null);
- YarnService.ContainerInfo
- containerInfo = new YarnService.ContainerInfo(container,
"helixInstance1", "helixTag");
- String command = yarnService.buildContainerCommand(containerInfo);
+ String command = yarnService.buildContainerCommand(container,
"helixInstance1");
// 1628 is from 2048 * 0.8 - 10
Assert.assertTrue(command.contains("-Xmx1628"));
}
- /**
- * Test if requested resource exceed the resource limit, yarnService should
fail.
- */
- @Test(groups = {"gobblin.yarn", "disabledOnCI"}, expectedExceptions =
IllegalArgumentException.class)
- public void testExceedResourceLimit() {
- Resource resource = Resource.newInstance(204800, 10240);
- this.yarnService.requestTargetNumberOfContainers(
- GobblinYarnTestUtils.createYarnContainerRequest(10, resource),
Collections.EMPTY_SET);
- }
-
static class TestYarnService extends YarnService {
public TestYarnService(Config config, String applicationName, String
applicationId, YarnConfiguration yarnConfiguration,
FileSystem fs, EventBus eventBus) throws Exception {
@@ -324,7 +310,7 @@ public class YarnServiceTest {
return helixManager;
}
- protected ContainerLaunchContext newContainerLaunchContext(ContainerInfo
containerInfo)
+ protected ContainerLaunchContext newContainerLaunchContext(Container
container, String helixInstanceName)
throws IOException {
return BuilderUtils.newContainerLaunchContext(Collections.emptyMap(),
Collections.emptyMap(),
Arrays.asList("sleep", "60000"), Collections.emptyMap(), null,
Collections.emptyMap());
@@ -333,8 +319,10 @@ public class YarnServiceTest {
/**
* Get the list of matching container requests for the specified resource
memory and cores.
*/
- public List<? extends Collection<AMRMClient.ContainerRequest>>
getMatchingRequestsList(Resource resource) {
+ public List<? extends Collection<AMRMClient.ContainerRequest>>
getMatchingRequestsList(int memory, int cores) {
+ Resource resource = Resource.newInstance(memory, cores);
Priority priority = Priority.newInstance(0);
+
return getAmrmClientAsync().getMatchingRequests(priority,
ResourceRequest.ANY, resource);
}
@@ -358,12 +346,13 @@ public class YarnServiceTest {
Thread.currentThread().interrupt();
break;
}
- ConcurrentMap<ContainerId, ContainerInfo> containerMap =
getContainerMap();
+
if (expectedCount == getContainerMap().size()) {
success = true;
break;
}
}
+
return success;
}
}
diff --git
a/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnServiceTestWithExpiration.java
b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnServiceTestWithExpiration.java
index f2f2a1cfb..1934ecebb 100644
---
a/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnServiceTestWithExpiration.java
+++
b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnServiceTestWithExpiration.java
@@ -33,6 +33,7 @@ import
org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
@@ -197,11 +198,10 @@ public class YarnServiceTestWithExpiration {
@Test(groups = {"gobblin.yarn", "disabledOnCI"})
public void testStartError() throws Exception{
- Resource resource = Resource.newInstance(16, 1);
- this.expiredYarnService.requestTargetNumberOfContainers(
- GobblinYarnTestUtils.createYarnContainerRequest(10, resource),
Collections.EMPTY_SET);
+ this.expiredYarnService.requestTargetNumberOfContainers(10,
Collections.EMPTY_SET);
-
Assert.assertFalse(this.expiredYarnService.getMatchingRequestsList(resource).isEmpty());
+ Assert.assertFalse(this.expiredYarnService.getMatchingRequestsList(64,
1).isEmpty());
+ Assert.assertEquals(this.expiredYarnService.getNumRequestedContainers(),
10);
AssertWithBackoff.create().logger(LOG).timeoutMs(60000).maxSleepMs(2000).backoffFactor(1.5)
.assertTrue(new Predicate<Void>() {
@@ -234,7 +234,7 @@ public class YarnServiceTestWithExpiration {
completedContainers.add(containerStatus);
}
- protected ContainerLaunchContext newContainerLaunchContext(ContainerInfo
containerInfo)
+ protected ContainerLaunchContext newContainerLaunchContext(Container
container, String helixInstanceName)
throws IOException {
try {
Thread.sleep(1000);
diff --git a/gobblin-yarn/src/test/resources/YarnServiceTest.conf
b/gobblin-yarn/src/test/resources/YarnServiceTest.conf
index 73ecf85cd..d2dc49c46 100644
--- a/gobblin-yarn/src/test/resources/YarnServiceTest.conf
+++ b/gobblin-yarn/src/test/resources/YarnServiceTest.conf
@@ -17,7 +17,6 @@
# Yarn/Helix configuration properties
gobblin.cluster.helix.cluster.name=YarnServiceTest
-gobblin.cluster.helixInstanceTags=GobblinKafkaStreaming
gobblin.yarn.app.name=YarnServiceTest
gobblin.yarn.work.dir=YarnServiceTest
@@ -31,7 +30,7 @@ gobblin.yarn.app.master.cores=1
gobblin.yarn.app.report.interval.minutes=1
gobblin.yarn.max.get.app.report.failures=4
gobblin.yarn.email.notification.on.shutdown=false
-gobblin.yarn.initial.containers=0
+gobblin.yarn.initial.containers=1
gobblin.yarn.container.memory.mbs=64
gobblin.yarn.container.cores=1
gobblin.yarn.container.affinity.enabled=true