[
https://issues.apache.org/jira/browse/GOBBLIN-677?focusedWorklogId=198228&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-198228
]
ASF GitHub Bot logged work on GOBBLIN-677:
------------------------------------------
Author: ASF GitHub Bot
Created on: 13/Feb/19 17:39
Start Date: 13/Feb/19 17:39
Worklog Time Spent: 10m
Work Description: sv2000 commented on pull request #2548: [GOBBLIN-677] -
Allow early termination of Gobblin jobs based on a predicate on the job progress
URL: https://github.com/apache/incubator-gobblin/pull/2548#discussion_r256509276
##########
File path:
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncher.java
##########
@@ -252,26 +270,38 @@ protected void runWorkUnits(List<WorkUnit> workUnits)
throws Exception {
this.taskStateCollectorService.startAsync().awaitRunning();
LOG.info("Launching Hadoop MR job " + this.job.getJobName());
- this.job.submit();
- this.hadoopJobSubmitted = true;
+ try (FiniteStateMachine<MRJobLauncherState>.Transition t =
this.fsm.startTransition(MRJobLauncherState.RUNNING)) {
+ try {
+ this.job.submit();
+ } catch (Throwable exc) {
+ t.changeEndState(MRJobLauncherState.FAILED);
+ throw exc;
+ }
+ this.hadoopJobSubmitted = true;
- // Set job tracking URL to the Hadoop job tracking URL if it is not set
yet
- if (!jobState.contains(ConfigurationKeys.JOB_TRACKING_URL_KEY)) {
- jobState.setProp(ConfigurationKeys.JOB_TRACKING_URL_KEY,
this.job.getTrackingURL());
+ // Set job tracking URL to the Hadoop job tracking URL if it is not
set yet
+ if (!jobState.contains(ConfigurationKeys.JOB_TRACKING_URL_KEY)) {
+ jobState.setProp(ConfigurationKeys.JOB_TRACKING_URL_KEY,
this.job.getTrackingURL());
+ }
+ } catch (FiniteStateMachine.UnallowedTransitionException unallowed) {
+ LOG.info("Cannot start MR job.", unallowed);
}
- TimingEvent mrJobRunTimer =
this.eventSubmitter.getTimingEvent(TimingEvent.RunJobTimings.MR_JOB_RUN);
- LOG.info(String.format("Waiting for Hadoop MR job %s to complete",
this.job.getJobID()));
- this.job.waitForCompletion(true);
- mrJobRunTimer.stop(ImmutableMap.of("hadoopMRJobId",
this.job.getJobID().toString()));
+ if (this.fsm.getCurrentState().equals(MRJobLauncherState.RUNNING)) {
+ JobInterruptionPredicate jobInterruptionPredicate = new
JobInterruptionPredicate(jobState, this::interruptJob, true);
Review comment:
Can the JobInterruptionPredicate be made part of the FSM definition itself?
I am thinking in general, each state of the FSM could potentially have a
Predicate service of its own. The service is started/stopped as states change.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 198228)
Time Spent: 1h 10m (was: 1h)
> Allow for early termination of Gobblin jobs based on a predicate on job
> progress
> --------------------------------------------------------------------------------
>
> Key: GOBBLIN-677
> URL: https://issues.apache.org/jira/browse/GOBBLIN-677
> Project: Apache Gobblin
> Issue Type: Improvement
> Reporter: Issac Buenrostro
> Assignee: Issac Buenrostro
> Priority: Major
> Time Spent: 1h 10m
> Remaining Estimate: 0h
>
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)