This is an automated email from the ASF dual-hosted git repository.

zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new eaa9cc7c4 [GOBBLIN-1678] Refactor git flowgraph component to be 
extensible (#3536)
eaa9cc7c4 is described below

commit eaa9cc7c411ae48dbfb29000cd38acacd4140a9c
Author: William Lo <[email protected]>
AuthorDate: Wed Aug 31 11:55:06 2022 -0700

    [GOBBLIN-1678] Refactor git flowgraph component to be extensible (#3536)
    
    * Refactor git flowgraph component to be extensible
    
    * Move files to appropriate modules
    
    * Cleanup and add javadocs
    
    * Cleanup, add missing javadocs
    
    * Address review and import order
    
    * Fix findbugs
    
    * Use java sort instead of collections
---
 .../gobblin/configuration/ConfigurationKeys.java   |   8 +
 .../apache/gobblin/service/ServiceConfigKeys.java  |   1 +
 .../modules/core/GobblinServiceGuiceModule.java    |   1 +
 .../modules/core/GobblinServiceManager.java        |   1 +
 .../service/modules/flow/MultiHopFlowCompiler.java |  19 +-
 .../BaseFlowGraphListener.java}                    | 205 ++++++---------------
 .../modules/flowgraph/FlowGraphMonitor.java        |  34 ++++
 .../GitConfigListener.java}                        |  96 ++++------
 .../service/monitoring/GitConfigMonitor.java       |  83 +++++++++
 .../service/monitoring/GitDiffListener.java        |  31 ++++
 .../service/monitoring/GitFlowGraphListener.java   |  77 ++++++++
 .../service/monitoring/GitFlowGraphMonitor.java    | 137 ++++++++++++++
 .../core => monitoring}/GitMonitoringService.java  |  73 +++-----
 .../gobblin/service/GobblinServiceManagerTest.java |   2 +-
 .../modules/flow/MultiHopFlowCompilerTest.java     |   2 +-
 .../core => monitoring}/GitConfigMonitorTest.java  |   2 +-
 .../GitFlowGraphMonitorTest.java                   |   2 +-
 .../util/filesystem/PathAlterationObserver.java    |   2 +-
 18 files changed, 520 insertions(+), 256 deletions(-)

diff --git 
a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
 
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
index ec85883ae..464a575ba 100644
--- 
a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
+++ 
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
@@ -1015,6 +1015,14 @@ public class ConfigurationKeys {
   public static final String DATASET_COMBINE_KEY = 
"gobblin.flow.dataset.combine";
   public static final String WHITELISTED_EDGE_IDS = 
"gobblin.flow.whitelistedEdgeIds";
   public static final String GOBBLIN_OUTPUT_JOB_LEVEL_METRICS = 
"gobblin.job.outputJobLevelMetrics";
+
+  /**
+   * Configuration properties related to flowGraphs
+   */
+
+  public static final String JAVA_PROPS_EXTENSIONS = "javaPropsExtensions";
+  public static final String HOCON_FILE_EXTENSIONS = "hoconFileExtensions";
+
   /***
    * Configuration properties related to TopologySpec Store
    */
diff --git 
a/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java 
b/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java
index b4f04acd9..c1536e037 100644
--- 
a/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java
+++ 
b/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java
@@ -47,6 +47,7 @@ public class ServiceConfigKeys {
   public static final String HELIX_INSTANCE_NAME_OPTION_NAME = 
"helix_instance_name";
   public static final String HELIX_INSTANCE_NAME_KEY = GOBBLIN_SERVICE_PREFIX 
+ "helixInstanceName";
   public static final String GOBBLIN_SERVICE_FLOWSPEC = GOBBLIN_SERVICE_PREFIX 
+ "flowSpec";
+  public static final String GOBBLIN_SERVICE_FLOWGRAPH_CLASS_KEY = 
GOBBLIN_SERVICE_PREFIX + "flowGraph.class";
 
   // Helix message sub types for FlowSpec
   public static final String HELIX_FLOWSPEC_ADD = "FLOWSPEC_ADD";
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java
index efb03339c..68170a129 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java
@@ -20,6 +20,7 @@ package org.apache.gobblin.service.modules.core;
 import java.util.Objects;
 
 import org.apache.gobblin.service.modules.orchestration.UserQuotaManager;
+import org.apache.gobblin.service.monitoring.GitConfigMonitor;
 import org.apache.helix.HelixManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
index d35eef64b..76f568495 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
@@ -32,6 +32,7 @@ import org.apache.commons.cli.Options;
 import org.apache.commons.cli.ParseException;
 import org.apache.commons.lang3.ObjectUtils;
 import org.apache.gobblin.service.modules.orchestration.UserQuotaManager;
+import org.apache.gobblin.service.monitoring.GitConfigMonitor;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java
index d76b9123a..3e6eb13dc 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java
@@ -33,6 +33,7 @@ import java.util.stream.Collectors;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.reflect.ConstructorUtils;
+import org.apache.gobblin.service.modules.flowgraph.FlowGraphMonitor;
 import org.apache.hadoop.fs.Path;
 import org.slf4j.Logger;
 
@@ -59,7 +60,7 @@ import org.apache.gobblin.runtime.api.Spec;
 import org.apache.gobblin.runtime.api.SpecExecutor;
 import org.apache.gobblin.runtime.api.SpecNotFoundException;
 import org.apache.gobblin.service.ServiceConfigKeys;
-import org.apache.gobblin.service.modules.core.GitFlowGraphMonitor;
+import org.apache.gobblin.service.monitoring.GitFlowGraphMonitor;
 import org.apache.gobblin.service.modules.flowgraph.BaseFlowGraph;
 import org.apache.gobblin.service.modules.flowgraph.Dag;
 import org.apache.gobblin.service.modules.flowgraph.DataNode;
@@ -87,7 +88,7 @@ public class MultiHopFlowCompiler extends 
BaseFlowToJobSpecCompiler {
   @Getter
   private CountDownLatch initComplete = new CountDownLatch(1);
 
-  private GitFlowGraphMonitor gitFlowGraphMonitor;
+  private FlowGraphMonitor flowGraphMonitor;
 
   private ReadWriteLock rwLock = new ReentrantReadWriteLock(true);
 
@@ -148,8 +149,14 @@ public class MultiHopFlowCompiler extends 
BaseFlowToJobSpecCompiler {
       throw new RuntimeException(e);
     }
 
-    this.gitFlowGraphMonitor = new GitFlowGraphMonitor(gitFlowGraphConfig, 
flowTemplateCatalog, this.flowGraph, this.topologySpecMap, 
this.getInitComplete());
-    this.serviceManager = new 
ServiceManager(Lists.newArrayList(this.gitFlowGraphMonitor, 
flowTemplateCatalog.get()));
+    try {
+      String flowGraphMonitorClassName = ConfigUtils.getString(this.config, 
ServiceConfigKeys.GOBBLIN_SERVICE_FLOWGRAPH_CLASS_KEY, 
GitFlowGraphMonitor.class.getCanonicalName());
+      this.flowGraphMonitor = (FlowGraphMonitor) 
ConstructorUtils.invokeConstructor(Class.forName(new 
ClassAliasResolver<>(FlowGraphMonitor.class).resolve(
+        flowGraphMonitorClassName)), gitFlowGraphConfig, flowTemplateCatalog, 
this.flowGraph, this.topologySpecMap, this.getInitComplete());
+    } catch (NoSuchMethodException | IllegalAccessException | 
InvocationTargetException | InstantiationException | ClassNotFoundException e) {
+      throw new RuntimeException(e);
+    }
+    this.serviceManager = new 
ServiceManager(Lists.newArrayList(this.flowGraphMonitor, 
flowTemplateCatalog.get()));
     addShutdownHook();
     //Start the git flow graph monitor
     try {
@@ -175,8 +182,8 @@ public class MultiHopFlowCompiler extends 
BaseFlowToJobSpecCompiler {
   @Override
   public void setActive(boolean active) {
     super.setActive(active);
-    if (this.gitFlowGraphMonitor != null) {
-      this.gitFlowGraphMonitor.setActive(active);
+    if (this.flowGraphMonitor != null) {
+      this.flowGraphMonitor.setActive(active);
     }
   }
 
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitor.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowGraphListener.java
similarity index 58%
rename from 
gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitor.java
rename to 
gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowGraphListener.java
index 6df5b99c9..fa249c4cd 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitor.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowGraphListener.java
@@ -15,172 +15,89 @@
  * limitations under the License.
  */
 
-package org.apache.gobblin.service.modules.core;
+package org.apache.gobblin.service.modules.flowgraph;
 
 import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.CountDownLatch;
+import java.util.Set;
 
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.eclipse.jgit.api.errors.GitAPIException;
-import org.eclipse.jgit.diff.DiffEntry;
+import lombok.extern.slf4j.Slf4j;
 
 import com.google.common.base.Joiner;
 import com.google.common.base.Optional;
-import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Sets;
 import com.google.common.io.Files;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
 import com.typesafe.config.ConfigValueFactory;
 
-import lombok.extern.slf4j.Slf4j;
-
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.runtime.api.SpecExecutor;
 import org.apache.gobblin.runtime.api.TopologySpec;
-import org.apache.gobblin.service.modules.flowgraph.DataNode;
-import org.apache.gobblin.service.modules.flowgraph.FlowEdge;
-import org.apache.gobblin.service.modules.flowgraph.FlowEdgeFactory;
-import org.apache.gobblin.service.modules.flowgraph.FlowGraph;
-import org.apache.gobblin.service.modules.flowgraph.FlowGraphConfigurationKeys;
 import 
org.apache.gobblin.service.modules.template_catalog.FSFlowTemplateCatalog;
 import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.PullFileLoader;
 import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
 
 
+
 /**
- * Service that monitors for changes to {@link 
org.apache.gobblin.service.modules.flowgraph.FlowGraph} from a git repository.
- * The git repository must have an inital commit that has no files since that 
is used as a base for getting
- * the change list.
- * The {@link DataNode}s and {@link FlowEdge}s in FlowGraph need to be 
organized with the following directory structure on git:
- * <root_flowGraph_dir>/<nodeName>/<nodeName>.properties
- * <root_flowGraph_dir>/<nodeName1>/<nodeName2>/<edgeName>.properties
+ * Provides the common set of functionalities needed by listeners of {@link 
FlowGraphMonitor} to read changes in files and
+ * apply them to a {@link FlowGraph}
+ * Assumes that the directory structure between flowgraphs configuration files 
is the same.
  */
 @Slf4j
-public class GitFlowGraphMonitor extends GitMonitoringService {
-  public static final String GIT_FLOWGRAPH_MONITOR_PREFIX = 
"gobblin.service.gitFlowGraphMonitor";
-
-  private static final String PROPERTIES_EXTENSIONS = "properties";
-  private static final String CONF_EXTENSIONS = "conf";
+public abstract class BaseFlowGraphListener {
+  protected FlowGraph flowGraph;
+  protected static final int NODE_FILE_DEPTH = 3;
+  protected static final int EDGE_FILE_DEPTH = 4;
   private static final String FLOW_EDGE_LABEL_JOINER_CHAR = "_";
-  private static final String DEFAULT_GIT_FLOWGRAPH_MONITOR_REPO_DIR = 
"git-flowgraph";
-  private static final String DEFAULT_GIT_FLOWGRAPH_MONITOR_FLOWGRAPH_DIR = 
"gobblin-flowgraph";
-  private static final String DEFAULT_GIT_FLOWGRAPH_MONITOR_BRANCH_NAME = 
"master";
-
-  private static final int NODE_FILE_DEPTH = 3;
-  private static final int EDGE_FILE_DEPTH = 4;
-  private static final int DEFAULT_GIT_FLOWGRAPH_MONITOR_POLLING_INTERVAL = 60;
 
-  private static final Config DEFAULT_FALLBACK =
-      ConfigFactory.parseMap(ImmutableMap.<String, Object>builder()
-          .put(ConfigurationKeys.GIT_MONITOR_REPO_DIR, 
DEFAULT_GIT_FLOWGRAPH_MONITOR_REPO_DIR)
-          .put(ConfigurationKeys.GIT_MONITOR_CONFIG_BASE_DIR, 
DEFAULT_GIT_FLOWGRAPH_MONITOR_FLOWGRAPH_DIR)
-          .put(ConfigurationKeys.GIT_MONITOR_BRANCH_NAME, 
DEFAULT_GIT_FLOWGRAPH_MONITOR_BRANCH_NAME)
-          .put(ConfigurationKeys.GIT_MONITOR_POLLING_INTERVAL, 
DEFAULT_GIT_FLOWGRAPH_MONITOR_POLLING_INTERVAL)
-          .put(JAVA_PROPS_EXTENSIONS, PROPERTIES_EXTENSIONS)
-          .put(HOCON_FILE_EXTENSIONS, CONF_EXTENSIONS)
-          .put(SHOULD_CHECKPOINT_HASHES, false)
-          .build());
+  final String baseDirectory;
+  private final Config emptyConfig = ConfigFactory.empty();
 
   private Optional<? extends FSFlowTemplateCatalog> flowTemplateCatalog;
-  private FlowGraph flowGraph;
   private final Map<URI, TopologySpec> topologySpecMap;
-  private final Config emptyConfig = ConfigFactory.empty();
-  private final CountDownLatch initComplete;
 
-  public GitFlowGraphMonitor(Config config, Optional<? extends 
FSFlowTemplateCatalog> flowTemplateCatalog,
-      FlowGraph graph, Map<URI, TopologySpec> topologySpecMap, CountDownLatch 
initComplete) {
-    
super(config.getConfig(GIT_FLOWGRAPH_MONITOR_PREFIX).withFallback(DEFAULT_FALLBACK));
+  final String flowGraphFolderName;
+  final PullFileLoader pullFileLoader;
+  final Set<String> javaPropsExtensions;
+  final Set<String> hoconFileExtensions;
+
+  public BaseFlowGraphListener(Optional<? extends FSFlowTemplateCatalog> 
flowTemplateCatalog,
+      FlowGraph graph, Map<URI, TopologySpec> topologySpecMap, String 
baseDirectory, String flowGraphFolderName,
+      String javaPropsExtentions, String hoconFileExtensions) {
     this.flowTemplateCatalog = flowTemplateCatalog;
     this.flowGraph = graph;
     this.topologySpecMap = topologySpecMap;
-    this.initComplete = initComplete;
-  }
-
-  /**
-   * Determine if the service should poll Git. Current behavior is both master 
and slave(s) will poll Git for
-   * changes to {@link FlowGraph}.
-   */
-  @Override
-  public boolean shouldPollGit() {
-    return this.isActive;
-  }
-
-  /**
-   * Sort the changes in a commit so that changes to node files appear before 
changes to edge files. This is done so that
-   * node related changes are applied to the FlowGraph before edge related 
changes. An example where the order matters
-   * is the case when a commit adds a new node n2 as well as adds an edge from 
an existing node n1 to n2. To ensure that the
-   * addition of edge n1->n2 is successful, node n2 must exist in the graph 
and so needs to be added first. For deletions,
-   * the order does not matter and ordering the changes in the commit will 
result in the same FlowGraph state as if the changes
-   * were unordered. In other words, deletion of a node deletes all its 
incident edges from the FlowGraph. So processing an
-   * edge deletion later results in a no-op. Note that node and edge files do 
not change depth in case of modifications.
-   *
-   * If there are multiple commits between successive polls to Git, the 
re-ordering of changes across commits should not
-   * affect the final state of the FlowGraph. This is because, the order of 
changes for a given file type (i.e. node or edge)
-   * is preserved.
-   */
-  @Override
-  void processGitConfigChanges() throws GitAPIException, IOException {
-    if (flowTemplateCatalog.isPresent() && 
flowTemplateCatalog.get().getAndSetShouldRefreshFlowGraph(false)) {
-      log.info("Change to template catalog detected, refreshing FlowGraph");
-      this.gitRepo.initRepository();
+    this.baseDirectory = baseDirectory;
+    this.flowGraphFolderName = flowGraphFolderName;
+    Path folderPath = new Path(baseDirectory, this.flowGraphFolderName);
+    this.javaPropsExtensions = Sets.newHashSet(javaPropsExtentions.split(","));
+    this.hoconFileExtensions = Sets.newHashSet(hoconFileExtensions.split(","));
+    try {
+      this.pullFileLoader = new PullFileLoader(folderPath,
+          FileSystem.get(URI.create(ConfigurationKeys.LOCAL_FS_URI), new 
Configuration()),
+          this.javaPropsExtensions, this.hoconFileExtensions);
+    } catch (IOException e) {
+      throw new RuntimeException("Could not create pull file loader", e);
     }
-
-    List<DiffEntry> changes = this.gitRepo.getChanges();
-    Collections.sort(changes, (o1, o2) -> {
-      Integer o1Depth = (o1.getNewPath() != null) ? (new 
Path(o1.getNewPath())).depth() : (new Path(o1.getOldPath())).depth();
-      Integer o2Depth = (o2.getNewPath() != null) ? (new 
Path(o2.getNewPath())).depth() : (new Path(o2.getOldPath())).depth();
-      return o1Depth.compareTo(o2Depth);
-    });
-    processGitConfigChangesHelper(changes);
-    //Decrements the latch count. The countdown latch is initialized to 1. So 
after the first time the latch is decremented,
-    // the following operation should be a no-op.
-    this.initComplete.countDown();
   }
-
-  /**
-   * Add an element (i.e., a {@link DataNode}, or a {@link FlowEdge} to
-   * the {@link FlowGraph} for an added, updated or modified node or edge file.
-   * @param change
-   */
-  @Override
-  public void addChange(DiffEntry change) {
-    Path path = new Path(change.getNewPath());
-    if (path.depth() == NODE_FILE_DEPTH) {
-      addDataNode(change);
-    } else if (path.depth() == EDGE_FILE_DEPTH) {
-      addFlowEdge(change);
-    }
-  }
-
-  /**
-   * Remove an element (i.e. either a {@link DataNode} or a {@link FlowEdge} 
from the {@link FlowGraph} for
-   * a renamed or deleted {@link DataNode} or {@link FlowEdge} file.
-   * @param change
-   */
-  @Override
-  public void removeChange(DiffEntry change) {
-    Path path = new Path(change.getOldPath());
-    if (path.depth() == NODE_FILE_DEPTH) {
-      removeDataNode(change);
-    } else if (path.depth() == EDGE_FILE_DEPTH) {
-      removeFlowEdge(change);
-    }
-  }
-
   /**
    * Add a {@link DataNode} to the {@link FlowGraph}. The method uses the 
{@link FlowGraphConfigurationKeys#DATA_NODE_CLASS} config
    * to instantiate a {@link DataNode} from the node config file.
-   * @param change
+   * @param path of node to add
    */
-  private void addDataNode(DiffEntry change) {
-    if (checkFilePath(change.getNewPath(), NODE_FILE_DEPTH)) {
-      Path nodeFilePath = new Path(this.repositoryDir, change.getNewPath());
+  protected void addDataNode(String path) {
+    if (checkFilePath(path, NODE_FILE_DEPTH)) {
+      Path nodeFilePath = new Path(this.baseDirectory, path);
       try {
         Config config = loadNodeFileWithOverrides(nodeFilePath);
         Class dataNodeClass = Class.forName(ConfigUtils.getString(config, 
FlowGraphConfigurationKeys.DATA_NODE_CLASS,
@@ -192,7 +109,7 @@ public class GitFlowGraphMonitor extends 
GitMonitoringService {
           log.info("Added Datanode {} to FlowGraph", dataNode.getId());
         }
       } catch (Exception e) {
-        log.warn("Could not add DataNode defined in {} due to exception {}", 
change.getNewPath(), e);
+        log.warn("Could not add DataNode defined in {} due to exception {}", 
path, e);
       }
     }
   }
@@ -200,11 +117,11 @@ public class GitFlowGraphMonitor extends 
GitMonitoringService {
   /**
    * Remove a {@link DataNode} from the {@link FlowGraph}. The method extracts 
the nodeId of the
    * {@link DataNode} from the node config file and uses it to delete the 
associated {@link DataNode}.
-   * @param change
+   * @param path of node to delete
    */
-  private void removeDataNode(DiffEntry change) {
-    if (checkFilePath(change.getOldPath(), NODE_FILE_DEPTH)) {
-      Path nodeFilePath = new Path(this.repositoryDir, change.getOldPath());
+  protected void removeDataNode(String path) {
+    if (checkFilePath(path, NODE_FILE_DEPTH)) {
+      Path nodeFilePath = new Path(this.baseDirectory, path);
       Config config = getNodeConfigWithOverrides(ConfigFactory.empty(), 
nodeFilePath);
       String nodeId = 
config.getString(FlowGraphConfigurationKeys.DATA_NODE_ID_KEY);
       if (!this.flowGraph.deleteDataNode(nodeId)) {
@@ -218,11 +135,11 @@ public class GitFlowGraphMonitor extends 
GitMonitoringService {
   /**
    * Add a {@link FlowEdge} to the {@link FlowGraph}. The method uses the 
{@link FlowEdgeFactory} instance
    * provided by the {@link FlowGraph} to build a {@link FlowEdge} from the 
edge config file.
-   * @param change
+   * @param path of edge to add
    */
-  private void addFlowEdge(DiffEntry change) {
-    if (checkFilePath(change.getNewPath(), EDGE_FILE_DEPTH)) {
-      Path edgeFilePath = new Path(this.repositoryDir, change.getNewPath());
+  protected void addFlowEdge(String path) {
+    if (checkFilePath(path, EDGE_FILE_DEPTH)) {
+      Path edgeFilePath = new Path(this.baseDirectory, path);
       try {
         Config edgeConfig = loadEdgeFileWithOverrides(edgeFilePath);
         List<SpecExecutor> specExecutors = getSpecExecutors(edgeConfig);
@@ -237,10 +154,10 @@ public class GitFlowGraphMonitor extends 
GitMonitoringService {
             log.info("Added edge {} to FlowGraph", edge.getId());
           }
         } else {
-          log.warn("Could not add edge defined in {} to FlowGraph as 
FlowTemplateCatalog is absent", change.getNewPath());
+          log.warn("Could not add edge defined in {} to FlowGraph as 
FlowTemplateCatalog is absent", path);
         }
       } catch (Exception e) {
-        log.warn("Could not add edge defined in {} due to exception {}", 
change.getNewPath(), e.getMessage());
+        log.warn("Could not add edge defined in {} due to exception {}", path, 
e.getMessage());
       }
     }
   }
@@ -249,11 +166,11 @@ public class GitFlowGraphMonitor extends 
GitMonitoringService {
    * Remove a {@link FlowEdge} from the {@link FlowGraph}. The method uses 
{@link FlowEdgeFactory}
    * to construct the edgeId of the {@link FlowEdge} from the config file and 
uses it to delete the associated
    * {@link FlowEdge}.
-   * @param change
+   * @param path of edge to delete
    */
-  private void removeFlowEdge(DiffEntry change) {
-    if (checkFilePath(change.getOldPath(), EDGE_FILE_DEPTH)) {
-      Path edgeFilePath = new Path(this.repositoryDir, change.getOldPath());
+  protected void removeFlowEdge(String path) {
+    if (checkFilePath(path, EDGE_FILE_DEPTH)) {
+      Path edgeFilePath = new Path(this.baseDirectory, path);
       try {
         Config config = getEdgeConfigWithOverrides(ConfigFactory.empty(), 
edgeFilePath);
         String edgeId = 
config.getString(FlowGraphConfigurationKeys.FLOW_EDGE_ID_KEY);
@@ -303,7 +220,7 @@ public class GitFlowGraphMonitor extends 
GitMonitoringService {
     for (int i = 0; i < depth - 1; i++) {
       path = path.getParent();
     }
-    if (!path.getName().equals(folderName)) {
+    if (!path.getName().equals(flowGraphFolderName)) {
       return false;
     }
     return true;
@@ -315,7 +232,7 @@ public class GitFlowGraphMonitor extends 
GitMonitoringService {
    * @param nodeFilePath path of the node file
    * @return config with overridden data.node.id
    */
-  private Config getNodeConfigWithOverrides(Config nodeConfig, Path 
nodeFilePath) {
+  protected Config getNodeConfigWithOverrides(Config nodeConfig, Path 
nodeFilePath) {
     String nodeId = nodeFilePath.getParent().getName();
     return nodeConfig.withValue(FlowGraphConfigurationKeys.DATA_NODE_ID_KEY, 
ConfigValueFactory.fromAnyRef(nodeId));
   }
@@ -342,8 +259,7 @@ public class GitFlowGraphMonitor extends 
GitMonitoringService {
    * @param edgeConfig containing the logical names of SpecExecutors for this 
edge.
    * @return a {@link List<SpecExecutor>}s for this edge.
    */
-  private List<SpecExecutor> getSpecExecutors(Config edgeConfig)
-      throws URISyntaxException {
+  private List<SpecExecutor> getSpecExecutors(Config edgeConfig) throws 
URISyntaxException {
     //Get the logical names of SpecExecutors where the FlowEdge can be 
executed.
     List<String> specExecutorNames = ConfigUtils.getStringList(edgeConfig, 
FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY);
     //Load all the SpecExecutor configurations for this FlowEdge from the 
SpecExecutor Catalog.
@@ -361,18 +277,19 @@ public class GitFlowGraphMonitor extends 
GitMonitoringService {
    * @return the configuration object
    * @throws IOException
    */
-  private Config loadNodeFileWithOverrides(Path filePath) throws IOException {
+  protected Config loadNodeFileWithOverrides(Path filePath) throws IOException 
{
     Config nodeConfig = this.pullFileLoader.loadPullFile(filePath, 
emptyConfig, false, false);
     return getNodeConfigWithOverrides(nodeConfig, filePath);
   }
 
+
   /**
    * Load the edge file.
    * @param filePath path of the edge file relative to the repository root
    * @return the configuration object
    * @throws IOException
    */
-  private Config loadEdgeFileWithOverrides(Path filePath) throws IOException {
+  protected Config loadEdgeFileWithOverrides(Path filePath) throws IOException 
{
     Config edgeConfig = this.pullFileLoader.loadPullFile(filePath, 
emptyConfig, false, false);
     return getEdgeConfigWithOverrides(edgeConfig, filePath);
   }
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FlowGraphMonitor.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FlowGraphMonitor.java
new file mode 100644
index 000000000..7c39b0157
--- /dev/null
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FlowGraphMonitor.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.flowgraph;
+
+import com.google.common.util.concurrent.Service;
+
+
+/**
+ * A service that listens to an external service or filesystem (Git, FS) to 
apply changes to the flowgraph without having to restart GaaS
+ */
+public interface FlowGraphMonitor extends Service {
+
+  /**
+   * Indicates that the service is ready to load the flowgraph
+   * @param value whether GaaS is ready
+   */
+  void setActive(boolean value);
+
+}
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitConfigMonitor.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/GitConfigListener.java
similarity index 64%
rename from 
gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitConfigMonitor.java
rename to 
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/GitConfigListener.java
index 101d9f46d..3c6488d8d 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitConfigMonitor.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/GitConfigListener.java
@@ -14,23 +14,25 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.gobblin.service.modules.core;
+
+package org.apache.gobblin.service.monitoring;
 
 import java.io.IOException;
+import java.net.URI;
+import java.util.Set;
 
-import org.apache.hadoop.fs.Path;
+import lombok.extern.slf4j.Slf4j;
 import org.eclipse.jgit.diff.DiffEntry;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 
-import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Sets;
 import com.google.common.io.Files;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
 import com.typesafe.config.ConfigValueFactory;
 
-import javax.inject.Inject;
-import javax.inject.Singleton;
-import lombok.extern.slf4j.Slf4j;
-
 import org.apache.gobblin.config.ConfigBuilder;
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.runtime.api.FlowSpec;
@@ -38,59 +40,41 @@ import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
 import org.apache.gobblin.runtime.spec_store.FSSpecStore;
 import org.apache.gobblin.util.PullFileLoader;
 
+
 /**
- * Service that monitors for jobs from a git repository.
- * The git repository must have an initial commit that has no config files 
since that is used as a base for getting
- * the change list.
- * The config needs to be organized with the following structure:
- * <root_config_dir>/<flowGroup>/<flowName>.(pull|job|json|conf)
- * The <flowGroup> and <flowName> is used to generate the URI used to store 
the config in the {@link FlowCatalog}
+ * Listener for {@link GitConfigMonitor} to apply changes from Git to a {@link 
FlowCatalog} for adding and removing jobs
  */
 @Slf4j
-@Singleton
-public class GitConfigMonitor extends GitMonitoringService {
-  public static final String GIT_CONFIG_MONITOR_PREFIX = 
"gobblin.service.gitConfigMonitor";
-
+public class GitConfigListener implements GitDiffListener {
+  private static final int CONFIG_FILE_DEPTH = 3;
   private static final String SPEC_DESCRIPTION = "Git-based flow config";
   private static final String SPEC_VERSION = FlowSpec.Builder.DEFAULT_VERSION;
-  private static final String PROPERTIES_EXTENSIONS = "pull,job";
-  private static final String CONF_EXTENSIONS = "json,conf";
-  private static final String DEFAULT_GIT_CONFIG_MONITOR_REPO_DIR = 
"git-flow-config";
-  private static final String DEFAULT_GIT_CONFIG_MONITOR_CONFIG_DIR = 
"gobblin-config";
-  private static final String DEFAULT_GIT_CONFIG_MONITOR_BRANCH_NAME = 
"master";
-
-  private static final int CONFIG_FILE_DEPTH = 3;
-  private static final int DEFAULT_GIT_CONFIG_MONITOR_POLLING_INTERVAL = 60;
-
-  private static final Config DEFAULT_FALLBACK =
-      ConfigFactory.parseMap(ImmutableMap.<String, Object>builder()
-          .put(ConfigurationKeys.GIT_MONITOR_REPO_DIR, 
DEFAULT_GIT_CONFIG_MONITOR_REPO_DIR)
-          .put(ConfigurationKeys.GIT_MONITOR_CONFIG_BASE_DIR, 
DEFAULT_GIT_CONFIG_MONITOR_CONFIG_DIR)
-          .put(ConfigurationKeys.GIT_MONITOR_BRANCH_NAME, 
DEFAULT_GIT_CONFIG_MONITOR_BRANCH_NAME)
-          .put(ConfigurationKeys.GIT_MONITOR_POLLING_INTERVAL, 
DEFAULT_GIT_CONFIG_MONITOR_POLLING_INTERVAL)
-          .put(GitMonitoringService.JAVA_PROPS_EXTENSIONS, 
PROPERTIES_EXTENSIONS)
-          .put(GitMonitoringService.HOCON_FILE_EXTENSIONS, CONF_EXTENSIONS)
-          .build());
-
   private final FlowCatalog flowCatalog;
+
+  final String repositoryDir;
+  final String configBaseFolderName;
+  final PullFileLoader pullFileLoader;
+  final Set<String> javaPropsExtensions;
+  final Set<String> hoconFileExtensions;
   private final Config emptyConfig = ConfigFactory.empty();
 
-  @Inject
-  GitConfigMonitor(Config config, FlowCatalog flowCatalog) {
-    
super(config.getConfig(GIT_CONFIG_MONITOR_PREFIX).withFallback(DEFAULT_FALLBACK));
-    this.flowCatalog = flowCatalog;
-  }
 
-  @Override
-  public boolean shouldPollGit() {
-    // if not active or if the flow catalog is not up yet then can't process 
config changes
-    if (!isActive || !this.flowCatalog.isRunning()) {
-      log.warn("GitConfigMonitor: skip poll since the JobCatalog is not yet 
running. isActive = {}", this.isActive);
-      return false;
+  public GitConfigListener(FlowCatalog flowCatalog, String repositoryDir, 
String configBaseFolderName, String javaPropsExtentions, String 
hoconFileExtentions) {
+    this.flowCatalog = flowCatalog;
+    this.configBaseFolderName = configBaseFolderName;
+    this.repositoryDir = repositoryDir;
+
+    Path folderPath = new Path(repositoryDir, configBaseFolderName);
+    this.javaPropsExtensions = Sets.newHashSet(javaPropsExtentions.split(","));
+    this.hoconFileExtensions = Sets.newHashSet(hoconFileExtentions.split(","));
+    try {
+      this.pullFileLoader = new PullFileLoader(folderPath,
+          FileSystem.get(URI.create(ConfigurationKeys.LOCAL_FS_URI), new 
Configuration()),
+          this.javaPropsExtensions, this.hoconFileExtensions);
+    } catch (IOException e) {
+      throw new RuntimeException("Could not create pull file loader", e);
     }
-    return true;
   }
-
   /**
    * Add a {@link FlowSpec} for an added, updated, or modified flow config
    * @param change
@@ -137,23 +121,23 @@ public class GitConfigMonitor extends 
GitMonitoringService {
           .withDescription(SPEC_DESCRIPTION)
           .build();
 
-        this.flowCatalog.remove(spec.getUri());
+      this.flowCatalog.remove(spec.getUri());
     }
   }
 
 
-    /**
-     * check whether the file has the proper naming and hierarchy
-     * @param configFilePath the relative path from the repo root
-     * @return false if the file does not conform
-     */
+  /**
+   * check whether the file has the proper naming and hierarchy
+   * @param configFilePath the relative path from the repo root
+   * @return false if the file does not conform
+   */
   private boolean checkConfigFilePath(String configFilePath) {
     // The config needs to be stored at 
configDir/flowGroup/flowName.(pull|job|json|conf)
     Path configFile = new Path(configFilePath);
     String fileExtension = Files.getFileExtension(configFile.getName());
 
     if (configFile.depth() != CONFIG_FILE_DEPTH
-        || !configFile.getParent().getParent().getName().equals(folderName)
+        || 
!configFile.getParent().getParent().getName().equals(configBaseFolderName)
         || 
!(PullFileLoader.DEFAULT_JAVA_PROPS_PULL_FILE_EXTENSIONS.contains(fileExtension)
         || 
PullFileLoader.DEFAULT_JAVA_PROPS_PULL_FILE_EXTENSIONS.contains(fileExtension)))
 {
       log.warn("Changed file does not conform to directory structure and file 
name format, skipping: "
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/GitConfigMonitor.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/GitConfigMonitor.java
new file mode 100644
index 000000000..e7f639ce8
--- /dev/null
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/GitConfigMonitor.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.service.monitoring;
+
+import com.google.common.collect.ImmutableMap;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+import javax.inject.Inject;
+import javax.inject.Singleton;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
+
+/**
+ * Service that monitors for jobs from a git repository.
+ * The git repository must have an initial commit that has no config files 
since that is used as a base for getting
+ * the change list.
+ * The config needs to be organized with the following structure:
+ * <root_config_dir>/<flowGroup>/<flowName>.(pull|job|json|conf)
+ * The <flowGroup> and <flowName> are used to generate the URI used to store 
the config in the {@link FlowCatalog}
+ */
+@Slf4j
+@Singleton
+public class GitConfigMonitor extends GitMonitoringService {
+  public static final String GIT_CONFIG_MONITOR_PREFIX = 
"gobblin.service.gitConfigMonitor";
+
+  private static final String PROPERTIES_EXTENSIONS = "pull,job";
+  private static final String CONF_EXTENSIONS = "json,conf";
+  private static final String DEFAULT_GIT_CONFIG_MONITOR_REPO_DIR = 
"git-flow-config";
+  private static final String DEFAULT_GIT_CONFIG_MONITOR_CONFIG_DIR = 
"gobblin-config";
+  private static final String DEFAULT_GIT_CONFIG_MONITOR_BRANCH_NAME = 
"master";
+
+  private static final int DEFAULT_GIT_CONFIG_MONITOR_POLLING_INTERVAL = 60;
+
+  private static final Config DEFAULT_FALLBACK =
+      ConfigFactory.parseMap(ImmutableMap.<String, Object>builder()
+          .put(ConfigurationKeys.GIT_MONITOR_REPO_DIR, 
DEFAULT_GIT_CONFIG_MONITOR_REPO_DIR)
+          .put(ConfigurationKeys.GIT_MONITOR_CONFIG_BASE_DIR, 
DEFAULT_GIT_CONFIG_MONITOR_CONFIG_DIR)
+          .put(ConfigurationKeys.GIT_MONITOR_BRANCH_NAME, 
DEFAULT_GIT_CONFIG_MONITOR_BRANCH_NAME)
+          .put(ConfigurationKeys.GIT_MONITOR_POLLING_INTERVAL, 
DEFAULT_GIT_CONFIG_MONITOR_POLLING_INTERVAL)
+          .put(ConfigurationKeys.JAVA_PROPS_EXTENSIONS, PROPERTIES_EXTENSIONS)
+          .put(ConfigurationKeys.HOCON_FILE_EXTENSIONS, CONF_EXTENSIONS)
+          .build());
+
+  private final FlowCatalog flowCatalog;
+
+  @Inject
+  GitConfigMonitor(Config config, FlowCatalog flowCatalog) {
+    
super(config.getConfig(GIT_CONFIG_MONITOR_PREFIX).withFallback(DEFAULT_FALLBACK));
+    this.flowCatalog = flowCatalog;
+    Config configWithFallbacks = 
config.getConfig(GIT_CONFIG_MONITOR_PREFIX).withFallback(DEFAULT_FALLBACK);
+    this.listeners.add(new GitConfigListener(flowCatalog, 
configWithFallbacks.getString(ConfigurationKeys.GIT_MONITOR_REPO_DIR),
+        
configWithFallbacks.getString(ConfigurationKeys.GIT_MONITOR_CONFIG_BASE_DIR), 
configWithFallbacks.getString(ConfigurationKeys.JAVA_PROPS_EXTENSIONS),
+        
configWithFallbacks.getString(ConfigurationKeys.HOCON_FILE_EXTENSIONS)));
+  }
+
+  @Override
+  public boolean shouldPollGit() {
+    // if not active or if the flow catalog is not up yet then can't process 
config changes
+    if (!isActive || !this.flowCatalog.isRunning()) {
+      log.warn("GitConfigMonitor: skip poll since the JobCatalog is not yet 
running. isActive = {}", this.isActive);
+      return false;
+    }
+    return true;
+  }
+
+}
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/GitDiffListener.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/GitDiffListener.java
new file mode 100644
index 000000000..bfb098b35
--- /dev/null
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/GitDiffListener.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.monitoring;
+
+import org.eclipse.jgit.diff.DiffEntry;
+
+
+/**
+ * Listener for {@link GitMonitoringService} to apply changes detected from 
Git.
+ */
+public interface GitDiffListener {
+
+  void addChange(DiffEntry change);
+
+  void removeChange(DiffEntry change);
+}
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/GitFlowGraphListener.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/GitFlowGraphListener.java
new file mode 100644
index 000000000..1f5a16e7d
--- /dev/null
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/GitFlowGraphListener.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.monitoring;
+
+import java.net.URI;
+import java.util.Map;
+
+import org.eclipse.jgit.diff.DiffEntry;
+import org.apache.hadoop.fs.Path;
+
+import com.google.common.base.Optional;
+
+import org.apache.gobblin.runtime.api.TopologySpec;
+import org.apache.gobblin.service.modules.flowgraph.BaseFlowGraphListener;
+import org.apache.gobblin.service.modules.flowgraph.DataNode;
+import org.apache.gobblin.service.modules.flowgraph.FlowEdge;
+import org.apache.gobblin.service.modules.flowgraph.FlowGraph;
+import 
org.apache.gobblin.service.modules.template_catalog.FSFlowTemplateCatalog;
+
+
+/**
+ * Listener for {@link GitFlowGraphMonitor} to apply changes from Git to a 
{@link FlowGraph}
+ */
+public class GitFlowGraphListener extends BaseFlowGraphListener implements 
GitDiffListener {
+
+  public GitFlowGraphListener(Optional<? extends FSFlowTemplateCatalog> 
flowTemplateCatalog,
+      FlowGraph graph, Map<URI, TopologySpec> topologySpecMap, String 
baseDirectory, String folderName, String javaPropsExtentions,
+      String hoconFileExtentions) {
+    super(flowTemplateCatalog, graph, topologySpecMap, baseDirectory, 
folderName, javaPropsExtentions, hoconFileExtentions);
+
+  }
+
+  /**
+   * Add an element (i.e., a {@link DataNode} or a {@link FlowEdge}) to
+   * the {@link FlowGraph} for an added, updated or modified node or edge file.
+   * @param change
+   */
+  @Override
+  public void addChange(DiffEntry change) {
+    Path path = new Path(change.getNewPath());
+    if (path.depth() == NODE_FILE_DEPTH) {
+      addDataNode(change.getNewPath());
+    } else if (path.depth() == EDGE_FILE_DEPTH) {
+      addFlowEdge(change.getNewPath());
+    }
+  }
+
+  /**
+   * Remove an element (i.e. either a {@link DataNode} or a {@link FlowEdge}) 
from the {@link FlowGraph} for
+   * a renamed or deleted {@link DataNode} or {@link FlowEdge} file.
+   * @param change
+   */
+  @Override
+  public void removeChange(DiffEntry change) {
+    Path path = new Path(change.getOldPath());
+    if (path.depth() == NODE_FILE_DEPTH) {
+      removeDataNode(change.getOldPath());
+    } else if (path.depth() == EDGE_FILE_DEPTH) {
+      removeFlowEdge(change.getOldPath());
+    }
+  }
+}
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/GitFlowGraphMonitor.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/GitFlowGraphMonitor.java
new file mode 100644
index 000000000..bfe811fa0
--- /dev/null
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/GitFlowGraphMonitor.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.monitoring;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.net.URI;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+
+import org.apache.hadoop.fs.Path;
+import org.eclipse.jgit.api.errors.GitAPIException;
+import org.eclipse.jgit.diff.DiffEntry;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableMap;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.runtime.api.TopologySpec;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.service.modules.flowgraph.FlowGraphMonitor;
+import org.apache.gobblin.service.modules.flowgraph.DataNode;
+import org.apache.gobblin.service.modules.flowgraph.FlowEdge;
+import org.apache.gobblin.service.modules.flowgraph.FlowGraph;
+import 
org.apache.gobblin.service.modules.template_catalog.FSFlowTemplateCatalog;
+
+/**
+ * Service that monitors for changes to {@link 
org.apache.gobblin.service.modules.flowgraph.FlowGraph} from a git repository.
+ * The git repository must have an initial commit that has no files since that 
is used as a base for getting
+ * the change list.
+ * The {@link DataNode}s and {@link FlowEdge}s in FlowGraph need to be 
organized with the following directory structure on git:
+ * <root_flowGraph_dir>/<nodeName>/<nodeName>.properties
+ * <root_flowGraph_dir>/<nodeName1>/<nodeName2>/<edgeName>.properties
+ */
+@Slf4j
+public class GitFlowGraphMonitor extends GitMonitoringService implements 
FlowGraphMonitor {
+  public static final String GIT_FLOWGRAPH_MONITOR_PREFIX = 
"gobblin.service.gitFlowGraphMonitor";
+  private static final String PROPERTIES_EXTENSIONS = "properties";
+  private static final String CONF_EXTENSIONS = "conf";
+  private static final String DEFAULT_GIT_FLOWGRAPH_MONITOR_REPO_DIR = 
"git-flowgraph";
+  private static final String DEFAULT_GIT_FLOWGRAPH_MONITOR_FLOWGRAPH_DIR = 
"gobblin-flowgraph";
+  private static final String DEFAULT_GIT_FLOWGRAPH_MONITOR_BRANCH_NAME = 
"master";
+  static final String SHOULD_CHECKPOINT_HASHES = "shouldCheckpointHashes";
+
+  private static final int DEFAULT_GIT_FLOWGRAPH_MONITOR_POLLING_INTERVAL = 60;
+
+  private static final Config DEFAULT_FALLBACK = 
ConfigFactory.parseMap(ImmutableMap.<String, Object>builder()
+      .put(ConfigurationKeys.GIT_MONITOR_REPO_DIR, 
DEFAULT_GIT_FLOWGRAPH_MONITOR_REPO_DIR)
+      .put(ConfigurationKeys.GIT_MONITOR_CONFIG_BASE_DIR, 
DEFAULT_GIT_FLOWGRAPH_MONITOR_FLOWGRAPH_DIR)
+      .put(ConfigurationKeys.GIT_MONITOR_BRANCH_NAME, 
DEFAULT_GIT_FLOWGRAPH_MONITOR_BRANCH_NAME)
+      .put(ConfigurationKeys.GIT_MONITOR_POLLING_INTERVAL, 
DEFAULT_GIT_FLOWGRAPH_MONITOR_POLLING_INTERVAL)
+      .put(ConfigurationKeys.JAVA_PROPS_EXTENSIONS, PROPERTIES_EXTENSIONS)
+      .put(ConfigurationKeys.HOCON_FILE_EXTENSIONS, CONF_EXTENSIONS)
+      .put(SHOULD_CHECKPOINT_HASHES, false)
+      .build());
+
+  private Optional<? extends FSFlowTemplateCatalog> flowTemplateCatalog;
+  private final CountDownLatch initComplete;
+
+  public GitFlowGraphMonitor(Config config, Optional<? extends 
FSFlowTemplateCatalog> flowTemplateCatalog,
+      FlowGraph graph, Map<URI, TopologySpec> topologySpecMap, CountDownLatch 
initComplete) {
+    
super(config.getConfig(GIT_FLOWGRAPH_MONITOR_PREFIX).withFallback(DEFAULT_FALLBACK));
+    Config configWithFallbacks = 
config.getConfig(GIT_FLOWGRAPH_MONITOR_PREFIX).withFallback(DEFAULT_FALLBACK);
+    this.flowTemplateCatalog = flowTemplateCatalog;
+    this.initComplete = initComplete;
+    this.listeners.add(new GitFlowGraphListener(
+        flowTemplateCatalog, graph, topologySpecMap, 
configWithFallbacks.getString(ConfigurationKeys.GIT_MONITOR_REPO_DIR),
+        
configWithFallbacks.getString(ConfigurationKeys.GIT_MONITOR_CONFIG_BASE_DIR), 
configWithFallbacks.getString(ConfigurationKeys.JAVA_PROPS_EXTENSIONS),
+        configWithFallbacks.getString(ConfigurationKeys.HOCON_FILE_EXTENSIONS))
+    );
+  }
+
+  /**
+   * Determine if the service should poll Git. Current behavior is both 
the leader and follower(s) will poll Git for
+   * changes to {@link FlowGraph}.
+   */
+  @Override
+  public boolean shouldPollGit() {
+    return this.isActive;
+  }
+
+  /**
+   * Sort the changes in a commit so that changes to node files appear before 
changes to edge files. This is done so that
+   * node related changes are applied to the FlowGraph before edge related 
changes. An example where the order matters
+   * is the case when a commit adds a new node n2 as well as adds an edge from 
an existing node n1 to n2. To ensure that the
+   * addition of edge n1->n2 is successful, node n2 must exist in the graph 
and so needs to be added first. For deletions,
+   * the order does not matter and ordering the changes in the commit will 
result in the same FlowGraph state as if the changes
+   * were unordered. In other words, deletion of a node deletes all its 
incident edges from the FlowGraph. So processing an
+   * edge deletion later results in a no-op. Note that node and edge files do 
not change depth in case of modifications.
+   *
+   * If there are multiple commits between successive polls to Git, the 
re-ordering of changes across commits should not
+   * affect the final state of the FlowGraph. This is because the order of 
changes for a given file type (i.e. node or edge)
+   * is preserved.
+   */
+  @Override
+  void processGitConfigChanges() throws GitAPIException, IOException {
+    if (flowTemplateCatalog.isPresent() && 
flowTemplateCatalog.get().getAndSetShouldRefreshFlowGraph(false)) {
+      log.info("Change to template catalog detected, refreshing FlowGraph");
+      this.gitRepo.initRepository();
+    }
+
+    List<DiffEntry> changes = this.gitRepo.getChanges();
+    changes.sort(new GitFlowgraphComparator());
+    processGitConfigChangesHelper(changes);
+    //Decrements the latch count. The countdown latch is initialized to 1. So 
after the first time the latch is decremented,
+    // the following operation should be a no-op.
+    this.initComplete.countDown();
+  }
+
+  static class GitFlowgraphComparator implements Comparator<DiffEntry>, 
Serializable {
+    public int compare(DiffEntry o1, DiffEntry o2) {
+      Integer o1Depth = (o1.getNewPath() != null) ? (new 
Path(o1.getNewPath())).depth() : (new Path(o1.getOldPath())).depth();
+      Integer o2Depth = (o2.getNewPath() != null) ? (new 
Path(o2.getNewPath())).depth() : (new Path(o2.getOldPath())).depth();
+      return o1Depth.compareTo(o2Depth);
+    }
+  }
+
+}
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitMonitoringService.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/GitMonitoringService.java
similarity index 91%
rename from 
gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitMonitoringService.java
rename to 
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/GitMonitoringService.java
index 05d7b9a6c..878d432e5 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitMonitoringService.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/GitMonitoringService.java
@@ -15,26 +15,22 @@
  * limitations under the License.
  */
 
-package org.apache.gobblin.service.modules.core;
+package org.apache.gobblin.service.monitoring;
 
 import java.io.ByteArrayInputStream;
 import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.IOException;
-import java.net.URI;
 import java.nio.charset.Charset;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.codec.binary.Base64;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 import org.eclipse.jgit.api.Git;
 import org.eclipse.jgit.api.ResetCommand;
 import org.eclipse.jgit.api.TransportConfigCallback;
@@ -53,14 +49,13 @@ import 
org.eclipse.jgit.transport.UsernamePasswordCredentialsProvider;
 import org.eclipse.jgit.treewalk.CanonicalTreeParser;
 import org.eclipse.jgit.util.FS;
 
+import com.google.common.util.concurrent.AbstractIdleService;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Charsets;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
-import com.google.common.collect.Sets;
 import com.google.common.io.Files;
-import com.google.common.util.concurrent.AbstractIdleService;
 import com.jcraft.jsch.JSch;
 import com.jcraft.jsch.JSchException;
 import com.jcraft.jsch.Session;
@@ -73,16 +68,19 @@ import org.apache.gobblin.password.PasswordManager;
 import org.apache.gobblin.util.ConfigUtils;
 import org.apache.gobblin.util.Either;
 import org.apache.gobblin.util.ExecutorsUtils;
-import org.apache.gobblin.util.PullFileLoader;
 
 
+/**
+ * Base monitoring service that polls git and applies changes to each of its 
listeners
+ * Implementation classes should also define {@link GitDiffListener} to apply 
changes from git
+ * It is possible to have multiple listeners to the same repository, and each 
can apply their own changes, but
+ * determining if a change is made and their order is centrally controlled by 
processGitConfigChanges()
+ */
 @Slf4j
 public abstract class GitMonitoringService extends AbstractIdleService {
   private static final String REMOTE_NAME = "origin";
   private static final int TERMINATION_TIMEOUT = 30;
 
-  static final String JAVA_PROPS_EXTENSIONS = "javaPropsExtensions";
-  static final String HOCON_FILE_EXTENSIONS = "hoconFileExtensions";
   static final String SHOULD_CHECKPOINT_HASHES = "shouldCheckpointHashes";
 
   private final Integer pollingInterval;
@@ -97,12 +95,10 @@ public abstract class GitMonitoringService extends 
AbstractIdleService {
   private String knownHostsFile;
 
   final GitMonitoringService.GitRepository gitRepo;
-  final String repositoryDir;
-  final String folderName;
-  final PullFileLoader pullFileLoader;
-  final Set<String> javaPropsExtensions;
-  final Set<String> hoconFileExtensions;
+  protected final String repositoryDir;
+  protected final String folderName;
 
+  protected List<GitDiffListener> listeners = new ArrayList<>();
   protected volatile boolean isActive = false;
 
   GitMonitoringService(Config config) {
@@ -168,17 +164,6 @@ public abstract class GitMonitoringService extends 
AbstractIdleService {
       throw new RuntimeException("Could not open git repository", e);
     }
 
-    Path folderPath = new Path(this.repositoryDir, this.folderName);
-    this.javaPropsExtensions = 
Sets.newHashSet(config.getString(JAVA_PROPS_EXTENSIONS).split(","));
-    this.hoconFileExtensions = 
Sets.newHashSet(config.getString(HOCON_FILE_EXTENSIONS).split(","));
-    try {
-      this.pullFileLoader = new PullFileLoader(folderPath,
-          FileSystem.get(URI.create(ConfigurationKeys.LOCAL_FS_URI), new 
Configuration()),
-          this.javaPropsExtensions, this.hoconFileExtensions);
-    } catch (IOException e) {
-      throw new RuntimeException("Could not create pull file loader", e);
-    }
-
     this.scheduledExecutor = Executors.newSingleThreadScheduledExecutor(
         ExecutorsUtils.newThreadFactory(Optional.of(log), 
Optional.of("FetchGitConfExecutor")));
   }
@@ -234,20 +219,22 @@ public abstract class GitMonitoringService extends 
AbstractIdleService {
    */
   void processGitConfigChangesHelper(List<DiffEntry> changes) throws 
IOException {
     for (DiffEntry change : changes) {
-      switch (change.getChangeType()) {
-        case ADD:
-        case MODIFY:
-          addChange(change);
-          break;
-        case DELETE:
-          removeChange(change);
-          break;
-        case RENAME:
-          removeChange(change);
-          addChange(change);
-          break;
-        default:
-          throw new RuntimeException("Unsupported change type " + 
change.getChangeType());
+      for (GitDiffListener listener: this.listeners) {
+        switch (change.getChangeType()) {
+          case ADD:
+          case MODIFY:
+            listener.addChange(change);
+            break;
+          case DELETE:
+            listener.removeChange(change);
+            break;
+          case RENAME:
+            listener.removeChange(change);
+            listener.addChange(change);
+            break;
+          default:
+            throw new RuntimeException("Unsupported change type " + 
change.getChangeType());
+        }
       }
     }
 
@@ -487,8 +474,4 @@ public abstract class GitMonitoringService extends 
AbstractIdleService {
   }
 
   public abstract boolean shouldPollGit();
-
-  public abstract void addChange(DiffEntry change);
-
-  public abstract void removeChange(DiffEntry change);
 }
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/GobblinServiceManagerTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/GobblinServiceManagerTest.java
index baac020fe..a66717c56 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/GobblinServiceManagerTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/GobblinServiceManagerTest.java
@@ -60,7 +60,7 @@ import 
org.apache.gobblin.metastore.testing.TestMetastoreDatabaseFactory;
 import org.apache.gobblin.runtime.api.FlowSpec;
 import org.apache.gobblin.runtime.api.Spec;
 import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
-import org.apache.gobblin.service.modules.core.GitConfigMonitor;
+import org.apache.gobblin.service.monitoring.GitConfigMonitor;
 import org.apache.gobblin.service.modules.core.GobblinServiceManager;
 import org.apache.gobblin.service.modules.flow.MockedSpecCompiler;
 import org.apache.gobblin.service.monitoring.FsJobStatusRetriever;
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompilerTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompilerTest.java
index 7e675495d..0f5a66753 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompilerTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompilerTest.java
@@ -75,7 +75,7 @@ import org.apache.gobblin.runtime.api.SpecProducer;
 import org.apache.gobblin.runtime.api.TopologySpec;
 import org.apache.gobblin.runtime.spec_executorInstance.AbstractSpecExecutor;
 import org.apache.gobblin.service.ServiceConfigKeys;
-import org.apache.gobblin.service.modules.core.GitFlowGraphMonitor;
+import org.apache.gobblin.service.monitoring.GitFlowGraphMonitor;
 import org.apache.gobblin.service.modules.flowgraph.BaseFlowGraph;
 import org.apache.gobblin.service.modules.flowgraph.Dag;
 import org.apache.gobblin.service.modules.flowgraph.Dag.DagNode;
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GitConfigMonitorTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/GitConfigMonitorTest.java
similarity index 99%
rename from 
gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GitConfigMonitorTest.java
rename to 
gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/GitConfigMonitorTest.java
index 3fddd4aab..7b735a63a 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GitConfigMonitorTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/GitConfigMonitorTest.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.gobblin.service.modules.core;
+package org.apache.gobblin.service.monitoring;
 
 import java.io.File;
 import java.io.IOException;
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitorTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/GitFlowGraphMonitorTest.java
similarity index 99%
rename from 
gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitorTest.java
rename to 
gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/GitFlowGraphMonitorTest.java
index 501b4bcf0..efa34e1ec 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitorTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/GitFlowGraphMonitorTest.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.gobblin.service.modules.core;
+package org.apache.gobblin.service.monitoring;
 
 import java.io.File;
 import java.io.IOException;
diff --git 
a/gobblin-utility/src/main/java/org/apache/gobblin/util/filesystem/PathAlterationObserver.java
 
b/gobblin-utility/src/main/java/org/apache/gobblin/util/filesystem/PathAlterationObserver.java
index 94f7d151e..6fcb44374 100644
--- 
a/gobblin-utility/src/main/java/org/apache/gobblin/util/filesystem/PathAlterationObserver.java
+++ 
b/gobblin-utility/src/main/java/org/apache/gobblin/util/filesystem/PathAlterationObserver.java
@@ -161,7 +161,7 @@ public class PathAlterationObserver {
   }
 
   /**
-   * Check whether the file and its chlidren have been created, modified or 
deleted.
+   * Check whether the file and its children have been created, modified or 
deleted.
    */
   public void checkAndNotify()
       throws IOException {

Reply via email to