[ https://issues.apache.org/jira/browse/GOBBLIN-1947?focusedWorklogId=893467&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-893467 ]
ASF GitHub Bot logged work on GOBBLIN-1947:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 01/Dec/23 19:43
Start Date: 01/Dec/23 19:43
Worklog Time Spent: 10m
Work Description: hanghangliu commented on code in PR #3832:
URL: https://github.com/apache/gobblin/pull/3832#discussion_r1412506511
##########
gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java:
##########
@@ -254,6 +270,74 @@ protected void runWorkUnits(List<WorkUnit> workUnits)
throws Exception {
}
}
+ /**
+ * The implementation of this method has the assumption that work unit change should never delete without adding new
+ * work units, which will cause starvation. Thus, process {@link WorkUnitChangeEvent} for two scenario:
+ * 1. workUnitChangeEvent only contains old tasks and no new tasks given: recalculate new work unit through kafka
+ * source and pack with a min container setting.
+ * 2. workUnitChangeEvent contains both valid old and new work unit: respect the information and directly remove
+ * old tasks and start new work units
+ *
+ * @param workUnitChangeEvent Event post by EventBus to specify old tasks to be removed and new tasks to be run
+ * @throws InvocationTargetException
+ */
+ @Override
+ @Subscribe
+ public void handleWorkUnitChangeEvent(WorkUnitChangeEvent workUnitChangeEvent)
+ throws InvocationTargetException {
+ log.info("Received WorkUnitChangeEvent with old Task {} and new WU {}",
+ workUnitChangeEvent.getOldTaskIds(), workUnitChangeEvent.getNewWorkUnits());
+ final JobState jobState = this.jobContext.getJobState();
+ List<WorkUnit> workUnits = workUnitChangeEvent.getNewWorkUnits();
+ // Use old task Id to recalculate new work units
+ if(workUnits == null || workUnits.isEmpty()) {
Review Comment:
When we send a WorkUnitChangeEvent from YarnAutoScalingManager, the YARN
service only has the Helix information, so it may not be easy to pre-calculate
the new work units there: that process needs KafkaSource, which the YARN
service isn't aware of.
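To make the two branches concrete, here is a minimal Java sketch of the
dispatch being discussed. It is not the PR's code: `ChangeEvent`,
`resolveNewWorkUnits`, and the `recalculator` function are hypothetical
stand-ins, plain strings take the place of Gobblin's WorkUnit, and the
KafkaSource-based recalculation is hidden behind a function on the assumption
that only the job launcher side can supply it.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

/** Illustrative only; none of these types are Gobblin's actual classes. */
class WorkUnitChangeSketch {

  /** Hypothetical event: old task IDs to remove plus optional replacement work units. */
  static final class ChangeEvent {
    final List<String> oldTaskIds;
    final List<String> newWorkUnits; // may be null or empty when the sender cannot compute them

    ChangeEvent(List<String> oldTaskIds, List<String> newWorkUnits) {
      this.oldTaskIds = oldTaskIds;
      this.newWorkUnits = newWorkUnits;
    }
  }

  /**
   * Returns the work units to start in place of the old tasks.
   * The recalculator stands in for the source-side logic (assumed to live
   * with the launcher, not with the sender of the event).
   */
  static List<String> resolveNewWorkUnits(
      ChangeEvent event, Function<List<String>, List<String>> recalculator) {
    List<String> provided = event.newWorkUnits;
    if (provided == null || provided.isEmpty()) {
      // Scenario 1: only old task IDs were sent, so derive replacements here.
      return recalculator.apply(event.oldTaskIds);
    }
    // Scenario 2: the sender supplied replacements; use them as given.
    return provided;
  }

  public static void main(String[] args) {
    // Toy recalculation: split every old task into two smaller units.
    Function<List<String>, List<String>> splitInTwo = ids -> {
      List<String> out = new ArrayList<>();
      for (String id : ids) {
        out.add(id + "-a");
        out.add(id + "-b");
      }
      return out;
    };

    ChangeEvent fromYarn = new ChangeEvent(Arrays.asList("task_0"), null);
    ChangeEvent fromSource = new ChangeEvent(Arrays.asList("task_0"), Arrays.asList("wu_9"));

    System.out.println(resolveNewWorkUnits(fromYarn, splitInTwo));
    System.out.println(resolveNewWorkUnits(fromSource, splitInTwo));
  }
}
```

Keeping the recalculation behind a function mirrors the point above: the
sender can post an event carrying only task IDs, and the component that knows
the source decides how to rebuild the work units.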
Issue Time Tracking
-------------------
Worklog Id: (was: 893467)
Time Spent: 40m (was: 0.5h)
> Send WorkUnitChangeEvent when helix task consistently fail
> -----------------------------------------------------------
>
> Key: GOBBLIN-1947
> URL: https://issues.apache.org/jira/browse/GOBBLIN-1947
> Project: Apache Gobblin
> Issue Type: New Feature
> Components: gobblin-cluster
> Reporter: Hanghang Liu
> Assignee: Hung Tran
> Priority: Major
> Time Spent: 40m
> Remaining Estimate: 0h
>
> When YarnAutoScalingManager detects that a helix task consistently fails, give
> an option to send a WorkUnitChangeEvent so that GobblinHelixJobLauncher can
> handle the event and split the work unit during runtime. This can help resolve
> consistently failing container issues (like OOM) during runtime instead of
> relying on the replanner to restart the whole pipeline.