Repository: incubator-gobblin Updated Branches: refs/heads/master d9d7d5f0c -> 467fe8fc8
Allow GobblinHelixJobScheduler to disable the services started by it's super class. Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/dc3d1227 Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/dc3d1227 Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/dc3d1227 Branch: refs/heads/master Commit: dc3d122769ed6c3afcb8e9af6210f594b5fc7585 Parents: 0975312 Author: Joel Baranick <[email protected]> Authored: Mon May 1 09:58:37 2017 -0700 Committer: Joel Baranick <[email protected]> Committed: Fri Jul 28 12:46:09 2017 -0700 ---------------------------------------------------------------------- .../cluster/GobblinHelixJobScheduler.java | 4 ++ .../java/gobblin/scheduler/JobScheduler.java | 48 ++++++++++++-------- 2 files changed, 32 insertions(+), 20 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/dc3d1227/gobblin-cluster/src/main/java/gobblin/cluster/GobblinHelixJobScheduler.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/main/java/gobblin/cluster/GobblinHelixJobScheduler.java b/gobblin-cluster/src/main/java/gobblin/cluster/GobblinHelixJobScheduler.java index 05595bc..c598c72 100644 --- a/gobblin-cluster/src/main/java/gobblin/cluster/GobblinHelixJobScheduler.java +++ b/gobblin-cluster/src/main/java/gobblin/cluster/GobblinHelixJobScheduler.java @@ -103,6 +103,10 @@ public class GobblinHelixJobScheduler extends JobScheduler { } @Override + protected void startServices() throws Exception { + } + + @Override public void runJob(Properties jobProps, JobListener jobListener) throws JobException { try { JobLauncher jobLauncher = buildGobblinHelixJobLauncher(jobProps); http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/dc3d1227/gobblin-runtime/src/main/java/gobblin/scheduler/JobScheduler.java ---------------------------------------------------------------------- diff --git a/gobblin-runtime/src/main/java/gobblin/scheduler/JobScheduler.java b/gobblin-runtime/src/main/java/gobblin/scheduler/JobScheduler.java index 5eb1a6a..2736637 100644 --- a/gobblin-runtime/src/main/java/gobblin/scheduler/JobScheduler.java +++ b/gobblin-runtime/src/main/java/gobblin/scheduler/JobScheduler.java @@ -17,9 +17,8 @@ package gobblin.scheduler; +import java.io.Closeable; import java.io.IOException; -import java.nio.file.FileSystems; -import java.nio.file.Paths; import java.util.Collection; import java.util.List; import java.util.Map; @@ -50,7 +49,6 @@ import org.quartz.UnableToInterruptJobException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.slf4j.MDC; import com.google.common.base.Optional; import com.google.common.base.Preconditions; @@ -131,6 +129,8 @@ public class JobScheduler extends AbstractIdleService { // A period of time for scheduler to wait until jobs are finished private final boolean waitForJobCompletion; + private final Closer closer = Closer.create(); + public JobScheduler(Properties properties, SchedulerService scheduler) throws Exception { this.properties = properties; @@ -155,14 +155,6 @@ public class JobScheduler extends AbstractIdleService { this.properties.getProperty(ConfigurationKeys.SCHEDULER_WAIT_FOR_JOB_COMPLETION_KEY, ConfigurationKeys.DEFAULT_SCHEDULER_WAIT_FOR_JOB_COMPLETION)); - if (this.properties.containsKey(ConfigurationKeys.JOB_CONFIG_FILE_DIR_KEY) && - !this.properties.containsKey(ConfigurationKeys.JOB_CONFIG_FILE_GENERAL_PATH_KEY)) { - String path = FileSystems.getDefault() - .getPath(this.properties.getProperty(ConfigurationKeys.JOB_CONFIG_FILE_DIR_KEY)) - .normalize().toAbsolutePath().toString(); - this.properties.setProperty(ConfigurationKeys.JOB_CONFIG_FILE_GENERAL_PATH_KEY, "file:///" + path); - } - if (this.properties.containsKey(ConfigurationKeys.JOB_CONFIG_FILE_GENERAL_PATH_KEY)) { this.jobConfigFileDirPath = new Path(this.properties.getProperty(ConfigurationKeys.JOB_CONFIG_FILE_GENERAL_PATH_KEY)); this.listener = new PathAlterationListenerAdaptorForMonitor(jobConfigFileDirPath, this); @@ -185,28 +177,34 @@ public class JobScheduler extends AbstractIdleService { } // Note: This should not be mandatory, gobblin-cluster modes have their own job configuration managers - if (this.properties.containsKey(ConfigurationKeys.JOB_CONFIG_FILE_GENERAL_PATH_KEY)) { - startGeneralJobConfigFileMonitor(); - scheduleGeneralConfiguredJobs(); + if (this.properties.containsKey(ConfigurationKeys.JOB_CONFIG_FILE_DIR_KEY) + || this.properties.containsKey(ConfigurationKeys.JOB_CONFIG_FILE_GENERAL_PATH_KEY)) { + + if (this.properties.containsKey(ConfigurationKeys.JOB_CONFIG_FILE_DIR_KEY) && !this.properties.containsKey( + ConfigurationKeys.JOB_CONFIG_FILE_GENERAL_PATH_KEY)) { + this.properties.setProperty(ConfigurationKeys.JOB_CONFIG_FILE_GENERAL_PATH_KEY, + "file://" + this.properties.getProperty(ConfigurationKeys.JOB_CONFIG_FILE_DIR_KEY)); + } + startServices(); } } + protected void startServices() throws Exception { + startGeneralJobConfigFileMonitor(); + scheduleGeneralConfiguredJobs(); + } + @Override protected void shutDown() throws Exception { LOG.info("Stopping the job scheduler"); - - if (this.properties.containsKey(ConfigurationKeys.JOB_CONFIG_FILE_GENERAL_PATH_KEY) || this.properties.containsKey( - ConfigurationKeys.JOB_CONFIG_FILE_DIR_KEY)) { - this.pathAlterationDetector.stop(1000); - } + closer.close(); List<JobExecutionContext> currentExecutions = this.scheduler.getScheduler().getCurrentlyExecutingJobs(); for (JobExecutionContext jobExecutionContext : currentExecutions) { this.scheduler.getScheduler().interrupt(jobExecutionContext.getFireInstanceId()); } - ExecutorsUtils.shutdownExecutorService(this.jobExecutor, Optional.of(LOG)); } @@ -454,6 +452,16 @@ public class JobScheduler extends AbstractIdleService { throws Exception { SchedulerUtils.addPathAlterationObserver(this.pathAlterationDetector, this.listener, jobConfigFileDirPath); this.pathAlterationDetector.start(); + this.closer.register(new Closeable() { + @Override + public void close() throws IOException { + try { + pathAlterationDetector.stop(1000); + } catch (InterruptedException e) { + throw new IOException(e); + } + } + }); } /**
