[ 
https://issues.apache.org/jira/browse/GOBBLIN-2174?focusedWorklogId=945669&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-945669
 ]

ASF GitHub Bot logged work on GOBBLIN-2174:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 26/Nov/24 09:48
            Start Date: 26/Nov/24 09:48
    Worklog Time Spent: 10m 
      Work Description: phet commented on code in PR #4077:
URL: https://github.com/apache/gobblin/pull/4077#discussion_r1858055680


##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/loadgen/dynamic/DummyScalingDirectiveSource.java:
##########
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.loadgen.dynamic;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Optional;
+
+import org.apache.gobblin.yarn.GobblinYarnConfigurationKeys;
+import org.apache.gobblin.temporal.dynamic.ProfileDerivation;
+import org.apache.gobblin.temporal.dynamic.ProfileOverlay;
+import org.apache.gobblin.temporal.dynamic.ScalingDirective;
+import org.apache.gobblin.temporal.dynamic.ScalingDirectiveSource;
+import org.apache.gobblin.temporal.dynamic.WorkforceProfiles;
+
+
+/**
+ * A dummy implementation of {@link ScalingDirectiveSource} that returns a 
fixed set of {@link ScalingDirective}s.
+ */
+public class DummyScalingDirectiveSource implements ScalingDirectiveSource {
+  private int count = 0;

Review Comment:
   suggest `invocationsCount` or `numInvocations`



##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/loadgen/dynamic/DummyScalingDirectiveSource.java:
##########
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.loadgen.dynamic;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Optional;
+
+import org.apache.gobblin.yarn.GobblinYarnConfigurationKeys;
+import org.apache.gobblin.temporal.dynamic.ProfileDerivation;
+import org.apache.gobblin.temporal.dynamic.ProfileOverlay;
+import org.apache.gobblin.temporal.dynamic.ScalingDirective;
+import org.apache.gobblin.temporal.dynamic.ScalingDirectiveSource;
+import org.apache.gobblin.temporal.dynamic.WorkforceProfiles;
+
+
+/**
+ * A dummy implementation of {@link ScalingDirectiveSource} that returns a 
fixed set of {@link ScalingDirective}s.
+ */
+public class DummyScalingDirectiveSource implements ScalingDirectiveSource {
+  private int count = 0;
+  private final Optional<ProfileDerivation> derivedFromBaseline;
+  public DummyScalingDirectiveSource() {
+    this.derivedFromBaseline = Optional.of(new 
ProfileDerivation(WorkforceProfiles.BASELINE_NAME,
+        new ProfileOverlay.Adding(
+            new 
ProfileOverlay.KVPair(GobblinYarnConfigurationKeys.CONTAINER_MEMORY_MBS_KEY, 
"2048"),
+            new 
ProfileOverlay.KVPair(GobblinYarnConfigurationKeys.CONTAINER_CORES_KEY, "2")
+        )
+    ));
+  }
+
+  /**
+   * @return {@link ScalingDirective}s - an impl. may choose to return all 
known directives or to give only newer
+   * directives than previously returned
+   */
+  @Override
+  public List<ScalingDirective> getScalingDirectives() {
+    // Note - profile should exist already pr is derived from other profile
+    if (this.count == 0) {
+      this.count++;
+      return Arrays.asList(
+          new ScalingDirective("firstProfile", 3, System.currentTimeMillis(), 
this.derivedFromBaseline),
+          new ScalingDirective("secondProfile", 2, System.currentTimeMillis(), 
this.derivedFromBaseline)

Review Comment:
   for all of these, we must ensure the second of the pair has a later time 
than the first.  since I don't believe the order of arg execution is defined in 
java, would be best to assign currTime and then use it as `currTime + N` the 
second time



##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/YarnService.java:
##########
@@ -440,7 +430,7 @@ public synchronized boolean 
requestTargetNumberOfContainers(int numContainers, S
     LOGGER.info("Trying to set numTargetContainers={}, in-use helix instances 
count is {}, container map size is {}",
         numContainers, inUseInstances.size(), this.containerMap.size());
 
-    requestContainers(numContainers, 
Resource.newInstance(defaultContainerMemoryMbs, defaultContainerCores));
+    requestContainers(numContainers, 
Resource.newInstance(defaultContainerMemoryMbs, defaultContainerCores), 
Optional.absent());

Review Comment:
   (WRT the enclosing method...)
   
   does it need to be `public`?  also, if it's only used by 
`requestInitialContainers` we might name it thus.  on the other hand, why does 
it special-case the reading from `config`, rather than doing 
`WorkerProfile.getConfig`?



##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/YarnService.java:
##########
@@ -257,27 +257,17 @@ public YarnService(Config config, String applicationName, 
String applicationId,
         GobblinYarnConfigurationKeys.RELEASED_CONTAINERS_CACHE_EXPIRY_SECS,
         
GobblinYarnConfigurationKeys.DEFAULT_RELEASED_CONTAINERS_CACHE_EXPIRY_SECS), 
TimeUnit.SECONDS).build();
 
-    this.jvmMemoryXmxRatio = ConfigUtils.getDouble(this.config,
-        GobblinYarnConfigurationKeys.CONTAINER_JVM_MEMORY_XMX_RATIO_KEY,
-        GobblinYarnConfigurationKeys.DEFAULT_CONTAINER_JVM_MEMORY_XMX_RATIO);
-
-    Preconditions.checkArgument(this.jvmMemoryXmxRatio >= 0 && 
this.jvmMemoryXmxRatio <= 1,
-        GobblinYarnConfigurationKeys.CONTAINER_JVM_MEMORY_XMX_RATIO_KEY + " 
must be between 0 and 1 inclusive");
-
-    this.jvmMemoryOverheadMbs = ConfigUtils.getInt(this.config,
-        GobblinYarnConfigurationKeys.CONTAINER_JVM_MEMORY_OVERHEAD_MBS_KEY,
-        
GobblinYarnConfigurationKeys.DEFAULT_CONTAINER_JVM_MEMORY_OVERHEAD_MBS);
-
-    Preconditions.checkArgument(this.jvmMemoryOverheadMbs < 
this.requestedContainerMemoryMbs * this.jvmMemoryXmxRatio,
-        GobblinYarnConfigurationKeys.CONTAINER_JVM_MEMORY_OVERHEAD_MBS_KEY + " 
cannot be more than "
-            + GobblinYarnConfigurationKeys.CONTAINER_MEMORY_MBS_KEY + " * "
-            + GobblinYarnConfigurationKeys.CONTAINER_JVM_MEMORY_XMX_RATIO_KEY);
-
     this.appViewAcl = ConfigUtils.getString(this.config, 
GobblinYarnConfigurationKeys.APP_VIEW_ACL,
         GobblinYarnConfigurationKeys.DEFAULT_APP_VIEW_ACL);
     this.containerTimezone = ConfigUtils.getString(this.config, 
GobblinYarnConfigurationKeys.GOBBLIN_YARN_CONTAINER_TIMEZONE,
         GobblinYarnConfigurationKeys.DEFAULT_GOBBLIN_YARN_CONTAINER_TIMEZONE);
     this.jarCacheEnabled = ConfigUtils.getBoolean(this.config, 
GobblinYarnConfigurationKeys.JAR_CACHE_ENABLED, 
GobblinYarnConfigurationKeys.JAR_CACHE_ENABLED_DEFAULT);
+
+    // Initialising this baseline worker profile to use as default worker 
profile in case allocation request id is not in map
+    this.baselineWorkerProfile = new 
WorkerProfile(WorkforceProfiles.BASELINE_NAME, this.config);
+
+    // Putting baseline profile in the map for default allocation request id 
(0)
+    
this.allocationRequestIdToWorkerProfile.put(allocationRequestIdGenerator.getAndIncrement(),
 this.baselineWorkerProfile);

Review Comment:
   doesn't the `generateAllocationRequestId` method do this very thing?  let's 
drop the special handling and call that. ignore the return value if it's not 
needed... but, about that - shouldn't `requestInitialContainers` be passing 
along this allocation req ID (for the baseline profile)?



##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnService.java:
##########
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.yarn;
+
+import java.util.List;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+
+import com.google.common.base.Optional;
+import com.google.common.eventbus.EventBus;
+import com.typesafe.config.Config;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.temporal.dynamic.ScalingDirective;
+import org.apache.gobblin.temporal.dynamic.StaffingDeltas;
+import org.apache.gobblin.temporal.dynamic.WorkerProfile;
+import org.apache.gobblin.temporal.dynamic.WorkforcePlan;
+import org.apache.gobblin.temporal.dynamic.WorkforceStaffing;
+import org.apache.gobblin.yarn.GobblinYarnConfigurationKeys;
+
+/**
+ * Service for dynamically scaling Gobblin containers running on YARN.
+ * This service manages workforce staffing and plans, and requests new 
containers as needed.
+ */
+@Slf4j
+public class DynamicScalingYarnService extends YarnService {
+
+  /** this holds the current count of containers requested for each worker 
profile */
+  private final WorkforceStaffing workforceStaffing;
+  /** this holds the current total workforce plan as per latest received 
scaling directives */
+  private final WorkforcePlan workforcePlan;
+
+  public DynamicScalingYarnService(Config config, String applicationName, 
String applicationId,
+      YarnConfiguration yarnConfiguration, FileSystem fs, EventBus eventBus) 
throws Exception {
+    super(config, applicationName, applicationId, yarnConfiguration, fs, 
eventBus);
+
+    this.workforceStaffing = 
WorkforceStaffing.initialize(getInitialContainers());
+    this.workforcePlan = new WorkforcePlan(getConfig(), 
getInitialContainers());

Review Comment:
   would be great if this were more clearly/explicitly based on the baseline 
profile's config



##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnService.java:
##########
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.yarn;
+
+import java.util.List;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+
+import com.google.common.base.Optional;
+import com.google.common.eventbus.EventBus;
+import com.typesafe.config.Config;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.temporal.dynamic.ScalingDirective;
+import org.apache.gobblin.temporal.dynamic.StaffingDeltas;
+import org.apache.gobblin.temporal.dynamic.WorkerProfile;
+import org.apache.gobblin.temporal.dynamic.WorkforcePlan;
+import org.apache.gobblin.temporal.dynamic.WorkforceStaffing;
+import org.apache.gobblin.yarn.GobblinYarnConfigurationKeys;
+
+/**
+ * Service for dynamically scaling Gobblin containers running on YARN.
+ * This service manages workforce staffing and plans, and requests new 
containers as needed.
+ */
+@Slf4j
+public class DynamicScalingYarnService extends YarnService {
+
+  /** this holds the current count of containers requested for each worker 
profile */
+  private final WorkforceStaffing workforceStaffing;
+  /** this holds the current total workforce plan as per latest received 
scaling directives */
+  private final WorkforcePlan workforcePlan;
+
+  public DynamicScalingYarnService(Config config, String applicationName, 
String applicationId,
+      YarnConfiguration yarnConfiguration, FileSystem fs, EventBus eventBus) 
throws Exception {
+    super(config, applicationName, applicationId, yarnConfiguration, fs, 
eventBus);
+
+    this.workforceStaffing = 
WorkforceStaffing.initialize(getInitialContainers());
+    this.workforcePlan = new WorkforcePlan(getConfig(), 
getInitialContainers());
+  }
+
+  /**
+   * Revises the workforce plan and requests new containers based on the given 
scaling directives.
+   *
+   * @param scalingDirectives the list of scaling directives
+   */
+  public synchronized void 
reviseWorkforcePlanAndRequestNewContainers(List<ScalingDirective> 
scalingDirectives) {
+    this.workforcePlan.reviseWhenNewer(scalingDirectives);
+    StaffingDeltas deltas = 
this.workforcePlan.calcStaffingDeltas(this.workforceStaffing);
+    requestNewContainersForStaffingDeltas(deltas);
+  }
+
+  private synchronized void 
requestNewContainersForStaffingDeltas(StaffingDeltas deltas) {
+    deltas.getPerProfileDeltas().forEach(profileDelta -> {
+      if (profileDelta.getDelta() > 0) {
+        WorkerProfile workerProfile = profileDelta.getProfile();
+        String profileName = workerProfile.getName();
+        int curNumContainers = 
this.workforceStaffing.getStaffing(profileName).orElse(0);

Review Comment:
   nit: `curNumContainers` => `currNumContainers`



##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/YarnService.java:
##########
@@ -485,8 +475,11 @@ private void requestContainer(Optional<String> 
preferredNode, Resource resource)
     priority.setPriority(priorityNum);
 
     String[] preferredNodes = preferredNode.isPresent() ? new String[] 
{preferredNode.get()} : null;
+
+    long allocationRequestID = allocationRequestId.or(0L);

Review Comment:
   please don't use case to differentiate names. how about:
   ```
   long allocationRequestId = optAllocationRequestId.or(0L);
   ```
   ?



##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/loadgen/dynamic/DummyScalingDirectiveSource.java:
##########
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.loadgen.dynamic;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Optional;
+
+import org.apache.gobblin.yarn.GobblinYarnConfigurationKeys;
+import org.apache.gobblin.temporal.dynamic.ProfileDerivation;
+import org.apache.gobblin.temporal.dynamic.ProfileOverlay;
+import org.apache.gobblin.temporal.dynamic.ScalingDirective;
+import org.apache.gobblin.temporal.dynamic.ScalingDirectiveSource;
+import org.apache.gobblin.temporal.dynamic.WorkforceProfiles;
+
+
+/**
+ * A dummy implementation of {@link ScalingDirectiveSource} that returns a 
fixed set of {@link ScalingDirective}s.
+ */
+public class DummyScalingDirectiveSource implements ScalingDirectiveSource {
+  private int count = 0;
+  private final Optional<ProfileDerivation> derivedFromBaseline;
+  public DummyScalingDirectiveSource() {
+    this.derivedFromBaseline = Optional.of(new 
ProfileDerivation(WorkforceProfiles.BASELINE_NAME,
+        new ProfileOverlay.Adding(
+            new 
ProfileOverlay.KVPair(GobblinYarnConfigurationKeys.CONTAINER_MEMORY_MBS_KEY, 
"2048"),
+            new 
ProfileOverlay.KVPair(GobblinYarnConfigurationKeys.CONTAINER_CORES_KEY, "2")
+        )
+    ));
+  }
+
+  /**
+   * @return {@link ScalingDirective}s - an impl. may choose to return all 
known directives or to give only newer
+   * directives than previously returned
+   */
+  @Override
+  public List<ScalingDirective> getScalingDirectives() {
+    // Note - profile should exist already pr is derived from other profile
+    if (this.count == 0) {

Review Comment:
   I realize it's only test code, but best to make thread-safe w/ either 
`synchronized` or an `AtomicInteger`



##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/YarnService.java:
##########
@@ -590,15 +583,43 @@ protected ByteBuffer getSecurityTokens() throws 
IOException {
 
   @VisibleForTesting
   protected String buildContainerCommand(Container container, String 
helixParticipantId, String helixInstanceTag) {
+    long allocationRequestID = container.getAllocationRequestId();

Review Comment:
   nit: `allocationRequestID` => `allocationRequestId`



##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/YarnService.java:
##########
@@ -755,6 +776,18 @@ private ImmutableMap.Builder<String, String> 
buildContainerStatusEventMetadata(C
     return eventMetadataBuilder;
   }
 
+  /**
+   * Generates a unique allocation request ID for the given worker profile and 
store the id to profile mapping.
+   *
+   * @param workerProfile the worker profile for which the allocation request 
ID is generated
+   * @return the generated allocation request ID
+   */
+  protected long generateAllocationRequestId(WorkerProfile workerProfile) {

Review Comment:
   naming-wise, the side-effect of putting into the map seems critical for 
maintainers to notice. maybe `generateAndStoreByUniqueAllocationRequestId` or 
simply `storeByUniqueAllocationRequestId`?



##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/loadgen/dynamic/DummyScalingDirectiveSource.java:
##########
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.loadgen.dynamic;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Optional;
+
+import org.apache.gobblin.yarn.GobblinYarnConfigurationKeys;
+import org.apache.gobblin.temporal.dynamic.ProfileDerivation;
+import org.apache.gobblin.temporal.dynamic.ProfileOverlay;
+import org.apache.gobblin.temporal.dynamic.ScalingDirective;
+import org.apache.gobblin.temporal.dynamic.ScalingDirectiveSource;
+import org.apache.gobblin.temporal.dynamic.WorkforceProfiles;
+
+
+/**
+ * A dummy implementation of {@link ScalingDirectiveSource} that returns a 
fixed set of {@link ScalingDirective}s.
+ */
+public class DummyScalingDirectiveSource implements ScalingDirectiveSource {
+  private int count = 0;
+  private final Optional<ProfileDerivation> derivedFromBaseline;
+  public DummyScalingDirectiveSource() {
+    this.derivedFromBaseline = Optional.of(new 
ProfileDerivation(WorkforceProfiles.BASELINE_NAME,
+        new ProfileOverlay.Adding(
+            new 
ProfileOverlay.KVPair(GobblinYarnConfigurationKeys.CONTAINER_MEMORY_MBS_KEY, 
"2048"),
+            new 
ProfileOverlay.KVPair(GobblinYarnConfigurationKeys.CONTAINER_CORES_KEY, "2")
+        )
+    ));
+  }
+
+  /**
+   * @return {@link ScalingDirective}s - an impl. may choose to return all 
known directives or to give only newer
+   * directives than previously returned
+   */
+  @Override
+  public List<ScalingDirective> getScalingDirectives() {
+    // Note - profile should exist already pr is derived from other profile
+    if (this.count == 0) {
+      this.count++;
+      return Arrays.asList(
+          new ScalingDirective("firstProfile", 3, System.currentTimeMillis(), 
this.derivedFromBaseline),
+          new ScalingDirective("secondProfile", 2, System.currentTimeMillis(), 
this.derivedFromBaseline)
+      );
+    } else if (this.count == 1) {
+      this.count++;
+      return Arrays.asList(
+          new ScalingDirective("firstProfile", 5, System.currentTimeMillis()),
+          new ScalingDirective("secondProfile", 3, System.currentTimeMillis())
+      );
+    } else if (this.count == 2) {
+      this.count++;
+      return Arrays.asList(
+          new ScalingDirective("firstProfile", 5, System.currentTimeMillis()),
+          new ScalingDirective("secondProfile", 3, System.currentTimeMillis())

Review Comment:
   just noting this is the same as when `this.count == 1`.  is that intended?  
maybe worth an overall comment on the "dummy" testing plan



##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/YarnService.java:
##########
@@ -200,6 +201,9 @@ class YarnService extends AbstractIdleService {
   private volatile boolean shutdownInProgress = false;
 
   private final boolean jarCacheEnabled;
+  private final WorkerProfile baselineWorkerProfile;
+  private final AtomicLong allocationRequestIdGenerator = new AtomicLong(0L);
+  private final ConcurrentMap<Long, WorkerProfile> 
allocationRequestIdToWorkerProfile = new ConcurrentHashMap<>();

Review Comment:
   this name works and is ok... but, considering two alternatives:
   ```
   WorkerProfile workerProfile = 
this.allocationRequestIdToWorkerProfile.get(allocationRequestId);
   WorkerProfile workerProfile = 
this.workerProfileByAllocationRequestId.get(allocationRequestId);
   ```
   I prefer the second for its focus on the fact that the worker profile (that 
we're here to retrieve) is the more important half of the mapping.



##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/YarnService.java:
##########
@@ -257,27 +257,17 @@ public YarnService(Config config, String applicationName, 
String applicationId,
         GobblinYarnConfigurationKeys.RELEASED_CONTAINERS_CACHE_EXPIRY_SECS,
         
GobblinYarnConfigurationKeys.DEFAULT_RELEASED_CONTAINERS_CACHE_EXPIRY_SECS), 
TimeUnit.SECONDS).build();
 
-    this.jvmMemoryXmxRatio = ConfigUtils.getDouble(this.config,
-        GobblinYarnConfigurationKeys.CONTAINER_JVM_MEMORY_XMX_RATIO_KEY,
-        GobblinYarnConfigurationKeys.DEFAULT_CONTAINER_JVM_MEMORY_XMX_RATIO);
-
-    Preconditions.checkArgument(this.jvmMemoryXmxRatio >= 0 && 
this.jvmMemoryXmxRatio <= 1,
-        GobblinYarnConfigurationKeys.CONTAINER_JVM_MEMORY_XMX_RATIO_KEY + " 
must be between 0 and 1 inclusive");
-
-    this.jvmMemoryOverheadMbs = ConfigUtils.getInt(this.config,
-        GobblinYarnConfigurationKeys.CONTAINER_JVM_MEMORY_OVERHEAD_MBS_KEY,
-        
GobblinYarnConfigurationKeys.DEFAULT_CONTAINER_JVM_MEMORY_OVERHEAD_MBS);
-
-    Preconditions.checkArgument(this.jvmMemoryOverheadMbs < 
this.requestedContainerMemoryMbs * this.jvmMemoryXmxRatio,
-        GobblinYarnConfigurationKeys.CONTAINER_JVM_MEMORY_OVERHEAD_MBS_KEY + " 
cannot be more than "
-            + GobblinYarnConfigurationKeys.CONTAINER_MEMORY_MBS_KEY + " * "
-            + GobblinYarnConfigurationKeys.CONTAINER_JVM_MEMORY_XMX_RATIO_KEY);
-
     this.appViewAcl = ConfigUtils.getString(this.config, 
GobblinYarnConfigurationKeys.APP_VIEW_ACL,
         GobblinYarnConfigurationKeys.DEFAULT_APP_VIEW_ACL);
     this.containerTimezone = ConfigUtils.getString(this.config, 
GobblinYarnConfigurationKeys.GOBBLIN_YARN_CONTAINER_TIMEZONE,
         GobblinYarnConfigurationKeys.DEFAULT_GOBBLIN_YARN_CONTAINER_TIMEZONE);
     this.jarCacheEnabled = ConfigUtils.getBoolean(this.config, 
GobblinYarnConfigurationKeys.JAR_CACHE_ENABLED, 
GobblinYarnConfigurationKeys.JAR_CACHE_ENABLED_DEFAULT);
+
+    // Initialising this baseline worker profile to use as default worker 
profile in case allocation request id is not in map
+    this.baselineWorkerProfile = new 
WorkerProfile(WorkforceProfiles.BASELINE_NAME, this.config);

Review Comment:
   given you're initializing that here and adding it to the map on the next 
line of the same ctor, when would it NOT be in the map?  I don't really believe 
you need the `getOrDefault` below...
   
   if I'm missing something, and it's needed for b/w-compat when not using 
`DynamicScalingYarnService`, that's OK... just leave a comment.
   



##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnServiceManager.java:
##########
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.yarn;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.fs.FileSystem;
+
+import com.typesafe.config.Config;
+import com.google.common.util.concurrent.AbstractIdleService;
+
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys;
+import org.apache.gobblin.temporal.dynamic.FsScalingDirectiveSource;
+import org.apache.gobblin.temporal.dynamic.ScalingDirective;
+import org.apache.gobblin.temporal.dynamic.ScalingDirectiveSource;
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.ExecutorsUtils;
+import org.apache.gobblin.temporal.loadgen.dynamic.DummyScalingDirectiveSource;
+
+
+/**
+ * This class manages the dynamic scaling of the {@link YarnService} by 
periodically polling for scaling directives and passing
+ * the latest scaling directives to the {@link DynamicScalingYarnService} for 
processing.
+ */
+@Slf4j
+public class DynamicScalingYarnServiceManager extends AbstractIdleService {
+
+  private final String DYNAMIC_SCALING_PREFIX = 
GobblinTemporalConfigurationKeys.PREFIX + "dynamic.scaling.";
+  private final String DYNAMIC_SCALING_DIRECTIVES_DIR = DYNAMIC_SCALING_PREFIX 
+ "directives.dir";
+  private final String DYNAMIC_SCALING_ERRORS_DIR = DYNAMIC_SCALING_PREFIX + 
"errors.dir";
+  private final String DYNAMIC_SCALING_INITIAL_DELAY = DYNAMIC_SCALING_PREFIX 
+ "initial.delay";
+  private final int DEFAULT_DYNAMIC_SCALING_INITIAL_DELAY_SECS = 60;

Review Comment:
   not sure there's a need to take a separate value.  maybe just use 
`DYNAMIC_SCALING_POLLING_INTERVAL` for both



##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnServiceManager.java:
##########
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.yarn;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.fs.FileSystem;
+
+import com.typesafe.config.Config;
+import com.google.common.util.concurrent.AbstractIdleService;
+
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys;
+import org.apache.gobblin.temporal.dynamic.FsScalingDirectiveSource;
+import org.apache.gobblin.temporal.dynamic.ScalingDirective;
+import org.apache.gobblin.temporal.dynamic.ScalingDirectiveSource;
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.ExecutorsUtils;
+import org.apache.gobblin.temporal.loadgen.dynamic.DummyScalingDirectiveSource;
+
+
+/**
+ * This class manages the dynamic scaling of the {@link YarnService} by 
periodically polling for scaling directives and passing
+ * the latest scaling directives to the {@link DynamicScalingYarnService} for 
processing.
+ */
+@Slf4j
+public class DynamicScalingYarnServiceManager extends AbstractIdleService {
+
+  private final String DYNAMIC_SCALING_PREFIX = 
GobblinTemporalConfigurationKeys.PREFIX + "dynamic.scaling.";
+  private final String DYNAMIC_SCALING_DIRECTIVES_DIR = DYNAMIC_SCALING_PREFIX 
+ "directives.dir";
+  private final String DYNAMIC_SCALING_ERRORS_DIR = DYNAMIC_SCALING_PREFIX + 
"errors.dir";
+  private final String DYNAMIC_SCALING_INITIAL_DELAY = DYNAMIC_SCALING_PREFIX 
+ "initial.delay";
+  private final int DEFAULT_DYNAMIC_SCALING_INITIAL_DELAY_SECS = 60;
+  private final String DYNAMIC_SCALING_POLLING_INTERVAL = 
DYNAMIC_SCALING_PREFIX + "polling.interval";
+  private final int DEFAULT_DYNAMIC_SCALING_POLLING_INTERVAL_SECS = 60;
+  private final Config config;
+  DynamicScalingYarnService dynamicScalingYarnService;

Review Comment:
   why package protected - could it be `private`?



##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnServiceManager.java:
##########
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.yarn;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.fs.FileSystem;
+
+import com.typesafe.config.Config;
+import com.google.common.util.concurrent.AbstractIdleService;
+
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys;
+import org.apache.gobblin.temporal.dynamic.FsScalingDirectiveSource;
+import org.apache.gobblin.temporal.dynamic.ScalingDirective;
+import org.apache.gobblin.temporal.dynamic.ScalingDirectiveSource;
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.ExecutorsUtils;
+import org.apache.gobblin.temporal.loadgen.dynamic.DummyScalingDirectiveSource;
+
+
+/**
+ * This class manages the dynamic scaling of the {@link YarnService} by 
periodically polling for scaling directives and passing
+ * the latest scaling directives to the {@link DynamicScalingYarnService} for 
processing.
+ */
+@Slf4j
+public class DynamicScalingYarnServiceManager extends AbstractIdleService {
+
+  private final String DYNAMIC_SCALING_PREFIX = 
GobblinTemporalConfigurationKeys.PREFIX + "dynamic.scaling.";
+  private final String DYNAMIC_SCALING_DIRECTIVES_DIR = DYNAMIC_SCALING_PREFIX 
+ "directives.dir";
+  private final String DYNAMIC_SCALING_ERRORS_DIR = DYNAMIC_SCALING_PREFIX + 
"errors.dir";
+  private final String DYNAMIC_SCALING_INITIAL_DELAY = DYNAMIC_SCALING_PREFIX 
+ "initial.delay";
+  private final int DEFAULT_DYNAMIC_SCALING_INITIAL_DELAY_SECS = 60;
+  private final String DYNAMIC_SCALING_POLLING_INTERVAL = 
DYNAMIC_SCALING_PREFIX + "polling.interval";
+  private final int DEFAULT_DYNAMIC_SCALING_POLLING_INTERVAL_SECS = 60;
+  private final Config config;
+  DynamicScalingYarnService dynamicScalingYarnService;
+  private final ScheduledExecutorService dynamicScalingExecutor;
+  private final FileSystem fs;
+
+  public DynamicScalingYarnServiceManager(GobblinTemporalApplicationMaster 
appMaster) {
+    this.config = appMaster.getConfig();
+    this.dynamicScalingYarnService = (DynamicScalingYarnService) 
appMaster.get_yarnService();

Review Comment:
   just to be a bit clearer on failure, let's test `instanceof` and throw a 
`RuntimeException` w/ a bit more context for the user, when that's violated



##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnServiceManager.java:
##########
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.yarn;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.fs.FileSystem;
+
+import com.typesafe.config.Config;
+import com.google.common.util.concurrent.AbstractIdleService;
+
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys;
+import org.apache.gobblin.temporal.dynamic.FsScalingDirectiveSource;
+import org.apache.gobblin.temporal.dynamic.ScalingDirective;
+import org.apache.gobblin.temporal.dynamic.ScalingDirectiveSource;
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.ExecutorsUtils;
+import org.apache.gobblin.temporal.loadgen.dynamic.DummyScalingDirectiveSource;
+
+
+/**
+ * This class manages the dynamic scaling of the {@link YarnService} by 
periodically polling for scaling directives and passing
+ * the latest scaling directives to the {@link DynamicScalingYarnService} for 
processing.
+ */
+@Slf4j
+public class DynamicScalingYarnServiceManager extends AbstractIdleService {
+
+  private final String DYNAMIC_SCALING_PREFIX = 
GobblinTemporalConfigurationKeys.PREFIX + "dynamic.scaling.";
+  private final String DYNAMIC_SCALING_DIRECTIVES_DIR = DYNAMIC_SCALING_PREFIX 
+ "directives.dir";
+  private final String DYNAMIC_SCALING_ERRORS_DIR = DYNAMIC_SCALING_PREFIX + 
"errors.dir";
+  private final String DYNAMIC_SCALING_INITIAL_DELAY = DYNAMIC_SCALING_PREFIX 
+ "initial.delay";
+  private final int DEFAULT_DYNAMIC_SCALING_INITIAL_DELAY_SECS = 60;
+  private final String DYNAMIC_SCALING_POLLING_INTERVAL = 
DYNAMIC_SCALING_PREFIX + "polling.interval";
+  private final int DEFAULT_DYNAMIC_SCALING_POLLING_INTERVAL_SECS = 60;
+  private final Config config;
+  DynamicScalingYarnService dynamicScalingYarnService;
+  private final ScheduledExecutorService dynamicScalingExecutor;
+  private final FileSystem fs;
+
+  public DynamicScalingYarnServiceManager(GobblinTemporalApplicationMaster 
appMaster) {
+    this.config = appMaster.getConfig();
+    this.dynamicScalingYarnService = (DynamicScalingYarnService) 
appMaster.get_yarnService();
+    this.dynamicScalingExecutor = Executors.newSingleThreadScheduledExecutor(
+        
ExecutorsUtils.newThreadFactory(com.google.common.base.Optional.of(log),
+            com.google.common.base.Optional.of("DynamicScalingExecutor")));
+    this.fs = appMaster.getFs();
+  }
+
+  @Override
+  protected void startUp() {
+    int scheduleInterval = ConfigUtils.getInt(this.config, 
DYNAMIC_SCALING_POLLING_INTERVAL,
+        DEFAULT_DYNAMIC_SCALING_POLLING_INTERVAL_SECS);
+    int initialDelay = ConfigUtils.getInt(this.config, 
DYNAMIC_SCALING_INITIAL_DELAY,
+        DEFAULT_DYNAMIC_SCALING_INITIAL_DELAY_SECS);
+
+    ScalingDirectiveSource fsScalingDirectiveSource = new 
FsScalingDirectiveSource(
+        this.fs,
+        this.config.getString(DYNAMIC_SCALING_DIRECTIVES_DIR),
+        Optional.ofNullable(this.config.getString(DYNAMIC_SCALING_ERRORS_DIR))
+    );
+
+    // TODO: remove this line later
+    //  Using for testing purposes only
+    ScalingDirectiveSource scalingDirectiveSource = new 
DummyScalingDirectiveSource();

Review Comment:
   would it be helpful for unit testing if, rather than hard-coding, this class 
took the `ScalingDirectiveSource` FQ class name?  I see that could be harder 
based on the ctor params.
   
   As a simpler alternative, make `DynamicScalingYarnServiceManager` abstract w/ 
a method
   ```
   abstract protected ScalingDirectiveSource createScalingDirectiveSource();
   ```
   and then the concrete `FsSourceDynamicScalingYarnServiceManager` would 
hard-code the `ScalingDirectiveSource` class.  you could have a different concrete 
`DSYSM` using `DummyScalingDirectiveSource`.  one such FQ class name 
would be a param.
   
   ... which reminds me.... how is this `DSYSM` created and initialized at 
present?



##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnServiceManager.java:
##########
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.yarn;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.fs.FileSystem;
+
+import com.typesafe.config.Config;
+import com.google.common.util.concurrent.AbstractIdleService;
+
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys;
+import org.apache.gobblin.temporal.dynamic.FsScalingDirectiveSource;
+import org.apache.gobblin.temporal.dynamic.ScalingDirective;
+import org.apache.gobblin.temporal.dynamic.ScalingDirectiveSource;
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.ExecutorsUtils;
+import org.apache.gobblin.temporal.loadgen.dynamic.DummyScalingDirectiveSource;
+
+
+/**
+ * This class manages the dynamic scaling of the {@link YarnService} by 
periodically polling for scaling directives and passing
+ * the latest scaling directives to the {@link DynamicScalingYarnService} for 
processing.
+ */
+@Slf4j
+public class DynamicScalingYarnServiceManager extends AbstractIdleService {
+
+  private final String DYNAMIC_SCALING_PREFIX = 
GobblinTemporalConfigurationKeys.PREFIX + "dynamic.scaling.";
+  private final String DYNAMIC_SCALING_DIRECTIVES_DIR = DYNAMIC_SCALING_PREFIX 
+ "directives.dir";
+  private final String DYNAMIC_SCALING_ERRORS_DIR = DYNAMIC_SCALING_PREFIX + 
"errors.dir";
+  private final String DYNAMIC_SCALING_INITIAL_DELAY = DYNAMIC_SCALING_PREFIX 
+ "initial.delay";
+  private final int DEFAULT_DYNAMIC_SCALING_INITIAL_DELAY_SECS = 60;
+  private final String DYNAMIC_SCALING_POLLING_INTERVAL = 
DYNAMIC_SCALING_PREFIX + "polling.interval";
+  private final int DEFAULT_DYNAMIC_SCALING_POLLING_INTERVAL_SECS = 60;
+  private final Config config;
+  DynamicScalingYarnService dynamicScalingYarnService;
+  private final ScheduledExecutorService dynamicScalingExecutor;
+  private final FileSystem fs;
+
+  public DynamicScalingYarnServiceManager(GobblinTemporalApplicationMaster 
appMaster) {
+    this.config = appMaster.getConfig();
+    this.dynamicScalingYarnService = (DynamicScalingYarnService) 
appMaster.get_yarnService();
+    this.dynamicScalingExecutor = Executors.newSingleThreadScheduledExecutor(
+        
ExecutorsUtils.newThreadFactory(com.google.common.base.Optional.of(log),
+            com.google.common.base.Optional.of("DynamicScalingExecutor")));
+    this.fs = appMaster.getFs();
+  }
+
+  @Override
+  protected void startUp() {
+    int scheduleInterval = ConfigUtils.getInt(this.config, 
DYNAMIC_SCALING_POLLING_INTERVAL,
+        DEFAULT_DYNAMIC_SCALING_POLLING_INTERVAL_SECS);
+    int initialDelay = ConfigUtils.getInt(this.config, 
DYNAMIC_SCALING_INITIAL_DELAY,
+        DEFAULT_DYNAMIC_SCALING_INITIAL_DELAY_SECS);
+
+    ScalingDirectiveSource fsScalingDirectiveSource = new 
FsScalingDirectiveSource(
+        this.fs,
+        this.config.getString(DYNAMIC_SCALING_DIRECTIVES_DIR),
+        Optional.ofNullable(this.config.getString(DYNAMIC_SCALING_ERRORS_DIR))
+    );
+
+    // TODO: remove this line later
+    //  Using for testing purposes only
+    ScalingDirectiveSource scalingDirectiveSource = new 
DummyScalingDirectiveSource();
+
+    log.info("Starting the " + this.getClass().getSimpleName());
+    log.info("Scheduling the dynamic scaling task with an interval of {} 
seconds", scheduleInterval);
+
+    this.dynamicScalingExecutor.scheduleAtFixedRate(
+        new GetScalingDirectivesRunnable(this.dynamicScalingYarnService, 
scalingDirectiveSource),
+        initialDelay, scheduleInterval, TimeUnit.SECONDS
+    );
+  }
+
+  @Override
+  protected void shutDown() {
+    log.info("Stopping the " + this.getClass().getSimpleName());
+    ExecutorsUtils.shutdownExecutorService(this.dynamicScalingExecutor, 
com.google.common.base.Optional.of(log));
+  }
+
+  /**
+   * A {@link Runnable} that gets the scaling directives from the {@link 
ScalingDirectiveSource} and passes them to the
+   * {@link DynamicScalingYarnService} for processing.
+   */
+  @AllArgsConstructor
+  static class GetScalingDirectivesRunnable implements Runnable {
+    private final DynamicScalingYarnService dynamicScalingYarnService;
+    private final ScalingDirectiveSource scalingDirectiveSource;
+
+    @Override
+    public void run() {
+      try {
+        List<ScalingDirective> scalingDirectives = 
scalingDirectiveSource.getScalingDirectives();
+        if (!scalingDirectives.isEmpty()) {
+          
dynamicScalingYarnService.reviseWorkforcePlanAndRequestNewContainers(scalingDirectives);
+        }
+      } catch (IOException e) {
+        log.error("Failed to get scaling directives", e);
+      }

Review Comment:
   I suggest catching `Throwable`, since this runs in an executor and nobody 
ever checks whether the `Runnable` threw, which means the exception might 
pass silently w/ no notice ever logged



##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnService.java:
##########
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.yarn;
+
+import java.util.List;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+
+import com.google.common.base.Optional;
+import com.google.common.eventbus.EventBus;
+import com.typesafe.config.Config;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.temporal.dynamic.ScalingDirective;
+import org.apache.gobblin.temporal.dynamic.StaffingDeltas;
+import org.apache.gobblin.temporal.dynamic.WorkerProfile;
+import org.apache.gobblin.temporal.dynamic.WorkforcePlan;
+import org.apache.gobblin.temporal.dynamic.WorkforceStaffing;
+import org.apache.gobblin.yarn.GobblinYarnConfigurationKeys;
+
+/**
+ * Service for dynamically scaling Gobblin containers running on YARN.
+ * This service manages workforce staffing and plans, and requests new 
containers as needed.
+ */
+@Slf4j
+public class DynamicScalingYarnService extends YarnService {
+
+  /** this holds the current count of containers requested for each worker 
profile */
+  private final WorkforceStaffing workforceStaffing;
+  /** this holds the current total workforce plan as per latest received 
scaling directives */
+  private final WorkforcePlan workforcePlan;
+
+  public DynamicScalingYarnService(Config config, String applicationName, 
String applicationId,
+      YarnConfiguration yarnConfiguration, FileSystem fs, EventBus eventBus) 
throws Exception {
+    super(config, applicationName, applicationId, yarnConfiguration, fs, 
eventBus);
+
+    this.workforceStaffing = 
WorkforceStaffing.initialize(getInitialContainers());
+    this.workforcePlan = new WorkforcePlan(getConfig(), 
getInitialContainers());
+  }
+
+  /**
+   * Revises the workforce plan and requests new containers based on the given 
scaling directives.
+   *
+   * @param scalingDirectives the list of scaling directives
+   */
+  public synchronized void 
reviseWorkforcePlanAndRequestNewContainers(List<ScalingDirective> 
scalingDirectives) {
+    this.workforcePlan.reviseWhenNewer(scalingDirectives);
+    StaffingDeltas deltas = 
this.workforcePlan.calcStaffingDeltas(this.workforceStaffing);
+    requestNewContainersForStaffingDeltas(deltas);
+  }
+
+  private synchronized void 
requestNewContainersForStaffingDeltas(StaffingDeltas deltas) {
+    deltas.getPerProfileDeltas().forEach(profileDelta -> {
+      if (profileDelta.getDelta() > 0) {
+        WorkerProfile workerProfile = profileDelta.getProfile();
+        String profileName = workerProfile.getName();
+        int curNumContainers = 
this.workforceStaffing.getStaffing(profileName).orElse(0);
+        int delta = profileDelta.getDelta();
+        log.info("Requesting {} new containers for profile {} having currently 
{} containers", delta,
+            profileName, curNumContainers);
+        requestContainersForWorkerProfile(workerProfile, delta);
+        // update our staffing after requesting new containers
+        this.workforceStaffing.reviseStaffing(profileName, curNumContainers + 
delta, System.currentTimeMillis());
+      }
+      // TODO: Decide how to handle negative deltas

Review Comment:
   will you add this before merge?  if not, let's at least log when it's 
observed!





Issue Time Tracking
-------------------

            Worklog Id:     (was: 945669)
    Remaining Estimate: 0h
            Time Spent: 10m

> Add GoT YarnService integration with DynamicScaling
> ---------------------------------------------------
>
>                 Key: GOBBLIN-2174
>                 URL: https://issues.apache.org/jira/browse/GOBBLIN-2174
>             Project: Apache Gobblin
>          Issue Type: Bug
>          Components: gobblin-core
>            Reporter: Vivek Rai
>            Assignee: Abhishek Tiwari
>            Priority: Major
>          Time Spent: 10m
>  Remaining Estimate: 0h
>
> After dynamic scaling implemented as part of 
> https://issues.apache.org/jira/browse/GOBBLIN-2170 , the Temporal Yarn 
> Service needs to be integrated with the dynamic scaling to have fully 
> functional dynamic scalable yarn service.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to