[ 
https://issues.apache.org/jira/browse/GOBBLIN-1947?focusedWorklogId=893469&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-893469
 ]

ASF GitHub Bot logged work on GOBBLIN-1947:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 01/Dec/23 19:50
            Start Date: 01/Dec/23 19:50
    Worklog Time Spent: 10m 
      Work Description: hanghangliu commented on code in PR #3832:
URL: https://github.com/apache/gobblin/pull/3832#discussion_r1412513245


##########
gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java:
##########
@@ -254,6 +270,74 @@ protected void runWorkUnits(List<WorkUnit> workUnits) throws Exception {
     }
   }
 
+  /**
+   * The implementation of this method has the assumption that work unit change should never delete without adding new
+   * work units, which will cause starvation. Thus, process {@link WorkUnitChangeEvent} for two scenario:
+   * 1. workUnitChangeEvent only contains old tasks and no new tasks given: recalculate new work unit through kafka
+   * source and pack with a min container setting.
+   * 2. workUnitChangeEvent contains both valid old and new work unit: respect the information and directly remove
+   * old tasks and start new work units
+   *
+   * @param workUnitChangeEvent Event post by EventBus to specify old tasks to be removed and new tasks to be run
+   * @throws InvocationTargetException
+   */
+  @Override
+  @Subscribe
+  public void handleWorkUnitChangeEvent(WorkUnitChangeEvent workUnitChangeEvent)
+      throws InvocationTargetException {
+    log.info("Received WorkUnitChangeEvent with old Task {} and new WU {}",
+        workUnitChangeEvent.getOldTaskIds(), workUnitChangeEvent.getNewWorkUnits());
+    final JobState jobState = this.jobContext.getJobState();
+    List<WorkUnit> workUnits = workUnitChangeEvent.getNewWorkUnits();
+    // Use old task Id to recalculate new work units
+    if(workUnits == null || workUnits.isEmpty()) {
+      workUnits = recalculateWorkUnit(workUnitChangeEvent.getOldTaskIds());
+      // If no new valid work units can be generated, dismiss the WorkUnitChangeEvent
+      if(workUnits == null || workUnits.isEmpty()) {
+        log.info("Not able to update work unit meaningfully, dismiss the WorkUnitChangeEvent");
+        return;
+      }
+    }
+
+    // Follow how AbstractJobLauncher handles work units to make sure consistent behaviour
+    WorkUnitStream workUnitStream = new BasicWorkUnitStream.Builder(workUnits).build();
+    workUnitStream = this.executeHandlers(workUnitStream, this.destDatasetHandlerService);
+    this.processWorkUnitStream(workUnitStream, jobState);
+    try {
+      this.removeTasksFromCurrentJob(workUnitChangeEvent.getOldTaskIds());
+      this.addTasksToCurrentJob(workUnits);
+    } catch (Exception e) {
+      //todo: emit some event to indicate there is an error handling this event that may cause starvation
+      log.error("Failed to process WorkUnitChangeEvent with old tasks {} and new workunits {}.",
+          workUnitChangeEvent.getOldTaskIds(), workUnits, e);
+      throw new InvocationTargetException(e);

Review Comment:
   Throwing the exception won't actually fail the application, so here we 
rely on the Retryer, and on the replanner if the Retryer also fails. I ran 
the test for a week and never saw this exception thrown, but I do agree it 
may be hard to debug. 
   I've tried restarting the whole workflow instead, but that is not very 
straightforward.
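
   To make the "retry first, then hand off" behaviour described above easier to reason about, here is a minimal, self-contained sketch. It is not the code in this PR and does not use the actual Retryer API: `RetrySketch`, `runWithRetry`, and the `onExhausted` hook (a stand-in for the replanner or an alerting event) are hypothetical names chosen for illustration, and the retry is a plain bounded loop.

```java
import java.util.concurrent.Callable;
import java.util.function.Consumer;

/** Hypothetical illustration only; none of these names exist in Gobblin. */
public final class RetrySketch {

  private RetrySketch() {}

  /**
   * Runs {@code action} up to {@code maxAttempts} times.
   * Returns true as soon as one attempt succeeds. If every attempt throws,
   * the last exception is handed to {@code onExhausted} (standing in for a
   * replanner or an emitted error event) and false is returned, so the
   * caller is not failed by the exception itself.
   */
  public static boolean runWithRetry(Callable<Void> action, int maxAttempts,
      Consumer<Exception> onExhausted) {
    if (maxAttempts < 1) {
      throw new IllegalArgumentException("maxAttempts must be at least 1");
    }
    Exception last = null;
    for (int attempt = 1; attempt <= maxAttempts; attempt++) {
      try {
        action.call();
        return true;
      } catch (Exception e) {
        // Remember the failure and try again until attempts run out.
        last = e;
      }
    }
    // All attempts failed: surface the last error to the fallback hook.
    onExhausted.accept(last);
    return false;
  }
}
```

   With a hook like this, the todo in the catch block (emitting an event when handling fails) would have a single place to live, which may help with the debugging concern.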





Issue Time Tracking
-------------------

    Worklog Id:     (was: 893469)
    Time Spent: 50m  (was: 40m)

> Send WorkUnitChangeEvent when helix task consistently fail 
> -----------------------------------------------------------
>
>                 Key: GOBBLIN-1947
>                 URL: https://issues.apache.org/jira/browse/GOBBLIN-1947
>             Project: Apache Gobblin
>          Issue Type: New Feature
>          Components: gobblin-cluster
>            Reporter: Hanghang Liu
>            Assignee: Hung Tran
>            Priority: Major
>          Time Spent: 50m
>  Remaining Estimate: 0h
>
> When YarnAutoScalingManager detects that a helix task is consistently 
> failing, give an option to send a WorkUnitChangeEvent so that 
> GobblinHelixJobLauncher can handle the event and split the work unit at 
> runtime. This can help resolve consistently failing container issues (such 
> as OOM) at runtime instead of relying on the replanner to restart the whole 
> pipeline.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
