This is an automated email from the ASF dual-hosted git repository.

vivekrai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new a978b299ba [GOBBLIN-2199] Support dynamic container scaling based on 
target completion time & WU count (#4106)
a978b299ba is described below

commit a978b299ba1f2762786f5fecee5a960269db219b
Author: Prateek Khandelwal <[email protected]>
AuthorDate: Tue Mar 18 13:02:32 2025 +0530

    [GOBBLIN-2199] Support dynamic container scaling based on target completion 
time & WU count (#4106)
---
 .../gobblin/configuration/ConfigurationKeys.java   |   2 +-
 .../temporal/GobblinTemporalConfigurationKeys.java |  14 ++
 .../ddm/activity/impl/ProcessWorkUnitImpl.java     |   7 +-
 ...mendScalingForWorkUnitsLinearHeuristicImpl.java | 125 ++++++++++----
 .../temporal/ddm/worker/WorkFulfillmentWorker.java |  12 +-
 .../workflow/impl/ExecuteGobblinWorkflowImpl.java  |  16 +-
 ...ScalingForWorkUnitsLinearHeuristicImplTest.java | 182 +++++++++++++++++++--
 7 files changed, 296 insertions(+), 62 deletions(-)

diff --git 
a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
 
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
index d9243f6e0d..6e07afac1c 100644
--- 
a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
+++ 
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
@@ -210,7 +210,7 @@ public class ConfigurationKeys {
   public static final String JOB_COMMIT_POLICY_KEY = "job.commit.policy";
   public static final String DEFAULT_JOB_COMMIT_POLICY = "full";
   public static final String JOB_TARGET_COMPLETION_DURATION_IN_MINUTES_KEY = 
"job.duration.target.completion.in.minutes";
-  public static final long DEFAULT_JOB_TARGET_COMPLETION_DURATION_IN_MINUTES = 
360;
+  public static final long DEFAULT_JOB_TARGET_COMPLETION_DURATION_IN_MINUTES = 
120;
 
   public static final String PARTIAL_FAIL_TASK_FAILS_JOB_COMMIT = 
"job.commit.partial.fail.task.fails.job.commit";
   // If true, commit of different datasets will be performed in parallel
diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/GobblinTemporalConfigurationKeys.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/GobblinTemporalConfigurationKeys.java
index 1de75b30d5..cdd5728fdc 100644
--- 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/GobblinTemporalConfigurationKeys.java
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/GobblinTemporalConfigurationKeys.java
@@ -65,6 +65,17 @@ public interface GobblinTemporalConfigurationKeys {
    */
   String TEMPORAL_NUM_WORKERS_PER_CONTAINER = PREFIX + 
"num.workers.per.container";
   int DEFAULT_TEMPORAL_NUM_WORKERS_PER_CONTAINERS = 1;
+  String TEMPORAL_NUM_THREADS_PER_WORKER = PREFIX + "num.threads.per.worker";
+  int DEFAULT_TEMPORAL_NUM_THREADS_PER_WORKER = 15;
+
+  // Configuration key for setting the amortized throughput per worker thread 
per minute
+  String TEMPORAL_WORKER_THREAD_AMORTIZED_THROUGHPUT_PER_MINUTE = PREFIX + 
"worker.thread.amortized.throughput.per.minute";
+  long DEFAULT_TEMPORAL_WORKER_THREAD_AMORTIZED_THROUGHPUT_PER_MINUTE = 500 * 
1000 * 1000L; // 500MB/min
+
+  // Configuration key for setting the maximum number of work units allowed 
per container
+  String TEMPORAL_WORKUNITS_PER_CONTAINER = PREFIX + "workunits.per.container";
+  int DEFAULT_MAX_WORKUNITS_PER_CONTAINER = 2000;
+
   String TEMPORAL_CONNECTION_STRING = PREFIX + "connection.string";
 
   /**
@@ -72,6 +83,9 @@ public interface GobblinTemporalConfigurationKeys {
    */
   String DYNAMIC_SCALING_PREFIX = PREFIX + "dynamic.scaling.";
 
+  // Configuration key to enable/disable dynamic scaling
+  String DYNAMIC_SCALING_ENABLED = DYNAMIC_SCALING_PREFIX + "enabled";
+
   String DYNAMIC_SCALING_POLLING_INTERVAL_SECS = DYNAMIC_SCALING_PREFIX + 
"polling.interval.seconds";
   int DEFAULT_DYNAMIC_SCALING_POLLING_INTERVAL_SECS = 60;
 
diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/ProcessWorkUnitImpl.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/ProcessWorkUnitImpl.java
index 9450bc7bcb..ffd84d0bc1 100644
--- 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/ProcessWorkUnitImpl.java
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/ProcessWorkUnitImpl.java
@@ -90,7 +90,7 @@ public class ProcessWorkUnitImpl implements ProcessWorkUnit {
           heartBeatInterval, heartBeatInterval, TimeUnit.MINUTES);
       troubleshooter = 
AutomaticTroubleshooterFactory.createForJob(jobState.getProperties());
       troubleshooter.start();
-      return execute(workUnits, wu, jobState, fs, 
troubleshooter.getIssueRepository());
+      return execute(workUnits, wu, jobState, fs, 
troubleshooter.getIssueRepository(), jobState.getProperties());
     } catch (IOException | InterruptedException e) {
       throw new RuntimeException(e);
     } finally {
@@ -110,12 +110,13 @@ public class ProcessWorkUnitImpl implements 
ProcessWorkUnit {
    * NOTE: adapted from {@link 
org.apache.gobblin.runtime.mapreduce.MRJobLauncher.TaskRunner#run(org.apache.hadoop.mapreduce.Mapper.Context)}
    * @return count of how many tasks executed (0 if execution ultimately 
failed, but we *believe* TaskState should already have been recorded beforehand)
    */
-  protected int execute(List<WorkUnit> workUnits, WorkUnitClaimCheck wu, 
JobState jobState, FileSystem fs, IssueRepository issueRepository) throws 
IOException, InterruptedException {
+  protected int execute(List<WorkUnit> workUnits, WorkUnitClaimCheck wu, 
JobState jobState, FileSystem fs, IssueRepository issueRepository,
+                        Properties jobProperties) throws IOException, 
InterruptedException {
     String containerId = "container-id-for-wu-" + wu.getCorrelator();
     StateStore<TaskState> taskStateStore = Help.openTaskStateStore(wu, fs);
 
     TaskStateTracker taskStateTracker = 
createEssentializedTaskStateTracker(wu);
-    TaskExecutor taskExecutor = new TaskExecutor(new Properties());
+    TaskExecutor taskExecutor = new TaskExecutor(jobProperties);
     GobblinMultiTaskAttempt.CommitPolicy multiTaskAttemptCommitPolicy = 
GobblinMultiTaskAttempt.CommitPolicy.IMMEDIATE; // as no speculative exec
 
     SharedResourcesBroker<GobblinScopeTypes> resourcesBroker = 
JobStateUtils.getSharedResourcesBroker(jobState);
diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/RecommendScalingForWorkUnitsLinearHeuristicImpl.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/RecommendScalingForWorkUnitsLinearHeuristicImpl.java
index 906ebe2426..777f7e9c20 100644
--- 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/RecommendScalingForWorkUnitsLinearHeuristicImpl.java
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/RecommendScalingForWorkUnitsLinearHeuristicImpl.java
@@ -23,65 +23,128 @@ import org.apache.gobblin.runtime.JobState;
 import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys;
 import org.apache.gobblin.temporal.ddm.work.TimeBudget;
 import org.apache.gobblin.temporal.ddm.work.WorkUnitsSizeSummary;
-import org.apache.gobblin.temporal.ddm.worker.WorkFulfillmentWorker;
+import org.apache.gobblin.yarn.GobblinYarnConfigurationKeys;
 
 
 /**
  * Simple config-driven linear recommendation for how many containers to use 
to complete the "remaining work" within a given {@link TimeBudget}, per:
  *
- *   a. from {@link WorkUnitsSizeSummary}, find how many (remaining) 
"top-level" {@link org.apache.gobblin.source.workunit.MultiWorkUnit}s of some 
mean size
- *   b. from the configured {@link #AMORTIZED_NUM_BYTES_PER_MINUTE}, find the 
expected "processing rate" in bytes / minute
- * 1. estimate the time required for processing a mean-sized `MultiWorkUnit` 
(MWU)
- *   c. from {@link JobState}, find per-container `MultiWorkUnit` parallelism 
capacity (aka. "worker-slots") to base the recommendation upon
- * 2. calculate the per-container throughput of MWUs per minute
- * 3. estimate the total per-container-minutes required to process all MWUs
- *   d. from the {@link TimeBudget}, find the target number of minutes in 
which to complete processing of all MWUs
- * 4. recommend the number of containers so all MWU processing should finish 
within the target number of minutes
+ *   a. from {@link WorkUnitsSizeSummary}, find how many (remaining) {@link 
org.apache.gobblin.source.workunit.WorkUnit}s
+ *   b. from the configured 
GobblinTemporalConfigurationKeys.TEMPORAL_WORKER_THREAD_AMORTIZED_THROUGHPUT_PER_MINUTE,
 find the expected "processing rate" in bytes / minute
+ * 1. estimate the total container-minutes required to process all MWUs
+ *   c. from the {@link TimeBudget}, find the target number of minutes in 
which to complete processing of all MWUs
+ * 2. estimate container count based on target minutes
+ * 2. estimate container count based on the maximum number of work units 
allowed per container
+ * 4. recommend the number of containers as max of above two container counts
  */
 @Slf4j
 public class RecommendScalingForWorkUnitsLinearHeuristicImpl extends 
AbstractRecommendScalingForWorkUnitsImpl {
 
-  public static final String AMORTIZED_NUM_BYTES_PER_MINUTE = 
GobblinTemporalConfigurationKeys.DYNAMIC_SCALING_PREFIX + 
"heuristic.params.numBytesPerMinute";
-  public static final long DEFAULT_AMORTIZED_NUM_BYTES_PER_MINUTE = 80 * 1000L 
* 1000L * 60L; // 80MB/sec
-
+  /**
+   * Calculates the recommended number of containers for processing the 
remaining work units.
+   * <p>
+   * This method first checks whether dynamic scaling is enabled via the job 
state configuration.
+   * If dynamic scaling is disabled, it returns the initial container count as 
specified in the job state.
+   * When dynamic scaling is enabled, it computes the throughput based on the 
count of constituent work units (WUs)
+   * and the processing rate (bytes per minute per thread). The calculation 
involves:
+   * <ol>
+   *   <li>Computing the total bytes to be processed based on the count and 
mean size of top-level work units.</li>
+   *   <li>Calculating the processing rate per container using the amortized 
bytes per minute rate and the container's work unit capacity.</li>
+   *   <li>Estimating the total container-minutes required to process all MWUs 
and determining the number of containers needed
+   *       to meet the job time budget.</li>
+   *   <li>Computing an alternative container count based on the maximum 
number of work units allowed per container.</li>
+   *   <li>Returning the maximum of the two computed container counts as the 
recommended scaling.</li>
+   * </ol>
+   * </p>
+   *
+   * @param remainingWork the summary of work unit sizes and counts remaining 
for processing
+   * @param sourceClass the name of the class invoking this method
+   * @param jobTimeBudget the time budget allocated for the job execution
+   * @param jobState the current job state that holds configuration properties 
and runtime parameters
+   * @return the recommended number of containers to allocate for processing 
the work units
+   */
   @Override
   protected int calcDerivationSetPoint(WorkUnitsSizeSummary remainingWork, 
String sourceClass, TimeBudget jobTimeBudget, JobState jobState) {
-    // for simplicity, for now, consider only top-level work units (aka. 
`MultiWorkUnit`s - MWUs)
+    if 
(!jobState.getPropAsBoolean(GobblinTemporalConfigurationKeys.DYNAMIC_SCALING_ENABLED,
 false)) {
+      int initialContainerCount = 
Integer.parseInt(jobState.getProp(GobblinYarnConfigurationKeys.INITIAL_CONTAINERS_KEY,
 "1"));
+      log.info("Dynamic scaling is disabled, returning initial container 
count: " + initialContainerCount);
+      return initialContainerCount;
+    }
+
+    long numWUs = remainingWork.getConstituentWorkUnitsCount();
     long numMWUs = remainingWork.getTopLevelWorkUnitsCount();
     double meanBytesPerMWU = remainingWork.getTopLevelWorkUnitsMeanSize();
-    int numSimultaneousMWUsPerContainer = 
calcPerContainerWUCapacity(jobState); // (a worker-thread is a slot for 
top-level (MWUs) - not constituent sub-WUs)
-    long bytesPerMinuteProcRate = calcAmortizedBytesPerMinute(jobState);
-    log.info("Calculating auto-scaling (for {} remaining work units within {}) 
using: bytesPerMinuteProcRate = {}; meanBytesPerMWU = {}",
-        numMWUs, jobTimeBudget, bytesPerMinuteProcRate, meanBytesPerMWU);
+    double totalBytes = numMWUs * meanBytesPerMWU;
+    long bytesPerMinuteProcRatePerThread = 
calcAmortizedBytesPerMinute(jobState);
+    log.info("Calculating auto-scaling (for {} remaining work units within {}) 
using: bytesPerMinuteProcRatePerThread = {}; meanBytesPerMWU = {}",
+        numMWUs, jobTimeBudget, bytesPerMinuteProcRatePerThread, 
meanBytesPerMWU);
 
     // calc how many container*minutes to process all MWUs, based on mean MWU 
size
-    double minutesProcTimeForMeanMWU = meanBytesPerMWU / 
bytesPerMinuteProcRate;
-    double meanMWUsThroughputPerContainerMinute = 
numSimultaneousMWUsPerContainer / minutesProcTimeForMeanMWU;
-    double estContainerMinutesForAllMWUs = numMWUs / 
meanMWUsThroughputPerContainerMinute;
+    int numSimultaneousMWUsPerContainer = 
calcPerContainerWUCapacity(jobState); // (a worker-thread is a slot for 
top-level (MWUs) - not constituent sub-WUs)
+    double containerProcRate = bytesPerMinuteProcRatePerThread * 
numSimultaneousMWUsPerContainer;
+    double estContainerMinutesForAllMWUs = totalBytes/containerProcRate;
+    log.info("Container byte processing throughput: {}, totalBytes: {}, est. 
containerMinutes to complete all MWUs: {}",
+        containerProcRate, totalBytes, estContainerMinutesForAllMWUs);
 
+    // Determine the required number of containers based on the job's time 
budget
     long targetNumMinutesForAllMWUs = 
jobTimeBudget.getMaxTargetDurationMinutes();
-    // TODO: take into account `jobTimeBudget.getPermittedOverageMinutes()` - 
e.g. to decide whether to use `Math.ceil` vs. `Math.floor`
+    int numContainerForThroughout = (int) 
Math.ceil(estContainerMinutesForAllMWUs / targetNumMinutesForAllMWUs);
+
+    // Determine the required number of containers based on work unit count 
limits
+    int maxWUsPerContainer = calcMaxWUPerContainer(jobState);
+    int numContainerForWUs = (int) Math.ceil((numWUs * 1.0) / 
maxWUsPerContainer);
 
-    // TODO: decide how to account for container startup; working est. for 
GoT-on-YARN ~ 3 mins (req to alloc ~ 30s; alloc to workers ready ~ 2.5m)
-    //   e.g. can we amortize away / ignore when `targetNumMinutesForAllMWUs 
>> workerRequestToReadyNumMinutes`?
-    // TODO take into account that MWUs are quantized into discrete chunks; 
this est. uses avg and presumes to divide partial MWUs amongst workers
-    //   can we we mostly ignore if we keep MWU "chunk size" "small-ish", like 
maybe even just `duration(max(MWU)) <= targetNumMinutesForAllMWUs/2)`?
+    int recommendedNumContainers = Math.max(numContainerForWUs, 
numContainerForThroughout);
+    log.info("Recommended auto-scaling: {} containers, no. of containers 
considering throughput: {}, no. of containers considering WUs: {}",
+        recommendedNumContainers, numContainerForThroughout, 
numContainerForWUs);
 
-    int recommendedNumContainers = (int) 
Math.floor(estContainerMinutesForAllMWUs / targetNumMinutesForAllMWUs);
-    log.info("Recommended auto-scaling: {} containers, given: 
minutesToProc(mean(MWUs)) = {}; throughput = {} (MWUs / container*minute); "
-        + "est. container*minutes to complete ALL ({}) MWUs = {}",
-        recommendedNumContainers, minutesProcTimeForMeanMWU, 
meanMWUsThroughputPerContainerMinute, numMWUs, estContainerMinutesForAllMWUs);
     return recommendedNumContainers;
   }
 
+  /**
+   * Calculates the work unit processing capacity for a single container.
+   * <p>
+   * This is determined by multiplying the number of workers assigned to a 
container by the number of threads available per worker.
+   * </p>
+   *
+   * @param jobState the current job state containing configuration properties
+   * @return the number of top-level work units (MWUs) that can be processed 
concurrently by one container
+   */
   protected int calcPerContainerWUCapacity(JobState jobState) {
     int numWorkersPerContainer = 
jobState.getPropAsInt(GobblinTemporalConfigurationKeys.TEMPORAL_NUM_WORKERS_PER_CONTAINER,
         
GobblinTemporalConfigurationKeys.DEFAULT_TEMPORAL_NUM_WORKERS_PER_CONTAINERS);
-    int numThreadsPerWorker = WorkFulfillmentWorker.MAX_EXECUTION_CONCURRENCY; 
// TODO: get from config, once that's implemented
+    int numThreadsPerWorker = 
jobState.getPropAsInt(GobblinTemporalConfigurationKeys.TEMPORAL_NUM_THREADS_PER_WORKER,
+        
GobblinTemporalConfigurationKeys.DEFAULT_TEMPORAL_NUM_THREADS_PER_WORKER);
     return numWorkersPerContainer * numThreadsPerWorker;
   }
 
+
+  /**
+   * Retrieves the amortized throughput rate in bytes per minute per thread 
from the job configuration.
+   * <p>
+   * This value represents the expected processing rate for a worker thread 
and is used to compute container's processing rate.
+   * </p>
+   *
+   * @param jobState the current job state containing configuration properties
+   * @return the amortized processing rate in bytes per minute per thread
+   */
   protected long calcAmortizedBytesPerMinute(JobState jobState) {
-    return jobState.getPropAsLong(AMORTIZED_NUM_BYTES_PER_MINUTE, 
DEFAULT_AMORTIZED_NUM_BYTES_PER_MINUTE);
+    return 
jobState.getPropAsLong(GobblinTemporalConfigurationKeys.TEMPORAL_WORKER_THREAD_AMORTIZED_THROUGHPUT_PER_MINUTE,
+        
GobblinTemporalConfigurationKeys.DEFAULT_TEMPORAL_WORKER_THREAD_AMORTIZED_THROUGHPUT_PER_MINUTE);
+  }
+
+  /**
+   * Determines the maximum number of work units that can be assigned to a 
single container.
+   * <p>
+   * This limit is obtained from the job configuration and is used as an 
alternative constraint when scaling containers.
+   * </p>
+   *
+   * @param jobState the current job state containing configuration properties
+   * @return the maximum allowed number of work units per container
+   */
+  protected int calcMaxWUPerContainer(JobState jobState) {
+    return 
jobState.getPropAsInt(GobblinTemporalConfigurationKeys.TEMPORAL_WORKUNITS_PER_CONTAINER,
+        GobblinTemporalConfigurationKeys.DEFAULT_MAX_WORKUNITS_PER_CONTAINER);
   }
+
 }
diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/worker/WorkFulfillmentWorker.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/worker/WorkFulfillmentWorker.java
index 27348858e7..12fe6c4d84 100644
--- 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/worker/WorkFulfillmentWorker.java
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/worker/WorkFulfillmentWorker.java
@@ -24,6 +24,7 @@ import com.typesafe.config.Config;
 import io.temporal.client.WorkflowClient;
 import io.temporal.worker.WorkerOptions;
 
+import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys;
 import org.apache.gobblin.temporal.cluster.AbstractTemporalWorker;
 import org.apache.gobblin.temporal.ddm.activity.impl.CommitActivityImpl;
 import 
org.apache.gobblin.temporal.ddm.activity.impl.DeleteWorkDirsActivityImpl;
@@ -36,15 +37,18 @@ import 
org.apache.gobblin.temporal.ddm.workflow.impl.GenerateWorkUnitsWorkflowIm
 import 
org.apache.gobblin.temporal.ddm.workflow.impl.NestingExecOfProcessWorkUnitWorkflowImpl;
 import 
org.apache.gobblin.temporal.ddm.workflow.impl.ProcessWorkUnitsWorkflowImpl;
 import org.apache.gobblin.temporal.workflows.metrics.SubmitGTEActivityImpl;
+import org.apache.gobblin.util.ConfigUtils;
 
 
 /** Worker for the {@link ProcessWorkUnitsWorkflowImpl} super-workflow */
 public class WorkFulfillmentWorker extends AbstractTemporalWorker {
     public static final long DEADLOCK_DETECTION_TIMEOUT_SECONDS = 120; // 
TODO: make configurable!
-    public static final int MAX_EXECUTION_CONCURRENCY = 5; // TODO: make 
configurable!
+    public int maxExecutionConcurrency;
 
     public WorkFulfillmentWorker(Config config, WorkflowClient workflowClient) 
{
         super(config, workflowClient);
+        this.maxExecutionConcurrency = ConfigUtils.getInt(config, 
GobblinTemporalConfigurationKeys.TEMPORAL_NUM_THREADS_PER_WORKER,
+            
GobblinTemporalConfigurationKeys.DEFAULT_TEMPORAL_NUM_THREADS_PER_WORKER);
     }
 
     @Override
@@ -64,9 +68,9 @@ public class WorkFulfillmentWorker extends 
AbstractTemporalWorker {
         return WorkerOptions.newBuilder()
             // default is only 1s - WAY TOO SHORT for 
`o.a.hadoop.fs.FileSystem#listStatus`!
             
.setDefaultDeadlockDetectionTimeout(TimeUnit.SECONDS.toMillis(DEADLOCK_DETECTION_TIMEOUT_SECONDS))
-            .setMaxConcurrentActivityExecutionSize(MAX_EXECUTION_CONCURRENCY)
-            
.setMaxConcurrentLocalActivityExecutionSize(MAX_EXECUTION_CONCURRENCY)
-            
.setMaxConcurrentWorkflowTaskExecutionSize(MAX_EXECUTION_CONCURRENCY)
+            
.setMaxConcurrentActivityExecutionSize(this.maxExecutionConcurrency)
+            
.setMaxConcurrentLocalActivityExecutionSize(this.maxExecutionConcurrency)
+            
.setMaxConcurrentWorkflowTaskExecutionSize(this.maxExecutionConcurrency)
             .build();
     }
 }
diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ExecuteGobblinWorkflowImpl.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ExecuteGobblinWorkflowImpl.java
index 55f50fab22..198ab8a6d5 100644
--- 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ExecuteGobblinWorkflowImpl.java
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ExecuteGobblinWorkflowImpl.java
@@ -19,7 +19,6 @@ package org.apache.gobblin.temporal.ddm.workflow.impl;
 
 import java.io.IOException;
 import java.net.URI;
-import java.time.Duration;
 import java.time.Instant;
 import java.util.ArrayList;
 import java.util.HashSet;
@@ -110,7 +109,7 @@ public class ExecuteGobblinWorkflowImpl implements 
ExecuteGobblinWorkflow {
         log.info("Recommended scaling to process WUs within {}: {}", 
timeBudget, scalingDirectives);
         try {
           ScalingDirectivesRecipient recipient = 
createScalingDirectivesRecipient(jobProps, closer);
-          List<ScalingDirective> adjustedScalingDirectives = 
adjustRecommendedScaling(scalingDirectives);
+          List<ScalingDirective> adjustedScalingDirectives = 
adjustRecommendedScaling(jobProps, scalingDirectives);
           log.info("Submitting (adjusted) scaling directives: {}", 
adjustedScalingDirectives);
           recipient.receive(adjustedScalingDirectives);
           // TODO: when eliminating the "GenWUs Worker", pause/block until 
scaling is complete
@@ -166,17 +165,19 @@ public class ExecuteGobblinWorkflowImpl implements 
ExecuteGobblinWorkflow {
   protected TimeBudget calcWUProcTimeBudget(Instant jobStartTime, 
WorkUnitsSizeSummary wuSizeSummary, Properties jobProps) {
     // TODO: make fully configurable!  for now, cap Work Discovery at 45 mins 
and set aside 10 mins for the `CommitStepWorkflow`
     long maxGenWUsMins = 45;
-    long commitStepMins = 10;
+    long commitStepMins = 15;
     long totalTargetTimeMins = 
TimeUnit.MINUTES.toMinutes(PropertiesUtils.getPropAsLong(jobProps,
         ConfigurationKeys.JOB_TARGET_COMPLETION_DURATION_IN_MINUTES_KEY,
         ConfigurationKeys.DEFAULT_JOB_TARGET_COMPLETION_DURATION_IN_MINUTES));
     double permittedOveragePercentage = .2;
-    Duration genWUsDuration = Duration.between(jobStartTime, 
TemporalEventTimer.WithinWorkflowFactory.getCurrentInstant());
-    long remainingMins = totalTargetTimeMins - 
Math.min(genWUsDuration.toMinutes(), maxGenWUsMins) - commitStepMins;
+
+    // since actual generate WU duration can vary significantly across jobs, 
removing that from computation to enable deterministic duration for WU 
processing
+    // Duration genWUsDuration = Duration.between(jobStartTime, 
TemporalEventTimer.WithinWorkflowFactory.getCurrentInstant());
+    long remainingMins = totalTargetTimeMins - maxGenWUsMins - commitStepMins;
     return TimeBudget.withOveragePercentage(remainingMins, 
permittedOveragePercentage);
   }
 
-  protected List<ScalingDirective> 
adjustRecommendedScaling(List<ScalingDirective> recommendedScalingDirectives) {
+  protected List<ScalingDirective> adjustRecommendedScaling(Properties 
jobProps, List<ScalingDirective> recommendedScalingDirectives) {
     // TODO: make any adjustments - e.g. decide whether to shutdown the (often 
oversize) `GenerateWorkUnits` worker or alternatively to deduct one to count it
     if (recommendedScalingDirectives.size() == 0) {
       return recommendedScalingDirectives;
@@ -185,7 +186,8 @@ public class ExecuteGobblinWorkflowImpl implements 
ExecuteGobblinWorkflow {
     ArrayList<ScalingDirective> adjustedScaling = new 
ArrayList<>(recommendedScalingDirectives);
     ScalingDirective firstDirective = adjustedScaling.get(0);
     // deduct one for (already existing) `GenerateWorkUnits` worker (we 
presume its "baseline" `WorkerProfile` similar enough to substitute for this 
new one)
-    adjustedScaling.set(0, 
firstDirective.updateSetPoint(firstDirective.getSetPoint() - 1));
+    int initialContainerCount = 
Integer.parseInt(jobProps.getProperty(GobblinYarnConfigurationKeys.INITIAL_CONTAINERS_KEY,
 "1"));
+    adjustedScaling.set(0, 
firstDirective.updateSetPoint(firstDirective.getSetPoint() - 
initialContainerCount));
     // CAUTION: filter out set point zero, which (depending upon 
`.getProfileName()`) *could* down-scale away our only current worker
     // TODO: consider whether to allow either a) "pre-defining" a profile w/ 
set point zero, available for later use OR b) down-scaling to zero to pause 
worker
     return adjustedScaling.stream().filter(sd -> sd.getSetPoint() > 
0).collect(Collectors.toList());
diff --git 
a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/ddm/activity/impl/RecommendScalingForWorkUnitsLinearHeuristicImplTest.java
 
b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/ddm/activity/impl/RecommendScalingForWorkUnitsLinearHeuristicImplTest.java
index f7470f1928..5cf83803e4 100644
--- 
a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/ddm/activity/impl/RecommendScalingForWorkUnitsLinearHeuristicImplTest.java
+++ 
b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/ddm/activity/impl/RecommendScalingForWorkUnitsLinearHeuristicImplTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.gobblin.temporal.ddm.activity.impl;
 
+import org.apache.gobblin.yarn.GobblinYarnConfigurationKeys;
 import org.mockito.Mock;
 import org.mockito.Mockito;
 import org.mockito.MockitoAnnotations;
@@ -45,34 +46,183 @@ public class 
RecommendScalingForWorkUnitsLinearHeuristicImplTest {
   }
 
   @Test
-  public void testCalcDerivationSetPoint() {
-    
Mockito.when(jobState.getPropAsInt(Mockito.eq(GobblinTemporalConfigurationKeys.TEMPORAL_NUM_WORKERS_PER_CONTAINER),
 Mockito.anyInt()))
-        .thenReturn(4); // 4 workers per container
-    
Mockito.when(jobState.getPropAsLong(Mockito.eq(RecommendScalingForWorkUnitsLinearHeuristicImpl.AMORTIZED_NUM_BYTES_PER_MINUTE),
 Mockito.anyLong()))
-        .thenReturn(100L * 1000 * 1000); // 100MB/minute
+  void testScalingDisabled() {
+    
Mockito.when(jobState.getPropAsBoolean(GobblinTemporalConfigurationKeys.DYNAMIC_SCALING_ENABLED,
 false)).thenReturn(false);
+    
Mockito.when(jobState.getProp(GobblinYarnConfigurationKeys.INITIAL_CONTAINERS_KEY,
 "1")).thenReturn("2");
+
+    long totalNumMWUs = 3000L;
+    
Mockito.when(workUnitsSizeSummary.getTopLevelWorkUnitsCount()).thenReturn(totalNumMWUs);
+    
Mockito.when(workUnitsSizeSummary.getTopLevelWorkUnitsMeanSize()).thenReturn(500e6);
 // 500MB
+    int result = scalingHeuristic.calcDerivationSetPoint(workUnitsSizeSummary, 
"someSourceClass", timeBudget, jobState);
+
+    Assert.assertEquals(2, result, "Should return initial container count if 
dynamic scaling is disabled");
+  }
+
+  @Test
+  public void testCalcDerivationSetPointForNoWorkUnits() {
+    
Mockito.when(jobState.getPropAsBoolean(GobblinTemporalConfigurationKeys.DYNAMIC_SCALING_ENABLED,
 false))
+        .thenReturn(true);
+    stubContainerCapacity(1,5);
+
+    Mockito.when(jobState.getPropAsLong(
+            
Mockito.eq(GobblinTemporalConfigurationKeys.TEMPORAL_WORKER_THREAD_AMORTIZED_THROUGHPUT_PER_MINUTE),
 Mockito.anyLong()))
+        .thenReturn(100L * 1000 * 1000);
+    
Mockito.when(jobState.getPropAsInt(Mockito.eq(GobblinTemporalConfigurationKeys.TEMPORAL_WORKUNITS_PER_CONTAINER),
 Mockito.anyInt()))
+        .thenReturn(200);
+    Mockito.when(timeBudget.getMaxTargetDurationMinutes()).thenReturn(60L);
+
+    // Set both top-level and constituent work units to zero.
+    
Mockito.when(workUnitsSizeSummary.getTopLevelWorkUnitsCount()).thenReturn(0L);
+    
Mockito.when(workUnitsSizeSummary.getTopLevelWorkUnitsMeanSize()).thenReturn(500e6);
+    
Mockito.when(workUnitsSizeSummary.getConstituentWorkUnitsCount()).thenReturn(0L);
+
+    int result = scalingHeuristic.calcDerivationSetPoint(workUnitsSizeSummary, 
"source", timeBudget, jobState);
+    Assert.assertEquals(result, 0, "Expected 0 containers when there are no 
work units");
+  }
+
+  @Test
+  public void testCalcDerivationSetPointWithThroughputConstraintDominating() {
+    
Mockito.when(jobState.getPropAsBoolean(GobblinTemporalConfigurationKeys.DYNAMIC_SCALING_ENABLED,
 false))
+        .thenReturn(true);
+
+    stubContainerCapacity(4,5);
+
+    Mockito.when(jobState.getPropAsLong(
+            
Mockito.eq(GobblinTemporalConfigurationKeys.TEMPORAL_WORKER_THREAD_AMORTIZED_THROUGHPUT_PER_MINUTE),
+            Mockito.anyLong()))
+        .thenReturn(50L * 1000 * 1000); // 50MB/minute
+
+    
Mockito.when(jobState.getPropAsInt(Mockito.eq(GobblinTemporalConfigurationKeys.TEMPORAL_WORKUNITS_PER_CONTAINER),
 Mockito.anyInt()))
+        .thenReturn(5000);
+    Mockito.when(timeBudget.getMaxTargetDurationMinutes()).thenReturn(60L);
+
+    // Configure work units: throughput constraint will dominate.
+    
Mockito.when(workUnitsSizeSummary.getTopLevelWorkUnitsCount()).thenReturn(1000L);
+    
Mockito.when(workUnitsSizeSummary.getTopLevelWorkUnitsMeanSize()).thenReturn(500e6);
+    
Mockito.when(workUnitsSizeSummary.getConstituentWorkUnitsCount()).thenReturn(1500L);
+
+    // Throughput calculation:
+    // totalBytes = 1000 * 500e6 = 500e9 bytes.
+    // containerProcRate = 50e6 * 20 = 1e9 bytes/min.
+    // estimated container minutes = 500e9 / 1e9 = 500 minutes.
+    // Throughput containers = ceil(500 / 60) = 9.
+    // Work unit constraint = ceil(1500 / 5000) = 1.
+    // Expected recommended containers = max(9, 1) = 9.
+    int result = scalingHeuristic.calcDerivationSetPoint(workUnitsSizeSummary, 
"source", timeBudget, jobState);
+    Assert.assertEquals(result, 9, "Expected throughput constraint to dominate 
and yield 9 containers");
+  }
+
+  @Test
+  public void testCalcDerivationSetPointWithWorkUnitConstraintDominating2() {
+    // Enable dynamic scaling.
+    
Mockito.when(jobState.getPropAsBoolean(GobblinTemporalConfigurationKeys.DYNAMIC_SCALING_ENABLED,
 false))
+        .thenReturn(true);
+
+    stubContainerCapacity(4,5);
+
+    Mockito.when(jobState.getPropAsLong(
+            
Mockito.eq(GobblinTemporalConfigurationKeys.TEMPORAL_WORKER_THREAD_AMORTIZED_THROUGHPUT_PER_MINUTE),
+            Mockito.anyLong()))
+        .thenReturn(100L * 1000 * 1000);
+
+    
Mockito.when(jobState.getPropAsInt(Mockito.eq(GobblinTemporalConfigurationKeys.TEMPORAL_WORKUNITS_PER_CONTAINER),
 Mockito.anyInt()))
+        .thenReturn(10);
+
+    Mockito.when(timeBudget.getMaxTargetDurationMinutes()).thenReturn(120L);
+
+    // Configure work units:
+    // For throughput: top-level MWUs count and mean size.
+    
Mockito.when(workUnitsSizeSummary.getTopLevelWorkUnitsCount()).thenReturn(1000L);
+    
Mockito.when(workUnitsSizeSummary.getTopLevelWorkUnitsMeanSize()).thenReturn(500e6);
  // 500MB per MWU
+    // For work unit constraint: total constituent work units.
+    
Mockito.when(workUnitsSizeSummary.getConstituentWorkUnitsCount()).thenReturn(5000L);
+
+    // Throughput calculation:
+    // totalBytes = 1000 * 500e6 = 5.0E11 bytes.
+    // Container capacity = 4 workers * 5 threads = 20.
+    // Container throughput = 100e6 * 20 = 2.0E9 bytes/minute.
+    // Estimated container minutes = 5.0E11 / 2.0E9 = 250 minutes.
+    // Throughput constraint = ceil(250 / 120) = 3 containers.
+
+    // Work unit constraint = ceil(5000 / 10) = 500 containers.
+    // The recommended set point should be max(3, 500) = 500 containers.
+    int result = scalingHeuristic.calcDerivationSetPoint(workUnitsSizeSummary, 
"source", timeBudget, jobState);
+    Assert.assertEquals(result, 500, "Expected work unit constraint to 
dominate and yield 500 containers");
+  }
+
+  @Test
+  public void testCalcDerivationSetPointWithChangingTimeBudget() {
+    
Mockito.when(jobState.getPropAsBoolean(GobblinTemporalConfigurationKeys.DYNAMIC_SCALING_ENABLED,
 false)).thenReturn(true);
+    stubContainerCapacity(4,5);
+
+    // Set throughput using the correct key.
+    Mockito.when(jobState.getPropAsLong(
+            
Mockito.eq(GobblinTemporalConfigurationKeys.TEMPORAL_WORKER_THREAD_AMORTIZED_THROUGHPUT_PER_MINUTE),
+            Mockito.anyLong()))
+        .thenReturn(100L * 1000 * 1000); // 100 MB/minute per thread
+
+    // Set the work unit constraint so that throughput dominates.
+    Mockito.when(jobState.getPropAsInt(
+            
Mockito.eq(GobblinTemporalConfigurationKeys.TEMPORAL_WORKUNITS_PER_CONTAINER), 
Mockito.anyInt()))
+        .thenReturn(50);
+    
Mockito.when(workUnitsSizeSummary.getTopLevelWorkUnitsCount()).thenReturn(50L);
+
+
+    // Set the target job duration.
     long targetTimeBudgetMinutes = 75L;
     
Mockito.when(timeBudget.getMaxTargetDurationMinutes()).thenReturn(targetTimeBudgetMinutes);
 
+    // Configure work units: 3000 top-level work units with an average size of 
500 MB.
     long totalNumMWUs = 3000L;
     
Mockito.when(workUnitsSizeSummary.getTopLevelWorkUnitsCount()).thenReturn(totalNumMWUs);
-    
Mockito.when(workUnitsSizeSummary.getTopLevelWorkUnitsMeanSize()).thenReturn(500e6);
 // 500MB
-    // parallelization capacity = 20 container-slots (= 4 * 5)
-    // per-container-slot rate = 5 container-slot-mins/mean(MWU) (= 500 
MB/mean(MWU) / 100MB/min)
-    long numMWUsPerMinutePerContainer = 4; // (amortized) per-container rate = 
4 MWU/container-minute (= 20 / 5)
-    long totalNumContainerMinutesAllMWUs = totalNumMWUs / 
numMWUsPerMinutePerContainer; // 750 container-minutes (= 3000 MWU / 4 
MWU/container-min)
-    long expectedSetPoint = totalNumContainerMinutesAllMWUs / 
targetTimeBudgetMinutes; // 10 containers (= 750 / 75)
+    
Mockito.when(workUnitsSizeSummary.getTopLevelWorkUnitsMeanSize()).thenReturn(500e6);
 // 500 MB
+
+
+    // Calculate expected throughput constraint:
+    //   totalBytes = 3000 * 500e6 = 1.5e12 bytes.
+    //   Container capacity = 4 * 5 = 20.
+    //   Container throughput = 100e6 * 20 = 2e9 bytes/min.
+    //   Estimated container minutes = 1.5e12 / 2e9 = 750 minutes.
+    //   Throughput constraint = ceil(750 / 75) = 10 containers.
+    int expectedThroughputContainers = 10;
 
     int resultA = 
scalingHeuristic.calcDerivationSetPoint(workUnitsSizeSummary, "sourceClass", 
timeBudget, jobState);
-    Assert.assertEquals(resultA, expectedSetPoint);
+    Assert.assertEquals(resultA, expectedThroughputContainers,
+        "Expected recommended containers based on throughput constraint");
 
-    // verify: 3x MWUs ==> 3x the recommended set point
+    // Verify: Tripling the top-level work units should triple the recommended 
set point.
     
Mockito.when(workUnitsSizeSummary.getTopLevelWorkUnitsCount()).thenReturn(totalNumMWUs
 * 3);
     int tripledResult = 
scalingHeuristic.calcDerivationSetPoint(workUnitsSizeSummary, "sourceClass", 
timeBudget, jobState);
-    Assert.assertEquals(tripledResult, resultA * 3);
+    Assert.assertEquals(tripledResult, resultA * 3,
+        "Tripling the number of top-level work units should triple the set 
point");
 
-    // reduce the target duration by a third, and verify: 3/2 the recommended 
set point
+    // Test reduced time budget: for the tripled workload, reduce the target 
duration.
+    // New target duration = 2 * (75 / 3) = 50 minutes.
     Mockito.when(timeBudget.getMaxTargetDurationMinutes()).thenReturn(2 * 
(targetTimeBudgetMinutes / 3));
+    // Recalculate throughput constraint:
+    //   totalBytes = 9000 * 500e6 = 4.5e12 bytes.
+    //   Container throughput remains 2e9 bytes/min.
+    //   Estimated container minutes = 4.5e12 / 2e9 = 2250 minutes.
+    //   Throughput constraint = ceil(2250 / 50) = 45 containers.
+    int expectedReducedContainers = 45;
     int reducedTimeBudgetResult = 
scalingHeuristic.calcDerivationSetPoint(workUnitsSizeSummary, "sourceClass", 
timeBudget, jobState);
-    Assert.assertEquals(reducedTimeBudgetResult, (long) 
Math.round(expectedSetPoint * 3 * (3.0 / 2.0)));
+    Assert.assertEquals(reducedTimeBudgetResult, expectedReducedContainers,
+        "Reducing the target duration should increase the recommended set 
point proportionally");
   }
+
+  /**
+   * Stubs the container capacity properties in the {@link JobState} mock.
+   *
+   * @param numWorkers the number of workers per container to stub in the 
{@link JobState} mock.
+   * @param threadsPerWorker the number of threads per worker to stub in the 
{@link JobState} mock.
+   */
+  private void stubContainerCapacity(int numWorkers, int threadsPerWorker) {
+    Mockito.when(jobState.getPropAsInt(
+            
Mockito.eq(GobblinTemporalConfigurationKeys.TEMPORAL_NUM_WORKERS_PER_CONTAINER),
 Mockito.anyInt()))
+        .thenReturn(numWorkers);
+    Mockito.when(jobState.getPropAsInt(
+            
Mockito.eq(GobblinTemporalConfigurationKeys.TEMPORAL_NUM_THREADS_PER_WORKER), 
Mockito.anyInt()))
+        .thenReturn(threadsPerWorker);
+  }
+
 }


Reply via email to