This is an automated email from the ASF dual-hosted git repository.

suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 4e238b7  [GOBBLIN-756] Add flow catalog that updates when filesystem 
is modified
4e238b7 is described below

commit 4e238b7b823304cc3640db872498e3ab12d45aae
Author: Jack Moseley <[email protected]>
AuthorDate: Wed May 1 11:49:26 2019 -0700

    [GOBBLIN-756] Add flow catalog that updates when filesystem is modified
    
    Closes #2620 from jack-moseley/dynamic-flow-
    template
---
 .../runtime/job_catalog/ImmutableFSJobCatalog.java |  11 +-
 .../service/modules/core/GitFlowGraphMonitor.java  |  13 ++-
 .../service/modules/flow/MultiHopFlowCompiler.java |  18 ++-
 .../service/modules/flowgraph/BaseFlowEdge.java    |  38 ++++++-
 .../service/modules/flowgraph/BaseFlowGraph.java   |   4 +-
 .../ObservingFSFlowEdgeTemplateCatalog.java        | 121 +++++++++++++++++++++
 .../FSFlowTemplateCatalogTest.java                 |   4 +-
 .../ObservingFSFlowEdgeTemplateCatalogTest.java    | 115 ++++++++++++++++++++
 8 files changed, 300 insertions(+), 24 deletions(-)

diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_catalog/ImmutableFSJobCatalog.java
 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_catalog/ImmutableFSJobCatalog.java
index fb08bc0..86e2b96 100644
--- 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_catalog/ImmutableFSJobCatalog.java
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_catalog/ImmutableFSJobCatalog.java
@@ -52,6 +52,7 @@ import org.apache.gobblin.runtime.api.JobSpec;
 import org.apache.gobblin.runtime.api.JobSpecNotFoundException;
 import org.apache.gobblin.util.PathUtils;
 import org.apache.gobblin.util.PullFileLoader;
+import org.apache.gobblin.util.filesystem.PathAlterationListener;
 import org.apache.gobblin.util.filesystem.PathAlterationObserverScheduler;
 import org.apache.gobblin.util.filesystem.PathAlterationObserver;
 
@@ -127,13 +128,15 @@ public class ImmutableFSJobCatalog extends JobCatalogBase 
implements JobCatalog
 
       // If absent, the Optional object will be created automatically by 
addPathAlterationObserver
       Optional<PathAlterationObserver> observerOptional = 
Optional.fromNullable(observer);
-      FSPathAlterationListenerAdaptor configFilelistener =
-          new FSPathAlterationListenerAdaptor(this.jobConfDirPath, 
this.loader, this.sysConfig, this.listeners,
-              this.converter);
-      
this.pathAlterationDetector.addPathAlterationObserver(configFilelistener, 
observerOptional, this.jobConfDirPath);
+      this.pathAlterationDetector.addPathAlterationObserver(getListener(), 
observerOptional, this.jobConfDirPath);
     }
   }
 
+  protected PathAlterationListener getListener() {
+    return new FSPathAlterationListenerAdaptor(this.jobConfDirPath, 
this.loader, this.sysConfig, this.listeners,
+        this.converter);
+  }
+
   @Override
   protected void startUp()
       throws IOException {
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitor.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitor.java
index 024c9a1..5a69371 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitor.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitor.java
@@ -88,15 +88,16 @@ public class GitFlowGraphMonitor extends 
GitMonitoringService {
           .put(SHOULD_CHECKPOINT_HASHES, false)
           .build());
 
-  private Optional<FSFlowTemplateCatalog> flowCatalog;
+  private Optional<? extends FSFlowTemplateCatalog> flowTemplateCatalog;
   private FlowGraph flowGraph;
   private final Map<URI, TopologySpec> topologySpecMap;
   private final Config emptyConfig = ConfigFactory.empty();
   private final CountDownLatch initComplete;
 
-  public GitFlowGraphMonitor(Config config, Optional<FSFlowTemplateCatalog> 
flowCatalog, FlowGraph graph, Map<URI, TopologySpec> topologySpecMap, 
CountDownLatch initComplete) {
+  public GitFlowGraphMonitor(Config config, Optional<? extends 
FSFlowTemplateCatalog> flowTemplateCatalog,
+      FlowGraph graph, Map<URI, TopologySpec> topologySpecMap, CountDownLatch 
initComplete) {
     
super(config.getConfig(GIT_FLOWGRAPH_MONITOR_PREFIX).withFallback(DEFAULT_FALLBACK));
-    this.flowCatalog = flowCatalog;
+    this.flowTemplateCatalog = flowTemplateCatalog;
     this.flowGraph = graph;
     this.topologySpecMap = topologySpecMap;
     this.initComplete = initComplete;
@@ -224,15 +225,15 @@ public class GitFlowGraphMonitor extends 
GitMonitoringService {
         Class flowEdgeFactoryClass = 
Class.forName(ConfigUtils.getString(edgeConfig, 
FlowGraphConfigurationKeys.FLOW_EDGE_FACTORY_CLASS,
             FlowGraphConfigurationKeys.DEFAULT_FLOW_EDGE_FACTORY_CLASS));
         FlowEdgeFactory flowEdgeFactory = (FlowEdgeFactory) 
GobblinConstructorUtils.invokeLongestConstructor(flowEdgeFactoryClass, 
edgeConfig);
-        if (flowCatalog.isPresent()) {
-          FlowEdge edge = flowEdgeFactory.createFlowEdge(edgeConfig, 
flowCatalog.get(), specExecutors);
+        if (flowTemplateCatalog.isPresent()) {
+          FlowEdge edge = flowEdgeFactory.createFlowEdge(edgeConfig, 
flowTemplateCatalog.get(), specExecutors);
           if (!this.flowGraph.addFlowEdge(edge)) {
             log.warn("Could not add edge {} to FlowGraph; skipping", 
edge.getId());
           } else {
             log.info("Added edge {} to FlowGraph", edge.getId());
           }
         } else {
-          log.warn("Could not add edge defined in {} to FlowGraph as 
FlowCatalog is absent", change.getNewPath());
+          log.warn("Could not add edge defined in {} to FlowGraph as 
FlowTemplateCatalog is absent", change.getNewPath());
         }
       } catch (Exception e) {
         log.warn("Could not add edge defined in {} due to exception {}", 
change.getNewPath(), e.getMessage());
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java
index 347235a..f067344 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java
@@ -23,9 +23,10 @@ import java.util.ArrayList;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.commons.lang3.StringUtils;
-import 
org.apache.gobblin.service.modules.template_catalog.FSFlowTemplateCatalog;
 import org.slf4j.Logger;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -54,6 +55,7 @@ import org.apache.gobblin.service.modules.flowgraph.FlowGraph;
 import org.apache.gobblin.service.modules.flowgraph.pathfinder.PathFinder;
 import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
 import org.apache.gobblin.service.modules.spec.JobExecutionPlanDagFactory;
+import 
org.apache.gobblin.service.modules.template_catalog.ObservingFSFlowEdgeTemplateCatalog;
 import org.apache.gobblin.util.ConfigUtils;
 
 
@@ -73,6 +75,8 @@ public class MultiHopFlowCompiler extends 
BaseFlowToJobSpecCompiler {
 
   private GitFlowGraphMonitor gitFlowGraphMonitor;
 
+  private ReadWriteLock rwLock = new ReentrantReadWriteLock(true);
+
   public MultiHopFlowCompiler(Config config) {
     this(config, true);
   }
@@ -88,11 +92,11 @@ public class MultiHopFlowCompiler extends 
BaseFlowToJobSpecCompiler {
   public MultiHopFlowCompiler(Config config, Optional<Logger> log, boolean 
instrumentationEnabled) {
     super(config, log, instrumentationEnabled);
     this.flowGraph = new BaseFlowGraph();
-    Optional<FSFlowTemplateCatalog> flowCatalog = Optional.absent();
+    Optional<ObservingFSFlowEdgeTemplateCatalog> flowTemplateCatalog = 
Optional.absent();
     if 
(config.hasPath(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY)
         && 
StringUtils.isNotBlank(config.getString(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY)))
 {
       try {
-        flowCatalog = Optional.of(new FSFlowTemplateCatalog(config));
+        flowTemplateCatalog = Optional.of(new 
ObservingFSFlowEdgeTemplateCatalog(config, rwLock));
       } catch (IOException e) {
         throw new RuntimeException("Cannot instantiate " + 
getClass().getName(), e);
       }
@@ -105,8 +109,8 @@ public class MultiHopFlowCompiler extends 
BaseFlowToJobSpecCompiler {
       gitFlowGraphConfig = this.config
           .withValue(GitFlowGraphMonitor.GIT_FLOWGRAPH_MONITOR_PREFIX + "." + 
ConfigurationKeys.ENCRYPT_KEY_LOC, 
config.getValue(ConfigurationKeys.ENCRYPT_KEY_LOC));
     }
-    this.gitFlowGraphMonitor = new GitFlowGraphMonitor(gitFlowGraphConfig, 
flowCatalog, this.flowGraph, this.topologySpecMap, this.getInitComplete());
-    this.serviceManager = new 
ServiceManager(Lists.newArrayList(this.gitFlowGraphMonitor));
+    this.gitFlowGraphMonitor = new GitFlowGraphMonitor(gitFlowGraphConfig, 
flowTemplateCatalog, this.flowGraph, this.topologySpecMap, 
this.getInitComplete());
+    this.serviceManager = new 
ServiceManager(Lists.newArrayList(this.gitFlowGraphMonitor, 
flowTemplateCatalog.get()));
     addShutdownHook();
     //Start the git flow graph monitor
     try {
@@ -167,6 +171,7 @@ public class MultiHopFlowCompiler extends 
BaseFlowToJobSpecCompiler {
 
     Dag<JobExecutionPlan> jobExecutionPlanDag;
     try {
+      this.rwLock.readLock().lock();
       //Compute the path from source to destination.
       FlowGraphPath flowGraphPath = flowGraph.findPath(flowSpec);
       //Convert the path into a Dag of JobExecutionPlans.
@@ -183,9 +188,12 @@ public class MultiHopFlowCompiler extends 
BaseFlowToJobSpecCompiler {
               .format("Exception encountered while compiling flow for source: 
%s and destination: %s", source, destination),
           e);
       return null;
+    } finally {
+      this.rwLock.readLock().unlock();
     }
     Instrumented.markMeter(flowCompilationSuccessFulMeter);
     Instrumented.updateTimer(flowCompilationTimer, System.nanoTime() - 
startTime, TimeUnit.NANOSECONDS);
+
     return jobExecutionPlanDag;
   }
 
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowEdge.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowEdge.java
index 0c1023e..3d65f6e 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowEdge.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowEdge.java
@@ -17,10 +17,11 @@
 
 package org.apache.gobblin.service.modules.flowgraph;
 
+import java.io.IOException;
 import java.net.URI;
+import java.net.URISyntaxException;
 import java.util.List;
 
-import 
org.apache.gobblin.service.modules.template_catalog.FSFlowTemplateCatalog;
 import org.apache.hadoop.security.UserGroupInformation;
 
 import com.google.common.base.Preconditions;
@@ -29,17 +30,23 @@ import com.typesafe.config.Config;
 
 import joptsimple.internal.Strings;
 import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
 
 import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.runtime.api.JobTemplate;
 import org.apache.gobblin.runtime.api.SpecExecutor;
+import org.apache.gobblin.runtime.api.SpecNotFoundException;
 import org.apache.gobblin.service.modules.template.FlowTemplate;
+import 
org.apache.gobblin.service.modules.template_catalog.FSFlowTemplateCatalog;
 import org.apache.gobblin.util.ConfigUtils;
 
 
 /**
- * An implementation of {@link FlowEdge}.
+ * An implementation of {@link FlowEdge}. If a {@link FSFlowTemplateCatalog} 
is specified in the constructor,
+ * {@link #flowTemplate} is reloaded when {@link #getFlowTemplate()} is called.
  */
 @Alpha
+@Slf4j
 public class BaseFlowEdge implements FlowEdge {
   @Getter
   protected String src;
@@ -47,7 +54,6 @@ public class BaseFlowEdge implements FlowEdge {
   @Getter
   protected String dest;
 
-  @Getter
   protected FlowTemplate flowTemplate;
 
   @Getter
@@ -62,8 +68,16 @@ public class BaseFlowEdge implements FlowEdge {
   @Getter
   private boolean active;
 
+  private final FSFlowTemplateCatalog flowTemplateCatalog;
+
   //Constructor
-  public BaseFlowEdge(List<String> endPoints, String edgeId, FlowTemplate 
flowTemplate, List<SpecExecutor> executors, Config properties, boolean active) {
+  public BaseFlowEdge(List<String> endPoints, String edgeId, FlowTemplate 
flowTemplate, List<SpecExecutor> executors,
+      Config properties, boolean active) {
+    this(endPoints, edgeId, flowTemplate, executors, properties, active, null);
+  }
+
+  public BaseFlowEdge(List<String> endPoints, String edgeId, FlowTemplate 
flowTemplate, List<SpecExecutor> executors,
+      Config properties, boolean active, FSFlowTemplateCatalog 
flowTemplateCatalog) {
     this.src = endPoints.get(0);
     this.dest = endPoints.get(1);
     this.flowTemplate = flowTemplate;
@@ -71,6 +85,20 @@ public class BaseFlowEdge implements FlowEdge {
     this.active = active;
     this.config = properties;
     this.id = edgeId;
+    this.flowTemplateCatalog = flowTemplateCatalog;
+  }
+
+  @Override
+  public FlowTemplate getFlowTemplate() {
+    try {
+      if (this.flowTemplateCatalog != null) {
+        this.flowTemplate = 
this.flowTemplateCatalog.getFlowTemplate(this.flowTemplate.getUri());
+      }
+    } catch (SpecNotFoundException | JobTemplate.TemplateException | 
IOException | URISyntaxException e) {
+      // If loading template fails, use the template that was successfully 
loaded on construction
+      log.warn("Failed to get flow template at " + this.flowTemplate.getUri() 
+ ", using in-memory flow template");
+    }
+    return this.flowTemplate;
   }
 
   @Override
@@ -148,7 +176,7 @@ public class BaseFlowEdge implements FlowEdge {
         boolean isActive = ConfigUtils.getBoolean(edgeProps, 
FlowGraphConfigurationKeys.FLOW_EDGE_IS_ACTIVE_KEY, true);
 
         FlowTemplate flowTemplate = flowTemplateCatalog.getFlowTemplate(new 
URI(flowTemplateDirUri));
-        return new BaseFlowEdge(endPoints, edgeId, flowTemplate, 
specExecutors, edgeProps, isActive);
+        return new BaseFlowEdge(endPoints, edgeId, flowTemplate, 
specExecutors, edgeProps, isActive, flowTemplateCatalog);
       } catch (RuntimeException e) {
         throw e;
       } catch (Exception e) {
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowGraph.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowGraph.java
index e2b256f..83a9a58 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowGraph.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowGraph.java
@@ -24,6 +24,8 @@ import java.util.Set;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
+import lombok.extern.slf4j.Slf4j;
+
 import org.apache.gobblin.annotation.Alpha;
 import org.apache.gobblin.runtime.api.FlowSpec;
 import org.apache.gobblin.service.modules.flow.FlowGraphPath;
@@ -31,8 +33,6 @@ import 
org.apache.gobblin.service.modules.flowgraph.pathfinder.PathFinder;
 import org.apache.gobblin.util.ConfigUtils;
 import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
 
-import lombok.extern.slf4j.Slf4j;
-
 
 /**
  * A thread-safe implementation of {@link FlowGraph}. The implementation 
maintains the following data structures:
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template_catalog/ObservingFSFlowEdgeTemplateCatalog.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template_catalog/ObservingFSFlowEdgeTemplateCatalog.java
new file mode 100644
index 0000000..9b68024
--- /dev/null
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template_catalog/ObservingFSFlowEdgeTemplateCatalog.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.template_catalog;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.locks.ReadWriteLock;
+
+import org.apache.hadoop.fs.Path;
+
+import com.typesafe.config.Config;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.runtime.api.JobTemplate;
+import org.apache.gobblin.runtime.api.SpecNotFoundException;
+import org.apache.gobblin.service.modules.template.FlowTemplate;
+import org.apache.gobblin.util.filesystem.PathAlterationListener;
+import org.apache.gobblin.util.filesystem.PathAlterationListenerAdaptor;
+
+
+/**
+ * {@link FSFlowTemplateCatalog} that keeps a cache of flow and job templates. 
It has a
+ * {@link org.apache.gobblin.util.filesystem.PathAlterationListener} on the 
root path, and clears the cache when a change
+ * is detected so that the next call to {@link #getFlowTemplate(URI)} will use 
the updated files.
+ */
+@Slf4j
+public class ObservingFSFlowEdgeTemplateCatalog extends FSFlowTemplateCatalog {
+  private Map<URI, FlowTemplate> flowTemplateMap = new ConcurrentHashMap<>();
+  private Map<URI, List<JobTemplate>> jobTemplateMap = new 
ConcurrentHashMap<>();
+  private ReadWriteLock rwLock;
+
+  public ObservingFSFlowEdgeTemplateCatalog(Config sysConfig, ReadWriteLock 
rwLock) throws IOException {
+    super(sysConfig);
+    this.rwLock = rwLock;
+  }
+
+  @Override
+  public FlowTemplate getFlowTemplate(URI flowTemplateDirURI)
+      throws SpecNotFoundException, JobTemplate.TemplateException, 
IOException, URISyntaxException {
+    FlowTemplate flowTemplate = 
flowTemplateMap.getOrDefault(flowTemplateDirURI, null);
+
+    if (flowTemplate == null) {
+      flowTemplate = super.getFlowTemplate(flowTemplateDirURI);
+      flowTemplateMap.put(flowTemplateDirURI, flowTemplate);
+    }
+
+    return flowTemplate;
+  }
+
+  @Override
+  public List<JobTemplate> getJobTemplatesForFlow(URI flowTemplateDirURI)
+      throws IOException, SpecNotFoundException, 
JobTemplate.TemplateException, URISyntaxException {
+    List<JobTemplate> jobTemplates = 
jobTemplateMap.getOrDefault(flowTemplateDirURI, null);
+
+    if (jobTemplates == null) {
+      jobTemplates = super.getJobTemplatesForFlow(flowTemplateDirURI);
+      jobTemplateMap.put(flowTemplateDirURI, jobTemplates);
+    }
+
+    return jobTemplates;
+  }
+
+  @Override
+  protected PathAlterationListener getListener() {
+    return new FlowCatalogPathAlterationListener();
+  }
+
+  @Override
+  protected void startUp() throws IOException {
+    if (this.pathAlterationDetector != null) {
+      this.pathAlterationDetector.start();
+    }
+  }
+
+  /**
+   * Clear cached templates so they will be reloaded next time {@link 
#getFlowTemplate(URI)} is called.
+   */
+  private void clearTemplates() {
+    this.rwLock.writeLock().lock();
+    log.info("Change detected, reloading flow templates.");
+    flowTemplateMap.clear();
+    jobTemplateMap.clear();
+    this.rwLock.writeLock().unlock();
+  }
+
+  /**
+   * {@link org.apache.gobblin.util.filesystem.PathAlterationListener} that 
clears flow/job template cache if a file is
+   * created or updated.
+   */
+  private class FlowCatalogPathAlterationListener extends 
PathAlterationListenerAdaptor {
+    @Override
+    public void onFileCreate(Path path) {
+      clearTemplates();
+    }
+
+    @Override
+    public void onFileChange(Path path) {
+      clearTemplates();
+    }
+  }
+}
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/template_catalog/FSFlowTemplateCatalogTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/template_catalog/FSFlowTemplateCatalogTest.java
index 550c97d..f8cdf79 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/template_catalog/FSFlowTemplateCatalogTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/template_catalog/FSFlowTemplateCatalogTest.java
@@ -44,8 +44,8 @@ import lombok.extern.slf4j.Slf4j;
 
 @Slf4j
 public class FSFlowTemplateCatalogTest {
-  private static final String TEST_TEMPLATE_NAME = "flowEdgeTemplate";
-  private static final String TEST_TEMPLATE_DIR_URI = "FS:///" + 
TEST_TEMPLATE_NAME;
+  public static final String TEST_TEMPLATE_NAME = "flowEdgeTemplate";
+  public static final String TEST_TEMPLATE_DIR_URI = "FS:///" + 
TEST_TEMPLATE_NAME;
 
   @Test
   public void testGetFlowTemplate() throws Exception {
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/template_catalog/ObservingFSFlowEdgeTemplateCatalogTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/template_catalog/ObservingFSFlowEdgeTemplateCatalogTest.java
new file mode 100644
index 0000000..b1de754
--- /dev/null
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/template_catalog/ObservingFSFlowEdgeTemplateCatalogTest.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.template_catalog;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.file.Path;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.apache.commons.io.FileUtils;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Lists;
+import com.google.common.io.Files;
+import com.google.common.util.concurrent.ServiceManager;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.runtime.api.JobTemplate;
+import org.apache.gobblin.runtime.api.SpecNotFoundException;
+import org.apache.gobblin.service.ServiceConfigKeys;
+import org.apache.gobblin.service.modules.template.FlowTemplate;
+import org.apache.gobblin.testing.AssertWithBackoff;
+
+
+@Slf4j
+public class ObservingFSFlowEdgeTemplateCatalogTest {
+
+  private File templateDir;
+  private Config templateCatalogCfg;
+
+  @BeforeClass
+  public void setUp() throws Exception {
+    URI flowTemplateCatalogUri = 
this.getClass().getClassLoader().getResource("template_catalog").toURI();
+    this.templateDir = Files.createTempDir();
+    FileUtils.forceDeleteOnExit(templateDir);
+    FileUtils.copyDirectory(new File(flowTemplateCatalogUri.getPath()), 
templateDir);
+    Properties properties = new Properties();
+    
properties.put(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY, 
templateDir.toURI().toString());
+    
properties.put(ConfigurationKeys.JOB_CONFIG_FILE_MONITOR_POLLING_INTERVAL_KEY, 
"1000");
+    Config config = ConfigFactory.parseProperties(properties);
+    this.templateCatalogCfg = 
config.withValue(ConfigurationKeys.JOB_CONFIG_FILE_GENERAL_PATH_KEY,
+        
config.getValue(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY));
+  }
+
+  @Test
+  public void testModifyFlowTemplate() throws Exception {
+    ObservingFSFlowEdgeTemplateCatalog catalog = new 
ObservingFSFlowEdgeTemplateCatalog(this.templateCatalogCfg, new 
ReentrantReadWriteLock());
+    ServiceManager serviceManager = new 
ServiceManager(Lists.newArrayList(catalog));
+    serviceManager.startAsync().awaitHealthy(5, TimeUnit.SECONDS);
+
+    // Check cached flow template is returned
+    FlowTemplate flowTemplate1 = catalog.getFlowTemplate(new 
URI(FSFlowTemplateCatalogTest.TEST_TEMPLATE_DIR_URI));
+    FlowTemplate flowTemplate2 = catalog.getFlowTemplate(new 
URI(FSFlowTemplateCatalogTest.TEST_TEMPLATE_DIR_URI));
+    Assert.assertSame(flowTemplate1, flowTemplate2);
+
+    // Update a file flow catalog and check that the getFlowTemplate returns 
the new value
+    Path flowConfPath = new File(new File(this.templateDir, 
FSFlowTemplateCatalogTest.TEST_TEMPLATE_NAME), "flow.conf").toPath();
+    List<String> lines = java.nio.file.Files.readAllLines(flowConfPath);
+    for (int i = 0; i < lines.size(); i++) {
+      if 
(lines.get(i).equals("gobblin.flow.edge.input.dataset.descriptor.0.format=avro"))
 {
+        lines.set(i, 
"gobblin.flow.edge.input.dataset.descriptor.0.format=any");
+        break;
+      }
+    }
+    java.nio.file.Files.write(flowConfPath, lines);
+
+    Function testFunction = new GetFlowTemplateConfigFunction(new 
URI(FSFlowTemplateCatalogTest.TEST_TEMPLATE_DIR_URI), catalog,
+        "gobblin.flow.edge.input.dataset.descriptor.0.format");
+    AssertWithBackoff.create().timeoutMs(10000).assertEquals(testFunction, 
"any", "flow template updated");
+  }
+
+  @AllArgsConstructor
+  private class GetFlowTemplateConfigFunction implements Function<Void, 
String> {
+    private URI flowTemplateCatalogUri;
+    private FSFlowTemplateCatalog flowTemplateCatalog;
+    private String configKey;
+
+    @Override
+    public String apply(Void input) {
+      try {
+        return 
this.flowTemplateCatalog.getFlowTemplate(this.flowTemplateCatalogUri).getRawTemplateConfig().getString(this.configKey);
+      } catch (SpecNotFoundException | JobTemplate.TemplateException | 
IOException | URISyntaxException e) {
+        throw new RuntimeException(e);
+      }
+    }
+  }
+}
\ No newline at end of file

Reply via email to