This is an automated email from the ASF dual-hosted git repository.

wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new d29bd9f32 [GOBBLIN-1752] Fix race condition where FSTemplateCatalog 
would update at the same t… (#3612)
d29bd9f32 is described below

commit d29bd9f32158bbc2cc9c9bbbdb90d565fb34b078
Author: William Lo <[email protected]>
AuthorDate: Thu Dec 8 12:46:14 2022 -0800

    [GOBBLIN-1752] Fix race condition where FSTemplateCatalog would update at 
the same t… (#3612)
    
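    * Fix race condition where the FSTemplateCatalog and the FSFlowGraph could 
reload at the same time when their files were updated together. The flowgraph 
monitor can now also watch the template directory and clears the template cache 
before rebuilding the flowgraph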
    
    * Add javadoc
    
    * Fix checkstyle
    
    * Address review, simplify tests
---
 .../apache/gobblin/service/ServiceConfigKeys.java  |  1 +
 .../service/modules/flow/MultiHopFlowCompiler.java | 16 ++--
 .../FSPathAlterationFlowGraphListener.java         | 14 +++-
 .../ObservingFSFlowEdgeTemplateCatalog.java        | 64 +---------------
 ...og.java => UpdatableFSFlowTemplateCatalog.java} | 68 +++--------------
 .../service/monitoring/FsFlowGraphMonitor.java     | 21 ++++--
 .../UpdatableFSFFlowTemplateCatalogTest.java       | 85 ++++++++++++++++++++++
 .../service/monitoring/FsFlowGraphMonitorTest.java | 40 ++++++++--
 8 files changed, 170 insertions(+), 139 deletions(-)

diff --git 
a/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java 
b/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java
index 1aa6e30dc..f80177a8d 100644
--- 
a/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java
+++ 
b/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java
@@ -103,6 +103,7 @@ public class ServiceConfigKeys {
 
   // Template Catalog Keys
   public static final String TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY = 
GOBBLIN_SERVICE_PREFIX + "templateCatalogs.fullyQualifiedPath";
+  public static final String TEMPLATE_CATALOGS_CLASS_KEY = 
GOBBLIN_SERVICE_PREFIX + "templateCatalogs.class";
 
   // Keys related to user-specified policy on route selection.
   // Undesired connection to form an executable JobSpec.
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java
index 1ffef3969..20d2339c7 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java
@@ -17,7 +17,6 @@
 
 package org.apache.gobblin.service.modules.flow;
 
-import java.io.IOException;
 import java.lang.reflect.InvocationTargetException;
 import java.net.URISyntaxException;
 import java.util.ArrayList;
@@ -61,6 +60,7 @@ import org.apache.gobblin.runtime.api.Spec;
 import org.apache.gobblin.runtime.api.SpecExecutor;
 import org.apache.gobblin.runtime.api.SpecNotFoundException;
 import org.apache.gobblin.service.ServiceConfigKeys;
+import 
org.apache.gobblin.service.modules.template_catalog.UpdatableFSFlowTemplateCatalog;
 import org.apache.gobblin.service.monitoring.GitFlowGraphMonitor;
 import org.apache.gobblin.service.modules.flowgraph.BaseFlowGraph;
 import org.apache.gobblin.service.modules.flowgraph.Dag;
@@ -130,12 +130,16 @@ public class MultiHopFlowCompiler extends 
BaseFlowToJobSpecCompiler {
     // Use atomic reference to avoid partial flowgraph upgrades during path 
compilation.
     this.flowGraph = new AtomicReference<>(new 
BaseFlowGraph(dataNodeAliasMap));
 
-    Optional<ObservingFSFlowEdgeTemplateCatalog> flowTemplateCatalog = 
Optional.absent();
+    Optional<? extends UpdatableFSFlowTemplateCatalog> flowTemplateCatalog;
     if 
(config.hasPath(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY)
         && 
StringUtils.isNotBlank(config.getString(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY)))
 {
+
       try {
-        flowTemplateCatalog = Optional.of(new 
ObservingFSFlowEdgeTemplateCatalog(config, rwLock));
-      } catch (IOException e) {
+        String flowTemplateCatalogClassName = 
ConfigUtils.getString(this.config, 
ServiceConfigKeys.TEMPLATE_CATALOGS_CLASS_KEY, 
ObservingFSFlowEdgeTemplateCatalog.class.getCanonicalName());
+        flowTemplateCatalog = Optional.of(
+            (UpdatableFSFlowTemplateCatalog) 
ConstructorUtils.invokeConstructor(Class.forName(new 
ClassAliasResolver<>(UpdatableFSFlowTemplateCatalog.class)
+                .resolve(flowTemplateCatalogClassName)), config, rwLock));
+      } catch (NoSuchMethodException | IllegalAccessException | 
InvocationTargetException | InstantiationException | ClassNotFoundException e) {
         throw new RuntimeException("Cannot instantiate " + 
getClass().getName(), e);
       }
     } else {
@@ -163,7 +167,9 @@ public class MultiHopFlowCompiler extends 
BaseFlowToJobSpecCompiler {
     } catch (NoSuchMethodException | IllegalAccessException | 
InvocationTargetException | InstantiationException | ClassNotFoundException e) {
       throw new RuntimeException(e);
     }
-    this.serviceManager = new 
ServiceManager(Lists.newArrayList(this.flowGraphMonitor, 
flowTemplateCatalog.get()));
+    this.serviceManager = (flowTemplateCatalog.isPresent() && 
flowTemplateCatalog.get() instanceof ObservingFSFlowEdgeTemplateCatalog) ?
+       new ServiceManager(Lists.newArrayList(this.flowGraphMonitor, 
flowTemplateCatalog.get())) : new 
ServiceManager(Lists.newArrayList(this.flowGraphMonitor));
+
     addShutdownHook();
     //Start the git flow graph monitor
     try {
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FSPathAlterationFlowGraphListener.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FSPathAlterationFlowGraphListener.java
index 1700d8805..1b770194a 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FSPathAlterationFlowGraphListener.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FSPathAlterationFlowGraphListener.java
@@ -26,7 +26,7 @@ import com.google.common.base.Optional;
 import lombok.extern.slf4j.Slf4j;
 
 import org.apache.gobblin.service.modules.flow.MultiHopFlowCompiler;
-import 
org.apache.gobblin.service.modules.template_catalog.FSFlowTemplateCatalog;
+import 
org.apache.gobblin.service.modules.template_catalog.UpdatableFSFlowTemplateCatalog;
 import org.apache.gobblin.util.filesystem.PathAlterationListener;
 import org.apache.gobblin.util.filesystem.PathAlterationObserver;
 
@@ -41,10 +41,14 @@ import 
org.apache.gobblin.util.filesystem.PathAlterationObserver;
 public class FSPathAlterationFlowGraphListener implements 
PathAlterationListener {
   private final MultiHopFlowCompiler compiler;
   private final BaseFlowGraphHelper flowGraphHelper;
+  private Optional<UpdatableFSFlowTemplateCatalog> flowTemplateCatalog;
+  private final boolean shouldMonitorTemplateCatalog;
 
-  public FSPathAlterationFlowGraphListener(Optional<? extends 
FSFlowTemplateCatalog> flowTemplateCatalog,
-      MultiHopFlowCompiler compiler, String baseDirectory, BaseFlowGraphHelper 
flowGraphHelper) {
+  public 
FSPathAlterationFlowGraphListener(Optional<UpdatableFSFlowTemplateCatalog> 
flowTemplateCatalog,
+      MultiHopFlowCompiler compiler, String baseDirectory, BaseFlowGraphHelper 
flowGraphHelper, boolean shouldMonitorTemplateCatalog) {
     this.flowGraphHelper = flowGraphHelper;
+    this.flowTemplateCatalog = flowTemplateCatalog;
+    this.shouldMonitorTemplateCatalog = shouldMonitorTemplateCatalog;
     File graphDir = new File(baseDirectory);
     // Populate the flowgraph with any existing files
     if (!graphDir.exists()) {
@@ -88,6 +92,10 @@ public class FSPathAlterationFlowGraphListener implements 
PathAlterationListener
   @Override
   public void onCheckDetectedChange() {
     log.info("Detecting change in flowgraph files, reloading flowgraph");
+    if (this.shouldMonitorTemplateCatalog) {
+      // Clear template cache as templates are colocated with the flowgraph, 
and thus could have been updated too
+      this.flowTemplateCatalog.get().clearTemplates();
+    }
     FlowGraph newGraph = this.flowGraphHelper.generateFlowGraph();
     if (newGraph != null) {
       this.compiler.setFlowGraph(newGraph);
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template_catalog/ObservingFSFlowEdgeTemplateCatalog.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template_catalog/ObservingFSFlowEdgeTemplateCatalog.java
index 5c47388bf..baf413a91 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template_catalog/ObservingFSFlowEdgeTemplateCatalog.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template_catalog/ObservingFSFlowEdgeTemplateCatalog.java
@@ -19,22 +19,13 @@ package org.apache.gobblin.service.modules.template_catalog;
 
 import java.io.IOException;
 import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.ReadWriteLock;
 
-import org.apache.hadoop.fs.Path;
-
 import com.typesafe.config.Config;
 
 import lombok.extern.slf4j.Slf4j;
 
-import org.apache.gobblin.runtime.api.JobTemplate;
-import org.apache.gobblin.runtime.api.SpecNotFoundException;
-import org.apache.gobblin.service.modules.template.FlowTemplate;
 import org.apache.gobblin.util.filesystem.PathAlterationListener;
 import org.apache.gobblin.util.filesystem.PathAlterationListenerAdaptor;
 
@@ -45,42 +36,12 @@ import 
org.apache.gobblin.util.filesystem.PathAlterationListenerAdaptor;
  * is detected so that the next call to {@link #getFlowTemplate(URI)} will use 
the updated files.
  */
 @Slf4j
-public class ObservingFSFlowEdgeTemplateCatalog extends FSFlowTemplateCatalog {
-  private Map<URI, FlowTemplate> flowTemplateMap = new ConcurrentHashMap<>();
-  private Map<URI, List<JobTemplate>> jobTemplateMap = new 
ConcurrentHashMap<>();
-  private ReadWriteLock rwLock;
+public class ObservingFSFlowEdgeTemplateCatalog extends 
UpdatableFSFlowTemplateCatalog {
 
   private AtomicBoolean shouldRefreshFlowGraph = new AtomicBoolean(false);
 
   public ObservingFSFlowEdgeTemplateCatalog(Config sysConfig, ReadWriteLock 
rwLock) throws IOException {
-    super(sysConfig);
-    this.rwLock = rwLock;
-  }
-
-  @Override
-  public FlowTemplate getFlowTemplate(URI flowTemplateDirURI)
-      throws SpecNotFoundException, JobTemplate.TemplateException, 
IOException, URISyntaxException {
-    FlowTemplate flowTemplate = 
flowTemplateMap.getOrDefault(flowTemplateDirURI, null);
-
-    if (flowTemplate == null) {
-      flowTemplate = super.getFlowTemplate(flowTemplateDirURI);
-      flowTemplateMap.put(flowTemplateDirURI, flowTemplate);
-    }
-
-    return flowTemplate;
-  }
-
-  @Override
-  public List<JobTemplate> getJobTemplatesForFlow(URI flowTemplateDirURI)
-      throws IOException, SpecNotFoundException, 
JobTemplate.TemplateException, URISyntaxException {
-    List<JobTemplate> jobTemplates = 
jobTemplateMap.getOrDefault(flowTemplateDirURI, null);
-
-    if (jobTemplates == null) {
-      jobTemplates = super.getJobTemplatesForFlow(flowTemplateDirURI);
-      jobTemplateMap.put(flowTemplateDirURI, jobTemplates);
-    }
-
-    return jobTemplates;
+    super(sysConfig, rwLock);
   }
 
   @Override
@@ -100,32 +61,15 @@ public class ObservingFSFlowEdgeTemplateCatalog extends 
FSFlowTemplateCatalog {
     return this.shouldRefreshFlowGraph.getAndSet(value);
   }
 
-  /**
-   * Clear cached templates so they will be reloaded next time {@link 
#getFlowTemplate(URI)} is called.
-   * Also refresh git flow graph in case any edges that failed to be added on 
startup are successful now.
-   */
-  private void clearTemplates() {
-    this.rwLock.writeLock().lock();
-    log.info("Change detected, reloading flow templates.");
-    flowTemplateMap.clear();
-    jobTemplateMap.clear();
-    getAndSetShouldRefreshFlowGraph(true);
-    this.rwLock.writeLock().unlock();
-  }
-
   /**
    * {@link org.apache.gobblin.util.filesystem.PathAlterationListener} that 
clears flow/job template cache if a file is
    * created or updated.
    */
   private class FlowCatalogPathAlterationListener extends 
PathAlterationListenerAdaptor {
     @Override
-    public void onFileCreate(Path path) {
-      clearTemplates();
-    }
-
-    @Override
-    public void onFileChange(Path path) {
+    public void onCheckDetectedChange() {
       clearTemplates();
+      getAndSetShouldRefreshFlowGraph(true);
     }
   }
 }
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template_catalog/ObservingFSFlowEdgeTemplateCatalog.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template_catalog/UpdatableFSFlowTemplateCatalog.java
similarity index 56%
copy from 
gobblin-service/src/main/java/org/apache/gobblin/service/modules/template_catalog/ObservingFSFlowEdgeTemplateCatalog.java
copy to 
gobblin-service/src/main/java/org/apache/gobblin/service/modules/template_catalog/UpdatableFSFlowTemplateCatalog.java
index 5c47388bf..2915c5530 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template_catalog/ObservingFSFlowEdgeTemplateCatalog.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template_catalog/UpdatableFSFlowTemplateCatalog.java
@@ -23,36 +23,26 @@ import java.net.URISyntaxException;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.ReadWriteLock;
 
-import org.apache.hadoop.fs.Path;
 
 import com.typesafe.config.Config;
 
-import lombok.extern.slf4j.Slf4j;
-
 import org.apache.gobblin.runtime.api.JobTemplate;
 import org.apache.gobblin.runtime.api.SpecNotFoundException;
 import org.apache.gobblin.service.modules.template.FlowTemplate;
-import org.apache.gobblin.util.filesystem.PathAlterationListener;
-import org.apache.gobblin.util.filesystem.PathAlterationListenerAdaptor;
-
 
 /**
- * {@link FSFlowTemplateCatalog} that keeps a cache of flow and job templates. 
It has a
- * {@link org.apache.gobblin.util.filesystem.PathAlterationListener} on the 
root path, and clears the cache when a change
- * is detected so that the next call to {@link #getFlowTemplate(URI)} will use 
the updated files.
+ * {@link FSFlowTemplateCatalog} that keeps a cache of flow and job templates. 
It provides a public method clearTemplates()
+ * for other classes to invoke, so that other classes can reload the job 
templates before they make a change. E.g. The
+ * {@link org.apache.gobblin.service.monitoring.FsFlowGraphMonitor} has a 
configuration to clear the template cache before updating the flowgraph.
  */
-@Slf4j
-public class ObservingFSFlowEdgeTemplateCatalog extends FSFlowTemplateCatalog {
-  private Map<URI, FlowTemplate> flowTemplateMap = new ConcurrentHashMap<>();
-  private Map<URI, List<JobTemplate>> jobTemplateMap = new 
ConcurrentHashMap<>();
-  private ReadWriteLock rwLock;
-
-  private AtomicBoolean shouldRefreshFlowGraph = new AtomicBoolean(false);
+public class UpdatableFSFlowTemplateCatalog extends FSFlowTemplateCatalog {
+  private final Map<URI, FlowTemplate> flowTemplateMap = new 
ConcurrentHashMap<>();
+  private final Map<URI, List<JobTemplate>> jobTemplateMap = new 
ConcurrentHashMap<>();
+  private final ReadWriteLock rwLock;
 
-  public ObservingFSFlowEdgeTemplateCatalog(Config sysConfig, ReadWriteLock 
rwLock) throws IOException {
+  public UpdatableFSFlowTemplateCatalog(Config sysConfig, ReadWriteLock 
rwLock) throws IOException {
     super(sysConfig);
     this.rwLock = rwLock;
   }
@@ -77,55 +67,21 @@ public class ObservingFSFlowEdgeTemplateCatalog extends 
FSFlowTemplateCatalog {
 
     if (jobTemplates == null) {
       jobTemplates = super.getJobTemplatesForFlow(flowTemplateDirURI);
+      log.info("Loading flow template directly from {} and caching it.", 
flowTemplateDirURI);
       jobTemplateMap.put(flowTemplateDirURI, jobTemplates);
     }
 
     return jobTemplates;
   }
 
-  @Override
-  protected PathAlterationListener getListener() {
-    return new FlowCatalogPathAlterationListener();
-  }
-
-  @Override
-  protected void startUp() throws IOException {
-    if (this.pathAlterationDetector != null) {
-      this.pathAlterationDetector.start();
-    }
-  }
-
-  @Override
-  public boolean getAndSetShouldRefreshFlowGraph(boolean value) {
-    return this.shouldRefreshFlowGraph.getAndSet(value);
-  }
-
   /**
    * Clear cached templates so they will be reloaded next time {@link 
#getFlowTemplate(URI)} is called.
-   * Also refresh git flow graph in case any edges that failed to be added on 
startup are successful now.
    */
-  private void clearTemplates() {
+  public void clearTemplates() {
     this.rwLock.writeLock().lock();
-    log.info("Change detected, reloading flow templates.");
+    log.info("Change detected, clearing flow template cache.");
     flowTemplateMap.clear();
     jobTemplateMap.clear();
-    getAndSetShouldRefreshFlowGraph(true);
     this.rwLock.writeLock().unlock();
   }
-
-  /**
-   * {@link org.apache.gobblin.util.filesystem.PathAlterationListener} that 
clears flow/job template cache if a file is
-   * created or updated.
-   */
-  private class FlowCatalogPathAlterationListener extends 
PathAlterationListenerAdaptor {
-    @Override
-    public void onFileCreate(Path path) {
-      clearTemplates();
-    }
-
-    @Override
-    public void onFileChange(Path path) {
-      clearTemplates();
-    }
-  }
-}
+}
\ No newline at end of file
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/FsFlowGraphMonitor.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/FsFlowGraphMonitor.java
index bda777ae3..f23bf7b89 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/FsFlowGraphMonitor.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/FsFlowGraphMonitor.java
@@ -42,7 +42,7 @@ import 
org.apache.gobblin.service.modules.flow.MultiHopFlowCompiler;
 import org.apache.gobblin.service.modules.flowgraph.BaseFlowGraphHelper;
 import 
org.apache.gobblin.service.modules.flowgraph.FSPathAlterationFlowGraphListener;
 import org.apache.gobblin.service.modules.flowgraph.FlowGraphMonitor;
-import 
org.apache.gobblin.service.modules.template_catalog.FSFlowTemplateCatalog;
+import 
org.apache.gobblin.service.modules.template_catalog.UpdatableFSFlowTemplateCatalog;
 import org.apache.gobblin.util.ClassAliasResolver;
 import org.apache.gobblin.util.ConfigUtils;
 import org.apache.gobblin.util.filesystem.PathAlterationObserver;
@@ -52,6 +52,8 @@ import 
org.apache.gobblin.util.filesystem.PathAlterationObserverScheduler;
 @Slf4j
 public class FsFlowGraphMonitor extends AbstractIdleService implements 
FlowGraphMonitor {
   public static final String FS_FLOWGRAPH_MONITOR_PREFIX = 
"gobblin.service.fsFlowGraphMonitor";
+  public static final String MONITOR_TEMPLATE_CATALOG_CHANGES =  
"monitorTemplateChanges";
+
   private static final long DEFAULT_FLOWGRAPH_POLLING_INTERVAL = 60;
   private static final String DEFAULT_FS_FLOWGRAPH_MONITOR_ABSOLUTE_DIR = 
"/tmp/fsFlowgraph";
   private static final String DEFAULT_FS_FLOWGRAPH_MONITOR_FLOWGRAPH_DIR = 
"gobblin-flowgraph";
@@ -61,7 +63,8 @@ public class FsFlowGraphMonitor extends AbstractIdleService 
implements FlowGraph
   private final PathAlterationObserverScheduler pathAlterationDetector;
   private final FSPathAlterationFlowGraphListener listener;
   private final PathAlterationObserver observer;
-  private final Path flowGraphPath;
+  private Path flowGraphPath;
+  private Path observedPath;
   private final MultiHopFlowCompiler compiler;
   private final CountDownLatch initComplete;
   private static final Config DEFAULT_FALLBACK = 
ConfigFactory.parseMap(ImmutableMap.<String, Object>builder()
@@ -69,16 +72,20 @@ public class FsFlowGraphMonitor extends AbstractIdleService 
implements FlowGraph
       .put(ConfigurationKeys.FLOWGRAPH_BASE_DIR, 
DEFAULT_FS_FLOWGRAPH_MONITOR_FLOWGRAPH_DIR)
       .put(ConfigurationKeys.FLOWGRAPH_POLLING_INTERVAL, 
DEFAULT_FLOWGRAPH_POLLING_INTERVAL)
       .put(ConfigurationKeys.FLOWGRAPH_JAVA_PROPS_EXTENSIONS, 
ConfigurationKeys.DEFAULT_PROPERTIES_EXTENSIONS)
+      .put(MONITOR_TEMPLATE_CATALOG_CHANGES, false)
       .put(ConfigurationKeys.FLOWGRAPH_HOCON_FILE_EXTENSIONS, 
ConfigurationKeys.DEFAULT_CONF_EXTENSIONS).build());
 
-  public FsFlowGraphMonitor(Config config, Optional<? extends 
FSFlowTemplateCatalog> flowTemplateCatalog,
+  public FsFlowGraphMonitor(Config config, 
Optional<UpdatableFSFlowTemplateCatalog> flowTemplateCatalog,
       MultiHopFlowCompiler compiler, Map<URI, TopologySpec> topologySpecMap, 
CountDownLatch initComplete, boolean instrumentationEnabled)
       throws IOException {
     Config configWithFallbacks = 
config.getConfig(FS_FLOWGRAPH_MONITOR_PREFIX).withFallback(DEFAULT_FALLBACK);
     this.pollingInterval =
         
TimeUnit.SECONDS.toMillis(configWithFallbacks.getLong(ConfigurationKeys.FLOWGRAPH_POLLING_INTERVAL));
     this.flowGraphPath = new 
Path(configWithFallbacks.getString(ConfigurationKeys.FLOWGRAPH_ABSOLUTE_DIR));
-    this.observer = new PathAlterationObserver(flowGraphPath);
+    // If the FSFlowGraphMonitor is also monitoring the templates, assume that 
they are colocated under the same parent folder
+    boolean shouldMonitorTemplateCatalog = 
configWithFallbacks.getBoolean(MONITOR_TEMPLATE_CATALOG_CHANGES);
+    this.observedPath = shouldMonitorTemplateCatalog ? 
this.flowGraphPath.getParent() : this.flowGraphPath;
+    this.observer = new PathAlterationObserver(observedPath);
     try {
       String helperClassName = ConfigUtils.getString(config, 
ServiceConfigKeys.GOBBLIN_SERVICE_FLOWGRAPH_HELPER_KEY,
           BaseFlowGraphHelper.class.getCanonicalName());
@@ -88,7 +95,7 @@ public class FsFlowGraphMonitor extends AbstractIdleService 
implements FlowGraph
     } catch (NoSuchMethodException | IllegalAccessException | 
InvocationTargetException | InstantiationException | ClassNotFoundException e) {
       throw new RuntimeException(e);
     }
-    this.listener = new FSPathAlterationFlowGraphListener(flowTemplateCatalog, 
compiler, flowGraphPath.toString(), this.flowGraphHelper);
+    this.listener = new FSPathAlterationFlowGraphListener(flowTemplateCatalog, 
compiler, flowGraphPath.toString(), this.flowGraphHelper, 
shouldMonitorTemplateCatalog);
     this.compiler = compiler;
     this.initComplete = initComplete;
 
@@ -98,7 +105,7 @@ public class FsFlowGraphMonitor extends AbstractIdleService 
implements FlowGraph
       this.pathAlterationDetector = new 
PathAlterationObserverScheduler(pollingInterval);
       Optional<PathAlterationObserver> observerOptional = 
Optional.fromNullable(observer);
       this.pathAlterationDetector.addPathAlterationObserver(this.listener, 
observerOptional,
-          this.flowGraphPath);
+          this.observedPath);
     }
   }
 
@@ -116,7 +123,7 @@ public class FsFlowGraphMonitor extends AbstractIdleService 
implements FlowGraph
     } else if (isActive) {
       if (this.pathAlterationDetector != null) {
         log.info("Starting the " + getClass().getSimpleName());
-        log.info("Polling flowgraph folder with interval {} ", 
this.pollingInterval);
+        log.info("Polling folder {} with interval {} ", this.observedPath, 
this.pollingInterval);
         try {
           this.pathAlterationDetector.start();
           // Manually instantiate flowgraph when the monitor becomes active
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/template_catalog/UpdatableFSFFlowTemplateCatalogTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/template_catalog/UpdatableFSFFlowTemplateCatalogTest.java
new file mode 100644
index 000000000..4a98db887
--- /dev/null
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/template_catalog/UpdatableFSFFlowTemplateCatalogTest.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.template_catalog;
+
+import java.io.File;
+import java.net.URI;
+import java.nio.file.Path;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.apache.commons.io.FileUtils;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import com.google.common.io.Files;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.service.ServiceConfigKeys;
+import org.apache.gobblin.service.modules.template.FlowTemplate;
+
+
+@Slf4j
+public class UpdatableFSFFlowTemplateCatalogTest {
+
+  private File templateDir;
+  private Config templateCatalogCfg;
+
+  @BeforeClass
+  public void setUp() throws Exception {
+    URI flowTemplateCatalogUri = 
this.getClass().getClassLoader().getResource("template_catalog").toURI();
+    this.templateDir = Files.createTempDir();
+    FileUtils.forceDeleteOnExit(templateDir);
+    FileUtils.copyDirectory(new File(flowTemplateCatalogUri.getPath()), 
templateDir);
+    Properties properties = new Properties();
+    
properties.put(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY, 
templateDir.toURI().toString());
+    Config config = ConfigFactory.parseProperties(properties);
+    this.templateCatalogCfg = 
config.withValue(ConfigurationKeys.JOB_CONFIG_FILE_GENERAL_PATH_KEY,
+        
config.getValue(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY));
+  }
+
+  @Test
+  public void testModifyFlowTemplate() throws Exception {
+    UpdatableFSFlowTemplateCatalog catalog = new 
UpdatableFSFlowTemplateCatalog(this.templateCatalogCfg, new 
ReentrantReadWriteLock());
+
+    // Check cached flow template is returned
+    FlowTemplate flowTemplate1 = catalog.getFlowTemplate(new 
URI(FSFlowTemplateCatalogTest.TEST_TEMPLATE_DIR_URI));
+    FlowTemplate flowTemplate2 = catalog.getFlowTemplate(new 
URI(FSFlowTemplateCatalogTest.TEST_TEMPLATE_DIR_URI));
+    Assert.assertSame(flowTemplate1, flowTemplate2);
+
+    // Update a file flow catalog and check that the getFlowTemplate returns 
the new value
+    Path flowConfPath = new File(new File(this.templateDir, 
FSFlowTemplateCatalogTest.TEST_TEMPLATE_NAME), "flow.conf").toPath();
+    List<String> lines = java.nio.file.Files.readAllLines(flowConfPath);
+    for (int i = 0; i < lines.size(); i++) {
+      if 
(lines.get(i).equals("gobblin.flow.edge.input.dataset.descriptor.0.format=avro"))
 {
+        lines.set(i, 
"gobblin.flow.edge.input.dataset.descriptor.0.format=any");
+        break;
+      }
+    }
+    java.nio.file.Files.write(flowConfPath, lines);
+    catalog.clearTemplates();
+    Assert.assertEquals(catalog.getFlowTemplate(new 
URI(FSFlowTemplateCatalogTest.TEST_TEMPLATE_DIR_URI)).
+            
getRawTemplateConfig().getString("gobblin.flow.edge.input.dataset.descriptor.0.format"),
 "any");
+  }
+}
\ No newline at end of file
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/FsFlowGraphMonitorTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/FsFlowGraphMonitorTest.java
index 7e1a17414..1b3397756 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/FsFlowGraphMonitorTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/FsFlowGraphMonitorTest.java
@@ -34,6 +34,7 @@ import java.util.Properties;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.commons.io.FileUtils;
 import org.apache.gobblin.config.ConfigBuilder;
@@ -47,7 +48,7 @@ import org.apache.gobblin.service.modules.flowgraph.DataNode;
 import org.apache.gobblin.service.modules.flowgraph.FlowEdge;
 import org.apache.gobblin.service.modules.flowgraph.FlowGraph;
 import org.apache.gobblin.service.modules.flowgraph.FlowGraphConfigurationKeys;
-import 
org.apache.gobblin.service.modules.template_catalog.FSFlowTemplateCatalog;
+import 
org.apache.gobblin.service.modules.template_catalog.UpdatableFSFlowTemplateCatalog;
 import org.apache.hadoop.fs.Path;
 import org.eclipse.jgit.transport.RefSpec;
 import org.slf4j.Logger;
@@ -60,8 +61,9 @@ import org.testng.annotations.Test;
 
 public class FsFlowGraphMonitorTest {
   private static final Logger logger = 
LoggerFactory.getLogger(FsFlowGraphMonitorTest.class);
-  private final File TEST_DIR = new File(FileUtils.getTempDirectory(), 
"fsFlowGraphTestDir");
-  private final File flowGraphDir = new File(TEST_DIR, "/gobblin-flowgraph");
+  private final File TEST_DIR = new File(FileUtils.getTempDirectory(), 
"flowGraphTemplates");
+  private final File flowGraphTestDir = new File(TEST_DIR, 
"fsFlowGraphTestDir");
+  private final File flowGraphDir = new File(flowGraphTestDir, 
"gobblin-flowgraph");
   private static final String NODE_1_FILE = "node1.properties";
   private final File node1Dir = new File(FileUtils.getTempDirectory(), 
"node1");
   private final File node1File = new File(node1Dir, NODE_1_FILE);
@@ -70,14 +72,15 @@ public class FsFlowGraphMonitorTest {
   private final File node2File = new File(node2Dir, NODE_2_FILE);
   private final File edge1Dir = new File(node1Dir, "node2");
   private final File edge1File = new File(edge1Dir, "edge1.properties");
-  private final File sharedNodeFolder = new File(TEST_DIR, "nodes");
+  private final File sharedNodeFolder = new File(flowGraphTestDir, "nodes");
 
   private RefSpec masterRefSpec = new RefSpec("master");
-  private Optional<FSFlowTemplateCatalog> flowCatalog;
+  private Optional<UpdatableFSFlowTemplateCatalog> flowCatalog;
   private Config config;
   private AtomicReference<FlowGraph> flowGraph;
   private FsFlowGraphMonitor flowGraphMonitor;
   private Map<URI, TopologySpec> topologySpecMap;
+  private File flowTemplateCatalogFolder;
 
   @BeforeClass
   public void setUp() throws Exception {
@@ -89,21 +92,25 @@ public class FsFlowGraphMonitorTest {
 
     this.config = ConfigBuilder.create()
         .addPrimitive(FsFlowGraphMonitor.FS_FLOWGRAPH_MONITOR_PREFIX + "."
-            + ConfigurationKeys.FLOWGRAPH_ABSOLUTE_DIR, 
TEST_DIR.getAbsolutePath())
+            + ConfigurationKeys.FLOWGRAPH_ABSOLUTE_DIR, 
flowGraphTestDir.getAbsolutePath())
         .addPrimitive(FsFlowGraphMonitor.FS_FLOWGRAPH_MONITOR_PREFIX + "." + 
ConfigurationKeys.FLOWGRAPH_BASE_DIR, "gobblin-flowgraph")
         .addPrimitive(FsFlowGraphMonitor.FS_FLOWGRAPH_MONITOR_PREFIX + "." + 
ConfigurationKeys.FLOWGRAPH_POLLING_INTERVAL, 1)
+        .addPrimitive(FsFlowGraphMonitor.FS_FLOWGRAPH_MONITOR_PREFIX + "." + 
FsFlowGraphMonitor.MONITOR_TEMPLATE_CATALOG_CHANGES, true)
         .build();
 
     // Create a FSFlowTemplateCatalog instance
     URI flowTemplateCatalogUri = 
this.getClass().getClassLoader().getResource("template_catalog").toURI();
+    this.flowTemplateCatalogFolder = new File(TEST_DIR, "template_catalog");
+    this.flowTemplateCatalogFolder.mkdirs();
+    FileUtils.copyDirectory(new File(flowTemplateCatalogUri.getPath()), 
this.flowTemplateCatalogFolder);
     Properties properties = new Properties();
     this.flowGraphDir.mkdirs();
-    
properties.put(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY, 
flowTemplateCatalogUri.toString());
+    
properties.put(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY, 
this.flowTemplateCatalogFolder.getAbsolutePath());
     Config config = ConfigFactory.parseProperties(properties);
     Config templateCatalogCfg = config
         .withValue(ConfigurationKeys.JOB_CONFIG_FILE_GENERAL_PATH_KEY,
             
config.getValue(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY));
-    this.flowCatalog = Optional.of(new 
FSFlowTemplateCatalog(templateCatalogCfg));
+    this.flowCatalog = Optional.of(new 
UpdatableFSFlowTemplateCatalog(templateCatalogCfg,  new 
ReentrantReadWriteLock(true)));
 
     //Create a FlowGraph instance with defaults
     this.flowGraph = new AtomicReference<>(new BaseFlowGraph());
@@ -227,6 +234,23 @@ public class FsFlowGraphMonitorTest {
   }
 
   @Test (dependsOnMethods = "testSharedFlowgraphHelper")
+  public void testUpdateOnlyTemplates() throws Exception {
+    Assert.assertEquals(this.flowGraph.get().getEdges("node1").size(), 1);
+
+    //If deleting all the templates, the cache of flow templates will be 
cleared and the flowgraph will be unable to add edges on reload.
+    cleanUpDir(this.flowTemplateCatalogFolder.getAbsolutePath());
+    Thread.sleep(3000);
+    Assert.assertEquals(this.flowGraph.get().getEdges("node1").size(), 0);
+
+    URI flowTemplateCatalogUri = 
this.getClass().getClassLoader().getResource("template_catalog").toURI();
+    // Adding the flowtemplates back will make the edges eligible to be added 
again on reload.
+    FileUtils.copyDirectory(new File(flowTemplateCatalogUri.getPath()), 
this.flowTemplateCatalogFolder);
+
+    Thread.sleep(3000);
+    Assert.assertEquals(this.flowGraph.get().getEdges("node1").size(), 1);
+  }
+
+  @Test (dependsOnMethods = "testUpdateOnlyTemplates")
   public void testRemoveEdge() throws Exception {
     //Node1 has 1 edge before delete
     Collection<FlowEdge> edgeSet = this.flowGraph.get().getEdges("node1");

Reply via email to