This is an automated email from the ASF dual-hosted git repository.
suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 3888558 [GOBBLIN-795] Make JobCatalog optional for
FsJobConfigurationManager
3888558 is described below
commit 388855876b731ddc6a22231c8961be631230b582
Author: sv2000 <[email protected]>
AuthorDate: Wed Jun 5 20:49:49 2019 -0700
[GOBBLIN-795] Make JobCatalog optional for FsJobConfigurationManager
Closes #2662 from sv2000/fsConfigManager
---
.../gobblin/cluster/FsJobConfigurationManager.java | 145 +++++++++++++++++++++
.../FsScheduledJobConfigurationManager.java | 84 ------------
.../gobblin/cluster/GobblinClusterManager.java | 6 +-
.../gobblin/cluster/GobblinHelixJobScheduler.java | 5 +-
.../gobblin/cluster/ClusterIntegrationTest.java | 15 +--
...est.java => FsJobConfigurationManagerTest.java} | 79 ++++++++++-
.../suite/IntegrationJobRestartViaSpecSuite.java | 20 +--
7 files changed, 231 insertions(+), 123 deletions(-)
diff --git
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/FsJobConfigurationManager.java
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/FsJobConfigurationManager.java
new file mode 100644
index 0000000..5fd0d6a
--- /dev/null
+++
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/FsJobConfigurationManager.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.cluster;
+
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.lang3.reflect.ConstructorUtils;
+import org.apache.commons.lang3.tuple.Pair;
+
+import com.google.common.base.Optional;
+import com.google.common.eventbus.EventBus;
+import com.typesafe.config.Config;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.runtime.api.FsSpecConsumer;
+import org.apache.gobblin.runtime.api.JobSpec;
+import org.apache.gobblin.runtime.api.MutableJobCatalog;
+import org.apache.gobblin.runtime.api.SpecConsumer;
+import org.apache.gobblin.runtime.api.SpecExecutor;
+import org.apache.gobblin.util.ClassAliasResolver;
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.ExecutorsUtils;
+
+
+/**
+ * A {@link JobConfigurationManager} that reads {@link JobSpec}s from a source
path on a
+ * {@link org.apache.hadoop.fs.FileSystem} and posts them to an {@link
EventBus}.
+ * The {@link FsJobConfigurationManager} has an underlying {@link
FsSpecConsumer} that periodically reads the
+ * {@link JobSpec}s from the filesystem and posts an appropriate
JobConfigArrivalEvent with the job configuration to
+ * the EventBus for consumption by the listeners.
+ */
+@Slf4j
+public class FsJobConfigurationManager extends JobConfigurationManager {
+ private static final long DEFAULT_JOB_SPEC_REFRESH_INTERVAL = 60;
+
+ private final long refreshIntervalInSeconds;
+
+ private final ScheduledExecutorService fetchJobSpecExecutor;
+
+ protected final SpecConsumer _specConsumer;
+
+ private final ClassAliasResolver<SpecConsumer> aliasResolver;
+
+ private final Optional<MutableJobCatalog> _jobCatalogOptional;
+
+ public FsJobConfigurationManager(EventBus eventBus, Config config) {
+ this(eventBus, config, null);
+ }
+
+ public FsJobConfigurationManager(EventBus eventBus, Config config,
MutableJobCatalog jobCatalog) {
+ super(eventBus, config);
+ this._jobCatalogOptional = jobCatalog != null ? Optional.of(jobCatalog) :
Optional.absent();
+ this.refreshIntervalInSeconds = ConfigUtils.getLong(config,
GobblinClusterConfigurationKeys.JOB_SPEC_REFRESH_INTERVAL,
+ DEFAULT_JOB_SPEC_REFRESH_INTERVAL);
+
+ this.fetchJobSpecExecutor = Executors.newSingleThreadScheduledExecutor(
+ ExecutorsUtils.newThreadFactory(Optional.of(log),
Optional.of("FetchJobSpecExecutor")));
+
+ this.aliasResolver = new ClassAliasResolver<>(SpecConsumer.class);
+ try {
+ String specConsumerClassName = ConfigUtils.getString(config,
GobblinClusterConfigurationKeys.SPEC_CONSUMER_CLASS_KEY,
+ GobblinClusterConfigurationKeys.DEFAULT_SPEC_CONSUMER_CLASS);
+ log.info("Using SpecConsumer class name/alias " +
specConsumerClassName);
+ this._specConsumer = (SpecConsumer) ConstructorUtils
+
.invokeConstructor(Class.forName(this.aliasResolver.resolve(specConsumerClassName)),
config);
+ } catch (NoSuchMethodException | IllegalAccessException |
InvocationTargetException | InstantiationException
+ | ClassNotFoundException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ protected void startUp() throws Exception{
+ super.startUp();
+ // Schedule the job config fetch task
+ this.fetchJobSpecExecutor.scheduleAtFixedRate(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ fetchJobSpecs();
+ } catch (InterruptedException | ExecutionException e) {
+ log.error("Failed to fetch job specs", e);
+ throw new RuntimeException("Failed to fetch specs", e);
+ }
+ }
+ }, 0, this.refreshIntervalInSeconds, TimeUnit.SECONDS);
+ }
+
+ void fetchJobSpecs() throws ExecutionException, InterruptedException {
+ List<Pair<SpecExecutor.Verb, JobSpec>> jobSpecs =
+ (List<Pair<SpecExecutor.Verb, JobSpec>>)
this._specConsumer.changedSpecs().get();
+
+ for (Pair<SpecExecutor.Verb, JobSpec> entry : jobSpecs) {
+ JobSpec jobSpec = entry.getValue();
+ SpecExecutor.Verb verb = entry.getKey();
+ if (verb.equals(SpecExecutor.Verb.ADD)) {
+ // Handle addition
+ if (this._jobCatalogOptional.isPresent()) {
+ this._jobCatalogOptional.get().put(jobSpec);
+ }
+ postNewJobConfigArrival(jobSpec.getUri().toString(),
jobSpec.getConfigAsProperties());
+ } else if (verb.equals(SpecExecutor.Verb.UPDATE)) {
+ //Handle update.
+ if (this._jobCatalogOptional.isPresent()) {
+ this._jobCatalogOptional.get().put(jobSpec);
+ }
+ postUpdateJobConfigArrival(jobSpec.getUri().toString(),
jobSpec.getConfigAsProperties());
+ } else if (verb.equals(SpecExecutor.Verb.DELETE)) {
+ // Handle delete
+ if (this._jobCatalogOptional.isPresent()) {
+ this._jobCatalogOptional.get().remove(jobSpec.getUri());
+ }
+ postDeleteJobConfigArrival(jobSpec.getUri().toString(),
jobSpec.getConfigAsProperties());
+ }
+
+ try {
+ //Acknowledge the successful consumption of the JobSpec back to the
SpecConsumer, so that the
+ //SpecConsumer can delete the JobSpec.
+ this._specConsumer.commit(jobSpec);
+ } catch (IOException e) {
+ log.error("Error when committing to FsSpecConsumer: ", e);
+ }
+ }
+ }
+}
diff --git
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/FsScheduledJobConfigurationManager.java
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/FsScheduledJobConfigurationManager.java
deleted file mode 100644
index e4776ef..0000000
---
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/FsScheduledJobConfigurationManager.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gobblin.cluster;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.concurrent.ExecutionException;
-
-import org.apache.commons.lang3.tuple.Pair;
-
-import com.google.common.eventbus.EventBus;
-import com.typesafe.config.Config;
-
-import lombok.extern.slf4j.Slf4j;
-
-import org.apache.gobblin.runtime.api.FsSpecConsumer;
-import org.apache.gobblin.runtime.api.JobSpec;
-import org.apache.gobblin.runtime.api.MutableJobCatalog;
-import org.apache.gobblin.runtime.api.SpecExecutor;
-
-
-/**
- * A {@link ScheduledJobConfigurationManager} that reads {@link JobSpec}s from
a source path on a
- * {@link org.apache.hadoop.fs.FileSystem} and adds them to a {@link
org.apache.gobblin.runtime.api.JobCatalog}.
- * The {@link FsScheduledJobConfigurationManager} has an underlying {@link
FsSpecConsumer} that reads the {@link JobSpec}s
- * from the filesystem and once the JobSpecs have been added to the {@link
org.apache.gobblin.runtime.api.JobCatalog},
- * the consumer deletes the specs from the source path.
- */
-@Slf4j
-public class FsScheduledJobConfigurationManager extends
ScheduledJobConfigurationManager {
- private final MutableJobCatalog _jobCatalog;
-
- public FsScheduledJobConfigurationManager(EventBus eventBus, Config config,
MutableJobCatalog jobCatalog) {
- super(eventBus, config);
- this._jobCatalog = jobCatalog;
- }
-
- @Override
- protected void fetchJobSpecs() throws ExecutionException,
InterruptedException {
- List<Pair<SpecExecutor.Verb, JobSpec>> jobSpecs =
- (List<Pair<SpecExecutor.Verb, JobSpec>>)
this._specConsumer.changedSpecs().get();
-
- for (Pair<SpecExecutor.Verb, JobSpec> entry : jobSpecs) {
- JobSpec jobSpec = entry.getValue();
- SpecExecutor.Verb verb = entry.getKey();
- if (verb.equals(SpecExecutor.Verb.ADD)) {
- // Handle addition
- this._jobCatalog.put(jobSpec);
- postNewJobConfigArrival(jobSpec.getUri().toString(),
jobSpec.getConfigAsProperties());
- } else if (verb.equals(SpecExecutor.Verb.UPDATE)) {
- //Handle update.
- //Overwrite the jobSpec in the jobCatalog and post an
UpdateJobConfigArrivalEvent.
- this._jobCatalog.put(jobSpec);
- postUpdateJobConfigArrival(jobSpec.getUri().toString(),
jobSpec.getConfigAsProperties());
- } else if (verb.equals(SpecExecutor.Verb.DELETE)) {
- // Handle delete
- this._jobCatalog.remove(jobSpec.getUri());
- postDeleteJobConfigArrival(jobSpec.getUri().toString(),
jobSpec.getConfigAsProperties());
- }
-
- try {
- //Acknowledge the successful consumption of the JobSpec back to the
SpecConsumer, so that the
- //SpecConsumer can delete the JobSpec.
- this._specConsumer.commit(jobSpec);
- } catch (IOException e) {
- log.error("Error when committing to FsSpecConsumer: ", e);
- }
- }
- }
-}
diff --git
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java
index 571e5ba..e43b241 100644
---
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java
+++
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java
@@ -372,11 +372,11 @@ public class GobblinClusterManager implements
ApplicationLauncher, StandardMetri
private JobConfigurationManager create(Config config) {
try {
+ List<Object> argumentList = (this.jobCatalog != null)?
ImmutableList.of(this.eventBus, config, this.jobCatalog) :
+ ImmutableList.of(this.eventBus, config);
if
(config.hasPath(GobblinClusterConfigurationKeys.JOB_CONFIGURATION_MANAGER_KEY))
{
return (JobConfigurationManager)
GobblinConstructorUtils.invokeFirstConstructor(Class.forName(
-
config.getString(GobblinClusterConfigurationKeys.JOB_CONFIGURATION_MANAGER_KEY)),
- ImmutableList.<Object>of(this.eventBus, config, this.jobCatalog),
- ImmutableList.<Object>of(this.eventBus, config));
+
config.getString(GobblinClusterConfigurationKeys.JOB_CONFIGURATION_MANAGER_KEY)),
argumentList);
} else {
return new JobConfigurationManager(this.eventBus, config);
}
diff --git
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java
index 02022a8..3054d2e 100644
---
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java
+++
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java
@@ -325,9 +325,6 @@ public class GobblinHelixJobScheduler extends JobScheduler
implements StandardMe
LOGGER.error("Failed to update job " + updateJobArrival.getJobName(),
je);
}
- //Wait until the cancelled job is complete.
- waitForJobCompletion(updateJobArrival.getJobName());
-
try {
handleNewJobConfigArrival(new
NewJobConfigArrivalEvent(updateJobArrival.getJobName(),
updateJobArrival.getJobConfig()));
@@ -370,6 +367,8 @@ public class GobblinHelixJobScheduler extends JobScheduler
implements StandardMe
TaskDriver taskDriver = new TaskDriver(this.jobHelixManager);
taskDriver.waitToStop(workflowId, this.helixJobStopTimeoutMillis);
LOGGER.info("Stopped workflow: {}", deleteJobArrival.getJobName());
+ //Wait until the cancelled job is complete.
+ waitForJobCompletion(deleteJobArrival.getJobName());
} else {
LOGGER.warn("Could not find Helix Workflow Id for job: {}",
deleteJobArrival.getJobName());
}
diff --git
a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/ClusterIntegrationTest.java
b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/ClusterIntegrationTest.java
index 5d8d02b..55a2c0f 100644
---
a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/ClusterIntegrationTest.java
+++
b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/ClusterIntegrationTest.java
@@ -102,7 +102,7 @@ public class ClusterIntegrationTest {
/**
* An integration test for restarting a Helix workflow via a JobSpec. This
test case starts a Helix cluster with
- * a {@link FsScheduledJobConfigurationManager}. The test case does the
following:
+ * a {@link FsJobConfigurationManager}. The test case does the following:
* <ul>
* <li> add a {@link org.apache.gobblin.runtime.api.JobSpec} that uses a
{@link org.apache.gobblin.cluster.SleepingCustomTaskSource})
* to {@link IntegrationJobRestartViaSpecSuite#FS_SPEC_CONSUMER_DIR}.
which is picked by the JobConfigurationManager. </li>
@@ -116,33 +116,30 @@ public class ClusterIntegrationTest {
* We confirm the execution by again inspecting the zNode and ensuring its
TargetState is START. </li>
* </ul>
*/
- @Test (dependsOnMethods = { "testJobShouldGetCancelled" })
+ @Test (dependsOnMethods = { "testJobShouldGetCancelled" }, groups =
{"disabledOnTravis"})
public void testJobRestartViaSpec() throws Exception {
this.suite = new IntegrationJobRestartViaSpecSuite();
HelixManager helixManager = getHelixManager();
IntegrationJobRestartViaSpecSuite restartViaSpecSuite =
(IntegrationJobRestartViaSpecSuite) this.suite;
- //Add a new JobSpec to the path monitored by the SpecConsumer
- restartViaSpecSuite.addJobSpec(IntegrationJobRestartViaSpecSuite.JOB_NAME,
SpecExecutor.Verb.ADD.name());
-
//Start the cluster
restartViaSpecSuite.startCluster();
helixManager.connect();
AssertWithBackoff.create().timeoutMs(30000).maxSleepMs(1000).backoffFactor(1).
- assertTrue(isTaskStarted(helixManager,
IntegrationJobRestartViaSpecSuite.JOB_ID), "Waiting for the job to start...");
+ assertTrue(isTaskStarted(helixManager,
IntegrationJobCancelSuite.JOB_ID), "Waiting for the job to start...");
AssertWithBackoff.create().maxSleepMs(100).timeoutMs(2000).backoffFactor(1).
-
assertTrue(isTaskRunning(IntegrationJobRestartViaSpecSuite.TASK_STATE_FILE),
"Waiting for the task to enter running state");
+ assertTrue(isTaskRunning(IntegrationJobCancelSuite.TASK_STATE_FILE),
"Waiting for the task to enter running state");
ZkClient zkClient = new ZkClient(this.zkConnectString);
PathBasedZkSerializer zkSerializer = ChainedPathZkSerializer.builder(new
ZNRecordStreamingSerializer()).build();
zkClient.setZkSerializer(zkSerializer);
String clusterName = getHelixManager().getClusterName();
- String zNodePath = Paths.get("/", clusterName, "CONFIGS", "RESOURCE",
IntegrationJobRestartViaSpecSuite.JOB_ID).toString();
+ String zNodePath = Paths.get("/", clusterName, "CONFIGS", "RESOURCE",
IntegrationJobCancelSuite.JOB_ID).toString();
//Ensure that the Workflow is started
ZNRecord record = zkClient.readData(zNodePath);
@@ -152,7 +149,7 @@ public class ClusterIntegrationTest {
//Add a JobSpec with UPDATE verb signalling the Helix cluster to restart
the workflow
restartViaSpecSuite.addJobSpec(IntegrationJobRestartViaSpecSuite.JOB_NAME,
SpecExecutor.Verb.UPDATE.name());
-
AssertWithBackoff.create().maxSleepMs(1000).timeoutMs(5000).backoffFactor(1).assertTrue(input
-> {
+
AssertWithBackoff.create().maxSleepMs(1000).timeoutMs(12000).backoffFactor(1).assertTrue(input
-> {
//Inspect the zNode at the path corresponding to the Workflow resource.
Ensure the target state of the resource is in
// the STOP state or that the zNode has been deleted.
ZNRecord recordNew = zkClient.readData(zNodePath, true);
diff --git
a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/FsScheduledJobConfigurationManagerTest.java
b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/FsJobConfigurationManagerTest.java
similarity index 65%
rename from
gobblin-cluster/src/test/java/org/apache/gobblin/cluster/FsScheduledJobConfigurationManagerTest.java
rename to
gobblin-cluster/src/test/java/org/apache/gobblin/cluster/FsJobConfigurationManagerTest.java
index 109e058..94f372f 100644
---
a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/FsScheduledJobConfigurationManagerTest.java
+++
b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/FsJobConfigurationManagerTest.java
@@ -24,6 +24,7 @@ import java.util.concurrent.ExecutionException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
@@ -37,6 +38,9 @@ import com.typesafe.config.ConfigValueFactory;
import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.cluster.event.DeleteJobConfigArrivalEvent;
+import org.apache.gobblin.cluster.event.NewJobConfigArrivalEvent;
+import org.apache.gobblin.cluster.event.UpdateJobConfigArrivalEvent;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.runtime.api.FsSpecConsumer;
import org.apache.gobblin.runtime.api.FsSpecProducer;
@@ -48,9 +52,9 @@ import org.apache.gobblin.runtime.api.SpecProducer;
import org.apache.gobblin.runtime.job_catalog.NonObservingFSJobCatalog;
@Slf4j
-public class FsScheduledJobConfigurationManagerTest {
+public class FsJobConfigurationManagerTest {
private MutableJobCatalog _jobCatalog;
- private FsScheduledJobConfigurationManager jobConfigurationManager;
+ private FsJobConfigurationManager jobConfigurationManager;
private String jobConfDir = "/tmp/" + this.getClass().getSimpleName() +
"/jobCatalog";
private String fsSpecConsumerPathString = "/tmp/fsJobConfigManagerTest";
@@ -59,24 +63,47 @@ public class FsScheduledJobConfigurationManagerTest {
private FileSystem fs;
private SpecProducer _specProducer;
+ private int newJobConfigArrivalEventCount = 0;
+ private int updateJobConfigArrivalEventCount = 0;
+ private int deleteJobConfigArrivalEventCount = 0;
+
+ // An EventBus used for communications between services running in the
ApplicationMaster
+ private EventBus eventBus;
+
@BeforeClass
public void setUp() throws IOException {
+ this.eventBus = Mockito.mock(EventBus.class);
+ Mockito.doAnswer(invocationOnMock -> {
+ Object argument = invocationOnMock.getArguments()[0];
+
+ if (argument instanceof NewJobConfigArrivalEvent) {
+ newJobConfigArrivalEventCount++;
+ } else if (argument instanceof DeleteJobConfigArrivalEvent) {
+ deleteJobConfigArrivalEventCount++;
+ } else if (argument instanceof UpdateJobConfigArrivalEvent) {
+ updateJobConfigArrivalEventCount++;
+ } else {
+ throw new IOException("Unexpected event type");
+ }
+ return null;
+ }).when(this.eventBus).post(Mockito.anyObject());
+
this.fs = FileSystem.getLocal(new Configuration(false));
Path jobConfDirPath = new Path(jobConfDir);
if (!this.fs.exists(jobConfDirPath)) {
this.fs.mkdirs(jobConfDirPath);
}
- EventBus eventBus = new
EventBus(FsScheduledJobConfigurationManagerTest.class.getSimpleName());
Config config = ConfigFactory.empty()
.withValue(ConfigurationKeys.JOB_CONFIG_FILE_GENERAL_PATH_KEY,
ConfigValueFactory.fromAnyRef(jobConfDir))
.withValue(GobblinClusterConfigurationKeys.SPEC_CONSUMER_CLASS_KEY,
ConfigValueFactory.fromAnyRef(FsSpecConsumer.class.getName()))
- .withValue(FsSpecConsumer.SPEC_PATH_KEY,
ConfigValueFactory.fromAnyRef(fsSpecConsumerPathString));
+ .withValue(FsSpecConsumer.SPEC_PATH_KEY,
ConfigValueFactory.fromAnyRef(fsSpecConsumerPathString))
+ .withValue(GobblinClusterConfigurationKeys.JOB_SPEC_REFRESH_INTERVAL,
ConfigValueFactory.fromAnyRef(1));
this._jobCatalog = new NonObservingFSJobCatalog(config);
((NonObservingFSJobCatalog) this._jobCatalog).startAsync().awaitRunning();
- jobConfigurationManager = new FsScheduledJobConfigurationManager(eventBus,
config, this._jobCatalog);
+ jobConfigurationManager = new FsJobConfigurationManager(eventBus, config,
this._jobCatalog);
_specProducer = new FsSpecProducer(config);
}
@@ -110,7 +137,7 @@ public class FsScheduledJobConfigurationManagerTest {
@Test (expectedExceptions = {JobSpecNotFoundException.class})
public void testFetchJobSpecs() throws ExecutionException,
InterruptedException, URISyntaxException, JobSpecNotFoundException, IOException
{
- //Test adding a JobSpec
+ //Ensure JobSpec is added to JobCatalog
String verb1 = SpecExecutor.Verb.ADD.name();
String version1 = "1";
addJobSpec(jobSpecUriString, version1, verb1);
@@ -119,10 +146,16 @@ public class FsScheduledJobConfigurationManagerTest {
Assert.assertTrue(jobSpec != null);
Assert.assertTrue(jobSpec.getVersion().equals(version1));
Assert.assertTrue(jobSpec.getUri().getPath().equals(jobSpecUriString));
+
//Ensure the JobSpec is deleted from the FsSpecConsumer path.
Path fsSpecConsumerPath = new Path(fsSpecConsumerPathString);
Assert.assertEquals(this.fs.listStatus(fsSpecConsumerPath).length, 0);
+ //Ensure NewJobConfigArrivalEvent is posted to EventBus
+ Assert.assertEquals(newJobConfigArrivalEventCount, 1);
+ Assert.assertEquals(updateJobConfigArrivalEventCount, 0);
+ Assert.assertEquals(deleteJobConfigArrivalEventCount, 0);
+
//Test that the updated JobSpec has been added to the JobCatalog.
String verb2 = SpecExecutor.Verb.UPDATE.name();
String version2 = "2";
@@ -131,15 +164,47 @@ public class FsScheduledJobConfigurationManagerTest {
jobSpec = this._jobCatalog.getJobSpec(new URI(jobSpecUriString));
Assert.assertTrue(jobSpec != null);
Assert.assertTrue(jobSpec.getVersion().equals(version2));
- //Ensure the JobSpec is deleted from the FsSpecConsumer path.
+
+ //Ensure the updated JobSpec is deleted from the FsSpecConsumer path.
Assert.assertEquals(this.fs.listStatus(fsSpecConsumerPath).length, 0);
+ //Ensure UpdateJobConfigArrivalEvent is posted to EventBus
+ Assert.assertEquals(newJobConfigArrivalEventCount, 1);
+ Assert.assertEquals(updateJobConfigArrivalEventCount, 1);
+ Assert.assertEquals(deleteJobConfigArrivalEventCount, 0);
+
//Test that the JobSpec has been deleted from the JobCatalog.
String verb3 = SpecExecutor.Verb.DELETE.name();
addJobSpec(jobSpecUriString, version2, verb3);
this.jobConfigurationManager.fetchJobSpecs();
+
+ //Ensure the JobSpec is deleted from the FsSpecConsumer path.
Assert.assertEquals(this.fs.listStatus(fsSpecConsumerPath).length, 0);
this._jobCatalog.getJobSpec(new URI(jobSpecUriString));
+
+ //Ensure DeleteJobConfigArrivalEvent is posted to EventBus
+ Assert.assertEquals(newJobConfigArrivalEventCount, 1);
+ Assert.assertEquals(updateJobConfigArrivalEventCount, 1);
+ Assert.assertEquals(deleteJobConfigArrivalEventCount, 1);
+ }
+
+ @Test
+ public void testException()
+ throws Exception {
+ FsJobConfigurationManager jobConfigurationManager =
Mockito.spy(this.jobConfigurationManager);
+ Mockito.doThrow(new ExecutionException(new IOException("Test
exception"))).when(jobConfigurationManager).fetchJobSpecs();
+
+ jobConfigurationManager.startUp();
+
+ //Add wait to ensure that fetchJobSpecExecutor thread is scheduled at
least once.
+ Thread.sleep(2000);
+ Mockito.verify(jobConfigurationManager, Mockito.times(1)).fetchJobSpecs();
+
+ Thread.sleep(2000);
+ //Verify that there are no new invocations of fetchJobSpecs()
+ Mockito.verify(jobConfigurationManager, Mockito.times(1)).fetchJobSpecs();
+ //Ensure that the JobConfigurationManager Service is not running.
+ Assert.assertFalse(jobConfigurationManager.isRunning());
}
@AfterClass
diff --git
a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationJobRestartViaSpecSuite.java
b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationJobRestartViaSpecSuite.java
index 1d2c247..d1f65dd 100644
---
a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationJobRestartViaSpecSuite.java
+++
b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationJobRestartViaSpecSuite.java
@@ -24,10 +24,6 @@ import java.io.Reader;
import java.net.URI;
import java.net.URISyntaxException;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-
import com.google.common.collect.ImmutableMap;
import com.google.common.io.Files;
import com.google.common.io.Resources;
@@ -37,7 +33,7 @@ import com.typesafe.config.ConfigParseOptions;
import com.typesafe.config.ConfigSyntax;
import com.typesafe.config.ConfigValueFactory;
-import org.apache.gobblin.cluster.FsScheduledJobConfigurationManager;
+import org.apache.gobblin.cluster.FsJobConfigurationManager;
import org.apache.gobblin.cluster.GobblinClusterConfigurationKeys;
import org.apache.gobblin.cluster.SleepingTask;
import org.apache.gobblin.configuration.ConfigurationKeys;
@@ -49,21 +45,13 @@ import org.apache.gobblin.runtime.api.SpecProducer;
public class IntegrationJobRestartViaSpecSuite extends
IntegrationJobCancelSuite {
- public static final String JOB_ID = "job_HelloWorldTestJob_1235";
public static final String JOB_NAME = "HelloWorldTestJob";
- public static final String JOB_CATALOG_DIR =
"/tmp/IntegrationJobCancelViaSpecSuite/jobCatalog";
public static final String FS_SPEC_CONSUMER_DIR =
"/tmp/IntegrationJobCancelViaSpecSuite/jobSpecs";
- public static final String TASK_STATE_FILE =
"/tmp/IntegrationJobCancelViaSpecSuite/taskState/_RUNNING";
private final SpecProducer _specProducer;
public IntegrationJobRestartViaSpecSuite() throws IOException {
super();
- Path jobCatalogDirPath = new Path(JOB_CATALOG_DIR);
- FileSystem fs = FileSystem.getLocal(new Configuration());
- if (!fs.exists(jobCatalogDirPath)) {
- fs.mkdirs(jobCatalogDirPath);
- }
this._specProducer = new
FsSpecProducer(ConfigFactory.empty().withValue(FsSpecConsumer.SPEC_PATH_KEY,
ConfigValueFactory.fromAnyRef(FS_SPEC_CONSUMER_DIR)));
}
@@ -91,11 +79,9 @@ public class IntegrationJobRestartViaSpecSuite extends
IntegrationJobCancelSuite
public Config getManagerConfig() {
Config managerConfig = super.getManagerConfig();
managerConfig =
managerConfig.withValue(GobblinClusterConfigurationKeys.JOB_CONFIGURATION_MANAGER_KEY,
-
ConfigValueFactory.fromAnyRef(FsScheduledJobConfigurationManager.class.getName()))
- .withValue(GobblinClusterConfigurationKeys.GOBBLIN_CLUSTER_PREFIX +
ConfigurationKeys.JOB_CONFIG_FILE_GENERAL_PATH_KEY,
- ConfigValueFactory.fromAnyRef(JOB_CATALOG_DIR))
+
ConfigValueFactory.fromAnyRef(FsJobConfigurationManager.class.getName()))
.withValue(GobblinClusterConfigurationKeys.SPEC_CONSUMER_CLASS_KEY,
ConfigValueFactory.fromAnyRef(FsSpecConsumer.class.getName()))
- .withValue(GobblinClusterConfigurationKeys.JOB_SPEC_REFRESH_INTERVAL,
ConfigValueFactory.fromAnyRef(5L))
+ .withValue(GobblinClusterConfigurationKeys.JOB_SPEC_REFRESH_INTERVAL,
ConfigValueFactory.fromAnyRef(1L))
.withValue(FsSpecConsumer.SPEC_PATH_KEY,
ConfigValueFactory.fromAnyRef(FS_SPEC_CONSUMER_DIR));
return managerConfig;
}