This is an automated email from the ASF dual-hosted git repository.

kipk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 7309060ac3 [GOBBLIN-2174] Define `DynamicScalingYarnService` for the GoT workforce via `ScalingDirective`s (#4077)
7309060ac3 is described below

commit 7309060ac363a5303c372a1d363921d56633bab3
Author: Vivek Rai <[email protected]>
AuthorDate: Mon Dec 9 17:03:46 2024 +0530

    [GOBBLIN-2174] Define `DynamicScalingYarnService` for the GoT workforce via `ScalingDirective`s (#4077)
---
 .../temporal/GobblinTemporalConfigurationKeys.java |   5 +
 .../gobblin/temporal/dynamic/WorkerProfile.java    |  11 ++
 .../AbstractDynamicScalingYarnServiceManager.java  | 120 ++++++++++++++
 .../temporal/yarn/DynamicScalingYarnService.java   |  99 +++++++++++
 .../FsSourceDynamicScalingYarnServiceManager.java  |  50 ++++++
 .../yarn/GobblinTemporalApplicationMaster.java     |   2 +-
 .../apache/gobblin/temporal/yarn/YarnService.java  | 182 ++++++++++++---------
 .../dynamic/DummyScalingDirectiveSource.java       |  76 +++++++++
 .../DummyDynamicScalingYarnServiceManager.java     |  38 +++++
 .../yarn/DynamicScalingYarnServiceManagerTest.java | 118 +++++++++++++
 .../yarn/DynamicScalingYarnServiceTest.java        |  64 ++++++++
 .../gobblin/temporal/yarn/YarnServiceTest.java     | 126 ++++++++++++++
 .../src/test/resources/YarnServiceTest.conf        |   6 +
 13 files changed, 816 insertions(+), 81 deletions(-)

diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/GobblinTemporalConfigurationKeys.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/GobblinTemporalConfigurationKeys.java
index 40223e093e..3d51f15c19 100644
--- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/GobblinTemporalConfigurationKeys.java
+++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/GobblinTemporalConfigurationKeys.java
@@ -64,4 +64,9 @@ public interface GobblinTemporalConfigurationKeys {
   String TEMPORAL_NUM_WORKERS_PER_CONTAINER = PREFIX + 
"num.workers.per.container";
   int DEFAULT_TEMPORAL_NUM_WORKERS_PER_CONTAINERS = 1;
   String TEMPORAL_CONNECTION_STRING = PREFIX + "connection.string";
+
+  /**
+   * Prefix for Gobblin-on-Temporal Dynamic Scaling
+   */
+  String DYNAMIC_SCALING_PREFIX = PREFIX + "dynamic.scaling.";
 }
diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/WorkerProfile.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/WorkerProfile.java
index bf1f1d2e09..d67825cb22 100644
--- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/WorkerProfile.java
+++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/WorkerProfile.java
@@ -18,12 +18,23 @@
 package org.apache.gobblin.temporal.dynamic;
 
 import com.typesafe.config.Config;
+import lombok.AllArgsConstructor;
 import lombok.Data;
 
 
 /** A named worker {@link Config} */
 @Data
+@AllArgsConstructor
 public class WorkerProfile {
   private final String name;
   private final Config config;
+
+  /**
+   * Constructs a `WorkerProfile` with the baseline name and the specified configuration.
+   *
+   * @param config the configuration for the worker profile
+   */
+  public WorkerProfile(Config config) {
+    this(WorkforceProfiles.BASELINE_NAME, config);
+  }
 }
diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/AbstractDynamicScalingYarnServiceManager.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/AbstractDynamicScalingYarnServiceManager.java
new file mode 100644
index 0000000000..3022d5c9ec
--- /dev/null
+++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/AbstractDynamicScalingYarnServiceManager.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.yarn;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+
+import org.apache.commons.collections.CollectionUtils;
+
+import com.typesafe.config.Config;
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.AbstractIdleService;
+
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.ExecutorsUtils;
+import org.apache.gobblin.temporal.dynamic.ScalingDirective;
+import org.apache.gobblin.temporal.dynamic.ScalingDirectiveSource;
+import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys;
+
+/**
+ * This class manages the dynamic scaling of the {@link YarnService} by periodically polling for scaling directives and passing
+ * the latest scaling directives to the {@link DynamicScalingYarnService} for processing.
+ *
+ * This is an abstract class that provides the basic functionality for managing dynamic scaling. Subclasses should implement
+ * {@link #createScalingDirectiveSource()} to provide a {@link ScalingDirectiveSource} that will be used to get scaling directives.
+ *
+ * The actual implemented class needs to be passed as value of config {@link org.apache.gobblin.yarn.GobblinYarnConfigurationKeys#APP_MASTER_SERVICE_CLASSES}
+ */
+@Slf4j
+public abstract class AbstractDynamicScalingYarnServiceManager extends 
AbstractIdleService {
+
+  protected final static String DYNAMIC_SCALING_POLLING_INTERVAL = 
GobblinTemporalConfigurationKeys.DYNAMIC_SCALING_PREFIX + "polling.interval";
+  private final int DEFAULT_DYNAMIC_SCALING_POLLING_INTERVAL_SECS = 60;
+  protected final Config config;
+  private final DynamicScalingYarnService dynamicScalingYarnService;
+  private final ScheduledExecutorService dynamicScalingExecutor;
+
+  public 
AbstractDynamicScalingYarnServiceManager(GobblinTemporalApplicationMaster 
appMaster) {
+    this.config = appMaster.getConfig();
+    if (appMaster.get_yarnService() instanceof DynamicScalingYarnService) {
+      this.dynamicScalingYarnService = (DynamicScalingYarnService) 
appMaster.get_yarnService();
+    } else {
+      String errorMsg = "Failure while getting YarnService Instance from 
GobblinTemporalApplicationMaster::get_yarnService()"
+          + " YarnService {" + 
appMaster.get_yarnService().getClass().getName() + "} is not an instance of 
DynamicScalingYarnService";
+      log.error(errorMsg);
+      throw new RuntimeException(errorMsg);
+    }
+    this.dynamicScalingExecutor = Executors.newSingleThreadScheduledExecutor(
+        ExecutorsUtils.newThreadFactory(Optional.of(log),
+            Optional.of("DynamicScalingExecutor")));
+  }
+
+  @Override
+  protected void startUp() {
+    int scheduleInterval = ConfigUtils.getInt(this.config, 
DYNAMIC_SCALING_POLLING_INTERVAL,
+        DEFAULT_DYNAMIC_SCALING_POLLING_INTERVAL_SECS);
+    log.info("Starting the {} with re-scaling interval of {} seconds", 
this.getClass().getSimpleName(), scheduleInterval);
+
+    this.dynamicScalingExecutor.scheduleAtFixedRate(
+        new GetScalingDirectivesRunnable(this.dynamicScalingYarnService, 
createScalingDirectiveSource()),
+        scheduleInterval, scheduleInterval, TimeUnit.SECONDS
+    );
+  }
+
+  @Override
+  protected void shutDown() {
+    log.info("Stopping the " + this.getClass().getSimpleName());
+    ExecutorsUtils.shutdownExecutorService(this.dynamicScalingExecutor, 
Optional.of(log));
+  }
+
+  /**
+   * Create a {@link ScalingDirectiveSource} to use for getting scaling directives.
+   */
+  protected abstract ScalingDirectiveSource createScalingDirectiveSource();
+
+  /**
+   * A {@link Runnable} that gets the scaling directives from the {@link ScalingDirectiveSource} and passes them to the
+   * {@link DynamicScalingYarnService} for processing.
+   */
+  @AllArgsConstructor
+  static class GetScalingDirectivesRunnable implements Runnable {
+    private final DynamicScalingYarnService dynamicScalingYarnService;
+    private final ScalingDirectiveSource scalingDirectiveSource;
+
+    @Override
+    public void run() {
+      try {
+        List<ScalingDirective> scalingDirectives = 
scalingDirectiveSource.getScalingDirectives();
+        if (CollectionUtils.isNotEmpty(scalingDirectives)) {
+          
dynamicScalingYarnService.reviseWorkforcePlanAndRequestNewContainers(scalingDirectives);
+        }
+      } catch (IOException e) {
+        log.error("Failed to get scaling directives", e);
+      } catch (Throwable t) {
+        log.error("Unexpected error with dynamic scaling via directives", t);
+      }
+    }
+  }
+}
diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnService.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnService.java
new file mode 100644
index 0000000000..0720017b85
--- /dev/null
+++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnService.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.yarn;
+
+import java.util.List;
+
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+
+import com.google.common.eventbus.EventBus;
+import com.typesafe.config.Config;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.temporal.dynamic.ScalingDirective;
+import org.apache.gobblin.temporal.dynamic.StaffingDeltas;
+import org.apache.gobblin.temporal.dynamic.WorkerProfile;
+import org.apache.gobblin.temporal.dynamic.WorkforcePlan;
+import org.apache.gobblin.temporal.dynamic.WorkforceProfiles;
+import org.apache.gobblin.temporal.dynamic.WorkforceStaffing;
+import org.apache.gobblin.yarn.GobblinYarnConfigurationKeys;
+
+/**
+ * Service for dynamically scaling Gobblin containers running on YARN.
+ * This service manages workforce staffing and plans, and requests new containers as needed.
+ */
+@Slf4j
+public class DynamicScalingYarnService extends YarnService {
+
+  /** this holds the current count of containers already requested for each worker profile */
+  private final WorkforceStaffing actualWorkforceStaffing;
+  /** this holds the current total workforce plan as per latest received scaling directives */
+  private final WorkforcePlan workforcePlan;
+
+  public DynamicScalingYarnService(Config config, String applicationName, 
String applicationId,
+      YarnConfiguration yarnConfiguration, FileSystem fs, EventBus eventBus) 
throws Exception {
+    super(config, applicationName, applicationId, yarnConfiguration, fs, 
eventBus);
+
+    this.actualWorkforceStaffing = WorkforceStaffing.initialize(0);
+    this.workforcePlan = new WorkforcePlan(this.config, 
this.config.getInt(GobblinYarnConfigurationKeys.INITIAL_CONTAINERS_KEY));
+  }
+
+  @Override
+  protected synchronized void requestInitialContainers() {
+    StaffingDeltas deltas = 
this.workforcePlan.calcStaffingDeltas(this.actualWorkforceStaffing);
+    requestNewContainersForStaffingDeltas(deltas);
+  }
+
+  /**
+   * Revises the workforce plan and requests new containers based on the given scaling directives.
+   *
+   * @param scalingDirectives the list of scaling directives
+   */
+  public synchronized void 
reviseWorkforcePlanAndRequestNewContainers(List<ScalingDirective> 
scalingDirectives) {
+    if (CollectionUtils.isEmpty(scalingDirectives)) {
+      return;
+    }
+    this.workforcePlan.reviseWhenNewer(scalingDirectives);
+    StaffingDeltas deltas = 
this.workforcePlan.calcStaffingDeltas(this.actualWorkforceStaffing);
+    requestNewContainersForStaffingDeltas(deltas);
+  }
+
+  private synchronized void 
requestNewContainersForStaffingDeltas(StaffingDeltas deltas) {
+    deltas.getPerProfileDeltas().forEach(profileDelta -> {
+      if (profileDelta.getDelta() > 0) { // scale up!
+        WorkerProfile workerProfile = profileDelta.getProfile();
+        String profileName = workerProfile.getName();
+        int currNumContainers = 
this.actualWorkforceStaffing.getStaffing(profileName).orElse(0);
+        int delta = profileDelta.getDelta();
+        log.info("Requesting {} new containers for profile {} having currently 
{} containers", delta,
+            WorkforceProfiles.renderName(profileName), currNumContainers);
+        requestContainersForWorkerProfile(workerProfile, delta);
+        // update our staffing after requesting new containers
+        this.actualWorkforceStaffing.reviseStaffing(profileName, 
currNumContainers + delta, System.currentTimeMillis());
+      } else if (profileDelta.getDelta() < 0) { // scale down!
+        // TODO: Decide how to handle negative deltas
+        log.warn("Handling of Negative delta is not supported yet : Profile {} 
delta {} ",
+            profileDelta.getProfile().getName(), profileDelta.getDelta());
+      } // else, already at staffing plan (or at least have requested, so 
in-progress)
+    });
+  }
+
+}
diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/FsSourceDynamicScalingYarnServiceManager.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/FsSourceDynamicScalingYarnServiceManager.java
new file mode 100644
index 0000000000..f6b65bbd2d
--- /dev/null
+++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/FsSourceDynamicScalingYarnServiceManager.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.yarn;
+
+import java.util.Optional;
+
+import org.apache.hadoop.fs.FileSystem;
+
+import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys;
+import org.apache.gobblin.temporal.dynamic.FsScalingDirectiveSource;
+import org.apache.gobblin.temporal.dynamic.ScalingDirectiveSource;
+
+/**
+ * {@link FsScalingDirectiveSource} based implementation of {@link AbstractDynamicScalingYarnServiceManager}.
+ */
+public class FsSourceDynamicScalingYarnServiceManager extends 
AbstractDynamicScalingYarnServiceManager {
+  // TODO: replace fetching of these configs using a new method similar to JobStateUtils::getWorkDirRoot
+  public final static String DYNAMIC_SCALING_DIRECTIVES_DIR = 
GobblinTemporalConfigurationKeys.DYNAMIC_SCALING_PREFIX + "directives.dir";
+  public final static String DYNAMIC_SCALING_ERRORS_DIR = 
GobblinTemporalConfigurationKeys.DYNAMIC_SCALING_PREFIX + "errors.dir";
+  private final FileSystem fs;
+
+  public 
FsSourceDynamicScalingYarnServiceManager(GobblinTemporalApplicationMaster 
appMaster) {
+    super(appMaster);
+    this.fs = appMaster.getFs();
+  }
+
+  @Override
+  protected ScalingDirectiveSource createScalingDirectiveSource() {
+    return new FsScalingDirectiveSource(
+        this.fs,
+        this.config.getString(DYNAMIC_SCALING_DIRECTIVES_DIR),
+        Optional.ofNullable(this.config.getString(DYNAMIC_SCALING_ERRORS_DIR))
+    );
+  }
+}
diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/GobblinTemporalApplicationMaster.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/GobblinTemporalApplicationMaster.java
index b7957bd9a2..3efadb11b3 100644
--- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/GobblinTemporalApplicationMaster.java
+++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/GobblinTemporalApplicationMaster.java
@@ -114,7 +114,7 @@ public class GobblinTemporalApplicationMaster extends GobblinTemporalClusterMana
   protected YarnService buildTemporalYarnService(Config config, String 
applicationName, String applicationId,
       YarnConfiguration yarnConfiguration, FileSystem fs)
       throws Exception {
-    return new YarnService(config, applicationName, applicationId, 
yarnConfiguration, fs, this.eventBus);
+    return new DynamicScalingYarnService(config, applicationName, 
applicationId, yarnConfiguration, fs, this.eventBus);
   }
 
   /**
diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/YarnService.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/YarnService.java
index c8fbd047c5..ec4da215a6 100644
--- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/YarnService.java
+++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/YarnService.java
@@ -23,7 +23,6 @@ import java.nio.ByteBuffer;
 import java.time.Duration;
 import java.util.Arrays;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
@@ -34,6 +33,7 @@ import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.IntStream;
 
 import org.apache.commons.lang.StringUtils;
@@ -109,6 +109,7 @@ import org.apache.gobblin.yarn.YarnHelixUtils;
 import org.apache.gobblin.yarn.event.ContainerReleaseRequest;
 import org.apache.gobblin.yarn.event.ContainerShutdownRequest;
 import org.apache.gobblin.yarn.event.NewContainerRequest;
+import org.apache.gobblin.temporal.dynamic.WorkerProfile;
 
 /**
  * This class is responsible for all Yarn-related stuffs including ApplicationMaster registration,
@@ -130,8 +131,7 @@ class YarnService extends AbstractIdleService {
   private final String appViewAcl;
   //Default helix instance tag derived from cluster level config
   private final String helixInstanceTags;
-
-  private final Config config;
+  protected final Config config;
 
   private final EventBus eventBus;
 
@@ -146,17 +146,11 @@ class YarnService extends AbstractIdleService {
   private final AMRMClientAsync<AMRMClient.ContainerRequest> amrmClientAsync;
   private final NMClientAsync nmClientAsync;
   private final ExecutorService containerLaunchExecutor;
-
-  private final int initialContainers;
   private final int requestedContainerMemoryMbs;
   private final int requestedContainerCores;
-  private final int jvmMemoryOverheadMbs;
-  private final double jvmMemoryXmxRatio;
   private final boolean containerHostAffinityEnabled;
 
   private final int helixInstanceMaxRetries;
-
-  private final Optional<String> containerJvmArgs;
   private final String containerTimezone;
   private final String proxyJvmArgs;
 
@@ -200,6 +194,9 @@ class YarnService extends AbstractIdleService {
   private volatile boolean shutdownInProgress = false;
 
   private final boolean jarCacheEnabled;
+  private static final long DEFAULT_ALLOCATION_REQUEST_ID = 0L;
+  private final AtomicLong allocationRequestIdGenerator = new 
AtomicLong(DEFAULT_ALLOCATION_REQUEST_ID);
+  private final ConcurrentMap<Long, WorkerProfile> 
workerProfileByAllocationRequestId = new ConcurrentHashMap<>();
 
   public YarnService(Config config, String applicationName, String 
applicationId, YarnConfiguration yarnConfiguration,
       FileSystem fs, EventBus eventBus) throws Exception {
@@ -229,7 +226,6 @@ class YarnService extends AbstractIdleService {
     this.nmClientAsync = 
closer.register(NMClientAsync.createNMClientAsync(getNMClientCallbackHandler()));
     this.nmClientAsync.init(this.yarnConfiguration);
 
-    this.initialContainers = 
config.getInt(GobblinYarnConfigurationKeys.INITIAL_CONTAINERS_KEY);
     this.requestedContainerMemoryMbs = 
config.getInt(GobblinYarnConfigurationKeys.CONTAINER_MEMORY_MBS_KEY);
     this.requestedContainerCores = 
config.getInt(GobblinYarnConfigurationKeys.CONTAINER_CORES_KEY);
     this.containerHostAffinityEnabled = 
config.getBoolean(GobblinYarnConfigurationKeys.CONTAINER_HOST_AFFINITY_ENABLED);
@@ -238,10 +234,6 @@ class YarnService extends AbstractIdleService {
     this.helixInstanceTags = ConfigUtils.getString(config,
         GobblinClusterConfigurationKeys.HELIX_INSTANCE_TAGS_KEY, 
GobblinClusterConfigurationKeys.HELIX_DEFAULT_TAG);
 
-    this.containerJvmArgs = 
config.hasPath(GobblinYarnConfigurationKeys.CONTAINER_JVM_ARGS_KEY) ?
-        
Optional.of(config.getString(GobblinYarnConfigurationKeys.CONTAINER_JVM_ARGS_KEY))
 :
-        Optional.<String>absent();
-
     this.proxyJvmArgs = 
config.hasPath(GobblinYarnConfigurationKeys.YARN_APPLICATION_PROXY_JVM_ARGS) ?
         
config.getString(GobblinYarnConfigurationKeys.YARN_APPLICATION_PROXY_JVM_ARGS) 
: StringUtils.EMPTY;
 
@@ -257,27 +249,12 @@ class YarnService extends AbstractIdleService {
         GobblinYarnConfigurationKeys.RELEASED_CONTAINERS_CACHE_EXPIRY_SECS,
         
GobblinYarnConfigurationKeys.DEFAULT_RELEASED_CONTAINERS_CACHE_EXPIRY_SECS), 
TimeUnit.SECONDS).build();
 
-    this.jvmMemoryXmxRatio = ConfigUtils.getDouble(this.config,
-        GobblinYarnConfigurationKeys.CONTAINER_JVM_MEMORY_XMX_RATIO_KEY,
-        GobblinYarnConfigurationKeys.DEFAULT_CONTAINER_JVM_MEMORY_XMX_RATIO);
-
-    Preconditions.checkArgument(this.jvmMemoryXmxRatio >= 0 && 
this.jvmMemoryXmxRatio <= 1,
-        GobblinYarnConfigurationKeys.CONTAINER_JVM_MEMORY_XMX_RATIO_KEY + " 
must be between 0 and 1 inclusive");
-
-    this.jvmMemoryOverheadMbs = ConfigUtils.getInt(this.config,
-        GobblinYarnConfigurationKeys.CONTAINER_JVM_MEMORY_OVERHEAD_MBS_KEY,
-        
GobblinYarnConfigurationKeys.DEFAULT_CONTAINER_JVM_MEMORY_OVERHEAD_MBS);
-
-    Preconditions.checkArgument(this.jvmMemoryOverheadMbs < 
this.requestedContainerMemoryMbs * this.jvmMemoryXmxRatio,
-        GobblinYarnConfigurationKeys.CONTAINER_JVM_MEMORY_OVERHEAD_MBS_KEY + " 
cannot be more than "
-            + GobblinYarnConfigurationKeys.CONTAINER_MEMORY_MBS_KEY + " * "
-            + GobblinYarnConfigurationKeys.CONTAINER_JVM_MEMORY_XMX_RATIO_KEY);
-
     this.appViewAcl = ConfigUtils.getString(this.config, 
GobblinYarnConfigurationKeys.APP_VIEW_ACL,
         GobblinYarnConfigurationKeys.DEFAULT_APP_VIEW_ACL);
     this.containerTimezone = ConfigUtils.getString(this.config, 
GobblinYarnConfigurationKeys.GOBBLIN_YARN_CONTAINER_TIMEZONE,
         GobblinYarnConfigurationKeys.DEFAULT_GOBBLIN_YARN_CONTAINER_TIMEZONE);
     this.jarCacheEnabled = ConfigUtils.getBoolean(this.config, 
GobblinYarnConfigurationKeys.JAR_CACHE_ENABLED, 
GobblinYarnConfigurationKeys.JAR_CACHE_ENABLED_DEFAULT);
+
   }
 
   @SuppressWarnings("unused")
@@ -344,7 +321,7 @@ class YarnService extends AbstractIdleService {
     this.maxResourceCapacity = 
Optional.of(response.getMaximumResourceCapability());
 
     LOGGER.info("Requesting initial containers");
-    requestInitialContainers(this.initialContainers);
+    requestInitialContainers();
   }
 
   @Override
@@ -419,41 +396,25 @@ class YarnService extends AbstractIdleService {
         .build();
   }
 
-  /**
-   * Request an allocation of containers. If numTargetContainers is larger than the max of current and expected number
-   * of containers then additional containers are requested.
-   * <p>
-   * If numTargetContainers is less than the current number of allocated containers then release free containers.
-   * Shrinking is relative to the number of currently allocated containers since it takes time for containers
-   * to be allocated and assigned work and we want to avoid releasing a container prematurely before it is assigned
-   * work. This means that a container may not be released even though numTargetContainers is less than the requested
-   * number of containers. The intended usage is for the caller of this method to make periodic calls to attempt to
-   * adjust the cluster towards the desired number of containers.
-   *
-   * @param inUseInstances  a set of in use instances
-   * @return whether successfully requested the target number of containers
-   */
-  public synchronized boolean requestTargetNumberOfContainers(int 
numContainers, Set<String> inUseInstances) {
-    int defaultContainerMemoryMbs = 
config.getInt(GobblinYarnConfigurationKeys.CONTAINER_MEMORY_MBS_KEY);
-    int defaultContainerCores = config.getInt(GobblinYarnConfigurationKeys. 
CONTAINER_CORES_KEY);
-
-    LOGGER.info("Trying to set numTargetContainers={}, in-use helix instances 
count is {}, container map size is {}",
-        numContainers, inUseInstances.size(), this.containerMap.size());
-
-    requestContainers(numContainers, 
Resource.newInstance(defaultContainerMemoryMbs, defaultContainerCores));
-    LOGGER.info("Current tag-container desired count:{}, tag-container 
allocated: {}", numContainers, this.allocatedContainerCountMap);
-    return true;
+  /** unless overridden to actually scale, "initial" containers will be the app's *only* containers! */
+  protected synchronized void requestInitialContainers() {
+    WorkerProfile baselineWorkerProfile = new WorkerProfile(this.config);
+    int numContainers = 
this.config.getInt(GobblinYarnConfigurationKeys.INITIAL_CONTAINERS_KEY);
+    LOGGER.info("Requesting {} initial (static) containers with baseline 
(only) profile, never to be re-scaled", numContainers);
+    requestContainersForWorkerProfile(baselineWorkerProfile, numContainers);
   }
 
-  // Request initial containers with default resource and helix tag
-  private void requestInitialContainers(int containersRequested) {
-    requestTargetNumberOfContainers(containersRequested, 
Collections.EMPTY_SET);
+  protected synchronized void requestContainersForWorkerProfile(WorkerProfile 
workerProfile, int numContainers) {
+    int containerMemoryMbs = 
workerProfile.getConfig().getInt(GobblinYarnConfigurationKeys.CONTAINER_MEMORY_MBS_KEY);
+    int containerCores = 
workerProfile.getConfig().getInt(GobblinYarnConfigurationKeys.CONTAINER_CORES_KEY);
+    long allocationRequestId = storeByUniqueAllocationRequestId(workerProfile);
+    requestContainers(numContainers, Resource.newInstance(containerMemoryMbs, 
containerCores), Optional.of(allocationRequestId));
   }
 
   private void requestContainer(Optional<String> preferredNode, 
Optional<Resource> resourceOptional) {
     Resource desiredResource = resourceOptional.or(Resource.newInstance(
         this.requestedContainerMemoryMbs, this.requestedContainerCores));
-    requestContainer(preferredNode, desiredResource);
+    requestContainer(preferredNode, desiredResource, Optional.absent());
   }
 
   /**
@@ -462,14 +423,14 @@ class YarnService extends AbstractIdleService {
    * @param numContainers
    * @param resource
    */
-  private void requestContainers(int numContainers, Resource resource) {
-    LOGGER.info("Requesting {} containers with resource={}", numContainers, 
resource);
+  protected void requestContainers(int numContainers, Resource resource, 
Optional<Long> optAllocationRequestId) {
+    LOGGER.info("Requesting {} containers with resource={} and allocation 
request id = {}", numContainers, resource, optAllocationRequestId);
     IntStream.range(0, numContainers)
-        .forEach(i -> requestContainer(Optional.absent(), resource));
+        .forEach(i -> requestContainer(Optional.absent(), resource, 
optAllocationRequestId));
   }
 
   // Request containers with specific resource requirement
-  private void requestContainer(Optional<String> preferredNode, Resource 
resource) {
+  private void requestContainer(Optional<String> preferredNode, Resource 
resource, Optional<Long> optAllocationRequestId) {
     // Fail if Yarn cannot meet container resource requirements
     Preconditions.checkArgument(resource.getMemory() <= 
this.maxResourceCapacity.get().getMemory() &&
             resource.getVirtualCores() <= 
this.maxResourceCapacity.get().getVirtualCores(),
@@ -485,8 +446,11 @@ class YarnService extends AbstractIdleService {
     priority.setPriority(priorityNum);
 
     String[] preferredNodes = preferredNode.isPresent() ? new String[] 
{preferredNode.get()} : null;
+
+    long allocationRequestId = 
optAllocationRequestId.or(DEFAULT_ALLOCATION_REQUEST_ID);
+
     this.amrmClientAsync.addContainerRequest(
-        new AMRMClient.ContainerRequest(resource, preferredNodes, null, 
priority));
+        new AMRMClient.ContainerRequest(resource, preferredNodes, null, 
priority, allocationRequestId));
   }
 
   protected ContainerLaunchContext newContainerLaunchContext(ContainerInfo 
containerInfo)
@@ -590,15 +554,49 @@ class YarnService extends AbstractIdleService {
 
   @VisibleForTesting
   protected String buildContainerCommand(Container container, String 
helixParticipantId, String helixInstanceTag) {
+    long allocationRequestId = container.getAllocationRequestId();
+    WorkerProfile workerProfile = 
Optional.fromNullable(this.workerProfileByAllocationRequestId.get(allocationRequestId))
+        .or(() -> {
+          LOGGER.warn("No Worker Profile found for {}, so falling back to 
default", allocationRequestId);
+          return 
this.workerProfileByAllocationRequestId.computeIfAbsent(DEFAULT_ALLOCATION_REQUEST_ID,
 k -> {
+            LOGGER.warn("WARNING: (LIKELY) UNEXPECTED CONCURRENCY: No Worker 
Profile even yet mapped to the default allocation request ID {} - creating one 
now", DEFAULT_ALLOCATION_REQUEST_ID);
+            return new WorkerProfile(this.config);
+          });
+        });
+    Config workerProfileConfig = workerProfile.getConfig();
+
+    double workerJvmMemoryXmxRatio = ConfigUtils.getDouble(workerProfileConfig,
+        GobblinYarnConfigurationKeys.CONTAINER_JVM_MEMORY_XMX_RATIO_KEY,
+        GobblinYarnConfigurationKeys.DEFAULT_CONTAINER_JVM_MEMORY_XMX_RATIO);
+
+    int workerJvmMemoryOverheadMbs = ConfigUtils.getInt(workerProfileConfig,
+        GobblinYarnConfigurationKeys.CONTAINER_JVM_MEMORY_OVERHEAD_MBS_KEY,
+        
GobblinYarnConfigurationKeys.DEFAULT_CONTAINER_JVM_MEMORY_OVERHEAD_MBS);
+
+    Preconditions.checkArgument(workerJvmMemoryXmxRatio >= 0 && 
workerJvmMemoryXmxRatio <= 1,
+        workerProfile.getName() + " : " + 
GobblinYarnConfigurationKeys.CONTAINER_JVM_MEMORY_XMX_RATIO_KEY +
+            " must be between 0 and 1 inclusive");
+
+    long containerMemoryMbs = container.getResource().getMemorySize();
+
+    Preconditions.checkArgument(workerJvmMemoryOverheadMbs < 
containerMemoryMbs * workerJvmMemoryXmxRatio,
+        workerProfile.getName() + " : " + 
GobblinYarnConfigurationKeys.CONTAINER_JVM_MEMORY_OVERHEAD_MBS_KEY +
+            " cannot be more than " + 
GobblinYarnConfigurationKeys.CONTAINER_MEMORY_MBS_KEY + " * " +
+            GobblinYarnConfigurationKeys.CONTAINER_JVM_MEMORY_XMX_RATIO_KEY);
+
+    Optional<String> workerJvmArgs = 
workerProfileConfig.hasPath(GobblinYarnConfigurationKeys.CONTAINER_JVM_ARGS_KEY)
 ?
+        
Optional.of(workerProfileConfig.getString(GobblinYarnConfigurationKeys.CONTAINER_JVM_ARGS_KEY))
 :
+        Optional.<String>absent();
+
     String containerProcessName = 
GobblinTemporalYarnTaskRunner.class.getSimpleName();
     StringBuilder containerCommand = new StringBuilder()
         
.append(ApplicationConstants.Environment.JAVA_HOME.$()).append("/bin/java")
-        .append(" -Xmx").append((int) (container.getResource().getMemory() * 
this.jvmMemoryXmxRatio) -
-            this.jvmMemoryOverheadMbs).append("M")
+        .append(" -Xmx").append((int) (container.getResource().getMemory() * 
workerJvmMemoryXmxRatio) -
+            workerJvmMemoryOverheadMbs).append("M")
         .append(" 
-D").append(GobblinYarnConfigurationKeys.JVM_USER_TIMEZONE_CONFIG).append("=").append(this.containerTimezone)
         .append(" 
-D").append(GobblinYarnConfigurationKeys.GOBBLIN_YARN_CONTAINER_LOG_DIR_NAME).append("=").append(ApplicationConstants.LOG_DIR_EXPANSION_VAR)
         .append(" 
-D").append(GobblinYarnConfigurationKeys.GOBBLIN_YARN_CONTAINER_LOG_FILE_NAME).append("=").append(containerProcessName).append(".").append(ApplicationConstants.STDOUT)
-        .append(" ").append(JvmUtils.formatJvmArguments(this.containerJvmArgs))
+        .append(" ").append(JvmUtils.formatJvmArguments(workerJvmArgs))
         .append(" ").append(this.proxyJvmArgs)
         .append(" ").append(GobblinTemporalYarnTaskRunner.class.getName())
         .append(" 
--").append(GobblinClusterConfigurationKeys.APPLICATION_NAME_OPTION_NAME)
@@ -755,6 +753,18 @@ class YarnService extends AbstractIdleService {
     return eventMetadataBuilder;
   }
 
+  /**
+   * Generates a unique allocation request ID for the given worker profile and 
stores the ID-to-profile mapping.
+   *
+   * @param workerProfile the worker profile for which the allocation request 
ID is generated
+   * @return the generated allocation request ID
+   */
+  protected long storeByUniqueAllocationRequestId(WorkerProfile workerProfile) 
{
+    long allocationRequestId = allocationRequestIdGenerator.getAndIncrement();
+    this.workerProfileByAllocationRequestId.put(allocationRequestId, 
workerProfile);
+    return allocationRequestId;
+  }
+
   /**
    * A custom implementation of {@link AMRMClientAsync.CallbackHandler}.
    */
@@ -803,24 +813,36 @@ class YarnService extends AbstractIdleService {
         // Find matching requests and remove the request (YARN-660). We the 
scheduler are responsible
         // for cleaning up requests after allocation based on the design in 
the described ticket.
         // YARN does not have a delta request API and the requests are not 
cleaned up automatically.
-        // Try finding a match first with the host as the resource name then 
fall back to any resource match.
+        // Try finding a match first by allocation request id (which should 
always succeed), then fall back to
+        // finding a match with the host as the resource name, which in turn will 
fall back to any resource match.
         // Also see YARN-1902. Container count will explode without this logic 
for removing container requests.
-        List<? extends Collection<AMRMClient.ContainerRequest>> 
matchingRequests = amrmClientAsync
-            .getMatchingRequests(container.getPriority(), 
container.getNodeHttpAddress(), container.getResource());
+        Collection<AMRMClient.ContainerRequest> 
matchingRequestsByAllocationRequestId = 
amrmClientAsync.getMatchingRequests(container.getAllocationRequestId());
+        if (!matchingRequestsByAllocationRequestId.isEmpty()) {
+          AMRMClient.ContainerRequest firstMatchingContainerRequest = 
matchingRequestsByAllocationRequestId.iterator().next();
+          LOGGER.info("Found matching requests {}, removing first matching 
request {}",
+              matchingRequestsByAllocationRequestId, 
firstMatchingContainerRequest);
 
-        if (matchingRequests.isEmpty()) {
-          LOGGER.debug("Matching request by host {} not found", 
container.getNodeHttpAddress());
+          
amrmClientAsync.removeContainerRequest(firstMatchingContainerRequest);
+        } else {
+          LOGGER.info("Matching request by allocation request id {} not 
found", container.getAllocationRequestId());
 
-          matchingRequests = amrmClientAsync
-              .getMatchingRequests(container.getPriority(), 
ResourceRequest.ANY, container.getResource());
-        }
+          List<? extends Collection<AMRMClient.ContainerRequest>> 
matchingRequestsByHost = amrmClientAsync
+              .getMatchingRequests(container.getPriority(), 
container.getNodeHttpAddress(), container.getResource());
 
-        if (!matchingRequests.isEmpty()) {
-          AMRMClient.ContainerRequest firstMatchingContainerRequest = 
matchingRequests.get(0).iterator().next();
-          LOGGER.debug("Found matching requests {}, removing first matching 
request {}",
-              matchingRequests, firstMatchingContainerRequest);
+          if (matchingRequestsByHost.isEmpty()) {
+            LOGGER.info("Matching request by host {} not found", 
container.getNodeHttpAddress());
 
-          
amrmClientAsync.removeContainerRequest(firstMatchingContainerRequest);
+            matchingRequestsByHost = amrmClientAsync
+                .getMatchingRequests(container.getPriority(), 
ResourceRequest.ANY, container.getResource());
+          }
+
+          if (!matchingRequestsByHost.isEmpty()) {
+            AMRMClient.ContainerRequest firstMatchingContainerRequest = 
matchingRequestsByHost.get(0).iterator().next();
+            LOGGER.info("Found matching requests {}, removing first matching 
request {}",
+                matchingRequestsByAllocationRequestId, 
firstMatchingContainerRequest);
+
+            
amrmClientAsync.removeContainerRequest(firstMatchingContainerRequest);
+          }
         }
 
         containerLaunchExecutor.submit(new Runnable() {
diff --git 
a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/dynamic/DummyScalingDirectiveSource.java
 
b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/dynamic/DummyScalingDirectiveSource.java
new file mode 100644
index 0000000000..6bdfe46276
--- /dev/null
+++ 
b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/dynamic/DummyScalingDirectiveSource.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.dynamic;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.gobblin.yarn.GobblinYarnConfigurationKeys;
+
+
+/**
+ * A dummy implementation of {@link ScalingDirectiveSource} that returns a 
fixed set of {@link ScalingDirective}s.
+ */
+public class DummyScalingDirectiveSource implements ScalingDirectiveSource {
+  private final AtomicInteger numInvocations = new AtomicInteger(0);
+  private final Optional<ProfileDerivation> derivedFromBaseline;
+  public DummyScalingDirectiveSource() {
+    this.derivedFromBaseline = Optional.of(new 
ProfileDerivation(WorkforceProfiles.BASELINE_NAME,
+        new ProfileOverlay.Adding(
+            new 
ProfileOverlay.KVPair(GobblinYarnConfigurationKeys.CONTAINER_MEMORY_MBS_KEY, 
"2048"),
+            new 
ProfileOverlay.KVPair(GobblinYarnConfigurationKeys.CONTAINER_CORES_KEY, "2")
+        )
+    ));
+  }
+
+  /**
+   * @return - A fixed set of  {@link ScalingDirective}s corresponding to the 
invocation number.
+   */
+  @Override
+  public List<ScalingDirective> getScalingDirectives() {
+    // Note - profile should exist already or is derived from other profile
+    int currNumInvocations = this.numInvocations.getAndIncrement();
+    long currentTime = System.currentTimeMillis();
+    if (currNumInvocations == 0) {
+      // here we are returning two new profiles with initial container counts, 
and these should be launched;
+      // both profiles should have different timestampEpochMillis so that both 
are processed, otherwise
+      // 
org.apache.gobblin.temporal.dynamic.WorkforcePlan$IllegalRevisionException$OutOfOrderDirective
 can occur
+      return Arrays.asList(
+          new ScalingDirective("firstProfile", 3, currentTime, 
this.derivedFromBaseline),
+          new ScalingDirective("secondProfile", 2, currentTime + 1, 
this.derivedFromBaseline)
+      );
+    } else if (currNumInvocations == 1) {
+      // here we are increasing containers to 5 for firstProfile and 3 for 
secondProfile so that 2 new extra containers
+      // should be launched for firstProfile and 1 new extra container for 
secondProfile
+      return Arrays.asList(
+          new ScalingDirective("firstProfile", 5, currentTime),
+          new ScalingDirective("secondProfile", 3, currentTime + 1)
+      );
+    } else if (currNumInvocations == 2) {
+      // the count is same as previous invocation so no new containers should 
be launched
+      return Arrays.asList(
+          new ScalingDirective("firstProfile", 5, currentTime),
+          new ScalingDirective("secondProfile", 3, currentTime + 1)
+      );
+    }
+    return new ArrayList<>();
+  }
+}
diff --git 
a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/yarn/DummyDynamicScalingYarnServiceManager.java
 
b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/yarn/DummyDynamicScalingYarnServiceManager.java
new file mode 100644
index 0000000000..b79f808938
--- /dev/null
+++ 
b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/yarn/DummyDynamicScalingYarnServiceManager.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.yarn;
+
+import org.apache.gobblin.temporal.dynamic.DummyScalingDirectiveSource;
+import org.apache.gobblin.temporal.dynamic.ScalingDirectiveSource;
+
+/**
+ * {@link DummyScalingDirectiveSource} based implementation of {@link 
AbstractDynamicScalingYarnServiceManager}.
+ *  This class is meant to be used for integration testing purposes only.
+ *  This is initialized using config {@link 
org.apache.gobblin.yarn.GobblinYarnConfigurationKeys#APP_MASTER_SERVICE_CLASSES}
 while testing
+ */
+public class DummyDynamicScalingYarnServiceManager extends 
AbstractDynamicScalingYarnServiceManager {
+
+  public 
DummyDynamicScalingYarnServiceManager(GobblinTemporalApplicationMaster 
appMaster) {
+    super(appMaster);
+  }
+
+  @Override
+  protected ScalingDirectiveSource createScalingDirectiveSource() {
+    return new DummyScalingDirectiveSource();
+  }
+}
\ No newline at end of file
diff --git 
a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnServiceManagerTest.java
 
b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnServiceManagerTest.java
new file mode 100644
index 0000000000..dd4243d3fa
--- /dev/null
+++ 
b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnServiceManagerTest.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.yarn;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigValueFactory;
+
+import org.apache.gobblin.temporal.dynamic.ScalingDirective;
+import org.apache.gobblin.temporal.dynamic.ScalingDirectiveSource;
+import org.apache.gobblin.temporal.dynamic.DummyScalingDirectiveSource;
+
+import static 
org.apache.gobblin.temporal.yarn.AbstractDynamicScalingYarnServiceManager.DYNAMIC_SCALING_POLLING_INTERVAL;
+
+/** Tests for {@link AbstractDynamicScalingYarnServiceManager}*/
+public class DynamicScalingYarnServiceManagerTest {
+
+  @Mock private DynamicScalingYarnService mockDynamicScalingYarnService;
+  @Mock private ScalingDirectiveSource mockScalingDirectiveSource;
+  @Mock private GobblinTemporalApplicationMaster 
mockGobblinTemporalApplicationMaster;
+
+  @BeforeMethod
+  public void setup() {
+    MockitoAnnotations.openMocks(this);
+    // Using 1 second as polling interval so that the test runs faster and
+    // GetScalingDirectivesRunnable.run() will be called once for each second of 
sleep introduced between startUp
+    // and shutDown
+    Config config = 
ConfigFactory.empty().withValue(DYNAMIC_SCALING_POLLING_INTERVAL, 
ConfigValueFactory.fromAnyRef(1));
+    
Mockito.when(mockGobblinTemporalApplicationMaster.getConfig()).thenReturn(config);
+    
Mockito.when(mockGobblinTemporalApplicationMaster.get_yarnService()).thenReturn(mockDynamicScalingYarnService);
+  }
+
+  @Test
+  public void testWhenScalingDirectivesIsNulOrEmpty() throws IOException, 
InterruptedException {
+    
Mockito.when(mockScalingDirectiveSource.getScalingDirectives()).thenReturn(null).thenReturn(new
 ArrayList<>());
+    TestDynamicScalingYarnServiceManager testDynamicScalingYarnServiceManager 
= new TestDynamicScalingYarnServiceManager(
+        mockGobblinTemporalApplicationMaster, mockScalingDirectiveSource);
+    testDynamicScalingYarnServiceManager.startUp();
+    Thread.sleep(3000);
+    testDynamicScalingYarnServiceManager.shutDown();
+    Mockito.verify(mockDynamicScalingYarnService, 
Mockito.never()).reviseWorkforcePlanAndRequestNewContainers(Mockito.anyList());
+  }
+
+  /** Note : this test uses {@link DummyScalingDirectiveSource}*/
+  @Test
+  public void testWithDummyScalingDirectiveSource() throws 
InterruptedException {
+    // DummyScalingDirectiveSource returns 2 scaling directives in each of its first 3 
invocations and after that it returns an empty list,
+    // so reviseWorkforcePlanAndRequestNewContainers should be called exactly 3 
times no matter how many more times the source is polled
+    TestDynamicScalingYarnServiceManager testDynamicScalingYarnServiceManager 
= new TestDynamicScalingYarnServiceManager(
+        mockGobblinTemporalApplicationMaster, new 
DummyScalingDirectiveSource());
+    testDynamicScalingYarnServiceManager.startUp();
+    Thread.sleep(5000); // 5 seconds sleep so that 
GetScalingDirectivesRunnable.run() is called 5 times
+    testDynamicScalingYarnServiceManager.shutDown();
+    Mockito.verify(mockDynamicScalingYarnService, 
Mockito.times(3)).reviseWorkforcePlanAndRequestNewContainers(Mockito.anyList());
+  }
+
+  @Test
+  public void testWithRandomScalingDirectives() throws IOException, 
InterruptedException {
+    ScalingDirective mockScalingDirective = 
Mockito.mock(ScalingDirective.class);
+    List<ScalingDirective> mockedScalingDirectives = 
Arrays.asList(mockScalingDirective, mockScalingDirective);
+    Mockito.when(mockScalingDirectiveSource.getScalingDirectives())
+        .thenReturn(new ArrayList<>())
+        .thenReturn(mockedScalingDirectives)
+        .thenReturn(mockedScalingDirectives)
+        .thenReturn(null);
+
+    TestDynamicScalingYarnServiceManager testDynamicScalingYarnServiceManager 
= new TestDynamicScalingYarnServiceManager(
+        mockGobblinTemporalApplicationMaster, mockScalingDirectiveSource);
+    testDynamicScalingYarnServiceManager.startUp();
+    Thread.sleep(5000);
+    testDynamicScalingYarnServiceManager.shutDown();
+    Mockito.verify(mockDynamicScalingYarnService, 
Mockito.times(2)).reviseWorkforcePlanAndRequestNewContainers(Mockito.anyList());
+  }
+
+  /** Test implementation of {@link AbstractDynamicScalingYarnServiceManager} 
which returns passed
+   * {@link ScalingDirectiveSource} when {@link 
#createScalingDirectiveSource()} is called while initialising
+   * {@link 
AbstractDynamicScalingYarnServiceManager.GetScalingDirectivesRunnable}
+   * */
+  protected static class TestDynamicScalingYarnServiceManager extends 
AbstractDynamicScalingYarnServiceManager {
+    private final ScalingDirectiveSource _scalingDirectiveSource;
+    public 
TestDynamicScalingYarnServiceManager(GobblinTemporalApplicationMaster 
appMaster, ScalingDirectiveSource scalingDirectiveSource) {
+      super(appMaster);
+      this._scalingDirectiveSource = scalingDirectiveSource;
+    }
+
+    @Override
+    protected ScalingDirectiveSource createScalingDirectiveSource() {
+      return this._scalingDirectiveSource;
+    }
+  }
+
+}
diff --git 
a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnServiceTest.java
 
b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnServiceTest.java
new file mode 100644
index 0000000000..6c0946aabb
--- /dev/null
+++ 
b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnServiceTest.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.yarn;
+
+import java.net.URL;
+import java.util.Collections;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import com.google.common.base.Optional;
+import com.google.common.eventbus.EventBus;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+import org.apache.gobblin.temporal.dynamic.ScalingDirective;
+import org.apache.gobblin.temporal.dynamic.WorkforceProfiles;
+
+/** Tests for {@link DynamicScalingYarnService} */
+public class DynamicScalingYarnServiceTest {
+  private Config defaultConfigs;
+  private final YarnConfiguration yarnConfiguration = new YarnConfiguration();
+  private final FileSystem mockFileSystem = Mockito.mock(FileSystem.class);
+  private final EventBus eventBus = new 
EventBus("TemporalDynamicScalingYarnServiceTest");
+
+  @BeforeClass
+  public void setup() {
+    URL url = DynamicScalingYarnServiceTest.class.getClassLoader()
+        .getResource(YarnServiceTest.class.getSimpleName() + ".conf"); // 
using the same initial config as YarnServiceTest
+    Assert.assertNotNull(url, "Could not find resource " + url);
+    this.defaultConfigs = ConfigFactory.parseURL(url).resolve();
+  }
+
+  @Test
+  public void testReviseWorkforcePlanAndRequestNewContainers() throws 
Exception {
+    int numNewContainers = 5;
+    DynamicScalingYarnService dynamicScalingYarnService = new 
DynamicScalingYarnService(this.defaultConfigs, "testApp", "testAppId", 
yarnConfiguration, mockFileSystem, eventBus);
+    DynamicScalingYarnService dynamicScalingYarnServiceSpy = 
Mockito.spy(dynamicScalingYarnService);
+    
Mockito.doNothing().when(dynamicScalingYarnServiceSpy).requestContainers(Mockito.anyInt(),
 Mockito.any(Resource.class), Mockito.any(Optional.class));
+    ScalingDirective baseScalingDirective = new 
ScalingDirective(WorkforceProfiles.BASELINE_NAME, numNewContainers, 
System.currentTimeMillis());
+    
dynamicScalingYarnServiceSpy.reviseWorkforcePlanAndRequestNewContainers(Collections.singletonList(baseScalingDirective));
+    Mockito.verify(dynamicScalingYarnServiceSpy, 
Mockito.times(1)).requestContainers(Mockito.eq(numNewContainers), 
Mockito.any(Resource.class), Mockito.any(Optional.class));
+  }
+}
diff --git 
a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/yarn/YarnServiceTest.java
 
b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/yarn/YarnServiceTest.java
new file mode 100644
index 0000000000..3c81316b85
--- /dev/null
+++ 
b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/yarn/YarnServiceTest.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.yarn;
+
+import java.io.IOException;
+import java.net.URL;
+
+import org.apache.hadoop.fs.FileSystem;
+import 
org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+
+import com.google.common.base.Optional;
+
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.mockito.MockedStatic;
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigValueFactory;
+import com.google.common.eventbus.EventBus;
+
+import org.apache.gobblin.yarn.GobblinYarnConfigurationKeys;
+
+import static org.mockito.Mockito.*;
+
+
+/**
+ * Tests for {@link YarnService}
+ *
+ * NOTE : This test is a partial clone of {@link 
org.apache.gobblin.yarn.YarnServiceTest}
+ * */
+public class YarnServiceTest {
+  private Config defaultConfigs;
+  private final YarnConfiguration yarnConfiguration = new YarnConfiguration();
+  private final FileSystem mockFileSystem = Mockito.mock(FileSystem.class);
+  private final EventBus eventBus = new EventBus("TemporalYarnServiceTest");
+  private AMRMClientAsync mockAMRMClient;
+  private RegisterApplicationMasterResponse 
mockRegisterApplicationMasterResponse;
+
+  @BeforeClass
+  public void setup() throws IOException, YarnException {
+    mockAMRMClient = mock(AMRMClientAsync.class);
+    mockRegisterApplicationMasterResponse = 
mock(RegisterApplicationMasterResponse.class);
+
+    URL url = YarnServiceTest.class.getClassLoader()
+        .getResource(YarnServiceTest.class.getSimpleName() + ".conf");
+    Assert.assertNotNull(url, "Could not find resource " + url);
+    this.defaultConfigs = ConfigFactory.parseURL(url).resolve();
+
+    MockedStatic<AMRMClientAsync> amrmClientAsyncMockStatic = 
mockStatic(AMRMClientAsync.class);
+
+    amrmClientAsyncMockStatic.when(() -> 
AMRMClientAsync.createAMRMClientAsync(anyInt(), 
any(AMRMClientAsync.CallbackHandler.class)))
+        .thenReturn(mockAMRMClient);
+    doNothing().when(mockAMRMClient).init(any(YarnConfiguration.class));
+
+    when(mockAMRMClient.registerApplicationMaster(anyString(), anyInt(), 
anyString()))
+        .thenReturn(mockRegisterApplicationMasterResponse);
+    when(mockRegisterApplicationMasterResponse.getMaximumResourceCapability())
+        .thenReturn(Mockito.mock(Resource.class));
+  }
+
+  @Test
+  public void testYarnServiceStartupWithInitialContainers() throws Exception {
+    int expectedNumContainers = 3;
+    Config config = 
this.defaultConfigs.withValue(GobblinYarnConfigurationKeys.INITIAL_CONTAINERS_KEY,
 ConfigValueFactory.fromAnyRef(expectedNumContainers));
+    YarnService yarnService = new YarnService(config, "testApplicationName", 
"testApplicationId", yarnConfiguration, mockFileSystem, eventBus);
+    YarnService yarnServiceSpy = Mockito.spy(yarnService);
+    
Mockito.doNothing().when(yarnServiceSpy).requestContainers(Mockito.anyInt(), 
Mockito.any(Resource.class), Mockito.any(Optional.class));
+    yarnServiceSpy.startUp();
+    Mockito.verify(yarnServiceSpy, 
Mockito.times(1)).requestContainers(Mockito.eq(expectedNumContainers), 
Mockito.any(Resource.class), Mockito.any(Optional.class));
+  }
+
+  @Test
+  public void testBuildContainerCommand() throws Exception {
+    final double jvmMemoryXmxRatio = 0.7;
+    final int jvmMemoryOverheadMbs = 50;
+    final int resourceMemoryMB = 3072;
+    final int expectedJvmMemory = (int) (resourceMemoryMB * jvmMemoryXmxRatio) 
- jvmMemoryOverheadMbs;
+
+    Config config = this.defaultConfigs
+        
.withValue(GobblinYarnConfigurationKeys.CONTAINER_JVM_MEMORY_XMX_RATIO_KEY, 
ConfigValueFactory.fromAnyRef(jvmMemoryXmxRatio))
+        
.withValue(GobblinYarnConfigurationKeys.CONTAINER_JVM_MEMORY_OVERHEAD_MBS_KEY, 
ConfigValueFactory.fromAnyRef(jvmMemoryOverheadMbs));
+
+    Resource resource = Resource.newInstance(resourceMemoryMB, 2);
+
+    Container mockContainer = Mockito.mock(Container.class);
+    Mockito.when(mockContainer.getResource()).thenReturn(resource);
+    Mockito.when(mockContainer.getAllocationRequestId()).thenReturn(0L);
+
+    YarnService yarnService = new YarnService(
+        config,
+        "testApplicationName",
+        "testApplicationId",
+        yarnConfiguration,
+        mockFileSystem,
+        eventBus
+    );
+
+    yarnService.startUp();
+
+    String command = yarnService.buildContainerCommand(mockContainer, 
"testHelixParticipantId", "testHelixInstanceTag");
+    Assert.assertTrue(command.contains("-Xmx" + expectedJvmMemory + "M"));
+  }
+}
diff --git a/gobblin-temporal/src/test/resources/YarnServiceTest.conf 
b/gobblin-temporal/src/test/resources/YarnServiceTest.conf
new file mode 100644
index 0000000000..0903ced00b
--- /dev/null
+++ b/gobblin-temporal/src/test/resources/YarnServiceTest.conf
@@ -0,0 +1,6 @@
+# Adding some default configs used while initializing YarnService for tests
+gobblin.yarn.initial.containers=0
+gobblin.yarn.container.memory.mbs=1024
+gobblin.yarn.container.cores=1
+gobblin.yarn.container.affinity.enabled=false
+gobblin.yarn.helix.instance.max.retries=1
\ No newline at end of file

Reply via email to