[ 
https://issues.apache.org/jira/browse/GOBBLIN-1696?focusedWorklogId=811077&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-811077
 ]

ASF GitHub Bot logged work on GOBBLIN-1696:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 22/Sep/22 07:45
            Start Date: 22/Sep/22 07:45
    Worklog Time Spent: 10m 
      Work Description: phet commented on code in PR #3548:
URL: https://github.com/apache/gobblin/pull/3548#discussion_r977194076


##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java:
##########
@@ -82,7 +85,7 @@
 @Slf4j
 public class MultiHopFlowCompiler extends BaseFlowToJobSpecCompiler {
   @Getter
-  private final FlowGraph flowGraph;
+  private final AtomicReference<FlowGraph> flowGraph;

Review Comment:
   `@Getter` may break encapsulation more than desired: let's not invite 
callers to adjust our reference. Better, perhaps, to define an accessor that 
returns `AtomicReference.get()` to them.
   
   To handle atomic updates, define a setter yourself (Lombok's `@Setter` 
won't quite work here).



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java:
##########
@@ -168,7 +171,7 @@ public MultiHopFlowCompiler(Config config, Optional<Logger> log, boolean instrum
   }
 
   @VisibleForTesting
-  MultiHopFlowCompiler(Config config, FlowGraph flowGraph) {
+  MultiHopFlowCompiler(Config config, AtomicReference<FlowGraph> flowGraph) {

Review Comment:
   Similar to the encapsulation comments above: I'd expect a plain 
`FlowGraph` param, which the MHFC then stores internally in its `AtomicReference`.
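   Here is a minimal sketch of the encapsulation these two comments suggest. `FlowGraph` is a pared-down stand-in for the real interface. `FlowGraphHolder` is a hypothetical name for the part of `MultiHopFlowCompiler` that owns the reference.

```java
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;

// Stand-in for Gobblin's FlowGraph interface (assumption: only identity matters here).
interface FlowGraph {}

// Hypothetical slice of MultiHopFlowCompiler that owns the AtomicReference.
class FlowGraphHolder {
  // Never exposed: callers get the current FlowGraph, not the reference.
  private final AtomicReference<FlowGraph> flowGraph;

  // Takes a plain FlowGraph (e.g. from a @VisibleForTesting constructor) and wraps it internally.
  FlowGraphHolder(FlowGraph initialGraph) {
    this.flowGraph = new AtomicReference<>(Objects.requireNonNull(initialGraph));
  }

  // Hand-written accessor instead of Lombok's @Getter on the reference itself.
  FlowGraph getFlowGraph() {
    return this.flowGraph.get();
  }

  // Hand-written setter: atomically publishes a fully built replacement graph.
  void setFlowGraph(FlowGraph newGraph) {
    this.flowGraph.set(Objects.requireNonNull(newGraph));
  }
}
```

Lombok's `@Setter` does not fit here for two reasons. Lombok does not generate setters for `final` fields. And the update should go through `AtomicReference.set` rather than replace the field.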



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowGraphHelper.java:
##########
@@ -277,32 +282,64 @@ private List<SpecExecutor> getSpecExecutors(Config edgeConfig) throws URISyntaxE
    * @return the configuration object
    * @throws IOException
    */
-  protected Config loadNodeFileWithOverrides(Path filePath) throws IOException {
+  protected Config loadNodeFileWithOverrides(Path filePath)
+      throws IOException {
     Config nodeConfig = this.pullFileLoader.loadPullFile(filePath, emptyConfig, false, false);
     return getNodeConfigWithOverrides(nodeConfig, filePath);
   }
 
-
   /**
    * Load the edge file.
    * @param filePath path of the edge file relative to the repository root
    * @return the configuration object
    * @throws IOException
    */
-  protected Config loadEdgeFileWithOverrides(Path filePath) throws IOException {
+  protected Config loadEdgeFileWithOverrides(Path filePath)
+      throws IOException {
     Config edgeConfig = this.pullFileLoader.loadPullFile(filePath, emptyConfig, false, false);
     return getEdgeConfigWithOverrides(edgeConfig, filePath);
   }
 
+  /**
+   * Loads the entire flowgraph from the path configured in {@link org.apache.gobblin.configuration.ConfigurationKeys.FLOWGRAPH_BASE_DIR }
+   * Expects nodes to be in the format of /flowGraphName/nodeA/nodeA.properties
+   * Expects edges to be in the format of /flowGraphName/nodeA/nodeB/edgeAB.properties
+   * The current flowgraph will be swapped atomically with the new flowgraph that is loaded
+   */
+  public void populateFlowGraphAtomically(AtomicReference<FlowGraph> flowGraphReference) {
+    FlowGraph newFlowGraph = new BaseFlowGraph();
+    java.nio.file.Path graphPath = new File(this.baseDirectory).toPath();
+    try {
+      List<Path> edges = new ArrayList<>();
+      // All nodes must be added first before edges, otherwise edges may have a missing source or destination.
+      // Need to convert files to Hadoop Paths to be compatible with FileAlterationListener
+      java.nio.file.Files.walk(graphPath).forEach(fileName -> {
+        if (!java.nio.file.Files.isDirectory(fileName)) {
+          if (checkFileLevelRelativeToRoot(new Path(fileName.toString()), NODE_FILE_DEPTH)) {
+            addDataNode(newFlowGraph, fileName.toString());
+          } else if (checkFileLevelRelativeToRoot(new Path(fileName.toString()), EDGE_FILE_DEPTH)) {
+            edges.add(new Path(fileName.toString()));
+          }

Review Comment:
   I wonder what might be silently skipped here, and whether logging those 
files might help in debugging violated assumptions about the directory layout.
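   One way to make skipped files visible is sketched below with plain `java.nio.file` APIs rather than Hadoop `Path`s. `FlowGraphFileScan` is a hypothetical name. The depth constants are assumptions read off the javadoc's layout (`flowGraphName/nodeA/nodeA.properties` and `flowGraphName/nodeA/nodeB/edgeAB.properties`), counted from the walked root.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.logging.Logger;
import java.util.stream.Stream;

// Hypothetical scanner; depths are assumptions based on the documented layout.
class FlowGraphFileScan {
  static final int NODE_DEPTH = 3;  // flowGraphName/nodeA/nodeA.properties
  static final int EDGE_DEPTH = 4;  // flowGraphName/nodeA/nodeB/edgeAB.properties
  private static final Logger LOG = Logger.getLogger(FlowGraphFileScan.class.getName());

  final List<Path> nodes = new ArrayList<>();
  final List<Path> edges = new ArrayList<>();
  final List<Path> skipped = new ArrayList<>();

  static FlowGraphFileScan scan(Path root) {
    FlowGraphFileScan result = new FlowGraphFileScan();
    // try-with-resources closes the stream, releasing the directory handles Files.walk opens.
    try (Stream<Path> files = Files.walk(root)) {
      files.filter(Files::isRegularFile).sorted().forEach(file -> {
        int depth = root.relativize(file).getNameCount();
        if (depth == NODE_DEPTH) {
          result.nodes.add(file);
        } else if (depth == EDGE_DEPTH) {
          result.edges.add(file);
        } else {
          // Surface layout violations instead of dropping the file silently.
          LOG.warning("Skipping " + file + ": depth " + depth + " matches neither a node nor an edge file");
          result.skipped.add(file);
        }
      });
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return result;
  }
}
```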



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowGraphHelper.java:
##########
@@ -277,32 +282,64 @@ private List<SpecExecutor> getSpecExecutors(Config edgeConfig) throws URISyntaxE
+  /**
+   * Loads the entire flowgraph from the path configured in {@link org.apache.gobblin.configuration.ConfigurationKeys.FLOWGRAPH_BASE_DIR }
+   * Expects nodes to be in the format of /flowGraphName/nodeA/nodeA.properties
+   * Expects edges to be in the format of /flowGraphName/nodeA/nodeB/edgeAB.properties
+   * The current flowgraph will be swapped atomically with the new flowgraph that is loaded
+   */
+  public void populateFlowGraphAtomically(AtomicReference<FlowGraph> flowGraphReference) {

Review Comment:
   Rather than "handing over the keys", and combining responsibility for 
loading with managing orderly updates, why not merely return the loaded 
`FlowGraph` and leave it to the caller to handle update/replacement?
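   Below is a rough sketch of that split. It assumes a toy `SimpleGraph` of node names and `"src->dst"` edge strings in place of `DataNode`s and `FlowEdge`s. `FlowGraphLoader` and `FlowGraphOwner` are hypothetical names. The loader only builds and returns a complete graph (or fails), and the owner alone decides whether to swap it in.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

// Toy immutable graph standing in for a FlowGraph.
final class SimpleGraph {
  final List<String> nodes;
  final List<String> edges;

  SimpleGraph(List<String> nodes, List<String> edges) {
    this.nodes = Collections.unmodifiableList(new ArrayList<>(nodes));
    this.edges = Collections.unmodifiableList(new ArrayList<>(edges));
  }
}

// Builds a brand-new graph; it never sees the AtomicReference that readers use.
class FlowGraphLoader {
  SimpleGraph load(List<String> nodeNames, List<String> edgeSpecs) {
    // All nodes are known before any edge is checked, mirroring "nodes first, then edges".
    for (String edge : edgeSpecs) {
      String[] ends = edge.split("->");
      if (ends.length != 2 || !nodeNames.contains(ends[0]) || !nodeNames.contains(ends[1])) {
        throw new IllegalArgumentException("Edge " + edge + " references a missing node");
      }
    }
    return new SimpleGraph(nodeNames, edgeSpecs);
  }
}

// The owner (e.g. the compiler) keeps sole control over replacement.
class FlowGraphOwner {
  private final AtomicReference<SimpleGraph> current = new AtomicReference<>(
      new SimpleGraph(Collections.<String>emptyList(), Collections.<String>emptyList()));

  SimpleGraph getFlowGraph() {
    return current.get();
  }

  // Swap only if loading fully succeeded; otherwise keep serving the previous graph.
  boolean reload(FlowGraphLoader loader, List<String> nodeNames, List<String> edgeSpecs) {
    try {
      current.set(loader.load(nodeNames, edgeSpecs));
      return true;
    } catch (IllegalArgumentException e) {
      return false;
    }
  }
}
```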



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowGraphHelper.java:
##########
@@ -84,26 +84,27 @@ public BaseFlowGraphListener(Optional<? extends FSFlowTemplateCatalog> flowTempl
     this.hoconFileExtensions = Sets.newHashSet(hoconFileExtensions.split(","));
     try {
       this.pullFileLoader = new PullFileLoader(folderPath,
-          FileSystem.get(URI.create(ConfigurationKeys.LOCAL_FS_URI), new Configuration()),
-          this.javaPropsExtensions, this.hoconFileExtensions);
+          FileSystem.get(URI.create(ConfigurationKeys.LOCAL_FS_URI), new Configuration()), this.javaPropsExtensions,
+          this.hoconFileExtensions);
     } catch (IOException e) {
       throw new RuntimeException("Could not create pull file loader", e);
     }
   }
+
   /**
    * Add a {@link DataNode} to the {@link FlowGraph}. The method uses the {@link FlowGraphConfigurationKeys#DATA_NODE_CLASS} config
    * to instantiate a {@link DataNode} from the node config file.
    * @param path of node to add
    */
-  protected void addDataNode(String path) {
+   public void addDataNode(FlowGraph graph, String path) {
     if (checkFilePath(path, NODE_FILE_DEPTH)) {
       Path nodeFilePath = new Path(this.baseDirectory, path);
       try {
         Config config = loadNodeFileWithOverrides(nodeFilePath);
         Class dataNodeClass = Class.forName(ConfigUtils.getString(config, FlowGraphConfigurationKeys.DATA_NODE_CLASS,
             FlowGraphConfigurationKeys.DEFAULT_DATA_NODE_CLASS));
         DataNode dataNode = (DataNode) GobblinConstructorUtils.invokeLongestConstructor(dataNodeClass, config);
-        if (!this.flowGraph.addDataNode(dataNode)) {
+        if (!graph.addDataNode(dataNode)) {

Review Comment:
   Overall I don't want to slow down the refactoring, so it's up to you whether 
or how to address this...
   
   This helper's methods all follow the same pattern, which seems a clunky 
abstraction: each converts a `path` into the argument for a call on 
`flowGraph`, then delegates to that call. It's as if we wished the 
`FlowGraph` methods took a `String path` as their lone arg, and had that 
magically work.
   
   Since the gap boils down merely to conversion (and the `FlowGraph` itself is 
not involved there), I'd propose a thinner, more cohesive abstraction: a few 
converter/factory methods, each taking a `path` and returning a `DataNode`, a 
`FlowEdge`, or a `String flowId`.
   
   Implementation-wise, all would build on one core method that resolves a 
`String path` to a `Config`. That core method is what motivates the instance 
state (the pull-file loader and the base dir): it's a "path-based config resolver".
   
   Separately, if we wish to support the pattern of:
   ```
     if (!flowGraph.op(arg)) {
       log("op problem w/ {}", arg.id);
     } else {
       log("it worked w/ {}", arg.id);
     }
   ```
   write a logging `FlowGraph` decorator that wraps each such method.
   
   Edit: see further comments below...
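   A minimal sketch of such a decorator follows. It assumes a hypothetical two-method `NodeGraph` interface keyed by node id. The real `FlowGraph` takes `DataNode`s and has more methods, each of which would be wrapped the same way. `java.util.logging` stands in for the Slf4j logger Gobblin uses.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.logging.Logger;

// Hypothetical, pared-down graph interface; boolean results signal success, as with addDataNode.
interface NodeGraph {
  boolean addDataNode(String nodeId);
  boolean deleteDataNode(String nodeId);
}

// Plain implementation with no logging concerns.
class SetNodeGraph implements NodeGraph {
  private final Set<String> nodes = new HashSet<>();

  @Override
  public boolean addDataNode(String nodeId) {
    return nodes.add(nodeId);
  }

  @Override
  public boolean deleteDataNode(String nodeId) {
    return nodes.remove(nodeId);
  }
}

// Decorator: same interface, delegates each call, and logs the outcome in one place.
class LoggingNodeGraph implements NodeGraph {
  private static final Logger LOG = Logger.getLogger(LoggingNodeGraph.class.getName());
  private final NodeGraph delegate;

  LoggingNodeGraph(NodeGraph delegate) {
    this.delegate = delegate;
  }

  @Override
  public boolean addDataNode(String nodeId) {
    return logOutcome("addDataNode", nodeId, delegate.addDataNode(nodeId));
  }

  @Override
  public boolean deleteDataNode(String nodeId) {
    return logOutcome("deleteDataNode", nodeId, delegate.deleteDataNode(nodeId));
  }

  private static boolean logOutcome(String op, String nodeId, boolean succeeded) {
    if (succeeded) {
      LOG.info(op + " succeeded for " + nodeId);
    } else {
      LOG.warning(op + " had no effect for " + nodeId);
    }
    return succeeded;
  }
}
```

Callers that previously went through the helper would hold a `LoggingNodeGraph` and call graph methods directly. Conversion from path to argument then stays in separate factory methods.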



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowGraph.java:
##########
@@ -47,10 +47,10 @@
 public class BaseFlowGraph implements FlowGraph {
   private final ReadWriteLock rwLock = new ReentrantReadWriteLock(true);

Review Comment:
   While you're documenting the rationale for `AtomicReference` (suggested by 
zihan), let's also clarify which particular instance state this lock is meant to 
protect.
   
   ...At this point, I don't see non-`final` fields in the object, although I 
do note the maps are not immutable.
   
   That last part is significant: even with careful atomic access and update 
via the reference, the FG can mutate out from under whoever is using it!


##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/FsFlowGraphMonitor.java:
##########
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.monitoring;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.hadoop.fs.Path;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.util.concurrent.AbstractIdleService;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.runtime.api.TopologySpec;
+import org.apache.gobblin.service.modules.flowgraph.BaseFlowGraphHelper;
+import org.apache.gobblin.service.modules.flowgraph.FSPathAlterationFlowGraphListener;
+import org.apache.gobblin.service.modules.flowgraph.FlowGraph;
+import org.apache.gobblin.service.modules.flowgraph.FlowGraphMonitor;
+import org.apache.gobblin.service.modules.template_catalog.FSFlowTemplateCatalog;
+import org.apache.gobblin.util.filesystem.PathAlterationObserver;
+import org.apache.gobblin.util.filesystem.PathAlterationObserverScheduler;
+
+
+@Slf4j
+public class FsFlowGraphMonitor extends AbstractIdleService implements FlowGraphMonitor {
+  public static final String FS_FLOWGRAPH_MONITOR_PREFIX = "gobblin.service.fsFlowGraphMonitor";
+  private static final long DEFAULT_FLOWGRAPH_POLLING_INTERVAL = 60;
+  private static final String DEFAULT_FS_FLOWGRAPH_MONITOR_REPO_DIR = "git-flowgraph";
+  private static final String DEFAULT_FS_FLOWGRAPH_MONITOR_FLOWGRAPH_DIR = "gobblin-flowgraph";
+  private volatile boolean isActive = false;
+  private final long pollingInterval;
+  private BaseFlowGraphHelper flowGraphHelper;
+  private final PathAlterationObserverScheduler pathAlterationDetector;
+  private final FSPathAlterationFlowGraphListener listener;
+  private final PathAlterationObserver observer;
+  private final Path flowGraphPath;
+  private final AtomicReference<FlowGraph> flowGraph;
+  private final CountDownLatch initComplete;
+  private static final Config DEFAULT_FALLBACK = ConfigFactory.parseMap(ImmutableMap.<String, Object>builder()
+      .put(ConfigurationKeys.FLOWGRAPH_REPO_DIR, DEFAULT_FS_FLOWGRAPH_MONITOR_REPO_DIR)
+      .put(ConfigurationKeys.FLOWGRAPH_BASE_DIR, DEFAULT_FS_FLOWGRAPH_MONITOR_FLOWGRAPH_DIR)
+      .put(ConfigurationKeys.FLOWGRAPH_POLLING_INTERVAL, DEFAULT_FLOWGRAPH_POLLING_INTERVAL)
+      .put(ConfigurationKeys.JAVA_PROPS_EXTENSIONS, ConfigurationKeys.DEFAULT_PROPERTIES_EXTENSIONS)
+      .put(ConfigurationKeys.HOCON_FILE_EXTENSIONS, ConfigurationKeys.DEFAULT_CONF_EXTENSIONS).build());
+
+  public FsFlowGraphMonitor(Config config, Optional<? extends FSFlowTemplateCatalog> flowTemplateCatalog,
+      AtomicReference<FlowGraph> graph, Map<URI, TopologySpec> topologySpecMap, CountDownLatch initComplete)
+      throws IOException {
+    Config configWithFallbacks = config.getConfig(FS_FLOWGRAPH_MONITOR_PREFIX).withFallback(DEFAULT_FALLBACK);
+    this.pollingInterval =
+        TimeUnit.SECONDS.toMillis(configWithFallbacks.getLong(ConfigurationKeys.FLOWGRAPH_POLLING_INTERVAL));
+    this.flowGraphPath = new Path(configWithFallbacks.getString(ConfigurationKeys.FLOWGRAPH_REPO_DIR));
+    this.observer = new PathAlterationObserver(flowGraphPath);
+    this.flowGraphHelper = new BaseFlowGraphHelper(flowTemplateCatalog, topologySpecMap, flowGraphPath.toString(),
+        configWithFallbacks.getString(ConfigurationKeys.FLOWGRAPH_BASE_DIR), configWithFallbacks.getString(ConfigurationKeys.JAVA_PROPS_EXTENSIONS),
+        configWithFallbacks.getString(ConfigurationKeys.HOCON_FILE_EXTENSIONS));
+    this.listener = new FSPathAlterationFlowGraphListener(flowTemplateCatalog, graph, flowGraphPath.toString(), this.flowGraphHelper);
+
+   this.flowGraph = graph;
+   this.initComplete = initComplete;
+
+    if (pollingInterval == ConfigurationKeys.DISABLED_JOB_CONFIG_FILE_MONITOR_POLLING_INTERVAL) {
+      this.pathAlterationDetector = null;
+    } else {
+      this.pathAlterationDetector = new PathAlterationObserverScheduler(pollingInterval);
+      Optional<PathAlterationObserver> observerOptional = Optional.fromNullable(observer);
+      this.pathAlterationDetector.addPathAlterationObserver(this.listener, observerOptional,
+          this.flowGraphPath);
+    }
+  }
+
+  @Override
+  protected void startUp()
+      throws IOException {
+  }
+
+  @Override
+  public synchronized void setActive(boolean isActive) {
+    log.info("Setting the flow graph monitor to be " + isActive + " from " + this.isActive);
+    if (this.isActive == isActive) {
+      // No-op if already in correct state
+      return;
+    } else if (isActive) {
+      if (this.pathAlterationDetector != null) {
+        log.info("Starting the " + getClass().getSimpleName());
+        log.info("Polling flowgraph folder with interval {} ", this.pollingInterval);
+        try {
+          this.pathAlterationDetector.start();
+          // Manually instantiate flowgraph when the monitor becomes active
+          this.flowGraphHelper.populateFlowGraphAtomically(this.flowGraph);

Review Comment:
   Once I read these FlowGraphMonitors, I realized why 
`BaseFlowGraphHelper.addDataNode()` and the other methods are `public`. In 
essence, the `flowGraphHelper` serves this FS monitor better than the Git one: 
it provides this method only for the former, with no equivalent for the latter. 
On the Git side, the similar logic for discerning path nesting depth lives 
instead within `GitFlowGraphListener`.



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/GitFlowGraphListener.java:
##########
@@ -53,10 +47,10 @@ public GitFlowGraphListener(Optional<? extends FSFlowTemplateCatalog> flowTempla
   @Override
   public void addChange(DiffEntry change) {
     Path path = new Path(change.getNewPath());
-    if (path.depth() == NODE_FILE_DEPTH) {
-      addDataNode(change.getNewPath());
-    } else if (path.depth() == EDGE_FILE_DEPTH) {
-      addFlowEdge(change.getNewPath());
+    if (path.depth() == BaseFlowGraphHelper.NODE_FILE_DEPTH) {
+      this.baseFlowGraphHelper.addDataNode(this.flowGraph.get(), change.getNewPath());

Review Comment:
   The code here illustrates the challenge: even though the `FlowGraph` is 
mediated by an `AtomicReference`, it remains mutable. One irresponsible actor 
could bring ruin to everyone!
   
   Ideally, instead of updating the *same* flow graph, we'd create an 
updated *copy* of it that reflects the differences.
   
   Copying has a cost, and we'd also like to enjoy the atomic semantics we're 
working so hard for. For both reasons, we wouldn't copy on each 
addition/removal, but rather once per invocation of 
`GitFlowGraphMonitor.processGitConfigChanges()`.
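   Here is a sketch of copy-on-write applied once per batch. It assumes the published graph is reduced to an immutable set of node ids, and that each change has already been parsed into a hypothetical `NodeChange` (the real listener receives `DiffEntry` objects). `AtomicReference.updateAndGet` may re-run its function if another writer races in. For that reason the function only builds a fresh copy and has no side effects.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical, already-parsed change: add or remove one node.
final class NodeChange {
  final boolean isAdd;
  final String nodeId;

  NodeChange(boolean isAdd, String nodeId) {
    this.isAdd = isAdd;
    this.nodeId = nodeId;
  }
}

class CopyOnWriteGraph {
  // Readers only ever see unmodifiable snapshots.
  private final AtomicReference<Set<String>> current =
      new AtomicReference<>(Collections.<String>emptySet());

  Set<String> snapshot() {
    return current.get();
  }

  // Intended to run once per batch (e.g. per processGitConfigChanges() call), not per change.
  void applyBatch(List<NodeChange> changes) {
    current.updateAndGet(old -> {
      Set<String> copy = new HashSet<>(old);  // one copy per batch
      for (NodeChange change : changes) {
        if (change.isAdd) {
          copy.add(change.nodeId);
        } else {
          copy.remove(change.nodeId);
        }
      }
      return Collections.unmodifiableSet(copy);  // published in a single atomic step
    });
  }
}
```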



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FSPathAlterationFlowGraphListener.java:
##########
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.flowgraph;
+
+import java.io.File;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.hadoop.fs.Path;
+
+import com.google.common.base.Optional;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.service.modules.template_catalog.FSFlowTemplateCatalog;
+import org.apache.gobblin.util.filesystem.PathAlterationListener;
+import org.apache.gobblin.util.filesystem.PathAlterationObserver;
+
+
+/**
+ * An implementation of {@link PathAlterationListener} to listen for changes in a directory and apply it to a GaaS FlowGraph
+ * Is invoked by {@link PathAlterationObserver} which would check a folder and perform recursive comparisons on files compared to
+ * their last polled state. On any detected differences in files when a check is done, the {@link FlowGraph} will be updated.
+ *
+ * Unlike the {@link GitFlowGraphListener}, this class will reload the entire flowgraph on any detected change, instead loading only the diffs.
+ */
+@Slf4j
+public class FSPathAlterationFlowGraphListener implements PathAlterationListener {
+  private final AtomicReference<FlowGraph> flowGraph;

Review Comment:
   I'm surprised to see another `AtomicReference` over here. Is it the same one 
from the MHFC, basically entrusted to this class? I recommend instead that 
this listener hold a reference to the MHFC itself and invoke a setter method 
that encapsulates the `AtomicReference`.
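   A sketch of that arrangement uses hypothetical stand-ins throughout. `CompilerStub` plays the MHFC, `ReloadingListener` plays `FSPathAlterationFlowGraphListener`, and a `Supplier` stands in for whatever rebuilds the graph from disk. The callback name `onChangesDetected` is hypothetical.

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

// Stand-in for the real FlowGraph interface.
interface FlowGraph {}

// Stand-in for MultiHopFlowCompiler: the single owner of the AtomicReference.
class CompilerStub {
  private final AtomicReference<FlowGraph> flowGraph;

  CompilerStub(FlowGraph initialGraph) {
    this.flowGraph = new AtomicReference<>(initialGraph);
  }

  FlowGraph getFlowGraph() {
    return flowGraph.get();
  }

  void setFlowGraph(FlowGraph newGraph) {
    flowGraph.set(newGraph);
  }
}

// Holds the compiler itself, never its AtomicReference.
class ReloadingListener {
  private final CompilerStub compiler;
  private final Supplier<FlowGraph> graphLoader;

  ReloadingListener(CompilerStub compiler, Supplier<FlowGraph> graphLoader) {
    this.compiler = compiler;
    this.graphLoader = graphLoader;
  }

  // Hypothetical callback after a poll found changes: rebuild fully, then hand over.
  void onChangesDetected() {
    compiler.setFlowGraph(graphLoader.get());
  }
}
```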



##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/FsFlowGraphMonitor.java:
##########
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.monitoring;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.hadoop.fs.Path;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.util.concurrent.AbstractIdleService;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.runtime.api.TopologySpec;
+import org.apache.gobblin.service.modules.flowgraph.FSPathAlterationFlowGraphListener;
+import org.apache.gobblin.service.modules.flowgraph.FlowGraph;
+import org.apache.gobblin.service.modules.flowgraph.FlowGraphMonitor;
+import org.apache.gobblin.service.modules.template_catalog.FSFlowTemplateCatalog;
+import org.apache.gobblin.util.filesystem.PathAlterationObserver;
+import org.apache.gobblin.util.filesystem.PathAlterationObserverScheduler;
+
+
+@Slf4j
+public class FsFlowGraphMonitor extends AbstractIdleService implements FlowGraphMonitor {
+  public static final String FS_FLOWGRAPH_MONITOR_PREFIX = "gobblin.service.fsFlowGraphMonitor";
+  private static long DEFAULT_FLOWGRAPH_POLLING_INTERVAL = 60;
+  private static final String DEFAULT_FS_FLOWGRAPH_MONITOR_REPO_DIR = "git-flowgraph";
+  private static final String DEFAULT_FS_FLOWGRAPH_MONITOR_FLOWGRAPH_DIR = "gobblin-flowgraph";
+  private boolean isActive;
+  private long pollingInterval;
+  private PathAlterationObserverScheduler pathAlterationDetector;
+  private PathAlterationObserver observer;
+  private Path flowGraphPath;
+
+  private static final Config DEFAULT_FALLBACK = ConfigFactory.parseMap(ImmutableMap.<String, Object>builder()
+      .put(ConfigurationKeys.FLOWGRAPH_REPO_DIR, DEFAULT_FS_FLOWGRAPH_MONITOR_REPO_DIR)
+      .put(ConfigurationKeys.FLOWGRAPH_BASE_DIR, DEFAULT_FS_FLOWGRAPH_MONITOR_FLOWGRAPH_DIR)
+      .put(ConfigurationKeys.FLOWGRAPH_POLLING_INTERVAL, DEFAULT_FLOWGRAPH_POLLING_INTERVAL)
+      .put(ConfigurationKeys.JAVA_PROPS_EXTENSIONS, ConfigurationKeys.DEFAULT_PROPERTIES_EXTENSIONS)
+      .put(ConfigurationKeys.HOCON_FILE_EXTENSIONS, ConfigurationKeys.DEFAULT_CONF_EXTENSIONS)
+      .build());
+
+  public FsFlowGraphMonitor(Config config, Optional<? extends FSFlowTemplateCatalog> flowTemplateCatalog,
+      FlowGraph graph, Map<URI, TopologySpec> topologySpecMap, CountDownLatch initComplete) throws IOException {
+    Config configWithFallbacks = config.getConfig(FS_FLOWGRAPH_MONITOR_PREFIX).withFallback(DEFAULT_FALLBACK);
+    this.pollingInterval = TimeUnit.SECONDS.toMillis(configWithFallbacks.getLong(ConfigurationKeys.FLOWGRAPH_POLLING_INTERVAL));
+    this.flowGraphPath = new Path(configWithFallbacks.getString(ConfigurationKeys.FLOWGRAPH_REPO_DIR));
+    this.observer = new PathAlterationObserver(flowGraphPath);

Review Comment:
   Yes, probably better to create config keys specific to this.



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FSPathAlterationFlowGraphListener.java:
##########
@@ -0,0 +1,94 @@
+/**
+ * An implementation of {@link PathAlterationListener} to listen for changes in a directory and apply it to a GaaS FlowGraph
+ * Is invoked by {@link PathAlterationObserver} which would check a folder and perform recursive comparisons on files compared to
+ * their last polled state. On any detected differences in files when a check is done, the {@link FlowGraph} will be updated.
+ *
+ * Unlike the {@link GitFlowGraphListener}, this class will reload the entire flowgraph on any detected change, instead loading only the diffs.

Review Comment:
   You may mean "instead *of* loading only the diffs".



##########
gobblin-utility/src/main/java/org/apache/gobblin/util/filesystem/PathAlterationObserver.java:
##########
@@ -43,9 +43,9 @@ public class PathAlterationObserver {
   private final PathFilter pathFilter;
   private final Comparator<Path> comparator;
   private final FileSystem fs;
-
   private final Path[] EMPTY_PATH_ARRAY = new Path[0];
 
+  private boolean changeApplied = false;

Review Comment:
   That's true: if all the others are called only from within 
`checkThenNotify`, which is `synchronized`, then they don't need to be 
synchronized in addition. Earlier I hadn't grasped whether that was the case.
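   Here is a sketch of the invariant being relied on, in a hypothetical `PollingObserver` rather than Gobblin's actual `PathAlterationObserver`. Only the entry point is `synchronized`. Making the helper `private` is one way to keep "called only within `checkThenNotify`" true as the class evolves.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical observer illustrating the pattern; not Gobblin's PathAlterationObserver.
class PollingObserver {
  // Guarded by `this`: only touched while checkThenNotify() holds the lock.
  private boolean changeApplied = false;
  private final List<String> notifications = new ArrayList<>();

  // The single synchronized entry point for a polling round.
  synchronized void checkThenNotify(List<String> changedFiles) {
    changeApplied = false;
    for (String file : changedFiles) {
      onFileChange(file);
    }
    if (changeApplied) {
      notifications.add("reload");
    }
  }

  // Private and called only from checkThenNotify(), so it already runs under the lock.
  private void onFileChange(String file) {
    notifications.add("changed " + file);
    changeApplied = true;
  }

  // Readers also synchronize, and get a copy rather than the live list.
  synchronized List<String> notifications() {
    return new ArrayList<>(notifications);
  }
}
```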



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowGraphHelper.java:
##########
@@ -277,32 +282,64 @@ private List<SpecExecutor> getSpecExecutors(Config edgeConfig) throws URISyntaxE
+  /**
+   * Loads the entire flowgraph from the path configured in {@link org.apache.gobblin.configuration.ConfigurationKeys.FLOWGRAPH_BASE_DIR }
+   * Expects nodes to be in the format of /flowGraphName/nodeA/nodeA.properties
+   * Expects edges to be in the format of /flowGraphName/nodeA/nodeB/edgeAB.properties
+   * The current flowgraph will be swapped atomically with the new flowgraph that is loaded
+   */
+  public void populateFlowGraphAtomically(AtomicReference<FlowGraph> flowGraphReference) {

Review Comment:
   Also, I suggested earlier retooling the abstraction to avoid wrapping the 
`FlowGraph` method invocations. Seeing this method, I finally appreciate that 
it is the core abstraction: basically you've got a flow graph 
initializer/loader/factory. I was thrown off earlier by the other methods 
being `public`, when they really seem subordinate to this one.
   
   If you made them protected or private, renamed this class, and placed this 
method first, I wouldn't see much need to rework the abstraction.
   
   That said, this method seems very tied to the FS impl and not relevant to 
the Git one. If so, that might argue for the thinner abstraction of a 
"path-based config loader" shared between both. The functionality here would 
then be FS-specific, so it would not live within that shared class.
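   Below is a minimal sketch of such a shared "path-based config resolver". `java.util.Properties` stands in for Typesafe `Config` and `PullFileLoader`, which keeps the sketch dependency-free. `PathConfigResolver`, `nodeIdFor`, and the `node.id` key are hypothetical. A real converter would build a `DataNode` or `FlowEdge` from the resolved config.

```java
import java.io.IOException;
import java.io.Reader;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Properties;

// Hypothetical resolver that both the Git and FS monitors could share.
class PathConfigResolver {
  static final String NODE_ID_KEY = "node.id";  // hypothetical key, for illustration only
  private final Path baseDir;

  PathConfigResolver(Path baseDir) {
    this.baseDir = baseDir;
  }

  // Core method: a path relative to the base dir becomes a config object.
  Properties resolve(String relativePath) {
    Properties props = new Properties();
    try (Reader in = Files.newBufferedReader(baseDir.resolve(relativePath))) {
      props.load(in);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return props;
  }

  // Thin converter layered on resolve(); FS- or Git-specific traversal stays with the caller.
  String nodeIdFor(String relativePath) {
    return resolve(relativePath).getProperty(NODE_ID_KEY);
  }
}
```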



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/GitFlowGraphListener.java:
##########
@@ -53,10 +47,10 @@ public GitFlowGraphListener(Optional<? extends FSFlowTemplateCatalog> flowTempla
   @Override
   public void addChange(DiffEntry change) {
     Path path = new Path(change.getNewPath());
-    if (path.depth() == NODE_FILE_DEPTH) {
-      addDataNode(change.getNewPath());
-    } else if (path.depth() == EDGE_FILE_DEPTH) {
-      addFlowEdge(change.getNewPath());
+    if (path.depth() == BaseFlowGraphHelper.NODE_FILE_DEPTH) {
+      this.baseFlowGraphHelper.addDataNode(this.flowGraph.get(), change.getNewPath());

Review Comment:
   The difficulty here is that the Java collections library has such weak 
support for immutable collections, specifically for sharing common parts 
between instances (consider instead the purely functional data structures of 
Scala or Clojure). What this means is that copy-on-write is more expensive... 
still, the approach works, just with a "higher tax".
   
   Really we have two choices:
   a. keep the flow graph itself immutable and allow it to be cloned into a 
builder. Once all adjustments are made, the builder's `.build()` renders a new 
immutable instance.
   b. have a mutable scratch flow graph that is not directly used by the MHFC. 
Its 'add' and 'remove' methods would be `synchronized`. Additionally, have a 
`synchronized` `createSnapshot()` method that copies the current state into 
an immutable flow graph.
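   Here is a sketch of option (b). It assumes node ids and source/destination id pairs in place of `DataNode`s and `FlowEdge`s. `ScratchFlowGraph` and `Snapshot` are hypothetical names. Every mutator and `createSnapshot()` synchronize on the same monitor. The snapshot deep-copies into unmodifiable collections, so later scratch edits cannot leak into a graph the MHFC is already using.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Option (b): a mutable scratch graph whose snapshots are immutable copies.
class ScratchFlowGraph {
  private final Set<String> nodes = new HashSet<>();
  private final Map<String, Set<String>> edgesBySource = new HashMap<>();

  synchronized boolean addNode(String nodeId) {
    return nodes.add(nodeId);
  }

  // Removing a node also drops every edge that touches it.
  synchronized boolean removeNode(String nodeId) {
    edgesBySource.remove(nodeId);
    edgesBySource.values().forEach(destinations -> destinations.remove(nodeId));
    return nodes.remove(nodeId);
  }

  // Rejects edges whose endpoints are not (yet) nodes.
  synchronized boolean addEdge(String source, String destination) {
    if (!nodes.contains(source) || !nodes.contains(destination)) {
      return false;
    }
    return edgesBySource.computeIfAbsent(source, k -> new HashSet<String>()).add(destination);
  }

  // Deep copy into unmodifiable collections, taken under the same lock as the mutators.
  synchronized Snapshot createSnapshot() {
    Map<String, Set<String>> edgesCopy = new HashMap<>();
    edgesBySource.forEach((source, destinations) ->
        edgesCopy.put(source, Collections.unmodifiableSet(new HashSet<String>(destinations))));
    return new Snapshot(Collections.unmodifiableSet(new HashSet<String>(nodes)),
        Collections.unmodifiableMap(edgesCopy));
  }

  // What the MHFC would read: never mutated after construction.
  static final class Snapshot {
    final Set<String> nodes;
    final Map<String, Set<String>> edgesBySource;

    Snapshot(Set<String> nodes, Map<String, Set<String>> edgesBySource) {
      this.nodes = nodes;
      this.edgesBySource = edgesBySource;
    }
  }
}
```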
   
   





Issue Time Tracking
-------------------

    Worklog Id:     (was: 811077)
    Time Spent: 4h  (was: 3h 50m)

> Build a file-based flowgraph that watches for changes and updates
> -----------------------------------------------------------------
>
>                 Key: GOBBLIN-1696
>                 URL: https://issues.apache.org/jira/browse/GOBBLIN-1696
>             Project: Apache Gobblin
>          Issue Type: New Feature
>          Components: gobblin-service
>            Reporter: William Lo
>            Assignee: Abhishek Tiwari
>            Priority: Major
>          Time Spent: 4h
>  Remaining Estimate: 0h
>
> Gobblin-as-a-Service only has a Git-based flowgraph, which is difficult to 
> build CI/CD around. We can provide an alternate flowgraph that is based 
> solely on files. This flowgraph should update atomically.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
