[ 
https://issues.apache.org/jira/browse/GOBBLIN-2199?focusedWorklogId=962221&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-962221
 ]

ASF GitHub Bot logged work on GOBBLIN-2199:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 18/Mar/25 06:34
            Start Date: 18/Mar/25 06:34
    Worklog Time Spent: 10m 
      Work Description: khandelwal-prateek commented on code in PR #4106:
URL: https://github.com/apache/gobblin/pull/4106#discussion_r2000306389


##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/RecommendScalingForWorkUnitsLinearHeuristicImpl.java:
##########
@@ -23,65 +23,128 @@
 import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys;
 import org.apache.gobblin.temporal.ddm.work.TimeBudget;
 import org.apache.gobblin.temporal.ddm.work.WorkUnitsSizeSummary;
-import org.apache.gobblin.temporal.ddm.worker.WorkFulfillmentWorker;
+import org.apache.gobblin.yarn.GobblinYarnConfigurationKeys;
 
 
 /**
  * Simple config-driven linear recommendation for how many containers to use 
to complete the "remaining work" within a given {@link TimeBudget}, per:
  *
- *   a. from {@link WorkUnitsSizeSummary}, find how many (remaining) 
"top-level" {@link org.apache.gobblin.source.workunit.MultiWorkUnit}s of some 
mean size
- *   b. from the configured {@link #AMORTIZED_NUM_BYTES_PER_MINUTE}, find the 
expected "processing rate" in bytes / minute
- * 1. estimate the time required for processing a mean-sized `MultiWorkUnit` 
(MWU)
- *   c. from {@link JobState}, find per-container `MultiWorkUnit` parallelism 
capacity (aka. "worker-slots") to base the recommendation upon
- * 2. calculate the per-container throughput of MWUs per minute
- * 3. estimate the total per-container-minutes required to process all MWUs
- *   d. from the {@link TimeBudget}, find the target number of minutes in 
which to complete processing of all MWUs
- * 4. recommend the number of containers so all MWU processing should finish 
within the target number of minutes
+ *   a. from {@link WorkUnitsSizeSummary}, find how many (remaining) {@link 
org.apache.gobblin.source.workunit.WorkUnit}s
+ *   b. from the configured 
GobblinTemporalConfigurationKeys.TEMPORAL_WORKER_THREAD_AMORTIZED_THROUGHPUT_PER_MINUTE,
 find the expected "processing rate" in bytes / minute
+ * 1. estimate the total container-minutes required to process all MWUs
+ *   c. from the {@link TimeBudget}, find the target number of minutes in 
which to complete processing of all MWUs
+ * 2. estimate container count based on target minutes
+ * 2. estimate container count based on the maximum number of work units 
allowed per container
+ * 4. recommend the number of containers as max of above two container counts
  */
 @Slf4j
 public class RecommendScalingForWorkUnitsLinearHeuristicImpl extends 
AbstractRecommendScalingForWorkUnitsImpl {
 
-  public static final String AMORTIZED_NUM_BYTES_PER_MINUTE = 
GobblinTemporalConfigurationKeys.DYNAMIC_SCALING_PREFIX + 
"heuristic.params.numBytesPerMinute";
-  public static final long DEFAULT_AMORTIZED_NUM_BYTES_PER_MINUTE = 80 * 1000L 
* 1000L * 60L; // 80MB/sec
-
+  /**
+   * Calculates the recommended number of containers for processing the 
remaining work units.
+   * <p>
+   * This method first checks whether dynamic scaling is enabled via the job 
state configuration.
+   * If dynamic scaling is disabled, it returns the initial container count as 
specified in the job state.
+   * When dynamic scaling is enabled, it computes the throughput based on the 
count of constituent work units (WUs)
+   * and the processing rate (bytes per minute per thread). The calculation 
involves:
+   * <ol>
+   *   <li>Computing the total bytes to be processed based on the count and 
mean size of top-level work units.</li>
+   *   <li>Calculating the processing rate per container using the amortized 
bytes per minute rate and the container's work unit capacity.</li>
+   *   <li>Estimating the total container-minutes required to process all MWUs 
and determining the number of containers needed
+   *       to meet the job time budget.</li>
+   *   <li>Computing an alternative container count based on the maximum 
number of work units allowed per container.</li>
+   *   <li>Returning the maximum of the two computed container counts as the 
recommended scaling.</li>
+   * </ol>
+   * </p>
+   *
+   * @param remainingWork the summary of work unit sizes and counts remaining 
for processing
+   * @param sourceClass the name of the class invoking this method
+   * @param jobTimeBudget the time budget allocated for the job execution
+   * @param jobState the current job state that holds configuration properties 
and runtime parameters
+   * @return the recommended number of containers to allocate for processing 
the work units
+   */
   @Override
   protected int calcDerivationSetPoint(WorkUnitsSizeSummary remainingWork, 
String sourceClass, TimeBudget jobTimeBudget, JobState jobState) {
-    // for simplicity, for now, consider only top-level work units (aka. 
`MultiWorkUnit`s - MWUs)
+    if 
(!jobState.getPropAsBoolean(GobblinTemporalConfigurationKeys.DYNAMIC_SCALING_ENABLED,
 false)) {
+      int initialContainerCount = 
Integer.valueOf(jobState.getProp(GobblinYarnConfigurationKeys.INITIAL_CONTAINERS_KEY,
 "1"));
+      log.info("Dynamic scaling is disabled, returning initial container 
count: " + initialContainerCount);
+      return initialContainerCount;
+    }
+
+    long numWUs = remainingWork.getConstituentWorkUnitsCount();
     long numMWUs = remainingWork.getTopLevelWorkUnitsCount();
     double meanBytesPerMWU = remainingWork.getTopLevelWorkUnitsMeanSize();
-    int numSimultaneousMWUsPerContainer = 
calcPerContainerWUCapacity(jobState); // (a worker-thread is a slot for 
top-level (MWUs) - not constituent sub-WUs)
-    long bytesPerMinuteProcRate = calcAmortizedBytesPerMinute(jobState);
+    double totalBytes = numMWUs * meanBytesPerMWU;

Review Comment:
   yes, both are the same.. Since we need to determine the count using 
`Math.ceil` method which anyway requires a double value(otherwise it would be 
counted down), just used `totalBytes` directly instead of casting 
`getTotalSize` as double





Issue Time Tracking
-------------------

    Worklog Id:     (was: 962221)
    Time Spent: 0.5h  (was: 20m)

> Support dynamic container scaling on Temporal workload
> ------------------------------------------------------
>
>                 Key: GOBBLIN-2199
>                 URL: https://issues.apache.org/jira/browse/GOBBLIN-2199
>             Project: Apache Gobblin
>          Issue Type: Task
>            Reporter: Prateek Khandelwal
>            Priority: Major
>          Time Spent: 0.5h
>  Remaining Estimate: 0h
>
> Currently, Gobblin runs static count of container(initial containers at the 
> start of job). We need to support dynamic scaling by computing the 
> recommended number of containers such that large data copy workloads can be 
> processed within some completion time and without running into OOM errors on 
> containers.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to