sv2000 commented on a change in pull request #2548: [GOBBLIN-677] - Allow early
termination of Gobblin jobs based on a predicate on the job progress
URL: https://github.com/apache/incubator-gobblin/pull/2548#discussion_r256505082
##########
File path:
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncher.java
##########
@@ -286,17 +316,57 @@ protected void runWorkUnits(List<WorkUnit> workUnits)
throws Exception {
@Override
protected void executeCancellation() {
- try {
- if (this.hadoopJobSubmitted && !this.job.isComplete()) {
- LOG.info("Killing the Hadoop MR job for job " +
this.jobContext.getJobId());
- this.job.killJob();
- // Collect final task states.
- this.taskStateCollectorService.stopAsync().awaitTerminated();
+ try (FiniteStateMachine<MRJobLauncherState>.Transition transition =
this.fsm.startTransition(MRJobLauncherState.CANCELLED)) {
+ if (transition.getStartState() == MRJobLauncherState.RUNNING) {
+ try {
+ LOG.info("Killing the Hadoop MR job for job " +
this.jobContext.getJobId());
+ this.job.killJob();
+ // Collect final task states.
+ this.taskStateCollectorService.stopAsync().awaitTerminated();
+ } catch (IOException ioe) {
+ LOG.error("Failed to kill the Hadoop MR job for job " +
this.jobContext.getJobId());
+ transition.changeEndState(MRJobLauncherState.FAILED);
+ }
+ }
+ } catch (FiniteStateMachine.UnallowedTransitionException |
InterruptedException exc) {
+ LOG.error("Failed to cancel job " + this.jobContext.getJobId(), exc);
+ }
+ }
+
+ /**
+ * Attempt a gracious interruptiong of the running job
Review comment:
Spelling error: "interruptiong" should be "interruption". (Also, "gracious" should probably be "graceful".)
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services