[ 
https://issues.apache.org/jira/browse/GOBBLIN-1947?focusedWorklogId=893467&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-893467
 ]

ASF GitHub Bot logged work on GOBBLIN-1947:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 01/Dec/23 19:43
            Start Date: 01/Dec/23 19:43
    Worklog Time Spent: 10m 
      Work Description: hanghangliu commented on code in PR #3832:
URL: https://github.com/apache/gobblin/pull/3832#discussion_r1412506511


##########
gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java:
##########
@@ -254,6 +270,74 @@ protected void runWorkUnits(List<WorkUnit> workUnits) throws Exception {
     }
   }
 
+  /**
+   * The implementation of this method has the assumption that work unit change should never delete without adding new
+   * work units, which will cause starvation. Thus, process {@link WorkUnitChangeEvent} for two scenario:
+   * 1. workUnitChangeEvent only contains old tasks and no new tasks given: recalculate new work unit through kafka
+   * source and pack with a min container setting.
+   * 2. workUnitChangeEvent contains both valid old and new work unit: respect the information and directly remove
+   * old tasks and start new work units
+   *
+   * @param workUnitChangeEvent Event post by EventBus to specify old tasks to be removed and new tasks to be run
+   * @throws InvocationTargetException
+   */
+  @Override
+  @Subscribe
+  public void handleWorkUnitChangeEvent(WorkUnitChangeEvent workUnitChangeEvent)
+      throws InvocationTargetException {
+    log.info("Received WorkUnitChangeEvent with old Task {} and new WU {}",
+        workUnitChangeEvent.getOldTaskIds(), workUnitChangeEvent.getNewWorkUnits());
+    final JobState jobState = this.jobContext.getJobState();
+    List<WorkUnit> workUnits = workUnitChangeEvent.getNewWorkUnits();
+    // Use old task Id to recalculate new work units
+    if(workUnits == null || workUnits.isEmpty()) {

Review Comment:
   When we send a WorkUnitChangeEvent from YarnAutoScalingManager, the Yarn
service only has the Helix information, so it may not be easy to pre-calculate
the new work units there: that calculation needs KafkaSource, which Yarn isn't
aware of.
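
   To make the two scenarios in the Javadoc concrete, here is a minimal,
standalone sketch of the branching only. It is not the code in this PR:
`WorkUnitChangeSketch`, `Plan`, `plan`, and the `recalculator` function are
hypothetical names, work units and task IDs are reduced to plain strings, and
the `recalculator` merely stands in for whatever recomputes work units from
the source (KafkaSource in the real launcher).

```java
import java.util.List;
import java.util.function.Function;

public class WorkUnitChangeSketch {

  /** Hypothetical result type: which old tasks to remove and which work units to start. */
  public static final class Plan {
    public final List<String> tasksToRemove;
    public final List<String> workUnitsToStart;
    public final boolean recalculated;

    Plan(List<String> tasksToRemove, List<String> workUnitsToStart, boolean recalculated) {
      this.tasksToRemove = tasksToRemove;
      this.workUnitsToStart = workUnitsToStart;
      this.recalculated = recalculated;
    }
  }

  /**
   * Decide what to do with a change event.
   * Scenario 1: no new work units supplied, so derive them from the old task IDs
   *             using the (assumed) recalculator.
   * Scenario 2: new work units supplied, so use them as given.
   * In both cases, refuse to remove old tasks when nothing would replace them,
   * since that would starve the job.
   */
  public static Plan plan(List<String> oldTaskIds, List<String> newWorkUnits,
      Function<List<String>, List<String>> recalculator) {
    boolean needsRecalculation = newWorkUnits == null || newWorkUnits.isEmpty();
    List<String> toStart = needsRecalculation ? recalculator.apply(oldTaskIds) : newWorkUnits;
    if (toStart == null || toStart.isEmpty()) {
      throw new IllegalStateException("Refusing to remove tasks without replacement work units");
    }
    return new Plan(oldTaskIds, toStart, needsRecalculation);
  }
}
```

   With this shape, a sender such as YarnAutoScalingManager could pass only the
old task IDs and leave the recalculation to the side that can reach the source.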





Issue Time Tracking
-------------------

    Worklog Id:     (was: 893467)
    Time Spent: 40m  (was: 0.5h)

> Send WorkUnitChangeEvent when helix task consistently fail 
> -----------------------------------------------------------
>
>                 Key: GOBBLIN-1947
>                 URL: https://issues.apache.org/jira/browse/GOBBLIN-1947
>             Project: Apache Gobblin
>          Issue Type: New Feature
>          Components: gobblin-cluster
>            Reporter: Hanghang Liu
>            Assignee: Hung Tran
>            Priority: Major
>          Time Spent: 40m
>  Remaining Estimate: 0h
>
> When YarnAutoScalingManager detects that Helix tasks consistently fail, give
> an option to send a WorkUnitChangeEvent so that GobblinHelixJobLauncher can
> handle the event and split the work unit at runtime. This can help resolve
> consistently failing container issues (like OOM) at runtime instead of
> relying on the replanner to restart the whole pipeline.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
