Repository: incubator-gobblin
Updated Branches:
  refs/heads/master d9d7d5f0c -> 467fe8fc8


Allow GobblinHelixJobScheduler to disable the services started by its
superclass.


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/dc3d1227
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/dc3d1227
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/dc3d1227

Branch: refs/heads/master
Commit: dc3d122769ed6c3afcb8e9af6210f594b5fc7585
Parents: 0975312
Author: Joel Baranick <[email protected]>
Authored: Mon May 1 09:58:37 2017 -0700
Committer: Joel Baranick <[email protected]>
Committed: Fri Jul 28 12:46:09 2017 -0700

----------------------------------------------------------------------
 .../cluster/GobblinHelixJobScheduler.java       |  4 ++
 .../java/gobblin/scheduler/JobScheduler.java    | 48 ++++++++++++--------
 2 files changed, 32 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/dc3d1227/gobblin-cluster/src/main/java/gobblin/cluster/GobblinHelixJobScheduler.java
----------------------------------------------------------------------
diff --git 
a/gobblin-cluster/src/main/java/gobblin/cluster/GobblinHelixJobScheduler.java 
b/gobblin-cluster/src/main/java/gobblin/cluster/GobblinHelixJobScheduler.java
index 05595bc..c598c72 100644
--- 
a/gobblin-cluster/src/main/java/gobblin/cluster/GobblinHelixJobScheduler.java
+++ 
b/gobblin-cluster/src/main/java/gobblin/cluster/GobblinHelixJobScheduler.java
@@ -103,6 +103,10 @@ public class GobblinHelixJobScheduler extends JobScheduler 
{
   }
 
   @Override
+  protected void startServices() throws Exception {
+  }
+
+  @Override
   public void runJob(Properties jobProps, JobListener jobListener) throws 
JobException {
     try {
       JobLauncher jobLauncher = buildGobblinHelixJobLauncher(jobProps);

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/dc3d1227/gobblin-runtime/src/main/java/gobblin/scheduler/JobScheduler.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/main/java/gobblin/scheduler/JobScheduler.java 
b/gobblin-runtime/src/main/java/gobblin/scheduler/JobScheduler.java
index 5eb1a6a..2736637 100644
--- a/gobblin-runtime/src/main/java/gobblin/scheduler/JobScheduler.java
+++ b/gobblin-runtime/src/main/java/gobblin/scheduler/JobScheduler.java
@@ -17,9 +17,8 @@
 
 package gobblin.scheduler;
 
+import java.io.Closeable;
 import java.io.IOException;
-import java.nio.file.FileSystems;
-import java.nio.file.Paths;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
@@ -50,7 +49,6 @@ import org.quartz.UnableToInterruptJobException;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.slf4j.MDC;
 
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
@@ -131,6 +129,8 @@ public class JobScheduler extends AbstractIdleService {
   // A period of time for scheduler to wait until jobs are finished
   private final boolean waitForJobCompletion;
 
+  private final Closer closer = Closer.create();
+
   public JobScheduler(Properties properties, SchedulerService scheduler)
       throws Exception {
     this.properties = properties;
@@ -155,14 +155,6 @@ public class JobScheduler extends AbstractIdleService {
         
this.properties.getProperty(ConfigurationKeys.SCHEDULER_WAIT_FOR_JOB_COMPLETION_KEY,
             ConfigurationKeys.DEFAULT_SCHEDULER_WAIT_FOR_JOB_COMPLETION));
 
-    if (this.properties.containsKey(ConfigurationKeys.JOB_CONFIG_FILE_DIR_KEY) 
&&
-        
!this.properties.containsKey(ConfigurationKeys.JOB_CONFIG_FILE_GENERAL_PATH_KEY))
 {
-      String path = FileSystems.getDefault()
-          
.getPath(this.properties.getProperty(ConfigurationKeys.JOB_CONFIG_FILE_DIR_KEY))
-          .normalize().toAbsolutePath().toString();
-      
this.properties.setProperty(ConfigurationKeys.JOB_CONFIG_FILE_GENERAL_PATH_KEY, 
"file:///" + path);
-    }
-
     if 
(this.properties.containsKey(ConfigurationKeys.JOB_CONFIG_FILE_GENERAL_PATH_KEY))
 {
       this.jobConfigFileDirPath = new 
Path(this.properties.getProperty(ConfigurationKeys.JOB_CONFIG_FILE_GENERAL_PATH_KEY));
       this.listener = new 
PathAlterationListenerAdaptorForMonitor(jobConfigFileDirPath, this);
@@ -185,28 +177,34 @@ public class JobScheduler extends AbstractIdleService {
     }
 
     // Note: This should not be mandatory, gobblin-cluster modes have their 
own job configuration managers
-    if 
(this.properties.containsKey(ConfigurationKeys.JOB_CONFIG_FILE_GENERAL_PATH_KEY))
 {
-      startGeneralJobConfigFileMonitor();
-      scheduleGeneralConfiguredJobs();
+    if (this.properties.containsKey(ConfigurationKeys.JOB_CONFIG_FILE_DIR_KEY)
+            || 
this.properties.containsKey(ConfigurationKeys.JOB_CONFIG_FILE_GENERAL_PATH_KEY))
 {
+
+      if 
(this.properties.containsKey(ConfigurationKeys.JOB_CONFIG_FILE_DIR_KEY) && 
!this.properties.containsKey(
+              ConfigurationKeys.JOB_CONFIG_FILE_GENERAL_PATH_KEY)) {
+        
this.properties.setProperty(ConfigurationKeys.JOB_CONFIG_FILE_GENERAL_PATH_KEY,
+                "file://" + 
this.properties.getProperty(ConfigurationKeys.JOB_CONFIG_FILE_DIR_KEY));
+      }
+      startServices();
     }
   }
 
+  protected void startServices() throws Exception {
+    startGeneralJobConfigFileMonitor();
+    scheduleGeneralConfiguredJobs();
+  }
+
   @Override
   protected void shutDown()
       throws Exception {
     LOG.info("Stopping the job scheduler");
-
-    if 
(this.properties.containsKey(ConfigurationKeys.JOB_CONFIG_FILE_GENERAL_PATH_KEY)
 || this.properties.containsKey(
-        ConfigurationKeys.JOB_CONFIG_FILE_DIR_KEY)) {
-      this.pathAlterationDetector.stop(1000);
-    }
+    closer.close();
 
     List<JobExecutionContext> currentExecutions = 
this.scheduler.getScheduler().getCurrentlyExecutingJobs();
     for (JobExecutionContext jobExecutionContext : currentExecutions) {
       
this.scheduler.getScheduler().interrupt(jobExecutionContext.getFireInstanceId());
     }
 
-
     ExecutorsUtils.shutdownExecutorService(this.jobExecutor, Optional.of(LOG));
   }
 
@@ -454,6 +452,16 @@ public class JobScheduler extends AbstractIdleService {
       throws Exception {
     SchedulerUtils.addPathAlterationObserver(this.pathAlterationDetector, 
this.listener, jobConfigFileDirPath);
     this.pathAlterationDetector.start();
+    this.closer.register(new Closeable() {
+      @Override
+      public void close() throws IOException {
+        try {
+          pathAlterationDetector.stop(1000);
+        } catch (InterruptedException e) {
+          throw new IOException(e);
+        }
+      }
+    });
   }
 
   /**

Reply via email to