[ https://issues.apache.org/jira/browse/GOBBLIN-2199?focusedWorklogId=962221&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-962221 ]
ASF GitHub Bot logged work on GOBBLIN-2199:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 18/Mar/25 06:34
            Start Date: 18/Mar/25 06:34
    Worklog Time Spent: 10m
      Work Description: khandelwal-prateek commented on code in PR #4106:
URL: https://github.com/apache/gobblin/pull/4106#discussion_r2000306389


##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/RecommendScalingForWorkUnitsLinearHeuristicImpl.java:
##########
@@ -23,65 +23,128 @@
 import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys;
 import org.apache.gobblin.temporal.ddm.work.TimeBudget;
 import org.apache.gobblin.temporal.ddm.work.WorkUnitsSizeSummary;
-import org.apache.gobblin.temporal.ddm.worker.WorkFulfillmentWorker;
+import org.apache.gobblin.yarn.GobblinYarnConfigurationKeys;

 /**
  * Simple config-driven linear recommendation for how many containers to use to complete the "remaining work" within a given {@link TimeBudget}, per:
  *
- *   a. from {@link WorkUnitsSizeSummary}, find how many (remaining) "top-level" {@link org.apache.gobblin.source.workunit.MultiWorkUnit}s of some mean size
- *   b. from the configured {@link #AMORTIZED_NUM_BYTES_PER_MINUTE}, find the expected "processing rate" in bytes / minute
- * 1. estimate the time required for processing a mean-sized `MultiWorkUnit` (MWU)
- *   c. from {@link JobState}, find per-container `MultiWorkUnit` parallelism capacity (aka. "worker-slots") to base the recommendation upon
- * 2. calculate the per-container throughput of MWUs per minute
- * 3. estimate the total per-container-minutes required to process all MWUs
- *   d. from the {@link TimeBudget}, find the target number of minutes in which to complete processing of all MWUs
- * 4. recommend the number of containers so all MWU processing should finish within the target number of minutes
+ *   a. from {@link WorkUnitsSizeSummary}, find how many (remaining) {@link org.apache.gobblin.source.workunit.WorkUnit}s
+ *   b. from the configured GobblinTemporalConfigurationKeys.TEMPORAL_WORKER_THREAD_AMORTIZED_THROUGHPUT_PER_MINUTE, find the expected "processing rate" in bytes / minute
+ * 1. estimate the total container-minutes required to process all MWUs
+ *   c. from the {@link TimeBudget}, find the target number of minutes in which to complete processing of all MWUs
+ * 2. estimate container count based on target minutes
+ * 2. estimate container count based on the maximum number of work units allowed per container
+ * 4. recommend the number of containers as max of above two container counts
  */
 @Slf4j
 public class RecommendScalingForWorkUnitsLinearHeuristicImpl extends AbstractRecommendScalingForWorkUnitsImpl {

-  public static final String AMORTIZED_NUM_BYTES_PER_MINUTE = GobblinTemporalConfigurationKeys.DYNAMIC_SCALING_PREFIX + "heuristic.params.numBytesPerMinute";
-  public static final long DEFAULT_AMORTIZED_NUM_BYTES_PER_MINUTE = 80 * 1000L * 1000L * 60L; // 80MB/sec
-
+  /**
+   * Calculates the recommended number of containers for processing the remaining work units.
+   * <p>
+   * This method first checks whether dynamic scaling is enabled via the job state configuration.
+   * If dynamic scaling is disabled, it returns the initial container count as specified in the job state.
+   * When dynamic scaling is enabled, it computes the throughput based on the count of constituent work units (WUs)
+   * and the processing rate (bytes per minute per thread). The calculation involves:
+   * <ol>
+   *   <li>Computing the total bytes to be processed based on the count and mean size of top-level work units.</li>
+   *   <li>Calculating the processing rate per container using the amortized bytes per minute rate and the container's work unit capacity.</li>
+   *   <li>Estimating the total container-minutes required to process all MWUs and determining the number of containers needed
+   *       to meet the job time budget.</li>
+   *   <li>Computing an alternative container count based on the maximum number of work units allowed per container.</li>
+   *   <li>Returning the maximum of the two computed container counts as the recommended scaling.</li>
+   * </ol>
+   * </p>
+   *
+   * @param remainingWork the summary of work unit sizes and counts remaining for processing
+   * @param sourceClass the name of the class invoking this method
+   * @param jobTimeBudget the time budget allocated for the job execution
+   * @param jobState the current job state that holds configuration properties and runtime parameters
+   * @return the recommended number of containers to allocate for processing the work units
+   */
   @Override
   protected int calcDerivationSetPoint(WorkUnitsSizeSummary remainingWork, String sourceClass, TimeBudget jobTimeBudget, JobState jobState) {
-    // for simplicity, for now, consider only top-level work units (aka. `MultiWorkUnit`s - MWUs)
+    if (!jobState.getPropAsBoolean(GobblinTemporalConfigurationKeys.DYNAMIC_SCALING_ENABLED, false)) {
+      int initialContainerCount = Integer.valueOf(jobState.getProp(GobblinYarnConfigurationKeys.INITIAL_CONTAINERS_KEY, "1"));
+      log.info("Dynamic scaling is disabled, returning initial container count: " + initialContainerCount);
+      return initialContainerCount;
+    }
+
+    long numWUs = remainingWork.getConstituentWorkUnitsCount();
     long numMWUs = remainingWork.getTopLevelWorkUnitsCount();
     double meanBytesPerMWU = remainingWork.getTopLevelWorkUnitsMeanSize();
-    int numSimultaneousMWUsPerContainer = calcPerContainerWUCapacity(jobState); // (a worker-thread is a slot for top-level (MWUs) - not constituent sub-WUs)
-    long bytesPerMinuteProcRate = calcAmortizedBytesPerMinute(jobState);
+    double totalBytes = numMWUs * meanBytesPerMWU;

Review Comment:
   Yes, both are the same. Since the count is computed with `Math.ceil`, which requires a double value anyway (otherwise the result would be rounded down), I just used `totalBytes` directly instead of casting `getTotalSize` to double.


Issue Time Tracking
-------------------

    Worklog Id:     (was: 962221)
    Time Spent: 0.5h  (was: 20m)

> Support dynamic container scaling on Temporal workload
> ------------------------------------------------------
>
>                 Key: GOBBLIN-2199
>                 URL: https://issues.apache.org/jira/browse/GOBBLIN-2199
>             Project: Apache Gobblin
>          Issue Type: Task
>            Reporter: Prateek Khandelwal
>            Priority: Major
>          Time Spent: 0.5h
>  Remaining Estimate: 0h
>
> Currently, Gobblin runs a static number of containers (the initial containers
> at the start of the job). We need to support dynamic scaling by computing the
> recommended number of containers such that large data copy workloads can be
> processed within some completion time and without running into OOM errors on
> containers.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
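
For readers following the review thread, the linear heuristic described in the Javadoc of the diff above can be sketched roughly as below. This is an illustrative sketch only, not the code in the PR: the class name `LinearScalingSketch`, the method `recommendContainers`, and all of its parameters are hypothetical stand-ins for values that the real implementation reads from `JobState`, `WorkUnitsSizeSummary`, and `TimeBudget`. The sample throughput of 80 MB/sec per thread is borrowed from the default removed in the diff. The remaining inputs are made up for the example.

```java
// Hypothetical, self-contained sketch of the "max of two container counts" heuristic.
// None of these names come from the Gobblin code base.
class LinearScalingSketch {

    static int recommendContainers(long numWorkUnits, double totalBytes,
                                   long bytesPerMinutePerThread, int threadsPerContainer,
                                   long targetMinutes, int maxWorkUnitsPerContainer) {
        // Processing rate of one container, in bytes per minute.
        double containerBytesPerMinute = (double) bytesPerMinutePerThread * threadsPerContainer;
        // Total container-minutes needed to process all remaining bytes.
        double containerMinutes = totalBytes / containerBytesPerMinute;
        // Count 1: containers needed to finish within the time budget.
        // Math.ceil on a double rounds up; integer division would round down.
        int byTimeBudget = (int) Math.ceil(containerMinutes / targetMinutes);
        // Count 2: containers needed so no container exceeds the work unit cap.
        int byWorkUnitCap = (int) Math.ceil((double) numWorkUnits / maxWorkUnitsPerContainer);
        // Recommend the larger of the two counts.
        return Math.max(byTimeBudget, byWorkUnitCap);
    }

    public static void main(String[] args) {
        long rate = 80L * 1000L * 1000L * 60L; // 80 MB/sec per thread, as bytes per minute
        // 1.2 TB remaining, 4 threads per container, 30-minute budget,
        // 10,000 work units with at most 2,000 per container.
        System.out.println(recommendContainers(10_000L, 1.2e12, rate, 4, 30L, 2_000)); // prints 5
    }
}
```

With these sample inputs the time-budget count is ceil(62.5 / 30) = 3 and the work-unit-cap count is ceil(10000 / 2000) = 5, so the sketch returns 5. The same 62.5 container-minutes under integer arithmetic would give 62 / 30 = 2, which is the rounding-down concern mentioned in the review comment.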