This is an automated email from the ASF dual-hosted git repository.

suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new fa76a8b  [GOBBLIN-1390] Add an option to run a subset of jobs from job config d…
fa76a8b is described below

commit fa76a8b26a7e381fffec64b8bb5ffd27b122f9a0
Author: suvasude <[email protected]>
AuthorDate: Tue Feb 23 11:19:01 2021 -0800

    [GOBBLIN-1390] Add an option to run a subset of jobs from job config d…
    
    Closes #3229 from
    sv2000/jobConfigManagerFilterJobs
---
 .../cluster/GobblinClusterConfigurationKeys.java   |  2 ++
 .../gobblin/cluster/JobConfigurationManager.java   | 30 +++++++++++++++++++++-
 .../cluster/JobConfigurationManagerTest.java       | 17 ++++++++++++
 3 files changed, 48 insertions(+), 1 deletion(-)

diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
index 88585cd..8c9513f 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
@@ -112,6 +112,8 @@ public class GobblinClusterConfigurationKeys {
    * @see <a 
href="https://gobblin.readthedocs.io/en/latest/user-guide/Working-with-Job-Configuration-Files/";>Job
 Config Files</a>
    */
   public static final String JOB_CONF_PATH_KEY = GOBBLIN_CLUSTER_PREFIX + 
"job.conf.path";
+  //A java.util.regex specifying the subset of jobs under JOB_CONF_PATH to be 
run.
+  public static final String JOBS_TO_RUN = GOBBLIN_CLUSTER_PREFIX + 
"jobsToRun";
   public static final String INPUT_WORK_UNIT_DIR_NAME = "_workunits";
   public static final String OUTPUT_TASK_STATE_DIR_NAME = "_taskstates";
   // This is the directory to store job.state files when a state store is used.
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/JobConfigurationManager.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/JobConfigurationManager.java
index 86b5219..afd49ec 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/JobConfigurationManager.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/JobConfigurationManager.java
@@ -21,13 +21,19 @@ import java.io.File;
 import java.io.IOException;
 import java.util.List;
 import java.util.Properties;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.regex.PatternSyntaxException;
 
 import org.apache.gobblin.cluster.event.CancelJobConfigArrivalEvent;
 import org.apache.gobblin.runtime.job_spec.JobSpecResolver;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
+import com.google.common.base.Strings;
 import com.google.common.eventbus.EventBus;
 import com.google.common.util.concurrent.AbstractIdleService;
 import com.typesafe.config.Config;
@@ -66,6 +72,7 @@ public class JobConfigurationManager extends AbstractIdleService implements Stan
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(JobConfigurationManager.class);
 
+  private Optional<Pattern> jobsToRun;
   protected final EventBus eventBus;
   protected final Config config;
   protected Optional<String> jobConfDirPath;
@@ -78,6 +85,13 @@ public class JobConfigurationManager extends AbstractIdleService implements Stan
     this.jobConfDirPath =
         config.hasPath(GobblinClusterConfigurationKeys.JOB_CONF_PATH_KEY) ? 
Optional
             
.of(config.getString(GobblinClusterConfigurationKeys.JOB_CONF_PATH_KEY)) : 
Optional.<String>absent();
+    String jobsToRunRegex = ConfigUtils.getString(config, 
GobblinClusterConfigurationKeys.JOBS_TO_RUN, "");
+    try {
+      this.jobsToRun = !Strings.isNullOrEmpty(jobsToRunRegex) ? 
Optional.of(Pattern.compile(config.getString(GobblinClusterConfigurationKeys.JOBS_TO_RUN)))
 : Optional.absent();
+    } catch (PatternSyntaxException e) {
+      LOGGER.error("Invalid regex pattern: {}, Exception: {}", jobsToRunRegex, 
e);
+      this.jobsToRun = Optional.absent();
+    }
     try {
       this.jobSpecResolver = JobSpecResolver.builder(config).build();
     } catch (IOException ioe) {
@@ -106,7 +120,11 @@ public class JobConfigurationManager extends AbstractIdleService implements Stan
         List<Properties> jobConfigs = 
SchedulerUtils.loadGenericJobConfigs(properties, this.jobSpecResolver);
         LOGGER.info("Loaded " + jobConfigs.size() + " job configuration(s)");
         for (Properties config : jobConfigs) {
-          
postNewJobConfigArrival(config.getProperty(ConfigurationKeys.JOB_NAME_KEY), 
config);
+          if (!jobsToRun.isPresent() || shouldRun(jobsToRun.get(), config)) {
+            
postNewJobConfigArrival(config.getProperty(ConfigurationKeys.JOB_NAME_KEY), 
config);
+          } else {
+            LOGGER.warn("Job {} has been filtered and will not be run in the 
cluster.", config.getProperty(ConfigurationKeys.JOB_NAME_KEY));
+          }
         }
       } else {
         LOGGER.warn("Job configuration directory " + jobConfigDir + " not 
found");
@@ -114,6 +132,16 @@ public class JobConfigurationManager extends AbstractIdleService implements Stan
     }
   }
 
+  @VisibleForTesting
+  /**
+   * A helper method to determine if a given job should be submitted to 
cluster for execution based on the
+   * regex defining the jobs to run.
+   */
+  protected static boolean shouldRun(Pattern jobsToRun, Properties jobConfig) {
+    Matcher matcher = 
jobsToRun.matcher(jobConfig.getProperty(ConfigurationKeys.JOB_NAME_KEY));
+    return matcher.matches();
+  }
+
   @Override
   protected void shutDown() throws Exception {
     // Nothing to do
diff --git a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/JobConfigurationManagerTest.java b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/JobConfigurationManagerTest.java
index 9e36d8c..8c9fc65 100644
--- a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/JobConfigurationManagerTest.java
+++ b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/JobConfigurationManagerTest.java
@@ -23,6 +23,7 @@ import java.util.List;
 import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
+import java.util.regex.Pattern;
 
 import org.apache.commons.io.FileUtils;
 import org.testng.Assert;
@@ -67,6 +68,10 @@ public class JobConfigurationManagerTest {
   public void setUp() throws IOException {
     this.eventBus.register(this);
 
+    if (this.jobConfigFileDir.exists()) {
+      FileUtils.deleteDirectory(this.jobConfigFileDir);
+    }
+
     // Prepare the test job configuration files
     Assert.assertTrue(this.jobConfigFileDir.mkdirs(), "Failed to create " + 
this.jobConfigFileDir);
     Closer closer = Closer.create();
@@ -108,6 +113,18 @@ public class JobConfigurationManagerTest {
     Assert.assertEquals(actual, expected);
   }
 
+  @Test
+  public void testShouldRun() {
+    Pattern pattern = Pattern.compile("testJob1|testJob2");
+    Properties jobConfig = new Properties();
+    jobConfig.setProperty(ConfigurationKeys.JOB_NAME_KEY, "testJob1");
+    Assert.assertTrue(JobConfigurationManager.shouldRun(pattern, jobConfig));
+    jobConfig.setProperty(ConfigurationKeys.JOB_NAME_KEY, "testJob2");
+    Assert.assertTrue(JobConfigurationManager.shouldRun(pattern, jobConfig));
+    jobConfig.setProperty(ConfigurationKeys.JOB_NAME_KEY, "job1");
+    Assert.assertFalse(JobConfigurationManager.shouldRun(pattern, jobConfig));
+  }
+
   @AfterClass
   public void tearDown() throws IOException {
     this.jobConfigurationManager.stopAsync().awaitTerminated();

Reply via email to