This is an automated email from the ASF dual-hosted git repository.

hutran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 3153d61  [GOBBLIN-709] Provide an option to disallow concurrent flow execution…
3153d61 is described below

commit 3153d61c51fd827e3f3ef31029c6bb7b3bfc3369
Author: suvasude <[email protected]>
AuthorDate: Thu Mar 28 13:53:57 2019 -0700

    [GOBBLIN-709] Provide an option to disallow concurrent flow execution…
    
    Closes #2580 from sv2000/concurrentFlows
---
 .../gobblin/configuration/ConfigurationKeys.java   |  1 +
 .../service/monitoring/FlowStatusGenerator.java    | 38 ++++++++++++
 .../monitoring/FlowStatusGeneratorTest.java        | 70 ++++++++++++++++++++++
 .../modules/core/GobblinServiceManager.java        | 21 ++++++-
 .../modules/orchestration/Orchestrator.java        | 50 ++++++++++++++++
 .../service/modules/core/GobblinServiceHATest.java |  5 ++
 .../modules/core/GobblinServiceManagerTest.java    |  4 ++
 7 files changed, 188 insertions(+), 1 deletion(-)

diff --git a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
index 8c920a3..00e71eb 100644
--- a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
+++ b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
@@ -135,6 +135,7 @@ public class ConfigurationKeys {
   public static final String FLOW_FAILURE_OPTION = "flow.failureOption";
   public static final String FLOW_APPLY_RETENTION = "flow.applyRetention";
   public static final String FLOW_APPLY_INPUT_RETENTION = "flow.applyInputRetention";
+  public static final String FLOW_ALLOW_CONCURRENT_EXECUTION = "flow.allowConcurrentExecution";
   public static final String FLOW_EXPLAIN_KEY = "flow.explain";
 
   /**
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/FlowStatusGenerator.java
 
b/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/FlowStatusGenerator.java
index 72164e3..f9e1005 100644
--- 
a/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/FlowStatusGenerator.java
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/FlowStatusGenerator.java
@@ -21,6 +21,8 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.stream.Collectors;
 
+import com.google.common.collect.Lists;
+
 import lombok.Builder;
 
 import org.apache.gobblin.annotation.Alpha;
@@ -32,6 +34,8 @@ import org.apache.gobblin.annotation.Alpha;
 @Alpha
 @Builder
 public class FlowStatusGenerator {
+  public static final List<String> FINISHED_JOB_STATUSES = 
Lists.newArrayList("FAILED", "COMPLETE", "CANCELLED");
+
   private final JobStatusRetriever jobStatusRetriever;
 
   /**
@@ -72,4 +76,38 @@ public class FlowStatusGenerator {
 
     return flowStatus;
   }
+
+  /**
+   * Return true if another instance of a flow is running. A flow is determined to be in the RUNNING state, if any of the
+   * jobs in the flow are in the RUNNING state.
+   * @param flowName
+   * @param flowGroup
+   * @return true, if any jobs of the flow are RUNNING.
+   */
+  public boolean isFlowRunning(String flowName, String flowGroup) {
+    List<FlowStatus> flowStatusList = getLatestFlowStatus(flowName, flowGroup, 
1);
+    if (flowStatusList == null || flowStatusList.isEmpty()) {
+      return false;
+    } else {
+      FlowStatus flowStatus = flowStatusList.get(0);
+      Iterator<JobStatus> jobStatusIterator = 
flowStatus.getJobStatusIterator();
+
+      while (jobStatusIterator.hasNext()) {
+        JobStatus jobStatus = jobStatusIterator.next();
+        if (isJobRunning(jobStatus)) {
+          return true;
+        }
+      }
+      return false;
+    }
+  }
+
+  /**
+   * @param jobStatus
+   * @return true if the job associated with the {@link JobStatus} is RUNNING
+   */
+  private boolean isJobRunning(JobStatus jobStatus) {
+    String status = jobStatus.getEventName().toUpperCase();
+    return !FINISHED_JOB_STATUSES.contains(status);
+  }
 }
diff --git 
a/gobblin-runtime/src/test/java/org/apache/gobblin/service/monitoring/FlowStatusGeneratorTest.java
 
b/gobblin-runtime/src/test/java/org/apache/gobblin/service/monitoring/FlowStatusGeneratorTest.java
new file mode 100644
index 0000000..e4bc4bd
--- /dev/null
+++ 
b/gobblin-runtime/src/test/java/org/apache/gobblin/service/monitoring/FlowStatusGeneratorTest.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.service.monitoring;
+
+import java.util.Iterator;
+
+import org.junit.Assert;
+import org.mockito.Mockito;
+import org.testng.annotations.Test;
+
+import com.google.common.collect.Lists;
+
+
+public class FlowStatusGeneratorTest {
+
+  @Test
+  public void testIsFlowRunning() {
+    JobStatusRetriever jobStatusRetriever = 
Mockito.mock(JobStatusRetriever.class);
+    String flowName = "testName";
+    String flowGroup = "testGroup";
+    Mockito.when(jobStatusRetriever.getLatestExecutionIdsForFlow(flowName, 
flowGroup, 1)).thenReturn(null);
+
+    FlowStatusGenerator flowStatusGenerator = 
FlowStatusGenerator.builder().jobStatusRetriever(jobStatusRetriever).build();
+    Assert.assertFalse(flowStatusGenerator.isFlowRunning(flowName, flowGroup));
+
+    //If a flow is COMPILED, isFlowRunning() should return true.
+    Long flowExecutionId = 1234L;
+    Mockito.when(jobStatusRetriever.getLatestExecutionIdsForFlow(flowName, 
flowGroup, 1)).thenReturn(
+        Lists.newArrayList(flowExecutionId));
+    JobStatus jobStatus = 
JobStatus.builder().flowGroup(flowGroup).flowName(flowName).flowExecutionId(flowExecutionId)
+        .jobName(JobStatusRetriever.NA_KEY).eventName("COMPILED").build();
+    Iterator<JobStatus> jobStatusIterator = 
Lists.newArrayList(jobStatus).iterator();
+    Mockito.when(jobStatusRetriever.getJobStatusesForFlowExecution(flowName, 
flowGroup, flowExecutionId)).thenReturn(jobStatusIterator);
+    Assert.assertTrue(flowStatusGenerator.isFlowRunning(flowName, flowGroup));
+
+    //A flow with 3 JobStatus-es with status COMPLETE, FAILED and CANCELLED => isFlowRunning() should return false.
+    String job1 = "job1";
+    String job2 = "job2";
+    String job3 = "job3";
+    JobStatus jobStatus1 = 
JobStatus.builder().flowGroup(flowGroup).flowName(flowName).flowExecutionId(flowExecutionId)
+        .jobName(job1).eventName("COMPLETE").build();
+    JobStatus jobStatus2 = 
JobStatus.builder().flowGroup(flowGroup).flowName(flowName).flowExecutionId(flowExecutionId)
+        .jobName(job2).eventName("FAILED").build();
+    JobStatus jobStatus3 = 
JobStatus.builder().flowGroup(flowGroup).flowName(flowName).flowExecutionId(flowExecutionId)
+        .jobName(job3).eventName("CANCELLED").build();
+    jobStatusIterator = Lists.newArrayList(jobStatus1, jobStatus2, 
jobStatus3).iterator();
+    Mockito.when(jobStatusRetriever.getJobStatusesForFlowExecution(flowName, 
flowGroup, flowExecutionId)).thenReturn(jobStatusIterator);
+    Assert.assertFalse(flowStatusGenerator.isFlowRunning(flowName, flowGroup));
+
+    jobStatus2 = 
JobStatus.builder().flowGroup(flowGroup).flowName(flowName).flowExecutionId(flowExecutionId)
+        .jobName(job2).eventName("RUNNING").build();
+    jobStatusIterator = Lists.newArrayList(jobStatus1, jobStatus2, 
jobStatus3).iterator();
+    Mockito.when(jobStatusRetriever.getJobStatusesForFlowExecution(flowName, 
flowGroup, flowExecutionId)).thenReturn(jobStatusIterator);
+    Assert.assertTrue(flowStatusGenerator.isFlowRunning(flowName, flowGroup));
+  }
+}
\ No newline at end of file
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
index a1f897d..f0419ea 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
@@ -93,14 +93,19 @@ import 
org.apache.gobblin.service.modules.restli.GobblinServiceFlowConfigResourc
 import org.apache.gobblin.service.modules.scheduler.GobblinServiceJobScheduler;
 import org.apache.gobblin.service.modules.topology.TopologySpecFactory;
 import org.apache.gobblin.service.modules.utils.HelixUtils;
+import org.apache.gobblin.service.monitoring.FlowStatusGenerator;
+import org.apache.gobblin.service.monitoring.FsJobStatusRetriever;
+import org.apache.gobblin.service.monitoring.JobStatusRetriever;
 import org.apache.gobblin.util.ClassAliasResolver;
 import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
 
 
 @Alpha
 public class GobblinServiceManager implements ApplicationLauncher, 
StandardMetricsBridge {
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(GobblinServiceManager.class);
+  private static final String JOB_STATUS_RETRIEVER_CLASS_KEY = 
"jobStatusRetriever.class";
 
   protected final ServiceBasedAppLauncher serviceLauncher;
   private volatile boolean stopInProgress = false;
@@ -132,6 +137,7 @@ public class GobblinServiceManager implements 
ApplicationLauncher, StandardMetri
   protected GobblinServiceFlowConfigResourceHandler v2ResourceHandler;
 
   protected boolean flowCatalogLocalCommit;
+  @Getter
   protected Orchestrator orchestrator;
   protected EmbeddedRestliServer restliServer;
   protected TopologySpecFactory topologySpecFactory;
@@ -338,6 +344,19 @@ public class GobblinServiceManager implements 
ApplicationLauncher, StandardMetri
     return new Path(fs.getHomeDirectory(), serviceName + Path.SEPARATOR + 
serviceId);
   }
 
+  private FlowStatusGenerator buildFlowStatusGenerator(Config config) {
+    JobStatusRetriever jobStatusRetriever;
+    try {
+      Class jobStatusRetrieverClass = 
Class.forName(ConfigUtils.getString(config, JOB_STATUS_RETRIEVER_CLASS_KEY, 
FsJobStatusRetriever.class.getName()));
+      jobStatusRetriever =
+          (JobStatusRetriever) 
GobblinConstructorUtils.invokeLongestConstructor(jobStatusRetrieverClass, 
config);
+    } catch (ReflectiveOperationException e) {
+      LOGGER.error("Exception encountered when instantiating JobStatusRetriever");
+      throw new RuntimeException(e);
+    }
+    return 
FlowStatusGenerator.builder().jobStatusRetriever(jobStatusRetriever).build();
+  }
+
   /**
    * Handle leadership change.
    * @param changeContext notification context
@@ -558,7 +577,7 @@ public class GobblinServiceManager implements 
ApplicationLauncher, StandardMetri
       try (GobblinServiceManager gobblinServiceManager = new 
GobblinServiceManager(
           cmd.getOptionValue(ServiceConfigKeys.SERVICE_NAME_OPTION_NAME), 
getServiceId(),
           config, Optional.<Path>absent())) {
-
+        
gobblinServiceManager.getOrchestrator().setFlowStatusGenerator(gobblinServiceManager.buildFlowStatusGenerator(config));
         gobblinServiceManager.start();
 
         if (isTestMode) {
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
index 27fb888..76f5b7f 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
@@ -37,6 +37,7 @@ import com.typesafe.config.Config;
 
 import javax.annotation.Nonnull;
 import lombok.Getter;
+import lombok.Setter;
 
 import org.apache.gobblin.annotation.Alpha;
 import org.apache.gobblin.configuration.ConfigurationKeys;
@@ -60,8 +61,12 @@ import org.apache.gobblin.service.ServiceMetricNames;
 import org.apache.gobblin.service.modules.flow.SpecCompiler;
 import org.apache.gobblin.service.modules.flowgraph.Dag;
 import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.service.monitoring.FlowStatusGenerator;
+import org.apache.gobblin.service.monitoring.FsJobStatusRetriever;
+import org.apache.gobblin.service.monitoring.JobStatusRetriever;
 import org.apache.gobblin.util.ClassAliasResolver;
 import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
 
 
 /**
@@ -70,6 +75,8 @@ import org.apache.gobblin.util.ConfigUtils;
  */
 @Alpha
 public class Orchestrator implements SpecCatalogListener, Instrumentable {
+  private static final String JOB_STATUS_RETRIEVER_CLASS_KEY = 
"jobStatusRetriever.class";
+
   protected final Logger _log;
   protected final SpecCompiler specCompiler;
   protected final Optional<TopologyCatalog> topologyCatalog;
@@ -83,6 +90,8 @@ public class Orchestrator implements SpecCatalogListener, 
Instrumentable {
   private Optional<Meter> flowOrchestrationFailedMeter;
   @Getter
   private Optional<Timer> flowOrchestrationTimer;
+  @Setter
+  private FlowStatusGenerator flowStatusGenerator;
 
   private final ClassAliasResolver<SpecCompiler> aliasResolver;
 
@@ -211,6 +220,19 @@ public class Orchestrator implements SpecCatalogListener, 
Instrumentable {
           ? 
this.eventSubmitter.get().getTimingEvent(TimingEvent.FlowTimings.FLOW_COMPILED)
           : null;
 
+      //If the FlowSpec disallows concurrent executions, then check if another instance of the flow is already
+      //running. If so, return immediately.
+      Config flowConfig = ((FlowSpec) spec).getConfig();
+      String flowGroup = 
flowConfig.getString(ConfigurationKeys.FLOW_GROUP_KEY);
+      String flowName = flowConfig.getString(ConfigurationKeys.FLOW_NAME_KEY);
+      boolean allowConcurrentExecution = ConfigUtils.getBoolean(flowConfig, 
ConfigurationKeys.FLOW_ALLOW_CONCURRENT_EXECUTION, true);
+
+      if (!canRun(flowName, flowGroup, allowConcurrentExecution)) {
+        _log.warn("Another instance of flowGroup: {}, flowName: {} running; Skipping flow execution since "
+            + "concurrent executions are disabled for this flow.", flowGroup, flowName);
+        return;
+      }
+
       Dag<JobExecutionPlan> jobExecutionPlanDag = 
specCompiler.compileFlow(spec);
 
       Map<String, String> flowMetadata = 
TimingEventUtils.getFlowMetadata((FlowSpec) spec);
@@ -283,6 +305,21 @@ public class Orchestrator implements SpecCatalogListener, 
Instrumentable {
     Instrumented.updateTimer(this.flowOrchestrationTimer, System.nanoTime() - 
startTime, TimeUnit.NANOSECONDS);
   }
 
+  /**
+   * Check if the flow instance is allowed to run.
+   * @param flowName
+   * @param flowGroup
+   * @param allowConcurrentExecution
+   * @return true if the {@link FlowSpec} allows concurrent executions or if no other instance of the flow is currently RUNNING.
+   */
+  private boolean canRun(String flowName, String flowGroup, boolean 
allowConcurrentExecution) {
+    if (allowConcurrentExecution) {
+      return true;
+    } else {
+      return !flowStatusGenerator.isFlowRunning(flowName, flowGroup);
+    }
+  }
+
   public void remove(Spec spec, Properties headers) {
     // TODO: Evolve logic to cache and reuse previously compiled JobSpecs
     // .. this will work for Identity compiler but not always for multi-hop.
@@ -316,6 +353,19 @@ public class Orchestrator implements SpecCatalogListener, 
Instrumentable {
     }
   }
 
+  private FlowStatusGenerator buildFlowStatusGenerator(Config config) {
+    JobStatusRetriever jobStatusRetriever;
+    try {
+      Class jobStatusRetrieverClass = 
Class.forName(ConfigUtils.getString(config, JOB_STATUS_RETRIEVER_CLASS_KEY, 
FsJobStatusRetriever.class.getName()));
+      jobStatusRetriever =
+          (JobStatusRetriever) 
GobblinConstructorUtils.invokeLongestConstructor(jobStatusRetrieverClass, 
config);
+    } catch (ReflectiveOperationException e) {
+      _log.error("Exception encountered when instantiating JobStatusRetriever");
+      throw new RuntimeException(e);
+    }
+    return 
FlowStatusGenerator.builder().jobStatusRetriever(jobStatusRetriever).build();
+  }
+
   @Nonnull
   @Override
   public MetricContext getMetricContext() {
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceHATest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceHATest.java
index 2b4332f..11b2c62 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceHATest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceHATest.java
@@ -52,6 +52,7 @@ import org.apache.gobblin.service.Schedule;
 import org.apache.gobblin.service.ServiceConfigKeys;
 import org.apache.gobblin.service.modules.orchestration.Orchestrator;
 import org.apache.gobblin.service.modules.utils.HelixUtils;
+import org.apache.gobblin.service.monitoring.FsJobStatusRetriever;
 import org.apache.gobblin.util.ConfigUtils;
 
 @Test
@@ -69,11 +70,13 @@ public class GobblinServiceHATest {
   private static final String NODE_1_SPEC_STORE_PARENT_DIR = 
"/tmp/serviceCoreNode1/";
   private static final String NODE_1_TOPOLOGY_SPEC_STORE_DIR = 
"/tmp/serviceCoreNode1/topologyTestSpecStoreNode1";
   private static final String NODE_1_FLOW_SPEC_STORE_DIR = 
"/tmp/serviceCoreCommon/flowTestSpecStore";
+  private static final String NODE_1_JOB_STATUS_STATE_STORE_DIR = 
"/tmp/serviceCoreNode1/fsJobStatusRetriever";
 
   private static final String NODE_2_SERVICE_WORK_DIR = 
"/tmp/serviceWorkDirNode2/";
   private static final String NODE_2_SPEC_STORE_PARENT_DIR = 
"/tmp/serviceCoreNode2/";
   private static final String NODE_2_TOPOLOGY_SPEC_STORE_DIR = 
"/tmp/serviceCoreNode2/topologyTestSpecStoreNode2";
   private static final String NODE_2_FLOW_SPEC_STORE_DIR = 
"/tmp/serviceCoreCommon/flowTestSpecStore";
+  private static final String NODE_2_JOB_STATUS_STATE_STORE_DIR = 
"/tmp/serviceCoreNode2/fsJobStatusRetriever";
 
   private static final String TEST_HELIX_CLUSTER_NAME = 
"testGobblinServiceCluster";
 
@@ -150,6 +153,7 @@ public class GobblinServiceHATest {
     node1ServiceCoreProperties.putAll(commonServiceCoreProperties);
     
node1ServiceCoreProperties.put(ConfigurationKeys.TOPOLOGYSPEC_STORE_DIR_KEY, 
NODE_1_TOPOLOGY_SPEC_STORE_DIR);
     node1ServiceCoreProperties.put(ConfigurationKeys.FLOWSPEC_STORE_DIR_KEY, 
NODE_1_FLOW_SPEC_STORE_DIR);
+    node1ServiceCoreProperties.put(FsJobStatusRetriever.CONF_PREFIX + "." + 
ConfigurationKeys.STATE_STORE_ROOT_DIR_KEY, NODE_1_JOB_STATUS_STATE_STORE_DIR);
     node1ServiceCoreProperties.put(QUARTZ_INSTANCE_NAME, "QuartzScheduler1");
     node1ServiceCoreProperties.put(QUARTZ_THREAD_POOL_COUNT, 3);
 
@@ -157,6 +161,7 @@ public class GobblinServiceHATest {
     node2ServiceCoreProperties.putAll(commonServiceCoreProperties);
     
node2ServiceCoreProperties.put(ConfigurationKeys.TOPOLOGYSPEC_STORE_DIR_KEY, 
NODE_2_TOPOLOGY_SPEC_STORE_DIR);
     node2ServiceCoreProperties.put(ConfigurationKeys.FLOWSPEC_STORE_DIR_KEY, 
NODE_2_FLOW_SPEC_STORE_DIR);
+    node2ServiceCoreProperties.put(FsJobStatusRetriever.CONF_PREFIX + "." + 
ConfigurationKeys.STATE_STORE_ROOT_DIR_KEY, NODE_2_JOB_STATUS_STATE_STORE_DIR);
     node2ServiceCoreProperties.put(QUARTZ_INSTANCE_NAME, "QuartzScheduler2");
     node2ServiceCoreProperties.put(QUARTZ_THREAD_POOL_COUNT, 3);
 
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceManagerTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceManagerTest.java
index 68b030b..32dc529 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceManagerTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceManagerTest.java
@@ -55,6 +55,7 @@ import org.apache.gobblin.service.FlowConfigClient;
 import org.apache.gobblin.service.FlowId;
 import org.apache.gobblin.service.Schedule;
 import org.apache.gobblin.service.ServiceConfigKeys;
+import org.apache.gobblin.service.monitoring.FsJobStatusRetriever;
 import org.apache.gobblin.util.ConfigUtils;
 
 
@@ -71,6 +72,7 @@ public class GobblinServiceManagerTest {
   private static final String GIT_CLONE_DIR = "/tmp/serviceCore/clone";
   private static final String GIT_REMOTE_REPO_DIR = "/tmp/serviceCore/remote";
   private static final String GIT_LOCAL_REPO_DIR = "/tmp/serviceCore/local";
+  private static final String JOB_STATUS_STATE_STORE_DIR = 
"/tmp/serviceCore/fsJobStatusRetriever";
 
   private static final String TEST_GROUP_NAME = "testGroup";
   private static final String TEST_FLOW_NAME = "testFlow";
@@ -112,6 +114,8 @@ public class GobblinServiceManagerTest {
     serviceCoreProperties.put(GitConfigMonitor.GIT_CONFIG_MONITOR_PREFIX + "." 
+ ConfigurationKeys.GIT_MONITOR_REPO_DIR, GIT_LOCAL_REPO_DIR);
     serviceCoreProperties.put(GitConfigMonitor.GIT_CONFIG_MONITOR_PREFIX + "." 
+ ConfigurationKeys.GIT_MONITOR_POLLING_INTERVAL, 5);
 
+    serviceCoreProperties.put(FsJobStatusRetriever.CONF_PREFIX + "." + 
ConfigurationKeys.STATE_STORE_ROOT_DIR_KEY, JOB_STATUS_STATE_STORE_DIR);
+
     // Create a bare repository
     RepositoryCache.FileKey fileKey = RepositoryCache.FileKey.exact(new 
File(GIT_REMOTE_REPO_DIR), FS.DETECTED);
     fileKey.open(false).create(true);

Reply via email to