This is an automated email from the ASF dual-hosted git repository.

suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 3888558  [GOBBLIN-795] Make JobCatalog optional for 
FsJobConfigurationManager
3888558 is described below

commit 388855876b731ddc6a22231c8961be631230b582
Author: sv2000 <[email protected]>
AuthorDate: Wed Jun 5 20:49:49 2019 -0700

    [GOBBLIN-795] Make JobCatalog optional for FsJobConfigurationManager
    
    Closes #2662 from sv2000/fsConfigManager
---
 .../gobblin/cluster/FsJobConfigurationManager.java | 145 +++++++++++++++++++++
 .../FsScheduledJobConfigurationManager.java        |  84 ------------
 .../gobblin/cluster/GobblinClusterManager.java     |   6 +-
 .../gobblin/cluster/GobblinHelixJobScheduler.java  |   5 +-
 .../gobblin/cluster/ClusterIntegrationTest.java    |  15 +--
 ...est.java => FsJobConfigurationManagerTest.java} |  79 ++++++++++-
 .../suite/IntegrationJobRestartViaSpecSuite.java   |  20 +--
 7 files changed, 231 insertions(+), 123 deletions(-)

diff --git 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/FsJobConfigurationManager.java
 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/FsJobConfigurationManager.java
new file mode 100644
index 0000000..5fd0d6a
--- /dev/null
+++ 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/FsJobConfigurationManager.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.cluster;
+
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.lang3.reflect.ConstructorUtils;
+import org.apache.commons.lang3.tuple.Pair;
+
+import com.google.common.base.Optional;
+import com.google.common.eventbus.EventBus;
+import com.typesafe.config.Config;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.runtime.api.FsSpecConsumer;
+import org.apache.gobblin.runtime.api.JobSpec;
+import org.apache.gobblin.runtime.api.MutableJobCatalog;
+import org.apache.gobblin.runtime.api.SpecConsumer;
+import org.apache.gobblin.runtime.api.SpecExecutor;
+import org.apache.gobblin.util.ClassAliasResolver;
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.ExecutorsUtils;
+
+
+/**
+ * A {@link JobConfigurationManager} that reads {@link JobSpec}s from a source 
path on a
+ * {@link org.apache.hadoop.fs.FileSystem} and posts them to an {@link 
EventBus}.
+ * The {@link FsJobConfigurationManager} has an underlying {@link 
FsSpecConsumer} that periodically reads the
+ * {@link JobSpec}s from the filesystem and posts an appropriate 
JobConfigArrivalEvent with the job configuration to
+ * the EventBus for consumption by the listeners.
+ */
+@Slf4j
+public class FsJobConfigurationManager extends JobConfigurationManager {
+  private static final long DEFAULT_JOB_SPEC_REFRESH_INTERVAL = 60;
+
+  private final long refreshIntervalInSeconds;
+
+  private final ScheduledExecutorService fetchJobSpecExecutor;
+
+  protected final SpecConsumer _specConsumer;
+
+  private final ClassAliasResolver<SpecConsumer> aliasResolver;
+
+  private final Optional<MutableJobCatalog> _jobCatalogOptional;
+
+  public FsJobConfigurationManager(EventBus eventBus, Config config) {
+    this(eventBus, config, null);
+  }
+
+  public FsJobConfigurationManager(EventBus eventBus, Config config, 
MutableJobCatalog jobCatalog) {
+    super(eventBus, config);
+    this._jobCatalogOptional = jobCatalog != null ? Optional.of(jobCatalog) : 
Optional.absent();
+    this.refreshIntervalInSeconds = ConfigUtils.getLong(config, 
GobblinClusterConfigurationKeys.JOB_SPEC_REFRESH_INTERVAL,
+        DEFAULT_JOB_SPEC_REFRESH_INTERVAL);
+
+    this.fetchJobSpecExecutor = Executors.newSingleThreadScheduledExecutor(
+        ExecutorsUtils.newThreadFactory(Optional.of(log), 
Optional.of("FetchJobSpecExecutor")));
+
+    this.aliasResolver = new ClassAliasResolver<>(SpecConsumer.class);
+    try {
+      String specConsumerClassName = ConfigUtils.getString(config, 
GobblinClusterConfigurationKeys.SPEC_CONSUMER_CLASS_KEY,
+          GobblinClusterConfigurationKeys.DEFAULT_SPEC_CONSUMER_CLASS);
+      log.info("Using SpecConsumer ClassNameclass name/alias " + 
specConsumerClassName);
+      this._specConsumer = (SpecConsumer) ConstructorUtils
+          
.invokeConstructor(Class.forName(this.aliasResolver.resolve(specConsumerClassName)),
 config);
+    } catch (NoSuchMethodException | IllegalAccessException | 
InvocationTargetException | InstantiationException
+        | ClassNotFoundException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  protected void startUp() throws Exception{
+    super.startUp();
+    // Schedule the job config fetch task
+    this.fetchJobSpecExecutor.scheduleAtFixedRate(new Runnable() {
+      @Override
+      public void run() {
+        try {
+          fetchJobSpecs();
+        } catch (InterruptedException | ExecutionException e) {
+          log.error("Failed to fetch job specs", e);
+          throw new RuntimeException("Failed to fetch specs", e);
+        }
+      }
+    }, 0, this.refreshIntervalInSeconds, TimeUnit.SECONDS);
+  }
+
+  void fetchJobSpecs() throws ExecutionException, InterruptedException {
+    List<Pair<SpecExecutor.Verb, JobSpec>> jobSpecs =
+        (List<Pair<SpecExecutor.Verb, JobSpec>>) 
this._specConsumer.changedSpecs().get();
+
+    for (Pair<SpecExecutor.Verb, JobSpec> entry : jobSpecs) {
+      JobSpec jobSpec = entry.getValue();
+      SpecExecutor.Verb verb = entry.getKey();
+      if (verb.equals(SpecExecutor.Verb.ADD)) {
+        // Handle addition
+        if (this._jobCatalogOptional.isPresent()) {
+          this._jobCatalogOptional.get().put(jobSpec);
+        }
+        postNewJobConfigArrival(jobSpec.getUri().toString(), 
jobSpec.getConfigAsProperties());
+      } else if (verb.equals(SpecExecutor.Verb.UPDATE)) {
+        //Handle update.
+        if (this._jobCatalogOptional.isPresent()) {
+          this._jobCatalogOptional.get().put(jobSpec);
+        }
+        postUpdateJobConfigArrival(jobSpec.getUri().toString(), 
jobSpec.getConfigAsProperties());
+      } else if (verb.equals(SpecExecutor.Verb.DELETE)) {
+        // Handle delete
+        if (this._jobCatalogOptional.isPresent()) {
+          this._jobCatalogOptional.get().remove(jobSpec.getUri());
+        }
+        postDeleteJobConfigArrival(jobSpec.getUri().toString(), 
jobSpec.getConfigAsProperties());
+      }
+
+      try {
+        //Acknowledge the successful consumption of the JobSpec back to the 
SpecConsumer, so that the
+        //SpecConsumer can delete the JobSpec.
+        this._specConsumer.commit(jobSpec);
+      } catch (IOException e) {
+        log.error("Error when committing to FsSpecConsumer: ", e);
+      }
+    }
+  }
+}
diff --git 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/FsScheduledJobConfigurationManager.java
 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/FsScheduledJobConfigurationManager.java
deleted file mode 100644
index e4776ef..0000000
--- 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/FsScheduledJobConfigurationManager.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gobblin.cluster;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.concurrent.ExecutionException;
-
-import org.apache.commons.lang3.tuple.Pair;
-
-import com.google.common.eventbus.EventBus;
-import com.typesafe.config.Config;
-
-import lombok.extern.slf4j.Slf4j;
-
-import org.apache.gobblin.runtime.api.FsSpecConsumer;
-import org.apache.gobblin.runtime.api.JobSpec;
-import org.apache.gobblin.runtime.api.MutableJobCatalog;
-import org.apache.gobblin.runtime.api.SpecExecutor;
-
-
-/**
- * A {@link ScheduledJobConfigurationManager} that reads {@link JobSpec}s from 
a source path on a
- * {@link org.apache.hadoop.fs.FileSystem} and adds them to a {@link 
org.apache.gobblin.runtime.api.JobCatalog}.
- * The {@link FsScheduledJobConfigurationManager} has an underlying {@link 
FsSpecConsumer} that reads the {@link JobSpec}s
- * from the filesystem and once the JobSpecs have been added to the {@link 
org.apache.gobblin.runtime.api.JobCatalog},
- * the consumer deletes the specs from the source path.
- */
-@Slf4j
-public class FsScheduledJobConfigurationManager extends 
ScheduledJobConfigurationManager {
-  private final MutableJobCatalog _jobCatalog;
-
-  public FsScheduledJobConfigurationManager(EventBus eventBus, Config config, 
MutableJobCatalog jobCatalog) {
-    super(eventBus, config);
-    this._jobCatalog = jobCatalog;
-  }
-
-  @Override
-  protected void fetchJobSpecs() throws ExecutionException, 
InterruptedException {
-    List<Pair<SpecExecutor.Verb, JobSpec>> jobSpecs =
-        (List<Pair<SpecExecutor.Verb, JobSpec>>) 
this._specConsumer.changedSpecs().get();
-
-    for (Pair<SpecExecutor.Verb, JobSpec> entry : jobSpecs) {
-      JobSpec jobSpec = entry.getValue();
-      SpecExecutor.Verb verb = entry.getKey();
-      if (verb.equals(SpecExecutor.Verb.ADD)) {
-        // Handle addition
-        this._jobCatalog.put(jobSpec);
-        postNewJobConfigArrival(jobSpec.getUri().toString(), 
jobSpec.getConfigAsProperties());
-      } else if (verb.equals(SpecExecutor.Verb.UPDATE)) {
-        //Handle update.
-        //Overwrite the jobSpec in the jobCatalog and post an 
UpdateJobConfigArrivalEvent.
-        this._jobCatalog.put(jobSpec);
-        postUpdateJobConfigArrival(jobSpec.getUri().toString(), 
jobSpec.getConfigAsProperties());
-      } else if (verb.equals(SpecExecutor.Verb.DELETE)) {
-        // Handle delete
-        this._jobCatalog.remove(jobSpec.getUri());
-        postDeleteJobConfigArrival(jobSpec.getUri().toString(), 
jobSpec.getConfigAsProperties());
-      }
-
-      try {
-        //Acknowledge the successful consumption of the JobSpec back to the 
SpecConsumer, so that the
-        //SpecConsumer can delete the JobSpec.
-        this._specConsumer.commit(jobSpec);
-      } catch (IOException e) {
-        log.error("Error when committing to FsSpecConsumer: ", e);
-      }
-    }
-  }
-}
diff --git 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java
 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java
index 571e5ba..e43b241 100644
--- 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java
+++ 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java
@@ -372,11 +372,11 @@ public class GobblinClusterManager implements 
ApplicationLauncher, StandardMetri
 
   private JobConfigurationManager create(Config config) {
     try {
+      List<Object> argumentList = (this.jobCatalog != null)? 
ImmutableList.of(this.eventBus, config, this.jobCatalog) :
+          ImmutableList.of(this.eventBus, config);
       if 
(config.hasPath(GobblinClusterConfigurationKeys.JOB_CONFIGURATION_MANAGER_KEY)) 
{
         return (JobConfigurationManager) 
GobblinConstructorUtils.invokeFirstConstructor(Class.forName(
-            
config.getString(GobblinClusterConfigurationKeys.JOB_CONFIGURATION_MANAGER_KEY)),
-            ImmutableList.<Object>of(this.eventBus, config, this.jobCatalog),
-            ImmutableList.<Object>of(this.eventBus, config));
+            
config.getString(GobblinClusterConfigurationKeys.JOB_CONFIGURATION_MANAGER_KEY)),
 argumentList);
       } else {
         return new JobConfigurationManager(this.eventBus, config);
       }
diff --git 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java
 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java
index 02022a8..3054d2e 100644
--- 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java
+++ 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java
@@ -325,9 +325,6 @@ public class GobblinHelixJobScheduler extends JobScheduler 
implements StandardMe
       LOGGER.error("Failed to update job " + updateJobArrival.getJobName(), 
je);
     }
 
-    //Wait until the cancelled job is complete.
-    waitForJobCompletion(updateJobArrival.getJobName());
-
     try {
       handleNewJobConfigArrival(new 
NewJobConfigArrivalEvent(updateJobArrival.getJobName(),
           updateJobArrival.getJobConfig()));
@@ -370,6 +367,8 @@ public class GobblinHelixJobScheduler extends JobScheduler 
implements StandardMe
         TaskDriver taskDriver = new TaskDriver(this.jobHelixManager);
         taskDriver.waitToStop(workflowId, this.helixJobStopTimeoutMillis);
         LOGGER.info("Stopped workflow: {}", deleteJobArrival.getJobName());
+        //Wait until the cancelled job is complete.
+        waitForJobCompletion(deleteJobArrival.getJobName());
       } else {
         LOGGER.warn("Could not find Helix Workflow Id for job: {}", 
deleteJobArrival.getJobName());
       }
diff --git 
a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/ClusterIntegrationTest.java
 
b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/ClusterIntegrationTest.java
index 5d8d02b..55a2c0f 100644
--- 
a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/ClusterIntegrationTest.java
+++ 
b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/ClusterIntegrationTest.java
@@ -102,7 +102,7 @@ public class ClusterIntegrationTest {
 
   /**
    * An integration test for restarting a Helix workflow via a JobSpec. This 
test case starts a Helix cluster with
-   * a {@link FsScheduledJobConfigurationManager}. The test case does the 
following:
+   * a {@link FsJobConfigurationManager}. The test case does the following:
    * <ul>
    *   <li> add a {@link org.apache.gobblin.runtime.api.JobSpec} that uses a 
{@link org.apache.gobblin.cluster.SleepingCustomTaskSource})
    *   to {@link IntegrationJobRestartViaSpecSuite#FS_SPEC_CONSUMER_DIR}.  
which is picked by the JobConfigurationManager. </li>
@@ -116,33 +116,30 @@ public class ClusterIntegrationTest {
    *   We confirm the execution by again inspecting the zNode and ensuring its 
TargetState is START. </li>
    * </ul>
    */
-  @Test (dependsOnMethods = { "testJobShouldGetCancelled" })
+  @Test (dependsOnMethods = { "testJobShouldGetCancelled" }, groups = 
{"disabledOnTravis"})
   public void testJobRestartViaSpec() throws Exception {
     this.suite = new IntegrationJobRestartViaSpecSuite();
     HelixManager helixManager = getHelixManager();
 
     IntegrationJobRestartViaSpecSuite restartViaSpecSuite = 
(IntegrationJobRestartViaSpecSuite) this.suite;
 
-    //Add a new JobSpec to the path monitored by the SpecConsumer
-    restartViaSpecSuite.addJobSpec(IntegrationJobRestartViaSpecSuite.JOB_NAME, 
SpecExecutor.Verb.ADD.name());
-
     //Start the cluster
     restartViaSpecSuite.startCluster();
 
     helixManager.connect();
 
     
AssertWithBackoff.create().timeoutMs(30000).maxSleepMs(1000).backoffFactor(1).
-        assertTrue(isTaskStarted(helixManager, 
IntegrationJobRestartViaSpecSuite.JOB_ID), "Waiting for the job to start...");
+        assertTrue(isTaskStarted(helixManager, 
IntegrationJobCancelSuite.JOB_ID), "Waiting for the job to start...");
 
     
AssertWithBackoff.create().maxSleepMs(100).timeoutMs(2000).backoffFactor(1).
-        
assertTrue(isTaskRunning(IntegrationJobRestartViaSpecSuite.TASK_STATE_FILE), 
"Waiting for the task to enter running state");
+        assertTrue(isTaskRunning(IntegrationJobCancelSuite.TASK_STATE_FILE), 
"Waiting for the task to enter running state");
 
     ZkClient zkClient = new ZkClient(this.zkConnectString);
     PathBasedZkSerializer zkSerializer = ChainedPathZkSerializer.builder(new 
ZNRecordStreamingSerializer()).build();
     zkClient.setZkSerializer(zkSerializer);
 
     String clusterName = getHelixManager().getClusterName();
-    String zNodePath = Paths.get("/", clusterName, "CONFIGS", "RESOURCE", 
IntegrationJobRestartViaSpecSuite.JOB_ID).toString();
+    String zNodePath = Paths.get("/", clusterName, "CONFIGS", "RESOURCE", 
IntegrationJobCancelSuite.JOB_ID).toString();
 
     //Ensure that the Workflow is started
     ZNRecord record = zkClient.readData(zNodePath);
@@ -152,7 +149,7 @@ public class ClusterIntegrationTest {
     //Add a JobSpec with UPDATE verb signalling the Helix cluster to restart 
the workflow
     restartViaSpecSuite.addJobSpec(IntegrationJobRestartViaSpecSuite.JOB_NAME, 
SpecExecutor.Verb.UPDATE.name());
 
-    
AssertWithBackoff.create().maxSleepMs(1000).timeoutMs(5000).backoffFactor(1).assertTrue(input
 -> {
+    
AssertWithBackoff.create().maxSleepMs(1000).timeoutMs(12000).backoffFactor(1).assertTrue(input
 -> {
       //Inspect the zNode at the path corresponding to the Workflow resource. 
Ensure the target state of the resource is in
       // the STOP state or that the zNode has been deleted.
       ZNRecord recordNew = zkClient.readData(zNodePath, true);
diff --git 
a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/FsScheduledJobConfigurationManagerTest.java
 
b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/FsJobConfigurationManagerTest.java
similarity index 65%
rename from 
gobblin-cluster/src/test/java/org/apache/gobblin/cluster/FsScheduledJobConfigurationManagerTest.java
rename to 
gobblin-cluster/src/test/java/org/apache/gobblin/cluster/FsJobConfigurationManagerTest.java
index 109e058..94f372f 100644
--- 
a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/FsScheduledJobConfigurationManagerTest.java
+++ 
b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/FsJobConfigurationManagerTest.java
@@ -24,6 +24,7 @@ import java.util.concurrent.ExecutionException;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.mockito.Mockito;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
@@ -37,6 +38,9 @@ import com.typesafe.config.ConfigValueFactory;
 
 import lombok.extern.slf4j.Slf4j;
 
+import org.apache.gobblin.cluster.event.DeleteJobConfigArrivalEvent;
+import org.apache.gobblin.cluster.event.NewJobConfigArrivalEvent;
+import org.apache.gobblin.cluster.event.UpdateJobConfigArrivalEvent;
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.runtime.api.FsSpecConsumer;
 import org.apache.gobblin.runtime.api.FsSpecProducer;
@@ -48,9 +52,9 @@ import org.apache.gobblin.runtime.api.SpecProducer;
 import org.apache.gobblin.runtime.job_catalog.NonObservingFSJobCatalog;
 
 @Slf4j
-public class FsScheduledJobConfigurationManagerTest {
+public class FsJobConfigurationManagerTest {
   private MutableJobCatalog _jobCatalog;
-  private FsScheduledJobConfigurationManager jobConfigurationManager;
+  private FsJobConfigurationManager jobConfigurationManager;
 
   private String jobConfDir = "/tmp/" + this.getClass().getSimpleName() + 
"/jobCatalog";
   private String fsSpecConsumerPathString = "/tmp/fsJobConfigManagerTest";
@@ -59,24 +63,47 @@ public class FsScheduledJobConfigurationManagerTest {
   private FileSystem fs;
   private SpecProducer _specProducer;
 
+  private int newJobConfigArrivalEventCount = 0;
+  private int updateJobConfigArrivalEventCount = 0;
+  private int deleteJobConfigArrivalEventCount = 0;
+
+  // An EventBus used for communications between services running in the 
ApplicationMaster
+  private EventBus eventBus;
+
   @BeforeClass
   public void setUp() throws IOException {
+    this.eventBus = Mockito.mock(EventBus.class);
+    Mockito.doAnswer(invocationOnMock -> {
+      Object argument = invocationOnMock.getArguments()[0];
+
+      if (argument instanceof NewJobConfigArrivalEvent) {
+        newJobConfigArrivalEventCount++;
+      } else if (argument instanceof DeleteJobConfigArrivalEvent) {
+        deleteJobConfigArrivalEventCount++;
+      } else if (argument instanceof UpdateJobConfigArrivalEvent) {
+        updateJobConfigArrivalEventCount++;
+      } else {
+        throw new IOException("Unexpected event type");
+      }
+      return null;
+    }).when(this.eventBus).post(Mockito.anyObject());
+
     this.fs = FileSystem.getLocal(new Configuration(false));
     Path jobConfDirPath = new Path(jobConfDir);
     if (!this.fs.exists(jobConfDirPath)) {
       this.fs.mkdirs(jobConfDirPath);
     }
 
-    EventBus eventBus = new 
EventBus(FsScheduledJobConfigurationManagerTest.class.getSimpleName());
     Config config = ConfigFactory.empty()
         .withValue(ConfigurationKeys.JOB_CONFIG_FILE_GENERAL_PATH_KEY, 
ConfigValueFactory.fromAnyRef(jobConfDir))
         .withValue(GobblinClusterConfigurationKeys.SPEC_CONSUMER_CLASS_KEY, 
ConfigValueFactory.fromAnyRef(FsSpecConsumer.class.getName()))
-        .withValue(FsSpecConsumer.SPEC_PATH_KEY, 
ConfigValueFactory.fromAnyRef(fsSpecConsumerPathString));
+        .withValue(FsSpecConsumer.SPEC_PATH_KEY, 
ConfigValueFactory.fromAnyRef(fsSpecConsumerPathString))
+        .withValue(GobblinClusterConfigurationKeys.JOB_SPEC_REFRESH_INTERVAL, 
ConfigValueFactory.fromAnyRef(1));
 
     this._jobCatalog = new NonObservingFSJobCatalog(config);
     ((NonObservingFSJobCatalog) this._jobCatalog).startAsync().awaitRunning();
 
-    jobConfigurationManager = new FsScheduledJobConfigurationManager(eventBus, 
config, this._jobCatalog);
+    jobConfigurationManager = new FsJobConfigurationManager(eventBus, config, 
this._jobCatalog);
 
     _specProducer = new FsSpecProducer(config);
   }
@@ -110,7 +137,7 @@ public class FsScheduledJobConfigurationManagerTest {
 
   @Test (expectedExceptions = {JobSpecNotFoundException.class})
   public void testFetchJobSpecs() throws ExecutionException, 
InterruptedException, URISyntaxException, JobSpecNotFoundException, IOException 
{
-    //Test adding a JobSpec
+    //Ensure JobSpec is added to JobCatalog
     String verb1 = SpecExecutor.Verb.ADD.name();
     String version1 = "1";
     addJobSpec(jobSpecUriString, version1, verb1);
@@ -119,10 +146,16 @@ public class FsScheduledJobConfigurationManagerTest {
     Assert.assertTrue(jobSpec != null);
     Assert.assertTrue(jobSpec.getVersion().equals(version1));
     Assert.assertTrue(jobSpec.getUri().getPath().equals(jobSpecUriString));
+
     //Ensure the JobSpec is deleted from the FsSpecConsumer path.
     Path fsSpecConsumerPath = new Path(fsSpecConsumerPathString);
     Assert.assertEquals(this.fs.listStatus(fsSpecConsumerPath).length, 0);
 
+    //Ensure NewJobConfigArrivalEvent is posted to EventBus
+    Assert.assertEquals(newJobConfigArrivalEventCount, 1);
+    Assert.assertEquals(updateJobConfigArrivalEventCount, 0);
+    Assert.assertEquals(deleteJobConfigArrivalEventCount, 0);
+
     //Test that the updated JobSpec has been added to the JobCatalog.
     String verb2 = SpecExecutor.Verb.UPDATE.name();
     String version2 = "2";
@@ -131,15 +164,47 @@ public class FsScheduledJobConfigurationManagerTest {
     jobSpec = this._jobCatalog.getJobSpec(new URI(jobSpecUriString));
     Assert.assertTrue(jobSpec != null);
     Assert.assertTrue(jobSpec.getVersion().equals(version2));
-    //Ensure the JobSpec is deleted from the FsSpecConsumer path.
+
+    //Ensure the updated JobSpec is deleted from the FsSpecConsumer path.
     Assert.assertEquals(this.fs.listStatus(fsSpecConsumerPath).length, 0);
 
+    //Ensure UpdateJobConfigArrivalEvent is posted to EventBus
+    Assert.assertEquals(newJobConfigArrivalEventCount, 1);
+    Assert.assertEquals(updateJobConfigArrivalEventCount, 1);
+    Assert.assertEquals(deleteJobConfigArrivalEventCount, 0);
+
     //Test that the JobSpec has been deleted from the JobCatalog.
     String verb3 = SpecExecutor.Verb.DELETE.name();
     addJobSpec(jobSpecUriString, version2, verb3);
     this.jobConfigurationManager.fetchJobSpecs();
+
+    //Ensure the JobSpec is deleted from the FsSpecConsumer path.
     Assert.assertEquals(this.fs.listStatus(fsSpecConsumerPath).length, 0);
     this._jobCatalog.getJobSpec(new URI(jobSpecUriString));
+
+    //Ensure DeleteJobConfigArrivalEvent is posted to EventBus
+    Assert.assertEquals(newJobConfigArrivalEventCount, 1);
+    Assert.assertEquals(updateJobConfigArrivalEventCount, 1);
+    Assert.assertEquals(deleteJobConfigArrivalEventCount, 1);
+  }
+
+  @Test
+  public void testException()
+      throws Exception {
+    FsJobConfigurationManager jobConfigurationManager = 
Mockito.spy(this.jobConfigurationManager);
+    Mockito.doThrow(new ExecutionException(new IOException("Test 
exception"))).when(jobConfigurationManager).fetchJobSpecs();
+
+    jobConfigurationManager.startUp();
+
+    //Add wait to ensure that fetchJobSpecExecutor thread is scheduled at 
least once.
+    Thread.sleep(2000);
+    Mockito.verify(jobConfigurationManager, Mockito.times(1)).fetchJobSpecs();
+
+    Thread.sleep(2000);
+    //Verify that there are no new invocations of fetchJobSpecs()
+    Mockito.verify(jobConfigurationManager, Mockito.times(1)).fetchJobSpecs();
+    //Ensure that the JobConfigurationManager Service is not running.
+    Assert.assertFalse(jobConfigurationManager.isRunning());
   }
 
   @AfterClass
diff --git 
a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationJobRestartViaSpecSuite.java
 
b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationJobRestartViaSpecSuite.java
index 1d2c247..d1f65dd 100644
--- 
a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationJobRestartViaSpecSuite.java
+++ 
b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationJobRestartViaSpecSuite.java
@@ -24,10 +24,6 @@ import java.io.Reader;
 import java.net.URI;
 import java.net.URISyntaxException;
 
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-
 import com.google.common.collect.ImmutableMap;
 import com.google.common.io.Files;
 import com.google.common.io.Resources;
@@ -37,7 +33,7 @@ import com.typesafe.config.ConfigParseOptions;
 import com.typesafe.config.ConfigSyntax;
 import com.typesafe.config.ConfigValueFactory;
 
-import org.apache.gobblin.cluster.FsScheduledJobConfigurationManager;
+import org.apache.gobblin.cluster.FsJobConfigurationManager;
 import org.apache.gobblin.cluster.GobblinClusterConfigurationKeys;
 import org.apache.gobblin.cluster.SleepingTask;
 import org.apache.gobblin.configuration.ConfigurationKeys;
@@ -49,21 +45,13 @@ import org.apache.gobblin.runtime.api.SpecProducer;
 
 
 public class IntegrationJobRestartViaSpecSuite extends 
IntegrationJobCancelSuite {
-  public static final String JOB_ID = "job_HelloWorldTestJob_1235";
   public static final String JOB_NAME = "HelloWorldTestJob";
-  public static final String JOB_CATALOG_DIR = 
"/tmp/IntegrationJobCancelViaSpecSuite/jobCatalog";
   public static final String FS_SPEC_CONSUMER_DIR = 
"/tmp/IntegrationJobCancelViaSpecSuite/jobSpecs";
-  public static final String TASK_STATE_FILE = 
"/tmp/IntegrationJobCancelViaSpecSuite/taskState/_RUNNING";
 
   private final SpecProducer _specProducer;
 
   public IntegrationJobRestartViaSpecSuite() throws IOException {
     super();
-    Path jobCatalogDirPath = new Path(JOB_CATALOG_DIR);
-    FileSystem fs = FileSystem.getLocal(new Configuration());
-    if (!fs.exists(jobCatalogDirPath)) {
-      fs.mkdirs(jobCatalogDirPath);
-    }
     this._specProducer = new 
FsSpecProducer(ConfigFactory.empty().withValue(FsSpecConsumer.SPEC_PATH_KEY, 
ConfigValueFactory.fromAnyRef(FS_SPEC_CONSUMER_DIR)));
   }
 
@@ -91,11 +79,9 @@ public class IntegrationJobRestartViaSpecSuite extends 
IntegrationJobCancelSuite
   public Config getManagerConfig() {
     Config managerConfig = super.getManagerConfig();
     managerConfig = 
managerConfig.withValue(GobblinClusterConfigurationKeys.JOB_CONFIGURATION_MANAGER_KEY,
-        
ConfigValueFactory.fromAnyRef(FsScheduledJobConfigurationManager.class.getName()))
-        .withValue(GobblinClusterConfigurationKeys.GOBBLIN_CLUSTER_PREFIX + 
ConfigurationKeys.JOB_CONFIG_FILE_GENERAL_PATH_KEY,
-            ConfigValueFactory.fromAnyRef(JOB_CATALOG_DIR))
+        
ConfigValueFactory.fromAnyRef(FsJobConfigurationManager.class.getName()))
     .withValue(GobblinClusterConfigurationKeys.SPEC_CONSUMER_CLASS_KEY, 
ConfigValueFactory.fromAnyRef(FsSpecConsumer.class.getName()))
-        .withValue(GobblinClusterConfigurationKeys.JOB_SPEC_REFRESH_INTERVAL, 
ConfigValueFactory.fromAnyRef(5L))
+        .withValue(GobblinClusterConfigurationKeys.JOB_SPEC_REFRESH_INTERVAL, 
ConfigValueFactory.fromAnyRef(1L))
     .withValue(FsSpecConsumer.SPEC_PATH_KEY, 
ConfigValueFactory.fromAnyRef(FS_SPEC_CONSUMER_DIR));
     return managerConfig;
   }

Reply via email to