Blazer-007 commented on code in PR #4106: URL: https://github.com/apache/gobblin/pull/4106#discussion_r2000257208
########## gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/RecommendScalingForWorkUnitsLinearHeuristicImpl.java: ########## @@ -23,65 +23,128 @@ import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys; import org.apache.gobblin.temporal.ddm.work.TimeBudget; import org.apache.gobblin.temporal.ddm.work.WorkUnitsSizeSummary; -import org.apache.gobblin.temporal.ddm.worker.WorkFulfillmentWorker; +import org.apache.gobblin.yarn.GobblinYarnConfigurationKeys; /** * Simple config-driven linear recommendation for how many containers to use to complete the "remaining work" within a given {@link TimeBudget}, per: * - * a. from {@link WorkUnitsSizeSummary}, find how many (remaining) "top-level" {@link org.apache.gobblin.source.workunit.MultiWorkUnit}s of some mean size - * b. from the configured {@link #AMORTIZED_NUM_BYTES_PER_MINUTE}, find the expected "processing rate" in bytes / minute - * 1. estimate the time required for processing a mean-sized `MultiWorkUnit` (MWU) - * c. from {@link JobState}, find per-container `MultiWorkUnit` parallelism capacity (aka. "worker-slots") to base the recommendation upon - * 2. calculate the per-container throughput of MWUs per minute - * 3. estimate the total per-container-minutes required to process all MWUs - * d. from the {@link TimeBudget}, find the target number of minutes in which to complete processing of all MWUs - * 4. recommend the number of containers so all MWU processing should finish within the target number of minutes + * a. from {@link WorkUnitsSizeSummary}, find how many (remaining) {@link org.apache.gobblin.source.workunit.WorkUnit}s + * b. from the configured GobblinTemporalConfigurationKeys.TEMPORAL_WORKER_THREAD_AMORTIZED_THROUGHPUT_PER_MINUTE, find the expected "processing rate" in bytes / minute + * 1. estimate the total container-minutes required to process all MWUs + * c. 
from the {@link TimeBudget}, find the target number of minutes in which to complete processing of all MWUs + * 2. estimate container count based on target minutes + * 2. estimate container count based on the maximum number of work units allowed per container + * 4. recommend the number of containers as max of above two container counts */ @Slf4j public class RecommendScalingForWorkUnitsLinearHeuristicImpl extends AbstractRecommendScalingForWorkUnitsImpl { - public static final String AMORTIZED_NUM_BYTES_PER_MINUTE = GobblinTemporalConfigurationKeys.DYNAMIC_SCALING_PREFIX + "heuristic.params.numBytesPerMinute"; - public static final long DEFAULT_AMORTIZED_NUM_BYTES_PER_MINUTE = 80 * 1000L * 1000L * 60L; // 80MB/sec - + /** + * Calculates the recommended number of containers for processing the remaining work units. + * <p> + * This method first checks whether dynamic scaling is enabled via the job state configuration. + * If dynamic scaling is disabled, it returns the initial container count as specified in the job state. + * When dynamic scaling is enabled, it computes the throughput based on the count of constituent work units (WUs) + * and the processing rate (bytes per minute per thread). 
The calculation involves: + * <ol> + * <li>Computing the total bytes to be processed based on the count and mean size of top-level work units.</li> + * <li>Calculating the processing rate per container using the amortized bytes per minute rate and the container's work unit capacity.</li> + * <li>Estimating the total container-minutes required to process all MWUs and determining the number of containers needed + * to meet the job time budget.</li> + * <li>Computing an alternative container count based on the maximum number of work units allowed per container.</li> + * <li>Returning the maximum of the two computed container counts as the recommended scaling.</li> + * </ol> + * </p> + * + * @param remainingWork the summary of work unit sizes and counts remaining for processing + * @param sourceClass the name of the class invoking this method + * @param jobTimeBudget the time budget allocated for the job execution + * @param jobState the current job state that holds configuration properties and runtime parameters + * @return the recommended number of containers to allocate for processing the work units + */ @Override protected int calcDerivationSetPoint(WorkUnitsSizeSummary remainingWork, String sourceClass, TimeBudget jobTimeBudget, JobState jobState) { - // for simplicity, for now, consider only top-level work units (aka. 
`MultiWorkUnit`s - MWUs) + if (!jobState.getPropAsBoolean(GobblinTemporalConfigurationKeys.DYNAMIC_SCALING_ENABLED, false)) { + int initialContainerCount = Integer.valueOf(jobState.getProp(GobblinYarnConfigurationKeys.INITIAL_CONTAINERS_KEY, "1")); + log.info("Dynamic scaling is disabled, returning initial container count: " + initialContainerCount); + return initialContainerCount; + } + + long numWUs = remainingWork.getConstituentWorkUnitsCount(); long numMWUs = remainingWork.getTopLevelWorkUnitsCount(); double meanBytesPerMWU = remainingWork.getTopLevelWorkUnitsMeanSize(); - int numSimultaneousMWUsPerContainer = calcPerContainerWUCapacity(jobState); // (a worker-thread is a slot for top-level (MWUs) - not constituent sub-WUs) - long bytesPerMinuteProcRate = calcAmortizedBytesPerMinute(jobState); + double totalBytes = numMWUs * meanBytesPerMWU; Review Comment: We can use `remainingWork.getTotalSize` here instead of computing `numMWUs * meanBytesPerMWU`. ########## gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/ProcessWorkUnitImpl.java: ########## @@ -110,12 +110,13 @@ protected List<WorkUnit> loadFlattenedWorkUnits(WorkUnitClaimCheck wu, FileSyste * NOTE: adapted from {@link org.apache.gobblin.runtime.mapreduce.MRJobLauncher.TaskRunner#run(org.apache.hadoop.mapreduce.Mapper.Context)} * @return count of how many tasks executed (0 if execution ultimately failed, but we *believe* TaskState should already have been recorded beforehand) */ - protected int execute(List<WorkUnit> workUnits, WorkUnitClaimCheck wu, JobState jobState, FileSystem fs, IssueRepository issueRepository) throws IOException, InterruptedException { + protected int execute(List<WorkUnit> workUnits, WorkUnitClaimCheck wu, JobState jobState, FileSystem fs, IssueRepository issueRepository, + Properties jobProperties) throws IOException, InterruptedException { String containerId = "container-id-for-wu-" + wu.getCorrelator(); StateStore<TaskState> taskStateStore = Help.openTaskStateStore(wu, fs); TaskStateTracker
taskStateTracker = createEssentializedTaskStateTracker(wu); - TaskExecutor taskExecutor = new TaskExecutor(new Properties()); + TaskExecutor taskExecutor = new TaskExecutor(jobProperties); Review Comment: Are we passing any properties through `jobProperties` that are needed to create `taskExecutor`? ########## gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/RecommendScalingForWorkUnitsLinearHeuristicImpl.java: ########## @@ -23,65 +23,128 @@ import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys; import org.apache.gobblin.temporal.ddm.work.TimeBudget; import org.apache.gobblin.temporal.ddm.work.WorkUnitsSizeSummary; -import org.apache.gobblin.temporal.ddm.worker.WorkFulfillmentWorker; +import org.apache.gobblin.yarn.GobblinYarnConfigurationKeys; /** * Simple config-driven linear recommendation for how many containers to use to complete the "remaining work" within a given {@link TimeBudget}, per: * - * a. from {@link WorkUnitsSizeSummary}, find how many (remaining) "top-level" {@link org.apache.gobblin.source.workunit.MultiWorkUnit}s of some mean size - * b. from the configured {@link #AMORTIZED_NUM_BYTES_PER_MINUTE}, find the expected "processing rate" in bytes / minute - * 1. estimate the time required for processing a mean-sized `MultiWorkUnit` (MWU) - * c. from {@link JobState}, find per-container `MultiWorkUnit` parallelism capacity (aka. "worker-slots") to base the recommendation upon - * 2. calculate the per-container throughput of MWUs per minute - * 3. estimate the total per-container-minutes required to process all MWUs - * d. from the {@link TimeBudget}, find the target number of minutes in which to complete processing of all MWUs - * 4. recommend the number of containers so all MWU processing should finish within the target number of minutes + * a. from {@link WorkUnitsSizeSummary}, find how many (remaining) {@link org.apache.gobblin.source.workunit.WorkUnit}s + * b.
from the configured GobblinTemporalConfigurationKeys.TEMPORAL_WORKER_THREAD_AMORTIZED_THROUGHPUT_PER_MINUTE, find the expected "processing rate" in bytes / minute + * 1. estimate the total container-minutes required to process all MWUs + * c. from the {@link TimeBudget}, find the target number of minutes in which to complete processing of all MWUs + * 2. estimate container count based on target minutes + * 2. estimate container count based on the maximum number of work units allowed per container + * 4. recommend the number of containers as max of above two container counts */ @Slf4j public class RecommendScalingForWorkUnitsLinearHeuristicImpl extends AbstractRecommendScalingForWorkUnitsImpl { - public static final String AMORTIZED_NUM_BYTES_PER_MINUTE = GobblinTemporalConfigurationKeys.DYNAMIC_SCALING_PREFIX + "heuristic.params.numBytesPerMinute"; - public static final long DEFAULT_AMORTIZED_NUM_BYTES_PER_MINUTE = 80 * 1000L * 1000L * 60L; // 80MB/sec - + /** + * Calculates the recommended number of containers for processing the remaining work units. + * <p> + * This method first checks whether dynamic scaling is enabled via the job state configuration. + * If dynamic scaling is disabled, it returns the initial container count as specified in the job state. + * When dynamic scaling is enabled, it computes the throughput based on the count of constituent work units (WUs) + * and the processing rate (bytes per minute per thread). 
The calculation involves: + * <ol> + * <li>Computing the total bytes to be processed based on the count and mean size of top-level work units.</li> + * <li>Calculating the processing rate per container using the amortized bytes per minute rate and the container's work unit capacity.</li> + * <li>Estimating the total container-minutes required to process all MWUs and determining the number of containers needed + * to meet the job time budget.</li> + * <li>Computing an alternative container count based on the maximum number of work units allowed per container.</li> + * <li>Returning the maximum of the two computed container counts as the recommended scaling.</li> + * </ol> + * </p> + * + * @param remainingWork the summary of work unit sizes and counts remaining for processing + * @param sourceClass the name of the class invoking this method + * @param jobTimeBudget the time budget allocated for the job execution + * @param jobState the current job state that holds configuration properties and runtime parameters + * @return the recommended number of containers to allocate for processing the work units + */ @Override protected int calcDerivationSetPoint(WorkUnitsSizeSummary remainingWork, String sourceClass, TimeBudget jobTimeBudget, JobState jobState) { - // for simplicity, for now, consider only top-level work units (aka. 
`MultiWorkUnit`s - MWUs) + if (!jobState.getPropAsBoolean(GobblinTemporalConfigurationKeys.DYNAMIC_SCALING_ENABLED, false)) { + int initialContainerCount = Integer.valueOf(jobState.getProp(GobblinYarnConfigurationKeys.INITIAL_CONTAINERS_KEY, "1")); + log.info("Dynamic scaling is disabled, returning initial container count: " + initialContainerCount); + return initialContainerCount; + } + + long numWUs = remainingWork.getConstituentWorkUnitsCount(); long numMWUs = remainingWork.getTopLevelWorkUnitsCount(); double meanBytesPerMWU = remainingWork.getTopLevelWorkUnitsMeanSize(); - int numSimultaneousMWUsPerContainer = calcPerContainerWUCapacity(jobState); // (a worker-thread is a slot for top-level (MWUs) - not constituent sub-WUs) - long bytesPerMinuteProcRate = calcAmortizedBytesPerMinute(jobState); + double totalBytes = numMWUs * meanBytesPerMWU; + long bytesPerMinuteProcRatePerThread = calcAmortizedBytesPerMinute(jobState); log.info("Calculating auto-scaling (for {} remaining work units within {}) using: bytesPerMinuteProcRate = {}; meanBytesPerMWU = {}", - numMWUs, jobTimeBudget, bytesPerMinuteProcRate, meanBytesPerMWU); + numMWUs, jobTimeBudget, bytesPerMinuteProcRatePerThread, meanBytesPerMWU); Review Comment: In the log message format string, rename the label `bytesPerMinuteProcRate` to `bytesPerMinuteProcRatePerThread` to match the renamed variable. ########## gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ExecuteGobblinWorkflowImpl.java: ########## @@ -166,17 +166,19 @@ protected ProcessWorkUnitsWorkflow createProcessWorkUnitsWorkflow(Properties job protected TimeBudget calcWUProcTimeBudget(Instant jobStartTime, WorkUnitsSizeSummary wuSizeSummary, Properties jobProps) { // TODO: make fully configurable!
for now, cap Work Discovery at 45 mins and set aside 10 mins for the `CommitStepWorkflow` long maxGenWUsMins = 45; - long commitStepMins = 10; + long commitStepMins = 15; long totalTargetTimeMins = TimeUnit.MINUTES.toMinutes(PropertiesUtils.getPropAsLong(jobProps, ConfigurationKeys.JOB_TARGET_COMPLETION_DURATION_IN_MINUTES_KEY, ConfigurationKeys.DEFAULT_JOB_TARGET_COMPLETION_DURATION_IN_MINUTES)); double permittedOveragePercentage = .2; Duration genWUsDuration = Duration.between(jobStartTime, TemporalEventTimer.WithinWorkflowFactory.getCurrentInstant()); - long remainingMins = totalTargetTimeMins - Math.min(genWUsDuration.toMinutes(), maxGenWUsMins) - commitStepMins; + Review Comment: Should this line be removed or commented out as well? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: dev-unsubscr...@gobblin.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org