This is an automated email from the ASF dual-hosted git repository.
kipk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 7309060ac3 [GOBBLIN-2174] Define `DynamicScalingYarnService` for the
GoT workforce via `ScalingDirective`s (#4077)
7309060ac3 is described below
commit 7309060ac363a5303c372a1d363921d56633bab3
Author: Vivek Rai <[email protected]>
AuthorDate: Mon Dec 9 17:03:46 2024 +0530
[GOBBLIN-2174] Define `DynamicScalingYarnService` for the GoT workforce via
`ScalingDirective`s (#4077)
---
.../temporal/GobblinTemporalConfigurationKeys.java | 5 +
.../gobblin/temporal/dynamic/WorkerProfile.java | 11 ++
.../AbstractDynamicScalingYarnServiceManager.java | 120 ++++++++++++++
.../temporal/yarn/DynamicScalingYarnService.java | 99 +++++++++++
.../FsSourceDynamicScalingYarnServiceManager.java | 50 ++++++
.../yarn/GobblinTemporalApplicationMaster.java | 2 +-
.../apache/gobblin/temporal/yarn/YarnService.java | 182 ++++++++++++---------
.../dynamic/DummyScalingDirectiveSource.java | 76 +++++++++
.../DummyDynamicScalingYarnServiceManager.java | 38 +++++
.../yarn/DynamicScalingYarnServiceManagerTest.java | 118 +++++++++++++
.../yarn/DynamicScalingYarnServiceTest.java | 64 ++++++++
.../gobblin/temporal/yarn/YarnServiceTest.java | 126 ++++++++++++++
.../src/test/resources/YarnServiceTest.conf | 6 +
13 files changed, 816 insertions(+), 81 deletions(-)
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/GobblinTemporalConfigurationKeys.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/GobblinTemporalConfigurationKeys.java
index 40223e093e..3d51f15c19 100644
---
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/GobblinTemporalConfigurationKeys.java
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/GobblinTemporalConfigurationKeys.java
@@ -64,4 +64,9 @@ public interface GobblinTemporalConfigurationKeys {
String TEMPORAL_NUM_WORKERS_PER_CONTAINER = PREFIX +
"num.workers.per.container";
int DEFAULT_TEMPORAL_NUM_WORKERS_PER_CONTAINERS = 1;
String TEMPORAL_CONNECTION_STRING = PREFIX + "connection.string";
+
+ /**
+ * Prefix for Gobblin-on-Temporal Dynamic Scaling
+ */
+ String DYNAMIC_SCALING_PREFIX = PREFIX + "dynamic.scaling.";
}
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/WorkerProfile.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/WorkerProfile.java
index bf1f1d2e09..d67825cb22 100644
---
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/WorkerProfile.java
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/WorkerProfile.java
@@ -18,12 +18,23 @@
package org.apache.gobblin.temporal.dynamic;
import com.typesafe.config.Config;
+import lombok.AllArgsConstructor;
import lombok.Data;
/** A named worker {@link Config} */
@Data
+@AllArgsConstructor
public class WorkerProfile {
private final String name;
private final Config config;
+
+ /**
+ * Constructs a `WorkerProfile` with the baseline name and the specified
configuration.
+ *
+ * @param config the configuration for the worker profile
+ */
+ public WorkerProfile(Config config) {
+ this(WorkforceProfiles.BASELINE_NAME, config);
+ }
}
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/AbstractDynamicScalingYarnServiceManager.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/AbstractDynamicScalingYarnServiceManager.java
new file mode 100644
index 0000000000..3022d5c9ec
--- /dev/null
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/AbstractDynamicScalingYarnServiceManager.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.yarn;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+
+import org.apache.commons.collections.CollectionUtils;
+
+import com.typesafe.config.Config;
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.AbstractIdleService;
+
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.ExecutorsUtils;
+import org.apache.gobblin.temporal.dynamic.ScalingDirective;
+import org.apache.gobblin.temporal.dynamic.ScalingDirectiveSource;
+import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys;
+
+/**
+ * This class manages the dynamic scaling of the {@link YarnService} by
periodically polling for scaling directives and passing
+ * the latest scaling directives to the {@link DynamicScalingYarnService} for
processing.
+ *
+ * This is an abstract class that provides the basic functionality for
managing dynamic scaling. Subclasses should implement
+ * {@link #createScalingDirectiveSource()} to provide a {@link
ScalingDirectiveSource} that will be used to get scaling directives.
+ *
+ * The actual implemented class needs to be passed as value of config {@link
org.apache.gobblin.yarn.GobblinYarnConfigurationKeys#APP_MASTER_SERVICE_CLASSES}
+ */
+@Slf4j
+public abstract class AbstractDynamicScalingYarnServiceManager extends
AbstractIdleService {
+
+ protected final static String DYNAMIC_SCALING_POLLING_INTERVAL =
GobblinTemporalConfigurationKeys.DYNAMIC_SCALING_PREFIX + "polling.interval";
+ private final int DEFAULT_DYNAMIC_SCALING_POLLING_INTERVAL_SECS = 60;
+ protected final Config config;
+ private final DynamicScalingYarnService dynamicScalingYarnService;
+ private final ScheduledExecutorService dynamicScalingExecutor;
+
+ public
AbstractDynamicScalingYarnServiceManager(GobblinTemporalApplicationMaster
appMaster) {
+ this.config = appMaster.getConfig();
+ if (appMaster.get_yarnService() instanceof DynamicScalingYarnService) {
+ this.dynamicScalingYarnService = (DynamicScalingYarnService)
appMaster.get_yarnService();
+ } else {
+ String errorMsg = "Failure while getting YarnService Instance from
GobblinTemporalApplicationMaster::get_yarnService()"
+ + " YarnService {" +
appMaster.get_yarnService().getClass().getName() + "} is not an instance of
DynamicScalingYarnService";
+ log.error(errorMsg);
+ throw new RuntimeException(errorMsg);
+ }
+ this.dynamicScalingExecutor = Executors.newSingleThreadScheduledExecutor(
+ ExecutorsUtils.newThreadFactory(Optional.of(log),
+ Optional.of("DynamicScalingExecutor")));
+ }
+
+ @Override
+ protected void startUp() {
+ int scheduleInterval = ConfigUtils.getInt(this.config,
DYNAMIC_SCALING_POLLING_INTERVAL,
+ DEFAULT_DYNAMIC_SCALING_POLLING_INTERVAL_SECS);
+ log.info("Starting the {} with re-scaling interval of {} seconds",
this.getClass().getSimpleName(), scheduleInterval);
+
+ this.dynamicScalingExecutor.scheduleAtFixedRate(
+ new GetScalingDirectivesRunnable(this.dynamicScalingYarnService,
createScalingDirectiveSource()),
+ scheduleInterval, scheduleInterval, TimeUnit.SECONDS
+ );
+ }
+
+ @Override
+ protected void shutDown() {
+ log.info("Stopping the " + this.getClass().getSimpleName());
+ ExecutorsUtils.shutdownExecutorService(this.dynamicScalingExecutor,
Optional.of(log));
+ }
+
+ /**
+ * Create a {@link ScalingDirectiveSource} to use for getting scaling
directives.
+ */
+ protected abstract ScalingDirectiveSource createScalingDirectiveSource();
+
+ /**
+ * A {@link Runnable} that gets the scaling directives from the {@link
ScalingDirectiveSource} and passes them to the
+ * {@link DynamicScalingYarnService} for processing.
+ */
+ @AllArgsConstructor
+ static class GetScalingDirectivesRunnable implements Runnable {
+ private final DynamicScalingYarnService dynamicScalingYarnService;
+ private final ScalingDirectiveSource scalingDirectiveSource;
+
+ @Override
+ public void run() {
+ try {
+ List<ScalingDirective> scalingDirectives =
scalingDirectiveSource.getScalingDirectives();
+ if (CollectionUtils.isNotEmpty(scalingDirectives)) {
+
dynamicScalingYarnService.reviseWorkforcePlanAndRequestNewContainers(scalingDirectives);
+ }
+ } catch (IOException e) {
+ log.error("Failed to get scaling directives", e);
+ } catch (Throwable t) {
+ log.error("Unexpected error with dynamic scaling via directives", t);
+ }
+ }
+ }
+}
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnService.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnService.java
new file mode 100644
index 0000000000..0720017b85
--- /dev/null
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnService.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.yarn;
+
+import java.util.List;
+
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+
+import com.google.common.eventbus.EventBus;
+import com.typesafe.config.Config;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.temporal.dynamic.ScalingDirective;
+import org.apache.gobblin.temporal.dynamic.StaffingDeltas;
+import org.apache.gobblin.temporal.dynamic.WorkerProfile;
+import org.apache.gobblin.temporal.dynamic.WorkforcePlan;
+import org.apache.gobblin.temporal.dynamic.WorkforceProfiles;
+import org.apache.gobblin.temporal.dynamic.WorkforceStaffing;
+import org.apache.gobblin.yarn.GobblinYarnConfigurationKeys;
+
+/**
+ * Service for dynamically scaling Gobblin containers running on YARN.
+ * This service manages workforce staffing and plans, and requests new
containers as needed.
+ */
+@Slf4j
+public class DynamicScalingYarnService extends YarnService {
+
+ /** this holds the current count of containers already requested for each
worker profile */
+ private final WorkforceStaffing actualWorkforceStaffing;
+ /** this holds the current total workforce plan as per latest received
scaling directives */
+ private final WorkforcePlan workforcePlan;
+
+ public DynamicScalingYarnService(Config config, String applicationName,
String applicationId,
+ YarnConfiguration yarnConfiguration, FileSystem fs, EventBus eventBus)
throws Exception {
+ super(config, applicationName, applicationId, yarnConfiguration, fs,
eventBus);
+
+ this.actualWorkforceStaffing = WorkforceStaffing.initialize(0);
+ this.workforcePlan = new WorkforcePlan(this.config,
this.config.getInt(GobblinYarnConfigurationKeys.INITIAL_CONTAINERS_KEY));
+ }
+
+ @Override
+ protected synchronized void requestInitialContainers() {
+ StaffingDeltas deltas =
this.workforcePlan.calcStaffingDeltas(this.actualWorkforceStaffing);
+ requestNewContainersForStaffingDeltas(deltas);
+ }
+
+ /**
+ * Revises the workforce plan and requests new containers based on the given
scaling directives.
+ *
+ * @param scalingDirectives the list of scaling directives
+ */
+ public synchronized void
reviseWorkforcePlanAndRequestNewContainers(List<ScalingDirective>
scalingDirectives) {
+ if (CollectionUtils.isEmpty(scalingDirectives)) {
+ return;
+ }
+ this.workforcePlan.reviseWhenNewer(scalingDirectives);
+ StaffingDeltas deltas =
this.workforcePlan.calcStaffingDeltas(this.actualWorkforceStaffing);
+ requestNewContainersForStaffingDeltas(deltas);
+ }
+
+ private synchronized void
requestNewContainersForStaffingDeltas(StaffingDeltas deltas) {
+ deltas.getPerProfileDeltas().forEach(profileDelta -> {
+ if (profileDelta.getDelta() > 0) { // scale up!
+ WorkerProfile workerProfile = profileDelta.getProfile();
+ String profileName = workerProfile.getName();
+ int currNumContainers =
this.actualWorkforceStaffing.getStaffing(profileName).orElse(0);
+ int delta = profileDelta.getDelta();
+ log.info("Requesting {} new containers for profile {} having currently
{} containers", delta,
+ WorkforceProfiles.renderName(profileName), currNumContainers);
+ requestContainersForWorkerProfile(workerProfile, delta);
+ // update our staffing after requesting new containers
+ this.actualWorkforceStaffing.reviseStaffing(profileName,
currNumContainers + delta, System.currentTimeMillis());
+ } else if (profileDelta.getDelta() < 0) { // scale down!
+ // TODO: Decide how to handle negative deltas
+ log.warn("Handling of Negative delta is not supported yet : Profile {}
delta {} ",
+ profileDelta.getProfile().getName(), profileDelta.getDelta());
+ } // else, already at staffing plan (or at least have requested, so
in-progress)
+ });
+ }
+
+}
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/FsSourceDynamicScalingYarnServiceManager.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/FsSourceDynamicScalingYarnServiceManager.java
new file mode 100644
index 0000000000..f6b65bbd2d
--- /dev/null
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/FsSourceDynamicScalingYarnServiceManager.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.yarn;
+
+import java.util.Optional;
+
+import org.apache.hadoop.fs.FileSystem;
+
+import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys;
+import org.apache.gobblin.temporal.dynamic.FsScalingDirectiveSource;
+import org.apache.gobblin.temporal.dynamic.ScalingDirectiveSource;
+
+/**
+ * {@link FsScalingDirectiveSource} based implementation of {@link
AbstractDynamicScalingYarnServiceManager}.
+ */
+public class FsSourceDynamicScalingYarnServiceManager extends
AbstractDynamicScalingYarnServiceManager {
+ // TODO: replace fetching of these configs using a new method similar to
JobStateUtils::getWorkDirRoot
+ public final static String DYNAMIC_SCALING_DIRECTIVES_DIR =
GobblinTemporalConfigurationKeys.DYNAMIC_SCALING_PREFIX + "directives.dir";
+ public final static String DYNAMIC_SCALING_ERRORS_DIR =
GobblinTemporalConfigurationKeys.DYNAMIC_SCALING_PREFIX + "errors.dir";
+ private final FileSystem fs;
+
+ public
FsSourceDynamicScalingYarnServiceManager(GobblinTemporalApplicationMaster
appMaster) {
+ super(appMaster);
+ this.fs = appMaster.getFs();
+ }
+
+ @Override
+ protected ScalingDirectiveSource createScalingDirectiveSource() {
+ return new FsScalingDirectiveSource(
+ this.fs,
+ this.config.getString(DYNAMIC_SCALING_DIRECTIVES_DIR),
+ Optional.ofNullable(this.config.getString(DYNAMIC_SCALING_ERRORS_DIR))
+ );
+ }
+}
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/GobblinTemporalApplicationMaster.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/GobblinTemporalApplicationMaster.java
index b7957bd9a2..3efadb11b3 100644
---
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/GobblinTemporalApplicationMaster.java
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/GobblinTemporalApplicationMaster.java
@@ -114,7 +114,7 @@ public class GobblinTemporalApplicationMaster extends
GobblinTemporalClusterMana
protected YarnService buildTemporalYarnService(Config config, String
applicationName, String applicationId,
YarnConfiguration yarnConfiguration, FileSystem fs)
throws Exception {
- return new YarnService(config, applicationName, applicationId,
yarnConfiguration, fs, this.eventBus);
+ return new DynamicScalingYarnService(config, applicationName,
applicationId, yarnConfiguration, fs, this.eventBus);
}
/**
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/YarnService.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/YarnService.java
index c8fbd047c5..ec4da215a6 100644
---
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/YarnService.java
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/YarnService.java
@@ -23,7 +23,6 @@ import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
-import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
@@ -34,6 +33,7 @@ import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.IntStream;
import org.apache.commons.lang.StringUtils;
@@ -109,6 +109,7 @@ import org.apache.gobblin.yarn.YarnHelixUtils;
import org.apache.gobblin.yarn.event.ContainerReleaseRequest;
import org.apache.gobblin.yarn.event.ContainerShutdownRequest;
import org.apache.gobblin.yarn.event.NewContainerRequest;
+import org.apache.gobblin.temporal.dynamic.WorkerProfile;
/**
* This class is responsible for all Yarn-related stuffs including
ApplicationMaster registration,
@@ -130,8 +131,7 @@ class YarnService extends AbstractIdleService {
private final String appViewAcl;
//Default helix instance tag derived from cluster level config
private final String helixInstanceTags;
-
- private final Config config;
+ protected final Config config;
private final EventBus eventBus;
@@ -146,17 +146,11 @@ class YarnService extends AbstractIdleService {
private final AMRMClientAsync<AMRMClient.ContainerRequest> amrmClientAsync;
private final NMClientAsync nmClientAsync;
private final ExecutorService containerLaunchExecutor;
-
- private final int initialContainers;
private final int requestedContainerMemoryMbs;
private final int requestedContainerCores;
- private final int jvmMemoryOverheadMbs;
- private final double jvmMemoryXmxRatio;
private final boolean containerHostAffinityEnabled;
private final int helixInstanceMaxRetries;
-
- private final Optional<String> containerJvmArgs;
private final String containerTimezone;
private final String proxyJvmArgs;
@@ -200,6 +194,9 @@ class YarnService extends AbstractIdleService {
private volatile boolean shutdownInProgress = false;
private final boolean jarCacheEnabled;
+ private static final long DEFAULT_ALLOCATION_REQUEST_ID = 0L;
+ private final AtomicLong allocationRequestIdGenerator = new
AtomicLong(DEFAULT_ALLOCATION_REQUEST_ID);
+ private final ConcurrentMap<Long, WorkerProfile>
workerProfileByAllocationRequestId = new ConcurrentHashMap<>();
public YarnService(Config config, String applicationName, String
applicationId, YarnConfiguration yarnConfiguration,
FileSystem fs, EventBus eventBus) throws Exception {
@@ -229,7 +226,6 @@ class YarnService extends AbstractIdleService {
this.nmClientAsync =
closer.register(NMClientAsync.createNMClientAsync(getNMClientCallbackHandler()));
this.nmClientAsync.init(this.yarnConfiguration);
- this.initialContainers =
config.getInt(GobblinYarnConfigurationKeys.INITIAL_CONTAINERS_KEY);
this.requestedContainerMemoryMbs =
config.getInt(GobblinYarnConfigurationKeys.CONTAINER_MEMORY_MBS_KEY);
this.requestedContainerCores =
config.getInt(GobblinYarnConfigurationKeys.CONTAINER_CORES_KEY);
this.containerHostAffinityEnabled =
config.getBoolean(GobblinYarnConfigurationKeys.CONTAINER_HOST_AFFINITY_ENABLED);
@@ -238,10 +234,6 @@ class YarnService extends AbstractIdleService {
this.helixInstanceTags = ConfigUtils.getString(config,
GobblinClusterConfigurationKeys.HELIX_INSTANCE_TAGS_KEY,
GobblinClusterConfigurationKeys.HELIX_DEFAULT_TAG);
- this.containerJvmArgs =
config.hasPath(GobblinYarnConfigurationKeys.CONTAINER_JVM_ARGS_KEY) ?
-
Optional.of(config.getString(GobblinYarnConfigurationKeys.CONTAINER_JVM_ARGS_KEY))
:
- Optional.<String>absent();
-
this.proxyJvmArgs =
config.hasPath(GobblinYarnConfigurationKeys.YARN_APPLICATION_PROXY_JVM_ARGS) ?
config.getString(GobblinYarnConfigurationKeys.YARN_APPLICATION_PROXY_JVM_ARGS)
: StringUtils.EMPTY;
@@ -257,27 +249,12 @@ class YarnService extends AbstractIdleService {
GobblinYarnConfigurationKeys.RELEASED_CONTAINERS_CACHE_EXPIRY_SECS,
GobblinYarnConfigurationKeys.DEFAULT_RELEASED_CONTAINERS_CACHE_EXPIRY_SECS),
TimeUnit.SECONDS).build();
- this.jvmMemoryXmxRatio = ConfigUtils.getDouble(this.config,
- GobblinYarnConfigurationKeys.CONTAINER_JVM_MEMORY_XMX_RATIO_KEY,
- GobblinYarnConfigurationKeys.DEFAULT_CONTAINER_JVM_MEMORY_XMX_RATIO);
-
- Preconditions.checkArgument(this.jvmMemoryXmxRatio >= 0 &&
this.jvmMemoryXmxRatio <= 1,
- GobblinYarnConfigurationKeys.CONTAINER_JVM_MEMORY_XMX_RATIO_KEY + "
must be between 0 and 1 inclusive");
-
- this.jvmMemoryOverheadMbs = ConfigUtils.getInt(this.config,
- GobblinYarnConfigurationKeys.CONTAINER_JVM_MEMORY_OVERHEAD_MBS_KEY,
-
GobblinYarnConfigurationKeys.DEFAULT_CONTAINER_JVM_MEMORY_OVERHEAD_MBS);
-
- Preconditions.checkArgument(this.jvmMemoryOverheadMbs <
this.requestedContainerMemoryMbs * this.jvmMemoryXmxRatio,
- GobblinYarnConfigurationKeys.CONTAINER_JVM_MEMORY_OVERHEAD_MBS_KEY + "
cannot be more than "
- + GobblinYarnConfigurationKeys.CONTAINER_MEMORY_MBS_KEY + " * "
- + GobblinYarnConfigurationKeys.CONTAINER_JVM_MEMORY_XMX_RATIO_KEY);
-
this.appViewAcl = ConfigUtils.getString(this.config,
GobblinYarnConfigurationKeys.APP_VIEW_ACL,
GobblinYarnConfigurationKeys.DEFAULT_APP_VIEW_ACL);
this.containerTimezone = ConfigUtils.getString(this.config,
GobblinYarnConfigurationKeys.GOBBLIN_YARN_CONTAINER_TIMEZONE,
GobblinYarnConfigurationKeys.DEFAULT_GOBBLIN_YARN_CONTAINER_TIMEZONE);
this.jarCacheEnabled = ConfigUtils.getBoolean(this.config,
GobblinYarnConfigurationKeys.JAR_CACHE_ENABLED,
GobblinYarnConfigurationKeys.JAR_CACHE_ENABLED_DEFAULT);
+
}
@SuppressWarnings("unused")
@@ -344,7 +321,7 @@ class YarnService extends AbstractIdleService {
this.maxResourceCapacity =
Optional.of(response.getMaximumResourceCapability());
LOGGER.info("Requesting initial containers");
- requestInitialContainers(this.initialContainers);
+ requestInitialContainers();
}
@Override
@@ -419,41 +396,25 @@ class YarnService extends AbstractIdleService {
.build();
}
- /**
- * Request an allocation of containers. If numTargetContainers is larger
than the max of current and expected number
- * of containers then additional containers are requested.
- * <p>
- * If numTargetContainers is less than the current number of allocated
containers then release free containers.
- * Shrinking is relative to the number of currently allocated containers
since it takes time for containers
- * to be allocated and assigned work and we want to avoid releasing a
container prematurely before it is assigned
- * work. This means that a container may not be released even though
numTargetContainers is less than the requested
- * number of containers. The intended usage is for the caller of this method
to make periodic calls to attempt to
- * adjust the cluster towards the desired number of containers.
- *
- * @param inUseInstances a set of in use instances
- * @return whether successfully requested the target number of containers
- */
- public synchronized boolean requestTargetNumberOfContainers(int
numContainers, Set<String> inUseInstances) {
- int defaultContainerMemoryMbs =
config.getInt(GobblinYarnConfigurationKeys.CONTAINER_MEMORY_MBS_KEY);
- int defaultContainerCores = config.getInt(GobblinYarnConfigurationKeys.
CONTAINER_CORES_KEY);
-
- LOGGER.info("Trying to set numTargetContainers={}, in-use helix instances
count is {}, container map size is {}",
- numContainers, inUseInstances.size(), this.containerMap.size());
-
- requestContainers(numContainers,
Resource.newInstance(defaultContainerMemoryMbs, defaultContainerCores));
- LOGGER.info("Current tag-container desired count:{}, tag-container
allocated: {}", numContainers, this.allocatedContainerCountMap);
- return true;
+ /** unless overridden to actually scale, "initial" containers will be the
app's *only* containers! */
+ protected synchronized void requestInitialContainers() {
+ WorkerProfile baselineWorkerProfile = new WorkerProfile(this.config);
+ int numContainers =
this.config.getInt(GobblinYarnConfigurationKeys.INITIAL_CONTAINERS_KEY);
+ LOGGER.info("Requesting {} initial (static) containers with baseline
(only) profile, never to be re-scaled", numContainers);
+ requestContainersForWorkerProfile(baselineWorkerProfile, numContainers);
}
- // Request initial containers with default resource and helix tag
- private void requestInitialContainers(int containersRequested) {
- requestTargetNumberOfContainers(containersRequested,
Collections.EMPTY_SET);
+ protected synchronized void requestContainersForWorkerProfile(WorkerProfile
workerProfile, int numContainers) {
+ int containerMemoryMbs =
workerProfile.getConfig().getInt(GobblinYarnConfigurationKeys.CONTAINER_MEMORY_MBS_KEY);
+ int containerCores =
workerProfile.getConfig().getInt(GobblinYarnConfigurationKeys.CONTAINER_CORES_KEY);
+ long allocationRequestId = storeByUniqueAllocationRequestId(workerProfile);
+ requestContainers(numContainers, Resource.newInstance(containerMemoryMbs,
containerCores), Optional.of(allocationRequestId));
}
private void requestContainer(Optional<String> preferredNode,
Optional<Resource> resourceOptional) {
Resource desiredResource = resourceOptional.or(Resource.newInstance(
this.requestedContainerMemoryMbs, this.requestedContainerCores));
- requestContainer(preferredNode, desiredResource);
+ requestContainer(preferredNode, desiredResource, Optional.absent());
}
/**
@@ -462,14 +423,14 @@ class YarnService extends AbstractIdleService {
* @param numContainers
* @param resource
*/
- private void requestContainers(int numContainers, Resource resource) {
- LOGGER.info("Requesting {} containers with resource={}", numContainers,
resource);
+ protected void requestContainers(int numContainers, Resource resource,
Optional<Long> optAllocationRequestId) {
+ LOGGER.info("Requesting {} containers with resource={} and allocation
request id = {}", numContainers, resource, optAllocationRequestId);
IntStream.range(0, numContainers)
- .forEach(i -> requestContainer(Optional.absent(), resource));
+ .forEach(i -> requestContainer(Optional.absent(), resource,
optAllocationRequestId));
}
// Request containers with specific resource requirement
- private void requestContainer(Optional<String> preferredNode, Resource
resource) {
+ private void requestContainer(Optional<String> preferredNode, Resource
resource, Optional<Long> optAllocationRequestId) {
// Fail if Yarn cannot meet container resource requirements
Preconditions.checkArgument(resource.getMemory() <=
this.maxResourceCapacity.get().getMemory() &&
resource.getVirtualCores() <=
this.maxResourceCapacity.get().getVirtualCores(),
@@ -485,8 +446,11 @@ class YarnService extends AbstractIdleService {
priority.setPriority(priorityNum);
String[] preferredNodes = preferredNode.isPresent() ? new String[]
{preferredNode.get()} : null;
+
+ long allocationRequestId =
optAllocationRequestId.or(DEFAULT_ALLOCATION_REQUEST_ID);
+
this.amrmClientAsync.addContainerRequest(
- new AMRMClient.ContainerRequest(resource, preferredNodes, null,
priority));
+ new AMRMClient.ContainerRequest(resource, preferredNodes, null,
priority, allocationRequestId));
}
protected ContainerLaunchContext newContainerLaunchContext(ContainerInfo
containerInfo)
@@ -590,15 +554,49 @@ class YarnService extends AbstractIdleService {
@VisibleForTesting
protected String buildContainerCommand(Container container, String
helixParticipantId, String helixInstanceTag) {
+ long allocationRequestId = container.getAllocationRequestId();
+ WorkerProfile workerProfile =
Optional.fromNullable(this.workerProfileByAllocationRequestId.get(allocationRequestId))
+ .or(() -> {
+ LOGGER.warn("No Worker Profile found for {}, so falling back to
default", allocationRequestId);
+ return
this.workerProfileByAllocationRequestId.computeIfAbsent(DEFAULT_ALLOCATION_REQUEST_ID,
k -> {
+ LOGGER.warn("WARNING: (LIKELY) UNEXPECTED CONCURRENCY: No Worker
Profile even yet mapped to the default allocation request ID {} - creating one
now", DEFAULT_ALLOCATION_REQUEST_ID);
+ return new WorkerProfile(this.config);
+ });
+ });
+ Config workerProfileConfig = workerProfile.getConfig();
+
+ double workerJvmMemoryXmxRatio = ConfigUtils.getDouble(workerProfileConfig,
+ GobblinYarnConfigurationKeys.CONTAINER_JVM_MEMORY_XMX_RATIO_KEY,
+ GobblinYarnConfigurationKeys.DEFAULT_CONTAINER_JVM_MEMORY_XMX_RATIO);
+
+ int workerJvmMemoryOverheadMbs = ConfigUtils.getInt(workerProfileConfig,
+ GobblinYarnConfigurationKeys.CONTAINER_JVM_MEMORY_OVERHEAD_MBS_KEY,
+
GobblinYarnConfigurationKeys.DEFAULT_CONTAINER_JVM_MEMORY_OVERHEAD_MBS);
+
+ Preconditions.checkArgument(workerJvmMemoryXmxRatio >= 0 &&
workerJvmMemoryXmxRatio <= 1,
+ workerProfile.getName() + " : " +
GobblinYarnConfigurationKeys.CONTAINER_JVM_MEMORY_XMX_RATIO_KEY +
+ " must be between 0 and 1 inclusive");
+
+ long containerMemoryMbs = container.getResource().getMemorySize();
+
+ Preconditions.checkArgument(workerJvmMemoryOverheadMbs <
containerMemoryMbs * workerJvmMemoryXmxRatio,
+ workerProfile.getName() + " : " +
GobblinYarnConfigurationKeys.CONTAINER_JVM_MEMORY_OVERHEAD_MBS_KEY +
+ " cannot be more than " +
GobblinYarnConfigurationKeys.CONTAINER_MEMORY_MBS_KEY + " * " +
+ GobblinYarnConfigurationKeys.CONTAINER_JVM_MEMORY_XMX_RATIO_KEY);
+
+ Optional<String> workerJvmArgs =
workerProfileConfig.hasPath(GobblinYarnConfigurationKeys.CONTAINER_JVM_ARGS_KEY)
?
+
Optional.of(workerProfileConfig.getString(GobblinYarnConfigurationKeys.CONTAINER_JVM_ARGS_KEY))
:
+ Optional.<String>absent();
+
String containerProcessName =
GobblinTemporalYarnTaskRunner.class.getSimpleName();
StringBuilder containerCommand = new StringBuilder()
.append(ApplicationConstants.Environment.JAVA_HOME.$()).append("/bin/java")
- .append(" -Xmx").append((int) (container.getResource().getMemory() *
this.jvmMemoryXmxRatio) -
- this.jvmMemoryOverheadMbs).append("M")
+ .append(" -Xmx").append((int) (container.getResource().getMemory() *
workerJvmMemoryXmxRatio) -
+ workerJvmMemoryOverheadMbs).append("M")
.append("
-D").append(GobblinYarnConfigurationKeys.JVM_USER_TIMEZONE_CONFIG).append("=").append(this.containerTimezone)
.append("
-D").append(GobblinYarnConfigurationKeys.GOBBLIN_YARN_CONTAINER_LOG_DIR_NAME).append("=").append(ApplicationConstants.LOG_DIR_EXPANSION_VAR)
.append("
-D").append(GobblinYarnConfigurationKeys.GOBBLIN_YARN_CONTAINER_LOG_FILE_NAME).append("=").append(containerProcessName).append(".").append(ApplicationConstants.STDOUT)
- .append(" ").append(JvmUtils.formatJvmArguments(this.containerJvmArgs))
+ .append(" ").append(JvmUtils.formatJvmArguments(workerJvmArgs))
.append(" ").append(this.proxyJvmArgs)
.append(" ").append(GobblinTemporalYarnTaskRunner.class.getName())
.append("
--").append(GobblinClusterConfigurationKeys.APPLICATION_NAME_OPTION_NAME)
@@ -755,6 +753,18 @@ class YarnService extends AbstractIdleService {
return eventMetadataBuilder;
}
+  /**
+   * Draws the next ID from {@code allocationRequestIdGenerator} and maps it to {@code workerProfile}.
+   *
+   * @param workerProfile the worker profile to associate with the newly generated allocation request ID
+   * @return the newly generated allocation request ID, now recorded in {@code workerProfileByAllocationRequestId}
+   */
+  protected long storeByUniqueAllocationRequestId(WorkerProfile workerProfile) {
+    final long requestId = this.allocationRequestIdGenerator.getAndIncrement();
+    this.workerProfileByAllocationRequestId.put(requestId, workerProfile);
+    return requestId;
+  }
+
/**
* A custom implementation of {@link AMRMClientAsync.CallbackHandler}.
*/
@@ -803,24 +813,36 @@ class YarnService extends AbstractIdleService {
         // Find matching requests and remove the request (YARN-660). We the scheduler are responsible
         // for cleaning up requests after allocation based on the design in the described ticket.
         // YARN does not have a delta request API and the requests are not cleaned up automatically.
-        // Try finding a match first with the host as the resource name then fall back to any resource match.
+        // Try finding a match first by allocationRequestId (which should always be the case), then fall back to
+        // finding a match with the host as the resource name, which in turn falls back to any resource match.
         // Also see YARN-1902. Container count will explode without this logic for removing container requests.
-        List<? extends Collection<AMRMClient.ContainerRequest>> matchingRequests = amrmClientAsync
-            .getMatchingRequests(container.getPriority(), container.getNodeHttpAddress(), container.getResource());
+        Collection<AMRMClient.ContainerRequest> matchingRequestsByAllocationRequestId = amrmClientAsync.getMatchingRequests(container.getAllocationRequestId());
+        if (!matchingRequestsByAllocationRequestId.isEmpty()) {
+          AMRMClient.ContainerRequest firstMatchingContainerRequest = matchingRequestsByAllocationRequestId.iterator().next();
+          LOGGER.info("Found matching requests {}, removing first matching request {}",
+              matchingRequestsByAllocationRequestId, firstMatchingContainerRequest);
-        if (matchingRequests.isEmpty()) {
-          LOGGER.debug("Matching request by host {} not found", container.getNodeHttpAddress());
+          amrmClientAsync.removeContainerRequest(firstMatchingContainerRequest);
+        } else {
+          LOGGER.info("Matching request by allocation request id {} not found", container.getAllocationRequestId());
-          matchingRequests = amrmClientAsync
-              .getMatchingRequests(container.getPriority(), ResourceRequest.ANY, container.getResource());
-        }
+          List<? extends Collection<AMRMClient.ContainerRequest>> matchingRequestsByHost = amrmClientAsync
+              .getMatchingRequests(container.getPriority(), container.getNodeHttpAddress(), container.getResource());
-        if (!matchingRequests.isEmpty()) {
-          AMRMClient.ContainerRequest firstMatchingContainerRequest = matchingRequests.get(0).iterator().next();
-          LOGGER.debug("Found matching requests {}, removing first matching request {}",
-              matchingRequests, firstMatchingContainerRequest);
+          if (matchingRequestsByHost.isEmpty()) {
+            LOGGER.info("Matching request by host {} not found", container.getNodeHttpAddress());
-          amrmClientAsync.removeContainerRequest(firstMatchingContainerRequest);
+            matchingRequestsByHost = amrmClientAsync
+                .getMatchingRequests(container.getPriority(), ResourceRequest.ANY, container.getResource());
+          }
+
+          if (!matchingRequestsByHost.isEmpty()) {
+            AMRMClient.ContainerRequest firstMatchingContainerRequest = matchingRequestsByHost.get(0).iterator().next();
+            LOGGER.info("Found matching requests {}, removing first matching request {}",
+                matchingRequestsByHost, firstMatchingContainerRequest);
+
+            amrmClientAsync.removeContainerRequest(firstMatchingContainerRequest);
+          }
         }
         containerLaunchExecutor.submit(new Runnable() {
diff --git
a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/dynamic/DummyScalingDirectiveSource.java
b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/dynamic/DummyScalingDirectiveSource.java
new file mode 100644
index 0000000000..6bdfe46276
--- /dev/null
+++
b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/dynamic/DummyScalingDirectiveSource.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.dynamic;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.gobblin.yarn.GobblinYarnConfigurationKeys;
+
+
+/**
+ * A dummy {@link ScalingDirectiveSource} that replays a scripted sequence of {@link ScalingDirective}s, chosen by
+ * how many times {@link #getScalingDirectives()} has been called so far.
+ */
+public class DummyScalingDirectiveSource implements ScalingDirectiveSource {
+  private final AtomicInteger numInvocations = new AtomicInteger(0);
+  // Derivation shared by both scripted profiles: the baseline profile plus 2048 MB of memory and 2 cores
+  private final Optional<ProfileDerivation> derivedFromBaseline;
+
+  public DummyScalingDirectiveSource() {
+    ProfileOverlay.Adding memoryAndCoresOverlay = new ProfileOverlay.Adding(
+        new ProfileOverlay.KVPair(GobblinYarnConfigurationKeys.CONTAINER_MEMORY_MBS_KEY, "2048"),
+        new ProfileOverlay.KVPair(GobblinYarnConfigurationKeys.CONTAINER_CORES_KEY, "2"));
+    this.derivedFromBaseline =
+        Optional.of(new ProfileDerivation(WorkforceProfiles.BASELINE_NAME, memoryAndCoresOverlay));
+  }
+
+  /**
+   * Returns the scripted directives for the current call: two new profiles on the 1st call, raised counts for
+   * both on the 2nd, the same counts again on the 3rd, and an empty list on every later call.
+   *
+   * @return - A fixed set of {@link ScalingDirective}s corresponding to the invocation number.
+   */
+  @Override
+  public List<ScalingDirective> getScalingDirectives() {
+    // Note - profile should exist already or is derived from other profile
+    final int invocation = this.numInvocations.getAndIncrement();
+    final long now = System.currentTimeMillis();
+    switch (invocation) {
+      case 0:
+        // Two new profiles with initial container counts, which should be launched. Their timestampEpochMillis
+        // must differ so that both are processed, otherwise
+        // org.apache.gobblin.temporal.dynamic.WorkforcePlan$IllegalRevisionException$OutOfOrderDirective can occur
+        return Arrays.asList(
+            new ScalingDirective("firstProfile", 3, now, this.derivedFromBaseline),
+            new ScalingDirective("secondProfile", 2, now + 1, this.derivedFromBaseline));
+      case 1:
+        // Raise firstProfile to 5 and secondProfile to 3: 2 extra containers should be launched for firstProfile
+        // and 1 extra container for secondProfile
+      case 2:
+        // (3rd call) same counts as the previous invocation, so no new containers should be launched
+        return Arrays.asList(
+            new ScalingDirective("firstProfile", 5, now),
+            new ScalingDirective("secondProfile", 3, now + 1));
+      default:
+        return new ArrayList<>();
+    }
+  }
+}
diff --git
a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/yarn/DummyDynamicScalingYarnServiceManager.java
b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/yarn/DummyDynamicScalingYarnServiceManager.java
new file mode 100644
index 0000000000..b79f808938
--- /dev/null
+++
b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/yarn/DummyDynamicScalingYarnServiceManager.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.yarn;
+
+import org.apache.gobblin.temporal.dynamic.DummyScalingDirectiveSource;
+import org.apache.gobblin.temporal.dynamic.ScalingDirectiveSource;
+
+/**
+ * An {@link AbstractDynamicScalingYarnServiceManager} that sources its scaling directives from a
+ * {@link DummyScalingDirectiveSource}. Meant for integration testing only: enable it while testing via the
+ * {@link org.apache.gobblin.yarn.GobblinYarnConfigurationKeys#APP_MASTER_SERVICE_CLASSES} config.
+ */
+public class DummyDynamicScalingYarnServiceManager extends AbstractDynamicScalingYarnServiceManager {
+
+  public DummyDynamicScalingYarnServiceManager(GobblinTemporalApplicationMaster appMaster) {
+    super(appMaster);
+  }
+
+  /** @return a new {@link DummyScalingDirectiveSource} each time this is called */
+  @Override
+  protected ScalingDirectiveSource createScalingDirectiveSource() {
+    return new DummyScalingDirectiveSource();
+  }
+}
\ No newline at end of file
diff --git
a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnServiceManagerTest.java
b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnServiceManagerTest.java
new file mode 100644
index 0000000000..dd4243d3fa
--- /dev/null
+++
b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnServiceManagerTest.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.yarn;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigValueFactory;
+
+import org.apache.gobblin.temporal.dynamic.ScalingDirective;
+import org.apache.gobblin.temporal.dynamic.ScalingDirectiveSource;
+import org.apache.gobblin.temporal.dynamic.DummyScalingDirectiveSource;
+
+import static org.apache.gobblin.temporal.yarn.AbstractDynamicScalingYarnServiceManager.DYNAMIC_SCALING_POLLING_INTERVAL;
+
+/** Tests for {@link AbstractDynamicScalingYarnServiceManager}*/
+public class DynamicScalingYarnServiceManagerTest {
+
+  @Mock private DynamicScalingYarnService mockDynamicScalingYarnService;
+  @Mock private ScalingDirectiveSource mockScalingDirectiveSource;
+  @Mock private GobblinTemporalApplicationMaster mockGobblinTemporalApplicationMaster;
+  /** Handle returned by {@link MockitoAnnotations#openMocks(Object)}; closed after each test to release the mocks. */
+  private AutoCloseable openedMocks;
+
+  @BeforeMethod
+  public void setup() {
+    this.openedMocks = MockitoAnnotations.openMocks(this);
+    // Using 1 second as polling interval so that the test runs faster and
+    // GetScalingDirectivesRunnable.run() will be called equal to amount of sleep introduced between startUp
+    // and shutDown in seconds
+    Config config = ConfigFactory.empty().withValue(DYNAMIC_SCALING_POLLING_INTERVAL, ConfigValueFactory.fromAnyRef(1));
+    Mockito.when(mockGobblinTemporalApplicationMaster.getConfig()).thenReturn(config);
+    Mockito.when(mockGobblinTemporalApplicationMaster.get_yarnService()).thenReturn(mockDynamicScalingYarnService);
+  }
+
+  @AfterMethod(alwaysRun = true)
+  public void tearDown() throws Exception {
+    if (this.openedMocks != null) {
+      this.openedMocks.close();
+    }
+  }
+
+  @Test
+  public void testWhenScalingDirectivesIsNulOrEmpty() throws IOException, InterruptedException {
+    Mockito.when(mockScalingDirectiveSource.getScalingDirectives()).thenReturn(null).thenReturn(new ArrayList<>());
+    TestDynamicScalingYarnServiceManager testDynamicScalingYarnServiceManager = new TestDynamicScalingYarnServiceManager(
+        mockGobblinTemporalApplicationMaster, mockScalingDirectiveSource);
+    testDynamicScalingYarnServiceManager.startUp();
+    Thread.sleep(3000);
+    testDynamicScalingYarnServiceManager.shutDown();
+    Mockito.verify(mockDynamicScalingYarnService, Mockito.never()).reviseWorkforcePlanAndRequestNewContainers(Mockito.anyList());
+  }
+
+  /** Note : this test uses {@link DummyScalingDirectiveSource}*/
+  @Test
+  public void testWithDummyScalingDirectiveSource() throws InterruptedException {
+    // DummyScalingDirectiveSource returns 2 scaling directives on each of its first 3 invocations and an empty list
+    // afterwards, so reviseWorkforcePlanAndRequestNewContainers should be invoked exactly 3 times
+    TestDynamicScalingYarnServiceManager testDynamicScalingYarnServiceManager = new TestDynamicScalingYarnServiceManager(
+        mockGobblinTemporalApplicationMaster, new DummyScalingDirectiveSource());
+    testDynamicScalingYarnServiceManager.startUp();
+    Thread.sleep(5000); // 5 seconds sleep so that GetScalingDirectivesRunnable.run() is called for 5 times
+    testDynamicScalingYarnServiceManager.shutDown();
+    Mockito.verify(mockDynamicScalingYarnService, Mockito.times(3)).reviseWorkforcePlanAndRequestNewContainers(Mockito.anyList());
+  }
+
+  @Test
+  public void testWithRandomScalingDirectives() throws IOException, InterruptedException {
+    ScalingDirective mockScalingDirective = Mockito.mock(ScalingDirective.class);
+    List<ScalingDirective> mockedScalingDirectives = Arrays.asList(mockScalingDirective, mockScalingDirective);
+    Mockito.when(mockScalingDirectiveSource.getScalingDirectives())
+        .thenReturn(new ArrayList<>())
+        .thenReturn(mockedScalingDirectives)
+        .thenReturn(mockedScalingDirectives)
+        .thenReturn(null);
+
+    TestDynamicScalingYarnServiceManager testDynamicScalingYarnServiceManager = new TestDynamicScalingYarnServiceManager(
+        mockGobblinTemporalApplicationMaster, mockScalingDirectiveSource);
+    testDynamicScalingYarnServiceManager.startUp();
+    Thread.sleep(5000);
+    testDynamicScalingYarnServiceManager.shutDown();
+    Mockito.verify(mockDynamicScalingYarnService, Mockito.times(2)).reviseWorkforcePlanAndRequestNewContainers(Mockito.anyList());
+  }
+
+  /** Test implementation of {@link AbstractDynamicScalingYarnServiceManager} which returns passed
+   * {@link ScalingDirectiveSource} when {@link #createScalingDirectiveSource()} is called while initialising
+   * {@link AbstractDynamicScalingYarnServiceManager.GetScalingDirectivesRunnable}
+   * */
+  protected static class TestDynamicScalingYarnServiceManager extends AbstractDynamicScalingYarnServiceManager {
+    private final ScalingDirectiveSource _scalingDirectiveSource;
+    public TestDynamicScalingYarnServiceManager(GobblinTemporalApplicationMaster appMaster, ScalingDirectiveSource scalingDirectiveSource) {
+      super(appMaster);
+      this._scalingDirectiveSource = scalingDirectiveSource;
+    }
+
+    @Override
+    protected ScalingDirectiveSource createScalingDirectiveSource() {
+      return this._scalingDirectiveSource;
+    }
+  }
+
+}
diff --git
a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnServiceTest.java
b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnServiceTest.java
new file mode 100644
index 0000000000..6c0946aabb
--- /dev/null
+++
b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnServiceTest.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.yarn;
+
+import java.net.URL;
+import java.util.Collections;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import com.google.common.base.Optional;
+import com.google.common.eventbus.EventBus;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+import org.apache.gobblin.temporal.dynamic.ScalingDirective;
+import org.apache.gobblin.temporal.dynamic.WorkforceProfiles;
+
+/** Tests for {@link DynamicScalingYarnService} */
+public class DynamicScalingYarnServiceTest {
+  private Config defaultConfigs;
+  private final YarnConfiguration yarnConfiguration = new YarnConfiguration();
+  private final FileSystem mockFileSystem = Mockito.mock(FileSystem.class);
+  private final EventBus eventBus = new EventBus("TemporalDynamicScalingYarnServiceTest");
+
+  @BeforeClass
+  public void setup() {
+    // using same initial config as of YarnServiceTest
+    String resourceName = YarnServiceTest.class.getSimpleName() + ".conf";
+    URL url = DynamicScalingYarnServiceTest.class.getClassLoader().getResource(resourceName);
+    // Report the resource name on failure: url itself is always null when this assertion fails
+    Assert.assertNotNull(url, "Could not find resource " + resourceName);
+    this.defaultConfigs = ConfigFactory.parseURL(url).resolve();
+  }
+
+  @Test
+  public void testReviseWorkforcePlanAndRequestNewContainers() throws Exception {
+    int numNewContainers = 5;
+    DynamicScalingYarnService dynamicScalingYarnService = new DynamicScalingYarnService(this.defaultConfigs, "testApp", "testAppId", yarnConfiguration, mockFileSystem, eventBus);
+    DynamicScalingYarnService dynamicScalingYarnServiceSpy = Mockito.spy(dynamicScalingYarnService);
+    Mockito.doNothing().when(dynamicScalingYarnServiceSpy).requestContainers(Mockito.anyInt(), Mockito.any(Resource.class), Mockito.any(Optional.class));
+    ScalingDirective baseScalingDirective = new ScalingDirective(WorkforceProfiles.BASELINE_NAME, numNewContainers, System.currentTimeMillis());
+    dynamicScalingYarnServiceSpy.reviseWorkforcePlanAndRequestNewContainers(Collections.singletonList(baseScalingDirective));
+    Mockito.verify(dynamicScalingYarnServiceSpy, Mockito.times(1)).requestContainers(Mockito.eq(numNewContainers), Mockito.any(Resource.class), Mockito.any(Optional.class));
+  }
+}
diff --git
a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/yarn/YarnServiceTest.java
b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/yarn/YarnServiceTest.java
new file mode 100644
index 0000000000..3c81316b85
--- /dev/null
+++
b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/yarn/YarnServiceTest.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.yarn;
+
+import java.io.IOException;
+import java.net.URL;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+
+import com.google.common.base.Optional;
+
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.mockito.MockedStatic;
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigValueFactory;
+import com.google.common.eventbus.EventBus;
+
+import org.apache.gobblin.yarn.GobblinYarnConfigurationKeys;
+
+import static org.mockito.Mockito.*;
+
+
+/**
+ * Tests for {@link YarnService}
+ *
+ * NOTE : This test is a partial clone of {@link org.apache.gobblin.yarn.YarnServiceTest}
+ * */
+public class YarnServiceTest {
+  private Config defaultConfigs;
+  private final YarnConfiguration yarnConfiguration = new YarnConfiguration();
+  private final FileSystem mockFileSystem = Mockito.mock(FileSystem.class);
+  private final EventBus eventBus = new EventBus("TemporalYarnServiceTest");
+  private AMRMClientAsync mockAMRMClient;
+  private RegisterApplicationMasterResponse mockRegisterApplicationMasterResponse;
+  /** Static mock of {@link AMRMClientAsync}; a Mockito static mock stays active on its creating thread until closed. */
+  private MockedStatic<AMRMClientAsync> amrmClientAsyncMockStatic;
+
+  @BeforeClass
+  public void setup() throws IOException, YarnException {
+    mockAMRMClient = mock(AMRMClientAsync.class);
+    mockRegisterApplicationMasterResponse = mock(RegisterApplicationMasterResponse.class);
+
+    String resourceName = YarnServiceTest.class.getSimpleName() + ".conf";
+    URL url = YarnServiceTest.class.getClassLoader().getResource(resourceName);
+    Assert.assertNotNull(url, "Could not find resource " + resourceName);
+    this.defaultConfigs = ConfigFactory.parseURL(url).resolve();
+
+    this.amrmClientAsyncMockStatic = mockStatic(AMRMClientAsync.class);
+
+    amrmClientAsyncMockStatic.when(() -> AMRMClientAsync.createAMRMClientAsync(anyInt(), any(AMRMClientAsync.CallbackHandler.class)))
+        .thenReturn(mockAMRMClient);
+    doNothing().when(mockAMRMClient).init(any(YarnConfiguration.class));
+
+    when(mockAMRMClient.registerApplicationMaster(anyString(), anyInt(), anyString()))
+        .thenReturn(mockRegisterApplicationMasterResponse);
+    when(mockRegisterApplicationMasterResponse.getMaximumResourceCapability())
+        .thenReturn(Mockito.mock(Resource.class));
+  }
+
+  @AfterClass(alwaysRun = true)
+  public void tearDown() {
+    // Deregister the static mock so it cannot leak into other test classes run later on the same thread
+    if (this.amrmClientAsyncMockStatic != null) {
+      this.amrmClientAsyncMockStatic.close();
+    }
+  }
+
+  @Test
+  public void testYarnServiceStartupWithInitialContainers() throws Exception {
+    int expectedNumContainers = 3;
+    Config config = this.defaultConfigs.withValue(GobblinYarnConfigurationKeys.INITIAL_CONTAINERS_KEY, ConfigValueFactory.fromAnyRef(expectedNumContainers));
+    YarnService yarnService = new YarnService(config, "testApplicationName", "testApplicationId", yarnConfiguration, mockFileSystem, eventBus);
+    YarnService yarnServiceSpy = Mockito.spy(yarnService);
+    Mockito.doNothing().when(yarnServiceSpy).requestContainers(Mockito.anyInt(), Mockito.any(Resource.class), Mockito.any(Optional.class));
+    yarnServiceSpy.startUp();
+    Mockito.verify(yarnServiceSpy, Mockito.times(1)).requestContainers(Mockito.eq(expectedNumContainers), Mockito.any(Resource.class), Mockito.any(Optional.class));
+  }
+
+  @Test
+  public void testBuildContainerCommand() throws Exception {
+    final double jvmMemoryXmxRatio = 0.7;
+    final int jvmMemoryOverheadMbs = 50;
+    final int resourceMemoryMB = 3072;
+    final int expectedJvmMemory = (int) (resourceMemoryMB * jvmMemoryXmxRatio) - jvmMemoryOverheadMbs;
+
+    Config config = this.defaultConfigs
+        .withValue(GobblinYarnConfigurationKeys.CONTAINER_JVM_MEMORY_XMX_RATIO_KEY, ConfigValueFactory.fromAnyRef(jvmMemoryXmxRatio))
+        .withValue(GobblinYarnConfigurationKeys.CONTAINER_JVM_MEMORY_OVERHEAD_MBS_KEY, ConfigValueFactory.fromAnyRef(jvmMemoryOverheadMbs));
+
+    Resource resource = Resource.newInstance(resourceMemoryMB, 2);
+
+    Container mockContainer = Mockito.mock(Container.class);
+    Mockito.when(mockContainer.getResource()).thenReturn(resource);
+    Mockito.when(mockContainer.getAllocationRequestId()).thenReturn(0L);
+
+    YarnService yarnService = new YarnService(
+        config,
+        "testApplicationName",
+        "testApplicationId",
+        yarnConfiguration,
+        mockFileSystem,
+        eventBus
+    );
+
+    yarnService.startUp();
+
+    String command = yarnService.buildContainerCommand(mockContainer, "testHelixParticipantId", "testHelixInstanceTag");
+    Assert.assertTrue(command.contains("-Xmx" + expectedJvmMemory + "M"), "Unexpected container command: " + command);
+  }
+}
diff --git a/gobblin-temporal/src/test/resources/YarnServiceTest.conf
b/gobblin-temporal/src/test/resources/YarnServiceTest.conf
new file mode 100644
index 0000000000..0903ced00b
--- /dev/null
+++ b/gobblin-temporal/src/test/resources/YarnServiceTest.conf
@@ -0,0 +1,6 @@
+# Default configs used to initialize YarnService in tests
+gobblin.yarn.initial.containers=0
+gobblin.yarn.container.memory.mbs=1024
+gobblin.yarn.container.cores=1
+gobblin.yarn.container.affinity.enabled=false
+gobblin.yarn.helix.instance.max.retries=1
\ No newline at end of file