[ 
https://issues.apache.org/jira/browse/GOBBLIN-2174?focusedWorklogId=946768&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-946768
 ]

ASF GitHub Bot logged work on GOBBLIN-2174:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 05/Dec/24 00:18
            Start Date: 05/Dec/24 00:18
    Worklog Time Spent: 10m 
      Work Description: phet commented on code in PR #4077:
URL: https://github.com/apache/gobblin/pull/4077#discussion_r1870177782


##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/AbstractDynamicScalingYarnServiceManager.java:
##########
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.yarn;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+
+import org.apache.commons.collections.CollectionUtils;
+
+import com.typesafe.config.Config;
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.AbstractIdleService;
+
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.ExecutorsUtils;
+import org.apache.gobblin.temporal.dynamic.ScalingDirective;
+import org.apache.gobblin.temporal.dynamic.ScalingDirectiveSource;
+import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys;
+
+/**
+ * This class manages the dynamic scaling of the {@link YarnService} by 
periodically polling for scaling directives and passing
+ * the latest scaling directives to the {@link DynamicScalingYarnService} for 
processing.
+ *
+ * This is an abstract class that provides the basic functionality for 
managing dynamic scaling. Subclasses should implement
+ * {@link #createScalingDirectiveSource()} to provide a {@link 
ScalingDirectiveSource} that will be used to get scaling directives.
+ */
+@Slf4j
+public abstract class AbstractDynamicScalingYarnServiceManager extends 
AbstractIdleService {
+
+  protected final static String DYNAMIC_SCALING_POLLING_INTERVAL = 
GobblinTemporalConfigurationKeys.DYNAMIC_SCALING_PREFIX + "polling.interval";
+  private final int DEFAULT_DYNAMIC_SCALING_POLLING_INTERVAL_SECS = 60;
+  protected final Config config;
+  private final DynamicScalingYarnService dynamicScalingYarnService;
+  private final ScheduledExecutorService dynamicScalingExecutor;
+
+  public 
AbstractDynamicScalingYarnServiceManager(GobblinTemporalApplicationMaster 
appMaster) {
+    this.config = appMaster.getConfig();
+    if (appMaster.get_yarnService() instanceof DynamicScalingYarnService) {
+      this.dynamicScalingYarnService = (DynamicScalingYarnService) 
appMaster.get_yarnService();
+    } else {
+      String errorMsg = "Failure while getting YarnService Instance from 
GobblinTemporalApplicationMaster::get_yarnService()"
+          + " YarnService is not an instance of DynamicScalingYarnService";
+      log.error(errorMsg);

Review Comment:
   Be sure the error message also says what the *actual* instance type is.



##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/AbstractDynamicScalingYarnServiceManager.java:
##########
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.yarn;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+
+import org.apache.commons.collections.CollectionUtils;
+
+import com.typesafe.config.Config;
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.AbstractIdleService;
+
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.ExecutorsUtils;
+import org.apache.gobblin.temporal.dynamic.ScalingDirective;
+import org.apache.gobblin.temporal.dynamic.ScalingDirectiveSource;
+import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys;
+
+/**
+ * This class manages the dynamic scaling of the {@link YarnService} by 
periodically polling for scaling directives and passing
+ * the latest scaling directives to the {@link DynamicScalingYarnService} for 
processing.
+ *
+ * This is an abstract class that provides the basic functionality for 
managing dynamic scaling. Subclasses should implement
+ * {@link #createScalingDirectiveSource()} to provide a {@link 
ScalingDirectiveSource} that will be used to get scaling directives.
+ */
+@Slf4j
+public abstract class AbstractDynamicScalingYarnServiceManager extends 
AbstractIdleService {
+
+  protected final static String DYNAMIC_SCALING_POLLING_INTERVAL = 
GobblinTemporalConfigurationKeys.DYNAMIC_SCALING_PREFIX + "polling.interval";
+  private final int DEFAULT_DYNAMIC_SCALING_POLLING_INTERVAL_SECS = 60;
+  protected final Config config;
+  private final DynamicScalingYarnService dynamicScalingYarnService;
+  private final ScheduledExecutorService dynamicScalingExecutor;
+
+  public 
AbstractDynamicScalingYarnServiceManager(GobblinTemporalApplicationMaster 
appMaster) {
+    this.config = appMaster.getConfig();
+    if (appMaster.get_yarnService() instanceof DynamicScalingYarnService) {
+      this.dynamicScalingYarnService = (DynamicScalingYarnService) 
appMaster.get_yarnService();
+    } else {
+      String errorMsg = "Failure while getting YarnService Instance from 
GobblinTemporalApplicationMaster::get_yarnService()"
+          + " YarnService is not an instance of DynamicScalingYarnService";
+      log.error(errorMsg);
+      throw new RuntimeException(errorMsg);
+    }
+    this.dynamicScalingExecutor = Executors.newSingleThreadScheduledExecutor(
+        ExecutorsUtils.newThreadFactory(Optional.of(log),
+            Optional.of("DynamicScalingExecutor")));
+  }
+
+  @Override
+  protected void startUp() {
+    int scheduleInterval = ConfigUtils.getInt(this.config, 
DYNAMIC_SCALING_POLLING_INTERVAL,
+        DEFAULT_DYNAMIC_SCALING_POLLING_INTERVAL_SECS);
+    log.info("Starting the " + this.getClass().getSimpleName());
+    log.info("Scheduling the dynamic scaling task with an interval of {} 
seconds", scheduleInterval);

Review Comment:
   nit: combine these into one log msg:
   ```
   "Starting the {} with re-scaling interval of {} seconds", ....
   ```



##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnService.java:
##########
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.yarn;
+
+import java.util.List;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+
+import com.google.common.base.Optional;
+import com.google.common.eventbus.EventBus;
+import com.typesafe.config.Config;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.temporal.dynamic.ScalingDirective;
+import org.apache.gobblin.temporal.dynamic.StaffingDeltas;
+import org.apache.gobblin.temporal.dynamic.WorkerProfile;
+import org.apache.gobblin.temporal.dynamic.WorkforcePlan;
+import org.apache.gobblin.temporal.dynamic.WorkforceStaffing;
+import org.apache.gobblin.yarn.GobblinYarnConfigurationKeys;
+
+/**
+ * Service for dynamically scaling Gobblin containers running on YARN.
+ * This service manages workforce staffing and plans, and requests new 
containers as needed.
+ */
+@Slf4j
+public class DynamicScalingYarnService extends YarnService {
+
+  /** this holds the current count of containers requested for each worker 
profile */
+  private final WorkforceStaffing workforceStaffing;

Review Comment:
   For clarity, I suggest a name along the lines of `actualWorkforceStaffing` 
(whereas the one internal to `WorkforcePlan` is the planned/intended staffing).



##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/YarnService.java:
##########
@@ -803,24 +810,36 @@ public void onContainersAllocated(List<Container> 
containers) {
         // Find matching requests and remove the request (YARN-660). We the 
scheduler are responsible
         // for cleaning up requests after allocation based on the design in 
the described ticket.
         // YARN does not have a delta request API and the requests are not 
cleaned up automatically.
-        // Try finding a match first with the host as the resource name then 
fall back to any resource match.
+        // Try finding a match first with requestAllocationId (which should 
always be the case) then fall back to
+        // finding a match with the host as the resource name which then will 
fall back to any resource match.
         // Also see YARN-1902. Container count will explode without this logic 
for removing container requests.
-        List<? extends Collection<AMRMClient.ContainerRequest>> 
matchingRequests = amrmClientAsync
-            .getMatchingRequests(container.getPriority(), 
container.getNodeHttpAddress(), container.getResource());
+        Collection<AMRMClient.ContainerRequest> 
matchingRequestsByAllocationRequestId = 
amrmClientAsync.getMatchingRequests(container.getAllocationRequestId());
+        if (!matchingRequestsByAllocationRequestId.isEmpty()) {
+          AMRMClient.ContainerRequest firstMatchingContainerRequest = 
matchingRequestsByAllocationRequestId.iterator().next();
+          LOGGER.debug("Found matching requests {}, removing first matching 
request {}",

Review Comment:
   Especially until we are comfortably using this in prod, these may be better at 
the `.info` log level. There may be a lot of repetitive logging when we scale to 
many machines, but that's still O(100), and often the number will be much smaller. 
Either way, such diagnostic info may prove invaluable :)



##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/loadgen/dynamic/DummyScalingDirectiveSource.java:
##########
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.loadgen.dynamic;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.gobblin.yarn.GobblinYarnConfigurationKeys;
+import org.apache.gobblin.temporal.dynamic.ProfileDerivation;
+import org.apache.gobblin.temporal.dynamic.ProfileOverlay;
+import org.apache.gobblin.temporal.dynamic.ScalingDirective;
+import org.apache.gobblin.temporal.dynamic.ScalingDirectiveSource;
+import org.apache.gobblin.temporal.dynamic.WorkforceProfiles;
+
+
+/**
+ * A dummy implementation of {@link ScalingDirectiveSource} that returns a 
fixed set of {@link ScalingDirective}s.
+ */
+public class DummyScalingDirectiveSource implements ScalingDirectiveSource {
+  private final AtomicInteger numInvocations = new AtomicInteger(0);
+  private final Optional<ProfileDerivation> derivedFromBaseline;
+  public DummyScalingDirectiveSource() {
+    this.derivedFromBaseline = Optional.of(new 
ProfileDerivation(WorkforceProfiles.BASELINE_NAME,
+        new ProfileOverlay.Adding(
+            new 
ProfileOverlay.KVPair(GobblinYarnConfigurationKeys.CONTAINER_MEMORY_MBS_KEY, 
"2048"),
+            new 
ProfileOverlay.KVPair(GobblinYarnConfigurationKeys.CONTAINER_CORES_KEY, "2")
+        )
+    ));
+  }
+
+  /**
+   * @return - A fixed set of  {@link ScalingDirective}s corresponding to the 
invocation number.
+   */
+  @Override
+  public List<ScalingDirective> getScalingDirectives() {
+    // Note - profile should exist already or is derived from other profile
+    if (this.numInvocations.get() == 0) {
+      this.numInvocations.getAndIncrement();

Review Comment:
   nit: call `incrementAndGet()` once at the top and save the value in a local 
variable to use in these `if/else`s
   
   ```
   long currentTime = System.currentTimeMillis();
   ```
   could also be DRY'd up



##########
gobblin-temporal/src/test/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnServiceManagerTest.java:
##########
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.yarn;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigValueFactory;
+
+import org.apache.gobblin.temporal.dynamic.ScalingDirective;
+import org.apache.gobblin.temporal.dynamic.ScalingDirectiveSource;
+import org.apache.gobblin.temporal.loadgen.dynamic.DummyScalingDirectiveSource;
+
+import static 
org.apache.gobblin.temporal.yarn.AbstractDynamicScalingYarnServiceManager.DYNAMIC_SCALING_POLLING_INTERVAL;
+
+/** Tests for {@link 
AbstractDynamicScalingYarnServiceManager.GetScalingDirectivesRunnable}*/
+public class DynamicScalingYarnServiceManagerTest {
+
+  @Mock private DynamicScalingYarnService mockDynamicScalingYarnService;
+  @Mock private ScalingDirectiveSource mockScalingDirectiveSource;
+  @Mock private GobblinTemporalApplicationMaster 
mockGobblinTemporalApplicationMaster;
+
+  @BeforeMethod
+  public void setup() {
+    MockitoAnnotations.openMocks(this);
+    // Using 1 second as polling interval so that the test runs faster and
+    // GetScalingDirectivesRunnable.run() will be called equal to amount of 
sleep introduced between startUp
+    // and shutDown in seconds
+    Config config = 
ConfigFactory.empty().withValue(DYNAMIC_SCALING_POLLING_INTERVAL, 
ConfigValueFactory.fromAnyRef(1));
+    
Mockito.when(mockGobblinTemporalApplicationMaster.getConfig()).thenReturn(config);
+    
Mockito.when(mockGobblinTemporalApplicationMaster.get_yarnService()).thenReturn(mockDynamicScalingYarnService);
+  }
+
+  @Test
+  public void testWhenScalingDirectivesIsNull() throws IOException, 
InterruptedException {
+    
Mockito.when(mockScalingDirectiveSource.getScalingDirectives()).thenReturn(null);
+    TestDynamicScalingYarnServiceManager testDynamicScalingYarnServiceManager 
= new TestDynamicScalingYarnServiceManager(
+        mockGobblinTemporalApplicationMaster, mockScalingDirectiveSource);
+    testDynamicScalingYarnServiceManager.startUp();
+    Thread.sleep(2000);
+    testDynamicScalingYarnServiceManager.shutDown();
+    Mockito.verify(mockDynamicScalingYarnService, 
Mockito.times(0)).reviseWorkforcePlanAndRequestNewContainers(Mockito.anyList());
+  }
+
+  @Test
+  public void testWhenScalingDirectivesIsEmpty() throws IOException, 
InterruptedException {
+    
Mockito.when(mockScalingDirectiveSource.getScalingDirectives()).thenReturn(new 
ArrayList<>());
+    TestDynamicScalingYarnServiceManager testDynamicScalingYarnServiceManager 
= new TestDynamicScalingYarnServiceManager(
+        mockGobblinTemporalApplicationMaster, mockScalingDirectiveSource);
+    testDynamicScalingYarnServiceManager.startUp();
+    Thread.sleep(2000);
+    testDynamicScalingYarnServiceManager.shutDown();
+    Mockito.verify(mockDynamicScalingYarnService, 
Mockito.times(0)).reviseWorkforcePlanAndRequestNewContainers(Mockito.anyList());
+  }
+
+  /** Note : this test uses {@link 
org.apache.gobblin.temporal.loadgen.dynamic.DummyScalingDirectiveSource}*/
+  @Test
+  public void testWithDummyScalingDirectiveSource() throws 
InterruptedException {
+    // DummyScalingDirectiveSource returns 2 scaling directives in first 3 
invocations and after that it returns empty list
+    // so the total number of invocations after three invocations should 
always be 3
+    TestDynamicScalingYarnServiceManager testDynamicScalingYarnServiceManager 
= new TestDynamicScalingYarnServiceManager(
+        mockGobblinTemporalApplicationMaster, new 
DummyScalingDirectiveSource());
+    testDynamicScalingYarnServiceManager.startUp();
+    Thread.sleep(5000); // 5 seconds sleep so that 
GetScalingDirectivesRunnable.run() is called for 5 times
+    testDynamicScalingYarnServiceManager.shutDown();
+    Mockito.verify(mockDynamicScalingYarnService, 
Mockito.times(3)).reviseWorkforcePlanAndRequestNewContainers(Mockito.anyList());

Review Comment:
   I'm tempted to verify further what it actually did.  e.g. that it called 
`requestContainers`. what do you think?



##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/loadgen/dynamic/DummyDynamicScalingYarnServiceManager.java:
##########
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.loadgen.dynamic;
+
+import org.apache.gobblin.temporal.dynamic.ScalingDirectiveSource;
+import org.apache.gobblin.temporal.yarn.GobblinTemporalApplicationMaster;
+import 
org.apache.gobblin.temporal.yarn.AbstractDynamicScalingYarnServiceManager;
+
+/**
+ * {@link DummyScalingDirectiveSource} based implementation of {@link 
AbstractDynamicScalingYarnServiceManager}.
+ *  This class is meant to be used for testing purposes only.
+ */

Review Comment:
   How is this meant to be used? Are you hard-coding it when making a private 
build, or passing it in by config? If not the latter yet, would it be worth 
getting it to work that way?



##########
gobblin-temporal/src/test/java/org/apache/gobblin/temporal/yarn/YarnServiceTest.java:
##########
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.yarn;
+
+import java.net.URL;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigValueFactory;
+import com.google.common.eventbus.EventBus;
+
+import org.apache.gobblin.yarn.GobblinYarnConfigurationKeys;
+
+/** Tests for {@link YarnService}*/
+public class YarnServiceTest {
+  private Config defaultConfigs;
+  private final YarnConfiguration yarnConfiguration = new YarnConfiguration();
+  private final FileSystem mockFileSystem = Mockito.mock(FileSystem.class);
+  private final EventBus eventBus = new EventBus("TemporalYarnServiceTest");
+
+  @BeforeClass
+  public void setup() {
+    URL url = YarnServiceTest.class.getClassLoader()
+        .getResource(YarnServiceTest.class.getSimpleName() + ".conf");
+    Assert.assertNotNull(url, "Could not find resource " + url);
+    this.defaultConfigs = ConfigFactory.parseURL(url).resolve();
+  }
+
+  @Test
+  public void testBaselineWorkerProfileCreatedWithPassedConfigs() throws 
Exception {
+    final int containerMemoryMbs = 1500;
+    final int containerCores = 5;
+    final int numContainers = 4;
+    Config config = this.defaultConfigs
+        .withValue(GobblinYarnConfigurationKeys.CONTAINER_MEMORY_MBS_KEY, 
ConfigValueFactory.fromAnyRef(containerMemoryMbs))
+        .withValue(GobblinYarnConfigurationKeys.CONTAINER_CORES_KEY, 
ConfigValueFactory.fromAnyRef(containerCores))
+        .withValue(GobblinYarnConfigurationKeys.INITIAL_CONTAINERS_KEY, 
ConfigValueFactory.fromAnyRef(numContainers));
+
+    YarnService yarnService = new YarnService(
+        config,
+        "testApplicationName",
+        "testApplicationId",
+        yarnConfiguration,
+        mockFileSystem,
+        eventBus
+    );
+
+    
Assert.assertEquals(yarnService.baselineWorkerProfile.getConfig().getInt(GobblinYarnConfigurationKeys.CONTAINER_MEMORY_MBS_KEY),
 containerMemoryMbs);
+    
Assert.assertEquals(yarnService.baselineWorkerProfile.getConfig().getInt(GobblinYarnConfigurationKeys.CONTAINER_CORES_KEY),
 containerCores);
+    
Assert.assertEquals(yarnService.baselineWorkerProfile.getConfig().getInt(GobblinYarnConfigurationKeys.INITIAL_CONTAINERS_KEY),
 numContainers);
+  }
+
+  @Test
+  public void testBuildContainerCommand() throws Exception {
+    final double jvmMemoryXmxRatio = 0.7;
+    final int jvmMemoryOverheadMbs = 50;
+    final int resourceMemoryMB = 3072;
+    final int expectedJvmMemory = (int) (resourceMemoryMB * jvmMemoryXmxRatio) 
- jvmMemoryOverheadMbs;
+
+    Config config = 
this.defaultConfigs.withValue(GobblinYarnConfigurationKeys.CONTAINER_JVM_MEMORY_XMX_RATIO_KEY,
 ConfigValueFactory.fromAnyRef(jvmMemoryXmxRatio))
+        
.withValue(GobblinYarnConfigurationKeys.CONTAINER_JVM_MEMORY_OVERHEAD_MBS_KEY, 
ConfigValueFactory.fromAnyRef(jvmMemoryOverheadMbs));
+
+    Resource resource = Resource.newInstance(resourceMemoryMB, 2);
+
+    Container mockContainer = Mockito.mock(Container.class);
+    Mockito.when(mockContainer.getResource()).thenReturn(resource);
+    Mockito.when(mockContainer.getAllocationRequestId()).thenReturn(0L);
+
+    YarnService yarnService = new YarnService(
+        config,
+        "testApplicationName",
+        "testApplicationId",
+        yarnConfiguration,
+        mockFileSystem,
+        eventBus
+    );
+
+    String command = yarnService.buildContainerCommand(mockContainer, 
"testHelixParticipantId", "testHelixInstanceTag");
+    Assert.assertTrue(command.contains("-Xmx" + expectedJvmMemory + "M"));

Review Comment:
   first off, thank you for kicking off testing for this module!
   
   This seems to be a partial clone of the `YarnServiceTest` in `gobblin-yarn`. 
I suggest noting that origin for maintainers in the javadoc. Is any 
[other validation from the original](https://github.com/apache/gobblin/blob/ab62e72839bb4824b98a442de4dd1647284e880e/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnServiceTest.java#L149) 
worth including here too?



##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/FsSourceDynamicScalingYarnServiceManager.java:
##########


Review Comment:
   I really like what you've done here w/ separating concerns via the abstract 
base class! :D



##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/YarnService.java:
##########
@@ -419,41 +403,20 @@ private EventSubmitter buildEventSubmitter() {
         .build();
   }
 
-  /**
-   * Request an allocation of containers. If numTargetContainers is larger 
than the max of current and expected number
-   * of containers then additional containers are requested.
-   * <p>
-   * If numTargetContainers is less than the current number of allocated 
containers then release free containers.
-   * Shrinking is relative to the number of currently allocated containers 
since it takes time for containers
-   * to be allocated and assigned work and we want to avoid releasing a 
container prematurely before it is assigned
-   * work. This means that a container may not be released even though 
numTargetContainers is less than the requested
-   * number of containers. The intended usage is for the caller of this method 
to make periodic calls to attempt to
-   * adjust the cluster towards the desired number of containers.
-   *
-   * @param inUseInstances  a set of in use instances
-   * @return whether successfully requested the target number of containers
-   */
-  public synchronized boolean requestTargetNumberOfContainers(int 
numContainers, Set<String> inUseInstances) {
-    int defaultContainerMemoryMbs = 
config.getInt(GobblinYarnConfigurationKeys.CONTAINER_MEMORY_MBS_KEY);
-    int defaultContainerCores = config.getInt(GobblinYarnConfigurationKeys. 
CONTAINER_CORES_KEY);
-
-    LOGGER.info("Trying to set numTargetContainers={}, in-use helix instances 
count is {}, container map size is {}",
-        numContainers, inUseInstances.size(), this.containerMap.size());
-
-    requestContainers(numContainers, 
Resource.newInstance(defaultContainerMemoryMbs, defaultContainerCores));
-    LOGGER.info("Current tag-container desired count:{}, tag-container 
allocated: {}", numContainers, this.allocatedContainerCountMap);
-    return true;
-  }
-
-  // Request initial containers with default resource and helix tag
-  private void requestInitialContainers(int containersRequested) {
-    requestTargetNumberOfContainers(containersRequested, 
Collections.EMPTY_SET);
+  /** Request Initial containers using baselineWorkerProfile */

Review Comment:
   let's generalize this impl for reuse, by removing the "baseline" / "initial" 
parts.  let's drop the presumption of a well-known default allocation ID and 
refactor by essentially moving the impl from the `DynamicScalingYarnService` to 
this base class:
   ```
    private synchronized void requestContainersForWorkerProfile(WorkerProfile 
workerProfile, int numContainers) {
       int containerMemoryMbs = 
workerProfile.getConfig().getInt(GobblinYarnConfigurationKeys.CONTAINER_MEMORY_MBS_KEY);
       int containerCores = 
workerProfile.getConfig().getInt(GobblinYarnConfigurationKeys.CONTAINER_CORES_KEY);
       long allocationRequestId = 
storeByUniqueAllocationRequestId(workerProfile);
       requestContainers(numContainers, 
Resource.newInstance(containerMemoryMbs, containerCores), 
Optional.of(allocationRequestId));
     }
   ```
   afterwards, requesting initial containers is merely reaching into config for 
`numInitialContainers` and call this common impl
   
   



##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/AbstractDynamicScalingYarnServiceManager.java:
##########
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.yarn;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+
+import org.apache.commons.collections.CollectionUtils;
+
+import com.typesafe.config.Config;
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.AbstractIdleService;
+
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.ExecutorsUtils;
+import org.apache.gobblin.temporal.dynamic.ScalingDirective;
+import org.apache.gobblin.temporal.dynamic.ScalingDirectiveSource;
+import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys;
+
+/**
+ * This class manages the dynamic scaling of the {@link YarnService} by 
periodically polling for scaling directives and passing
+ * the latest scaling directives to the {@link DynamicScalingYarnService} for 
processing.
+ *
+ * This is an abstract class that provides the basic functionality for 
managing dynamic scaling. Subclasses should implement
+ * {@link #createScalingDirectiveSource()} to provide a {@link 
ScalingDirectiveSource} that will be used to get scaling directives.
+ */
+@Slf4j
+public abstract class AbstractDynamicScalingYarnServiceManager extends 
AbstractIdleService {
+
+  protected final static String DYNAMIC_SCALING_POLLING_INTERVAL = 
GobblinTemporalConfigurationKeys.DYNAMIC_SCALING_PREFIX + "polling.interval";
+  private final int DEFAULT_DYNAMIC_SCALING_POLLING_INTERVAL_SECS = 60;
+  protected final Config config;
+  private final DynamicScalingYarnService dynamicScalingYarnService;
+  private final ScheduledExecutorService dynamicScalingExecutor;
+
+  public 
AbstractDynamicScalingYarnServiceManager(GobblinTemporalApplicationMaster 
appMaster) {
+    this.config = appMaster.getConfig();
+    if (appMaster.get_yarnService() instanceof DynamicScalingYarnService) {
+      this.dynamicScalingYarnService = (DynamicScalingYarnService) 
appMaster.get_yarnService();
+    } else {
+      String errorMsg = "Failure while getting YarnService Instance from 
GobblinTemporalApplicationMaster::get_yarnService()"
+          + " YarnService is not an instance of DynamicScalingYarnService";
+      log.error(errorMsg);
+      throw new RuntimeException(errorMsg);
+    }
+    this.dynamicScalingExecutor = Executors.newSingleThreadScheduledExecutor(
+        ExecutorsUtils.newThreadFactory(Optional.of(log),
+            Optional.of("DynamicScalingExecutor")));
+  }
+
+  @Override
+  protected void startUp() {
+    int scheduleInterval = ConfigUtils.getInt(this.config, 
DYNAMIC_SCALING_POLLING_INTERVAL,
+        DEFAULT_DYNAMIC_SCALING_POLLING_INTERVAL_SECS);
+    log.info("Starting the " + this.getClass().getSimpleName());
+    log.info("Scheduling the dynamic scaling task with an interval of {} 
seconds", scheduleInterval);
+
+    this.dynamicScalingExecutor.scheduleAtFixedRate(
+        new GetScalingDirectivesRunnable(this.dynamicScalingYarnService, 
createScalingDirectiveSource()),
+        scheduleInterval, scheduleInterval, TimeUnit.SECONDS
+    );
+  }
+
+  @Override
+  protected void shutDown() {
+    log.info("Stopping the " + this.getClass().getSimpleName());
+    ExecutorsUtils.shutdownExecutorService(this.dynamicScalingExecutor, 
Optional.of(log));
+  }
+
+  /**
+   * Create a {@link ScalingDirectiveSource} to use for getting scaling 
directives.
+   */
+  protected abstract ScalingDirectiveSource createScalingDirectiveSource();
+
+  /**
+   * A {@link Runnable} that gets the scaling directives from the {@link 
ScalingDirectiveSource} and passes them to the
+   * {@link DynamicScalingYarnService} for processing.
+   */
+  @AllArgsConstructor
+  static class GetScalingDirectivesRunnable implements Runnable {
+    private final DynamicScalingYarnService dynamicScalingYarnService;
+    private final ScalingDirectiveSource scalingDirectiveSource;
+
+    @Override
+    public void run() {
+      try {
+        List<ScalingDirective> scalingDirectives = 
scalingDirectiveSource.getScalingDirectives();
+        if (CollectionUtils.isNotEmpty(scalingDirectives)) {
+          
dynamicScalingYarnService.reviseWorkforcePlanAndRequestNewContainers(scalingDirectives);
+        }
+      } catch (IOException e) {
+        log.error("Failed to get scaling directives", e);
+      } catch (Throwable t) {
+        log.error("Suppressing error from GetScalingDirectivesRunnable.run()", 
t);

Review Comment:
   no need to name where, as that shows in the stack trace.  maybe just 
"Unexpected error with dynamic scaling via directives"



##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/FsSourceDynamicScalingYarnServiceManager.java:
##########
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.yarn;
+
+import java.util.Optional;
+
+import org.apache.hadoop.fs.FileSystem;
+
+import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys;
+import org.apache.gobblin.temporal.dynamic.FsScalingDirectiveSource;
+import org.apache.gobblin.temporal.dynamic.ScalingDirectiveSource;
+
+/**
+ * {@link FsScalingDirectiveSource} based implementation of {@link 
AbstractDynamicScalingYarnServiceManager}.
+ */
+public class FsSourceDynamicScalingYarnServiceManager extends 
AbstractDynamicScalingYarnServiceManager {
+  private final static String DYNAMIC_SCALING_DIRECTIVES_DIR = 
GobblinTemporalConfigurationKeys.DYNAMIC_SCALING_PREFIX + "directives.dir";
+  private final static String DYNAMIC_SCALING_ERRORS_DIR = 
GobblinTemporalConfigurationKeys.DYNAMIC_SCALING_PREFIX + "errors.dir";

Review Comment:
   `public`, as other code may wish to reference/reuse



##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/loadgen/dynamic/DummyScalingDirectiveSource.java:
##########


Review Comment:
   these two classes might better belong in `src/test/java`...
   
   anyway, they're completely unrelated to the "temporal load-gen" of the 
`loadgen` pkg



##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnService.java:
##########
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.yarn;
+
+import java.util.List;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+
+import com.google.common.base.Optional;
+import com.google.common.eventbus.EventBus;
+import com.typesafe.config.Config;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.temporal.dynamic.ScalingDirective;
+import org.apache.gobblin.temporal.dynamic.StaffingDeltas;
+import org.apache.gobblin.temporal.dynamic.WorkerProfile;
+import org.apache.gobblin.temporal.dynamic.WorkforcePlan;
+import org.apache.gobblin.temporal.dynamic.WorkforceStaffing;
+import org.apache.gobblin.yarn.GobblinYarnConfigurationKeys;
+
+/**
+ * Service for dynamically scaling Gobblin containers running on YARN.
+ * This service manages workforce staffing and plans, and requests new 
containers as needed.
+ */
+@Slf4j
+public class DynamicScalingYarnService extends YarnService {
+
+  /** this holds the current count of containers requested for each worker 
profile */
+  private final WorkforceStaffing workforceStaffing;
+  /** this holds the current total workforce plan as per latest received 
scaling directives */
+  private final WorkforcePlan workforcePlan;
+
+  public DynamicScalingYarnService(Config config, String applicationName, 
String applicationId,
+      YarnConfiguration yarnConfiguration, FileSystem fs, EventBus eventBus) 
throws Exception {
+    super(config, applicationName, applicationId, yarnConfiguration, fs, 
eventBus);
+
+    int initialContainers = 
this.baselineWorkerProfile.getConfig().getInt(GobblinYarnConfigurationKeys.INITIAL_CONTAINERS_KEY);
+
+    this.workforceStaffing = WorkforceStaffing.initialize(initialContainers);

Review Comment:
   [less preferred!] either add a comment noting that this truly pre-counts the 
initial containers in advance of them actually being allocated (when 
`startUp()` runs)
   
   [preferred] alternatively, just initialize as empty (which is most accurate):
   ```
   this.actualWorkforceStaffing = WorkforceStaffing.initialize(0);
   ```
   
   then have the base class `YarnService::startup` call a method you override 
here
   ```
   @Override
   protected void requestInitialContainers() {
     StaffingDeltas deltas = 
this.workforcePlan.calcStaffingDeltas(this.actualWorkforceStaffing);
     requestNewContainersForStaffingDeltas(deltas);
   }
   ```
   ?
   



##########
gobblin-temporal/src/test/java/org/apache/gobblin/temporal/yarn/YarnServiceTest.java:
##########
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.yarn;
+
+import java.net.URL;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigValueFactory;
+import com.google.common.eventbus.EventBus;
+
+import org.apache.gobblin.yarn.GobblinYarnConfigurationKeys;
+
+/** Tests for {@link YarnService}*/
+public class YarnServiceTest {
+  private Config defaultConfigs;
+  private final YarnConfiguration yarnConfiguration = new YarnConfiguration();
+  private final FileSystem mockFileSystem = Mockito.mock(FileSystem.class);
+  private final EventBus eventBus = new EventBus("TemporalYarnServiceTest");
+
+  @BeforeClass
+  public void setup() {
+    URL url = YarnServiceTest.class.getClassLoader()
+        .getResource(YarnServiceTest.class.getSimpleName() + ".conf");
+    Assert.assertNotNull(url, "Could not find resource " + url);
+    this.defaultConfigs = ConfigFactory.parseURL(url).resolve();
+  }
+
+  @Test
+  public void testBaselineWorkerProfileCreatedWithPassedConfigs() throws 
Exception {
+    final int containerMemoryMbs = 1500;
+    final int containerCores = 5;
+    final int numContainers = 4;
+    Config config = this.defaultConfigs
+        .withValue(GobblinYarnConfigurationKeys.CONTAINER_MEMORY_MBS_KEY, 
ConfigValueFactory.fromAnyRef(containerMemoryMbs))
+        .withValue(GobblinYarnConfigurationKeys.CONTAINER_CORES_KEY, 
ConfigValueFactory.fromAnyRef(containerCores))
+        .withValue(GobblinYarnConfigurationKeys.INITIAL_CONTAINERS_KEY, 
ConfigValueFactory.fromAnyRef(numContainers));
+
+    YarnService yarnService = new YarnService(
+        config,
+        "testApplicationName",
+        "testApplicationId",
+        yarnConfiguration,
+        mockFileSystem,
+        eventBus
+    );
+
+    
Assert.assertEquals(yarnService.baselineWorkerProfile.getConfig().getInt(GobblinYarnConfigurationKeys.CONTAINER_MEMORY_MBS_KEY),
 containerMemoryMbs);
+    
Assert.assertEquals(yarnService.baselineWorkerProfile.getConfig().getInt(GobblinYarnConfigurationKeys.CONTAINER_CORES_KEY),
 containerCores);
+    
Assert.assertEquals(yarnService.baselineWorkerProfile.getConfig().getInt(GobblinYarnConfigurationKeys.INITIAL_CONTAINERS_KEY),
 numContainers);
+  }
+
+  @Test
+  public void testBuildContainerCommand() throws Exception {
+    final double jvmMemoryXmxRatio = 0.7;
+    final int jvmMemoryOverheadMbs = 50;
+    final int resourceMemoryMB = 3072;
+    final int expectedJvmMemory = (int) (resourceMemoryMB * jvmMemoryXmxRatio) 
- jvmMemoryOverheadMbs;
+
+    Config config = 
this.defaultConfigs.withValue(GobblinYarnConfigurationKeys.CONTAINER_JVM_MEMORY_XMX_RATIO_KEY,
 ConfigValueFactory.fromAnyRef(jvmMemoryXmxRatio))
+        
.withValue(GobblinYarnConfigurationKeys.CONTAINER_JVM_MEMORY_OVERHEAD_MBS_KEY, 
ConfigValueFactory.fromAnyRef(jvmMemoryOverheadMbs));

Review Comment:
   nit: would line up better if both `.withValue` started a new line



##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/YarnService.java:
##########
@@ -257,27 +250,18 @@ public YarnService(Config config, String applicationName, 
String applicationId,
         GobblinYarnConfigurationKeys.RELEASED_CONTAINERS_CACHE_EXPIRY_SECS,
         
GobblinYarnConfigurationKeys.DEFAULT_RELEASED_CONTAINERS_CACHE_EXPIRY_SECS), 
TimeUnit.SECONDS).build();
 
-    this.jvmMemoryXmxRatio = ConfigUtils.getDouble(this.config,
-        GobblinYarnConfigurationKeys.CONTAINER_JVM_MEMORY_XMX_RATIO_KEY,
-        GobblinYarnConfigurationKeys.DEFAULT_CONTAINER_JVM_MEMORY_XMX_RATIO);
-
-    Preconditions.checkArgument(this.jvmMemoryXmxRatio >= 0 && 
this.jvmMemoryXmxRatio <= 1,
-        GobblinYarnConfigurationKeys.CONTAINER_JVM_MEMORY_XMX_RATIO_KEY + " 
must be between 0 and 1 inclusive");
-
-    this.jvmMemoryOverheadMbs = ConfigUtils.getInt(this.config,
-        GobblinYarnConfigurationKeys.CONTAINER_JVM_MEMORY_OVERHEAD_MBS_KEY,
-        
GobblinYarnConfigurationKeys.DEFAULT_CONTAINER_JVM_MEMORY_OVERHEAD_MBS);
-
-    Preconditions.checkArgument(this.jvmMemoryOverheadMbs < 
this.requestedContainerMemoryMbs * this.jvmMemoryXmxRatio,
-        GobblinYarnConfigurationKeys.CONTAINER_JVM_MEMORY_OVERHEAD_MBS_KEY + " 
cannot be more than "
-            + GobblinYarnConfigurationKeys.CONTAINER_MEMORY_MBS_KEY + " * "
-            + GobblinYarnConfigurationKeys.CONTAINER_JVM_MEMORY_XMX_RATIO_KEY);
-
     this.appViewAcl = ConfigUtils.getString(this.config, 
GobblinYarnConfigurationKeys.APP_VIEW_ACL,
         GobblinYarnConfigurationKeys.DEFAULT_APP_VIEW_ACL);
     this.containerTimezone = ConfigUtils.getString(this.config, 
GobblinYarnConfigurationKeys.GOBBLIN_YARN_CONTAINER_TIMEZONE,
         GobblinYarnConfigurationKeys.DEFAULT_GOBBLIN_YARN_CONTAINER_TIMEZONE);
     this.jarCacheEnabled = ConfigUtils.getBoolean(this.config, 
GobblinYarnConfigurationKeys.JAR_CACHE_ENABLED, 
GobblinYarnConfigurationKeys.JAR_CACHE_ENABLED_DEFAULT);
+
+    // Initialising this baseline worker profile to use as default worker 
profile in case allocation request id is not in map
+    this.baselineWorkerProfile = new 
WorkerProfile(WorkforceProfiles.BASELINE_NAME, this.config);

Review Comment:
   sorry, I don't want to be too pedantic, but the most important 
initialization for Dynamic Scaling is `WorkforcePlan` as:
   ```
   new WorkforcePlan(this.config, 
config.getInt(GobblinYarnConfigurationKeys.INITIAL_CONTAINERS_KEY));
   ```
   which you do in the `DynamicScalingYarnService` ctor.  when that's present, 
the baseline profile should follow from that plan, rather than the reverse of 
initializing the plan w/ a free-floating baseline profile, as you have here.
   
   AFAICT, there's no need to maintain a `baselineWorkerProfile` member in this 
class at all.  instead introduce an overridable method to be called within 
`startUp` and give it this fallback impl:
   ```
   /** unless overridden to actually scale, "initial" containers may be the 
app's *only* containers! */
   protected void requestInitialContainers() {
     WorkerProfile baselineWorkerProfile = new WorkerProfile(this.config);  // 
NOTE: I suggest adding an overloaded single arg ctor, which internally uses 
`WorkforceProfiles.BASELINE_NAME`
     int numContainers = 
baselineWorkerProfile.getConfig().getInt(GobblinYarnConfigurationKeys.INITIAL_CONTAINERS_KEY);
     LOGGER.info("Requesting {} initial (static) containers with baseline 
(only) profile, never to be re-scaled", numContainers);
     requestContainersForWorkerProfile(baselineWorkerProfile, numContainers);
   }
   ```



##########
gobblin-temporal/src/test/java/org/apache/gobblin/temporal/yarn/YarnServiceTest.java:
##########
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.yarn;
+
+import java.net.URL;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigValueFactory;
+import com.google.common.eventbus.EventBus;
+
+import org.apache.gobblin.yarn.GobblinYarnConfigurationKeys;
+
+/** Tests for {@link YarnService}*/
+public class YarnServiceTest {
+  private Config defaultConfigs;
+  private final YarnConfiguration yarnConfiguration = new YarnConfiguration();
+  private final FileSystem mockFileSystem = Mockito.mock(FileSystem.class);
+  private final EventBus eventBus = new EventBus("TemporalYarnServiceTest");
+
+  @BeforeClass
+  public void setup() {
+    URL url = YarnServiceTest.class.getClassLoader()
+        .getResource(YarnServiceTest.class.getSimpleName() + ".conf");
+    Assert.assertNotNull(url, "Could not find resource " + url);
+    this.defaultConfigs = ConfigFactory.parseURL(url).resolve();
+  }
+
+  @Test
+  public void testBaselineWorkerProfileCreatedWithPassedConfigs() throws 
Exception {
+    final int containerMemoryMbs = 1500;
+    final int containerCores = 5;
+    final int numContainers = 4;
+    Config config = this.defaultConfigs
+        .withValue(GobblinYarnConfigurationKeys.CONTAINER_MEMORY_MBS_KEY, 
ConfigValueFactory.fromAnyRef(containerMemoryMbs))
+        .withValue(GobblinYarnConfigurationKeys.CONTAINER_CORES_KEY, 
ConfigValueFactory.fromAnyRef(containerCores))
+        .withValue(GobblinYarnConfigurationKeys.INITIAL_CONTAINERS_KEY, 
ConfigValueFactory.fromAnyRef(numContainers));
+
+    YarnService yarnService = new YarnService(
+        config,
+        "testApplicationName",
+        "testApplicationId",
+        yarnConfiguration,
+        mockFileSystem,
+        eventBus
+    );
+
+    
Assert.assertEquals(yarnService.baselineWorkerProfile.getConfig().getInt(GobblinYarnConfigurationKeys.CONTAINER_MEMORY_MBS_KEY),
 containerMemoryMbs);
+    
Assert.assertEquals(yarnService.baselineWorkerProfile.getConfig().getInt(GobblinYarnConfigurationKeys.CONTAINER_CORES_KEY),
 containerCores);
+    
Assert.assertEquals(yarnService.baselineWorkerProfile.getConfig().getInt(GobblinYarnConfigurationKeys.INITIAL_CONTAINERS_KEY),
 numContainers);

Review Comment:
   `baselineWorkerProfile` seems like such an internal impl detail that I'm not 
sure it belongs in validation.  how about instead defining the 
`requestInitialContainers` method I proposed above and having this test call 
`YarnService::startUp` before validating the args in a call to a method like:
   ```
   protected void requestContainers(int numContainers, Resource resource, 
Optional<Long> optAllocationRequestId)
   ```
   ?



##########
gobblin-temporal/src/test/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnServiceManagerTest.java:
##########
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.yarn;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigValueFactory;
+
+import org.apache.gobblin.temporal.dynamic.ScalingDirective;
+import org.apache.gobblin.temporal.dynamic.ScalingDirectiveSource;
+import org.apache.gobblin.temporal.loadgen.dynamic.DummyScalingDirectiveSource;
+
+import static 
org.apache.gobblin.temporal.yarn.AbstractDynamicScalingYarnServiceManager.DYNAMIC_SCALING_POLLING_INTERVAL;
+
+/** Tests for {@link 
AbstractDynamicScalingYarnServiceManager.GetScalingDirectivesRunnable}*/
+public class DynamicScalingYarnServiceManagerTest {
+
+  @Mock private DynamicScalingYarnService mockDynamicScalingYarnService;
+  @Mock private ScalingDirectiveSource mockScalingDirectiveSource;
+  @Mock private GobblinTemporalApplicationMaster 
mockGobblinTemporalApplicationMaster;
+
+  @BeforeMethod
+  public void setup() {
+    MockitoAnnotations.openMocks(this);
+    // Using 1 second as polling interval so that the test runs faster and
+    // GetScalingDirectivesRunnable.run() will be called equal to amount of 
sleep introduced between startUp
+    // and shutDown in seconds
+    Config config = 
ConfigFactory.empty().withValue(DYNAMIC_SCALING_POLLING_INTERVAL, 
ConfigValueFactory.fromAnyRef(1));
+    
Mockito.when(mockGobblinTemporalApplicationMaster.getConfig()).thenReturn(config);
+    
Mockito.when(mockGobblinTemporalApplicationMaster.get_yarnService()).thenReturn(mockDynamicScalingYarnService);
+  }
+
+  @Test
+  public void testWhenScalingDirectivesIsNull() throws IOException, 
InterruptedException {
+    
Mockito.when(mockScalingDirectiveSource.getScalingDirectives()).thenReturn(null);
+    TestDynamicScalingYarnServiceManager testDynamicScalingYarnServiceManager 
= new TestDynamicScalingYarnServiceManager(
+        mockGobblinTemporalApplicationMaster, mockScalingDirectiveSource);
+    testDynamicScalingYarnServiceManager.startUp();
+    Thread.sleep(2000);
+    testDynamicScalingYarnServiceManager.shutDown();
+    Mockito.verify(mockDynamicScalingYarnService, 
Mockito.times(0)).reviseWorkforcePlanAndRequestNewContainers(Mockito.anyList());

Review Comment:
   `Mockito.never()`



##########
gobblin-temporal/src/test/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnServiceManagerTest.java:
##########
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.yarn;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigValueFactory;
+
+import org.apache.gobblin.temporal.dynamic.ScalingDirective;
+import org.apache.gobblin.temporal.dynamic.ScalingDirectiveSource;
+import org.apache.gobblin.temporal.loadgen.dynamic.DummyScalingDirectiveSource;
+
+import static 
org.apache.gobblin.temporal.yarn.AbstractDynamicScalingYarnServiceManager.DYNAMIC_SCALING_POLLING_INTERVAL;
+
+/** Tests for {@link 
AbstractDynamicScalingYarnServiceManager.GetScalingDirectivesRunnable}*/
+public class DynamicScalingYarnServiceManagerTest {
+
+  @Mock private DynamicScalingYarnService mockDynamicScalingYarnService;
+  @Mock private ScalingDirectiveSource mockScalingDirectiveSource;
+  @Mock private GobblinTemporalApplicationMaster 
mockGobblinTemporalApplicationMaster;
+
+  @BeforeMethod
+  public void setup() {
+    MockitoAnnotations.openMocks(this);
+    // Using 1 second as polling interval so that the test runs faster and
+    // GetScalingDirectivesRunnable.run() will be called equal to amount of 
sleep introduced between startUp
+    // and shutDown in seconds
+    Config config = 
ConfigFactory.empty().withValue(DYNAMIC_SCALING_POLLING_INTERVAL, 
ConfigValueFactory.fromAnyRef(1));
+    
Mockito.when(mockGobblinTemporalApplicationMaster.getConfig()).thenReturn(config);
+    
Mockito.when(mockGobblinTemporalApplicationMaster.get_yarnService()).thenReturn(mockDynamicScalingYarnService);
+  }
+
+  @Test
+  public void testWhenScalingDirectivesIsNull() throws IOException, 
InterruptedException {
+    
Mockito.when(mockScalingDirectiveSource.getScalingDirectives()).thenReturn(null);
+    TestDynamicScalingYarnServiceManager testDynamicScalingYarnServiceManager 
= new TestDynamicScalingYarnServiceManager(
+        mockGobblinTemporalApplicationMaster, mockScalingDirectiveSource);
+    testDynamicScalingYarnServiceManager.startUp();
+    Thread.sleep(2000);
+    testDynamicScalingYarnServiceManager.shutDown();
+    Mockito.verify(mockDynamicScalingYarnService, 
Mockito.times(0)).reviseWorkforcePlanAndRequestNewContainers(Mockito.anyList());
+  }
+
+  @Test
+  public void testWhenScalingDirectivesIsEmpty() throws IOException, 
InterruptedException {
+    
Mockito.when(mockScalingDirectiveSource.getScalingDirectives()).thenReturn(new 
ArrayList<>());
+    TestDynamicScalingYarnServiceManager testDynamicScalingYarnServiceManager 
= new TestDynamicScalingYarnServiceManager(
+        mockGobblinTemporalApplicationMaster, mockScalingDirectiveSource);
+    testDynamicScalingYarnServiceManager.startUp();
+    Thread.sleep(2000);
+    testDynamicScalingYarnServiceManager.shutDown();
+    Mockito.verify(mockDynamicScalingYarnService, 
Mockito.times(0)).reviseWorkforcePlanAndRequestNewContainers(Mockito.anyList());
+  }

Review Comment:
   suggest to combine w/ test above, by chaining a second `.thenReturn` 
invocation there





Issue Time Tracking
-------------------

    Worklog Id:     (was: 946768)
    Time Spent: 3h  (was: 2h 50m)

> Add GoT YarnService integration with DynamicScaling
> ---------------------------------------------------
>
>                 Key: GOBBLIN-2174
>                 URL: https://issues.apache.org/jira/browse/GOBBLIN-2174
>             Project: Apache Gobblin
>          Issue Type: Bug
>          Components: gobblin-core
>            Reporter: Vivek Rai
>            Assignee: Abhishek Tiwari
>            Priority: Major
>          Time Spent: 3h
>  Remaining Estimate: 0h
>
> After dynamic scaling implemented as part of 
> https://issues.apache.org/jira/browse/GOBBLIN-2170 , the Temporal Yarn 
> Service needs to be integrated with the dynamic scaling to have fully 
> functional dynamic scalable yarn service.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to