This is an automated email from the ASF dual-hosted git repository.
vivekrai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new a978b299ba [GOBBLIN-2199] Support dynamic container scaling based on
target completion time & WU count (#4106)
a978b299ba is described below
commit a978b299ba1f2762786f5fecee5a960269db219b
Author: Prateek Khandelwal <[email protected]>
AuthorDate: Tue Mar 18 13:02:32 2025 +0530
[GOBBLIN-2199] Support dynamic container scaling based on target completion
time & WU count (#4106)
---
.../gobblin/configuration/ConfigurationKeys.java | 2 +-
.../temporal/GobblinTemporalConfigurationKeys.java | 14 ++
.../ddm/activity/impl/ProcessWorkUnitImpl.java | 7 +-
...mendScalingForWorkUnitsLinearHeuristicImpl.java | 125 ++++++++++----
.../temporal/ddm/worker/WorkFulfillmentWorker.java | 12 +-
.../workflow/impl/ExecuteGobblinWorkflowImpl.java | 16 +-
...ScalingForWorkUnitsLinearHeuristicImplTest.java | 182 +++++++++++++++++++--
7 files changed, 296 insertions(+), 62 deletions(-)
diff --git
a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
index d9243f6e0d..6e07afac1c 100644
---
a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
+++
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
@@ -210,7 +210,7 @@ public class ConfigurationKeys {
public static final String JOB_COMMIT_POLICY_KEY = "job.commit.policy";
public static final String DEFAULT_JOB_COMMIT_POLICY = "full";
public static final String JOB_TARGET_COMPLETION_DURATION_IN_MINUTES_KEY =
"job.duration.target.completion.in.minutes";
- public static final long DEFAULT_JOB_TARGET_COMPLETION_DURATION_IN_MINUTES =
360;
+ public static final long DEFAULT_JOB_TARGET_COMPLETION_DURATION_IN_MINUTES =
120;
public static final String PARTIAL_FAIL_TASK_FAILS_JOB_COMMIT =
"job.commit.partial.fail.task.fails.job.commit";
// If true, commit of different datasets will be performed in parallel
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/GobblinTemporalConfigurationKeys.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/GobblinTemporalConfigurationKeys.java
index 1de75b30d5..cdd5728fdc 100644
---
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/GobblinTemporalConfigurationKeys.java
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/GobblinTemporalConfigurationKeys.java
@@ -65,6 +65,17 @@ public interface GobblinTemporalConfigurationKeys {
*/
String TEMPORAL_NUM_WORKERS_PER_CONTAINER = PREFIX +
"num.workers.per.container";
int DEFAULT_TEMPORAL_NUM_WORKERS_PER_CONTAINERS = 1;
+ String TEMPORAL_NUM_THREADS_PER_WORKER = PREFIX + "num.threads.per.worker";
+ int DEFAULT_TEMPORAL_NUM_THREADS_PER_WORKER = 15;
+
+ // Configuration key for setting the amortized throughput per worker thread
per minute
+ String TEMPORAL_WORKER_THREAD_AMORTIZED_THROUGHPUT_PER_MINUTE = PREFIX +
"worker.thread.amortized.throughput.per.minute";
+ long DEFAULT_TEMPORAL_WORKER_THREAD_AMORTIZED_THROUGHPUT_PER_MINUTE = 500 *
1000 * 1000L; // 500MB/min
+
+ // Configuration key for setting the maximum number of work units allowed
per container
+ String TEMPORAL_WORKUNITS_PER_CONTAINER = PREFIX + "workunits.per.container";
+ int DEFAULT_MAX_WORKUNITS_PER_CONTAINER = 2000;
+
String TEMPORAL_CONNECTION_STRING = PREFIX + "connection.string";
/**
@@ -72,6 +83,9 @@ public interface GobblinTemporalConfigurationKeys {
*/
String DYNAMIC_SCALING_PREFIX = PREFIX + "dynamic.scaling.";
+ // Configuration key to enable/disable dynamic scaling
+ String DYNAMIC_SCALING_ENABLED = DYNAMIC_SCALING_PREFIX + "enabled";
+
String DYNAMIC_SCALING_POLLING_INTERVAL_SECS = DYNAMIC_SCALING_PREFIX +
"polling.interval.seconds";
int DEFAULT_DYNAMIC_SCALING_POLLING_INTERVAL_SECS = 60;
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/ProcessWorkUnitImpl.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/ProcessWorkUnitImpl.java
index 9450bc7bcb..ffd84d0bc1 100644
---
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/ProcessWorkUnitImpl.java
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/ProcessWorkUnitImpl.java
@@ -90,7 +90,7 @@ public class ProcessWorkUnitImpl implements ProcessWorkUnit {
heartBeatInterval, heartBeatInterval, TimeUnit.MINUTES);
troubleshooter =
AutomaticTroubleshooterFactory.createForJob(jobState.getProperties());
troubleshooter.start();
- return execute(workUnits, wu, jobState, fs,
troubleshooter.getIssueRepository());
+ return execute(workUnits, wu, jobState, fs,
troubleshooter.getIssueRepository(), jobState.getProperties());
} catch (IOException | InterruptedException e) {
throw new RuntimeException(e);
} finally {
@@ -110,12 +110,13 @@ public class ProcessWorkUnitImpl implements
ProcessWorkUnit {
* NOTE: adapted from {@link
org.apache.gobblin.runtime.mapreduce.MRJobLauncher.TaskRunner#run(org.apache.hadoop.mapreduce.Mapper.Context)}
* @return count of how many tasks executed (0 if execution ultimately
failed, but we *believe* TaskState should already have been recorded beforehand)
*/
- protected int execute(List<WorkUnit> workUnits, WorkUnitClaimCheck wu,
JobState jobState, FileSystem fs, IssueRepository issueRepository) throws
IOException, InterruptedException {
+ protected int execute(List<WorkUnit> workUnits, WorkUnitClaimCheck wu,
JobState jobState, FileSystem fs, IssueRepository issueRepository,
+ Properties jobProperties) throws IOException,
InterruptedException {
String containerId = "container-id-for-wu-" + wu.getCorrelator();
StateStore<TaskState> taskStateStore = Help.openTaskStateStore(wu, fs);
TaskStateTracker taskStateTracker =
createEssentializedTaskStateTracker(wu);
- TaskExecutor taskExecutor = new TaskExecutor(new Properties());
+ TaskExecutor taskExecutor = new TaskExecutor(jobProperties);
GobblinMultiTaskAttempt.CommitPolicy multiTaskAttemptCommitPolicy =
GobblinMultiTaskAttempt.CommitPolicy.IMMEDIATE; // as no speculative exec
SharedResourcesBroker<GobblinScopeTypes> resourcesBroker =
JobStateUtils.getSharedResourcesBroker(jobState);
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/RecommendScalingForWorkUnitsLinearHeuristicImpl.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/RecommendScalingForWorkUnitsLinearHeuristicImpl.java
index 906ebe2426..777f7e9c20 100644
---
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/RecommendScalingForWorkUnitsLinearHeuristicImpl.java
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/RecommendScalingForWorkUnitsLinearHeuristicImpl.java
@@ -23,65 +23,128 @@ import org.apache.gobblin.runtime.JobState;
import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys;
import org.apache.gobblin.temporal.ddm.work.TimeBudget;
import org.apache.gobblin.temporal.ddm.work.WorkUnitsSizeSummary;
-import org.apache.gobblin.temporal.ddm.worker.WorkFulfillmentWorker;
+import org.apache.gobblin.yarn.GobblinYarnConfigurationKeys;
/**
* Simple config-driven linear recommendation for how many containers to use
to complete the "remaining work" within a given {@link TimeBudget}, per:
*
- * a. from {@link WorkUnitsSizeSummary}, find how many (remaining)
"top-level" {@link org.apache.gobblin.source.workunit.MultiWorkUnit}s of some
mean size
- * b. from the configured {@link #AMORTIZED_NUM_BYTES_PER_MINUTE}, find the
expected "processing rate" in bytes / minute
- * 1. estimate the time required for processing a mean-sized `MultiWorkUnit`
(MWU)
- * c. from {@link JobState}, find per-container `MultiWorkUnit` parallelism
capacity (aka. "worker-slots") to base the recommendation upon
- * 2. calculate the per-container throughput of MWUs per minute
- * 3. estimate the total per-container-minutes required to process all MWUs
- * d. from the {@link TimeBudget}, find the target number of minutes in
which to complete processing of all MWUs
- * 4. recommend the number of containers so all MWU processing should finish
within the target number of minutes
+ * a. from {@link WorkUnitsSizeSummary}, find how many (remaining) {@link
org.apache.gobblin.source.workunit.WorkUnit}s
+ * b. from the configured
GobblinTemporalConfigurationKeys.TEMPORAL_WORKER_THREAD_AMORTIZED_THROUGHPUT_PER_MINUTE,
find the expected "processing rate" in bytes / minute
+ * 1. estimate the total container-minutes required to process all MWUs
+ * c. from the {@link TimeBudget}, find the target number of minutes in
which to complete processing of all MWUs
+ * 2. estimate container count based on target minutes
+ * 3. estimate container count based on the maximum number of work units
allowed per container
+ * 4. recommend the number of containers as max of above two container counts
*/
@Slf4j
public class RecommendScalingForWorkUnitsLinearHeuristicImpl extends
AbstractRecommendScalingForWorkUnitsImpl {
- public static final String AMORTIZED_NUM_BYTES_PER_MINUTE =
GobblinTemporalConfigurationKeys.DYNAMIC_SCALING_PREFIX +
"heuristic.params.numBytesPerMinute";
- public static final long DEFAULT_AMORTIZED_NUM_BYTES_PER_MINUTE = 80 * 1000L
* 1000L * 60L; // 80MB/sec
-
+ /**
+ * Calculates the recommended number of containers for processing the
remaining work units.
+ * <p>
+ * This method first checks whether dynamic scaling is enabled via the job
state configuration.
+ * If dynamic scaling is disabled, it returns the initial container count as
specified in the job state.
+ * When dynamic scaling is enabled, it computes the throughput based on the
count of constituent work units (WUs)
+ * and the processing rate (bytes per minute per thread). The calculation
involves:
+ * <ol>
+ * <li>Computing the total bytes to be processed based on the count and
mean size of top-level work units.</li>
+ * <li>Calculating the processing rate per container using the amortized
bytes per minute rate and the container's work unit capacity.</li>
+ * <li>Estimating the total container-minutes required to process all MWUs
and determining the number of containers needed
+ * to meet the job time budget.</li>
+ * <li>Computing an alternative container count based on the maximum
number of work units allowed per container.</li>
+ * <li>Returning the maximum of the two computed container counts as the
recommended scaling.</li>
+ * </ol>
+ * </p>
+ *
+ * @param remainingWork the summary of work unit sizes and counts remaining
for processing
+ * @param sourceClass the name of the class invoking this method
+ * @param jobTimeBudget the time budget allocated for the job execution
+ * @param jobState the current job state that holds configuration properties
and runtime parameters
+ * @return the recommended number of containers to allocate for processing
the work units
+ */
@Override
protected int calcDerivationSetPoint(WorkUnitsSizeSummary remainingWork,
String sourceClass, TimeBudget jobTimeBudget, JobState jobState) {
- // for simplicity, for now, consider only top-level work units (aka.
`MultiWorkUnit`s - MWUs)
+ if
(!jobState.getPropAsBoolean(GobblinTemporalConfigurationKeys.DYNAMIC_SCALING_ENABLED,
false)) {
+ int initialContainerCount =
Integer.parseInt(jobState.getProp(GobblinYarnConfigurationKeys.INITIAL_CONTAINERS_KEY,
"1"));
+ log.info("Dynamic scaling is disabled, returning initial container
count: " + initialContainerCount);
+ return initialContainerCount;
+ }
+
+ long numWUs = remainingWork.getConstituentWorkUnitsCount();
long numMWUs = remainingWork.getTopLevelWorkUnitsCount();
double meanBytesPerMWU = remainingWork.getTopLevelWorkUnitsMeanSize();
- int numSimultaneousMWUsPerContainer =
calcPerContainerWUCapacity(jobState); // (a worker-thread is a slot for
top-level (MWUs) - not constituent sub-WUs)
- long bytesPerMinuteProcRate = calcAmortizedBytesPerMinute(jobState);
- log.info("Calculating auto-scaling (for {} remaining work units within {})
using: bytesPerMinuteProcRate = {}; meanBytesPerMWU = {}",
- numMWUs, jobTimeBudget, bytesPerMinuteProcRate, meanBytesPerMWU);
+ double totalBytes = numMWUs * meanBytesPerMWU;
+ long bytesPerMinuteProcRatePerThread =
calcAmortizedBytesPerMinute(jobState);
+ log.info("Calculating auto-scaling (for {} remaining work units within {})
using: bytesPerMinuteProcRatePerThread = {}; meanBytesPerMWU = {}",
+ numMWUs, jobTimeBudget, bytesPerMinuteProcRatePerThread,
meanBytesPerMWU);
// calc how many container*minutes to process all MWUs, based on mean MWU
size
- double minutesProcTimeForMeanMWU = meanBytesPerMWU /
bytesPerMinuteProcRate;
- double meanMWUsThroughputPerContainerMinute =
numSimultaneousMWUsPerContainer / minutesProcTimeForMeanMWU;
- double estContainerMinutesForAllMWUs = numMWUs /
meanMWUsThroughputPerContainerMinute;
+ int numSimultaneousMWUsPerContainer =
calcPerContainerWUCapacity(jobState); // (a worker-thread is a slot for
top-level (MWUs) - not constituent sub-WUs)
+ double containerProcRate = bytesPerMinuteProcRatePerThread *
numSimultaneousMWUsPerContainer;
+ double estContainerMinutesForAllMWUs = totalBytes/containerProcRate;
+ log.info("Container byte processing throughput: {}, totalBytes: {}, est.
containerMinutes to complete all MWUs: {}",
+ containerProcRate, totalBytes, estContainerMinutesForAllMWUs);
+ // Determine the required number of containers based on the job's time
budget
long targetNumMinutesForAllMWUs =
jobTimeBudget.getMaxTargetDurationMinutes();
- // TODO: take into account `jobTimeBudget.getPermittedOverageMinutes()` -
e.g. to decide whether to use `Math.ceil` vs. `Math.floor`
+ int numContainerForThroughout = (int)
Math.ceil(estContainerMinutesForAllMWUs / targetNumMinutesForAllMWUs);
+
+ // Determine the required number of containers based on work unit count
limits
+ int maxWUsPerContainer = calcMaxWUPerContainer(jobState);
+ int numContainerForWUs = (int) Math.ceil((numWUs * 1.0) /
maxWUsPerContainer);
- // TODO: decide how to account for container startup; working est. for
GoT-on-YARN ~ 3 mins (req to alloc ~ 30s; alloc to workers ready ~ 2.5m)
- // e.g. can we amortize away / ignore when `targetNumMinutesForAllMWUs
>> workerRequestToReadyNumMinutes`?
- // TODO take into account that MWUs are quantized into discrete chunks;
this est. uses avg and presumes to divide partial MWUs amongst workers
- // can we we mostly ignore if we keep MWU "chunk size" "small-ish", like
maybe even just `duration(max(MWU)) <= targetNumMinutesForAllMWUs/2)`?
+ int recommendedNumContainers = Math.max(numContainerForWUs,
numContainerForThroughout);
+ log.info("Recommended auto-scaling: {} containers, no. of containers
considering throughput: {}, no. of containers considering WUs: {}",
+ recommendedNumContainers, numContainerForThroughout,
numContainerForWUs);
- int recommendedNumContainers = (int)
Math.floor(estContainerMinutesForAllMWUs / targetNumMinutesForAllMWUs);
- log.info("Recommended auto-scaling: {} containers, given:
minutesToProc(mean(MWUs)) = {}; throughput = {} (MWUs / container*minute); "
- + "est. container*minutes to complete ALL ({}) MWUs = {}",
- recommendedNumContainers, minutesProcTimeForMeanMWU,
meanMWUsThroughputPerContainerMinute, numMWUs, estContainerMinutesForAllMWUs);
return recommendedNumContainers;
}
+ /**
+ * Calculates the work unit processing capacity for a single container.
+ * <p>
+ * This is determined by multiplying the number of workers assigned to a
container by the number of threads available per worker.
+ * </p>
+ *
+ * @param jobState the current job state containing configuration properties
+ * @return the number of top-level work units (MWUs) that can be processed
concurrently by one container
+ */
protected int calcPerContainerWUCapacity(JobState jobState) {
int numWorkersPerContainer =
jobState.getPropAsInt(GobblinTemporalConfigurationKeys.TEMPORAL_NUM_WORKERS_PER_CONTAINER,
GobblinTemporalConfigurationKeys.DEFAULT_TEMPORAL_NUM_WORKERS_PER_CONTAINERS);
- int numThreadsPerWorker = WorkFulfillmentWorker.MAX_EXECUTION_CONCURRENCY;
// TODO: get from config, once that's implemented
+ int numThreadsPerWorker =
jobState.getPropAsInt(GobblinTemporalConfigurationKeys.TEMPORAL_NUM_THREADS_PER_WORKER,
+
GobblinTemporalConfigurationKeys.DEFAULT_TEMPORAL_NUM_THREADS_PER_WORKER);
return numWorkersPerContainer * numThreadsPerWorker;
}
+
+ /**
+ * Retrieves the amortized throughput rate in bytes per minute per thread
from the job configuration.
+ * <p>
+ * This value represents the expected processing rate for a worker thread
and is used to compute the container's processing rate.
+ * </p>
+ *
+ * @param jobState the current job state containing configuration properties
+ * @return the amortized processing rate in bytes per minute per thread
+ */
protected long calcAmortizedBytesPerMinute(JobState jobState) {
- return jobState.getPropAsLong(AMORTIZED_NUM_BYTES_PER_MINUTE,
DEFAULT_AMORTIZED_NUM_BYTES_PER_MINUTE);
+ return
jobState.getPropAsLong(GobblinTemporalConfigurationKeys.TEMPORAL_WORKER_THREAD_AMORTIZED_THROUGHPUT_PER_MINUTE,
+
GobblinTemporalConfigurationKeys.DEFAULT_TEMPORAL_WORKER_THREAD_AMORTIZED_THROUGHPUT_PER_MINUTE);
+ }
+
+ /**
+ * Determines the maximum number of work units that can be assigned to a
single container.
+ * <p>
+ * This limit is obtained from the job configuration and is used as an
alternative constraint when scaling containers.
+ * </p>
+ *
+ * @param jobState the current job state containing configuration properties
+ * @return the maximum allowed number of work units per container
+ */
+ protected int calcMaxWUPerContainer(JobState jobState) {
+ return
jobState.getPropAsInt(GobblinTemporalConfigurationKeys.TEMPORAL_WORKUNITS_PER_CONTAINER,
+ GobblinTemporalConfigurationKeys.DEFAULT_MAX_WORKUNITS_PER_CONTAINER);
}
+
}
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/worker/WorkFulfillmentWorker.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/worker/WorkFulfillmentWorker.java
index 27348858e7..12fe6c4d84 100644
---
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/worker/WorkFulfillmentWorker.java
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/worker/WorkFulfillmentWorker.java
@@ -24,6 +24,7 @@ import com.typesafe.config.Config;
import io.temporal.client.WorkflowClient;
import io.temporal.worker.WorkerOptions;
+import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys;
import org.apache.gobblin.temporal.cluster.AbstractTemporalWorker;
import org.apache.gobblin.temporal.ddm.activity.impl.CommitActivityImpl;
import
org.apache.gobblin.temporal.ddm.activity.impl.DeleteWorkDirsActivityImpl;
@@ -36,15 +37,18 @@ import
org.apache.gobblin.temporal.ddm.workflow.impl.GenerateWorkUnitsWorkflowIm
import
org.apache.gobblin.temporal.ddm.workflow.impl.NestingExecOfProcessWorkUnitWorkflowImpl;
import
org.apache.gobblin.temporal.ddm.workflow.impl.ProcessWorkUnitsWorkflowImpl;
import org.apache.gobblin.temporal.workflows.metrics.SubmitGTEActivityImpl;
+import org.apache.gobblin.util.ConfigUtils;
/** Worker for the {@link ProcessWorkUnitsWorkflowImpl} super-workflow */
public class WorkFulfillmentWorker extends AbstractTemporalWorker {
public static final long DEADLOCK_DETECTION_TIMEOUT_SECONDS = 120; //
TODO: make configurable!
- public static final int MAX_EXECUTION_CONCURRENCY = 5; // TODO: make
configurable!
+ public int maxExecutionConcurrency;
public WorkFulfillmentWorker(Config config, WorkflowClient workflowClient)
{
super(config, workflowClient);
+ this.maxExecutionConcurrency = ConfigUtils.getInt(config,
GobblinTemporalConfigurationKeys.TEMPORAL_NUM_THREADS_PER_WORKER,
+
GobblinTemporalConfigurationKeys.DEFAULT_TEMPORAL_NUM_THREADS_PER_WORKER);
}
@Override
@@ -64,9 +68,9 @@ public class WorkFulfillmentWorker extends
AbstractTemporalWorker {
return WorkerOptions.newBuilder()
// default is only 1s - WAY TOO SHORT for
`o.a.hadoop.fs.FileSystem#listStatus`!
.setDefaultDeadlockDetectionTimeout(TimeUnit.SECONDS.toMillis(DEADLOCK_DETECTION_TIMEOUT_SECONDS))
- .setMaxConcurrentActivityExecutionSize(MAX_EXECUTION_CONCURRENCY)
-
.setMaxConcurrentLocalActivityExecutionSize(MAX_EXECUTION_CONCURRENCY)
-
.setMaxConcurrentWorkflowTaskExecutionSize(MAX_EXECUTION_CONCURRENCY)
+
.setMaxConcurrentActivityExecutionSize(this.maxExecutionConcurrency)
+
.setMaxConcurrentLocalActivityExecutionSize(this.maxExecutionConcurrency)
+
.setMaxConcurrentWorkflowTaskExecutionSize(this.maxExecutionConcurrency)
.build();
}
}
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ExecuteGobblinWorkflowImpl.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ExecuteGobblinWorkflowImpl.java
index 55f50fab22..198ab8a6d5 100644
---
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ExecuteGobblinWorkflowImpl.java
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ExecuteGobblinWorkflowImpl.java
@@ -19,7 +19,6 @@ package org.apache.gobblin.temporal.ddm.workflow.impl;
import java.io.IOException;
import java.net.URI;
-import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.HashSet;
@@ -110,7 +109,7 @@ public class ExecuteGobblinWorkflowImpl implements
ExecuteGobblinWorkflow {
log.info("Recommended scaling to process WUs within {}: {}",
timeBudget, scalingDirectives);
try {
ScalingDirectivesRecipient recipient =
createScalingDirectivesRecipient(jobProps, closer);
- List<ScalingDirective> adjustedScalingDirectives =
adjustRecommendedScaling(scalingDirectives);
+ List<ScalingDirective> adjustedScalingDirectives =
adjustRecommendedScaling(jobProps, scalingDirectives);
log.info("Submitting (adjusted) scaling directives: {}",
adjustedScalingDirectives);
recipient.receive(adjustedScalingDirectives);
// TODO: when eliminating the "GenWUs Worker", pause/block until
scaling is complete
@@ -166,17 +165,19 @@ public class ExecuteGobblinWorkflowImpl implements
ExecuteGobblinWorkflow {
protected TimeBudget calcWUProcTimeBudget(Instant jobStartTime,
WorkUnitsSizeSummary wuSizeSummary, Properties jobProps) {
// TODO: make fully configurable! for now, cap Work Discovery at 45 mins
and set aside 10 mins for the `CommitStepWorkflow`
long maxGenWUsMins = 45;
- long commitStepMins = 10;
+ long commitStepMins = 15;
long totalTargetTimeMins =
TimeUnit.MINUTES.toMinutes(PropertiesUtils.getPropAsLong(jobProps,
ConfigurationKeys.JOB_TARGET_COMPLETION_DURATION_IN_MINUTES_KEY,
ConfigurationKeys.DEFAULT_JOB_TARGET_COMPLETION_DURATION_IN_MINUTES));
double permittedOveragePercentage = .2;
- Duration genWUsDuration = Duration.between(jobStartTime,
TemporalEventTimer.WithinWorkflowFactory.getCurrentInstant());
- long remainingMins = totalTargetTimeMins -
Math.min(genWUsDuration.toMinutes(), maxGenWUsMins) - commitStepMins;
+
+ // since the actual GenerateWorkUnits duration can vary significantly across
jobs, it is excluded from this computation to keep the WU-processing duration
deterministic
+ // Duration genWUsDuration = Duration.between(jobStartTime,
TemporalEventTimer.WithinWorkflowFactory.getCurrentInstant());
+ long remainingMins = totalTargetTimeMins - maxGenWUsMins - commitStepMins;
return TimeBudget.withOveragePercentage(remainingMins,
permittedOveragePercentage);
}
- protected List<ScalingDirective>
adjustRecommendedScaling(List<ScalingDirective> recommendedScalingDirectives) {
+ protected List<ScalingDirective> adjustRecommendedScaling(Properties
jobProps, List<ScalingDirective> recommendedScalingDirectives) {
// TODO: make any adjustments - e.g. decide whether to shutdown the (often
oversize) `GenerateWorkUnits` worker or alternatively to deduct one to count it
if (recommendedScalingDirectives.size() == 0) {
return recommendedScalingDirectives;
@@ -185,7 +186,8 @@ public class ExecuteGobblinWorkflowImpl implements
ExecuteGobblinWorkflow {
ArrayList<ScalingDirective> adjustedScaling = new
ArrayList<>(recommendedScalingDirectives);
ScalingDirective firstDirective = adjustedScaling.get(0);
// deduct one for (already existing) `GenerateWorkUnits` worker (we
presume its "baseline" `WorkerProfile` similar enough to substitute for this
new one)
- adjustedScaling.set(0,
firstDirective.updateSetPoint(firstDirective.getSetPoint() - 1));
+ int initialContainerCount =
Integer.parseInt(jobProps.getProperty(GobblinYarnConfigurationKeys.INITIAL_CONTAINERS_KEY,
"1"));
+ adjustedScaling.set(0,
firstDirective.updateSetPoint(firstDirective.getSetPoint() -
initialContainerCount));
// CAUTION: filter out set point zero, which (depending upon
`.getProfileName()`) *could* down-scale away our only current worker
// TODO: consider whether to allow either a) "pre-defining" a profile w/
set point zero, available for later use OR b) down-scaling to zero to pause
worker
return adjustedScaling.stream().filter(sd -> sd.getSetPoint() >
0).collect(Collectors.toList());
diff --git
a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/ddm/activity/impl/RecommendScalingForWorkUnitsLinearHeuristicImplTest.java
b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/ddm/activity/impl/RecommendScalingForWorkUnitsLinearHeuristicImplTest.java
index f7470f1928..5cf83803e4 100644
---
a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/ddm/activity/impl/RecommendScalingForWorkUnitsLinearHeuristicImplTest.java
+++
b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/ddm/activity/impl/RecommendScalingForWorkUnitsLinearHeuristicImplTest.java
@@ -17,6 +17,7 @@
package org.apache.gobblin.temporal.ddm.activity.impl;
+import org.apache.gobblin.yarn.GobblinYarnConfigurationKeys;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
@@ -45,34 +46,183 @@ public class
RecommendScalingForWorkUnitsLinearHeuristicImplTest {
}
@Test
- public void testCalcDerivationSetPoint() {
-
Mockito.when(jobState.getPropAsInt(Mockito.eq(GobblinTemporalConfigurationKeys.TEMPORAL_NUM_WORKERS_PER_CONTAINER),
Mockito.anyInt()))
- .thenReturn(4); // 4 workers per container
-
Mockito.when(jobState.getPropAsLong(Mockito.eq(RecommendScalingForWorkUnitsLinearHeuristicImpl.AMORTIZED_NUM_BYTES_PER_MINUTE),
Mockito.anyLong()))
- .thenReturn(100L * 1000 * 1000); // 100MB/minute
+ void testScalingDisabled() {
+
Mockito.when(jobState.getPropAsBoolean(GobblinTemporalConfigurationKeys.DYNAMIC_SCALING_ENABLED,
false)).thenReturn(false);
+
Mockito.when(jobState.getProp(GobblinYarnConfigurationKeys.INITIAL_CONTAINERS_KEY,
"1")).thenReturn("2");
+
+ long totalNumMWUs = 3000L;
+
Mockito.when(workUnitsSizeSummary.getTopLevelWorkUnitsCount()).thenReturn(totalNumMWUs);
+
Mockito.when(workUnitsSizeSummary.getTopLevelWorkUnitsMeanSize()).thenReturn(500e6);
// 500MB
+ int result = scalingHeuristic.calcDerivationSetPoint(workUnitsSizeSummary,
"someSourceClass", timeBudget, jobState);
+
+ Assert.assertEquals(2, result, "Should return initial container count if
dynamic scaling is disabled");
+ }
+
+ @Test
+ public void testCalcDerivationSetPointForNoWorkUnits() {
+
Mockito.when(jobState.getPropAsBoolean(GobblinTemporalConfigurationKeys.DYNAMIC_SCALING_ENABLED,
false))
+ .thenReturn(true);
+ stubContainerCapacity(1,5);
+
+ Mockito.when(jobState.getPropAsLong(
+
Mockito.eq(GobblinTemporalConfigurationKeys.TEMPORAL_WORKER_THREAD_AMORTIZED_THROUGHPUT_PER_MINUTE),
Mockito.anyLong()))
+ .thenReturn(100L * 1000 * 1000);
+
Mockito.when(jobState.getPropAsInt(Mockito.eq(GobblinTemporalConfigurationKeys.TEMPORAL_WORKUNITS_PER_CONTAINER),
Mockito.anyInt()))
+ .thenReturn(200);
+ Mockito.when(timeBudget.getMaxTargetDurationMinutes()).thenReturn(60L);
+
+ // Set both top-level and constituent work units to zero.
+
Mockito.when(workUnitsSizeSummary.getTopLevelWorkUnitsCount()).thenReturn(0L);
+
Mockito.when(workUnitsSizeSummary.getTopLevelWorkUnitsMeanSize()).thenReturn(500e6);
+
Mockito.when(workUnitsSizeSummary.getConstituentWorkUnitsCount()).thenReturn(0L);
+
+ int result = scalingHeuristic.calcDerivationSetPoint(workUnitsSizeSummary,
"source", timeBudget, jobState);
+ Assert.assertEquals(result, 0, "Expected 0 containers when there are no
work units");
+ }
+
+ @Test
+ public void testCalcDerivationSetPointWithThroughputConstraintDominating() {
+
Mockito.when(jobState.getPropAsBoolean(GobblinTemporalConfigurationKeys.DYNAMIC_SCALING_ENABLED,
false))
+ .thenReturn(true);
+
+ stubContainerCapacity(4,5);
+
+ Mockito.when(jobState.getPropAsLong(
+
Mockito.eq(GobblinTemporalConfigurationKeys.TEMPORAL_WORKER_THREAD_AMORTIZED_THROUGHPUT_PER_MINUTE),
+ Mockito.anyLong()))
+ .thenReturn(50L * 1000 * 1000); // 50MB/minute
+
+
Mockito.when(jobState.getPropAsInt(Mockito.eq(GobblinTemporalConfigurationKeys.TEMPORAL_WORKUNITS_PER_CONTAINER),
Mockito.anyInt()))
+ .thenReturn(5000);
+ Mockito.when(timeBudget.getMaxTargetDurationMinutes()).thenReturn(60L);
+
+ // Configure work units: throughput constraint will dominate.
+
Mockito.when(workUnitsSizeSummary.getTopLevelWorkUnitsCount()).thenReturn(1000L);
+
Mockito.when(workUnitsSizeSummary.getTopLevelWorkUnitsMeanSize()).thenReturn(500e6);
+
Mockito.when(workUnitsSizeSummary.getConstituentWorkUnitsCount()).thenReturn(1500L);
+
+ // Throughput calculation:
+ // totalBytes = 1000 * 500e6 = 500e9 bytes.
+ // containerProcRate = 50e6 * 20 = 1e9 bytes/min.
+ // estimated container minutes = 500e9 / 1e9 = 500 minutes.
+ // Throughput containers = ceil(500 / 60) = 9.
+ // Work unit constraint = ceil(1500 / 5000) = 1.
+ // Expected recommended containers = max(9, 1) = 9.
+ int result = scalingHeuristic.calcDerivationSetPoint(workUnitsSizeSummary,
"source", timeBudget, jobState);
+ Assert.assertEquals(result, 9, "Expected throughput constraint to dominate
and yield 9 containers");
+ }
+
+ @Test
+ public void testCalcDerivationSetPointWithWorkUnitConstraintDominating2() {
+ // Enable dynamic scaling.
+
Mockito.when(jobState.getPropAsBoolean(GobblinTemporalConfigurationKeys.DYNAMIC_SCALING_ENABLED,
false))
+ .thenReturn(true);
+
+ stubContainerCapacity(4,5);
+
+ Mockito.when(jobState.getPropAsLong(
+
Mockito.eq(GobblinTemporalConfigurationKeys.TEMPORAL_WORKER_THREAD_AMORTIZED_THROUGHPUT_PER_MINUTE),
+ Mockito.anyLong()))
+ .thenReturn(100L * 1000 * 1000);
+
+
Mockito.when(jobState.getPropAsInt(Mockito.eq(GobblinTemporalConfigurationKeys.TEMPORAL_WORKUNITS_PER_CONTAINER),
Mockito.anyInt()))
+ .thenReturn(10);
+
+ Mockito.when(timeBudget.getMaxTargetDurationMinutes()).thenReturn(120L);
+
+ // Configure work units:
+ // For throughput: top-level MWUs count and mean size.
+
Mockito.when(workUnitsSizeSummary.getTopLevelWorkUnitsCount()).thenReturn(1000L);
+
Mockito.when(workUnitsSizeSummary.getTopLevelWorkUnitsMeanSize()).thenReturn(500e6);
// 500MB per MWU
+ // For work unit constraint: total constituent work units.
+
Mockito.when(workUnitsSizeSummary.getConstituentWorkUnitsCount()).thenReturn(5000L);
+
+ // Throughput calculation:
+ // totalBytes = 1000 * 500e6 = 5.0E11 bytes.
+ // Container capacity = 4 workers * 5 threads = 20.
+ // Container throughput = 100e6 * 20 = 2.0E9 bytes/minute.
+ // Estimated container minutes = 5.0E11 / 2.0E9 = 250 minutes.
+ // Throughput constraint = ceil(250 / 120) = 3 containers.
+
+ // Work unit constraint = ceil(5000 / 10) = 500 containers.
+ // The recommended set point should be max(3, 500) = 500 containers.
+ int result = scalingHeuristic.calcDerivationSetPoint(workUnitsSizeSummary,
"source", timeBudget, jobState);
+ Assert.assertEquals(result, 500, "Expected work unit constraint to
dominate and yield 500 containers");
+ }
+
+ @Test
+ public void testCalcDerivationSetPointWithChangingTimeBudget() {
+
Mockito.when(jobState.getPropAsBoolean(GobblinTemporalConfigurationKeys.DYNAMIC_SCALING_ENABLED,
false)).thenReturn(true);
+ stubContainerCapacity(4,5);
+
+ // Set throughput using the correct key.
+ Mockito.when(jobState.getPropAsLong(
+
Mockito.eq(GobblinTemporalConfigurationKeys.TEMPORAL_WORKER_THREAD_AMORTIZED_THROUGHPUT_PER_MINUTE),
+ Mockito.anyLong()))
+ .thenReturn(100L * 1000 * 1000); // 100 MB/minute per thread
+
+ // Set the work unit constraint so that throughput dominates.
+ Mockito.when(jobState.getPropAsInt(
+
Mockito.eq(GobblinTemporalConfigurationKeys.TEMPORAL_WORKUNITS_PER_CONTAINER),
Mockito.anyInt()))
+ .thenReturn(50);
+
Mockito.when(workUnitsSizeSummary.getTopLevelWorkUnitsCount()).thenReturn(50L);
+
+
+ // Set the target job duration.
long targetTimeBudgetMinutes = 75L;
Mockito.when(timeBudget.getMaxTargetDurationMinutes()).thenReturn(targetTimeBudgetMinutes);
+ // Configure work units: 3000 top-level work units with an average size of
500 MB.
long totalNumMWUs = 3000L;
Mockito.when(workUnitsSizeSummary.getTopLevelWorkUnitsCount()).thenReturn(totalNumMWUs);
-
Mockito.when(workUnitsSizeSummary.getTopLevelWorkUnitsMeanSize()).thenReturn(500e6);
// 500MB
- // parallelization capacity = 20 container-slots (= 4 * 5)
- // per-container-slot rate = 5 container-slot-mins/mean(MWU) (= 500
MB/mean(MWU) / 100MB/min)
- long numMWUsPerMinutePerContainer = 4; // (amortized) per-container rate =
4 MWU/container-minute (= 20 / 5)
- long totalNumContainerMinutesAllMWUs = totalNumMWUs /
numMWUsPerMinutePerContainer; // 750 container-minutes (= 3000 MWU / 4
MWU/container-min)
- long expectedSetPoint = totalNumContainerMinutesAllMWUs /
targetTimeBudgetMinutes; // 10 containers (= 750 / 75)
+
Mockito.when(workUnitsSizeSummary.getTopLevelWorkUnitsMeanSize()).thenReturn(500e6);
// 500 MB
+
+
+ // Calculate expected throughput constraint:
+ // totalBytes = 3000 * 500e6 = 1.5e12 bytes.
+ // Container capacity = 4 * 5 = 20.
+ // Container throughput = 100e6 * 20 = 2e9 bytes/min.
+ // Estimated container minutes = 1.5e12 / 2e9 = 750 minutes.
+ // Throughput constraint = ceil(750 / 75) = 10 containers.
+ int expectedThroughputContainers = 10;
int resultA =
scalingHeuristic.calcDerivationSetPoint(workUnitsSizeSummary, "sourceClass",
timeBudget, jobState);
- Assert.assertEquals(resultA, expectedSetPoint);
+ Assert.assertEquals(resultA, expectedThroughputContainers,
+ "Expected recommended containers based on throughput constraint");
- // verify: 3x MWUs ==> 3x the recommended set point
+ // Verify: Tripling the top-level work units should triple the recommended
set point.
Mockito.when(workUnitsSizeSummary.getTopLevelWorkUnitsCount()).thenReturn(totalNumMWUs
* 3);
int tripledResult =
scalingHeuristic.calcDerivationSetPoint(workUnitsSizeSummary, "sourceClass",
timeBudget, jobState);
- Assert.assertEquals(tripledResult, resultA * 3);
+ Assert.assertEquals(tripledResult, resultA * 3,
+ "Tripling the number of top-level work units should triple the set
point");
- // reduce the target duration by a third, and verify: 3/2 the recommended
set point
+ // Test reduced time budget: for the tripled workload, reduce the target
duration.
+ // New target duration = 2 * (75 / 3) = 50 minutes.
Mockito.when(timeBudget.getMaxTargetDurationMinutes()).thenReturn(2 *
(targetTimeBudgetMinutes / 3));
+ // Recalculate throughput constraint:
+ // totalBytes = 9000 * 500e6 = 4.5e12 bytes.
+ // Container throughput remains 2e9 bytes/min.
+ // Estimated container minutes = 4.5e12 / 2e9 = 2250 minutes.
+ // Throughput constraint = ceil(2250 / 50) = 45 containers.
+ int expectedReducedContainers = 45;
int reducedTimeBudgetResult =
scalingHeuristic.calcDerivationSetPoint(workUnitsSizeSummary, "sourceClass",
timeBudget, jobState);
- Assert.assertEquals(reducedTimeBudgetResult, (long)
Math.round(expectedSetPoint * 3 * (3.0 / 2.0)));
+ Assert.assertEquals(reducedTimeBudgetResult, expectedReducedContainers,
+ "Reducing the target duration should increase the recommended set
point proportionally");
}
+
+ /**
+ * Stubs the container capacity properties in the {@link JobState} mock.
+ *
+ * @param numWorkers the number of workers per container to stub in the
{@link JobState} mock.
+ * @param threadsPerWorker the number of threads per worker to stub in the
{@link JobState} mock.
+ */
+ private void stubContainerCapacity(int numWorkers, int threadsPerWorker) {
+ Mockito.when(jobState.getPropAsInt(
+
Mockito.eq(GobblinTemporalConfigurationKeys.TEMPORAL_NUM_WORKERS_PER_CONTAINER),
Mockito.anyInt()))
+ .thenReturn(numWorkers);
+ Mockito.when(jobState.getPropAsInt(
+
Mockito.eq(GobblinTemporalConfigurationKeys.TEMPORAL_NUM_THREADS_PER_WORKER),
Mockito.anyInt()))
+ .thenReturn(threadsPerWorker);
+ }
+
}