[ https://issues.apache.org/jira/browse/GOBBLIN-2199?focusedWorklogId=962219&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-962219 ]
ASF GitHub Bot logged work on GOBBLIN-2199:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 18/Mar/25 06:29
            Start Date: 18/Mar/25 06:29
    Worklog Time Spent: 10m
      Work Description: khandelwal-prateek commented on code in PR #4106:
URL: https://github.com/apache/gobblin/pull/4106#discussion_r2000301539


##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/ProcessWorkUnitImpl.java:
##########
@@ -110,12 +110,13 @@ protected List<WorkUnit> loadFlattenedWorkUnits(WorkUnitClaimCheck wu, FileSyste
    * NOTE: adapted from {@link org.apache.gobblin.runtime.mapreduce.MRJobLauncher.TaskRunner#run(org.apache.hadoop.mapreduce.Mapper.Context)}
    * @return count of how many tasks executed (0 if execution ultimately failed, but we *believe* TaskState should already have been recorded beforehand)
    */
-  protected int execute(List<WorkUnit> workUnits, WorkUnitClaimCheck wu, JobState jobState, FileSystem fs, IssueRepository issueRepository) throws IOException, InterruptedException {
+  protected int execute(List<WorkUnit> workUnits, WorkUnitClaimCheck wu, JobState jobState, FileSystem fs, IssueRepository issueRepository,
+      Properties jobProperties) throws IOException, InterruptedException {
     String containerId = "container-id-for-wu-" + wu.getCorrelator();
     StateStore<TaskState> taskStateStore = Help.openTaskStateStore(wu, fs);
     TaskStateTracker taskStateTracker = createEssentializedTaskStateTracker(wu);
-    TaskExecutor taskExecutor = new TaskExecutor(new Properties());
+    TaskExecutor taskExecutor = new TaskExecutor(jobProperties);

Review Comment:
   `TaskExecutor` determines its parallelism from `taskexecutor.threadpool.size`. Without this change it is constructed with an empty `Properties` object, so it always uses the default thread pool size and parallelism cannot be controlled through config.
   ```
   public TaskExecutor(Properties properties) {
     this(Integer.parseInt(properties.getProperty(ConfigurationKeys.TASK_EXECUTOR_THREADPOOL_SIZE_KEY,
         Integer.toString(ConfigurationKeys.DEFAULT_TASK_EXECUTOR_THREADPOOL_SIZE))),
         ...
   }
   ```


Issue Time Tracking
-------------------

    Worklog Id:     (was: 962219)
    Time Spent: 20m  (was: 10m)

> Support dynamic container scaling on Temporal workload
> ------------------------------------------------------
>
>                 Key: GOBBLIN-2199
>                 URL: https://issues.apache.org/jira/browse/GOBBLIN-2199
>             Project: Apache Gobblin
>          Issue Type: Task
>            Reporter: Prateek Khandelwal
>            Priority: Major
>          Time Spent: 20m
>  Remaining Estimate: 0h
>
> Currently, Gobblin runs a static number of containers (the initial containers
> requested at the start of the job). We need to support dynamic scaling by
> computing the recommended number of containers so that large data copy
> workloads can be processed within a given completion time without running
> into OOM errors on containers.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)