[ 
https://issues.apache.org/jira/browse/GOBBLIN-1947?focusedWorklogId=893470&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-893470
 ]

ASF GitHub Bot logged work on GOBBLIN-1947:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 01/Dec/23 19:52
            Start Date: 01/Dec/23 19:52
    Worklog Time Spent: 10m 
      Work Description: hanghangliu commented on code in PR #3832:
URL: https://github.com/apache/gobblin/pull/3832#discussion_r1412514984


##########
gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java:
##########
@@ -254,6 +270,74 @@ protected void runWorkUnits(List<WorkUnit> workUnits) throws Exception {
     }
   }
 
+  /**
+   * The implementation of this method has the assumption that work unit change should never delete without adding new
+   * work units, which will cause starvation. Thus, process {@link WorkUnitChangeEvent} for two scenario:
+   * 1. workUnitChangeEvent only contains old tasks and no new tasks given: recalculate new work unit through kafka
+   * source and pack with a min container setting.
+   * 2. workUnitChangeEvent contains both valid old and new work unit: respect the information and directly remove
+   * old tasks and start new work units
+   *
+   * @param workUnitChangeEvent Event post by EventBus to specify old tasks to be removed and new tasks to be run
+   * @throws InvocationTargetException
+   */
+  @Override
+  @Subscribe
+  public void handleWorkUnitChangeEvent(WorkUnitChangeEvent workUnitChangeEvent)
+      throws InvocationTargetException {
+    log.info("Received WorkUnitChangeEvent with old Task {} and new WU {}",
+        workUnitChangeEvent.getOldTaskIds(), workUnitChangeEvent.getNewWorkUnits());
+    final JobState jobState = this.jobContext.getJobState();
+    List<WorkUnit> workUnits = workUnitChangeEvent.getNewWorkUnits();
+    // Use old task Id to recalculate new work units
+    if(workUnits == null || workUnits.isEmpty()) {
+      workUnits = recalculateWorkUnit(workUnitChangeEvent.getOldTaskIds());
+      // If no new valid work units can be generated, dismiss the WorkUnitChangeEvent
+      if(workUnits == null || workUnits.isEmpty()) {
+        log.info("Not able to update work unit meaningfully, dismiss the WorkUnitChangeEvent");
+        return;
+      }
+    }
+
+    // Follow how AbstractJobLauncher handles work units to make sure consistent behaviour
+    WorkUnitStream workUnitStream = new BasicWorkUnitStream.Builder(workUnits).build();
+    workUnitStream = this.executeHandlers(workUnitStream, this.destDatasetHandlerService);
+    this.processWorkUnitStream(workUnitStream, jobState);
+    try {
+      this.removeTasksFromCurrentJob(workUnitChangeEvent.getOldTaskIds());
+      this.addTasksToCurrentJob(workUnits);
+    } catch (Exception e) {
+      //todo: emit some event to indicate there is an error handling this event that may cause starvation
+      log.error("Failed to process WorkUnitChangeEvent with old tasks {} and new workunits {}.",
+          workUnitChangeEvent.getOldTaskIds(), workUnits, e);
+      throw new InvocationTargetException(e);

Review Comment:
   The InvocationTargetException is actually inherited from [the super class](https://github.com/apache/gobblin/blob/694ed37649ce4d48e4f4810287c7245339d19069/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java#L281),
   which was written by you :)



##########
gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java:
##########
@@ -275,17 +359,18 @@ protected void executeCancellation() {
     }
   }
 
-  protected void removeTasksFromCurrentJob(List<String> workUnitIdsToRemove) throws IOException, ExecutionException,
+  protected void removeTasksFromCurrentJob(List<String> helixTaskIdsToRemove) throws IOException, ExecutionException,
                                                                                     RetryException {
     String jobName = this.jobContext.getJobId();
     try (ParallelRunner stateSerDeRunner = new ParallelRunner(this.stateSerDeRunnerThreads, this.fs)) {
-      for (String workUnitId : workUnitIdsToRemove) {
+      for (String helixTaskId : helixTaskIdsToRemove) {
+        String workUnitId = helixIdTaskConfigMap.get(helixTaskId).getConfigMap().get(ConfigurationKeys.TASK_ID_KEY);
         taskRetryer.call(new Callable<Boolean>() {
           @Override
           public Boolean call() throws Exception {
             String taskId = workUnitToHelixConfig.get(workUnitId).getId();

Review Comment:
   removed 





Issue Time Tracking
-------------------

    Worklog Id:     (was: 893470)
    Time Spent: 1h  (was: 50m)

> Send WorkUnitChangeEvent when helix task consistently fail 
> -----------------------------------------------------------
>
>                 Key: GOBBLIN-1947
>                 URL: https://issues.apache.org/jira/browse/GOBBLIN-1947
>             Project: Apache Gobblin
>          Issue Type: New Feature
>          Components: gobblin-cluster
>            Reporter: Hanghang Liu
>            Assignee: Hung Tran
>            Priority: Major
>          Time Spent: 1h
>  Remaining Estimate: 0h
>
> When YarnAutoScalingManager detects that a Helix task is consistently failing,
> give an option to send a WorkUnitChangeEvent so that GobblinHelixJobLauncher
> can handle the event and split the work unit at runtime. This can help resolve
> consistently failing containers (e.g. from OOM) at runtime instead of
> relying on the replanner to restart the whole pipeline.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
