[ 
https://issues.apache.org/jira/browse/GOBBLIN-2199?focusedWorklogId=962216&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-962216
 ]

ASF GitHub Bot logged work on GOBBLIN-2199:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 18/Mar/25 06:12
            Start Date: 18/Mar/25 06:12
    Worklog Time Spent: 10m 
      Work Description: Blazer-007 commented on code in PR #4106:
URL: https://github.com/apache/gobblin/pull/4106#discussion_r2000257208


##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/RecommendScalingForWorkUnitsLinearHeuristicImpl.java:
##########
@@ -23,65 +23,128 @@
 import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys;
 import org.apache.gobblin.temporal.ddm.work.TimeBudget;
 import org.apache.gobblin.temporal.ddm.work.WorkUnitsSizeSummary;
-import org.apache.gobblin.temporal.ddm.worker.WorkFulfillmentWorker;
+import org.apache.gobblin.yarn.GobblinYarnConfigurationKeys;
 
 
 /**
  * Simple config-driven linear recommendation for how many containers to use 
to complete the "remaining work" within a given {@link TimeBudget}, per:
  *
- *   a. from {@link WorkUnitsSizeSummary}, find how many (remaining) 
"top-level" {@link org.apache.gobblin.source.workunit.MultiWorkUnit}s of some 
mean size
- *   b. from the configured {@link #AMORTIZED_NUM_BYTES_PER_MINUTE}, find the 
expected "processing rate" in bytes / minute
- * 1. estimate the time required for processing a mean-sized `MultiWorkUnit` 
(MWU)
- *   c. from {@link JobState}, find per-container `MultiWorkUnit` parallelism 
capacity (aka. "worker-slots") to base the recommendation upon
- * 2. calculate the per-container throughput of MWUs per minute
- * 3. estimate the total per-container-minutes required to process all MWUs
- *   d. from the {@link TimeBudget}, find the target number of minutes in 
which to complete processing of all MWUs
- * 4. recommend the number of containers so all MWU processing should finish 
within the target number of minutes
+ *   a. from {@link WorkUnitsSizeSummary}, find how many (remaining) {@link 
org.apache.gobblin.source.workunit.WorkUnit}s
+ *   b. from the configured 
GobblinTemporalConfigurationKeys.TEMPORAL_WORKER_THREAD_AMORTIZED_THROUGHPUT_PER_MINUTE,
 find the expected "processing rate" in bytes / minute
+ * 1. estimate the total container-minutes required to process all MWUs
+ *   c. from the {@link TimeBudget}, find the target number of minutes in 
which to complete processing of all MWUs
+ * 2. estimate container count based on target minutes
+ * 2. estimate container count based on the maximum number of work units 
allowed per container
+ * 4. recommend the number of containers as max of above two container counts
  */
 @Slf4j
 public class RecommendScalingForWorkUnitsLinearHeuristicImpl extends 
AbstractRecommendScalingForWorkUnitsImpl {
 
-  public static final String AMORTIZED_NUM_BYTES_PER_MINUTE = 
GobblinTemporalConfigurationKeys.DYNAMIC_SCALING_PREFIX + 
"heuristic.params.numBytesPerMinute";
-  public static final long DEFAULT_AMORTIZED_NUM_BYTES_PER_MINUTE = 80 * 1000L 
* 1000L * 60L; // 80MB/sec
-
+  /**
+   * Calculates the recommended number of containers for processing the 
remaining work units.
+   * <p>
+   * This method first checks whether dynamic scaling is enabled via the job 
state configuration.
+   * If dynamic scaling is disabled, it returns the initial container count as 
specified in the job state.
+   * When dynamic scaling is enabled, it computes the throughput based on the 
count of constituent work units (WUs)
+   * and the processing rate (bytes per minute per thread). The calculation 
involves:
+   * <ol>
+   *   <li>Computing the total bytes to be processed based on the count and 
mean size of top-level work units.</li>
+   *   <li>Calculating the processing rate per container using the amortized 
bytes per minute rate and the container's work unit capacity.</li>
+   *   <li>Estimating the total container-minutes required to process all MWUs 
and determining the number of containers needed
+   *       to meet the job time budget.</li>
+   *   <li>Computing an alternative container count based on the maximum 
number of work units allowed per container.</li>
+   *   <li>Returning the maximum of the two computed container counts as the 
recommended scaling.</li>
+   * </ol>
+   * </p>
+   *
+   * @param remainingWork the summary of work unit sizes and counts remaining 
for processing
+   * @param sourceClass the name of the class invoking this method
+   * @param jobTimeBudget the time budget allocated for the job execution
+   * @param jobState the current job state that holds configuration properties 
and runtime parameters
+   * @return the recommended number of containers to allocate for processing 
the work units
+   */
   @Override
   protected int calcDerivationSetPoint(WorkUnitsSizeSummary remainingWork, 
String sourceClass, TimeBudget jobTimeBudget, JobState jobState) {
-    // for simplicity, for now, consider only top-level work units (aka. 
`MultiWorkUnit`s - MWUs)
+    if 
(!jobState.getPropAsBoolean(GobblinTemporalConfigurationKeys.DYNAMIC_SCALING_ENABLED,
 false)) {
+      int initialContainerCount = 
Integer.valueOf(jobState.getProp(GobblinYarnConfigurationKeys.INITIAL_CONTAINERS_KEY,
 "1"));
+      log.info("Dynamic scaling is disabled, returning initial container 
count: " + initialContainerCount);
+      return initialContainerCount;
+    }
+
+    long numWUs = remainingWork.getConstituentWorkUnitsCount();
     long numMWUs = remainingWork.getTopLevelWorkUnitsCount();
     double meanBytesPerMWU = remainingWork.getTopLevelWorkUnitsMeanSize();
-    int numSimultaneousMWUsPerContainer = 
calcPerContainerWUCapacity(jobState); // (a worker-thread is a slot for 
top-level (MWUs) - not constituent sub-WUs)
-    long bytesPerMinuteProcRate = calcAmortizedBytesPerMinute(jobState);
+    double totalBytes = numMWUs * meanBytesPerMWU;

Review Comment:
   we can use `remainingWork.getTotalSize` 



##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/ProcessWorkUnitImpl.java:
##########
@@ -110,12 +110,13 @@ protected List<WorkUnit> 
loadFlattenedWorkUnits(WorkUnitClaimCheck wu, FileSyste
    * NOTE: adapted from {@link 
org.apache.gobblin.runtime.mapreduce.MRJobLauncher.TaskRunner#run(org.apache.hadoop.mapreduce.Mapper.Context)}
    * @return count of how many tasks executed (0 if execution ultimately 
failed, but we *believe* TaskState should already have been recorded beforehand)
    */
-  protected int execute(List<WorkUnit> workUnits, WorkUnitClaimCheck wu, 
JobState jobState, FileSystem fs, IssueRepository issueRepository) throws 
IOException, InterruptedException {
+  protected int execute(List<WorkUnit> workUnits, WorkUnitClaimCheck wu, 
JobState jobState, FileSystem fs, IssueRepository issueRepository,
+                        Properties jobProperties) throws IOException, 
InterruptedException {
     String containerId = "container-id-for-wu-" + wu.getCorrelator();
     StateStore<TaskState> taskStateStore = Help.openTaskStateStore(wu, fs);
 
     TaskStateTracker taskStateTracker = 
createEssentializedTaskStateTracker(wu);
-    TaskExecutor taskExecutor = new TaskExecutor(new Properties());
+    TaskExecutor taskExecutor = new TaskExecutor(jobProperties);

Review Comment:
   Are we passing any property through `jobProperties` that is needed to create 
`taskExecutor` ?



##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/RecommendScalingForWorkUnitsLinearHeuristicImpl.java:
##########
@@ -23,65 +23,128 @@
 import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys;
 import org.apache.gobblin.temporal.ddm.work.TimeBudget;
 import org.apache.gobblin.temporal.ddm.work.WorkUnitsSizeSummary;
-import org.apache.gobblin.temporal.ddm.worker.WorkFulfillmentWorker;
+import org.apache.gobblin.yarn.GobblinYarnConfigurationKeys;
 
 
 /**
  * Simple config-driven linear recommendation for how many containers to use 
to complete the "remaining work" within a given {@link TimeBudget}, per:
  *
- *   a. from {@link WorkUnitsSizeSummary}, find how many (remaining) 
"top-level" {@link org.apache.gobblin.source.workunit.MultiWorkUnit}s of some 
mean size
- *   b. from the configured {@link #AMORTIZED_NUM_BYTES_PER_MINUTE}, find the 
expected "processing rate" in bytes / minute
- * 1. estimate the time required for processing a mean-sized `MultiWorkUnit` 
(MWU)
- *   c. from {@link JobState}, find per-container `MultiWorkUnit` parallelism 
capacity (aka. "worker-slots") to base the recommendation upon
- * 2. calculate the per-container throughput of MWUs per minute
- * 3. estimate the total per-container-minutes required to process all MWUs
- *   d. from the {@link TimeBudget}, find the target number of minutes in 
which to complete processing of all MWUs
- * 4. recommend the number of containers so all MWU processing should finish 
within the target number of minutes
+ *   a. from {@link WorkUnitsSizeSummary}, find how many (remaining) {@link 
org.apache.gobblin.source.workunit.WorkUnit}s
+ *   b. from the configured 
GobblinTemporalConfigurationKeys.TEMPORAL_WORKER_THREAD_AMORTIZED_THROUGHPUT_PER_MINUTE,
 find the expected "processing rate" in bytes / minute
+ * 1. estimate the total container-minutes required to process all MWUs
+ *   c. from the {@link TimeBudget}, find the target number of minutes in 
which to complete processing of all MWUs
+ * 2. estimate container count based on target minutes
+ * 2. estimate container count based on the maximum number of work units 
allowed per container
+ * 4. recommend the number of containers as max of above two container counts
  */
 @Slf4j
 public class RecommendScalingForWorkUnitsLinearHeuristicImpl extends 
AbstractRecommendScalingForWorkUnitsImpl {
 
-  public static final String AMORTIZED_NUM_BYTES_PER_MINUTE = 
GobblinTemporalConfigurationKeys.DYNAMIC_SCALING_PREFIX + 
"heuristic.params.numBytesPerMinute";
-  public static final long DEFAULT_AMORTIZED_NUM_BYTES_PER_MINUTE = 80 * 1000L 
* 1000L * 60L; // 80MB/sec
-
+  /**
+   * Calculates the recommended number of containers for processing the 
remaining work units.
+   * <p>
+   * This method first checks whether dynamic scaling is enabled via the job 
state configuration.
+   * If dynamic scaling is disabled, it returns the initial container count as 
specified in the job state.
+   * When dynamic scaling is enabled, it computes the throughput based on the 
count of constituent work units (WUs)
+   * and the processing rate (bytes per minute per thread). The calculation 
involves:
+   * <ol>
+   *   <li>Computing the total bytes to be processed based on the count and 
mean size of top-level work units.</li>
+   *   <li>Calculating the processing rate per container using the amortized 
bytes per minute rate and the container's work unit capacity.</li>
+   *   <li>Estimating the total container-minutes required to process all MWUs 
and determining the number of containers needed
+   *       to meet the job time budget.</li>
+   *   <li>Computing an alternative container count based on the maximum 
number of work units allowed per container.</li>
+   *   <li>Returning the maximum of the two computed container counts as the 
recommended scaling.</li>
+   * </ol>
+   * </p>
+   *
+   * @param remainingWork the summary of work unit sizes and counts remaining 
for processing
+   * @param sourceClass the name of the class invoking this method
+   * @param jobTimeBudget the time budget allocated for the job execution
+   * @param jobState the current job state that holds configuration properties 
and runtime parameters
+   * @return the recommended number of containers to allocate for processing 
the work units
+   */
   @Override
   protected int calcDerivationSetPoint(WorkUnitsSizeSummary remainingWork, 
String sourceClass, TimeBudget jobTimeBudget, JobState jobState) {
-    // for simplicity, for now, consider only top-level work units (aka. 
`MultiWorkUnit`s - MWUs)
+    if 
(!jobState.getPropAsBoolean(GobblinTemporalConfigurationKeys.DYNAMIC_SCALING_ENABLED,
 false)) {
+      int initialContainerCount = 
Integer.valueOf(jobState.getProp(GobblinYarnConfigurationKeys.INITIAL_CONTAINERS_KEY,
 "1"));
+      log.info("Dynamic scaling is disabled, returning initial container 
count: " + initialContainerCount);
+      return initialContainerCount;
+    }
+
+    long numWUs = remainingWork.getConstituentWorkUnitsCount();
     long numMWUs = remainingWork.getTopLevelWorkUnitsCount();
     double meanBytesPerMWU = remainingWork.getTopLevelWorkUnitsMeanSize();
-    int numSimultaneousMWUsPerContainer = 
calcPerContainerWUCapacity(jobState); // (a worker-thread is a slot for 
top-level (MWUs) - not constituent sub-WUs)
-    long bytesPerMinuteProcRate = calcAmortizedBytesPerMinute(jobState);
+    double totalBytes = numMWUs * meanBytesPerMWU;
+    long bytesPerMinuteProcRatePerThread = 
calcAmortizedBytesPerMinute(jobState);
     log.info("Calculating auto-scaling (for {} remaining work units within {}) 
using: bytesPerMinuteProcRate = {}; meanBytesPerMWU = {}",
-        numMWUs, jobTimeBudget, bytesPerMinuteProcRate, meanBytesPerMWU);
+        numMWUs, jobTimeBudget, bytesPerMinuteProcRatePerThread, 
meanBytesPerMWU);

Review Comment:
   log line `bytesPerMinuteProcRate` --> `bytesPerMinuteProcRatePerThread`



##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ExecuteGobblinWorkflowImpl.java:
##########
@@ -166,17 +166,19 @@ protected ProcessWorkUnitsWorkflow 
createProcessWorkUnitsWorkflow(Properties job
   protected TimeBudget calcWUProcTimeBudget(Instant jobStartTime, 
WorkUnitsSizeSummary wuSizeSummary, Properties jobProps) {
     // TODO: make fully configurable!  for now, cap Work Discovery at 45 mins 
and set aside 10 mins for the `CommitStepWorkflow`
     long maxGenWUsMins = 45;
-    long commitStepMins = 10;
+    long commitStepMins = 15;
     long totalTargetTimeMins = 
TimeUnit.MINUTES.toMinutes(PropertiesUtils.getPropAsLong(jobProps,
         ConfigurationKeys.JOB_TARGET_COMPLETION_DURATION_IN_MINUTES_KEY,
         ConfigurationKeys.DEFAULT_JOB_TARGET_COMPLETION_DURATION_IN_MINUTES));
     double permittedOveragePercentage = .2;
     Duration genWUsDuration = Duration.between(jobStartTime, 
TemporalEventTimer.WithinWorkflowFactory.getCurrentInstant());
-    long remainingMins = totalTargetTimeMins - 
Math.min(genWUsDuration.toMinutes(), maxGenWUsMins) - commitStepMins;
+

Review Comment:
   remove or comment out this line as well ?





Issue Time Tracking
-------------------

            Worklog Id:     (was: 962216)
    Remaining Estimate: 0h
            Time Spent: 10m

> Support dynamic container scaling on Temporal workload
> ------------------------------------------------------
>
>                 Key: GOBBLIN-2199
>                 URL: https://issues.apache.org/jira/browse/GOBBLIN-2199
>             Project: Apache Gobblin
>          Issue Type: Task
>            Reporter: Prateek Khandelwal
>            Priority: Major
>          Time Spent: 10m
>  Remaining Estimate: 0h
>
> Currently, Gobblin runs static count of container(initial containers at the 
> start of job). We need to support dynamic scaling by computing the 
> recommended number of containers such that large data copy workloads can be 
> processed within some completion time and without running into OOM errors on 
> containers.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to