sv2000 commented on a change in pull request #2548: [GOBBLIN-677] - Allow early termination of Gobblin jobs based on a predicate on the job progress
URL: https://github.com/apache/incubator-gobblin/pull/2548#discussion_r256509276
##########
File path: gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncher.java
##########
@@ -252,26 +270,38 @@ protected void runWorkUnits(List<WorkUnit> workUnits) throws Exception {
this.taskStateCollectorService.startAsync().awaitRunning();
LOG.info("Launching Hadoop MR job " + this.job.getJobName());
- this.job.submit();
- this.hadoopJobSubmitted = true;
+ try (FiniteStateMachine<MRJobLauncherState>.Transition t = this.fsm.startTransition(MRJobLauncherState.RUNNING)) {
+ try {
+ this.job.submit();
+ } catch (Throwable exc) {
+ t.changeEndState(MRJobLauncherState.FAILED);
+ throw exc;
+ }
+ this.hadoopJobSubmitted = true;
- // Set job tracking URL to the Hadoop job tracking URL if it is not set yet
- if (!jobState.contains(ConfigurationKeys.JOB_TRACKING_URL_KEY)) {
- jobState.setProp(ConfigurationKeys.JOB_TRACKING_URL_KEY, this.job.getTrackingURL());
+ // Set job tracking URL to the Hadoop job tracking URL if it is not set yet
+ if (!jobState.contains(ConfigurationKeys.JOB_TRACKING_URL_KEY)) {
+ jobState.setProp(ConfigurationKeys.JOB_TRACKING_URL_KEY, this.job.getTrackingURL());
+ }
+ } catch (FiniteStateMachine.UnallowedTransitionException unallowed) {
+ LOG.info("Cannot start MR job.", unallowed);
}
- TimingEvent mrJobRunTimer = this.eventSubmitter.getTimingEvent(TimingEvent.RunJobTimings.MR_JOB_RUN);
- LOG.info(String.format("Waiting for Hadoop MR job %s to complete", this.job.getJobID()));
- this.job.waitForCompletion(true);
- mrJobRunTimer.stop(ImmutableMap.of("hadoopMRJobId", this.job.getJobID().toString()));
+ if (this.fsm.getCurrentState().equals(MRJobLauncherState.RUNNING)) {
+ JobInterruptionPredicate jobInterruptionPredicate = new JobInterruptionPredicate(jobState, this::interruptJob, true);
Review comment:
Can the JobInterruptionPredicate be made part of the FSM definition itself?
More generally, I am thinking that each state of the FSM could have a
predicate service of its own, which is started and stopped as the FSM
enters and leaves that state.
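One possible reading of this suggestion, sketched in plain Java: the state machine keeps a registry of services per state and swaps them inside each transition, so the launcher would no longer need to check getCurrentState() and construct the checker itself. All names here (StateService, ServiceAwareFsm, PollingPredicateService, ExampleState) are hypothetical and are not Gobblin's FiniteStateMachine or JobInterruptionPredicate API. The polling service is only an assumed stand-in for a predicate checker, since the diff does not show how JobInterruptionPredicate schedules its checks.

```java
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;

/** Hypothetical hook owned by one FSM state: started on entry, stopped on exit. */
interface StateService {
  void start();
  void stop();
}

/**
 * Hypothetical FSM sketch, not Gobblin's FiniteStateMachine API. A transition stops
 * the services of the state being left, then starts those of the state being entered.
 */
final class ServiceAwareFsm<S> {
  private final Map<S, Set<S>> allowedTransitions;
  private final Map<S, List<StateService>> servicesByState;
  private S current;

  ServiceAwareFsm(S initial, Map<S, Set<S>> allowedTransitions,
      Map<S, List<StateService>> servicesByState) {
    this.allowedTransitions = allowedTransitions;
    this.servicesByState = servicesByState;
    this.current = initial;
    servicesFor(initial).forEach(StateService::start);
  }

  synchronized S getCurrentState() {
    return current;
  }

  /** Rejects transitions not declared in allowedTransitions; otherwise swaps services. */
  synchronized void transitionTo(S next) {
    if (!allowedTransitions.getOrDefault(current, Set.of()).contains(next)) {
      throw new IllegalStateException("Transition " + current + " -> " + next + " is not allowed");
    }
    servicesFor(current).forEach(StateService::stop);
    current = next;
    servicesFor(next).forEach(StateService::start);
  }

  private List<StateService> servicesFor(S state) {
    return servicesByState.getOrDefault(state, List.of());
  }
}

/** Hypothetical stand-in for a predicate checker: polls a condition while its state is active. */
final class PollingPredicateService implements StateService {
  private final BooleanSupplier predicate;
  private final Runnable onTrue;
  private final long periodMillis;
  private ScheduledExecutorService executor;

  PollingPredicateService(BooleanSupplier predicate, Runnable onTrue, long periodMillis) {
    this.predicate = predicate;
    this.onTrue = onTrue;
    this.periodMillis = periodMillis;
  }

  @Override
  public synchronized void start() {
    if (executor != null) {
      return; // already started; ignore a repeated start()
    }
    // Daemon thread, so a missed stop() cannot keep the JVM alive.
    executor = Executors.newSingleThreadScheduledExecutor(r -> {
      Thread t = new Thread(r, "state-predicate-poller");
      t.setDaemon(true);
      return t;
    });
    executor.scheduleAtFixedRate(() -> {
      if (predicate.getAsBoolean()) {
        onTrue.run();
      }
    }, 0, periodMillis, TimeUnit.MILLISECONDS);
  }

  @Override
  public synchronized void stop() {
    if (executor != null) {
      executor.shutdownNow(); // cancels future checks and attempts to interrupt an in-flight one
      executor = null;
    }
  }

  synchronized boolean isRunning() {
    return executor != null;
  }
}

/** Hypothetical launcher states used only for this example. */
enum ExampleState { PRE_RUN, RUNNING, CANCELLED, SUCCEEDED }
```

Attached to RUNNING, the checker starts on entry and stops on any exit (success, failure, or cancellation) without extra bookkeeping in runWorkUnits; its onTrue callback could, for example, request a transition to a cancelled state. A production version would still need to decide what happens when a service fails to start, and how a predicate that fires concurrently with a normal transition is resolved; the sketch only serializes transitions with synchronized.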
----------------------------------------------------------------
This is an automated message from the Apache Git Service.