This is an automated email from the ASF dual-hosted git repository.
suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new fa76a8b [GOBBLIN-1390] Add an option to run a subset of jobs from job
config d…
fa76a8b is described below
commit fa76a8b26a7e381fffec64b8bb5ffd27b122f9a0
Author: suvasude <[email protected]>
AuthorDate: Tue Feb 23 11:19:01 2021 -0800
[GOBBLIN-1390] Add an option to run a subset of jobs from job config d…
Closes #3229 from
sv2000/jobConfigManagerFilterJobs
---
.../cluster/GobblinClusterConfigurationKeys.java | 2 ++
.../gobblin/cluster/JobConfigurationManager.java | 30 +++++++++++++++++++++-
.../cluster/JobConfigurationManagerTest.java | 17 ++++++++++++
3 files changed, 48 insertions(+), 1 deletion(-)
diff --git
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
index 88585cd..8c9513f 100644
---
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
+++
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
@@ -112,6 +112,8 @@ public class GobblinClusterConfigurationKeys {
* @see <a
href="https://gobblin.readthedocs.io/en/latest/user-guide/Working-with-Job-Configuration-Files/">Job
Config Files</a>
*/
public static final String JOB_CONF_PATH_KEY = GOBBLIN_CLUSTER_PREFIX +
"job.conf.path";
+  /**
+   * A {@link java.util.regex.Pattern} regex selecting the subset of jobs under {@link #JOB_CONF_PATH_KEY}
+   * to be run. The regex must match a job's entire name ({@code ConfigurationKeys.JOB_NAME_KEY}), not
+   * just a substring of it; jobs whose names do not match are not submitted to the cluster. If this key
+   * is unset or empty, all jobs are run. If the regex is invalid, an error is logged and no filtering is
+   * applied.
+   */
+  public static final String JOBS_TO_RUN = GOBBLIN_CLUSTER_PREFIX + "jobsToRun";
public static final String INPUT_WORK_UNIT_DIR_NAME = "_workunits";
public static final String OUTPUT_TASK_STATE_DIR_NAME = "_taskstates";
// This is the directory to store job.state files when a state store is used.
diff --git
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/JobConfigurationManager.java
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/JobConfigurationManager.java
index 86b5219..afd49ec 100644
---
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/JobConfigurationManager.java
+++
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/JobConfigurationManager.java
@@ -21,13 +21,19 @@ import java.io.File;
import java.io.IOException;
import java.util.List;
import java.util.Properties;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.regex.PatternSyntaxException;
import org.apache.gobblin.cluster.event.CancelJobConfigArrivalEvent;
import org.apache.gobblin.runtime.job_spec.JobSpecResolver;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
+import com.google.common.base.Strings;
import com.google.common.eventbus.EventBus;
import com.google.common.util.concurrent.AbstractIdleService;
import com.typesafe.config.Config;
@@ -66,6 +72,7 @@ public class JobConfigurationManager extends
AbstractIdleService implements Stan
private static final Logger LOGGER =
LoggerFactory.getLogger(JobConfigurationManager.class);
+ private Optional<Pattern> jobsToRun;
protected final EventBus eventBus;
protected final Config config;
protected Optional<String> jobConfDirPath;
@@ -78,6 +85,13 @@ public class JobConfigurationManager extends
AbstractIdleService implements Stan
this.jobConfDirPath =
config.hasPath(GobblinClusterConfigurationKeys.JOB_CONF_PATH_KEY) ?
Optional
.of(config.getString(GobblinClusterConfigurationKeys.JOB_CONF_PATH_KEY)) :
Optional.<String>absent();
+ String jobsToRunRegex = ConfigUtils.getString(config,
GobblinClusterConfigurationKeys.JOBS_TO_RUN, "");
+ try {
+ this.jobsToRun = !Strings.isNullOrEmpty(jobsToRunRegex) ?
Optional.of(Pattern.compile(config.getString(GobblinClusterConfigurationKeys.JOBS_TO_RUN)))
: Optional.absent();
+ } catch (PatternSyntaxException e) {
+ LOGGER.error("Invalid regex pattern: {}, Exception: {}", jobsToRunRegex,
e);
+ this.jobsToRun = Optional.absent();
+ }
try {
this.jobSpecResolver = JobSpecResolver.builder(config).build();
} catch (IOException ioe) {
@@ -106,7 +120,11 @@ public class JobConfigurationManager extends
AbstractIdleService implements Stan
List<Properties> jobConfigs =
SchedulerUtils.loadGenericJobConfigs(properties, this.jobSpecResolver);
LOGGER.info("Loaded " + jobConfigs.size() + " job configuration(s)");
for (Properties config : jobConfigs) {
-
postNewJobConfigArrival(config.getProperty(ConfigurationKeys.JOB_NAME_KEY),
config);
+ if (!jobsToRun.isPresent() || shouldRun(jobsToRun.get(), config)) {
+
postNewJobConfigArrival(config.getProperty(ConfigurationKeys.JOB_NAME_KEY),
config);
+ } else {
+ LOGGER.warn("Job {} has been filtered and will not be run in the
cluster.", config.getProperty(ConfigurationKeys.JOB_NAME_KEY));
+ }
}
} else {
LOGGER.warn("Job configuration directory " + jobConfigDir + " not
found");
@@ -114,6 +132,16 @@ public class JobConfigurationManager extends
AbstractIdleService implements Stan
}
}
+  /**
+   * Determines whether a given job should be submitted to the cluster for execution, based on the
+   * regex defining the jobs to run.
+   *
+   * <p>The pattern must match the job's entire name ({@link Matcher#matches()}); a name that merely
+   * contains a match is rejected.
+   *
+   * @param jobsToRun pattern that the job name must fully match
+   * @param jobConfig job configuration whose {@link ConfigurationKeys#JOB_NAME_KEY} property is tested
+   * @return {@code true} if the job name fully matches {@code jobsToRun}; {@code false} otherwise,
+   *     including when the job configuration does not define a job name
+   */
+  @VisibleForTesting
+  protected static boolean shouldRun(Pattern jobsToRun, Properties jobConfig) {
+    String jobName = jobConfig.getProperty(ConfigurationKeys.JOB_NAME_KEY);
+    // Pattern#matcher throws NullPointerException on null input. A job without a name cannot match
+    // the filter, so report it as filtered rather than throwing out of the job-config loading loop.
+    if (jobName == null) {
+      return false;
+    }
+    Matcher matcher = jobsToRun.matcher(jobName);
+    return matcher.matches();
+  }
+
@Override
protected void shutDown() throws Exception {
// Nothing to do
diff --git
a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/JobConfigurationManagerTest.java
b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/JobConfigurationManagerTest.java
index 9e36d8c..8c9fc65 100644
---
a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/JobConfigurationManagerTest.java
+++
b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/JobConfigurationManagerTest.java
@@ -23,6 +23,7 @@ import java.util.List;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
+import java.util.regex.Pattern;
import org.apache.commons.io.FileUtils;
import org.testng.Assert;
@@ -67,6 +68,10 @@ public class JobConfigurationManagerTest {
public void setUp() throws IOException {
this.eventBus.register(this);
+ if (this.jobConfigFileDir.exists()) {
+ FileUtils.deleteDirectory(this.jobConfigFileDir);
+ }
+
// Prepare the test job configuration files
Assert.assertTrue(this.jobConfigFileDir.mkdirs(), "Failed to create " +
this.jobConfigFileDir);
Closer closer = Closer.create();
@@ -108,6 +113,18 @@ public class JobConfigurationManagerTest {
Assert.assertEquals(actual, expected);
}
+  /**
+   * Verifies that {@code shouldRun} accepts exactly the job names matched by the regex, and that the
+   * match is against the whole job name rather than a substring or prefix.
+   */
+  @Test
+  public void testShouldRun() {
+    Pattern pattern = Pattern.compile("testJob1|testJob2");
+    Properties jobConfig = new Properties();
+    jobConfig.setProperty(ConfigurationKeys.JOB_NAME_KEY, "testJob1");
+    Assert.assertTrue(JobConfigurationManager.shouldRun(pattern, jobConfig));
+    jobConfig.setProperty(ConfigurationKeys.JOB_NAME_KEY, "testJob2");
+    Assert.assertTrue(JobConfigurationManager.shouldRun(pattern, jobConfig));
+    jobConfig.setProperty(ConfigurationKeys.JOB_NAME_KEY, "job1");
+    Assert.assertFalse(JobConfigurationManager.shouldRun(pattern, jobConfig));
+    // Full-match semantics: names that only start with or contain a listed job name are filtered out.
+    jobConfig.setProperty(ConfigurationKeys.JOB_NAME_KEY, "testJob10");
+    Assert.assertFalse(JobConfigurationManager.shouldRun(pattern, jobConfig));
+    jobConfig.setProperty(ConfigurationKeys.JOB_NAME_KEY, "mytestJob1");
+    Assert.assertFalse(JobConfigurationManager.shouldRun(pattern, jobConfig));
+  }
+
@AfterClass
public void tearDown() throws IOException {
this.jobConfigurationManager.stopAsync().awaitTerminated();