[
https://issues.apache.org/jira/browse/GOBBLIN-1947?focusedWorklogId=893469&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-893469
]
ASF GitHub Bot logged work on GOBBLIN-1947:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 01/Dec/23 19:50
Start Date: 01/Dec/23 19:50
Worklog Time Spent: 10m
Work Description: hanghangliu commented on code in PR #3832:
URL: https://github.com/apache/gobblin/pull/3832#discussion_r1412513245
##########
gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java:
##########
@@ -254,6 +270,74 @@ protected void runWorkUnits(List<WorkUnit> workUnits) throws Exception {
}
}
+ /**
+ * The implementation of this method assumes that a work unit change should never delete work units without adding
+ * new ones, since that would cause starvation. Thus, it processes {@link WorkUnitChangeEvent} in two scenarios:
+ * 1. workUnitChangeEvent only contains old tasks and no new tasks are given: recalculate new work units through the
+ * Kafka source and pack them with a min container setting.
+ * 2. workUnitChangeEvent contains both valid old tasks and new work units: respect that information, directly remove
+ * the old tasks, and start the new work units.
+ *
+ * @param workUnitChangeEvent Event posted by EventBus to specify old tasks to be removed and new tasks to be run
+ * @throws InvocationTargetException if removing the old tasks or adding the new work units fails
+ */
+ @Override
+ @Subscribe
+ public void handleWorkUnitChangeEvent(WorkUnitChangeEvent workUnitChangeEvent)
+ throws InvocationTargetException {
+ log.info("Received WorkUnitChangeEvent with old Task {} and new WU {}",
+ workUnitChangeEvent.getOldTaskIds(), workUnitChangeEvent.getNewWorkUnits());
+ final JobState jobState = this.jobContext.getJobState();
+ List<WorkUnit> workUnits = workUnitChangeEvent.getNewWorkUnits();
+ // Use old task Id to recalculate new work units
+ if(workUnits == null || workUnits.isEmpty()) {
+ workUnits = recalculateWorkUnit(workUnitChangeEvent.getOldTaskIds());
+ // If no new valid work units can be generated, dismiss the WorkUnitChangeEvent
+ if(workUnits == null || workUnits.isEmpty()) {
+ log.info("Not able to update work unit meaningfully, dismiss the WorkUnitChangeEvent");
+ return;
+ }
+ }
+
+ // Follow how AbstractJobLauncher handles work units to make sure consistent behaviour
+ WorkUnitStream workUnitStream = new BasicWorkUnitStream.Builder(workUnits).build();
+ workUnitStream = this.executeHandlers(workUnitStream, this.destDatasetHandlerService);
+ this.processWorkUnitStream(workUnitStream, jobState);
+ try {
+ this.removeTasksFromCurrentJob(workUnitChangeEvent.getOldTaskIds());
+ this.addTasksToCurrentJob(workUnits);
+ } catch (Exception e) {
+ //todo: emit some event to indicate there is an error handling this event that may cause starvation
+ log.error("Failed to process WorkUnitChangeEvent with old tasks {} and new workunits {}.",
+ workUnitChangeEvent.getOldTaskIds(), workUnits, e);
+ throw new InvocationTargetException(e);
Review Comment:
Throwing an error here won't actually fail the application, so we rely on the
Retryer, and on the replanner if the Retryer also fails. I've run the test for a
week and didn't see any errors thrown, but I agree this may be hard to
debug.
I've tried restarting the whole workflow, but it's not very straightforward.
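The decision logic in `handleWorkUnitChangeEvent` can be sketched in isolation as follows. This is a minimal, self-contained approximation under stated assumptions, not Gobblin's code: `WorkUnit` is a hypothetical stand-in class, and the `recalculate` function stands in for `recalculateWorkUnit`. The Helix task removal/addition and the work unit handlers are omitted; only the "use new work units, else recalculate, else dismiss" choice is shown.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.function.Function;

/** Hypothetical sketch of the two-scenario handling; not the actual Gobblin implementation. */
public class WorkUnitChangeSketch {

  /** Hypothetical stand-in for Gobblin's WorkUnit. */
  static final class WorkUnit {
    final String id;

    WorkUnit(String id) {
      this.id = id;
    }

    @Override
    public String toString() {
      return "WorkUnit(" + id + ")";
    }
  }

  /**
   * Returns the work units to start, or Optional.empty() if the event should be dismissed.
   * Scenario 2: the event already carries new work units, so they are used as-is.
   * Scenario 1: only old task IDs are given, so work units are recalculated from them.
   */
  static Optional<List<WorkUnit>> resolveWorkUnits(
      List<String> oldTaskIds,
      List<WorkUnit> newWorkUnits,
      Function<List<String>, List<WorkUnit>> recalculate) {
    List<WorkUnit> workUnits = newWorkUnits;
    if (workUnits == null || workUnits.isEmpty()) {
      workUnits = recalculate.apply(oldTaskIds);
      if (workUnits == null || workUnits.isEmpty()) {
        // Never remove old tasks without replacements: that would starve the job.
        return Optional.empty();
      }
    }
    return Optional.of(workUnits);
  }

  public static void main(String[] args) {
    // Hypothetical recalculation: split each failing task into two smaller work units.
    Function<List<String>, List<WorkUnit>> splitInTwo = ids -> {
      List<WorkUnit> out = new ArrayList<>();
      for (String id : ids) {
        out.add(new WorkUnit(id + "-a"));
        out.add(new WorkUnit(id + "-b"));
      }
      return out;
    };
    System.out.println(resolveWorkUnits(List.of("task1"), List.of(), splitInTwo));
  }
}
```

Returning `Optional.empty()` instead of an empty list keeps "dismiss the event" distinct from "start zero work units", which mirrors the early `return` in the diff.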
Issue Time Tracking
-------------------
Worklog Id: (was: 893469)
Time Spent: 50m (was: 40m)
> Send WorkUnitChangeEvent when helix task consistently fail
> -----------------------------------------------------------
>
> Key: GOBBLIN-1947
> URL: https://issues.apache.org/jira/browse/GOBBLIN-1947
> Project: Apache Gobblin
> Issue Type: New Feature
> Components: gobblin-cluster
> Reporter: Hanghang Liu
> Assignee: Hung Tran
> Priority: Major
> Time Spent: 50m
> Remaining Estimate: 0h
>
> When YarnAutoScalingManager detects that a Helix task consistently fails, give an
> option to send a WorkUnitChangeEvent so that GobblinHelixJobLauncher can handle the
> event and split the work unit at runtime. This can help resolve issues with
> consistently failing containers (like OOM) at runtime instead of
> relying on the replanner to restart the whole pipeline.
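The flow the issue describes (detect a consistently failing Helix task, then post a WorkUnitChangeEvent for GobblinHelixJobLauncher to handle) can be sketched roughly as below. This is a hypothetical illustration only. It treats "consistently failing" as N consecutive failed attempts, which is an assumption and not necessarily YarnAutoScalingManager's actual criterion. It also replaces the EventBus used by the real launcher (note the `@Subscribe` annotation in the diff) with a plain list of `Consumer`s so it runs without dependencies. `FailureDetectionSketch`, `ChangeEvent`, and the threshold are invented names.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

/** Hypothetical sketch: detect consistently failing tasks and emit a change event. */
public class FailureDetectionSketch {

  /** Hypothetical stand-in for WorkUnitChangeEvent: old task IDs plus (possibly empty) new work units. */
  static final class ChangeEvent {
    final List<String> oldTaskIds;
    final List<String> newWorkUnitIds;

    ChangeEvent(List<String> oldTaskIds, List<String> newWorkUnitIds) {
      this.oldTaskIds = oldTaskIds;
      this.newWorkUnitIds = newWorkUnitIds;
    }
  }

  private final int failureThreshold;
  private final Map<String, Integer> consecutiveFailures = new HashMap<>();
  private final List<Consumer<ChangeEvent>> subscribers = new ArrayList<>();

  FailureDetectionSketch(int failureThreshold) {
    this.failureThreshold = failureThreshold;
  }

  /** Registers a handler, playing the role of an EventBus subscriber. */
  void subscribe(Consumer<ChangeEvent> subscriber) {
    subscribers.add(subscriber);
  }

  /** Records one attempt outcome; posts an event once a task reaches the failure threshold. */
  void recordAttempt(String taskId, boolean succeeded) {
    if (succeeded) {
      // A success breaks the streak, so the task is no longer "consistently" failing.
      consecutiveFailures.remove(taskId);
      return;
    }
    int failures = consecutiveFailures.merge(taskId, 1, Integer::sum);
    if (failures >= failureThreshold) {
      consecutiveFailures.remove(taskId);
      // Only the old task ID is sent; the handler is expected to recalculate work units itself.
      ChangeEvent event = new ChangeEvent(List.of(taskId), List.of());
      subscribers.forEach(s -> s.accept(event));
    }
  }

  public static void main(String[] args) {
    FailureDetectionSketch detector = new FailureDetectionSketch(3);
    detector.subscribe(e -> System.out.println("Split requested for " + e.oldTaskIds));
    for (int i = 0; i < 3; i++) {
      detector.recordAttempt("task-7", false);
    }
  }
}
```

Because the event carries only the old task ID and no new work units, a handler like the one in the diff would take its first path and recalculate the work units itself.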