hanghangliu commented on code in PR #3832:
URL: https://github.com/apache/gobblin/pull/3832#discussion_r1412506511
##########
gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java:
##########
@@ -254,6 +270,74 @@ protected void runWorkUnits(List<WorkUnit> workUnits)
throws Exception {
}
}
+ /**
+ * The implementation of this method has the assumption that work unit change should never delete without adding new
+ * work units, which will cause starvation. Thus, process {@link WorkUnitChangeEvent} for two scenario:
+ * 1. workUnitChangeEvent only contains old tasks and no new tasks given: recalculate new work unit through kafka
+ * source and pack with a min container setting.
+ * 2. workUnitChangeEvent contains both valid old and new work unit: respect the information and directly remove
+ * old tasks and start new work units
+ *
+ * @param workUnitChangeEvent Event post by EventBus to specify old tasks to be removed and new tasks to be run
+ * @throws InvocationTargetException
+ */
+ @Override
+ @Subscribe
+ public void handleWorkUnitChangeEvent(WorkUnitChangeEvent workUnitChangeEvent)
+ throws InvocationTargetException {
+ log.info("Received WorkUnitChangeEvent with old Task {} and new WU {}",
+ workUnitChangeEvent.getOldTaskIds(), workUnitChangeEvent.getNewWorkUnits());
+ final JobState jobState = this.jobContext.getJobState();
+ List<WorkUnit> workUnits = workUnitChangeEvent.getNewWorkUnits();
+ // Use old task Id to recalculate new work units
+ if(workUnits == null || workUnits.isEmpty()) {
Review Comment:
When the WorkUnitChangeEvent is sent from YarnAutoScalingManager, the Yarn
service only has the Helix information, so it may not be easy to pre-calculate
the new work units there: that process needs KafkaSource, which Yarn isn't
aware of.
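
To illustrate the fallback this implies, here is a minimal, self-contained sketch. It is not the Gobblin API: `ChangeEvent`, `resolveNewWorkUnits`, and the `recalculator` function are hypothetical stand-ins, and work units are modelled as plain strings. The idea is that the sender may leave the new work units empty, and the receiver, which does have access to the source, derives them from the old task ids.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.function.Function;

public class WorkUnitChangeSketch {

  /** Hypothetical stand-in for the event: old task ids plus optional new work units. */
  static final class ChangeEvent {
    final List<String> oldTaskIds;
    final List<String> newWorkUnits;

    ChangeEvent(List<String> oldTaskIds, List<String> newWorkUnits) {
      this.oldTaskIds = oldTaskIds;
      this.newWorkUnits = newWorkUnits;
    }
  }

  /**
   * Returns the work units to launch. If the sender supplied none (for example because it
   * has no access to the source), the receiver recalculates them from the old task ids.
   * The recalculator is an assumption standing in for whatever source-aware logic exists.
   */
  static List<String> resolveNewWorkUnits(ChangeEvent event,
      Function<List<String>, List<String>> recalculator) {
    List<String> provided = event.newWorkUnits;
    if (provided == null || provided.isEmpty()) {
      // Scenario 1: only old tasks were given, so recalculate on the receiving side.
      return recalculator.apply(event.oldTaskIds);
    }
    // Scenario 2: the sender supplied new work units, so use them as-is.
    return provided;
  }

  public static void main(String[] args) {
    // Toy recalculator: one replacement work unit per old task id.
    Function<List<String>, List<String>> recalculator = ids -> {
      List<String> out = new ArrayList<>();
      for (String id : ids) {
        out.add("recalculated-for-" + id);
      }
      return out;
    };

    ChangeEvent fromAutoScaler =
        new ChangeEvent(Arrays.asList("task-1", "task-2"), Collections.emptyList());
    ChangeEvent fromSource =
        new ChangeEvent(Arrays.asList("task-3"), Arrays.asList("wu-a", "wu-b"));

    System.out.println(resolveNewWorkUnits(fromAutoScaler, recalculator));
    System.out.println(resolveNewWorkUnits(fromSource, recalculator));
  }
}
```

Passing the recalculation in as a function keeps the sender free of any dependency on the source, which is the constraint described above.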