khandelwal-prateek commented on code in PR #4106:
URL: https://github.com/apache/gobblin/pull/4106#discussion_r2000306389


##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/RecommendScalingForWorkUnitsLinearHeuristicImpl.java:
##########
@@ -23,65 +23,128 @@
 import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys;
 import org.apache.gobblin.temporal.ddm.work.TimeBudget;
 import org.apache.gobblin.temporal.ddm.work.WorkUnitsSizeSummary;
-import org.apache.gobblin.temporal.ddm.worker.WorkFulfillmentWorker;
+import org.apache.gobblin.yarn.GobblinYarnConfigurationKeys;
 
 
 /**
  * Simple config-driven linear recommendation for how many containers to use to complete the "remaining work" within a given {@link TimeBudget}, per:
  *
- *   a. from {@link WorkUnitsSizeSummary}, find how many (remaining) "top-level" {@link org.apache.gobblin.source.workunit.MultiWorkUnit}s of some mean size
- *   b. from the configured {@link #AMORTIZED_NUM_BYTES_PER_MINUTE}, find the expected "processing rate" in bytes / minute
- * 1. estimate the time required for processing a mean-sized `MultiWorkUnit` (MWU)
- *   c. from {@link JobState}, find per-container `MultiWorkUnit` parallelism capacity (aka. "worker-slots") to base the recommendation upon
- * 2. calculate the per-container throughput of MWUs per minute
- * 3. estimate the total per-container-minutes required to process all MWUs
- *   d. from the {@link TimeBudget}, find the target number of minutes in which to complete processing of all MWUs
- * 4. recommend the number of containers so all MWU processing should finish within the target number of minutes
+ *   a. from {@link WorkUnitsSizeSummary}, find how many (remaining) {@link org.apache.gobblin.source.workunit.WorkUnit}s
+ *   b. from the configured GobblinTemporalConfigurationKeys.TEMPORAL_WORKER_THREAD_AMORTIZED_THROUGHPUT_PER_MINUTE, find the expected "processing rate" in bytes / minute
+ * 1. estimate the total container-minutes required to process all MWUs
+ *   c. from the {@link TimeBudget}, find the target number of minutes in which to complete processing of all MWUs
+ * 2. estimate container count based on target minutes
+ * 2. estimate container count based on the maximum number of work units allowed per container
+ * 4. recommend the number of containers as max of above two container counts
  */
 @Slf4j
 public class RecommendScalingForWorkUnitsLinearHeuristicImpl extends AbstractRecommendScalingForWorkUnitsImpl {
 
-  public static final String AMORTIZED_NUM_BYTES_PER_MINUTE = GobblinTemporalConfigurationKeys.DYNAMIC_SCALING_PREFIX + "heuristic.params.numBytesPerMinute";
-  public static final long DEFAULT_AMORTIZED_NUM_BYTES_PER_MINUTE = 80 * 1000L * 1000L * 60L; // 80MB/sec
-
+  /**
+   * Calculates the recommended number of containers for processing the remaining work units.
+   * <p>
+   * This method first checks whether dynamic scaling is enabled via the job state configuration.
+   * If dynamic scaling is disabled, it returns the initial container count as specified in the job state.
+   * When dynamic scaling is enabled, it computes the throughput based on the count of constituent work units (WUs)
+   * and the processing rate (bytes per minute per thread). The calculation involves:
+   * <ol>
+   *   <li>Computing the total bytes to be processed based on the count and mean size of top-level work units.</li>
+   *   <li>Calculating the processing rate per container using the amortized bytes per minute rate and the container's work unit capacity.</li>
+   *   <li>Estimating the total container-minutes required to process all MWUs and determining the number of containers needed
+   *       to meet the job time budget.</li>
+   *   <li>Computing an alternative container count based on the maximum number of work units allowed per container.</li>
+   *   <li>Returning the maximum of the two computed container counts as the recommended scaling.</li>
+   * </ol>
+   * </p>
+   *
+   * @param remainingWork the summary of work unit sizes and counts remaining for processing
+   * @param sourceClass the name of the class invoking this method
+   * @param jobTimeBudget the time budget allocated for the job execution
+   * @param jobState the current job state that holds configuration properties and runtime parameters
+   * @return the recommended number of containers to allocate for processing the work units
+   */
   @Override
   protected int calcDerivationSetPoint(WorkUnitsSizeSummary remainingWork, String sourceClass, TimeBudget jobTimeBudget, JobState jobState) {
-    // for simplicity, for now, consider only top-level work units (aka. `MultiWorkUnit`s - MWUs)
+    if (!jobState.getPropAsBoolean(GobblinTemporalConfigurationKeys.DYNAMIC_SCALING_ENABLED, false)) {
+      int initialContainerCount = Integer.valueOf(jobState.getProp(GobblinYarnConfigurationKeys.INITIAL_CONTAINERS_KEY, "1"));
+      log.info("Dynamic scaling is disabled, returning initial container count: " + initialContainerCount);
+      return initialContainerCount;
+    }
+
+    long numWUs = remainingWork.getConstituentWorkUnitsCount();
     long numMWUs = remainingWork.getTopLevelWorkUnitsCount();
     double meanBytesPerMWU = remainingWork.getTopLevelWorkUnitsMeanSize();
-    int numSimultaneousMWUsPerContainer = calcPerContainerWUCapacity(jobState); // (a worker-thread is a slot for top-level (MWUs) - not constituent sub-WUs)
-    long bytesPerMinuteProcRate = calcAmortizedBytesPerMinute(jobState);
+    double totalBytes = numMWUs * meanBytesPerMWU;

Review Comment:
   Yes, both are the same. The container count is computed with `Math.ceil`,
   which needs a double value anyway (otherwise the result would be rounded
   down), so I used `totalBytes` directly instead of casting `getTotalSize`
   to double.
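
A minimal sketch of the rounding concern, for illustration only. The class, method names, and numbers below are hypothetical and are not taken from the PR. It assumes the count is derived by dividing a byte total by a per-container capacity and then applying `Math.ceil`.

```java
public class CeilDivisionSketch {

    // Hypothetical helper: both operands are long, so the division truncates
    // before Math.ceil ever sees a fractional value.
    static int containersWithLongDivision(long totalBytes, long bytesPerContainer) {
        return (int) Math.ceil(totalBytes / bytesPerContainer);
    }

    // Hypothetical helper: a double numerator keeps the fraction,
    // so Math.ceil can round up as intended.
    static int containersWithDoubleDivision(double totalBytes, long bytesPerContainer) {
        return (int) Math.ceil(totalBytes / bytesPerContainer);
    }

    public static void main(String[] args) {
        long numMWUs = 10;                 // illustrative value
        double meanBytesPerMWU = 250.0;    // illustrative value
        double totalBytes = numMWUs * meanBytesPerMWU;
        long bytesPerContainer = 1000;     // illustrative value

        System.out.println(containersWithLongDivision((long) totalBytes, bytesPerContainer));
        System.out.println(containersWithDoubleDivision(totalBytes, bytesPerContainer));
    }
}
```

With these illustrative inputs the long-based division yields 2 containers while the double-based division yields 3, which is why keeping the total as a double avoids undercounting.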



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

