[ 
https://issues.apache.org/jira/browse/GOBBLIN-2199?focusedWorklogId=962219&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-962219
 ]

ASF GitHub Bot logged work on GOBBLIN-2199:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 18/Mar/25 06:29
            Start Date: 18/Mar/25 06:29
    Worklog Time Spent: 10m 
      Work Description: khandelwal-prateek commented on code in PR #4106:
URL: https://github.com/apache/gobblin/pull/4106#discussion_r2000301539


##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/ProcessWorkUnitImpl.java:
##########
@@ -110,12 +110,13 @@ protected List<WorkUnit> loadFlattenedWorkUnits(WorkUnitClaimCheck wu, FileSyste
    * NOTE: adapted from {@link org.apache.gobblin.runtime.mapreduce.MRJobLauncher.TaskRunner#run(org.apache.hadoop.mapreduce.Mapper.Context)}
    * @return count of how many tasks executed (0 if execution ultimately failed, but we *believe* TaskState should already have been recorded beforehand)
    */
-  protected int execute(List<WorkUnit> workUnits, WorkUnitClaimCheck wu, JobState jobState, FileSystem fs, IssueRepository issueRepository) throws IOException, InterruptedException {
+  protected int execute(List<WorkUnit> workUnits, WorkUnitClaimCheck wu, JobState jobState, FileSystem fs, IssueRepository issueRepository,
+                        Properties jobProperties) throws IOException, InterruptedException {
     String containerId = "container-id-for-wu-" + wu.getCorrelator();
     StateStore<TaskState> taskStateStore = Help.openTaskStateStore(wu, fs);
 
     TaskStateTracker taskStateTracker = createEssentializedTaskStateTracker(wu);
-    TaskExecutor taskExecutor = new TaskExecutor(new Properties());
+    TaskExecutor taskExecutor = new TaskExecutor(jobProperties);

Review Comment:
   `TaskExecutor` determines its parallelism from `taskexecutor.threadpool.size`.
   Without this change it always receives an empty `Properties` object, so it
   always uses the default value and the configured parallelism is ignored.
   ```
     public TaskExecutor(Properties properties) {
       this(Integer.parseInt(properties.getProperty(ConfigurationKeys.TASK_EXECUTOR_THREADPOOL_SIZE_KEY,
               Integer.toString(ConfigurationKeys.DEFAULT_TASK_EXECUTOR_THREADPOOL_SIZE))),
       ...
     }
   ```
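
For illustration, here is a minimal standalone sketch of the same lookup pattern, comparing an empty `Properties` object with one that carries the key. The class name `ThreadPoolSizeSketch`, the helper `resolveThreadPoolSize`, and the default of `2` are assumptions made for this example; only the key name `taskexecutor.threadpool.size` comes from the comment above, and the real default is whatever `ConfigurationKeys.DEFAULT_TASK_EXECUTOR_THREADPOOL_SIZE` defines.

```java
import java.util.Properties;

public class ThreadPoolSizeSketch {
  // Key name as quoted in the review comment.
  static final String THREADPOOL_SIZE_KEY = "taskexecutor.threadpool.size";
  // Hypothetical default used only in this sketch; the real value lives in ConfigurationKeys.
  static final int ASSUMED_DEFAULT_THREADPOOL_SIZE = 2;

  // Same lookup pattern as the constructor quoted above: read the key, fall back to the default.
  static int resolveThreadPoolSize(Properties properties) {
    return Integer.parseInt(properties.getProperty(THREADPOOL_SIZE_KEY,
        Integer.toString(ASSUMED_DEFAULT_THREADPOOL_SIZE)));
  }

  public static void main(String[] args) {
    // Before the change: an empty Properties object can only yield the default.
    int before = resolveThreadPoolSize(new Properties());

    // After the change: job properties that carry the key control the pool size.
    Properties jobProperties = new Properties();
    jobProperties.setProperty(THREADPOOL_SIZE_KEY, "8");
    int after = resolveThreadPoolSize(jobProperties);

    System.out.println("empty properties -> " + before);
    System.out.println("job properties   -> " + after);
  }
}
```

With `new Properties()` the lookup can never see a configured value, which is why the job properties need to be passed through to the executor.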





Issue Time Tracking
-------------------

    Worklog Id:     (was: 962219)
    Time Spent: 20m  (was: 10m)

> Support dynamic container scaling on Temporal workload
> ------------------------------------------------------
>
>                 Key: GOBBLIN-2199
>                 URL: https://issues.apache.org/jira/browse/GOBBLIN-2199
>             Project: Apache Gobblin
>          Issue Type: Task
>            Reporter: Prateek Khandelwal
>            Priority: Major
>          Time Spent: 20m
>  Remaining Estimate: 0h
>
> Currently, Gobblin runs a static number of containers (the initial containers 
> requested at the start of the job). We need to support dynamic scaling by 
> computing the recommended number of containers, so that large data copy 
> workloads can be processed within a given completion time and without 
> running into OOM errors on containers.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
