phet commented on code in PR #4077:
URL: https://github.com/apache/gobblin/pull/4077#discussion_r1870177782


##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/AbstractDynamicScalingYarnServiceManager.java:
##########
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.yarn;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+
+import org.apache.commons.collections.CollectionUtils;
+
+import com.typesafe.config.Config;
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.AbstractIdleService;
+
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.ExecutorsUtils;
+import org.apache.gobblin.temporal.dynamic.ScalingDirective;
+import org.apache.gobblin.temporal.dynamic.ScalingDirectiveSource;
+import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys;
+
+/**
+ * This class manages the dynamic scaling of the {@link YarnService} by 
periodically polling for scaling directives and passing
+ * the latest scaling directives to the {@link DynamicScalingYarnService} for 
processing.
+ *
+ * This is an abstract class that provides the basic functionality for 
managing dynamic scaling. Subclasses should implement
+ * {@link #createScalingDirectiveSource()} to provide a {@link 
ScalingDirectiveSource} that will be used to get scaling directives.
+ */
+@Slf4j
+public abstract class AbstractDynamicScalingYarnServiceManager extends 
AbstractIdleService {
+
+  protected final static String DYNAMIC_SCALING_POLLING_INTERVAL = 
GobblinTemporalConfigurationKeys.DYNAMIC_SCALING_PREFIX + "polling.interval";
+  private final int DEFAULT_DYNAMIC_SCALING_POLLING_INTERVAL_SECS = 60;
+  protected final Config config;
+  private final DynamicScalingYarnService dynamicScalingYarnService;
+  private final ScheduledExecutorService dynamicScalingExecutor;
+
+  public 
AbstractDynamicScalingYarnServiceManager(GobblinTemporalApplicationMaster 
appMaster) {
+    this.config = appMaster.getConfig();
+    if (appMaster.get_yarnService() instanceof DynamicScalingYarnService) {
+      this.dynamicScalingYarnService = (DynamicScalingYarnService) 
appMaster.get_yarnService();
+    } else {
+      String errorMsg = "Failure while getting YarnService Instance from 
GobblinTemporalApplicationMaster::get_yarnService()"
+          + " YarnService is not an instance of DynamicScalingYarnService";
+      log.error(errorMsg);

Review Comment:
   Be sure the error message says what the *actual* instance is (e.g. include `appMaster.get_yarnService().getClass().getName()`).



##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/AbstractDynamicScalingYarnServiceManager.java:
##########
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.yarn;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+
+import org.apache.commons.collections.CollectionUtils;
+
+import com.typesafe.config.Config;
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.AbstractIdleService;
+
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.ExecutorsUtils;
+import org.apache.gobblin.temporal.dynamic.ScalingDirective;
+import org.apache.gobblin.temporal.dynamic.ScalingDirectiveSource;
+import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys;
+
+/**
+ * This class manages the dynamic scaling of the {@link YarnService} by 
periodically polling for scaling directives and passing
+ * the latest scaling directives to the {@link DynamicScalingYarnService} for 
processing.
+ *
+ * This is an abstract class that provides the basic functionality for 
managing dynamic scaling. Subclasses should implement
+ * {@link #createScalingDirectiveSource()} to provide a {@link 
ScalingDirectiveSource} that will be used to get scaling directives.
+ */
+@Slf4j
+public abstract class AbstractDynamicScalingYarnServiceManager extends 
AbstractIdleService {
+
+  protected final static String DYNAMIC_SCALING_POLLING_INTERVAL = 
GobblinTemporalConfigurationKeys.DYNAMIC_SCALING_PREFIX + "polling.interval";
+  private final int DEFAULT_DYNAMIC_SCALING_POLLING_INTERVAL_SECS = 60;
+  protected final Config config;
+  private final DynamicScalingYarnService dynamicScalingYarnService;
+  private final ScheduledExecutorService dynamicScalingExecutor;
+
+  public 
AbstractDynamicScalingYarnServiceManager(GobblinTemporalApplicationMaster 
appMaster) {
+    this.config = appMaster.getConfig();
+    if (appMaster.get_yarnService() instanceof DynamicScalingYarnService) {
+      this.dynamicScalingYarnService = (DynamicScalingYarnService) 
appMaster.get_yarnService();
+    } else {
+      String errorMsg = "Failure while getting YarnService Instance from 
GobblinTemporalApplicationMaster::get_yarnService()"
+          + " YarnService is not an instance of DynamicScalingYarnService";
+      log.error(errorMsg);
+      throw new RuntimeException(errorMsg);
+    }
+    this.dynamicScalingExecutor = Executors.newSingleThreadScheduledExecutor(
+        ExecutorsUtils.newThreadFactory(Optional.of(log),
+            Optional.of("DynamicScalingExecutor")));
+  }
+
+  @Override
+  protected void startUp() {
+    int scheduleInterval = ConfigUtils.getInt(this.config, 
DYNAMIC_SCALING_POLLING_INTERVAL,
+        DEFAULT_DYNAMIC_SCALING_POLLING_INTERVAL_SECS);
+    log.info("Starting the " + this.getClass().getSimpleName());
+    log.info("Scheduling the dynamic scaling task with an interval of {} 
seconds", scheduleInterval);

Review Comment:
   nit: combine these into one log msg:
   ```
   "Starting the {} with re-scaling interval of {} seconds", ....
   ```



##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnService.java:
##########
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.yarn;
+
+import java.util.List;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+
+import com.google.common.base.Optional;
+import com.google.common.eventbus.EventBus;
+import com.typesafe.config.Config;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.temporal.dynamic.ScalingDirective;
+import org.apache.gobblin.temporal.dynamic.StaffingDeltas;
+import org.apache.gobblin.temporal.dynamic.WorkerProfile;
+import org.apache.gobblin.temporal.dynamic.WorkforcePlan;
+import org.apache.gobblin.temporal.dynamic.WorkforceStaffing;
+import org.apache.gobblin.yarn.GobblinYarnConfigurationKeys;
+
+/**
+ * Service for dynamically scaling Gobblin containers running on YARN.
+ * This service manages workforce staffing and plans, and requests new 
containers as needed.
+ */
+@Slf4j
+public class DynamicScalingYarnService extends YarnService {
+
+  /** this holds the current count of containers requested for each worker 
profile */
+  private final WorkforceStaffing workforceStaffing;

Review Comment:
   For clarity, I suggest a name along the lines of `actualWorkforceStaffing`
(as opposed to the one internal to `WorkforcePlan`, which is the planned/intended staffing).



##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/YarnService.java:
##########
@@ -803,24 +810,36 @@ public void onContainersAllocated(List<Container> 
containers) {
         // Find matching requests and remove the request (YARN-660). We the 
scheduler are responsible
         // for cleaning up requests after allocation based on the design in 
the described ticket.
         // YARN does not have a delta request API and the requests are not 
cleaned up automatically.
-        // Try finding a match first with the host as the resource name then 
fall back to any resource match.
+        // Try finding a match first with requestAllocationId (which should 
always be the case) then fall back to
+        // finding a match with the host as the resource name which then will 
fall back to any resource match.
         // Also see YARN-1902. Container count will explode without this logic 
for removing container requests.
-        List<? extends Collection<AMRMClient.ContainerRequest>> 
matchingRequests = amrmClientAsync
-            .getMatchingRequests(container.getPriority(), 
container.getNodeHttpAddress(), container.getResource());
+        Collection<AMRMClient.ContainerRequest> 
matchingRequestsByAllocationRequestId = 
amrmClientAsync.getMatchingRequests(container.getAllocationRequestId());
+        if (!matchingRequestsByAllocationRequestId.isEmpty()) {
+          AMRMClient.ContainerRequest firstMatchingContainerRequest = 
matchingRequestsByAllocationRequestId.iterator().next();
+          LOGGER.debug("Found matching requests {}, removing first matching 
request {}",

Review Comment:
   Especially until we are comfortably using this in prod, these may be better at `.info`
log level. There may be a lot of repetitive logging when we scale many machines,
but that's still only O(100) messages, and often the number will be much smaller. Either way,
such diagnostic info may prove invaluable :)



##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/loadgen/dynamic/DummyScalingDirectiveSource.java:
##########
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.loadgen.dynamic;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.gobblin.yarn.GobblinYarnConfigurationKeys;
+import org.apache.gobblin.temporal.dynamic.ProfileDerivation;
+import org.apache.gobblin.temporal.dynamic.ProfileOverlay;
+import org.apache.gobblin.temporal.dynamic.ScalingDirective;
+import org.apache.gobblin.temporal.dynamic.ScalingDirectiveSource;
+import org.apache.gobblin.temporal.dynamic.WorkforceProfiles;
+
+
+/**
+ * A dummy implementation of {@link ScalingDirectiveSource} that returns a 
fixed set of {@link ScalingDirective}s.
+ */
+public class DummyScalingDirectiveSource implements ScalingDirectiveSource {
+  private final AtomicInteger numInvocations = new AtomicInteger(0);
+  private final Optional<ProfileDerivation> derivedFromBaseline;
+  public DummyScalingDirectiveSource() {
+    this.derivedFromBaseline = Optional.of(new 
ProfileDerivation(WorkforceProfiles.BASELINE_NAME,
+        new ProfileOverlay.Adding(
+            new 
ProfileOverlay.KVPair(GobblinYarnConfigurationKeys.CONTAINER_MEMORY_MBS_KEY, 
"2048"),
+            new 
ProfileOverlay.KVPair(GobblinYarnConfigurationKeys.CONTAINER_CORES_KEY, "2")
+        )
+    ));
+  }
+
+  /**
+   * @return - A fixed set of  {@link ScalingDirective}s corresponding to the 
invocation number.
+   */
+  @Override
+  public List<ScalingDirective> getScalingDirectives() {
+    // Note - profile should exist already or is derived from other profile
+    if (this.numInvocations.get() == 0) {
+      this.numInvocations.getAndIncrement();

Review Comment:
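   nit: call `incrementAndGet()` once at the top and save the returned value in a local
to use in these `if/else`s. Note that `incrementAndGet()` returns the post-increment value,
so the comparisons would shift by one; `getAndIncrement()` keeps them 0-based.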
   
   ```
   long currentTime = System.currentTimeMillis();
   ```
   could also be DRY'd up



##########
gobblin-temporal/src/test/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnServiceManagerTest.java:
##########
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.yarn;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigValueFactory;
+
+import org.apache.gobblin.temporal.dynamic.ScalingDirective;
+import org.apache.gobblin.temporal.dynamic.ScalingDirectiveSource;
+import org.apache.gobblin.temporal.loadgen.dynamic.DummyScalingDirectiveSource;
+
+import static 
org.apache.gobblin.temporal.yarn.AbstractDynamicScalingYarnServiceManager.DYNAMIC_SCALING_POLLING_INTERVAL;
+
+/** Tests for {@link 
AbstractDynamicScalingYarnServiceManager.GetScalingDirectivesRunnable}*/
+public class DynamicScalingYarnServiceManagerTest {
+
+  @Mock private DynamicScalingYarnService mockDynamicScalingYarnService;
+  @Mock private ScalingDirectiveSource mockScalingDirectiveSource;
+  @Mock private GobblinTemporalApplicationMaster 
mockGobblinTemporalApplicationMaster;
+
+  @BeforeMethod
+  public void setup() {
+    MockitoAnnotations.openMocks(this);
+    // Using 1 second as polling interval so that the test runs faster and
+    // GetScalingDirectivesRunnable.run() will be called equal to amount of 
sleep introduced between startUp
+    // and shutDown in seconds
+    Config config = 
ConfigFactory.empty().withValue(DYNAMIC_SCALING_POLLING_INTERVAL, 
ConfigValueFactory.fromAnyRef(1));
+    
Mockito.when(mockGobblinTemporalApplicationMaster.getConfig()).thenReturn(config);
+    
Mockito.when(mockGobblinTemporalApplicationMaster.get_yarnService()).thenReturn(mockDynamicScalingYarnService);
+  }
+
+  @Test
+  public void testWhenScalingDirectivesIsNull() throws IOException, 
InterruptedException {
+    
Mockito.when(mockScalingDirectiveSource.getScalingDirectives()).thenReturn(null);
+    TestDynamicScalingYarnServiceManager testDynamicScalingYarnServiceManager 
= new TestDynamicScalingYarnServiceManager(
+        mockGobblinTemporalApplicationMaster, mockScalingDirectiveSource);
+    testDynamicScalingYarnServiceManager.startUp();
+    Thread.sleep(2000);
+    testDynamicScalingYarnServiceManager.shutDown();
+    Mockito.verify(mockDynamicScalingYarnService, 
Mockito.times(0)).reviseWorkforcePlanAndRequestNewContainers(Mockito.anyList());
+  }
+
+  @Test
+  public void testWhenScalingDirectivesIsEmpty() throws IOException, 
InterruptedException {
+    
Mockito.when(mockScalingDirectiveSource.getScalingDirectives()).thenReturn(new 
ArrayList<>());
+    TestDynamicScalingYarnServiceManager testDynamicScalingYarnServiceManager 
= new TestDynamicScalingYarnServiceManager(
+        mockGobblinTemporalApplicationMaster, mockScalingDirectiveSource);
+    testDynamicScalingYarnServiceManager.startUp();
+    Thread.sleep(2000);
+    testDynamicScalingYarnServiceManager.shutDown();
+    Mockito.verify(mockDynamicScalingYarnService, 
Mockito.times(0)).reviseWorkforcePlanAndRequestNewContainers(Mockito.anyList());
+  }
+
+  /** Note : this test uses {@link 
org.apache.gobblin.temporal.loadgen.dynamic.DummyScalingDirectiveSource}*/
+  @Test
+  public void testWithDummyScalingDirectiveSource() throws 
InterruptedException {
+    // DummyScalingDirectiveSource returns 2 scaling directives in first 3 
invocations and after that it returns empty list
+    // so the total number of invocations after three invocations should 
always be 3
+    TestDynamicScalingYarnServiceManager testDynamicScalingYarnServiceManager 
= new TestDynamicScalingYarnServiceManager(
+        mockGobblinTemporalApplicationMaster, new 
DummyScalingDirectiveSource());
+    testDynamicScalingYarnServiceManager.startUp();
+    Thread.sleep(5000); // 5 seconds sleep so that 
GetScalingDirectivesRunnable.run() is called for 5 times
+    testDynamicScalingYarnServiceManager.shutDown();
+    Mockito.verify(mockDynamicScalingYarnService, 
Mockito.times(3)).reviseWorkforcePlanAndRequestNewContainers(Mockito.anyList());

Review Comment:
   I'm tempted to further verify what it actually did, e.g. that it called
`requestContainers`. What do you think?



##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/loadgen/dynamic/DummyDynamicScalingYarnServiceManager.java:
##########
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.loadgen.dynamic;
+
+import org.apache.gobblin.temporal.dynamic.ScalingDirectiveSource;
+import org.apache.gobblin.temporal.yarn.GobblinTemporalApplicationMaster;
+import 
org.apache.gobblin.temporal.yarn.AbstractDynamicScalingYarnServiceManager;
+
+/**
+ * {@link DummyScalingDirectiveSource} based implementation of {@link 
AbstractDynamicScalingYarnServiceManager}.
+ *  This class is meant to be used for testing purposes only.
+ */

Review Comment:
   How is this meant to be used? Are you hard-coding it when making a private build,
or passing it via config? If not the latter yet, would it be worth getting it to work
that way?



##########
gobblin-temporal/src/test/java/org/apache/gobblin/temporal/yarn/YarnServiceTest.java:
##########
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.yarn;
+
+import java.net.URL;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigValueFactory;
+import com.google.common.eventbus.EventBus;
+
+import org.apache.gobblin.yarn.GobblinYarnConfigurationKeys;
+
+/** Tests for {@link YarnService}*/
+public class YarnServiceTest {
+  private Config defaultConfigs;
+  private final YarnConfiguration yarnConfiguration = new YarnConfiguration();
+  private final FileSystem mockFileSystem = Mockito.mock(FileSystem.class);
+  private final EventBus eventBus = new EventBus("TemporalYarnServiceTest");
+
+  @BeforeClass
+  public void setup() {
+    URL url = YarnServiceTest.class.getClassLoader()
+        .getResource(YarnServiceTest.class.getSimpleName() + ".conf");
+    Assert.assertNotNull(url, "Could not find resource " + url);
+    this.defaultConfigs = ConfigFactory.parseURL(url).resolve();
+  }
+
+  @Test
+  public void testBaselineWorkerProfileCreatedWithPassedConfigs() throws 
Exception {
+    final int containerMemoryMbs = 1500;
+    final int containerCores = 5;
+    final int numContainers = 4;
+    Config config = this.defaultConfigs
+        .withValue(GobblinYarnConfigurationKeys.CONTAINER_MEMORY_MBS_KEY, 
ConfigValueFactory.fromAnyRef(containerMemoryMbs))
+        .withValue(GobblinYarnConfigurationKeys.CONTAINER_CORES_KEY, 
ConfigValueFactory.fromAnyRef(containerCores))
+        .withValue(GobblinYarnConfigurationKeys.INITIAL_CONTAINERS_KEY, 
ConfigValueFactory.fromAnyRef(numContainers));
+
+    YarnService yarnService = new YarnService(
+        config,
+        "testApplicationName",
+        "testApplicationId",
+        yarnConfiguration,
+        mockFileSystem,
+        eventBus
+    );
+
+    
Assert.assertEquals(yarnService.baselineWorkerProfile.getConfig().getInt(GobblinYarnConfigurationKeys.CONTAINER_MEMORY_MBS_KEY),
 containerMemoryMbs);
+    
Assert.assertEquals(yarnService.baselineWorkerProfile.getConfig().getInt(GobblinYarnConfigurationKeys.CONTAINER_CORES_KEY),
 containerCores);
+    
Assert.assertEquals(yarnService.baselineWorkerProfile.getConfig().getInt(GobblinYarnConfigurationKeys.INITIAL_CONTAINERS_KEY),
 numContainers);
+  }
+
+  @Test
+  public void testBuildContainerCommand() throws Exception {
+    final double jvmMemoryXmxRatio = 0.7;
+    final int jvmMemoryOverheadMbs = 50;
+    final int resourceMemoryMB = 3072;
+    final int expectedJvmMemory = (int) (resourceMemoryMB * jvmMemoryXmxRatio) 
- jvmMemoryOverheadMbs;
+
+    Config config = 
this.defaultConfigs.withValue(GobblinYarnConfigurationKeys.CONTAINER_JVM_MEMORY_XMX_RATIO_KEY,
 ConfigValueFactory.fromAnyRef(jvmMemoryXmxRatio))
+        
.withValue(GobblinYarnConfigurationKeys.CONTAINER_JVM_MEMORY_OVERHEAD_MBS_KEY, 
ConfigValueFactory.fromAnyRef(jvmMemoryOverheadMbs));
+
+    Resource resource = Resource.newInstance(resourceMemoryMB, 2);
+
+    Container mockContainer = Mockito.mock(Container.class);
+    Mockito.when(mockContainer.getResource()).thenReturn(resource);
+    Mockito.when(mockContainer.getAllocationRequestId()).thenReturn(0L);
+
+    YarnService yarnService = new YarnService(
+        config,
+        "testApplicationName",
+        "testApplicationId",
+        yarnConfiguration,
+        mockFileSystem,
+        eventBus
+    );
+
+    String command = yarnService.buildContainerCommand(mockContainer, 
"testHelixParticipantId", "testHelixInstanceTag");
+    Assert.assertTrue(command.contains("-Xmx" + expectedJvmMemory + "M"));

Review Comment:
   First off, thank you for kicking off testing for this module!
   
   This seems to be a partial clone of the `YarnServiceTest` in `gobblin-yarn`.
I suggest noting that origin in the javadoc for maintainers. Is any [other
validation from the 
original](https://github.com/apache/gobblin/blob/ab62e72839bb4824b98a442de4dd1647284e880e/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnServiceTest.java#L149)
 worth including here too?



##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/FsSourceDynamicScalingYarnServiceManager.java:
##########


Review Comment:
   I really like what you've done here with separating concerns via the abstract
base class! :D



##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/YarnService.java:
##########
@@ -419,41 +403,20 @@ private EventSubmitter buildEventSubmitter() {
         .build();
   }
 
-  /**
-   * Request an allocation of containers. If numTargetContainers is larger 
than the max of current and expected number
-   * of containers then additional containers are requested.
-   * <p>
-   * If numTargetContainers is less than the current number of allocated 
containers then release free containers.
-   * Shrinking is relative to the number of currently allocated containers 
since it takes time for containers
-   * to be allocated and assigned work and we want to avoid releasing a 
container prematurely before it is assigned
-   * work. This means that a container may not be released even though 
numTargetContainers is less than the requested
-   * number of containers. The intended usage is for the caller of this method 
to make periodic calls to attempt to
-   * adjust the cluster towards the desired number of containers.
-   *
-   * @param inUseInstances  a set of in use instances
-   * @return whether successfully requested the target number of containers
-   */
-  public synchronized boolean requestTargetNumberOfContainers(int 
numContainers, Set<String> inUseInstances) {
-    int defaultContainerMemoryMbs = 
config.getInt(GobblinYarnConfigurationKeys.CONTAINER_MEMORY_MBS_KEY);
-    int defaultContainerCores = config.getInt(GobblinYarnConfigurationKeys. 
CONTAINER_CORES_KEY);
-
-    LOGGER.info("Trying to set numTargetContainers={}, in-use helix instances 
count is {}, container map size is {}",
-        numContainers, inUseInstances.size(), this.containerMap.size());
-
-    requestContainers(numContainers, 
Resource.newInstance(defaultContainerMemoryMbs, defaultContainerCores));
-    LOGGER.info("Current tag-container desired count:{}, tag-container 
allocated: {}", numContainers, this.allocatedContainerCountMap);
-    return true;
-  }
-
-  // Request initial containers with default resource and helix tag
-  private void requestInitialContainers(int containersRequested) {
-    requestTargetNumberOfContainers(containersRequested, 
Collections.EMPTY_SET);
+  /** Request Initial containers using baselineWorkerProfile */

Review Comment:
   Let's generalize this impl for reuse by removing the "baseline"/"initial"
parts. Let's also drop the presumption of a well-known default allocation ID, and
refactor by essentially moving the impl from `DynamicScalingYarnService` into
this base class:
   ```
    private synchronized void requestContainersForWorkerProfile(WorkerProfile 
workerProfile, int numContainers) {
       int containerMemoryMbs = 
workerProfile.getConfig().getInt(GobblinYarnConfigurationKeys.CONTAINER_MEMORY_MBS_KEY);
       int containerCores = 
workerProfile.getConfig().getInt(GobblinYarnConfigurationKeys.CONTAINER_CORES_KEY);
       long allocationRequestId = 
storeByUniqueAllocationRequestId(workerProfile);
       requestContainers(numContainers, 
Resource.newInstance(containerMemoryMbs, containerCores), 
Optional.of(allocationRequestId));
     }
   ```
   Afterwards, requesting initial containers is merely a matter of reading
`numInitialContainers` from config and calling this common impl.
   
   



##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/AbstractDynamicScalingYarnServiceManager.java:
##########
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.yarn;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+
+import org.apache.commons.collections.CollectionUtils;
+
+import com.typesafe.config.Config;
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.AbstractIdleService;
+
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.ExecutorsUtils;
+import org.apache.gobblin.temporal.dynamic.ScalingDirective;
+import org.apache.gobblin.temporal.dynamic.ScalingDirectiveSource;
+import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys;
+
+/**
+ * This class manages the dynamic scaling of the {@link YarnService} by 
periodically polling for scaling directives and passing
+ * the latest scaling directives to the {@link DynamicScalingYarnService} for 
processing.
+ *
+ * This is an abstract class that provides the basic functionality for 
managing dynamic scaling. Subclasses should implement
+ * {@link #createScalingDirectiveSource()} to provide a {@link 
ScalingDirectiveSource} that will be used to get scaling directives.
+ */
+@Slf4j
+public abstract class AbstractDynamicScalingYarnServiceManager extends 
AbstractIdleService {
+
+  protected final static String DYNAMIC_SCALING_POLLING_INTERVAL = 
GobblinTemporalConfigurationKeys.DYNAMIC_SCALING_PREFIX + "polling.interval";
+  private final int DEFAULT_DYNAMIC_SCALING_POLLING_INTERVAL_SECS = 60;
+  protected final Config config;
+  private final DynamicScalingYarnService dynamicScalingYarnService;
+  private final ScheduledExecutorService dynamicScalingExecutor;
+
+  public 
AbstractDynamicScalingYarnServiceManager(GobblinTemporalApplicationMaster 
appMaster) {
+    this.config = appMaster.getConfig();
+    if (appMaster.get_yarnService() instanceof DynamicScalingYarnService) {
+      this.dynamicScalingYarnService = (DynamicScalingYarnService) 
appMaster.get_yarnService();
+    } else {
+      String errorMsg = "Failure while getting YarnService Instance from 
GobblinTemporalApplicationMaster::get_yarnService()"
+          + " YarnService is not an instance of DynamicScalingYarnService";
+      log.error(errorMsg);
+      throw new RuntimeException(errorMsg);
+    }
+    this.dynamicScalingExecutor = Executors.newSingleThreadScheduledExecutor(
+        ExecutorsUtils.newThreadFactory(Optional.of(log),
+            Optional.of("DynamicScalingExecutor")));
+  }
+
+  @Override
+  protected void startUp() {
+    int scheduleInterval = ConfigUtils.getInt(this.config, 
DYNAMIC_SCALING_POLLING_INTERVAL,
+        DEFAULT_DYNAMIC_SCALING_POLLING_INTERVAL_SECS);
+    log.info("Starting the " + this.getClass().getSimpleName());
+    log.info("Scheduling the dynamic scaling task with an interval of {} 
seconds", scheduleInterval);
+
+    this.dynamicScalingExecutor.scheduleAtFixedRate(
+        new GetScalingDirectivesRunnable(this.dynamicScalingYarnService, 
createScalingDirectiveSource()),
+        scheduleInterval, scheduleInterval, TimeUnit.SECONDS
+    );
+  }
+
+  @Override
+  protected void shutDown() {
+    log.info("Stopping the " + this.getClass().getSimpleName());
+    ExecutorsUtils.shutdownExecutorService(this.dynamicScalingExecutor, 
Optional.of(log));
+  }
+
+  /**
+   * Create a {@link ScalingDirectiveSource} to use for getting scaling 
directives.
+   */
+  protected abstract ScalingDirectiveSource createScalingDirectiveSource();
+
+  /**
+   * A {@link Runnable} that gets the scaling directives from the {@link 
ScalingDirectiveSource} and passes them to the
+   * {@link DynamicScalingYarnService} for processing.
+   */
+  @AllArgsConstructor
+  static class GetScalingDirectivesRunnable implements Runnable {
+    private final DynamicScalingYarnService dynamicScalingYarnService;
+    private final ScalingDirectiveSource scalingDirectiveSource;
+
+    @Override
+    public void run() {
+      try {
+        List<ScalingDirective> scalingDirectives = 
scalingDirectiveSource.getScalingDirectives();
+        if (CollectionUtils.isNotEmpty(scalingDirectives)) {
+          
dynamicScalingYarnService.reviseWorkforcePlanAndRequestNewContainers(scalingDirectives);
+        }
+      } catch (IOException e) {
+        log.error("Failed to get scaling directives", e);
+      } catch (Throwable t) {
+        log.error("Suppressing error from GetScalingDirectivesRunnable.run()", 
t);

Review Comment:
   no need to name where the error occurred, as that already shows in the stack trace.  maybe just 
"Unexpected error with dynamic scaling via directives"



##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/FsSourceDynamicScalingYarnServiceManager.java:
##########
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.yarn;
+
+import java.util.Optional;
+
+import org.apache.hadoop.fs.FileSystem;
+
+import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys;
+import org.apache.gobblin.temporal.dynamic.FsScalingDirectiveSource;
+import org.apache.gobblin.temporal.dynamic.ScalingDirectiveSource;
+
+/**
+ * {@link FsScalingDirectiveSource} based implementation of {@link 
AbstractDynamicScalingYarnServiceManager}.
+ */
+public class FsSourceDynamicScalingYarnServiceManager extends 
AbstractDynamicScalingYarnServiceManager {
+  private final static String DYNAMIC_SCALING_DIRECTIVES_DIR = 
GobblinTemporalConfigurationKeys.DYNAMIC_SCALING_PREFIX + "directives.dir";
+  private final static String DYNAMIC_SCALING_ERRORS_DIR = 
GobblinTemporalConfigurationKeys.DYNAMIC_SCALING_PREFIX + "errors.dir";

Review Comment:
   make these `public`, as other code may wish to reference/reuse them



##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/loadgen/dynamic/DummyScalingDirectiveSource.java:
##########


Review Comment:
   these two classes might better belong in `src/test/java`...
   
   anyway, they're completely unrelated to the "temporal load-gen" of the 
`loadgen` pkg



##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnService.java:
##########
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.yarn;
+
+import java.util.List;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+
+import com.google.common.base.Optional;
+import com.google.common.eventbus.EventBus;
+import com.typesafe.config.Config;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.temporal.dynamic.ScalingDirective;
+import org.apache.gobblin.temporal.dynamic.StaffingDeltas;
+import org.apache.gobblin.temporal.dynamic.WorkerProfile;
+import org.apache.gobblin.temporal.dynamic.WorkforcePlan;
+import org.apache.gobblin.temporal.dynamic.WorkforceStaffing;
+import org.apache.gobblin.yarn.GobblinYarnConfigurationKeys;
+
+/**
+ * Service for dynamically scaling Gobblin containers running on YARN.
+ * This service manages workforce staffing and plans, and requests new 
containers as needed.
+ */
+@Slf4j
+public class DynamicScalingYarnService extends YarnService {
+
+  /** this holds the current count of containers requested for each worker 
profile */
+  private final WorkforceStaffing workforceStaffing;
+  /** this holds the current total workforce plan as per latest received 
scaling directives */
+  private final WorkforcePlan workforcePlan;
+
+  public DynamicScalingYarnService(Config config, String applicationName, 
String applicationId,
+      YarnConfiguration yarnConfiguration, FileSystem fs, EventBus eventBus) 
throws Exception {
+    super(config, applicationName, applicationId, yarnConfiguration, fs, 
eventBus);
+
+    int initialContainers = 
this.baselineWorkerProfile.getConfig().getInt(GobblinYarnConfigurationKeys.INITIAL_CONTAINERS_KEY);
+
+    this.workforceStaffing = WorkforceStaffing.initialize(initialContainers);

Review Comment:
   [less preferred!] either add a comment noting that this pre-counts the 
initial containers in advance of them actually being allocated (when 
`startUp()` runs)
   
   [preferred] alternatively, just initialize as empty (which is most accurate):
   ```
   this.actualWorkforceStaffing = WorkforceStaffing.initialize(0);
   ```
   
   then have the base class `YarnService::startUp` call a method you override 
here
   ```
   @Override
   protected void requestInitialContainers() {
     StaffingDeltas deltas = 
this.workforcePlan.calcStaffingDeltas(this.actualWorkforceStaffing);
     requestNewContainersForStaffingDeltas(deltas);
   }
   ```
   ?
   



##########
gobblin-temporal/src/test/java/org/apache/gobblin/temporal/yarn/YarnServiceTest.java:
##########
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.yarn;
+
+import java.net.URL;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigValueFactory;
+import com.google.common.eventbus.EventBus;
+
+import org.apache.gobblin.yarn.GobblinYarnConfigurationKeys;
+
+/** Tests for {@link YarnService}*/
+public class YarnServiceTest {
+  private Config defaultConfigs;
+  private final YarnConfiguration yarnConfiguration = new YarnConfiguration();
+  private final FileSystem mockFileSystem = Mockito.mock(FileSystem.class);
+  private final EventBus eventBus = new EventBus("TemporalYarnServiceTest");
+
+  @BeforeClass
+  public void setup() {
+    URL url = YarnServiceTest.class.getClassLoader()
+        .getResource(YarnServiceTest.class.getSimpleName() + ".conf");
+    Assert.assertNotNull(url, "Could not find resource " + url);
+    this.defaultConfigs = ConfigFactory.parseURL(url).resolve();
+  }
+
+  @Test
+  public void testBaselineWorkerProfileCreatedWithPassedConfigs() throws 
Exception {
+    final int containerMemoryMbs = 1500;
+    final int containerCores = 5;
+    final int numContainers = 4;
+    Config config = this.defaultConfigs
+        .withValue(GobblinYarnConfigurationKeys.CONTAINER_MEMORY_MBS_KEY, 
ConfigValueFactory.fromAnyRef(containerMemoryMbs))
+        .withValue(GobblinYarnConfigurationKeys.CONTAINER_CORES_KEY, 
ConfigValueFactory.fromAnyRef(containerCores))
+        .withValue(GobblinYarnConfigurationKeys.INITIAL_CONTAINERS_KEY, 
ConfigValueFactory.fromAnyRef(numContainers));
+
+    YarnService yarnService = new YarnService(
+        config,
+        "testApplicationName",
+        "testApplicationId",
+        yarnConfiguration,
+        mockFileSystem,
+        eventBus
+    );
+
+    
Assert.assertEquals(yarnService.baselineWorkerProfile.getConfig().getInt(GobblinYarnConfigurationKeys.CONTAINER_MEMORY_MBS_KEY),
 containerMemoryMbs);
+    
Assert.assertEquals(yarnService.baselineWorkerProfile.getConfig().getInt(GobblinYarnConfigurationKeys.CONTAINER_CORES_KEY),
 containerCores);
+    
Assert.assertEquals(yarnService.baselineWorkerProfile.getConfig().getInt(GobblinYarnConfigurationKeys.INITIAL_CONTAINERS_KEY),
 numContainers);
+  }
+
+  @Test
+  public void testBuildContainerCommand() throws Exception {
+    final double jvmMemoryXmxRatio = 0.7;
+    final int jvmMemoryOverheadMbs = 50;
+    final int resourceMemoryMB = 3072;
+    final int expectedJvmMemory = (int) (resourceMemoryMB * jvmMemoryXmxRatio) 
- jvmMemoryOverheadMbs;
+
+    Config config = 
this.defaultConfigs.withValue(GobblinYarnConfigurationKeys.CONTAINER_JVM_MEMORY_XMX_RATIO_KEY,
 ConfigValueFactory.fromAnyRef(jvmMemoryXmxRatio))
+        
.withValue(GobblinYarnConfigurationKeys.CONTAINER_JVM_MEMORY_OVERHEAD_MBS_KEY, 
ConfigValueFactory.fromAnyRef(jvmMemoryOverheadMbs));

Review Comment:
   nit: would line up better if both `.withValue` started a new line



##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/YarnService.java:
##########
@@ -257,27 +250,18 @@ public YarnService(Config config, String applicationName, 
String applicationId,
         GobblinYarnConfigurationKeys.RELEASED_CONTAINERS_CACHE_EXPIRY_SECS,
         
GobblinYarnConfigurationKeys.DEFAULT_RELEASED_CONTAINERS_CACHE_EXPIRY_SECS), 
TimeUnit.SECONDS).build();
 
-    this.jvmMemoryXmxRatio = ConfigUtils.getDouble(this.config,
-        GobblinYarnConfigurationKeys.CONTAINER_JVM_MEMORY_XMX_RATIO_KEY,
-        GobblinYarnConfigurationKeys.DEFAULT_CONTAINER_JVM_MEMORY_XMX_RATIO);
-
-    Preconditions.checkArgument(this.jvmMemoryXmxRatio >= 0 && 
this.jvmMemoryXmxRatio <= 1,
-        GobblinYarnConfigurationKeys.CONTAINER_JVM_MEMORY_XMX_RATIO_KEY + " 
must be between 0 and 1 inclusive");
-
-    this.jvmMemoryOverheadMbs = ConfigUtils.getInt(this.config,
-        GobblinYarnConfigurationKeys.CONTAINER_JVM_MEMORY_OVERHEAD_MBS_KEY,
-        
GobblinYarnConfigurationKeys.DEFAULT_CONTAINER_JVM_MEMORY_OVERHEAD_MBS);
-
-    Preconditions.checkArgument(this.jvmMemoryOverheadMbs < 
this.requestedContainerMemoryMbs * this.jvmMemoryXmxRatio,
-        GobblinYarnConfigurationKeys.CONTAINER_JVM_MEMORY_OVERHEAD_MBS_KEY + " 
cannot be more than "
-            + GobblinYarnConfigurationKeys.CONTAINER_MEMORY_MBS_KEY + " * "
-            + GobblinYarnConfigurationKeys.CONTAINER_JVM_MEMORY_XMX_RATIO_KEY);
-
     this.appViewAcl = ConfigUtils.getString(this.config, 
GobblinYarnConfigurationKeys.APP_VIEW_ACL,
         GobblinYarnConfigurationKeys.DEFAULT_APP_VIEW_ACL);
     this.containerTimezone = ConfigUtils.getString(this.config, 
GobblinYarnConfigurationKeys.GOBBLIN_YARN_CONTAINER_TIMEZONE,
         GobblinYarnConfigurationKeys.DEFAULT_GOBBLIN_YARN_CONTAINER_TIMEZONE);
     this.jarCacheEnabled = ConfigUtils.getBoolean(this.config, 
GobblinYarnConfigurationKeys.JAR_CACHE_ENABLED, 
GobblinYarnConfigurationKeys.JAR_CACHE_ENABLED_DEFAULT);
+
+    // Initialising this baseline worker profile to use as default worker 
profile in case allocation request id is not in map
+    this.baselineWorkerProfile = new 
WorkerProfile(WorkforceProfiles.BASELINE_NAME, this.config);

Review Comment:
   sorry, I don't want to be too pedantic, but the most important 
initialization for Dynamic Scaling is `WorkforcePlan` as:
   ```
   new WorkforcePlan(this.config, 
config.getInt(GobblinYarnConfigurationKeys.INITIAL_CONTAINERS_KEY));
   ```
   which you do in the `DynamicScalingYarnService` ctor.  when that's present, 
the baseline profile should follow from that plan, rather than the reverse of 
initializing the plan w/ a free-floating baseline profile, as you have here.
   
   AFAICT, there's no need to maintain a `baselineWorkerProfile` member in this 
class at all.  instead introduce an overridable method to be called within 
`startUp` and give it this fallback impl:
   ```
   /** unless overridden to actually scale, "initial" containers may be the 
app's *only* containers! */
   protected void requestInitialContainers() {
     // NOTE: I suggest adding an overloaded single arg ctor, which internally uses `WorkforceProfiles.BASELINE_NAME`
     WorkerProfile baselineWorkerProfile = new WorkerProfile(this.config);
     int numContainers = baselineWorkerProfile.getConfig().getInt(GobblinYarnConfigurationKeys.INITIAL_CONTAINERS_KEY);
     LOGGER.info("Requesting {} initial (static) containers with baseline (only) profile, never to be re-scaled", numContainers);
     requestContainersForWorkerProfile(baselineWorkerProfile, numContainers);
   }
   ```



##########
gobblin-temporal/src/test/java/org/apache/gobblin/temporal/yarn/YarnServiceTest.java:
##########
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.yarn;
+
+import java.net.URL;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigValueFactory;
+import com.google.common.eventbus.EventBus;
+
+import org.apache.gobblin.yarn.GobblinYarnConfigurationKeys;
+
+/** Tests for {@link YarnService}*/
+public class YarnServiceTest {
+  private Config defaultConfigs;
+  private final YarnConfiguration yarnConfiguration = new YarnConfiguration();
+  private final FileSystem mockFileSystem = Mockito.mock(FileSystem.class);
+  private final EventBus eventBus = new EventBus("TemporalYarnServiceTest");
+
+  @BeforeClass
+  public void setup() {
+    URL url = YarnServiceTest.class.getClassLoader()
+        .getResource(YarnServiceTest.class.getSimpleName() + ".conf");
+    Assert.assertNotNull(url, "Could not find resource " + url);
+    this.defaultConfigs = ConfigFactory.parseURL(url).resolve();
+  }
+
+  @Test
+  public void testBaselineWorkerProfileCreatedWithPassedConfigs() throws 
Exception {
+    final int containerMemoryMbs = 1500;
+    final int containerCores = 5;
+    final int numContainers = 4;
+    Config config = this.defaultConfigs
+        .withValue(GobblinYarnConfigurationKeys.CONTAINER_MEMORY_MBS_KEY, 
ConfigValueFactory.fromAnyRef(containerMemoryMbs))
+        .withValue(GobblinYarnConfigurationKeys.CONTAINER_CORES_KEY, 
ConfigValueFactory.fromAnyRef(containerCores))
+        .withValue(GobblinYarnConfigurationKeys.INITIAL_CONTAINERS_KEY, 
ConfigValueFactory.fromAnyRef(numContainers));
+
+    YarnService yarnService = new YarnService(
+        config,
+        "testApplicationName",
+        "testApplicationId",
+        yarnConfiguration,
+        mockFileSystem,
+        eventBus
+    );
+
+    
Assert.assertEquals(yarnService.baselineWorkerProfile.getConfig().getInt(GobblinYarnConfigurationKeys.CONTAINER_MEMORY_MBS_KEY),
 containerMemoryMbs);
+    
Assert.assertEquals(yarnService.baselineWorkerProfile.getConfig().getInt(GobblinYarnConfigurationKeys.CONTAINER_CORES_KEY),
 containerCores);
+    
Assert.assertEquals(yarnService.baselineWorkerProfile.getConfig().getInt(GobblinYarnConfigurationKeys.INITIAL_CONTAINERS_KEY),
 numContainers);

Review Comment:
   `baselineWorkerProfile` seems like such an internal impl detail that I'm not 
sure it belongs in validation.  how about instead defining the 
`requestInitialContainers` method I proposed above and having this test call 
`YarnService::startUp` before validating the args in a call to a method like:
   ```
   protected void requestContainers(int numContainers, Resource resource, 
Optional<Long> optAllocationRequestId)
   ```
   ?



##########
gobblin-temporal/src/test/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnServiceManagerTest.java:
##########
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.yarn;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigValueFactory;
+
+import org.apache.gobblin.temporal.dynamic.ScalingDirective;
+import org.apache.gobblin.temporal.dynamic.ScalingDirectiveSource;
+import org.apache.gobblin.temporal.loadgen.dynamic.DummyScalingDirectiveSource;
+
+import static 
org.apache.gobblin.temporal.yarn.AbstractDynamicScalingYarnServiceManager.DYNAMIC_SCALING_POLLING_INTERVAL;
+
+/** Tests for {@link 
AbstractDynamicScalingYarnServiceManager.GetScalingDirectivesRunnable}*/
+public class DynamicScalingYarnServiceManagerTest {
+
+  @Mock private DynamicScalingYarnService mockDynamicScalingYarnService;
+  @Mock private ScalingDirectiveSource mockScalingDirectiveSource;
+  @Mock private GobblinTemporalApplicationMaster 
mockGobblinTemporalApplicationMaster;
+
+  @BeforeMethod
+  public void setup() {
+    MockitoAnnotations.openMocks(this);
+    // Using 1 second as polling interval so that the test runs faster and
+    // GetScalingDirectivesRunnable.run() will be called equal to amount of 
sleep introduced between startUp
+    // and shutDown in seconds
+    Config config = 
ConfigFactory.empty().withValue(DYNAMIC_SCALING_POLLING_INTERVAL, 
ConfigValueFactory.fromAnyRef(1));
+    
Mockito.when(mockGobblinTemporalApplicationMaster.getConfig()).thenReturn(config);
+    
Mockito.when(mockGobblinTemporalApplicationMaster.get_yarnService()).thenReturn(mockDynamicScalingYarnService);
+  }
+
+  @Test
+  public void testWhenScalingDirectivesIsNull() throws IOException, 
InterruptedException {
+    
Mockito.when(mockScalingDirectiveSource.getScalingDirectives()).thenReturn(null);
+    TestDynamicScalingYarnServiceManager testDynamicScalingYarnServiceManager 
= new TestDynamicScalingYarnServiceManager(
+        mockGobblinTemporalApplicationMaster, mockScalingDirectiveSource);
+    testDynamicScalingYarnServiceManager.startUp();
+    Thread.sleep(2000);
+    testDynamicScalingYarnServiceManager.shutDown();
+    Mockito.verify(mockDynamicScalingYarnService, 
Mockito.times(0)).reviseWorkforcePlanAndRequestNewContainers(Mockito.anyList());

Review Comment:
   use `Mockito.never()` instead of `Mockito.times(0)`



##########
gobblin-temporal/src/test/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnServiceManagerTest.java:
##########
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.yarn;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigValueFactory;
+
+import org.apache.gobblin.temporal.dynamic.ScalingDirective;
+import org.apache.gobblin.temporal.dynamic.ScalingDirectiveSource;
+import org.apache.gobblin.temporal.loadgen.dynamic.DummyScalingDirectiveSource;
+
+import static 
org.apache.gobblin.temporal.yarn.AbstractDynamicScalingYarnServiceManager.DYNAMIC_SCALING_POLLING_INTERVAL;
+
+/** Tests for {@link 
AbstractDynamicScalingYarnServiceManager.GetScalingDirectivesRunnable}*/
+public class DynamicScalingYarnServiceManagerTest {
+
+  @Mock private DynamicScalingYarnService mockDynamicScalingYarnService;
+  @Mock private ScalingDirectiveSource mockScalingDirectiveSource;
+  @Mock private GobblinTemporalApplicationMaster 
mockGobblinTemporalApplicationMaster;
+
+  @BeforeMethod
+  public void setup() {
+    MockitoAnnotations.openMocks(this);
+    // Using 1 second as polling interval so that the test runs faster and
+    // GetScalingDirectivesRunnable.run() will be called equal to amount of 
sleep introduced between startUp
+    // and shutDown in seconds
+    Config config = 
ConfigFactory.empty().withValue(DYNAMIC_SCALING_POLLING_INTERVAL, 
ConfigValueFactory.fromAnyRef(1));
+    
Mockito.when(mockGobblinTemporalApplicationMaster.getConfig()).thenReturn(config);
+    
Mockito.when(mockGobblinTemporalApplicationMaster.get_yarnService()).thenReturn(mockDynamicScalingYarnService);
+  }
+
+  @Test
+  public void testWhenScalingDirectivesIsNull() throws IOException, 
InterruptedException {
+    
Mockito.when(mockScalingDirectiveSource.getScalingDirectives()).thenReturn(null);
+    TestDynamicScalingYarnServiceManager testDynamicScalingYarnServiceManager 
= new TestDynamicScalingYarnServiceManager(
+        mockGobblinTemporalApplicationMaster, mockScalingDirectiveSource);
+    testDynamicScalingYarnServiceManager.startUp();
+    Thread.sleep(2000);
+    testDynamicScalingYarnServiceManager.shutDown();
+    Mockito.verify(mockDynamicScalingYarnService, 
Mockito.times(0)).reviseWorkforcePlanAndRequestNewContainers(Mockito.anyList());
+  }
+
+  @Test
+  public void testWhenScalingDirectivesIsEmpty() throws IOException, 
InterruptedException {
+    
Mockito.when(mockScalingDirectiveSource.getScalingDirectives()).thenReturn(new 
ArrayList<>());
+    TestDynamicScalingYarnServiceManager testDynamicScalingYarnServiceManager 
= new TestDynamicScalingYarnServiceManager(
+        mockGobblinTemporalApplicationMaster, mockScalingDirectiveSource);
+    testDynamicScalingYarnServiceManager.startUp();
+    Thread.sleep(2000);
+    testDynamicScalingYarnServiceManager.shutDown();
+    Mockito.verify(mockDynamicScalingYarnService, 
Mockito.times(0)).reviseWorkforcePlanAndRequestNewContainers(Mockito.anyList());
+  }

Review Comment:
   suggest combining this with the test above, by chaining a second `.thenReturn` 
invocation there (e.g. `.thenReturn(null).thenReturn(new ArrayList<>())`)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscr...@gobblin.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to