[
https://issues.apache.org/jira/browse/GOBBLIN-1947?focusedWorklogId=893470&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-893470
]
ASF GitHub Bot logged work on GOBBLIN-1947:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 01/Dec/23 19:52
Start Date: 01/Dec/23 19:52
Worklog Time Spent: 10m
Work Description: hanghangliu commented on code in PR #3832:
URL: https://github.com/apache/gobblin/pull/3832#discussion_r1412514984
##########
gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java:
##########
@@ -254,6 +270,74 @@ protected void runWorkUnits(List<WorkUnit> workUnits)
throws Exception {
}
}
+ /**
+ * The implementation of this method has the assumption that work unit change should never delete without adding new
+ * work units, which will cause starvation. Thus, process {@link WorkUnitChangeEvent} for two scenarios:
+ * 1. workUnitChangeEvent only contains old tasks and no new tasks given: recalculate new work unit through kafka
+ * source and pack with a min container setting.
+ * 2. workUnitChangeEvent contains both valid old and new work unit: respect the information and directly remove
+ * old tasks and start new work units
+ *
+ * @param workUnitChangeEvent Event posted by EventBus to specify old tasks to be removed and new tasks to be run
+ * @throws InvocationTargetException
+ */
+ @Override
+ @Subscribe
+ public void handleWorkUnitChangeEvent(WorkUnitChangeEvent workUnitChangeEvent)
+ throws InvocationTargetException {
+ log.info("Received WorkUnitChangeEvent with old Task {} and new WU {}",
+ workUnitChangeEvent.getOldTaskIds(), workUnitChangeEvent.getNewWorkUnits());
+ final JobState jobState = this.jobContext.getJobState();
+ List<WorkUnit> workUnits = workUnitChangeEvent.getNewWorkUnits();
+ // Use old task Id to recalculate new work units
+ if(workUnits == null || workUnits.isEmpty()) {
+ workUnits = recalculateWorkUnit(workUnitChangeEvent.getOldTaskIds());
+ // If no new valid work units can be generated, dismiss the WorkUnitChangeEvent
+ if(workUnits == null || workUnits.isEmpty()) {
+ log.info("Not able to update work unit meaningfully, dismiss the WorkUnitChangeEvent");
+ return;
+ }
+ }
+
+ // Follow how AbstractJobLauncher handles work units to make sure consistent behaviour
+ WorkUnitStream workUnitStream = new BasicWorkUnitStream.Builder(workUnits).build();
+ workUnitStream = this.executeHandlers(workUnitStream, this.destDatasetHandlerService);
+ this.processWorkUnitStream(workUnitStream, jobState);
+ try {
+ this.removeTasksFromCurrentJob(workUnitChangeEvent.getOldTaskIds());
+ this.addTasksToCurrentJob(workUnits);
+ } catch (Exception e) {
+ //todo: emit some event to indicate there is an error handling this event that may cause starvation
+ log.error("Failed to process WorkUnitChangeEvent with old tasks {} and new workunits {}.",
+ workUnitChangeEvent.getOldTaskIds(), workUnits, e);
+ throw new InvocationTargetException(e);
Review Comment:
For the InvocationTargetException, it's actually inherited from [the superclass](https://github.com/apache/gobblin/blob/694ed37649ce4d48e4f4810287c7245339d19069/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java#L281), which was written by you :)
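The Javadoc's two scenarios, and the review point that `throws InvocationTargetException` comes from the superclass (Java does not let an overriding method declare checked exceptions broader than those of the method it overrides), can be illustrated with a small self-contained sketch. All names below (`BaseLauncherSketch`, `HelixLauncherSketch`, `handleChange`, the `recalculator` hook, `failOnAdd`) are hypothetical; work units and task ids are modeled as plain strings rather than Gobblin's `WorkUnit` and Helix task ids, and the handler-execution and stream-processing steps are omitted.

```java
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Function;

// Hypothetical stand-in for AbstractJobLauncher: the base method declares
// InvocationTargetException, so an override may declare it too (but nothing broader).
abstract class BaseLauncherSketch {
  abstract List<String> handleChange(List<String> oldTaskIds, List<String> newWorkUnits)
      throws InvocationTargetException;
}

// Hypothetical stand-in for GobblinHelixJobLauncher; work units and task ids are plain strings.
class HelixLauncherSketch extends BaseLauncherSketch {
  // Hypothetical hook playing the role of recalculateWorkUnit(oldTaskIds).
  private final Function<List<String>, List<String>> recalculator;
  // If true, adding tasks fails, to exercise the exception-wrapping path.
  private final boolean failOnAdd;
  // Simulated contents of the currently running job.
  final List<String> runningTasks = new ArrayList<>();

  HelixLauncherSketch(Function<List<String>, List<String>> recalculator, boolean failOnAdd) {
    this.recalculator = recalculator;
    this.failOnAdd = failOnAdd;
  }

  @Override
  List<String> handleChange(List<String> oldTaskIds, List<String> newWorkUnits)
      throws InvocationTargetException {
    List<String> workUnits = newWorkUnits;
    // Scenario 1: only old tasks given, so derive replacement work units from them.
    if (workUnits == null || workUnits.isEmpty()) {
      workUnits = recalculator.apply(oldTaskIds);
      // No usable replacements: dismiss the event instead of deleting tasks (avoids starvation).
      if (workUnits == null || workUnits.isEmpty()) {
        return Collections.emptyList();
      }
    }
    // Scenario 2 (or a successful recalculation): remove old tasks, then add new ones.
    try {
      runningTasks.removeAll(oldTaskIds);
      addTasks(workUnits);
    } catch (Exception e) {
      // Wrap any failure in the checked exception type the base method declares.
      throw new InvocationTargetException(e);
    }
    return workUnits;
  }

  private void addTasks(List<String> workUnits) throws Exception {
    if (failOnAdd) {
      throw new Exception("simulated failure while adding tasks");
    }
    runningTasks.addAll(workUnits);
  }
}
```

Like the diff, the sketch removes old tasks before adding new ones, so a failure in the add step leaves the old tasks gone with nothing replacing them; that is the starvation risk the `todo` comment refers to.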
##########
gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java:
##########
@@ -275,17 +359,18 @@ protected void executeCancellation() {
}
}
- protected void removeTasksFromCurrentJob(List<String> workUnitIdsToRemove) throws IOException, ExecutionException,
+ protected void removeTasksFromCurrentJob(List<String> helixTaskIdsToRemove) throws IOException, ExecutionException,
RetryException {
String jobName = this.jobContext.getJobId();
try (ParallelRunner stateSerDeRunner = new ParallelRunner(this.stateSerDeRunnerThreads, this.fs)) {
- for (String workUnitId : workUnitIdsToRemove) {
+ for (String helixTaskId : helixTaskIdsToRemove) {
+ String workUnitId = helixIdTaskConfigMap.get(helixTaskId).getConfigMap().get(ConfigurationKeys.TASK_ID_KEY);
taskRetryer.call(new Callable<Boolean>() {
@Override
public Boolean call() throws Exception {
String taskId = workUnitToHelixConfig.get(workUnitId).getId();
Review Comment:
removed
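The renamed parameter indicates that the method now receives Helix task ids, which the loop translates back to Gobblin work unit ids through `helixIdTaskConfigMap`. The stdlib-only sketch below isolates that translation step. `TaskConfigSketch`, `TaskIdTranslationSketch`, `toWorkUnitIds` and the `"task.id"` literal are hypothetical stand-ins for Helix's task config and `ConfigurationKeys.TASK_ID_KEY`, and the null checks are a choice made for the sketch, not something the diff does.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a Helix task config: only the string config map matters here.
final class TaskConfigSketch {
  private final Map<String, String> configMap;

  TaskConfigSketch(Map<String, String> configMap) {
    this.configMap = configMap;
  }

  Map<String, String> getConfigMap() {
    return configMap;
  }
}

final class TaskIdTranslationSketch {
  // Illustrative key; the diff reads ConfigurationKeys.TASK_ID_KEY instead.
  static final String TASK_ID_KEY = "task.id";

  // Maps Helix task ids to Gobblin work unit (task) ids via each config's TASK_ID_KEY entry.
  static List<String> toWorkUnitIds(List<String> helixTaskIds,
      Map<String, TaskConfigSketch> helixIdTaskConfigMap) {
    List<String> workUnitIds = new ArrayList<>();
    for (String helixTaskId : helixTaskIds) {
      TaskConfigSketch config = helixIdTaskConfigMap.get(helixTaskId);
      // Unlike the chained get() in the diff, unknown ids are skipped instead of raising an NPE.
      if (config == null) {
        continue;
      }
      String workUnitId = config.getConfigMap().get(TASK_ID_KEY);
      if (workUnitId != null) {
        workUnitIds.add(workUnitId);
      }
    }
    return workUnitIds;
  }
}
```

In the diff itself, a Helix task id missing from `helixIdTaskConfigMap` would surface as a `NullPointerException` inside the loop; whether skipping or failing fast is preferable depends on how stale the event's task ids can be.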
Issue Time Tracking
-------------------
Worklog Id: (was: 893470)
Time Spent: 1h (was: 50m)
> Send WorkUnitChangeEvent when helix task consistently fail
> -----------------------------------------------------------
>
> Key: GOBBLIN-1947
> URL: https://issues.apache.org/jira/browse/GOBBLIN-1947
> Project: Apache Gobblin
> Issue Type: New Feature
> Components: gobblin-cluster
> Reporter: Hanghang Liu
> Assignee: Hung Tran
> Priority: Major
> Time Spent: 1h
> Remaining Estimate: 0h
>
> When YarnAutoScalingManager detects that a Helix task consistently fails, give an
> option to send a WorkUnitChangeEvent so that GobblinHelixJobLauncher handles the
> event and splits the work unit at runtime. This can help resolve consistently
> failing containers (e.g., OOM) at runtime instead of relying on the replanner to
> restart the whole pipeline.
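The feature request describes a detect-then-notify flow. This worklog does not show how YarnAutoScalingManager decides that a task "consistently fails", so the sketch below assumes a simple per-task failure counter with a hypothetical threshold, and uses a plain `Consumer` in place of the EventBus that the launcher's `@Subscribe` handler listens on. `WorkUnitChangeEventSketch`, `ConsistentFailureDetectorSketch` and `recordFailure` are hypothetical names.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical minimal event: Helix task ids to replace plus (possibly empty) new work units.
final class WorkUnitChangeEventSketch {
  final List<String> oldTaskIds;
  final List<String> newWorkUnits;

  WorkUnitChangeEventSketch(List<String> oldTaskIds, List<String> newWorkUnits) {
    this.oldTaskIds = oldTaskIds;
    this.newWorkUnits = newWorkUnits;
  }
}

// Hypothetical detector: counts failures per Helix task and emits one event at a threshold.
final class ConsistentFailureDetectorSketch {
  private final int failureThreshold;
  private final Consumer<WorkUnitChangeEventSketch> eventSink; // stands in for EventBus posting
  private final Map<String, Integer> failureCounts = new HashMap<>();

  ConsistentFailureDetectorSketch(int failureThreshold, Consumer<WorkUnitChangeEventSketch> eventSink) {
    this.failureThreshold = failureThreshold;
    this.eventSink = eventSink;
  }

  void recordFailure(String helixTaskId) {
    int count = failureCounts.merge(helixTaskId, 1, Integer::sum);
    // Fire exactly once, when the count first reaches the threshold.
    if (count == failureThreshold) {
      // No new work units: the launcher is left to recalculate them (scenario 1 in the Javadoc).
      eventSink.accept(new WorkUnitChangeEventSketch(
          Collections.singletonList(helixTaskId), Collections.emptyList()));
    }
  }
}
```

Sending only the old task ids keeps the detector ignorant of how work units are split; the launcher's recalculation path owns that decision.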
--
This message was sent by Atlassian Jira
(v8.20.10#820010)