[ 
https://issues.apache.org/jira/browse/GOBBLIN-2174?focusedWorklogId=946925&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-946925
 ]

ASF GitHub Bot logged work on GOBBLIN-2174:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 05/Dec/24 19:44
            Start Date: 05/Dec/24 19:44
    Worklog Time Spent: 10m 
      Work Description: phet commented on code in PR #4077:
URL: https://github.com/apache/gobblin/pull/4077#discussion_r1871870894


##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnService.java:
##########
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.yarn;
+
+import java.util.List;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+
+import com.google.common.base.Optional;
+import com.google.common.eventbus.EventBus;
+import com.typesafe.config.Config;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.temporal.dynamic.ScalingDirective;
+import org.apache.gobblin.temporal.dynamic.StaffingDeltas;
+import org.apache.gobblin.temporal.dynamic.WorkerProfile;
+import org.apache.gobblin.temporal.dynamic.WorkforcePlan;
+import org.apache.gobblin.temporal.dynamic.WorkforceStaffing;
+import org.apache.gobblin.yarn.GobblinYarnConfigurationKeys;
+
+/**
+ * Service for dynamically scaling Gobblin containers running on YARN.
+ * This service manages workforce staffing and plans, and requests new 
containers as needed.
+ */
+@Slf4j
+public class DynamicScalingYarnService extends YarnService {
+
+  /** this holds the current count of containers requested for each worker 
profile */
+  private final WorkforceStaffing actualWorkforceStaffing;
+  /** this holds the current total workforce plan as per latest received 
scaling directives */
+  private final WorkforcePlan workforcePlan;
+
+  public DynamicScalingYarnService(Config config, String applicationName, 
String applicationId,
+      YarnConfiguration yarnConfiguration, FileSystem fs, EventBus eventBus) 
throws Exception {
+    super(config, applicationName, applicationId, yarnConfiguration, fs, 
eventBus);
+
+    this.actualWorkforceStaffing = WorkforceStaffing.initialize(0);
+    this.workforcePlan = new WorkforcePlan(this.config, 
this.config.getInt(GobblinYarnConfigurationKeys.INITIAL_CONTAINERS_KEY));
+  }
+
+  @Override
+  protected void requestInitialContainers() {
+    StaffingDeltas deltas = 
this.workforcePlan.calcStaffingDeltas(this.actualWorkforceStaffing);
+    requestNewContainersForStaffingDeltas(deltas);

Review Comment:
   I believe we're safe, given that `WorkforcePlan::calcStaffingDeltas` is
   thread-safe and `reqNewCon...` is `synchronized`, but that's subtle. For the
   sake of maintainers' sanity, please also make this method `synchronized`.
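
A minimal sketch of why adding `synchronized` here is safe: Java's intrinsic locks are reentrant, so a `synchronized` method can call another `synchronized` method on the same object without deadlocking. The class `StaffingSketch` and its `planned` and `requested` fields are hypothetical stand-ins, not the PR's actual types.

```java
// Hypothetical stand-in for a service that tracks planned vs. already-requested containers.
class StaffingSketch {
  private int planned;
  private int requested;

  StaffingSketch(int initialPlanned) {
    this.planned = initialPlanned;
  }

  // Explicitly synchronized: computing the delta and requesting happen under one lock.
  synchronized void requestInitial() {
    requestDelta(planned - requested);
  }

  synchronized void reviseAndRequest(int newPlanned) {
    planned = newPlanned;
    requestDelta(planned - requested);
  }

  // Re-entering the monitor already held by the calling method is allowed.
  private synchronized void requestDelta(int delta) {
    if (delta > 0) {
      requested += delta;
    }
  }

  synchronized int getRequested() {
    return requested;
  }
}
```

With every entry point holding the same lock, a reader does not need to reason about which callee happens to be thread-safe.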



##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/AbstractDynamicScalingYarnServiceManager.java:
##########
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.yarn;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+
+import org.apache.commons.collections.CollectionUtils;
+
+import com.typesafe.config.Config;
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.AbstractIdleService;
+
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.ExecutorsUtils;
+import org.apache.gobblin.temporal.dynamic.ScalingDirective;
+import org.apache.gobblin.temporal.dynamic.ScalingDirectiveSource;
+import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys;
+
+/**
+ * This class manages the dynamic scaling of the {@link YarnService} by 
periodically polling for scaling directives and passing
+ * the latest scaling directives to the {@link DynamicScalingYarnService} for 
processing.
+ *
+ * This is an abstract class that provides the basic functionality for 
managing dynamic scaling. Subclasses should implement
+ * {@link #createScalingDirectiveSource()} to provide a {@link 
ScalingDirectiveSource} that will be used to get scaling directives.
+ *
+ * The actual implemented class needs to be passed as value of config {@link 
org.apache.gobblin.yarn.GobblinYarnConfigurationKeys#APP_MASTER_SERVICE_CLASSES}
+ */
+@Slf4j
+public abstract class AbstractDynamicScalingYarnServiceManager extends 
AbstractIdleService {
+
+  protected final static String DYNAMIC_SCALING_POLLING_INTERVAL = 
GobblinTemporalConfigurationKeys.DYNAMIC_SCALING_PREFIX + "polling.interval";
+  private final int DEFAULT_DYNAMIC_SCALING_POLLING_INTERVAL_SECS = 60;
+  protected final Config config;
+  private final DynamicScalingYarnService dynamicScalingYarnService;
+  private final ScheduledExecutorService dynamicScalingExecutor;
+
+  public 
AbstractDynamicScalingYarnServiceManager(GobblinTemporalApplicationMaster 
appMaster) {
+    this.config = appMaster.getConfig();
+    if (appMaster.get_yarnService() instanceof DynamicScalingYarnService) {
+      this.dynamicScalingYarnService = (DynamicScalingYarnService) 
appMaster.get_yarnService();
+    } else {
+      String errorMsg = "Failure while getting YarnService Instance from 
GobblinTemporalApplicationMaster::get_yarnService()"
+          + " YarnService {" + 
appMaster.get_yarnService().getClass().getSimpleName() + "} is not an instance 
of DynamicScalingYarnService";
+      log.error(errorMsg);
+      throw new RuntimeException(errorMsg);
+    }
+    this.dynamicScalingExecutor = Executors.newSingleThreadScheduledExecutor(
+        ExecutorsUtils.newThreadFactory(Optional.of(log),
+            Optional.of("DynamicScalingExecutor")));
+  }
+
+  @Override
+  protected void startUp() {
+    int scheduleInterval = ConfigUtils.getInt(this.config, 
DYNAMIC_SCALING_POLLING_INTERVAL,
+        DEFAULT_DYNAMIC_SCALING_POLLING_INTERVAL_SECS);
+    log.info("Starting the {} with re-scaling interval of {} seconds", 
this.getClass().getSimpleName(), scheduleInterval);

Review Comment:
   simple name seems fine here, as this is informational, not for debugging



##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnService.java:
##########
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.yarn;
+
+import java.util.List;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+
+import com.google.common.base.Optional;
+import com.google.common.eventbus.EventBus;
+import com.typesafe.config.Config;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.temporal.dynamic.ScalingDirective;
+import org.apache.gobblin.temporal.dynamic.StaffingDeltas;
+import org.apache.gobblin.temporal.dynamic.WorkerProfile;
+import org.apache.gobblin.temporal.dynamic.WorkforcePlan;
+import org.apache.gobblin.temporal.dynamic.WorkforceStaffing;
+import org.apache.gobblin.yarn.GobblinYarnConfigurationKeys;
+
+/**
+ * Service for dynamically scaling Gobblin containers running on YARN.
+ * This service manages workforce staffing and plans, and requests new 
containers as needed.
+ */
+@Slf4j
+public class DynamicScalingYarnService extends YarnService {
+
+  /** this holds the current count of containers requested for each worker 
profile */
+  private final WorkforceStaffing actualWorkforceStaffing;
+  /** this holds the current total workforce plan as per latest received 
scaling directives */
+  private final WorkforcePlan workforcePlan;
+
+  public DynamicScalingYarnService(Config config, String applicationName, 
String applicationId,
+      YarnConfiguration yarnConfiguration, FileSystem fs, EventBus eventBus) 
throws Exception {
+    super(config, applicationName, applicationId, yarnConfiguration, fs, 
eventBus);
+
+    this.actualWorkforceStaffing = WorkforceStaffing.initialize(0);
+    this.workforcePlan = new WorkforcePlan(this.config, 
this.config.getInt(GobblinYarnConfigurationKeys.INITIAL_CONTAINERS_KEY));
+  }
+
+  @Override
+  protected void requestInitialContainers() {
+    StaffingDeltas deltas = 
this.workforcePlan.calcStaffingDeltas(this.actualWorkforceStaffing);
+    requestNewContainersForStaffingDeltas(deltas);
+  }
+
+  /**
+   * Revises the workforce plan and requests new containers based on the given 
scaling directives.
+   *
+   * @param scalingDirectives the list of scaling directives
+   */
+  public synchronized void 
reviseWorkforcePlanAndRequestNewContainers(List<ScalingDirective> 
scalingDirectives) {
+    this.workforcePlan.reviseWhenNewer(scalingDirectives);
+    StaffingDeltas deltas = 
this.workforcePlan.calcStaffingDeltas(this.actualWorkforceStaffing);
+    requestNewContainersForStaffingDeltas(deltas);
+  }
+
+  private synchronized void 
requestNewContainersForStaffingDeltas(StaffingDeltas deltas) {
+    deltas.getPerProfileDeltas().forEach(profileDelta -> {
+      if (profileDelta.getDelta() > 0) {

Review Comment:
   nit: would comment:
   ```
   if (...) { // scale up!
     ...
   } else if (profileDelta.getDelta() < 0) { // scale down!
     ...
   } // else, already at staffing plan (or at least have requested, so 
in-progress)
   ```
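
The commented three-way branch can be sketched in a self-contained form. The helper `DeltaBranchSketch.classify` and its return strings are hypothetical, chosen only to make each branch observable; the real code would request or release containers instead.

```java
// Hypothetical helper that classifies a staffing delta; names are illustrative only.
class DeltaBranchSketch {
  static String classify(int delta) {
    if (delta > 0) { // scale up!
      return "scale-up by " + delta;
    } else if (delta < 0) { // scale down!
      return "scale-down by " + (-delta);
    } // else, already at staffing plan (or at least have requested, so in-progress)
    return "no-op";
  }
}
```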



##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/YarnService.java:
##########
@@ -419,41 +398,25 @@ private EventSubmitter buildEventSubmitter() {
         .build();
   }
 
-  /**
-   * Request an allocation of containers. If numTargetContainers is larger 
than the max of current and expected number
-   * of containers then additional containers are requested.
-   * <p>
-   * If numTargetContainers is less than the current number of allocated 
containers then release free containers.
-   * Shrinking is relative to the number of currently allocated containers 
since it takes time for containers
-   * to be allocated and assigned work and we want to avoid releasing a 
container prematurely before it is assigned
-   * work. This means that a container may not be released even though 
numTargetContainers is less than the requested
-   * number of containers. The intended usage is for the caller of this method 
to make periodic calls to attempt to
-   * adjust the cluster towards the desired number of containers.
-   *
-   * @param inUseInstances  a set of in use instances
-   * @return whether successfully requested the target number of containers
-   */
-  public synchronized boolean requestTargetNumberOfContainers(int 
numContainers, Set<String> inUseInstances) {
-    int defaultContainerMemoryMbs = 
config.getInt(GobblinYarnConfigurationKeys.CONTAINER_MEMORY_MBS_KEY);
-    int defaultContainerCores = config.getInt(GobblinYarnConfigurationKeys. 
CONTAINER_CORES_KEY);
-
-    LOGGER.info("Trying to set numTargetContainers={}, in-use helix instances 
count is {}, container map size is {}",
-        numContainers, inUseInstances.size(), this.containerMap.size());
-
-    requestContainers(numContainers, 
Resource.newInstance(defaultContainerMemoryMbs, defaultContainerCores));
-    LOGGER.info("Current tag-container desired count:{}, tag-container 
allocated: {}", numContainers, this.allocatedContainerCountMap);
-    return true;
+  /** unless overridden to actually scale, "initial" containers may be the 
app's *only* containers! */
+  protected void requestInitialContainers() {
+    WorkerProfile baselineWorkerProfile = new WorkerProfile(this.config);
+    int numContainers = 
baselineWorkerProfile.getConfig().getInt(GobblinYarnConfigurationKeys.INITIAL_CONTAINERS_KEY);

Review Comment:
   use it directly:
   ```
   this.config.getInt(...)
   ```



##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnService.java:
##########
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.yarn;
+
+import java.util.List;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+
+import com.google.common.base.Optional;
+import com.google.common.eventbus.EventBus;
+import com.typesafe.config.Config;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.temporal.dynamic.ScalingDirective;
+import org.apache.gobblin.temporal.dynamic.StaffingDeltas;
+import org.apache.gobblin.temporal.dynamic.WorkerProfile;
+import org.apache.gobblin.temporal.dynamic.WorkforcePlan;
+import org.apache.gobblin.temporal.dynamic.WorkforceStaffing;
+import org.apache.gobblin.yarn.GobblinYarnConfigurationKeys;
+
+/**
+ * Service for dynamically scaling Gobblin containers running on YARN.
+ * This service manages workforce staffing and plans, and requests new 
containers as needed.
+ */
+@Slf4j
+public class DynamicScalingYarnService extends YarnService {
+
+  /** this holds the current count of containers requested for each worker 
profile */

Review Comment:
   *already* requested



##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/WorkerProfile.java:
##########
@@ -18,12 +18,19 @@
 package org.apache.gobblin.temporal.dynamic;
 
 import com.typesafe.config.Config;
+import lombok.AllArgsConstructor;
 import lombok.Data;
 
 
 /** A named worker {@link Config} */
 @Data
+@AllArgsConstructor
 public class WorkerProfile {
   private final String name;
   private final Config config;
+
+  public WorkerProfile(Config config) {
+    this.name = WorkforceProfiles.BASELINE_NAME;
+    this.config = config;

Review Comment:
   nit:
   ```
   this(WorkforceProfiles.BASELINE_NAME, config)
   ```
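
A plain-Java sketch of the constructor delegation suggested here. `ProfileSketch` is a hypothetical stand-in: the real `WorkerProfile` uses Lombok and a typesafe `Config`, and the value of `BASELINE_NAME` below is assumed purely for illustration.

```java
// Hypothetical stand-in for WorkerProfile, written without Lombok.
final class ProfileSketch {
  static final String BASELINE_NAME = "baseline"; // assumed value, for illustration only

  private final String name;
  private final String config;

  // Canonical constructor: the only place where fields are assigned.
  ProfileSketch(String name, String config) {
    this.name = name;
    this.config = config;
  }

  // Delegates to the canonical constructor instead of repeating the assignments.
  ProfileSketch(String config) {
    this(BASELINE_NAME, config);
  }

  String getName() {
    return name;
  }

  String getConfig() {
    return config;
  }
}
```

Delegating keeps the field assignments in one place, so a later change to the canonical constructor cannot be missed in the convenience one.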



##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/YarnService.java:
##########
@@ -485,8 +448,11 @@ private void requestContainer(Optional<String> 
preferredNode, Resource resource)
     priority.setPriority(priorityNum);
 
     String[] preferredNodes = preferredNode.isPresent() ? new String[] 
{preferredNode.get()} : null;
+
+    long allocationRequestId = optAllocationRequestId.or(0L);

Review Comment:
   rather than this magic number, name the constant 
`DEFAULT_ALLOCATION_REQUEST_ID` and then use that to also initialize the 
`allocationRequestIdGenerator`
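
One way this could look, as a sketch under assumptions: the generator is taken to be an `AtomicLong` (its real type is not visible in this hunk), and `java.util.Optional` is used in place of the Guava `Optional` in the PR so the example has no dependencies. `AllocationIdSketch` is a hypothetical class name.

```java
import java.util.Optional;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical holder showing one named constant shared by the fallback and the generator.
class AllocationIdSketch {
  static final long DEFAULT_ALLOCATION_REQUEST_ID = 0L;

  // Assumed to be an AtomicLong; seeded from the same constant rather than a second literal.
  private final AtomicLong allocationRequestIdGenerator =
      new AtomicLong(DEFAULT_ALLOCATION_REQUEST_ID);

  // Falls back to the named default when the caller supplies no id.
  long resolve(Optional<Long> optAllocationRequestId) {
    return optAllocationRequestId.orElse(DEFAULT_ALLOCATION_REQUEST_ID);
  }

  // Increments before returning, so generated ids never equal the default.
  long nextId() {
    return allocationRequestIdGenerator.incrementAndGet();
  }
}
```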



##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/FsSourceDynamicScalingYarnServiceManager.java:
##########
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.yarn;
+
+import java.util.Optional;
+
+import org.apache.hadoop.fs.FileSystem;
+
+import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys;
+import org.apache.gobblin.temporal.dynamic.FsScalingDirectiveSource;
+import org.apache.gobblin.temporal.dynamic.ScalingDirectiveSource;
+
+/**
+ * {@link FsScalingDirectiveSource} based implementation of {@link 
AbstractDynamicScalingYarnServiceManager}.
+ */
+public class FsSourceDynamicScalingYarnServiceManager extends 
AbstractDynamicScalingYarnServiceManager {
+  public final static String DYNAMIC_SCALING_DIRECTIVES_DIR = 
GobblinTemporalConfigurationKeys.DYNAMIC_SCALING_PREFIX + "directives.dir";
+  public final static String DYNAMIC_SCALING_ERRORS_DIR = 
GobblinTemporalConfigurationKeys.DYNAMIC_SCALING_PREFIX + "errors.dir";

Review Comment:
   these are fine for now, but I suspect we'll move to basing these directories
   on `JobStateUtils.getWorkDirRoot`



##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/YarnService.java:
##########
@@ -419,41 +398,25 @@ private EventSubmitter buildEventSubmitter() {
         .build();
   }
 
-  /**
-   * Request an allocation of containers. If numTargetContainers is larger 
than the max of current and expected number
-   * of containers then additional containers are requested.
-   * <p>
-   * If numTargetContainers is less than the current number of allocated 
containers then release free containers.
-   * Shrinking is relative to the number of currently allocated containers 
since it takes time for containers
-   * to be allocated and assigned work and we want to avoid releasing a 
container prematurely before it is assigned
-   * work. This means that a container may not be released even though 
numTargetContainers is less than the requested
-   * number of containers. The intended usage is for the caller of this method 
to make periodic calls to attempt to
-   * adjust the cluster towards the desired number of containers.
-   *
-   * @param inUseInstances  a set of in use instances
-   * @return whether successfully requested the target number of containers
-   */
-  public synchronized boolean requestTargetNumberOfContainers(int 
numContainers, Set<String> inUseInstances) {
-    int defaultContainerMemoryMbs = 
config.getInt(GobblinYarnConfigurationKeys.CONTAINER_MEMORY_MBS_KEY);
-    int defaultContainerCores = config.getInt(GobblinYarnConfigurationKeys. 
CONTAINER_CORES_KEY);
-
-    LOGGER.info("Trying to set numTargetContainers={}, in-use helix instances 
count is {}, container map size is {}",
-        numContainers, inUseInstances.size(), this.containerMap.size());
-
-    requestContainers(numContainers, 
Resource.newInstance(defaultContainerMemoryMbs, defaultContainerCores));
-    LOGGER.info("Current tag-container desired count:{}, tag-container 
allocated: {}", numContainers, this.allocatedContainerCountMap);
-    return true;
+  /** unless overridden to actually scale, "initial" containers may be the 
app's *only* containers! */

Review Comment:
   great comment!  consider: "may be" => "will be"



##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/YarnService.java:
##########
@@ -590,15 +556,44 @@ protected ByteBuffer getSecurityTokens() throws 
IOException {
 
   @VisibleForTesting
   protected String buildContainerCommand(Container container, String 
helixParticipantId, String helixInstanceTag) {
+    long allocationRequestId = container.getAllocationRequestId();
+    // Using getOrDefault for backward-compatibility with containers that 
don't have allocationRequestId set
+    WorkerProfile workerProfile = 
this.workerProfileByAllocationRequestId.getOrDefault(allocationRequestId,
+        this.defaultWorkerProfile);

Review Comment:
   I see value in recovering, so the app continues onward.  but not having an 
associated worker profile really is a bug!  let's not silently resolve, but 
instead:
   ```
   WorkerProfile workerProfile = Optional.ofNullable(this.workerProfileByAllocationRequestId.get(allocationRequestId))
       .orElseGet(() -> {
     log.warn("no WP found for {} ... falling back... ", allocationRequestId);
     return this.workerProfileByAllocationRequestId.get(DEFAULT_ALLOCATION_REQUEST_ID); // NOTE: instance member unnecessary!
   });
   ```
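
A dependency-free sketch of the warn-then-fall-back lookup. Assumptions: worker profiles are represented as plain `String`s, `java.util.logging` stands in for the Lombok/SLF4J `log`, and `ProfileLookupSketch` with its `register` method is hypothetical.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.logging.Logger;

// Hypothetical lookup that logs a warning instead of silently resolving a missing profile.
class ProfileLookupSketch {
  static final long DEFAULT_ALLOCATION_REQUEST_ID = 0L;
  private static final Logger LOG = Logger.getLogger(ProfileLookupSketch.class.getName());

  private final Map<Long, String> workerProfileByAllocationRequestId = new HashMap<>();

  // The baseline profile is stored under the default id, so no separate field is needed.
  ProfileLookupSketch(String baselineProfile) {
    workerProfileByAllocationRequestId.put(DEFAULT_ALLOCATION_REQUEST_ID, baselineProfile);
  }

  void register(long allocationRequestId, String profile) {
    workerProfileByAllocationRequestId.put(allocationRequestId, profile);
  }

  String lookup(long allocationRequestId) {
    return Optional.ofNullable(workerProfileByAllocationRequestId.get(allocationRequestId))
        .orElseGet(() -> {
          LOG.warning("no worker profile found for allocation request id "
              + allocationRequestId + "; falling back to the default profile");
          return workerProfileByAllocationRequestId.get(DEFAULT_ALLOCATION_REQUEST_ID);
        });
  }
}
```

The app still recovers, but the missing mapping now leaves a trace in the logs.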



##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/AbstractDynamicScalingYarnServiceManager.java:
##########
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.yarn;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+
+import org.apache.commons.collections.CollectionUtils;
+
+import com.typesafe.config.Config;
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.AbstractIdleService;
+
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.ExecutorsUtils;
+import org.apache.gobblin.temporal.dynamic.ScalingDirective;
+import org.apache.gobblin.temporal.dynamic.ScalingDirectiveSource;
+import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys;
+
+/**
+ * This class manages the dynamic scaling of the {@link YarnService} by 
periodically polling for scaling directives and passing
+ * the latest scaling directives to the {@link DynamicScalingYarnService} for 
processing.
+ *
+ * This is an abstract class that provides the basic functionality for 
managing dynamic scaling. Subclasses should implement
+ * {@link #createScalingDirectiveSource()} to provide a {@link 
ScalingDirectiveSource} that will be used to get scaling directives.
+ *
+ * The actual implemented class needs to be passed as value of config {@link 
org.apache.gobblin.yarn.GobblinYarnConfigurationKeys#APP_MASTER_SERVICE_CLASSES}
+ */
+@Slf4j
+public abstract class AbstractDynamicScalingYarnServiceManager extends 
AbstractIdleService {
+
+  protected final static String DYNAMIC_SCALING_POLLING_INTERVAL = 
GobblinTemporalConfigurationKeys.DYNAMIC_SCALING_PREFIX + "polling.interval";
+  private final int DEFAULT_DYNAMIC_SCALING_POLLING_INTERVAL_SECS = 60;
+  protected final Config config;
+  private final DynamicScalingYarnService dynamicScalingYarnService;
+  private final ScheduledExecutorService dynamicScalingExecutor;
+
+  public 
AbstractDynamicScalingYarnServiceManager(GobblinTemporalApplicationMaster 
appMaster) {
+    this.config = appMaster.getConfig();
+    if (appMaster.get_yarnService() instanceof DynamicScalingYarnService) {
+      this.dynamicScalingYarnService = (DynamicScalingYarnService) 
appMaster.get_yarnService();
+    } else {
+      String errorMsg = "Failure while getting YarnService Instance from 
GobblinTemporalApplicationMaster::get_yarnService()"
+          + " YarnService {" + 
appMaster.get_yarnService().getClass().getSimpleName() + "} is not an instance 
of DynamicScalingYarnService";

Review Comment:
   always use the fully qualified (FQ) name, since it's maddeningly frustrating
   in the 1-in-100 case of finding out a name was overloaded in a package we
   might not even know exists!
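
A small illustration of the ambiguity, using two JDK classes that share a simple name. `ClassNameSketch` is a hypothetical helper; `Class.getSimpleName()` and `Class.getName()` are the standard JDK methods.

```java
// Hypothetical helper contrasting simple and fully qualified class names.
class ClassNameSketch {
  static String simple(Class<?> c) {
    return c.getSimpleName();
  }

  static String fullyQualified(Class<?> c) {
    return c.getName();
  }

  public static void main(String[] args) {
    // Both simple names are identical, so an error message built from them is ambiguous.
    System.out.println(simple(java.net.Proxy.class));                  // Proxy
    System.out.println(simple(java.lang.reflect.Proxy.class));         // Proxy
    // The fully qualified names identify the class unambiguously.
    System.out.println(fullyQualified(java.net.Proxy.class));          // java.net.Proxy
    System.out.println(fullyQualified(java.lang.reflect.Proxy.class)); // java.lang.reflect.Proxy
  }
}
```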





Issue Time Tracking
-------------------

    Worklog Id:     (was: 946925)
    Time Spent: 4h 10m  (was: 4h)

> Add GoT YarnService integration with DynamicScaling
> ---------------------------------------------------
>
>                 Key: GOBBLIN-2174
>                 URL: https://issues.apache.org/jira/browse/GOBBLIN-2174
>             Project: Apache Gobblin
>          Issue Type: Bug
>          Components: gobblin-core
>            Reporter: Vivek Rai
>            Assignee: Abhishek Tiwari
>            Priority: Major
>          Time Spent: 4h 10m
>  Remaining Estimate: 0h
>
> After dynamic scaling was implemented as part of
> https://issues.apache.org/jira/browse/GOBBLIN-2170 , the Temporal Yarn
> Service needs to be integrated with dynamic scaling to provide a fully
> functional, dynamically scalable YARN service.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
