[ https://issues.apache.org/jira/browse/GOBBLIN-2199?focusedWorklogId=962216&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-962216 ]
ASF GitHub Bot logged work on GOBBLIN-2199:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 18/Mar/25 06:12
            Start Date: 18/Mar/25 06:12
    Worklog Time Spent: 10m
      Work Description: Blazer-007 commented on code in PR #4106:
URL: https://github.com/apache/gobblin/pull/4106#discussion_r2000257208


##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/RecommendScalingForWorkUnitsLinearHeuristicImpl.java:
##########
@@ -23,65 +23,128 @@
 import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys;
 import org.apache.gobblin.temporal.ddm.work.TimeBudget;
 import org.apache.gobblin.temporal.ddm.work.WorkUnitsSizeSummary;
-import org.apache.gobblin.temporal.ddm.worker.WorkFulfillmentWorker;
+import org.apache.gobblin.yarn.GobblinYarnConfigurationKeys;
 /**
  * Simple config-driven linear recommendation for how many containers to use to complete the "remaining work" within a given {@link TimeBudget}, per:
  *
- *   a. from {@link WorkUnitsSizeSummary}, find how many (remaining) "top-level" {@link org.apache.gobblin.source.workunit.MultiWorkUnit}s of some mean size
- *   b. from the configured {@link #AMORTIZED_NUM_BYTES_PER_MINUTE}, find the expected "processing rate" in bytes / minute
- * 1. estimate the time required for processing a mean-sized `MultiWorkUnit` (MWU)
- *   c. from {@link JobState}, find per-container `MultiWorkUnit` parallelism capacity (aka. "worker-slots") to base the recommendation upon
- * 2. calculate the per-container throughput of MWUs per minute
- * 3. estimate the total per-container-minutes required to process all MWUs
- *   d. from the {@link TimeBudget}, find the target number of minutes in which to complete processing of all MWUs
- * 4. recommend the number of containers so all MWU processing should finish within the target number of minutes
+ *   a. from {@link WorkUnitsSizeSummary}, find how many (remaining) {@link org.apache.gobblin.source.workunit.WorkUnit}s
+ *   b. from the configured GobblinTemporalConfigurationKeys.TEMPORAL_WORKER_THREAD_AMORTIZED_THROUGHPUT_PER_MINUTE, find the expected "processing rate" in bytes / minute
+ * 1. estimate the total container-minutes required to process all MWUs
+ *   c. from the {@link TimeBudget}, find the target number of minutes in which to complete processing of all MWUs
+ * 2. estimate container count based on target minutes
+ * 2. estimate container count based on the maximum number of work units allowed per container
+ * 4. recommend the number of containers as max of above two container counts
  */
 @Slf4j
 public class RecommendScalingForWorkUnitsLinearHeuristicImpl extends AbstractRecommendScalingForWorkUnitsImpl {
-  public static final String AMORTIZED_NUM_BYTES_PER_MINUTE = GobblinTemporalConfigurationKeys.DYNAMIC_SCALING_PREFIX + "heuristic.params.numBytesPerMinute";
-  public static final long DEFAULT_AMORTIZED_NUM_BYTES_PER_MINUTE = 80 * 1000L * 1000L * 60L; // 80MB/sec
-
+  /**
+   * Calculates the recommended number of containers for processing the remaining work units.
+   * <p>
+   * This method first checks whether dynamic scaling is enabled via the job state configuration.
+   * If dynamic scaling is disabled, it returns the initial container count as specified in the job state.
+   * When dynamic scaling is enabled, it computes the throughput based on the count of constituent work units (WUs)
+   * and the processing rate (bytes per minute per thread). The calculation involves:
+   * <ol>
+   *   <li>Computing the total bytes to be processed based on the count and mean size of top-level work units.</li>
+   *   <li>Calculating the processing rate per container using the amortized bytes per minute rate and the container's work unit capacity.</li>
+   *   <li>Estimating the total container-minutes required to process all MWUs and determining the number of containers needed
+   *       to meet the job time budget.</li>
+   *   <li>Computing an alternative container count based on the maximum number of work units allowed per container.</li>
+   *   <li>Returning the maximum of the two computed container counts as the recommended scaling.</li>
+   * </ol>
+   * </p>
+   *
+   * @param remainingWork the summary of work unit sizes and counts remaining for processing
+   * @param sourceClass the name of the class invoking this method
+   * @param jobTimeBudget the time budget allocated for the job execution
+   * @param jobState the current job state that holds configuration properties and runtime parameters
+   * @return the recommended number of containers to allocate for processing the work units
+   */
   @Override
   protected int calcDerivationSetPoint(WorkUnitsSizeSummary remainingWork, String sourceClass, TimeBudget jobTimeBudget, JobState jobState) {
-    // for simplicity, for now, consider only top-level work units (aka. `MultiWorkUnit`s - MWUs)
+    if (!jobState.getPropAsBoolean(GobblinTemporalConfigurationKeys.DYNAMIC_SCALING_ENABLED, false)) {
+      int initialContainerCount = Integer.valueOf(jobState.getProp(GobblinYarnConfigurationKeys.INITIAL_CONTAINERS_KEY, "1"));
+      log.info("Dynamic scaling is disabled, returning initial container count: " + initialContainerCount);
+      return initialContainerCount;
+    }
+
+    long numWUs = remainingWork.getConstituentWorkUnitsCount();
     long numMWUs = remainingWork.getTopLevelWorkUnitsCount();
     double meanBytesPerMWU = remainingWork.getTopLevelWorkUnitsMeanSize();
-    int numSimultaneousMWUsPerContainer = calcPerContainerWUCapacity(jobState); // (a worker-thread is a slot for top-level (MWUs) - not constituent sub-WUs)
-    long bytesPerMinuteProcRate = calcAmortizedBytesPerMinute(jobState);
+    double totalBytes = numMWUs * meanBytesPerMWU;

Review Comment:
   we can use `remainingWork.getTotalSize`


##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/ProcessWorkUnitImpl.java:
##########
@@ -110,12 +110,13 @@ protected List<WorkUnit> loadFlattenedWorkUnits(WorkUnitClaimCheck wu, FileSyste
    * NOTE: adapted from {@link org.apache.gobblin.runtime.mapreduce.MRJobLauncher.TaskRunner#run(org.apache.hadoop.mapreduce.Mapper.Context)}
    * @return count of how many tasks executed (0 if execution ultimately failed, but we *believe* TaskState should already have been recorded beforehand)
    */
-  protected int execute(List<WorkUnit> workUnits, WorkUnitClaimCheck wu, JobState jobState, FileSystem fs, IssueRepository issueRepository) throws IOException, InterruptedException {
+  protected int execute(List<WorkUnit> workUnits, WorkUnitClaimCheck wu, JobState jobState, FileSystem fs, IssueRepository issueRepository,
+      Properties jobProperties) throws IOException, InterruptedException {
     String containerId = "container-id-for-wu-" + wu.getCorrelator();
     StateStore<TaskState> taskStateStore = Help.openTaskStateStore(wu, fs);
     TaskStateTracker taskStateTracker = createEssentializedTaskStateTracker(wu);
-    TaskExecutor taskExecutor = new TaskExecutor(new Properties());
+    TaskExecutor taskExecutor = new TaskExecutor(jobProperties);

Review Comment:
   Are we passing any property through `jobProperties` that is needed to create `taskExecutor` ?


##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/RecommendScalingForWorkUnitsLinearHeuristicImpl.java:
##########
@@ -23,65 +23,128 @@
 import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys;
 import org.apache.gobblin.temporal.ddm.work.TimeBudget;
 import org.apache.gobblin.temporal.ddm.work.WorkUnitsSizeSummary;
-import org.apache.gobblin.temporal.ddm.worker.WorkFulfillmentWorker;
+import org.apache.gobblin.yarn.GobblinYarnConfigurationKeys;
 /**
  * Simple config-driven linear recommendation for how many containers to use to complete the "remaining work" within a given {@link TimeBudget}, per:
  *
- *   a. from {@link WorkUnitsSizeSummary}, find how many (remaining) "top-level" {@link org.apache.gobblin.source.workunit.MultiWorkUnit}s of some mean size
- *   b. from the configured {@link #AMORTIZED_NUM_BYTES_PER_MINUTE}, find the expected "processing rate" in bytes / minute
- * 1. estimate the time required for processing a mean-sized `MultiWorkUnit` (MWU)
- *   c. from {@link JobState}, find per-container `MultiWorkUnit` parallelism capacity (aka. "worker-slots") to base the recommendation upon
- * 2. calculate the per-container throughput of MWUs per minute
- * 3. estimate the total per-container-minutes required to process all MWUs
- *   d. from the {@link TimeBudget}, find the target number of minutes in which to complete processing of all MWUs
- * 4. recommend the number of containers so all MWU processing should finish within the target number of minutes
+ *   a. from {@link WorkUnitsSizeSummary}, find how many (remaining) {@link org.apache.gobblin.source.workunit.WorkUnit}s
+ *   b. from the configured GobblinTemporalConfigurationKeys.TEMPORAL_WORKER_THREAD_AMORTIZED_THROUGHPUT_PER_MINUTE, find the expected "processing rate" in bytes / minute
+ * 1. estimate the total container-minutes required to process all MWUs
+ *   c. from the {@link TimeBudget}, find the target number of minutes in which to complete processing of all MWUs
+ * 2. estimate container count based on target minutes
+ * 2. estimate container count based on the maximum number of work units allowed per container
+ * 4. recommend the number of containers as max of above two container counts
  */
 @Slf4j
 public class RecommendScalingForWorkUnitsLinearHeuristicImpl extends AbstractRecommendScalingForWorkUnitsImpl {
-  public static final String AMORTIZED_NUM_BYTES_PER_MINUTE = GobblinTemporalConfigurationKeys.DYNAMIC_SCALING_PREFIX + "heuristic.params.numBytesPerMinute";
-  public static final long DEFAULT_AMORTIZED_NUM_BYTES_PER_MINUTE = 80 * 1000L * 1000L * 60L; // 80MB/sec
-
+  /**
+   * Calculates the recommended number of containers for processing the remaining work units.
+   * <p>
+   * This method first checks whether dynamic scaling is enabled via the job state configuration.
+   * If dynamic scaling is disabled, it returns the initial container count as specified in the job state.
+   * When dynamic scaling is enabled, it computes the throughput based on the count of constituent work units (WUs)
+   * and the processing rate (bytes per minute per thread). The calculation involves:
+   * <ol>
+   *   <li>Computing the total bytes to be processed based on the count and mean size of top-level work units.</li>
+   *   <li>Calculating the processing rate per container using the amortized bytes per minute rate and the container's work unit capacity.</li>
+   *   <li>Estimating the total container-minutes required to process all MWUs and determining the number of containers needed
+   *       to meet the job time budget.</li>
+   *   <li>Computing an alternative container count based on the maximum number of work units allowed per container.</li>
+   *   <li>Returning the maximum of the two computed container counts as the recommended scaling.</li>
+   * </ol>
+   * </p>
+   *
+   * @param remainingWork the summary of work unit sizes and counts remaining for processing
+   * @param sourceClass the name of the class invoking this method
+   * @param jobTimeBudget the time budget allocated for the job execution
+   * @param jobState the current job state that holds configuration properties and runtime parameters
+   * @return the recommended number of containers to allocate for processing the work units
+   */
   @Override
   protected int calcDerivationSetPoint(WorkUnitsSizeSummary remainingWork, String sourceClass, TimeBudget jobTimeBudget, JobState jobState) {
-    // for simplicity, for now, consider only top-level work units (aka. `MultiWorkUnit`s - MWUs)
+    if (!jobState.getPropAsBoolean(GobblinTemporalConfigurationKeys.DYNAMIC_SCALING_ENABLED, false)) {
+      int initialContainerCount = Integer.valueOf(jobState.getProp(GobblinYarnConfigurationKeys.INITIAL_CONTAINERS_KEY, "1"));
+      log.info("Dynamic scaling is disabled, returning initial container count: " + initialContainerCount);
+      return initialContainerCount;
+    }
+
+    long numWUs = remainingWork.getConstituentWorkUnitsCount();
     long numMWUs = remainingWork.getTopLevelWorkUnitsCount();
     double meanBytesPerMWU = remainingWork.getTopLevelWorkUnitsMeanSize();
-    int numSimultaneousMWUsPerContainer = calcPerContainerWUCapacity(jobState); // (a worker-thread is a slot for top-level (MWUs) - not constituent sub-WUs)
-    long bytesPerMinuteProcRate = calcAmortizedBytesPerMinute(jobState);
+    double totalBytes = numMWUs * meanBytesPerMWU;
+    long bytesPerMinuteProcRatePerThread = calcAmortizedBytesPerMinute(jobState);
     log.info("Calculating auto-scaling (for {} remaining work units within {}) using: bytesPerMinuteProcRate = {}; meanBytesPerMWU = {}",
-        numMWUs, jobTimeBudget, bytesPerMinuteProcRate, meanBytesPerMWU);
+        numMWUs, jobTimeBudget, bytesPerMinuteProcRatePerThread, meanBytesPerMWU);

Review Comment:
   log line `bytesPerMinuteProcRate` --> `bytesPerMinuteProcRatePerThread`


##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ExecuteGobblinWorkflowImpl.java:
##########
@@ -166,17 +166,19 @@ protected ProcessWorkUnitsWorkflow createProcessWorkUnitsWorkflow(Properties job
   protected TimeBudget calcWUProcTimeBudget(Instant jobStartTime, WorkUnitsSizeSummary wuSizeSummary, Properties jobProps) {
     // TODO: make fully configurable! for now, cap Work Discovery at 45 mins and set aside 10 mins for the `CommitStepWorkflow`
     long maxGenWUsMins = 45;
-    long commitStepMins = 10;
+    long commitStepMins = 15;
     long totalTargetTimeMins = TimeUnit.MINUTES.toMinutes(PropertiesUtils.getPropAsLong(jobProps, ConfigurationKeys.JOB_TARGET_COMPLETION_DURATION_IN_MINUTES_KEY, ConfigurationKeys.DEFAULT_JOB_TARGET_COMPLETION_DURATION_IN_MINUTES));
     double permittedOveragePercentage = .2;
     Duration genWUsDuration = Duration.between(jobStartTime, TemporalEventTimer.WithinWorkflowFactory.getCurrentInstant());
-    long remainingMins = totalTargetTimeMins - Math.min(genWUsDuration.toMinutes(), maxGenWUsMins) - commitStepMins;
+

Review Comment:
   remove or comment out this line as well ?


Issue Time Tracking
-------------------

            Worklog Id:     (was: 962216)
    Remaining Estimate: 0h
            Time Spent: 10m

> Support dynamic container scaling on Temporal workload
> ------------------------------------------------------
>
>                 Key: GOBBLIN-2199
>                 URL: https://issues.apache.org/jira/browse/GOBBLIN-2199
>             Project: Apache Gobblin
>          Issue Type: Task
>            Reporter: Prateek Khandelwal
>            Priority: Major
>          Time Spent: 10m
>  Remaining Estimate: 0h
>
> Currently, Gobblin runs a static count of containers (the initial containers
> at the start of the job). We need to support dynamic scaling by computing the
> recommended number of containers such that large data copy workloads can be
> processed within some completion time and without running into OOM errors on
> containers.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
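
For readers following the review, the calculation described in the Javadoc under discussion can be sketched roughly as below. This is a minimal standalone illustration and not the PR's code: the class name `LinearScalingSketch`, the method `recommendContainers`, and all of its parameters are hypothetical, and it assumes the per-container rate is simply the per-thread rate multiplied by a thread count.

```java
// Hypothetical sketch of the linear heuristic described in the Javadoc above.
// None of these names come from Gobblin; they exist only for illustration.
final class LinearScalingSketch {
  private LinearScalingSketch() {}

  /**
   * Recommends a container count as the larger of two estimates:
   * one driven by the time budget, one by a per-container work-unit cap.
   */
  static int recommendContainers(long totalBytes, long numWorkUnits, long bytesPerMinutePerThread,
      int threadsPerContainer, long targetMinutes, long maxWorkUnitsPerContainer) {
    if (bytesPerMinutePerThread <= 0 || threadsPerContainer <= 0
        || targetMinutes <= 0 || maxWorkUnitsPerContainer <= 0) {
      throw new IllegalArgumentException("rates, capacities, and the time budget must be positive");
    }
    // Assumed: a container processes threadsPerContainer streams at the amortized per-thread rate.
    double bytesPerMinutePerContainer = (double) bytesPerMinutePerThread * threadsPerContainer;
    // Total container-minutes needed to process all remaining bytes.
    double containerMinutes = totalBytes / bytesPerMinutePerContainer;
    // Containers needed so the work finishes within the target number of minutes.
    long byTimeBudget = (long) Math.ceil(containerMinutes / targetMinutes);
    // Containers needed so no container exceeds the work-unit cap.
    long byWorkUnitCap = (long) Math.ceil((double) numWorkUnits / maxWorkUnitsPerContainer);
    // Take the larger estimate, but never recommend fewer than one container.
    long recommended = Math.max(1L, Math.max(byTimeBudget, byWorkUnitCap));
    return (int) Math.min(Integer.MAX_VALUE, recommended);
  }
}
```

With 1 TB remaining, 500 MB/min per thread, 4 threads per container, and a 120-minute target, the time-budget estimate is ceil(500 / 120) = 5 containers; a cap of 2,500 work units per container over 10,000 work units gives 4, so the sketch returns 5. In this sketch the total size is passed in directly, which matches the spirit of the first review comment about using a total-size accessor rather than multiplying count by mean size.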