This is an automated email from the ASF dual-hosted git repository.
suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 2463eda69 Make Yarn container and helix instance allocation group by
tag (#3519)
2463eda69 is described below
commit 2463eda692b97d6947b21154abd37997a9ac3a73
Author: Hanghang Nate Liu <[email protected]>
AuthorDate: Thu Jun 9 12:27:42 2022 -0700
Make Yarn container and helix instance allocation group by tag (#3519)
---
.../cluster/GobblinClusterConfigurationKeys.java | 10 +-
.../gobblin/cluster/GobblinHelixJobLauncher.java | 16 +-
.../apache/gobblin/cluster/GobblinTaskRunner.java | 27 +-
.../gobblin/yarn/YarnAutoScalingManager.java | 122 ++++----
.../gobblin/yarn/YarnContainerRequestBundle.java | 76 +++++
.../org/apache/gobblin/yarn/YarnHelixUtils.java | 27 +-
.../java/org/apache/gobblin/yarn/YarnService.java | 208 ++++++++-----
.../gobblin/yarn/event/NewContainerRequest.java | 12 +-
.../apache/gobblin/yarn/GobblinYarnTestUtils.java | 9 +-
.../gobblin/yarn/YarnAutoScalingManagerTest.java | 340 ++++++++++++---------
.../org/apache/gobblin/yarn/YarnServiceTest.java | 59 ++--
.../yarn/YarnServiceTestWithExpiration.java | 12 +-
.../src/test/resources/YarnServiceTest.conf | 3 +-
13 files changed, 588 insertions(+), 333 deletions(-)
diff --git
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
index 8c9513f50..4ae21c1d3 100644
---
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
+++
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
@@ -84,6 +84,7 @@ public class GobblinClusterConfigurationKeys {
public static final String HELIX_JOB_TAG_KEY = GOBBLIN_CLUSTER_PREFIX +
"helixJobTag";
public static final String HELIX_PLANNING_JOB_TAG_KEY =
GOBBLIN_CLUSTER_PREFIX + "helixPlanningJobTag";
public static final String HELIX_INSTANCE_TAGS_KEY = GOBBLIN_CLUSTER_PREFIX
+ "helixInstanceTags";
+ public static final String HELIX_DEFAULT_TAG = "GobblinHelixDefaultTag";
// Helix job quota
public static final String HELIX_JOB_TYPE_KEY = GOBBLIN_CLUSTER_PREFIX +
"helixJobType";
@@ -184,6 +185,13 @@ public class GobblinClusterConfigurationKeys {
public static final String CONTAINER_EXIT_ON_HEALTH_CHECK_FAILURE_ENABLED =
GOBBLIN_CLUSTER_PREFIX + "container.exitOnHealthCheckFailure";
public static final boolean
DEFAULT_CONTAINER_EXIT_ON_HEALTH_CHECK_FAILURE_ENABLED = false;
+ // Config to specify the resource requirement for each Gobblin job run, so
that Helix tasks within this job will
+ // be assigned to containers with the desired resources. This config needs to
be used together with the Helix job tag, so that the Helix
+ // cluster knows how to distribute tasks to the correct containers.
+ public static final String HELIX_JOB_CONTAINER_MEMORY_MBS =
GOBBLIN_CLUSTER_PREFIX + "job.container.memory.mbs";
+ public static final String HELIX_JOB_CONTAINER_CORES =
GOBBLIN_CLUSTER_PREFIX + "job.container.cores";
+
+
//Config to enable/disable reuse of existing Helix Cluster
public static final String HELIX_CLUSTER_OVERWRITE_KEY =
GOBBLIN_CLUSTER_PREFIX + "helix.overwrite";
@@ -205,4 +213,4 @@ public class GobblinClusterConfigurationKeys {
public static final String CONTAINER_ID_KEY = GOBBLIN_HELIX_PREFIX +
"containerId";
public static final String GOBBLIN_CLUSTER_SYSTEM_PROPERTY_PREFIX =
GOBBLIN_CLUSTER_PREFIX + "sysProps";
-}
+}
\ No newline at end of file
diff --git
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
index df15ee385..933ec64ce 100644
---
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
+++
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
@@ -413,6 +413,20 @@ public class GobblinHelixJobLauncher extends
AbstractJobLauncher {
gobblinJobState.getPropAsLong(GobblinClusterConfigurationKeys.HELIX_WORKFLOW_EXPIRY_TIME_SECONDS,
GobblinClusterConfigurationKeys.DEFAULT_HELIX_WORKFLOW_EXPIRY_TIME_SECONDS));
+ Map<String, String> jobConfigMap = new HashMap<>();
+ if
(this.jobConfig.hasPath(GobblinClusterConfigurationKeys.HELIX_JOB_CONTAINER_MEMORY_MBS))
{
+
jobConfigMap.put(GobblinClusterConfigurationKeys.HELIX_JOB_CONTAINER_MEMORY_MBS,
+
jobConfig.getString(GobblinClusterConfigurationKeys.HELIX_JOB_CONTAINER_MEMORY_MBS));
+ log.info("Job {} has specific memory requirement:{}, add this config to
command config map",
+ this.jobContext.getJobId(),
jobConfig.getString(GobblinClusterConfigurationKeys.HELIX_JOB_CONTAINER_MEMORY_MBS));
+ }
+ if
(this.jobConfig.hasPath(GobblinClusterConfigurationKeys.HELIX_JOB_CONTAINER_CORES))
{
+
jobConfigMap.put(GobblinClusterConfigurationKeys.HELIX_JOB_CONTAINER_CORES,
+
jobConfig.getString(GobblinClusterConfigurationKeys.HELIX_JOB_CONTAINER_CORES));
+ log.info("Job {} has specific Vcore requirement:{}, add this config to
command config map",
+ this.jobContext.getJobId(),
jobConfig.getString(GobblinClusterConfigurationKeys.HELIX_JOB_CONTAINER_CORES));
+ }
+ jobConfigBuilder.setJobCommandConfigMap(jobConfigMap);
return jobConfigBuilder;
}
@@ -572,4 +586,4 @@ public class GobblinHelixJobLauncher extends
AbstractJobLauncher {
this.fs.delete(jobStateFilePath, false);
}
}
-}
+}
\ No newline at end of file
diff --git
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
index cad583908..610682aa8 100644
---
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
+++
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
@@ -24,9 +24,11 @@ import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
+import java.util.Set;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
@@ -540,16 +542,25 @@ public class GobblinTaskRunner implements
StandardMetricsBridge {
* the job with EXAMPLE_INSTANCE_TAG will remain in the ZK until an instance
with EXAMPLE_INSTANCE_TAG was found.
*/
private void addInstanceTags() {
- List<String> tags = ConfigUtils.getStringList(this.clusterConfig,
GobblinClusterConfigurationKeys.HELIX_INSTANCE_TAGS_KEY);
HelixManager receiverManager = getReceiverManager();
if (receiverManager.isConnected()) {
- if (!tags.isEmpty()) {
- logger.info("Adding tags binding " + tags);
- tags.forEach(tag -> receiverManager.getClusterManagmentTool()
- .addInstanceTag(this.clusterName, this.helixInstanceName, tag));
- logger.info("Actual tags binding " +
receiverManager.getClusterManagmentTool()
- .getInstanceConfig(this.clusterName,
this.helixInstanceName).getTags());
+ // The Helix instance associated with this container should only carry the configured tags.
+ List<String> existedTags = receiverManager.getClusterManagmentTool()
+ .getInstanceConfig(this.clusterName, this.helixInstanceName).getTags();
+ Set<String> desiredTags = new HashSet<>(
+ ConfigUtils.getStringList(this.clusterConfig, GobblinClusterConfigurationKeys.HELIX_INSTANCE_TAGS_KEY));
+ if (!desiredTags.isEmpty()) {
+ // Remove tag assignments left on the current Helix instance by a previous run.
+ for (String tag : existedTags) {
+ // Braces matter here: the removal and its log line must be guarded together, otherwise
+ // every existing tag is reported as removed even when it is one of the desired tags.
+ if (!desiredTags.contains(tag)) {
+ receiverManager.getClusterManagmentTool().removeInstanceTag(this.clusterName, this.helixInstanceName, tag);
+ logger.info("Removed unrelated helix tag {} for instance {}", tag, this.helixInstanceName);
+ }
+ }
+ desiredTags.forEach(desiredTag -> receiverManager.getClusterManagmentTool()
+ .addInstanceTag(this.clusterName, this.helixInstanceName, desiredTag));
}
+ // Log the tags actually bound to the instance, whether or not any tag was configured.
+ logger.info("Actual tags binding " + receiverManager.getClusterManagmentTool()
+ .getInstanceConfig(this.clusterName, this.helixInstanceName).getTags());
}
}
@@ -819,4 +830,4 @@ public class GobblinTaskRunner implements
StandardMetricsBridge {
System.exit(1);
}
}
-}
+}
\ No newline at end of file
diff --git
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAutoScalingManager.java
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAutoScalingManager.java
index 458527abc..e7ab082e2 100644
---
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAutoScalingManager.java
+++
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAutoScalingManager.java
@@ -17,11 +17,13 @@
package org.apache.gobblin.yarn;
+import com.google.common.base.Strings;
import java.util.ArrayDeque;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
+import java.util.Objects;
import java.util.PriorityQueue;
import java.util.Set;
import java.util.concurrent.Executors;
@@ -29,9 +31,12 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
+import org.apache.gobblin.cluster.GobblinClusterConfigurationKeys;
+import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixManager;
import org.apache.helix.PropertyKey;
+import org.apache.helix.task.JobConfig;
import org.apache.helix.task.JobContext;
import org.apache.helix.task.JobDag;
import org.apache.helix.task.TaskDriver;
@@ -67,14 +72,13 @@ public class YarnAutoScalingManager extends
AbstractIdleService {
// Only one container will be requested for each N partitions of work
private final String AUTO_SCALING_PARTITIONS_PER_CONTAINER =
AUTO_SCALING_PREFIX + "partitionsPerContainer";
private final int DEFAULT_AUTO_SCALING_PARTITIONS_PER_CONTAINER = 1;
- private final String AUTO_SCALING_MIN_CONTAINERS = AUTO_SCALING_PREFIX +
"minContainers";
- private final int DEFAULT_AUTO_SCALING_MIN_CONTAINERS = 1;
- private final String AUTO_SCALING_MAX_CONTAINERS = AUTO_SCALING_PREFIX +
"maxContainers";
private final String AUTO_SCALING_CONTAINER_OVERPROVISION_FACTOR =
AUTO_SCALING_PREFIX + "overProvisionFactor";
private final double DEFAULT_AUTO_SCALING_CONTAINER_OVERPROVISION_FACTOR =
1.0;
+ // The cluster level default tags for Helix instances
+ private final String defaultHelixInstanceTags;
+ private final int defaultContainerMemoryMbs;
+ private final int defaultContainerCores;
- // A rough value of how much containers should be an intolerable number.
- private final int DEFAULT_AUTO_SCALING_MAX_CONTAINERS = Integer.MAX_VALUE;
private final String AUTO_SCALING_INITIAL_DELAY = AUTO_SCALING_PREFIX +
"initialDelay";
private final int DEFAULT_AUTO_SCALING_INITIAL_DELAY_SECS = 60;
@@ -87,8 +91,6 @@ public class YarnAutoScalingManager extends
AbstractIdleService {
private final ScheduledExecutorService autoScalingExecutor;
private final YarnService yarnService;
private final int partitionsPerContainer;
- private final int minContainers;
- private final int maxContainers;
private final double overProvisionFactor;
private final SlidingWindowReservoir slidingFixedSizeWindow;
private static int maxIdleTimeInMinutesBeforeScalingDown =
DEFAULT_MAX_IDLE_TIME_BEFORE_SCALING_DOWN_MINUTES;
@@ -103,31 +105,20 @@ public class YarnAutoScalingManager extends
AbstractIdleService {
Preconditions.checkArgument(this.partitionsPerContainer > 0,
AUTO_SCALING_PARTITIONS_PER_CONTAINER + " needs to be greater than 0");
- this.minContainers = ConfigUtils.getInt(this.config,
AUTO_SCALING_MIN_CONTAINERS,
- DEFAULT_AUTO_SCALING_MIN_CONTAINERS);
-
- Preconditions.checkArgument(this.minContainers > 0,
- DEFAULT_AUTO_SCALING_MIN_CONTAINERS + " needs to be greater than 0");
-
- this.maxContainers = ConfigUtils.getInt(this.config,
AUTO_SCALING_MAX_CONTAINERS,
- DEFAULT_AUTO_SCALING_MAX_CONTAINERS);
-
this.overProvisionFactor = ConfigUtils.getDouble(this.config,
AUTO_SCALING_CONTAINER_OVERPROVISION_FACTOR,
DEFAULT_AUTO_SCALING_CONTAINER_OVERPROVISION_FACTOR);
- Preconditions.checkArgument(this.maxContainers > 0,
- DEFAULT_AUTO_SCALING_MAX_CONTAINERS + " needs to be greater than 0");
-
- Preconditions.checkArgument(this.maxContainers >= this.minContainers,
- DEFAULT_AUTO_SCALING_MAX_CONTAINERS + " needs to be greater than or
equal to "
- + DEFAULT_AUTO_SCALING_MIN_CONTAINERS);
-
this.slidingFixedSizeWindow = config.hasPath(AUTO_SCALING_WINDOW_SIZE)
- ? new SlidingWindowReservoir(maxContainers,
config.getInt(AUTO_SCALING_WINDOW_SIZE))
- : new SlidingWindowReservoir(maxContainers);
+ ? new SlidingWindowReservoir(config.getInt(AUTO_SCALING_WINDOW_SIZE),
Integer.MAX_VALUE)
+ : new SlidingWindowReservoir(Integer.MAX_VALUE);
this.autoScalingExecutor = Executors.newSingleThreadScheduledExecutor(
ExecutorsUtils.newThreadFactory(Optional.of(log),
Optional.of("AutoScalingExecutor")));
+
+ this.defaultHelixInstanceTags = ConfigUtils.getString(config,
+ GobblinClusterConfigurationKeys.HELIX_INSTANCE_TAGS_KEY,
GobblinClusterConfigurationKeys.HELIX_DEFAULT_TAG);
+ this.defaultContainerMemoryMbs =
config.getInt(GobblinYarnConfigurationKeys.CONTAINER_MEMORY_MBS_KEY);
+ this.defaultContainerCores =
config.getInt(GobblinYarnConfigurationKeys.CONTAINER_CORES_KEY);
}
@Override
@@ -140,9 +131,10 @@ public class YarnAutoScalingManager extends
AbstractIdleService {
log.info("Scheduling the auto scaling task with an interval of {}
seconds", scheduleInterval);
this.autoScalingExecutor.scheduleAtFixedRate(new
YarnAutoScalingRunnable(new TaskDriver(this.helixManager),
- this.yarnService, this.partitionsPerContainer, this.minContainers,
this.maxContainers, this.overProvisionFactor,
- this.slidingFixedSizeWindow,
this.helixManager.getHelixDataAccessor()), initialDelay, scheduleInterval,
- TimeUnit.SECONDS);
+ this.yarnService, this.partitionsPerContainer,
this.overProvisionFactor,
+ this.slidingFixedSizeWindow,
this.helixManager.getHelixDataAccessor(), this.defaultHelixInstanceTags,
+ this.defaultContainerMemoryMbs, this.defaultContainerCores),
+ initialDelay, scheduleInterval, TimeUnit.SECONDS);
}
@Override
@@ -162,11 +154,13 @@ public class YarnAutoScalingManager extends
AbstractIdleService {
private final TaskDriver taskDriver;
private final YarnService yarnService;
private final int partitionsPerContainer;
- private final int minContainers;
- private final int maxContainers;
private final double overProvisionFactor;
private final SlidingWindowReservoir slidingWindowReservoir;
private final HelixDataAccessor helixDataAccessor;
+ private final String defaultHelixInstanceTags;
+ private final int defaultContainerMemoryMbs;
+ private final int defaultContainerCores;
+
/**
* A static map that keep track of an idle instance and its latest
beginning idle time.
* If an instance is no longer idle when inspected, it will be dropped
from this map.
@@ -202,8 +196,7 @@ public class YarnAutoScalingManager extends
AbstractIdleService {
@VisibleForTesting
void runInternal() {
Set<String> inUseInstances = new HashSet<>();
-
- int numPartitions = 0;
+ YarnContainerRequestBundle yarnContainerRequestBundle = new
YarnContainerRequestBundle();
for (Map.Entry<String, WorkflowConfig> workFlowEntry :
taskDriver.getWorkflows().entrySet()) {
WorkflowContext workflowContext =
taskDriver.getWorkflowContext(workFlowEntry.getKey());
@@ -217,24 +210,42 @@ public class YarnAutoScalingManager extends
AbstractIdleService {
WorkflowConfig workflowConfig = workFlowEntry.getValue();
JobDag jobDag = workflowConfig.getJobDag();
-
Set<String> jobs = jobDag.getAllNodes();
// sum up the number of partitions
for (String jobName : jobs) {
JobContext jobContext = taskDriver.getJobContext(jobName);
-
+ JobConfig jobConfig = taskDriver.getJobConfig(jobName);
+ Resource resource =
Resource.newInstance(this.defaultContainerMemoryMbs,
this.defaultContainerCores);
+ int numPartitions = 0;
+ String jobTag = defaultHelixInstanceTags;
if (jobContext != null) {
log.debug("JobContext {} num partitions {}", jobContext,
jobContext.getPartitionSet().size());
inUseInstances.addAll(jobContext.getPartitionSet().stream().map(jobContext::getAssignedParticipant)
- .filter(e -> e != null).collect(Collectors.toSet()));
-
- numPartitions += jobContext.getPartitionSet().size();
+ .filter(Objects::nonNull).collect(Collectors.toSet()));
+
+ numPartitions = jobContext.getPartitionSet().size();
+ // Job level config for helix instance tags takes precedence over
other tag configurations
+ if (jobConfig != null) {
+ if (!Strings.isNullOrEmpty(jobConfig.getInstanceGroupTag())) {
+ jobTag = jobConfig.getInstanceGroupTag();
+ }
+ Map<String, String> jobCommandConfigMap =
jobConfig.getJobCommandConfigMap();
+
if(jobCommandConfigMap.containsKey(GobblinClusterConfigurationKeys.HELIX_JOB_CONTAINER_MEMORY_MBS)){
+
resource.setMemory(Integer.parseInt(jobCommandConfigMap.get(GobblinClusterConfigurationKeys.HELIX_JOB_CONTAINER_MEMORY_MBS)));
+ }
+
if(jobCommandConfigMap.containsKey(GobblinClusterConfigurationKeys.HELIX_JOB_CONTAINER_CORES)){
+
resource.setVirtualCores(Integer.parseInt(jobCommandConfigMap.get(GobblinClusterConfigurationKeys.HELIX_JOB_CONTAINER_CORES)));
+ }
+ }
}
+ // Compute the container count as the ceiling of the number of partitions
divided by the number of partitions
+ // per container. Scale the result by a constant overprovision
factor.
+ int containerCount = (int) Math.ceil(((double)numPartitions /
this.partitionsPerContainer) * this.overProvisionFactor);
+ yarnContainerRequestBundle.add(jobTag, containerCount, resource);
}
}
-
// Find all participants appearing in this cluster. Note that Helix
instances can contain cluster-manager
// and potentially replanner-instance.
Set<String> allParticipants =
getParticipants(HELIX_YARN_INSTANCE_NAME_PREFIX);
@@ -253,17 +264,11 @@ public class YarnAutoScalingManager extends
AbstractIdleService {
instanceIdleSince.remove(participant);
}
}
+ slidingWindowReservoir.add(yarnContainerRequestBundle);
- // compute the target containers as a ceiling of number of partitions
divided by the number of containers
- // per partition. Scale the result by a constant overprovision factor.
- int numTargetContainers = (int) Math.ceil(((double)numPartitions /
this.partitionsPerContainer) * this.overProvisionFactor);
-
- // adjust the number of target containers based on the configured min
and max container values.
- numTargetContainers = Math.max(this.minContainers,
Math.min(this.maxContainers, numTargetContainers));
-
- slidingWindowReservoir.add(numTargetContainers);
-
- log.info("There are {} containers being requested", numTargetContainers);
+ log.debug("There are {} containers being requested in total, tag-count
map {}, tag-resource map {}",
+ yarnContainerRequestBundle.getTotalContainers(),
yarnContainerRequestBundle.getHelixTagContainerCountMap(),
+ yarnContainerRequestBundle.getHelixTagResourceMap());
this.yarnService.requestTargetNumberOfContainers(slidingWindowReservoir.getMax(),
inUseInstances);
}
@@ -290,8 +295,8 @@ public class YarnAutoScalingManager extends
AbstractIdleService {
* which captures max value. It is NOT built for general purpose.
*/
static class SlidingWindowReservoir {
- private ArrayDeque<Integer> fifoQueue;
- private PriorityQueue<Integer> priorityQueue;
+ private ArrayDeque<YarnContainerRequestBundle> fifoQueue;
+ private PriorityQueue<YarnContainerRequestBundle> priorityQueue;
// Queue Size
private int maxSize;
@@ -306,10 +311,11 @@ public class YarnAutoScalingManager extends
AbstractIdleService {
this.maxSize = maxSize;
this.upperBound = upperBound;
this.fifoQueue = new ArrayDeque<>(maxSize);
- this.priorityQueue = new PriorityQueue<>(maxSize, new
Comparator<Integer>() {
+ this.priorityQueue = new PriorityQueue<>(maxSize, new
Comparator<YarnContainerRequestBundle>() {
@Override
- public int compare(Integer o1, Integer o2) {
- return o2.compareTo(o1);
+ public int compare(YarnContainerRequestBundle o1,
YarnContainerRequestBundle o2) {
+ Integer i2 = o2.getTotalContainers();
+ return i2.compareTo(o1.getTotalContainers());
}
});
}
@@ -323,14 +329,14 @@ public class YarnAutoScalingManager extends
AbstractIdleService {
* When a new element is larger than upperbound, reject the value since we
may request too many Yarn containers.
* When queue is full, evict head of FIFO-queue (In FIFO queue, elements
are inserted from tail).
*/
- public void add(int e) {
- if (e > upperBound) {
+ public void add(YarnContainerRequestBundle e) {
+ if (e.getTotalContainers() > upperBound) {
log.error(String.format("Request of getting %s containers seems to be
excessive, rejected", e));
return;
}
if (fifoQueue.size() == maxSize) {
- Integer removedElement = fifoQueue.remove();
+ YarnContainerRequestBundle removedElement = fifoQueue.remove();
priorityQueue.remove(removedElement);
}
@@ -345,7 +351,7 @@ public class YarnAutoScalingManager extends
AbstractIdleService {
/**
* If queue is empty, throw {@link IllegalStateException}.
*/
- public int getMax() {
+ public YarnContainerRequestBundle getMax() {
if (priorityQueue.size() > 0) {
return this.priorityQueue.peek();
} else {
@@ -353,4 +359,4 @@ public class YarnAutoScalingManager extends
AbstractIdleService {
}
}
}
-}
+}
\ No newline at end of file
diff --git
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnContainerRequestBundle.java
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnContainerRequestBundle.java
new file mode 100644
index 000000000..4353d588e
--- /dev/null
+++
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnContainerRequestBundle.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.yarn;
+
+import com.google.common.base.Preconditions;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.hadoop.yarn.api.records.Resource;
+
+/**
+ * The class that represents current Yarn container request that will be used
by {link @YarnService}.
+ * Yarn container allocation should associate with helix tag, as workflows can
have specific helix tag setup
+ * and specific resource requirement.
+ */
+@Slf4j
+@Getter
+public class YarnContainerRequestBundle {
+ int totalContainers;
+ private final Map<String, Integer> helixTagContainerCountMap;
+ private final Map<String, Resource> helixTagResourceMap;
+ private final Map<String, Set<String>> resourceHelixTagMap;
+
+ public YarnContainerRequestBundle() {
+ this.totalContainers = 0;
+ this.helixTagContainerCountMap = new HashMap<>();
+ this.helixTagResourceMap = new HashMap<>();
+ this.resourceHelixTagMap = new HashMap<>();
+ }
+
+ public void add(String helixTag, int containerCount, Resource resource) {
+ helixTagContainerCountMap.put(helixTag,
helixTagContainerCountMap.getOrDefault(helixTag, 0) + containerCount);
+ if(helixTagResourceMap.containsKey(helixTag)) {
+ Resource existedResource = helixTagResourceMap.get(helixTag);
+ Preconditions.checkArgument(resource.getMemory() ==
existedResource.getMemory() &&
+ resource.getVirtualCores() == existedResource.getVirtualCores(),
+ "Helix tag need to have consistent resource requirement. Tag " +
helixTag
+ + " has existed resource require " + existedResource.toString()
+ " and different require " + resource.toString());
+ } else {
+ helixTagResourceMap.put(helixTag, resource);
+ Set<String> tagSet =
resourceHelixTagMap.getOrDefault(resource.toString(), new HashSet<>());
+ tagSet.add(helixTag);
+ resourceHelixTagMap.put(resource.toString(), tagSet);
+ }
+ totalContainers += containerCount;
+ }
+
+ // This method assumes the resource requirement for the helix tag is already
stored in the map
+ public void add(String helixTag, int containerCount) {
+ if (!helixTagContainerCountMap.containsKey(helixTag) &&
!helixTagResourceMap.containsKey(helixTag)) {
+ log.error("Helix tag {} is not present in the request bundle yet, can't
process the request to add {} "
+ + "container for it without specifying the resource requirement",
helixTag, containerCount);
+ return;
+ }
+ helixTagContainerCountMap.put(helixTag,
helixTagContainerCountMap.get(helixTag) + containerCount);
+ this.totalContainers += containerCount;
+ }
+}
\ No newline at end of file
diff --git
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnHelixUtils.java
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnHelixUtils.java
index 5309a13ac..1ecbd4510 100644
--- a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnHelixUtils.java
+++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnHelixUtils.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
@@ -229,4 +230,28 @@ public class YarnHelixUtils {
public static String getContainerNum(String containerId) {
return "container-" + containerId.substring(containerId.lastIndexOf("_") +
1);
}
-}
+
+ /**
+  * Picks the Helix tag for a newly allocated container, based on the {@link YarnContainerRequestBundle}
+  * that describes the desired state.
+  *
+  * <p>Only tags registered for the container's resource (keyed by {@code Resource#toString()}) are
+  * considered. The first such tag whose allocated count is still below its desired count is returned;
+  * if every candidate already has enough containers, the last candidate iterated is returned instead.
+  *
+  * @param container newly allocated container
+  * @param helixTagAllocatedContainerCount current allocated container count for each Helix tag
+  * @param requestedYarnContainer Yarn container request that specifies the desired state, may be null
+  * @return the Helix tag this container should be assigned; null (meaning the default should be used)
+  *         when the request is null or no tag is registered for the container's resource
+  */
+ public static String findHelixTagForContainer(Container container,
+     Map<String, Integer> helixTagAllocatedContainerCount, YarnContainerRequestBundle requestedYarnContainer) {
+   if (requestedYarnContainer == null) {
+     return null;
+   }
+   String resourceKey = container.getResource().toString();
+   if (!requestedYarnContainer.getResourceHelixTagMap().containsKey(resourceKey)) {
+     return null;
+   }
+   String lastCandidate = null;
+   for (String candidate : requestedYarnContainer.getResourceHelixTagMap().get(resourceKey)) {
+     int desired = requestedYarnContainer.getHelixTagContainerCountMap().get(candidate);
+     int allocated = helixTagAllocatedContainerCount.getOrDefault(candidate, 0);
+     if (allocated < desired) {
+       // This tag still needs containers, so it takes the new one.
+       return candidate;
+     }
+     lastCandidate = candidate;
+   }
+   // No tag is short of containers: fall back to the last tag seen for this resource.
+   return lastCandidate;
+ }
+}
\ No newline at end of file
diff --git
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java
index 9fadc9485..d265d0296 100644
--- a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java
+++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java
@@ -20,7 +20,6 @@ package org.apache.gobblin.yarn;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
-import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -35,6 +34,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+import lombok.AllArgsConstructor;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@@ -70,7 +70,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Function;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
@@ -127,6 +126,7 @@ public class YarnService extends AbstractIdleService {
private final String applicationName;
private final String applicationId;
private final String appViewAcl;
+ //Default helix instance tag derived from cluster level config
private final String helixInstanceTags;
private final Config config;
@@ -158,6 +158,7 @@ public class YarnService extends AbstractIdleService {
private final String containerTimezone;
private final HelixManager helixManager;
+ @Getter(AccessLevel.PROTECTED)
private volatile Optional<Resource> maxResourceCapacity = Optional.absent();
// Security tokens for accessing HDFS
@@ -167,10 +168,10 @@ public class YarnService extends AbstractIdleService {
private final Object allContainersStopped = new Object();
- // A map from container IDs to pairs of Container instances and Helix
participant IDs of the containers
+ // A map from container IDs to Container instances, Helix participant IDs of
the containers and Helix Tag
@VisibleForTesting
@Getter(AccessLevel.PROTECTED)
- private final ConcurrentMap<ContainerId, Map.Entry<Container, String>>
containerMap = Maps.newConcurrentMap();
+ private final ConcurrentMap<ContainerId, ContainerInfo> containerMap =
Maps.newConcurrentMap();
// A cache of the containers with an outstanding container release request.
// This is a cache instead of a set to get the automatic cleanup in case a
container completes before the requested
@@ -190,16 +191,16 @@ public class YarnService extends AbstractIdleService {
// instance names get picked up when replacement containers get allocated.
private final Set<String> unusedHelixInstanceNames =
ConcurrentHashMap.newKeySet();
- private volatile boolean shutdownInProgress = false;
+ // The map from helix tag to requested container count
+ private final Map<String, Integer> requestedContainerCountMap =
Maps.newConcurrentMap();
+ // The map from helix tag to allocated container count
+ private final Map<String, Integer> allocatedContainerCountMap =
Maps.newConcurrentMap();
- // The number of containers requested based on the desired target number of
containers. This is used to determine
- // how may additional containers to request since the the currently
allocated amount may be less than this amount if we
- // are waiting for containers to be allocated.
- // The currently allocated amount may also be higher than this amount if
YARN returned more than the requested number
- // of containers.
- @VisibleForTesting
- @Getter(AccessLevel.PROTECTED)
- private int numRequestedContainers = 0;
+ private volatile YarnContainerRequestBundle yarnContainerRequest;
+ private final AtomicInteger priorityNumGenerator = new AtomicInteger(0);
+ private final Map<String, Integer> resourcePriorityMap = new HashMap<>();
+
+ private volatile boolean shutdownInProgress = false;
public YarnService(Config config, String applicationName, String
applicationId, YarnConfiguration yarnConfiguration,
FileSystem fs, EventBus eventBus, HelixManager helixManager) throws
Exception {
@@ -236,7 +237,8 @@ public class YarnService extends AbstractIdleService {
this.containerHostAffinityEnabled =
config.getBoolean(GobblinYarnConfigurationKeys.CONTAINER_HOST_AFFINITY_ENABLED);
this.helixInstanceMaxRetries =
config.getInt(GobblinYarnConfigurationKeys.HELIX_INSTANCE_MAX_RETRIES);
- this.helixInstanceTags = ConfigUtils.getString(config,
GobblinClusterConfigurationKeys.HELIX_INSTANCE_TAGS_KEY, null);
+ this.helixInstanceTags = ConfigUtils.getString(config,
+ GobblinClusterConfigurationKeys.HELIX_INSTANCE_TAGS_KEY,
GobblinClusterConfigurationKeys.HELIX_DEFAULT_TAG);
this.containerJvmArgs =
config.hasPath(GobblinYarnConfigurationKeys.CONTAINER_JVM_ARGS_KEY) ?
Optional.of(config.getString(GobblinYarnConfigurationKeys.CONTAINER_JVM_ARGS_KEY))
:
@@ -286,14 +288,8 @@ public class YarnService extends AbstractIdleService {
this.requestedContainerCores));
return;
}
-
- requestContainer(newContainerRequest.getReplacedContainer().transform(new
Function<Container, String>() {
-
- @Override
- public String apply(Container container) {
- return container.getNodeId().getHost();
- }
- }));
+
requestContainer(newContainerRequest.getReplacedContainer().transform(container
-> container.getNodeId().getHost()),
+ newContainerRequest.getResource());
}
protected NMClientCallbackHandler getNMClientCallbackHandler() {
@@ -359,10 +355,10 @@ public class YarnService extends AbstractIdleService {
ExecutorsUtils.shutdownExecutorService(this.containerLaunchExecutor,
Optional.of(LOGGER));
// Stop the running containers
- for (Map.Entry<Container, String> entry : this.containerMap.values()) {
- LOGGER.info(String.format("Stopping container %s running participant
%s", entry.getKey().getId(),
- entry.getValue()));
- this.nmClientAsync.stopContainerAsync(entry.getKey().getId(),
entry.getKey().getNodeId());
+ for (ContainerInfo containerInfo : this.containerMap.values()) {
+ LOGGER.info("Stopping container {} running participant {}",
containerInfo.getContainer().getId(),
+ containerInfo.getHelixParticipantId());
+
this.nmClientAsync.stopContainerAsync(containerInfo.getContainer().getId(),
containerInfo.getContainer().getNodeId());
}
if (!this.containerMap.isEmpty()) {
@@ -431,27 +427,35 @@ public class YarnService extends AbstractIdleService {
* number of containers. The intended usage is for the caller of this method
to make periodic calls to attempt to
* adjust the cluster towards the desired number of containers.
*
- * @param numTargetContainers the desired number of containers
+ * @param yarnContainerRequestBundle the desired containers information,
including numbers, resource and helix tag
* @param inUseInstances a set of in use instances
*/
- public synchronized void requestTargetNumberOfContainers(int
numTargetContainers, Set<String> inUseInstances) {
- LOGGER.debug("Requesting numTargetContainers {} current
numRequestedContainers {} in use instances {} map size {}",
- numTargetContainers, this.numRequestedContainers, inUseInstances,
this.containerMap.size());
-
+ public synchronized void
requestTargetNumberOfContainers(YarnContainerRequestBundle
yarnContainerRequestBundle, Set<String> inUseInstances) {
+ LOGGER.debug("Requesting numTargetContainers {}, in use instances count is
{}, container map size is {}",
+ yarnContainerRequestBundle.getTotalContainers(), inUseInstances,
this.containerMap.size());
+ int numTargetContainers = yarnContainerRequestBundle.getTotalContainers();
// YARN can allocate more than the requested number of containers, compute
additional allocations and deallocations
// based on the max of the requested and actual allocated counts
int numAllocatedContainers = this.containerMap.size();
- // The number of allocated containers may be higher than the previously
requested amount
- // and there may be outstanding allocation requests, so the max of both
counts is computed here
- // and used to decide whether to allocate containers.
- int numContainers = Math.max(numRequestedContainers,
numAllocatedContainers);
-
// Request additional containers if the desired count is higher than the
max of the current allocation or previously
// requested amount. Note that there may be in-flight or additional
allocations after numContainers has been computed
// so overshooting can occur, but periodic calls to this method will make
adjustments towards the target.
- for (int i = numContainers; i < numTargetContainers; i++) {
- requestContainer(Optional.<String>absent());
+ for (Map.Entry<String, Integer> entry :
yarnContainerRequestBundle.getHelixTagContainerCountMap().entrySet()) {
+ String currentHelixTag = entry.getKey();
+ int desiredContainerCount = entry.getValue();
+ int requestedContainerCount =
requestedContainerCountMap.getOrDefault(currentHelixTag, 0);
+ for(; requestedContainerCount < desiredContainerCount;
requestedContainerCount++) {
+ requestContainer(Optional.absent(),
yarnContainerRequestBundle.getHelixTagResourceMap().get(currentHelixTag));
+ }
+ requestedContainerCountMap.put(currentHelixTag, desiredContainerCount);
+ }
+
+ // If a previously requested tag is not present in the new request, set its
requested count to 0, since its containers should be released
+ for(String requestedHelixTag : requestedContainerCountMap.keySet()) {
+
if(!yarnContainerRequestBundle.getHelixTagContainerCountMap().containsKey(requestedHelixTag))
{
+ requestedContainerCountMap.put(requestedHelixTag, 0);
+ }
}
// If the total desired is lower than the currently allocated amount then
release free containers.
@@ -462,12 +466,13 @@ public class YarnService extends AbstractIdleService {
LOGGER.debug("Shrinking number of containers by {}",
(numAllocatedContainers - numTargetContainers));
List<Container> containersToRelease = new ArrayList<>();
- int numToShutdown = numContainers - numTargetContainers;
+ int numToShutdown = numAllocatedContainers - numTargetContainers;
// Look for eligible containers to release. If a container is in use
then it is not released.
- for (Map.Entry<ContainerId, Map.Entry<Container, String>> entry :
this.containerMap.entrySet()) {
- if (!inUseInstances.contains(entry.getValue().getValue())) {
- containersToRelease.add(entry.getValue().getKey());
+ for (Map.Entry<ContainerId, ContainerInfo> entry :
this.containerMap.entrySet()) {
+ ContainerInfo containerInfo = entry.getValue();
+ if (!inUseInstances.contains(containerInfo.getHelixParticipantId())) {
+ containersToRelease.add(containerInfo.getContainer());
}
if (containersToRelease.size() == numToShutdown) {
@@ -479,32 +484,47 @@ public class YarnService extends AbstractIdleService {
this.eventBus.post(new ContainerReleaseRequest(containersToRelease));
}
-
- this.numRequestedContainers = numTargetContainers;
+ this.yarnContainerRequest = yarnContainerRequestBundle;
+ LOGGER.info("Current tag-container being requested:{}, tag-container
allocated: {}",
+ this.requestedContainerCountMap, this.allocatedContainerCountMap);
}
+ // Request initial containers with default resource and helix tag
private void requestInitialContainers(int containersRequested) {
- requestTargetNumberOfContainers(containersRequested,
Collections.EMPTY_SET);
+ YarnContainerRequestBundle initialYarnContainerRequest = new
YarnContainerRequestBundle();
+ Resource capability =
Resource.newInstance(this.requestedContainerMemoryMbs,
this.requestedContainerCores);
+ initialYarnContainerRequest.add(this.helixInstanceTags,
containersRequested, capability);
+ requestTargetNumberOfContainers(initialYarnContainerRequest,
Collections.EMPTY_SET);
}
- private void requestContainer(Optional<String> preferredNode) {
- Priority priority = Records.newRecord(Priority.class);
- priority.setPriority(0);
+ private void requestContainer(Optional<String> preferredNode,
Optional<Resource> resourceOptional) {
+ Resource desiredResource = resourceOptional.or(Resource.newInstance(
+ this.requestedContainerMemoryMbs, this.requestedContainerCores));
+ requestContainer(preferredNode, desiredResource);
+ }
- Resource capability = Records.newRecord(Resource.class);
- int maxMemoryCapacity = this.maxResourceCapacity.get().getMemory();
- capability.setMemory(this.requestedContainerMemoryMbs <= maxMemoryCapacity
?
- this.requestedContainerMemoryMbs : maxMemoryCapacity);
- int maxCoreCapacity = this.maxResourceCapacity.get().getVirtualCores();
- capability.setVirtualCores(this.requestedContainerCores <= maxCoreCapacity
?
- this.requestedContainerCores : maxCoreCapacity);
+ // Request containers with specific resource requirement
+ private void requestContainer(Optional<String> preferredNode, Resource
resource) {
+ // Fail if Yarn cannot meet container resource requirements
+ Preconditions.checkArgument(resource.getMemory() <=
this.maxResourceCapacity.get().getMemory() &&
+ resource.getVirtualCores() <=
this.maxResourceCapacity.get().getVirtualCores(),
+ "Resource requirement must less than the max resource capacity.
Requested resource" + resource.toString()
+ + " exceed the max resource limit " +
this.maxResourceCapacity.get().toString());
+
+ // Due to YARN-314, requests with different resource capacities need different priorities;
otherwise Yarn will not allocate the containers
+ Priority priority = Records.newRecord(Priority.class);
+ if(!resourcePriorityMap.containsKey(resource.toString())) {
+ resourcePriorityMap.put(resource.toString(),
priorityNumGenerator.getAndIncrement());
+ }
+ int priorityNum = resourcePriorityMap.get(resource.toString());
+ priority.setPriority(priorityNum);
String[] preferredNodes = preferredNode.isPresent() ? new String[]
{preferredNode.get()} : null;
this.amrmClientAsync.addContainerRequest(
- new AMRMClient.ContainerRequest(capability, preferredNodes, null,
priority));
+ new AMRMClient.ContainerRequest(resource, preferredNodes, null,
priority));
}
- protected ContainerLaunchContext newContainerLaunchContext(Container
container, String helixInstanceName)
+ protected ContainerLaunchContext newContainerLaunchContext(ContainerInfo
containerInfo)
throws IOException {
Path appWorkDir =
GobblinClusterUtils.getAppWorkDirPathFromConfig(this.config, this.fs,
this.applicationName, this.applicationId);
Path containerWorkDir = new Path(appWorkDir,
GobblinYarnConfigurationKeys.CONTAINER_WORK_DIR_NAME);
@@ -527,7 +547,7 @@ public class YarnService extends AbstractIdleService {
ContainerLaunchContext containerLaunchContext =
Records.newRecord(ContainerLaunchContext.class);
containerLaunchContext.setLocalResources(resourceMap);
containerLaunchContext.setEnvironment(YarnHelixUtils.getEnvironmentVariables(this.yarnConfiguration));
-
containerLaunchContext.setCommands(Lists.newArrayList(buildContainerCommand(container,
helixInstanceName)));
+
containerLaunchContext.setCommands(Lists.newArrayList(buildContainerCommand(containerInfo)));
Map<ApplicationAccessType, String> acls = new HashMap<>(1);
acls.put(ApplicationAccessType.VIEW_APP, this.appViewAcl);
@@ -580,11 +600,11 @@ public class YarnService extends AbstractIdleService {
}
@VisibleForTesting
- protected String buildContainerCommand(Container container, String
helixInstanceName) {
+ protected String buildContainerCommand(ContainerInfo containerInfo) {
String containerProcessName = GobblinYarnTaskRunner.class.getSimpleName();
StringBuilder containerCommand = new StringBuilder()
.append(ApplicationConstants.Environment.JAVA_HOME.$()).append("/bin/java")
- .append(" -Xmx").append((int) (container.getResource().getMemory() *
this.jvmMemoryXmxRatio) -
+ .append(" -Xmx").append((int)
(containerInfo.getContainer().getResource().getMemory() *
this.jvmMemoryXmxRatio) -
this.jvmMemoryOverheadMbs).append("M")
.append("
-D").append(GobblinYarnConfigurationKeys.JVM_USER_TIMEZONE_CONFIG).append("=").append(this.containerTimezone)
.append("
-D").append(GobblinYarnConfigurationKeys.GOBBLIN_YARN_CONTAINER_LOG_DIR_NAME).append("=").append(ApplicationConstants.LOG_DIR_EXPANSION_VAR)
@@ -596,16 +616,16 @@ public class YarnService extends AbstractIdleService {
.append("
--").append(GobblinClusterConfigurationKeys.APPLICATION_ID_OPTION_NAME)
.append(" ").append(this.applicationId)
.append("
--").append(GobblinClusterConfigurationKeys.HELIX_INSTANCE_NAME_OPTION_NAME)
- .append(" ").append(helixInstanceName);
+ .append(" ").append(containerInfo.getHelixParticipantId());
- if (!Strings.isNullOrEmpty(this.helixInstanceTags)) {
+ if (!Strings.isNullOrEmpty(containerInfo.getHelixTag())) {
containerCommand.append("
--").append(GobblinClusterConfigurationKeys.HELIX_INSTANCE_TAGS_OPTION_NAME)
- .append(" ").append(helixInstanceTags);
+ .append(" ").append(containerInfo.getHelixTag());
}
return containerCommand.append("
1>").append(ApplicationConstants.LOG_DIR_EXPANSION_VAR).append(File.separator).append(
- containerProcessName).append(".").append(ApplicationConstants.STDOUT)
+ containerProcessName).append(".").append(ApplicationConstants.STDOUT)
.append("
2>").append(ApplicationConstants.LOG_DIR_EXPANSION_VAR).append(File.separator).append(
-
containerProcessName).append(".").append(ApplicationConstants.STDERR).toString();
+
containerProcessName).append(".").append(ApplicationConstants.STDERR).toString();
}
/**
@@ -640,11 +660,13 @@ public class YarnService extends AbstractIdleService {
* A replacement container is needed in all but the last case.
*/
protected void handleContainerCompletion(ContainerStatus containerStatus) {
- Map.Entry<Container, String> completedContainerEntry =
this.containerMap.remove(containerStatus.getContainerId());
+ ContainerInfo completedContainerInfo =
this.containerMap.remove(containerStatus.getContainerId());
//Get the Helix instance name for the completed container. Because
callbacks are processed asynchronously, we might
//encounter situations where handleContainerCompletion() is called before
onContainersAllocated(), resulting in the
//containerId missing from the containersMap.
- String completedInstanceName = completedContainerEntry == null?
UNKNOWN_HELIX_INSTANCE : completedContainerEntry.getValue();
+ String completedInstanceName = completedContainerInfo == null?
UNKNOWN_HELIX_INSTANCE : completedContainerInfo.getHelixParticipantId();
+ String helixTag = completedContainerInfo == null ? helixInstanceTags :
completedContainerInfo.getHelixTag();
+ allocatedContainerCountMap.put(helixTag,
allocatedContainerCountMap.get(helixTag) - 1);
LOGGER.info(String.format("Container %s running Helix instance %s has
completed with exit status %d",
containerStatus.getContainerId(), completedInstanceName,
containerStatus.getExitStatus()));
@@ -657,15 +679,15 @@ public class YarnService extends AbstractIdleService {
if (containerStatus.getExitStatus() == ContainerExitStatus.ABORTED) {
if
(this.releasedContainerCache.getIfPresent(containerStatus.getContainerId()) !=
null) {
LOGGER.info("Container release requested, so not spawning a
replacement for containerId {}", containerStatus.getContainerId());
- if (completedContainerEntry != null) {
+ if (completedContainerInfo != null) {
LOGGER.info("Adding instance {} to the pool of unused instances",
completedInstanceName);
this.unusedHelixInstanceNames.add(completedInstanceName);
}
return;
} else {
LOGGER.info("Container {} aborted due to lost NM",
containerStatus.getContainerId());
- // Container release was not requested. Likely, the container was
running on a node on which the NM died.
- // In this case, RM assumes that the containers are "lost", even though
the container process may still be
+ // Container release was not requested. Likely, the container was
running on a node on which the NM died.
+ // In this case, RM assumes that the containers are "lost", even
though the container process may still be
// running on the node. We need to ensure that the Helix instances
running on the orphaned containers
// are fenced off from the Helix cluster to avoid double publishing
and state being committed by the
// instances.
@@ -683,7 +705,7 @@ public class YarnService extends AbstractIdleService {
if (this.shutdownInProgress) {
return;
}
- if(completedContainerEntry != null) {
+ if(completedContainerInfo != null) {
this.helixInstanceRetryCount.putIfAbsent(completedInstanceName, new
AtomicInteger(0));
int retryCount =
this.helixInstanceRetryCount.get(completedInstanceName).incrementAndGet();
@@ -715,10 +737,13 @@ public class YarnService extends AbstractIdleService {
.submit(GobblinYarnEventConstants.EventNames.HELIX_INSTANCE_COMPLETION,
eventMetadataBuilder.get().build());
}
}
- LOGGER.info(String.format("Requesting a new container to replace %s to run
Helix instance %s", containerStatus.getContainerId(), completedInstanceName));
+ Optional<Resource> newContainerResource = completedContainerInfo != null ?
+ Optional.of(completedContainerInfo.getContainer().getResource()) :
Optional.absent();
+ LOGGER.info("Requesting a new container to replace {} to run Helix
instance {} with helix tag {} and resource {}",
+ containerStatus.getContainerId(), completedInstanceName, helixTag,
newContainerResource.orNull());
this.eventBus.post(new NewContainerRequest(
- shouldStickToTheSameNode(containerStatus.getExitStatus()) &&
completedContainerEntry != null ?
- Optional.of(completedContainerEntry.getKey()) :
Optional.<Container>absent()));
+ shouldStickToTheSameNode(containerStatus.getExitStatus()) &&
completedContainerInfo != null ?
+ Optional.of(completedContainerInfo.getContainer()) :
Optional.absent(), newContainerResource));
}
private ImmutableMap.Builder<String, String>
buildContainerStatusEventMetadata(ContainerStatus containerStatus) {
@@ -755,12 +780,18 @@ public class YarnService extends AbstractIdleService {
@Override
public void onContainersAllocated(List<Container> containers) {
for (final Container container : containers) {
+ String containerId = container.getId().toString();
+ String containerHelixTag =
YarnHelixUtils.findHelixTagForContainer(container, allocatedContainerCountMap,
yarnContainerRequest);
+ if (Strings.isNullOrEmpty(containerHelixTag)) {
+ containerHelixTag = helixInstanceTags;
+ }
if (eventSubmitter.isPresent()) {
eventSubmitter.get().submit(GobblinYarnEventConstants.EventNames.CONTAINER_ALLOCATION,
- GobblinYarnMetricTagNames.CONTAINER_ID,
container.getId().toString());
+ GobblinYarnMetricTagNames.CONTAINER_ID, containerId);
}
- LOGGER.info(String.format("Container %s has been allocated",
container.getId()));
+ LOGGER.info("Container {} has been allocated with resource {} for
helix tag {}",
+ container.getId(), container.getResource(), containerHelixTag);
//Iterate over the (thread-safe) set of unused instances to find the
first instance that is not currently live.
//Once we find a candidate instance, it is removed from the set.
@@ -781,6 +812,8 @@ public class YarnService extends AbstractIdleService {
instanceName = null;
}
}
+ allocatedContainerCountMap.put(containerHelixTag,
+ allocatedContainerCountMap.getOrDefault(containerHelixTag, 0) +
1);
}
if (Strings.isNullOrEmpty(instanceName)) {
@@ -789,8 +822,8 @@ public class YarnService extends AbstractIdleService {
.getHelixInstanceName(HELIX_YARN_INSTANCE_NAME_PREFIX,
helixInstanceIdGenerator.incrementAndGet());
}
- final String finalInstanceName = instanceName;
- containerMap.put(container.getId(), new
AbstractMap.SimpleImmutableEntry<>(container, finalInstanceName));
+ ContainerInfo containerInfo = new ContainerInfo(container,
instanceName, containerHelixTag);
+ containerMap.put(container.getId(), containerInfo);
// Find matching requests and remove the request to reduce the chance
that a subsequent request
// will request extra containers. YARN does not have a delta request
API and the requests are not
@@ -819,11 +852,11 @@ public class YarnService extends AbstractIdleService {
@Override
public void run() {
try {
- LOGGER.info("Starting container " + container.getId());
+ LOGGER.info("Starting container " + containerId);
- nmClientAsync.startContainerAsync(container,
newContainerLaunchContext(container, finalInstanceName));
+ nmClientAsync.startContainerAsync(container,
newContainerLaunchContext(containerInfo));
} catch (IOException ioe) {
- LOGGER.error("Failed to start container " + container.getId(),
ioe);
+ LOGGER.error("Failed to start container " + containerId, ioe);
}
}
});
@@ -869,7 +902,7 @@ public class YarnService extends AbstractIdleService {
/**
* A custom implementation of {@link NMClientAsync.CallbackHandler}.
*/
- class NMClientCallbackHandler implements NMClientAsync.CallbackHandler {
+ class NMClientCallbackHandler implements NMClientAsync.CallbackHandler {
@Override
public void onContainerStarted(ContainerId containerId, Map<String,
ByteBuffer> allServiceResponse) {
@@ -939,4 +972,13 @@ public class YarnService extends AbstractIdleService {
LOGGER.error(String.format("Failed to stop container %s due to error
%s", containerId, t));
}
}
-}
+
+ // A class that encapsulates a Container instance, the Helix participant ID of the
container, and the container's Helix tag
+ @AllArgsConstructor
+ @Getter
+ static class ContainerInfo {
+ private final Container container;
+ private final String helixParticipantId;
+ private final String helixTag;
+ }
+}
\ No newline at end of file
diff --git
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/event/NewContainerRequest.java
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/event/NewContainerRequest.java
index 7ee4b3223..6e272b7c0 100644
---
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/event/NewContainerRequest.java
+++
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/event/NewContainerRequest.java
@@ -17,9 +17,11 @@
package org.apache.gobblin.yarn.event;
+import lombok.Getter;
import org.apache.hadoop.yarn.api.records.Container;
import com.google.common.base.Optional;
+import org.apache.hadoop.yarn.api.records.Resource;
/**
@@ -30,9 +32,17 @@ import com.google.common.base.Optional;
public class NewContainerRequest {
private final Optional<Container> replacedContainer;
+ @Getter
+ private final Optional<Resource> resource;
public NewContainerRequest(Optional<Container> replacedContainer) {
this.replacedContainer = replacedContainer;
+ this.resource = Optional.absent();
+ }
+
+ public NewContainerRequest(Optional<Container> replacedContainer,
Optional<Resource> resource) {
+ this.replacedContainer = replacedContainer;
+ this.resource = resource;
}
/**
@@ -43,4 +53,4 @@ public class NewContainerRequest {
public Optional<Container> getReplacedContainer() {
return this.replacedContainer;
}
-}
+}
\ No newline at end of file
diff --git
a/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/GobblinYarnTestUtils.java
b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/GobblinYarnTestUtils.java
index 9bb5b747e..8dceb4420 100644
---
a/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/GobblinYarnTestUtils.java
+++
b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/GobblinYarnTestUtils.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.yarn.api.records.Resource;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
@@ -75,4 +76,10 @@ public class GobblinYarnTestUtils {
credentials.addToken(token.getService(), token);
credentials.writeTokenStorageFile(path, new Configuration());
}
-}
+
+ public static YarnContainerRequestBundle createYarnContainerRequest(int n,
Resource resource) {
+ YarnContainerRequestBundle yarnContainerRequestBundle = new
YarnContainerRequestBundle();
+ yarnContainerRequestBundle.add("GobblinKafkaStreaming", n, resource);
+ return yarnContainerRequestBundle;
+ }
+}
\ No newline at end of file
diff --git
a/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnAutoScalingManagerTest.java
b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnAutoScalingManagerTest.java
index 6c4047147..259378378 100644
---
a/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnAutoScalingManagerTest.java
+++
b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnAutoScalingManagerTest.java
@@ -19,15 +19,22 @@ package org.apache.gobblin.yarn;
import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import org.apache.gobblin.cluster.GobblinClusterConfigurationKeys;
+import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixProperty;
import org.apache.helix.PropertyKey;
+import org.apache.helix.task.JobConfig;
import org.apache.helix.task.JobContext;
import org.apache.helix.task.JobDag;
import org.apache.helix.task.TaskDriver;
import org.apache.helix.task.TaskState;
import org.apache.helix.task.WorkflowConfig;
import org.apache.helix.task.WorkflowContext;
+import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.Test;
@@ -35,6 +42,7 @@ import org.testng.annotations.Test;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
+import static org.mockito.Matchers.*;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
@@ -47,6 +55,9 @@ public class YarnAutoScalingManagerTest {
// A queue within size == 1 and upperBound == "infinite" should not impact
on the execution.
private final static YarnAutoScalingManager.SlidingWindowReservoir noopQueue
=
new YarnAutoScalingManager.SlidingWindowReservoir(1, Integer.MAX_VALUE);
+ private final static int defaultContainerMemory = 1024;
+ private final static int defaultContainerCores = 2;
+ private final static String defaultHelixTag = "DefaultHelixTag";
/**
* Test for one workflow with one job
*/
@@ -82,13 +93,15 @@ public class YarnAutoScalingManagerTest {
YarnAutoScalingManager.YarnAutoScalingRunnable runnable =
new YarnAutoScalingManager.YarnAutoScalingRunnable(mockTaskDriver,
mockYarnService, 1,
- 1, 10, 1.0, noopQueue, helixDataAccessor);
+ 1.0, noopQueue, helixDataAccessor, defaultHelixTag,
defaultContainerMemory, defaultContainerCores);
runnable.run();
-
+ ArgumentCaptor<YarnContainerRequestBundle> argument =
ArgumentCaptor.forClass(YarnContainerRequestBundle.class);
// 2 containers requested and one worker in use
Mockito.verify(mockYarnService, times(1)).
- requestTargetNumberOfContainers(2,
ImmutableSet.of("GobblinYarnTaskRunner-1"));
+ requestTargetNumberOfContainers(argument.capture(),
+ eq(ImmutableSet.of("GobblinYarnTaskRunner-1")));
+ Assert.assertEquals(argument.getValue().getTotalContainers(), 2);
}
/**
@@ -131,14 +144,17 @@ public class YarnAutoScalingManagerTest {
"GobblinYarnTaskRunner-2", new HelixProperty("")));
YarnAutoScalingManager.YarnAutoScalingRunnable runnable =
- new YarnAutoScalingManager.YarnAutoScalingRunnable(mockTaskDriver,
mockYarnService,
- 1, 1, 10, 1.0, noopQueue, helixDataAccessor);
+ new YarnAutoScalingManager.YarnAutoScalingRunnable(mockTaskDriver,
mockYarnService, 1,
+ 1.0, noopQueue, helixDataAccessor, defaultHelixTag,
defaultContainerMemory, defaultContainerCores);
runnable.run();
// 3 containers requested and 2 workers in use
- Mockito.verify(mockYarnService, times(1))
- .requestTargetNumberOfContainers(3,
ImmutableSet.of("GobblinYarnTaskRunner-1", "GobblinYarnTaskRunner-2"));
+ ArgumentCaptor<YarnContainerRequestBundle> argument =
ArgumentCaptor.forClass(YarnContainerRequestBundle.class);
+ Mockito.verify(mockYarnService, times(1)).
+ requestTargetNumberOfContainers(argument.capture(),
+ eq(ImmutableSet.of("GobblinYarnTaskRunner-1",
"GobblinYarnTaskRunner-2")));
+ Assert.assertEquals(argument.getValue().getTotalContainers(), 3);
}
/**
@@ -200,14 +216,17 @@ public class YarnAutoScalingManagerTest {
"GobblinYarnTaskRunner-3", new HelixProperty("")));
YarnAutoScalingManager.YarnAutoScalingRunnable runnable =
- new YarnAutoScalingManager.YarnAutoScalingRunnable(mockTaskDriver,
mockYarnService,
- 1, 1, 10, 1.0, noopQueue, helixDataAccessor);
+ new YarnAutoScalingManager.YarnAutoScalingRunnable(mockTaskDriver,
mockYarnService, 1,
+ 1.0, noopQueue, helixDataAccessor, defaultHelixTag,
defaultContainerMemory, defaultContainerCores);
runnable.run();
// 5 containers requested and 3 workers in use
- Mockito.verify(mockYarnService,
times(1)).requestTargetNumberOfContainers(5,
- ImmutableSet.of("GobblinYarnTaskRunner-1", "GobblinYarnTaskRunner-2",
"GobblinYarnTaskRunner-3"));
+ ArgumentCaptor<YarnContainerRequestBundle> argument =
ArgumentCaptor.forClass(YarnContainerRequestBundle.class);
+ Mockito.verify(mockYarnService, times(1)).
+ requestTargetNumberOfContainers(argument.capture(),
+ eq(ImmutableSet.of("GobblinYarnTaskRunner-1",
"GobblinYarnTaskRunner-2", "GobblinYarnTaskRunner-3")));
+ Assert.assertEquals(argument.getValue().getTotalContainers(), 5);
}
/**
@@ -269,14 +288,17 @@ public class YarnAutoScalingManagerTest {
"GobblinYarnTaskRunner-2", new HelixProperty("")));
YarnAutoScalingManager.YarnAutoScalingRunnable runnable =
- new YarnAutoScalingManager.YarnAutoScalingRunnable(mockTaskDriver,
mockYarnService,
- 1, 1, 10, 1.0, noopQueue, helixDataAccessor);
+ new YarnAutoScalingManager.YarnAutoScalingRunnable(mockTaskDriver,
mockYarnService, 1,
+ 1.0, noopQueue, helixDataAccessor, defaultHelixTag,
defaultContainerMemory, defaultContainerCores);
runnable.run();
// 3 containers requested and 2 workers in use
- Mockito.verify(mockYarnService,
times(1)).requestTargetNumberOfContainers(3,
- ImmutableSet.of("GobblinYarnTaskRunner-1", "GobblinYarnTaskRunner-2"));
+ ArgumentCaptor<YarnContainerRequestBundle> argument =
ArgumentCaptor.forClass(YarnContainerRequestBundle.class);
+ Mockito.verify(mockYarnService, times(1)).
+ requestTargetNumberOfContainers(argument.capture(),
+ eq(ImmutableSet.of("GobblinYarnTaskRunner-1",
"GobblinYarnTaskRunner-2")));
+ Assert.assertEquals(argument.getValue().getTotalContainers(), 3);
}
/**
@@ -313,103 +335,17 @@ public class YarnAutoScalingManagerTest {
.thenReturn(ImmutableMap.of("GobblinYarnTaskRunner-1", new
HelixProperty("")));
YarnAutoScalingManager.YarnAutoScalingRunnable runnable =
- new YarnAutoScalingManager.YarnAutoScalingRunnable(mockTaskDriver,
mockYarnService,
- 2, 1, 10, 1.0, noopQueue, helixDataAccessor);
+ new YarnAutoScalingManager.YarnAutoScalingRunnable(mockTaskDriver,
mockYarnService, 2,
+ 1.0, noopQueue, helixDataAccessor, defaultHelixTag,
defaultContainerMemory, defaultContainerCores);
runnable.run();
// 1 container requested since 2 partitions and limit is 2 partitions per
container. One worker in use.
- Mockito.verify(mockYarnService, times(1))
- .requestTargetNumberOfContainers(1,
ImmutableSet.of("GobblinYarnTaskRunner-1"));
- }
-
-
- /**
- * Test min containers
- */
- @Test
- public void testMinContainers() throws IOException {
- YarnService mockYarnService = mock(YarnService.class);
- TaskDriver mockTaskDriver = mock(TaskDriver.class);
- WorkflowConfig mockWorkflowConfig = mock(WorkflowConfig.class);
- JobDag mockJobDag = mock(JobDag.class);
-
- Mockito.when(mockJobDag.getAllNodes()).thenReturn(ImmutableSet.of("job1"));
- Mockito.when(mockWorkflowConfig.getJobDag()).thenReturn(mockJobDag);
-
- Mockito.when(mockTaskDriver.getWorkflows())
- .thenReturn(ImmutableMap.of("workflow1", mockWorkflowConfig));
-
- WorkflowContext mockWorkflowContext = mock(WorkflowContext.class);
-
Mockito.when(mockWorkflowContext.getWorkflowState()).thenReturn(TaskState.IN_PROGRESS);
-
-
Mockito.when(mockTaskDriver.getWorkflowContext("workflow1")).thenReturn(mockWorkflowContext);
-
- JobContext mockJobContext = mock(JobContext.class);
- Mockito.when(mockJobContext.getPartitionSet())
- .thenReturn(ImmutableSet.of(Integer.valueOf(1), Integer.valueOf(2)));
-
Mockito.when(mockJobContext.getAssignedParticipant(2)).thenReturn("GobblinYarnTaskRunner-1");
-
-
Mockito.when(mockTaskDriver.getJobContext("job1")).thenReturn(mockJobContext);
-
- HelixDataAccessor helixDataAccessor = mock(HelixDataAccessor.class);
- Mockito.when(helixDataAccessor.keyBuilder()).thenReturn(new
PropertyKey.Builder("cluster"));
- Mockito.when(helixDataAccessor.getChildValuesMap(Mockito.any()))
- .thenReturn(ImmutableMap.of("GobblinYarnTaskRunner-1", new
HelixProperty("")));
-
- YarnAutoScalingManager.YarnAutoScalingRunnable runnable =
- new YarnAutoScalingManager.YarnAutoScalingRunnable(mockTaskDriver,
mockYarnService,
- 1, 5, 10, 1.0, noopQueue, helixDataAccessor);
-
- runnable.run();
-
- // 5 containers requested due to min and one worker in use
- Mockito.verify(mockYarnService, times(1))
- .requestTargetNumberOfContainers(5,
ImmutableSet.of("GobblinYarnTaskRunner-1"));
- }
-
- /**
- * Test max containers
- */
- @Test
- public void testMaxContainers() throws IOException {
- YarnService mockYarnService = mock(YarnService.class);
- TaskDriver mockTaskDriver = mock(TaskDriver.class);
- WorkflowConfig mockWorkflowConfig = mock(WorkflowConfig.class);
- JobDag mockJobDag = mock(JobDag.class);
-
- Mockito.when(mockJobDag.getAllNodes()).thenReturn(ImmutableSet.of("job1"));
- Mockito.when(mockWorkflowConfig.getJobDag()).thenReturn(mockJobDag);
-
- Mockito.when(mockTaskDriver.getWorkflows())
- .thenReturn(ImmutableMap.of("workflow1", mockWorkflowConfig));
-
- WorkflowContext mockWorkflowContext = mock(WorkflowContext.class);
-
Mockito.when(mockWorkflowContext.getWorkflowState()).thenReturn(TaskState.IN_PROGRESS);
-
-
Mockito.when(mockTaskDriver.getWorkflowContext("workflow1")).thenReturn(mockWorkflowContext);
-
- JobContext mockJobContext = mock(JobContext.class);
- Mockito.when(mockJobContext.getPartitionSet())
- .thenReturn(ImmutableSet.of(Integer.valueOf(1), Integer.valueOf(2)));
-
Mockito.when(mockJobContext.getAssignedParticipant(2)).thenReturn("GobblinYarnTaskRunner-1");
-
-
Mockito.when(mockTaskDriver.getJobContext("job1")).thenReturn(mockJobContext);
-
- HelixDataAccessor helixDataAccessor = mock(HelixDataAccessor.class);
- Mockito.when(helixDataAccessor.keyBuilder()).thenReturn(new
PropertyKey.Builder("cluster"));
- Mockito.when(helixDataAccessor.getChildValuesMap(Mockito.any()))
- .thenReturn(ImmutableMap.of("GobblinYarnTaskRunner-1", new
HelixProperty("")));
-
- YarnAutoScalingManager.YarnAutoScalingRunnable runnable =
- new YarnAutoScalingManager.YarnAutoScalingRunnable(mockTaskDriver,
mockYarnService,
- 1, 1, 1, 1.0, noopQueue, helixDataAccessor);
-
- runnable.run();
-
- // 1 container requested to max and one worker in use
- Mockito.verify(mockYarnService, times(1))
- .requestTargetNumberOfContainers(1,
ImmutableSet.of("GobblinYarnTaskRunner-1"));
+ ArgumentCaptor<YarnContainerRequestBundle> argument =
ArgumentCaptor.forClass(YarnContainerRequestBundle.class);
+ Mockito.verify(mockYarnService, times(1)).
+ requestTargetNumberOfContainers(argument.capture(),
+ eq(ImmutableSet.of("GobblinYarnTaskRunner-1")));
+ Assert.assertEquals(argument.getValue().getTotalContainers(), 1);
}
@Test
@@ -443,41 +379,49 @@ public class YarnAutoScalingManagerTest {
.thenReturn(ImmutableMap.of("GobblinYarnTaskRunner-1", new
HelixProperty("")));
YarnAutoScalingManager.YarnAutoScalingRunnable runnable1 =
- new YarnAutoScalingManager.YarnAutoScalingRunnable(mockTaskDriver,
mockYarnService,
- 1, 1, 10, 1.2, noopQueue, helixDataAccessor);
+ new YarnAutoScalingManager.YarnAutoScalingRunnable(mockTaskDriver,
mockYarnService, 1,
+ 1.2, noopQueue, helixDataAccessor, defaultHelixTag,
defaultContainerMemory, defaultContainerCores);
runnable1.run();
// 3 containers requested and one worker in use
- // NumPartitions = 2, Partitions per container = 1 and overprovision =
1.2, Min containers = 1, Max = 10
- // so targetNumContainers = Max (1, Min(10, Ceil((2/1) * 1.2))) = 3.
- Mockito.verify(mockYarnService, times(1))
- .requestTargetNumberOfContainers(3,
ImmutableSet.of("GobblinYarnTaskRunner-1"));
-
+ // NumPartitions = 2, Partitions per container = 1 and overprovision = 1.2
+ // so targetNumContainers = Ceil((2/1) * 1.2) = 3.
+ ArgumentCaptor<YarnContainerRequestBundle> argument =
ArgumentCaptor.forClass(YarnContainerRequestBundle.class);
+ Mockito.verify(mockYarnService, times(1)).
+ requestTargetNumberOfContainers(argument.capture(),
+ eq(ImmutableSet.of("GobblinYarnTaskRunner-1")));
+ Assert.assertEquals(argument.getValue().getTotalContainers(), 3);
+ Mockito.reset(mockYarnService);
YarnAutoScalingManager.YarnAutoScalingRunnable runnable2 =
- new YarnAutoScalingManager.YarnAutoScalingRunnable(mockTaskDriver,
mockYarnService,
- 1, 1, 10, 0.1, noopQueue, helixDataAccessor);
+ new YarnAutoScalingManager.YarnAutoScalingRunnable(mockTaskDriver,
mockYarnService, 1,
+ 0.1, noopQueue, helixDataAccessor, defaultHelixTag,
defaultContainerMemory, defaultContainerCores);
runnable2.run();
// 1 container requested and one worker in use
- // NumPartitions = 2, Partitions per container = 1 and overprovision =
1.2, Min containers = 1, Max = 10
- // so targetNumContainers = Max (1, Min(10, Ceil((2/1) * 0.1))) = 1.
- Mockito.verify(mockYarnService, times(1))
- .requestTargetNumberOfContainers(1,
ImmutableSet.of("GobblinYarnTaskRunner-1"));
+ // NumPartitions = 2, Partitions per container = 1 and overprovision = 0.1
+ // so targetNumContainers = Ceil((2/1) * 0.1) = 1.
+ Mockito.verify(mockYarnService, times(1)).
+ requestTargetNumberOfContainers(argument.capture(),
+ eq(ImmutableSet.of("GobblinYarnTaskRunner-1")));
+ Assert.assertEquals(argument.getValue().getTotalContainers(), 1);
+ Mockito.reset(mockYarnService);
YarnAutoScalingManager.YarnAutoScalingRunnable runnable3 =
- new YarnAutoScalingManager.YarnAutoScalingRunnable(mockTaskDriver,
mockYarnService,
- 1, 1, 10, 6.0, noopQueue, helixDataAccessor);
+ new YarnAutoScalingManager.YarnAutoScalingRunnable(mockTaskDriver,
mockYarnService, 1,
+ 6.0, noopQueue, helixDataAccessor, defaultHelixTag,
defaultContainerMemory, defaultContainerCores);
runnable3.run();
// 12 containers requested and one worker in use
// NumPartitions = 2, Partitions per container = 1 and overprovision = 6.0,
- // so targetNumContainers = Max (1, Min(10, Ceil((2/1) * 6.0))) = 10.
- Mockito.verify(mockYarnService, times(1))
- .requestTargetNumberOfContainers(1,
ImmutableSet.of("GobblinYarnTaskRunner-1"));
+ // so targetNumContainers = Ceil((2/1) * 6.0) = 12.
+ Mockito.verify(mockYarnService, times(1)).
+ requestTargetNumberOfContainers(argument.capture(),
+ eq(ImmutableSet.of("GobblinYarnTaskRunner-1")));
+ Assert.assertEquals(argument.getValue().getTotalContainers(), 12);
}
/**
@@ -514,37 +458,43 @@ public class YarnAutoScalingManagerTest {
.thenReturn(ImmutableMap.of("GobblinYarnTaskRunner-1", new
HelixProperty("")));
TestYarnAutoScalingRunnable runnable =
- new TestYarnAutoScalingRunnable(mockTaskDriver, mockYarnService, 1, 1,
1, helixDataAccessor);
+ new TestYarnAutoScalingRunnable(mockTaskDriver, mockYarnService, 1,
helixDataAccessor);
runnable.setRaiseException(true);
runnable.run();
- Mockito.verify(mockYarnService,
times(0)).requestTargetNumberOfContainers(1,
ImmutableSet.of("GobblinYarnTaskRunner-1"));
+ ArgumentCaptor<YarnContainerRequestBundle> argument =
ArgumentCaptor.forClass(YarnContainerRequestBundle.class);
+ Mockito.verify(mockYarnService, times(0)).
+ requestTargetNumberOfContainers(argument.capture(),
+ eq(ImmutableSet.of("GobblinYarnTaskRunner-1")));
Mockito.reset(mockYarnService);
runnable.setRaiseException(false);
runnable.run();
- // 1 container requested to max and one worker in use
- Mockito.verify(mockYarnService,
times(1)).requestTargetNumberOfContainers(1,
ImmutableSet.of("GobblinYarnTaskRunner-1"));
+ // 2 containers requested
+ Mockito.verify(mockYarnService, times(1)).
+ requestTargetNumberOfContainers(argument.capture(),
+ eq(ImmutableSet.of("GobblinYarnTaskRunner-1")));
+ Assert.assertEquals(argument.getValue().getTotalContainers(), 2);
}
public void testMaxValueEvictingQueue() throws Exception {
+ Resource resource = Resource.newInstance(16, 1);
YarnAutoScalingManager.SlidingWindowReservoir window = new
YarnAutoScalingManager.SlidingWindowReservoir(3, 10);
-
// Normal insertion with eviction of originally largest value
- window.add(3);
- window.add(1);
- window.add(2);
+ window.add(GobblinYarnTestUtils.createYarnContainerRequest(3, resource));
+ window.add(GobblinYarnTestUtils.createYarnContainerRequest(1, resource));
+ window.add(GobblinYarnTestUtils.createYarnContainerRequest(2, resource));
// Now it contains [3,1,2]
- Assert.assertEquals(window.getMax(), 3);
- window.add(1);
+ Assert.assertEquals(window.getMax().getTotalContainers(), 3);
+ window.add(GobblinYarnTestUtils.createYarnContainerRequest(1, resource));
// Now it contains [1,2,1]
- Assert.assertEquals(window.getMax(), 2);
- window.add(5);
- Assert.assertEquals(window.getMax(), 5);
+ Assert.assertEquals(window.getMax().getTotalContainers(), 2);
+ window.add(GobblinYarnTestUtils.createYarnContainerRequest(5, resource));
+ Assert.assertEquals(window.getMax().getTotalContainers(), 5);
// Now it contains [2,1,5]
- window.add(11);
+ window.add(GobblinYarnTestUtils.createYarnContainerRequest(11, resource));
// Still [2,1,5] as 11 > 10 thereby being rejected.
- Assert.assertEquals(window.getMax(), 5);
+ Assert.assertEquals(window.getMax().getTotalContainers(), 5);
}
/**
@@ -557,7 +507,6 @@ public class YarnAutoScalingManagerTest {
TaskDriver mockTaskDriver = mock(TaskDriver.class);
WorkflowConfig mockWorkflowConfig = mock(WorkflowConfig.class);
JobDag mockJobDag = mock(JobDag.class);
-
Mockito.when(mockJobDag.getAllNodes()).thenReturn(ImmutableSet.of("job1"));
Mockito.when(mockWorkflowConfig.getJobDag()).thenReturn(mockJobDag);
@@ -585,14 +534,17 @@ public class YarnAutoScalingManagerTest {
"GobblinYarnTaskRunner-2", new HelixProperty("")));
TestYarnAutoScalingRunnable runnable = new
TestYarnAutoScalingRunnable(mockTaskDriver, mockYarnService,
- 1, 1, 10, helixDataAccessor);
+ 1, helixDataAccessor);
runnable.run();
// 2 containers requested and one worker in use, while the evaluation will
hold for true if not set externally,
// still tell YarnService there are two instances being used.
+ ArgumentCaptor<YarnContainerRequestBundle> argument =
ArgumentCaptor.forClass(YarnContainerRequestBundle.class);
Mockito.verify(mockYarnService, times(1)).
- requestTargetNumberOfContainers(2,
ImmutableSet.of("GobblinYarnTaskRunner-1", "GobblinYarnTaskRunner-2"));
+ requestTargetNumberOfContainers(argument.capture(),
+ eq(ImmutableSet.of("GobblinYarnTaskRunner-1",
"GobblinYarnTaskRunner-2")));
+ Assert.assertEquals(argument.getValue().getTotalContainers(), 2);
// Set failEvaluation which simulates the "beyond tolerance" case.
Mockito.reset(mockYarnService);
@@ -600,7 +552,98 @@ public class YarnAutoScalingManagerTest {
runnable.run();
Mockito.verify(mockYarnService, times(1)).
- requestTargetNumberOfContainers(2,
ImmutableSet.of("GobblinYarnTaskRunner-2"));
+ requestTargetNumberOfContainers(argument.capture(),
+ eq(ImmutableSet.of("GobblinYarnTaskRunner-2")));
+ Assert.assertEquals(argument.getValue().getTotalContainers(), 2);
+ }
+
+ @Test
+ public void testFlowsWithHelixTags() {
+ YarnService mockYarnService = mock(YarnService.class);
+ TaskDriver mockTaskDriver = mock(TaskDriver.class);
+
+ WorkflowConfig mockWorkflowConfig1 = mock(WorkflowConfig.class);
+ JobDag mockJobDag1 = mock(JobDag.class);
+
+ Mockito.when(mockJobDag1.getAllNodes()).thenReturn(ImmutableSet.of("job1",
"job2"));
+ Mockito.when(mockWorkflowConfig1.getJobDag()).thenReturn(mockJobDag1);
+
+ WorkflowContext mockWorkflowContext1 = mock(WorkflowContext.class);
+
Mockito.when(mockWorkflowContext1.getWorkflowState()).thenReturn(TaskState.IN_PROGRESS);
+
+
Mockito.when(mockTaskDriver.getWorkflowContext("workflow1")).thenReturn(mockWorkflowContext1);
+
+ JobContext mockJobContext1 = mock(JobContext.class);
+ Mockito.when(mockJobContext1.getPartitionSet())
+ .thenReturn(ImmutableSet.of(Integer.valueOf(1), Integer.valueOf(2)));
+
Mockito.when(mockJobContext1.getAssignedParticipant(2)).thenReturn("GobblinYarnTaskRunner-1");
+
Mockito.when(mockTaskDriver.getJobContext("job1")).thenReturn(mockJobContext1);
+
+ JobContext mockJobContext2 = mock(JobContext.class);
+ Mockito.when(mockJobContext2.getPartitionSet())
+ .thenReturn(ImmutableSet.of(Integer.valueOf(3)));
+
Mockito.when(mockJobContext2.getAssignedParticipant(3)).thenReturn("GobblinYarnTaskRunner-2");
+
Mockito.when(mockTaskDriver.getJobContext("job2")).thenReturn(mockJobContext2);
+
+ WorkflowConfig mockWorkflowConfig2 = mock(WorkflowConfig.class);
+ JobDag mockJobDag2 = mock(JobDag.class);
+
+
Mockito.when(mockJobDag2.getAllNodes()).thenReturn(ImmutableSet.of("job3"));
+ Mockito.when(mockWorkflowConfig2.getJobDag()).thenReturn(mockJobDag2);
+
+ WorkflowContext mockWorkflowContext2 = mock(WorkflowContext.class);
+
Mockito.when(mockWorkflowContext2.getWorkflowState()).thenReturn(TaskState.IN_PROGRESS);
+
+
Mockito.when(mockTaskDriver.getWorkflowContext("workflow2")).thenReturn(mockWorkflowContext2);
+
+ JobContext mockJobContext3 = mock(JobContext.class);
+ Mockito.when(mockJobContext3.getPartitionSet())
+ .thenReturn(ImmutableSet.of(Integer.valueOf(4), Integer.valueOf(5)));
+
Mockito.when(mockJobContext3.getAssignedParticipant(4)).thenReturn("GobblinYarnTaskRunner-3");
+
Mockito.when(mockTaskDriver.getJobContext("job3")).thenReturn(mockJobContext3);
+ JobConfig mockJobConfig3 = mock(JobConfig.class);
+ String helixTag = "test-Tag1";
+ Map<String, String> resourceMap = new HashMap<>();
+
resourceMap.put(GobblinClusterConfigurationKeys.HELIX_JOB_CONTAINER_MEMORY_MBS,
"512");
+ resourceMap.put(GobblinClusterConfigurationKeys.HELIX_JOB_CONTAINER_CORES,
"8");
+ Mockito.when(mockJobConfig3.getInstanceGroupTag()).thenReturn(helixTag);
+
Mockito.when(mockJobConfig3.getJobCommandConfigMap()).thenReturn(resourceMap);
+
Mockito.when(mockTaskDriver.getJobContext("job3")).thenReturn(mockJobContext3);
+
Mockito.when(mockTaskDriver.getJobConfig("job3")).thenReturn(mockJobConfig3);
+ Mockito.when(mockTaskDriver.getWorkflows())
+ .thenReturn(ImmutableMap.of("workflow1", mockWorkflowConfig1,
"workflow2", mockWorkflowConfig2));
+
+ HelixDataAccessor helixDataAccessor = mock(HelixDataAccessor.class);
+ Mockito.when(helixDataAccessor.keyBuilder()).thenReturn(new
PropertyKey.Builder("cluster"));
+ Mockito.when(helixDataAccessor.getChildValuesMap(Mockito.any()))
+ .thenReturn(ImmutableMap.of("GobblinYarnTaskRunner-1", new
HelixProperty(""),
+ "GobblinYarnTaskRunner-2", new HelixProperty(""),
+ "GobblinYarnTaskRunner-3", new HelixProperty("")));
+
+ YarnAutoScalingManager.YarnAutoScalingRunnable runnable =
+ new YarnAutoScalingManager.YarnAutoScalingRunnable(mockTaskDriver,
mockYarnService, 1,
+ 1.0, noopQueue, helixDataAccessor, defaultHelixTag,
defaultContainerMemory, defaultContainerCores);
+
+ runnable.run();
+
+ // 5 containers requested and 3 workers in use
+ ArgumentCaptor<YarnContainerRequestBundle> argument =
ArgumentCaptor.forClass(YarnContainerRequestBundle.class);
+ Mockito.verify(mockYarnService, times(1)).
+ requestTargetNumberOfContainers(argument.capture(),
+ eq(ImmutableSet.of("GobblinYarnTaskRunner-1",
"GobblinYarnTaskRunner-2", "GobblinYarnTaskRunner-3")));
+ Assert.assertEquals(argument.getValue().getTotalContainers(), 5);
+ Map<String, Set<String>> resourceHelixTagMap =
argument.getValue().getResourceHelixTagMap();
+ Map<String, Resource> helixTagResourceMap =
argument.getValue().getHelixTagResourceMap();
+ Map<String, Integer> helixTagContainerCountMap =
argument.getValue().getHelixTagContainerCountMap();
+
+ // Verify that 3 containers are requested with the default tag and resource
settings,
+ // and 2 with the specific helix tag and resource requirement
+ Assert.assertEquals(resourceHelixTagMap.size(), 2);
+ Assert.assertEquals(helixTagResourceMap.get(helixTag),
Resource.newInstance(512, 8));
+ Assert.assertEquals(helixTagResourceMap.get(defaultHelixTag),
Resource.newInstance(defaultContainerMemory, defaultContainerCores));
+ Assert.assertEquals((int) helixTagContainerCountMap.get(helixTag), 2);
+ Assert.assertEquals((int) helixTagContainerCountMap.get(defaultHelixTag),
3);
+
}
private static class TestYarnAutoScalingRunnable extends
YarnAutoScalingManager.YarnAutoScalingRunnable {
@@ -608,8 +651,9 @@ public class YarnAutoScalingManagerTest {
boolean alwaysUnused = false;
public TestYarnAutoScalingRunnable(TaskDriver taskDriver, YarnService
yarnService, int partitionsPerContainer,
- int minContainers, int maxContainers, HelixDataAccessor
helixDataAccessor) {
- super(taskDriver, yarnService, partitionsPerContainer, minContainers,
maxContainers, 1.0, noopQueue, helixDataAccessor);
+ HelixDataAccessor helixDataAccessor) {
+ super(taskDriver, yarnService, partitionsPerContainer, 1.0,
+ noopQueue, helixDataAccessor, defaultHelixTag,
defaultContainerMemory, defaultContainerCores);
}
@Override
@@ -634,4 +678,4 @@ public class YarnAutoScalingManagerTest {
return alwaysUnused || super.isInstanceUnused(participant);
}
}
-}
+}
\ No newline at end of file
diff --git
a/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnServiceTest.java
b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnServiceTest.java
index 215e1bd03..76c53330b 100644
--- a/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnServiceTest.java
+++ b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnServiceTest.java
@@ -27,6 +27,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeoutException;
import org.apache.hadoop.conf.Configuration;
@@ -147,7 +148,7 @@ public class YarnServiceTest {
this.clusterConf,
FileSystem.getLocal(new Configuration()), this.eventBus);
- this.yarnService.startUp();
+ this.yarnService.startUp();
}
private void startApp() throws Exception {
@@ -207,42 +208,43 @@ public class YarnServiceTest {
*/
@Test(groups = {"gobblin.yarn", "disabledOnCI"})
public void testScaleUp() {
- this.yarnService.requestTargetNumberOfContainers(10,
Collections.EMPTY_SET);
+ Resource resource = Resource.newInstance(64, 1);
+ this.yarnService.requestTargetNumberOfContainers(
+ GobblinYarnTestUtils.createYarnContainerRequest(10, resource),
Collections.EMPTY_SET);
- Assert.assertFalse(this.yarnService.getMatchingRequestsList(64,
1).isEmpty());
- Assert.assertEquals(this.yarnService.getNumRequestedContainers(), 10);
+
Assert.assertFalse(this.yarnService.getMatchingRequestsList(resource).isEmpty());
Assert.assertTrue(this.yarnService.waitForContainerCount(10, 60000));
-
+ Assert.assertEquals(this.yarnService.getContainerMap().size(), 10);
// container request list that had entries earlier should now be empty
- Assert.assertEquals(this.yarnService.getMatchingRequestsList(64,
1).size(), 0);
+
Assert.assertEquals(this.yarnService.getMatchingRequestsList(resource).size(),
0);
}
@Test(groups = {"gobblin.yarn", "disabledOnCI"}, dependsOnMethods =
"testScaleUp")
public void testScaleDownWithInUseInstances() {
Set<String> inUseInstances = new HashSet<>();
-
for (int i = 1; i <= 8; i++) {
inUseInstances.add("GobblinYarnTaskRunner_" + i);
}
-
- this.yarnService.requestTargetNumberOfContainers(6, inUseInstances);
-
- Assert.assertEquals(this.yarnService.getNumRequestedContainers(), 6);
+ Resource resource = Resource.newInstance(64, 1);
+ this.yarnService.requestTargetNumberOfContainers(
+ GobblinYarnTestUtils.createYarnContainerRequest(6, resource),
inUseInstances);
// will only be able to shrink to 8
Assert.assertTrue(this.yarnService.waitForContainerCount(8, 60000));
// will not be able to shrink to 6 due to 8 in-use instances
Assert.assertFalse(this.yarnService.waitForContainerCount(6, 10000));
-
+ Assert.assertEquals(this.yarnService.getContainerMap().size(), 8);
}
@Test(groups = {"gobblin.yarn", "disabledOnCI"}, dependsOnMethods =
"testScaleDownWithInUseInstances")
public void testScaleDown() throws Exception {
- this.yarnService.requestTargetNumberOfContainers(4, Collections.EMPTY_SET);
+ Resource resource = Resource.newInstance(64, 1);
+ this.yarnService.requestTargetNumberOfContainers(
+ GobblinYarnTestUtils.createYarnContainerRequest(4, resource),
Collections.EMPTY_SET);
- Assert.assertEquals(this.yarnService.getNumRequestedContainers(), 4);
Assert.assertTrue(this.yarnService.waitForContainerCount(4, 60000));
+ Assert.assertEquals(this.yarnService.getContainerMap().size(), 4);
}
// Keep this test last since it interferes with the container counts in the
prior tests.
@@ -279,14 +281,26 @@ public class YarnServiceTest {
0), 0);
Resource resource = Resource.newInstance(2048, 1);
Container container = Container.newInstance(containerId, null, null,
resource, null, null);
+ YarnService.ContainerInfo
+ containerInfo = new YarnService.ContainerInfo(container,
"helixInstance1", "helixTag");
- String command = yarnService.buildContainerCommand(container,
"helixInstance1");
+ String command = yarnService.buildContainerCommand(containerInfo);
// 1628 is from 2048 * 0.8 - 10
Assert.assertTrue(command.contains("-Xmx1628"));
}
- static class TestYarnService extends YarnService {
+ /**
+ * Test that yarnService fails if the requested resource exceeds the resource
limit.
+ */
+ @Test(groups = {"gobblin.yarn", "disabledOnCI"}, expectedExceptions =
IllegalArgumentException.class)
+ public void testExceedResourceLimit() {
+ Resource resource = Resource.newInstance(204800, 10240);
+ this.yarnService.requestTargetNumberOfContainers(
+ GobblinYarnTestUtils.createYarnContainerRequest(10, resource),
Collections.EMPTY_SET);
+ }
+
+ static class TestYarnService extends YarnService {
public TestYarnService(Config config, String applicationName, String
applicationId, YarnConfiguration yarnConfiguration,
FileSystem fs, EventBus eventBus) throws Exception {
super(config, applicationName, applicationId, yarnConfiguration, fs,
eventBus, getMockHelixManager(config));
@@ -310,19 +324,17 @@ public class YarnServiceTest {
return helixManager;
}
- protected ContainerLaunchContext newContainerLaunchContext(Container
container, String helixInstanceName)
+ protected ContainerLaunchContext newContainerLaunchContext(ContainerInfo
containerInfo)
throws IOException {
return BuilderUtils.newContainerLaunchContext(Collections.emptyMap(),
Collections.emptyMap(),
- Arrays.asList("sleep", "60000"), Collections.emptyMap(), null,
Collections.emptyMap());
+ Arrays.asList("sleep", "60000"), Collections.emptyMap(), null,
Collections.emptyMap());
}
/**
* Get the list of matching container requests for the specified resource
memory and cores.
*/
- public List<? extends Collection<AMRMClient.ContainerRequest>>
getMatchingRequestsList(int memory, int cores) {
- Resource resource = Resource.newInstance(memory, cores);
+ public List<? extends Collection<AMRMClient.ContainerRequest>>
getMatchingRequestsList(Resource resource) {
Priority priority = Priority.newInstance(0);
-
return getAmrmClientAsync().getMatchingRequests(priority,
ResourceRequest.ANY, resource);
}
@@ -346,14 +358,13 @@ public class YarnServiceTest {
Thread.currentThread().interrupt();
break;
}
-
+ ConcurrentMap<ContainerId, ContainerInfo> containerMap =
getContainerMap();
if (expectedCount == getContainerMap().size()) {
success = true;
break;
}
}
-
return success;
}
}
-}
+}
\ No newline at end of file
diff --git
a/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnServiceTestWithExpiration.java
b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnServiceTestWithExpiration.java
index 1934ecebb..1041cc31b 100644
---
a/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnServiceTestWithExpiration.java
+++
b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnServiceTestWithExpiration.java
@@ -33,7 +33,6 @@ import
org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
-import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
@@ -198,10 +197,11 @@ public class YarnServiceTestWithExpiration {
@Test(groups = {"gobblin.yarn", "disabledOnCI"})
public void testStartError() throws Exception{
- this.expiredYarnService.requestTargetNumberOfContainers(10,
Collections.EMPTY_SET);
+ Resource resource = Resource.newInstance(16, 1);
+ this.expiredYarnService.requestTargetNumberOfContainers(
+ GobblinYarnTestUtils.createYarnContainerRequest(10, resource),
Collections.EMPTY_SET);
- Assert.assertFalse(this.expiredYarnService.getMatchingRequestsList(64,
1).isEmpty());
- Assert.assertEquals(this.expiredYarnService.getNumRequestedContainers(),
10);
+
Assert.assertFalse(this.expiredYarnService.getMatchingRequestsList(resource).isEmpty());
AssertWithBackoff.create().logger(LOG).timeoutMs(60000).maxSleepMs(2000).backoffFactor(1.5)
.assertTrue(new Predicate<Void>() {
@@ -234,7 +234,7 @@ public class YarnServiceTestWithExpiration {
completedContainers.add(containerStatus);
}
- protected ContainerLaunchContext newContainerLaunchContext(Container
container, String helixInstanceName)
+ protected ContainerLaunchContext newContainerLaunchContext(ContainerInfo
containerInfo)
throws IOException {
try {
Thread.sleep(1000);
@@ -251,4 +251,4 @@ public class YarnServiceTestWithExpiration {
}
}
}
-}
+}
\ No newline at end of file
diff --git a/gobblin-yarn/src/test/resources/YarnServiceTest.conf
b/gobblin-yarn/src/test/resources/YarnServiceTest.conf
index d2dc49c46..73ecf85cd 100644
--- a/gobblin-yarn/src/test/resources/YarnServiceTest.conf
+++ b/gobblin-yarn/src/test/resources/YarnServiceTest.conf
@@ -17,6 +17,7 @@
# Yarn/Helix configuration properties
gobblin.cluster.helix.cluster.name=YarnServiceTest
+gobblin.cluster.helixInstanceTags=GobblinKafkaStreaming
gobblin.yarn.app.name=YarnServiceTest
gobblin.yarn.work.dir=YarnServiceTest
@@ -30,7 +31,7 @@ gobblin.yarn.app.master.cores=1
gobblin.yarn.app.report.interval.minutes=1
gobblin.yarn.max.get.app.report.failures=4
gobblin.yarn.email.notification.on.shutdown=false
-gobblin.yarn.initial.containers=1
+gobblin.yarn.initial.containers=0
gobblin.yarn.container.memory.mbs=64
gobblin.yarn.container.cores=1
gobblin.yarn.container.affinity.enabled=true