[
https://issues.apache.org/jira/browse/GOBBLIN-1947?focusedWorklogId=891675&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-891675
]
ASF GitHub Bot logged work on GOBBLIN-1947:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 22/Nov/23 01:12
Start Date: 22/Nov/23 01:12
Worklog Time Spent: 10m
Work Description: ZihanLi58 commented on code in PR #3832:
URL: https://github.com/apache/gobblin/pull/3832#discussion_r1401380645
##########
gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java:
##########
@@ -254,6 +270,74 @@ protected void runWorkUnits(List<WorkUnit> workUnits)
       throws Exception {
     }
   }
+  /**
+   * The implementation of this method assumes that a work unit change should never delete work units without
+   * adding new ones, which would cause starvation. Thus, it processes a {@link WorkUnitChangeEvent} for two
+   * scenarios:
+   * 1. The workUnitChangeEvent contains only old tasks and no new tasks: recalculate new work units through
+   *    the Kafka source and pack them with a min container setting.
+   * 2. The workUnitChangeEvent contains both valid old and new work units: respect the given information and
+   *    directly remove the old tasks and start the new work units.
+   *
+   * @param workUnitChangeEvent Event posted by EventBus specifying old tasks to be removed and new tasks to be run
+   * @throws InvocationTargetException
+   */
+  @Override
+  @Subscribe
+  public void handleWorkUnitChangeEvent(WorkUnitChangeEvent workUnitChangeEvent)
+      throws InvocationTargetException {
+    log.info("Received WorkUnitChangeEvent with old Task {} and new WU {}",
+        workUnitChangeEvent.getOldTaskIds(), workUnitChangeEvent.getNewWorkUnits());
+    final JobState jobState = this.jobContext.getJobState();
+    List<WorkUnit> workUnits = workUnitChangeEvent.getNewWorkUnits();
+    // Use the old task IDs to recalculate new work units
+    if (workUnits == null || workUnits.isEmpty()) {
Review Comment:
In which scenario will we have no new work units here?
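For illustration, a minimal sketch of the two event shapes the Javadoc above describes, assuming WorkUnitChangeEvent offers a constructor mirroring its getOldTaskIds()/getNewWorkUnits() accessors (the constructor signature is an assumption, not shown in this diff):

    // Assumed constructor: WorkUnitChangeEvent(List<String> oldTaskIds, List<WorkUnit> newWorkUnits)
    // Scenario 1: old task IDs only; the launcher recalculates work units from the Kafka source.
    WorkUnitChangeEvent recalculated = new WorkUnitChangeEvent(oldTaskIds, null);
    // Scenario 2: old task IDs plus replacement work units; the launcher swaps them in directly.
    WorkUnitChangeEvent direct = new WorkUnitChangeEvent(oldTaskIds, newWorkUnits);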
##########
gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java:
##########
@@ -514,6 +599,7 @@ private TaskConfig getTaskConfig(WorkUnit workUnit, ParallelRunner stateSerDeRun
     rawConfigMap.put(GobblinClusterConfigurationKeys.TASK_SUCCESS_OPTIONAL_KEY, "true");
     TaskConfig taskConfig = TaskConfig.Builder.from(rawConfigMap);
     workUnitToHelixConfig.put(workUnit.getId(), taskConfig);
Review Comment:
It seems helixIdTaskConfigMap and workUnitToHelixConfig are similar; can we just use one of them to reduce complexity?
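For illustration, a hedged sketch of the suggested consolidation, assuming Helix's TaskConfig.getId() yields the Helix task ID and the work unit ID stays retrievable from the task's config map under ConfigurationKeys.TASK_ID_KEY (as the removeTasksFromCurrentJob hunk later in this review suggests):

    // Keep only one map, keyed by Helix task ID; the work unit ID is recoverable from the TaskConfig.
    private final Map<String, TaskConfig> helixIdTaskConfigMap = new ConcurrentHashMap<>();

    // At registration time, replacing workUnitToHelixConfig.put(workUnit.getId(), taskConfig):
    helixIdTaskConfigMap.put(taskConfig.getId(), taskConfig);

    // At lookup time:
    String workUnitId = helixIdTaskConfigMap.get(helixTaskId)
        .getConfigMap().get(ConfigurationKeys.TASK_ID_KEY);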
##########
gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java:
##########
@@ -514,6 +599,7 @@ private TaskConfig getTaskConfig(WorkUnit workUnit, ParallelRunner stateSerDeRun
     rawConfigMap.put(GobblinClusterConfigurationKeys.TASK_SUCCESS_OPTIONAL_KEY, "true");
     TaskConfig taskConfig = TaskConfig.Builder.from(rawConfigMap);
     workUnitToHelixConfig.put(workUnit.getId(), taskConfig);
Review Comment:
Same here, do we still need this? I feel you want to use the Helix task ID instead of the work unit ID here.
##########
gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java:
##########
@@ -254,6 +270,74 @@ protected void runWorkUnits(List<WorkUnit> workUnits)
       throws Exception {
     }
   }
+  /**
+   * The implementation of this method assumes that a work unit change should never delete work units without
+   * adding new ones, which would cause starvation. Thus, it processes a {@link WorkUnitChangeEvent} for two
+   * scenarios:
+   * 1. The workUnitChangeEvent contains only old tasks and no new tasks: recalculate new work units through
+   *    the Kafka source and pack them with a min container setting.
+   * 2. The workUnitChangeEvent contains both valid old and new work units: respect the given information and
+   *    directly remove the old tasks and start the new work units.
+   *
+   * @param workUnitChangeEvent Event posted by EventBus specifying old tasks to be removed and new tasks to be run
+   * @throws InvocationTargetException
+   */
+  @Override
+  @Subscribe
+  public void handleWorkUnitChangeEvent(WorkUnitChangeEvent workUnitChangeEvent)
+      throws InvocationTargetException {
+    log.info("Received WorkUnitChangeEvent with old Task {} and new WU {}",
+        workUnitChangeEvent.getOldTaskIds(), workUnitChangeEvent.getNewWorkUnits());
+    final JobState jobState = this.jobContext.getJobState();
+    List<WorkUnit> workUnits = workUnitChangeEvent.getNewWorkUnits();
+    // Use the old task IDs to recalculate new work units
+    if (workUnits == null || workUnits.isEmpty()) {
+      workUnits = recalculateWorkUnit(workUnitChangeEvent.getOldTaskIds());
+      // If no new valid work units can be generated, dismiss the WorkUnitChangeEvent
+      if (workUnits == null || workUnits.isEmpty()) {
+        log.info("Not able to update work units meaningfully, dismissing the WorkUnitChangeEvent");
+        return;
+      }
+    }
+
+    // Follow how AbstractJobLauncher handles work units to ensure consistent behaviour
+    WorkUnitStream workUnitStream = new BasicWorkUnitStream.Builder(workUnits).build();
+    workUnitStream = this.executeHandlers(workUnitStream, this.destDatasetHandlerService);
+    this.processWorkUnitStream(workUnitStream, jobState);
+    try {
+      this.removeTasksFromCurrentJob(workUnitChangeEvent.getOldTaskIds());
+      this.addTasksToCurrentJob(workUnits);
+    } catch (Exception e) {
+      // TODO: emit an event to indicate an error handling this event that may cause starvation
+      log.error("Failed to process WorkUnitChangeEvent with old tasks {} and new workunits {}.",
+          workUnitChangeEvent.getOldTaskIds(), workUnits, e);
+      throw new InvocationTargetException(e);
Review Comment:
Did we test the behavior of throwing this exception? Are we able to catch it, fail the whole application, and restart directly? Or will it ultimately fail silently and starve?
Also curious why we throw InvocationTargetException.
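For context, a hedged sketch assuming the event is delivered through a Guava EventBus (which the @Subscribe annotation suggests): Guava catches subscriber exceptions and routes them to a SubscriberExceptionHandler that by default only logs, so the InvocationTargetException above would not propagate to the post() caller unless the bus is built with a handler that escalates:

    import com.google.common.eventbus.EventBus;
    import com.google.common.eventbus.SubscriberExceptionContext;
    import com.google.common.eventbus.SubscriberExceptionHandler;

    EventBus eventBus = new EventBus(new SubscriberExceptionHandler() {
      @Override
      public void handleException(Throwable exception, SubscriberExceptionContext context) {
        // Default behaviour is log-and-continue, which matches the silent-starvation concern above;
        // escalate here (for example, fail the application) instead of swallowing the error.
        log.error("Subscriber {} failed to handle event {}",
            context.getSubscriber(), context.getEvent(), exception);
      }
    });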
##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java:
##########
@@ -507,9 +513,10 @@ public void apply(JobListener jobListener, JobContext jobContext)
       }
       // Perform work needed before writing is done
-      Boolean canCleanUp = this.canCleanStagingData(this.jobContext.getJobState());
-      workUnitStream = closer.register(new DestinationDatasetHandlerService(jobState, canCleanUp, this.eventSubmitter))
-          .executeHandlers(workUnitStream);
+      this.canCleanUpStagingData = this.canCleanStagingData(this.jobContext.getJobState());
Review Comment:
Is this from another change? It seems unrelated to this PR.
##########
gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java:
##########
@@ -275,17 +359,18 @@ protected void executeCancellation() {
     }
   }
-  protected void removeTasksFromCurrentJob(List<String> workUnitIdsToRemove) throws IOException, ExecutionException,
+  protected void removeTasksFromCurrentJob(List<String> helixTaskIdsToRemove) throws IOException, ExecutionException,
       RetryException {
     String jobName = this.jobContext.getJobId();
     try (ParallelRunner stateSerDeRunner = new ParallelRunner(this.stateSerDeRunnerThreads, this.fs)) {
-      for (String workUnitId : workUnitIdsToRemove) {
+      for (String helixTaskId : helixTaskIdsToRemove) {
+        String workUnitId = helixIdTaskConfigMap.get(helixTaskId).getConfigMap().get(ConfigurationKeys.TASK_ID_KEY);
         taskRetryer.call(new Callable<Boolean>() {
           @Override
           public Boolean call() throws Exception {
             String taskId = workUnitToHelixConfig.get(workUnitId).getId();
Review Comment:
Do you still need this?
Issue Time Tracking
-------------------
Worklog Id: (was: 891675)
Time Spent: 0.5h (was: 20m)
> Send WorkUnitChangeEvent when Helix tasks consistently fail
> -----------------------------------------------------------
>
> Key: GOBBLIN-1947
> URL: https://issues.apache.org/jira/browse/GOBBLIN-1947
> Project: Apache Gobblin
> Issue Type: New Feature
> Components: gobblin-cluster
> Reporter: Hanghang Liu
> Assignee: Hung Tran
> Priority: Major
> Time Spent: 0.5h
> Remaining Estimate: 0h
>
> When YarnAutoScalingManager detects that a Helix task consistently fails, provide an
> option to send a WorkUnitChangeEvent so that GobblinHelixJobLauncher can handle the
> event and split the work unit at runtime. This can help resolve consistently
> failing container issues (like OOM) during runtime instead of
> relying on the replanner to restart the whole pipeline.
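For illustration, a minimal hedged sketch of the flow described above, assuming the auto-scaling manager and the job launcher share an EventBus and that WorkUnitChangeEvent has a constructor mirroring its getOldTaskIds()/getNewWorkUnits() accessors (both wiring details are assumptions, not confirmed by this PR):

    // Hypothetical failure-detection path inside YarnAutoScalingManager:
    List<String> failingTaskIds = detectConsistentlyFailingTasks();  // hypothetical helper
    // Post with no new work units (scenario 1): GobblinHelixJobLauncher recalculates and
    // repacks the work units from the Kafka source with a min container setting,
    // splitting the work at runtime instead of restarting the whole pipeline.
    eventBus.post(new WorkUnitChangeEvent(failingTaskIds, null));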
--
This message was sent by Atlassian Jira
(v8.20.10#820010)