Repository: incubator-gobblin Updated Branches: refs/heads/master 7e63c7b91 -> 56cecb28a
[GOBBLIN-159] Allow Helix jobs to be gracefully canceled [GOBBLIN-159] Allow Helix jobs to be gracefully canceled. Fixed some PR comments Add flag to check if cancellation was requested and cancel only if that flag is set. Closes #2037 from kadaan/Fix_for_GOBBLIN-159 Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/56cecb28 Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/56cecb28 Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/56cecb28 Branch: refs/heads/master Commit: 56cecb28addb912d4e2668ff2d271c99ca3c4cdc Parents: 7e63c7b Author: Joel Baranick <[email protected]> Authored: Tue Aug 15 13:55:01 2017 -0700 Committer: Abhishek Tiwari <[email protected]> Committed: Tue Aug 15 13:55:01 2017 -0700 ---------------------------------------------------------------------- .../apache/gobblin/cluster/GobblinHelixJob.java | 45 ++++++----- .../apache/gobblin/scheduler/JobScheduler.java | 80 +++++++++++++++++++- 2 files changed, 104 insertions(+), 21 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/56cecb28/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJob.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJob.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJob.java index d9eab64..265be3c 100644 --- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJob.java +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJob.java @@ -19,25 +19,26 @@ package org.apache.gobblin.cluster; import java.util.List; import java.util.Properties; +import java.util.concurrent.Future; import lombok.extern.slf4j.Slf4j; import org.apache.hadoop.fs.Path; import org.apache.helix.HelixManager; +import org.quartz.InterruptableJob; import org.quartz.Job; import org.quartz.JobDataMap; import org.quartz.JobExecutionContext; import org.quartz.JobExecutionException; import org.apache.gobblin.annotation.Alpha; -import org.apache.gobblin.configuration.ConfigurationKeys; import org.apache.gobblin.metrics.Tag; -import org.apache.gobblin.runtime.JobException; import org.apache.gobblin.runtime.JobLauncher; import org.apache.gobblin.runtime.listeners.JobListener; import org.apache.gobblin.scheduler.BaseGobblinJob; import org.apache.gobblin.scheduler.JobScheduler; +import org.quartz.UnableToInterruptJobException; /** @@ -48,7 +49,9 @@ import org.apache.gobblin.scheduler.JobScheduler; */ @Alpha @Slf4j -public class GobblinHelixJob extends BaseGobblinJob { +public class GobblinHelixJob extends BaseGobblinJob implements InterruptableJob { + private Future cancellable = null; + @Override public void executeImpl(JobExecutionContext context) throws JobExecutionException { JobDataMap dataMap = context.getJobDetail().getJobDataMap(); @@ -65,28 +68,32 @@ public class GobblinHelixJob extends BaseGobblinJob { try { final JobLauncher jobLauncher = new GobblinHelixJobLauncher(jobProps, helixManager, appWorkDir, eventMetadata); - if (Boolean.valueOf(jobProps.getProperty(GobblinClusterConfigurationKeys.JOB_EXECUTE_IN_SCHEDULING_THREAD, - Boolean.toString(GobblinClusterConfigurationKeys.JOB_EXECUTE_IN_SCHEDULING_THREAD_DEFAULT)))) { + Boolean.toString(GobblinClusterConfigurationKeys.JOB_EXECUTE_IN_SCHEDULING_THREAD_DEFAULT)))) { jobScheduler.runJob(jobProps, jobListener, jobLauncher); } else { - // if not executing in the scheduling thread then submit a runnable to the job scheduler's ExecutorService - // for asynchronous execution. - Runnable runnable = new Runnable() { - @Override - public void run() { - try { - jobScheduler.runJob(jobProps, jobListener, jobLauncher); - } catch (JobException je) { - log.error("Failed to run job " + jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY), je); - } - } - }; - - jobScheduler.submitRunnableToExecutor(runnable); + cancellable = jobScheduler.scheduleJobImmediately(jobProps, jobListener, jobLauncher); } } catch (Throwable t) { throw new JobExecutionException(t); } } + + @Override + public void interrupt() throws UnableToInterruptJobException { + if (cancellable != null) { + try { + if (cancellable.cancel(false)) { + return; + } + } catch (Exception e) { + log.error("Failed to gracefully cancel job. Attempting to force cancellation.", e); + } + try { + cancellable.cancel(true); + } catch (Exception e) { + throw new UnableToInterruptJobException(e); + } + } + } } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/56cecb28/gobblin-runtime/src/main/java/org/apache/gobblin/scheduler/JobScheduler.java ---------------------------------------------------------------------- diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/scheduler/JobScheduler.java b/gobblin-runtime/src/main/java/org/apache/gobblin/scheduler/JobScheduler.java index 0772d60..2313c13 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/scheduler/JobScheduler.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/scheduler/JobScheduler.java @@ -24,10 +24,14 @@ import java.util.List; import java.util.Map; import java.util.Properties; import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.commons.configuration.ConfigurationException; import org.apache.hadoop.fs.Path; @@ -131,6 +135,8 @@ public class JobScheduler extends AbstractIdleService { private final Closer closer = Closer.create(); + private volatile boolean cancelRequested = false; + public JobScheduler(Properties properties, SchedulerService scheduler) throws Exception { this.properties = properties; @@ -199,10 +205,14 @@ public class JobScheduler extends AbstractIdleService { throws Exception { LOG.info("Stopping the job scheduler"); closer.close(); - + cancelRequested = true; List<JobExecutionContext> currentExecutions = this.scheduler.getScheduler().getCurrentlyExecutingJobs(); for (JobExecutionContext jobExecutionContext : currentExecutions) { - this.scheduler.getScheduler().interrupt(jobExecutionContext.getFireInstanceId()); + try { + this.scheduler.getScheduler().interrupt(jobExecutionContext.getFireInstanceId()); + } catch (UnableToInterruptJobException e) { + LOG.error("Failed to cancel job " + jobExecutionContext.getJobDetail().getKey(), e); + } } ExecutorsUtils.shutdownExecutorService(this.jobExecutor, Optional.of(LOG)); @@ -231,6 +241,72 @@ public class JobScheduler extends AbstractIdleService { } /** + * Schedule a job immediately. + * + * <p> + * This method calls the Quartz scheduler to scheduler the job. + * </p> + * + * @param jobProps Job configuration properties + * @param jobListener {@link JobListener} used for callback, + * can be <em>null</em> if no callback is needed. + * @throws JobException when there is anything wrong + * with scheduling the job + */ + public Future<?> scheduleJobImmediately(Properties jobProps, JobListener jobListener, JobLauncher jobLauncher) { + Runnable runnable = new Runnable() { + @Override + public void run() { + try { + runJob(jobProps, jobListener, jobLauncher); + } catch (JobException je) { + LOG.error("Failed to run job " + jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY), je); + } + } + }; + final Future<?> future = this.jobExecutor.submit(runnable); + return new Future() { + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + if (!cancelRequested) { + return false; + } + boolean result = true; + try { + jobLauncher.cancelJob(jobListener); + } catch (JobException e) { + LOG.error("Failed to cancel job " + jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY), e); + result = false; + } + if (mayInterruptIfRunning) { + result &= future.cancel(true); + } + return result; + } + + @Override + public boolean isCancelled() { + return future.isCancelled(); + } + + @Override + public boolean isDone() { + return future.isDone(); + } + + @Override + public Object get() throws InterruptedException, ExecutionException { + return future.get(); + } + + @Override + public Object get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { + return future.get(timeout, unit); + } + }; + } + + /** * Submit a runnable to the {@link ExecutorService} of this {@link JobScheduler}. * @param runnable the runnable to submit to the job executor */
