[ 
https://issues.apache.org/jira/browse/GOBBLIN-1696?focusedWorklogId=806898&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-806898
 ]

ASF GitHub Bot logged work on GOBBLIN-1696:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 08/Sep/22 06:33
            Start Date: 08/Sep/22 06:33
    Worklog Time Spent: 10m 
      Work Description: phet commented on code in PR #3548:
URL: https://github.com/apache/gobblin/pull/3548#discussion_r965504189


##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowGraph.java:
##########
@@ -237,6 +237,24 @@ public Set<FlowEdge> getEdges(DataNode node) {
     }
   }
 
+  @Override
+  public void copyGraph(FlowGraph graph) {

Review Comment:
   nit: this feels less like making a copy (of the current instance) than 
overwriting the current instance with another, so maybe name it `overwriteWith`.
   
   thus renamed, it calls attention to the shared mutable state... which I 
suggest reconsidering, or at least clearly justifying in a comment (alternative 
sketched below)



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowGraphListener.java:
##########
@@ -95,15 +95,15 @@ public BaseFlowGraphListener(Optional<? extends FSFlowTemplateCatalog> flowTempl
    * to instantiate a {@link DataNode} from the node config file.
    * @param path of node to add
    */
-  protected void addDataNode(String path) {
+  protected void addDataNode(FlowGraph graph, String path) {

Review Comment:
   I see there's still a `this.flowGraph` field in this object.  when would 
that differ from the `graph` param and/or would it ever be the same?



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowGraph.java:
##########
@@ -237,6 +237,24 @@ public Set<FlowEdge> getEdges(DataNode node) {
     }
   }
 
+  @Override
+  public void copyGraph(FlowGraph graph) {
+    if (graph instanceof BaseFlowGraph) {
+      BaseFlowGraph baseFlowGraph = (BaseFlowGraph) graph;
+      try {
+        rwLock.writeLock().lock();
+        this.dataNodeAliasMap = baseFlowGraph.dataNodeAliasMap;
+        this.flowEdgeMap = baseFlowGraph.flowEdgeMap;
+        this.dataNodeMap = baseFlowGraph.dataNodeMap;
+        this.nodesToEdges = baseFlowGraph.nodesToEdges;
+      } finally {
+        rwLock.writeLock().unlock();
+      }
+    } else {
+      throw new UnsupportedOperationException("BaseFlowGraph can only clone other instances of BaseFlowGraph");

Review Comment:
   suggest the error message mention which class it actually was, when not `BaseFlowGraph`
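
A minimal sketch of that suggestion. `Graph` and `BaseGraph` are hypothetical stand-ins for `FlowGraph` and `BaseFlowGraph`, and the message wording is only an assumption; the point is that the exception names the class it actually received.

```java
// Hypothetical stand-ins for FlowGraph / BaseFlowGraph; not the real Gobblin types.
interface Graph {}

class BaseGraph implements Graph {
  void overwriteWith(Graph other) {
    if (!(other instanceof BaseGraph)) {
      // Name the offending class so the failure is diagnosable from the log alone.
      String actual = (other == null) ? "null" : other.getClass().getName();
      throw new UnsupportedOperationException(
          "BaseGraph can only be overwritten by another BaseGraph, but got: " + actual);
    }
    // ... copy the other graph's state under the write lock ...
  }
}
```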



##########
gobblin-utility/src/main/java/org/apache/gobblin/util/filesystem/PathAlterationListener.java:
##########
@@ -43,4 +43,6 @@ public interface PathAlterationListener {
   void onDirectoryDelete(final Path directory);
 
   void onFileDelete(final Path path);
+
+  void onCheckDetectedChange();

Review Comment:
   javadoc would clarify... it's not immediately intuitive what "check detected 
change" is, in the way that "directory delete" is familiar.
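
One possible shape for that javadoc, as a sketch only. `PathChangeListener` is a hypothetical, trimmed-down stand-in for `PathAlterationListener`, and the described semantics are inferred from the `PathAlterationObserver` hunk in this PR rather than confirmed by the author.

```java
// Hypothetical, trimmed-down stand-in for PathAlterationListener.
interface PathChangeListener {
  /** Invoked once for each watched file that was deleted. */
  void onFileDelete(String path);

  /**
   * Invoked at most once per observer check, at the end of that check, if the
   * check found one or more changes of any kind. Useful for listeners that
   * rebuild their whole state in one pass instead of reacting to each file or
   * directory event separately.
   */
  void onCheckDetectedChange();
}
```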



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FSPathAlterationFlowGraphListener.java:
##########
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.flowgraph;
+
+import com.google.common.base.Optional;
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.runtime.api.TopologySpec;
+import 
org.apache.gobblin.service.modules.template_catalog.FSFlowTemplateCatalog;
+import org.apache.gobblin.util.filesystem.PathAlterationListener;
+import org.apache.gobblin.util.filesystem.PathAlterationObserver;
+import org.apache.hadoop.fs.Path;
+
+@Slf4j
+public class FSPathAlterationFlowGraphListener extends BaseFlowGraphListener 
implements PathAlterationListener {
+
+  private File graphDir;
+  CountDownLatch initComplete;
+
+  public FSPathAlterationFlowGraphListener(Optional<? extends 
FSFlowTemplateCatalog> flowTemplateCatalog,
+      FlowGraph graph, Map<URI, TopologySpec> topologySpecMap, String 
baseDirectory, String flowGraphFolderName,
+      String javaPropsExtentions, String hoconFileExtensions, CountDownLatch 
initComplete) {
+    super(flowTemplateCatalog, graph, topologySpecMap, baseDirectory, 
flowGraphFolderName, javaPropsExtentions, hoconFileExtensions);
+    this.graphDir = new File(baseDirectory);
+    this.initComplete = initComplete;
+    // Populate the flowgraph with any existing files
+    if (!this.graphDir.exists()) {
+      throw new RuntimeException(String.format("Flowgraph directory at path %s 
does not exist!", graphDir));
+    }
+    this.populateFlowGraphAtomically();
+  }
+
+  public void onStart(final PathAlterationObserver observer) {
+  }
+
+  public void onFileCreate(final Path path) {
+  }
+
+  public void onFileChange(final Path path) {
+  }
+
+  public void onStop(final PathAlterationObserver observer) {
+  }
+
+  public void onDirectoryCreate(final Path directory) {
+  }
+
+  public void onDirectoryChange(final Path directory) {
+  }
+
+  public void onDirectoryDelete(final Path directory) {
+  }
+
+  public void onFileDelete(final Path path) {
+  }
+
+  @Override
+  public void onCheckDetectedChange() {
+    log.info("Detecting change in flowgraph files, reloading flowgraph");
+    this.populateFlowGraphAtomically();
+  }
+
+  private void populateFlowGraphAtomically() {
+    FlowGraph newFlowGraph = new BaseFlowGraph();
+    try {
+      List<Path> edges = new ArrayList<>();
+      // All nodes must be added first before edges, otherwise edges may have a missing source or destination.
+      Files.walk(this.graphDir.toPath()).forEach(fileName -> {
+        if (!Files.isDirectory(fileName)) {
+          if (checkFileLevelRelativeToRoot(new Path(fileName.toString()), NODE_FILE_DEPTH)) {
+            addDataNode(newFlowGraph, fileName.toString());
+          } else if (checkFileLevelRelativeToRoot(new Path(fileName.toString()), EDGE_FILE_DEPTH)) {
+            edges.add(new Path(fileName.toString()));
+          }
+        }
+      });
+      for (Path edge: edges) {
+        addFlowEdge(newFlowGraph, edge.toString());

Review Comment:
   why turn a string into a `Path` only to convert it back with `toString()` in 
the only place it's used?  if there's a reason, clarify it in a comment.



##########
gobblin-utility/src/main/java/org/apache/gobblin/util/filesystem/PathAlterationObserver.java:
##########
@@ -187,6 +186,14 @@ public void checkAndNotify()
     for (final PathAlterationListener listener : listeners.values()) {
       listener.onStop(this);
     }
+
+    if (this.changeApplied) {
+      for (final PathAlterationListener listener : listeners.values()) {
+        // Fire onCheckDetectedChange to notify when one check contains any number of changes
+        listener.onCheckDetectedChange();
+      }
+    }
+    this.changeApplied = false;

Review Comment:
   unless there's a semantic difference, I suggest resetting it to false as the 
first line of the method, rather than the last (which I believe just leaves it 
for the next invocation).



##########
gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/FsFlowGraphMonitorTest.java:
##########
@@ -0,0 +1,310 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.monitoring;
+
+import com.google.common.base.Charsets;
+import com.google.common.base.Joiner;
+import com.google.common.base.Optional;
+import com.google.common.io.Files;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import org.apache.commons.io.FileUtils;
+import org.apache.gobblin.config.ConfigBuilder;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.runtime.api.TopologySpec;
+import org.apache.gobblin.service.ServiceConfigKeys;
+import org.apache.gobblin.service.modules.flow.MultiHopFlowCompilerTest;
+import org.apache.gobblin.service.modules.flowgraph.BaseFlowGraph;
+import org.apache.gobblin.service.modules.flowgraph.DataNode;
+import org.apache.gobblin.service.modules.flowgraph.FlowEdge;
+import org.apache.gobblin.service.modules.flowgraph.FlowGraphConfigurationKeys;
+import 
org.apache.gobblin.service.modules.template_catalog.FSFlowTemplateCatalog;
+import org.apache.hadoop.fs.Path;
+import org.eclipse.jgit.transport.RefSpec;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+
+public class FsFlowGraphMonitorTest {
+  private static final Logger logger = 
LoggerFactory.getLogger(GitFlowGraphMonitor.class);
+  private final File TEST_DIR = new File(FileUtils.getTempDirectory(), 
"fsFlowGraphTestDir");
+  private final File flowGraphDir = new File(TEST_DIR, "/gobblin-flowgraph");
+  private static final String NODE_1_FILE = "node1.properties";
+  private final File node1Dir = new File(FileUtils.getTempDirectory(), 
"node1");
+  private final File node1File = new File(node1Dir, NODE_1_FILE);
+  private static final String NODE_2_FILE = "node2.properties";
+  private final File node2Dir = new File(FileUtils.getTempDirectory(), 
"node2");
+  private final File node2File = new File(node2Dir, NODE_2_FILE);
+  private final File edge1Dir = new File(node1Dir, "node2");
+  private final File edge1File = new File(edge1Dir, "edge1.properties");
+
+  private RefSpec masterRefSpec = new RefSpec("master");
+  private Optional<FSFlowTemplateCatalog> flowCatalog;
+  private Config config;
+  private BaseFlowGraph flowGraph;
+  private FsFlowGraphMonitor flowGraphMonitor;
+  private Map<URI, TopologySpec> topologySpecMap;
+
+  @BeforeClass
+  public void setUp() throws Exception {
+    cleanUpDir(TEST_DIR.toString());
+
+
+    URI topologyCatalogUri = 
this.getClass().getClassLoader().getResource("topologyspec_catalog").toURI();
+    this.topologySpecMap = 
MultiHopFlowCompilerTest.buildTopologySpecMap(topologyCatalogUri);
+
+    this.config = ConfigBuilder.create()
+        .addPrimitive(FsFlowGraphMonitor.FS_FLOWGRAPH_MONITOR_PREFIX + "."
+            + ConfigurationKeys.FLOWGRAPH_REPO_DIR, 
this.flowGraphDir.getAbsolutePath())
+        .addPrimitive(FsFlowGraphMonitor.FS_FLOWGRAPH_MONITOR_PREFIX + "." + 
ConfigurationKeys.FLOWGRAPH_BASE_DIR, "gobblin-flowgraph")
+        .addPrimitive(FsFlowGraphMonitor.FS_FLOWGRAPH_MONITOR_PREFIX + "." + 
ConfigurationKeys.FLOWGRAPH_POLLING_INTERVAL, 2)
+        .build();
+
+    // Create a FSFlowTemplateCatalog instance
+    URI flowTemplateCatalogUri = 
this.getClass().getClassLoader().getResource("template_catalog").toURI();
+    Properties properties = new Properties();
+    this.flowGraphDir.mkdirs();
+    
properties.put(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY, 
flowTemplateCatalogUri.toString());
+    Config config = ConfigFactory.parseProperties(properties);
+    Config templateCatalogCfg = config
+        .withValue(ConfigurationKeys.JOB_CONFIG_FILE_GENERAL_PATH_KEY,
+            
config.getValue(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY));
+    this.flowCatalog = Optional.of(new 
FSFlowTemplateCatalog(templateCatalogCfg));
+
+    //Create a FlowGraph instance with defaults
+    this.flowGraph = new BaseFlowGraph();
+
+    this.flowGraphMonitor = new FsFlowGraphMonitor(this.config, 
this.flowCatalog, this.flowGraph, topologySpecMap, new CountDownLatch(1));
+    this.flowGraphMonitor.startUp();
+    this.flowGraphMonitor.setActive(true);
+  }
+
+  @Test
+  public void testAddNode() throws Exception {
+    String file1Contents = FlowGraphConfigurationKeys.DATA_NODE_IS_ACTIVE_KEY 
+ "=true\nparam1=value1\n";
+    String file2Contents = FlowGraphConfigurationKeys.DATA_NODE_IS_ACTIVE_KEY 
+ "=true\nparam2=value2\n";
+
+    addNode(this.node1Dir, this.node1File, file1Contents);
+    addNode(this.node2Dir, this.node2File, file2Contents);
+
+    // Let the monitor pick up the nodes that were recently added
+    Thread.sleep(3000);
+    for (int i = 0; i < 1; i++) {

Review Comment:
   apologies if I'm being dense, but how do you verify `"param2=value2\n"`, 
amidst `i < 1` as the looping condition?



##########
gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/FsFlowGraphMonitorTest.java:
##########
@@ -0,0 +1,310 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.monitoring;
+
+import com.google.common.base.Charsets;
+import com.google.common.base.Joiner;
+import com.google.common.base.Optional;
+import com.google.common.io.Files;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import org.apache.commons.io.FileUtils;
+import org.apache.gobblin.config.ConfigBuilder;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.runtime.api.TopologySpec;
+import org.apache.gobblin.service.ServiceConfigKeys;
+import org.apache.gobblin.service.modules.flow.MultiHopFlowCompilerTest;
+import org.apache.gobblin.service.modules.flowgraph.BaseFlowGraph;
+import org.apache.gobblin.service.modules.flowgraph.DataNode;
+import org.apache.gobblin.service.modules.flowgraph.FlowEdge;
+import org.apache.gobblin.service.modules.flowgraph.FlowGraphConfigurationKeys;
+import 
org.apache.gobblin.service.modules.template_catalog.FSFlowTemplateCatalog;
+import org.apache.hadoop.fs.Path;
+import org.eclipse.jgit.transport.RefSpec;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+
+public class FsFlowGraphMonitorTest {
+  private static final Logger logger = LoggerFactory.getLogger(GitFlowGraphMonitor.class);

Review Comment:
   spoof! ;p



##########
gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java:
##########
@@ -1020,8 +1020,14 @@ public class ConfigurationKeys {
    * Configuration properties related to flowGraphs
    */
 
-  public static final String JAVA_PROPS_EXTENSIONS = "javaPropsExtensions";
-  public static final String HOCON_FILE_EXTENSIONS = "hoconFileExtensions";
+  public static final String JAVA_PROPS_EXTENSIONS = 
"flowGraph.javaPropsExtensions";
+  public static final String HOCON_FILE_EXTENSIONS = 
"flowGraph.hoconFileExtensions";

Review Comment:
   must these be mutually-exclusive?  is the format comma-separated?  may help 
to document...



##########
gobblin-utility/src/main/java/org/apache/gobblin/util/filesystem/ExceptionCatchingPathAlterationListenerDecorator.java:
##########
@@ -111,4 +111,13 @@ public void onFileDelete(Path path) {
       log.error("onFileDelete failure: ", exc);
     }
   }
+
+  @Override
+  public void onCheckDetectedChange() {
+    try {
+      this.underlying.onCheckDetectedChange();
+    } catch (Throwable exc) {
+      log.error("onFileDelete failure: ", exc);
+    }

Review Comment:
   tip: as the `on*()` methods don't throw checked exceptions, you could more 
succinctly implement the wrapping using a `Runnable` if you define:
   ```
   protected void logSwallowedThrowable(Runnable r, String methodName) { ... }
   ```
   note, I'm passing the name explicitly, but you could also use 
`Throwable.getStackTrace()` to pull out the `StackTraceElement.getMethodName()`
   
   then call as:
   ```
   public void onCheckDetectedChange() {
     logSwallowedThrowable(() -> this.underlying.onCheckDetectedChange(), "onCheckDetectedChange");
   }
   ```
   also, note the copy-paste slip of `"onFileDelete"` in the log message above
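
A fuller sketch of that wrapper, under stated assumptions. `ChangeListener` and `ExceptionCatchingListener` are hypothetical, trimmed-down stand-ins for the real listener and decorator, and an in-memory list replaces `log.error(...)` so the example has no logging dependency. It takes a `Runnable` because the wrapped `on*()` calls return nothing.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, trimmed-down listener; the real interface has many more on*() methods.
interface ChangeListener {
  void onFileDelete(String path);
  void onCheckDetectedChange();
}

class ExceptionCatchingListener implements ChangeListener {
  private final ChangeListener underlying;
  // Stand-in for log.error(...), used only to keep the sketch dependency-free.
  final List<String> errors = new ArrayList<>();

  ExceptionCatchingListener(ChangeListener underlying) {
    this.underlying = underlying;
  }

  // Runs the callback and records, rather than propagates, anything it throws.
  protected void logSwallowedThrowable(Runnable callback, String methodName) {
    try {
      callback.run();
    } catch (Throwable t) {
      errors.add(methodName + " failure: " + t);
    }
  }

  @Override
  public void onFileDelete(String path) {
    logSwallowedThrowable(() -> underlying.onFileDelete(path), "onFileDelete");
  }

  @Override
  public void onCheckDetectedChange() {
    logSwallowedThrowable(underlying::onCheckDetectedChange, "onCheckDetectedChange");
  }
}
```

Passing the method name next to the callback keeps each log message tied to the method that produced it, which is the kind of mismatch the copy-pasted message introduced.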



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FSPathAlterationFlowGraphListener.java:
##########
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.flowgraph;
+
+import com.google.common.base.Optional;
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.runtime.api.TopologySpec;
+import 
org.apache.gobblin.service.modules.template_catalog.FSFlowTemplateCatalog;
+import org.apache.gobblin.util.filesystem.PathAlterationListener;
+import org.apache.gobblin.util.filesystem.PathAlterationObserver;
+import org.apache.hadoop.fs.Path;
+
+@Slf4j
+public class FSPathAlterationFlowGraphListener extends BaseFlowGraphListener 
implements PathAlterationListener {
+
+  private File graphDir;
+  CountDownLatch initComplete;
+
+  public FSPathAlterationFlowGraphListener(Optional<? extends 
FSFlowTemplateCatalog> flowTemplateCatalog,
+      FlowGraph graph, Map<URI, TopologySpec> topologySpecMap, String 
baseDirectory, String flowGraphFolderName,
+      String javaPropsExtentions, String hoconFileExtensions, CountDownLatch 
initComplete) {
+    super(flowTemplateCatalog, graph, topologySpecMap, baseDirectory, 
flowGraphFolderName, javaPropsExtentions, hoconFileExtensions);
+    this.graphDir = new File(baseDirectory);
+    this.initComplete = initComplete;
+    // Populate the flowgraph with any existing files
+    if (!this.graphDir.exists()) {
+      throw new RuntimeException(String.format("Flowgraph directory at path %s 
does not exist!", graphDir));
+    }
+    this.populateFlowGraphAtomically();
+  }
+
+  public void onStart(final PathAlterationObserver observer) {

Review Comment:
   do these need an `@Override`?



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FSPathAlterationFlowGraphListener.java:
##########
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.flowgraph;
+
+import com.google.common.base.Optional;
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.runtime.api.TopologySpec;
+import 
org.apache.gobblin.service.modules.template_catalog.FSFlowTemplateCatalog;
+import org.apache.gobblin.util.filesystem.PathAlterationListener;
+import org.apache.gobblin.util.filesystem.PathAlterationObserver;
+import org.apache.hadoop.fs.Path;
+
+@Slf4j
+public class FSPathAlterationFlowGraphListener extends BaseFlowGraphListener 
implements PathAlterationListener {

Review Comment:
   I strongly encourage class-level javadoc--usually method-level too.
   
   e.g. I'm curious how this relates to the `FsFlowGraphMonitor` and would hope 
to gain insight by comparing the class descs.



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FSPathAlterationFlowGraphListener.java:
##########
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.flowgraph;
+
+import com.google.common.base.Optional;
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.runtime.api.TopologySpec;
+import 
org.apache.gobblin.service.modules.template_catalog.FSFlowTemplateCatalog;
+import org.apache.gobblin.util.filesystem.PathAlterationListener;
+import org.apache.gobblin.util.filesystem.PathAlterationObserver;
+import org.apache.hadoop.fs.Path;
+
+@Slf4j
+public class FSPathAlterationFlowGraphListener extends BaseFlowGraphListener 
implements PathAlterationListener {
+
+  private File graphDir;
+  CountDownLatch initComplete;
+
+  public FSPathAlterationFlowGraphListener(Optional<? extends 
FSFlowTemplateCatalog> flowTemplateCatalog,
+      FlowGraph graph, Map<URI, TopologySpec> topologySpecMap, String 
baseDirectory, String flowGraphFolderName,
+      String javaPropsExtentions, String hoconFileExtensions, CountDownLatch 
initComplete) {
+    super(flowTemplateCatalog, graph, topologySpecMap, baseDirectory, 
flowGraphFolderName, javaPropsExtentions, hoconFileExtensions);
+    this.graphDir = new File(baseDirectory);
+    this.initComplete = initComplete;
+    // Populate the flowgraph with any existing files
+    if (!this.graphDir.exists()) {
+      throw new RuntimeException(String.format("Flowgraph directory at path %s 
does not exist!", graphDir));
+    }
+    this.populateFlowGraphAtomically();
+  }
+
+  public void onStart(final PathAlterationObserver observer) {
+  }
+
+  public void onFileCreate(final Path path) {
+  }
+
+  public void onFileChange(final Path path) {
+  }
+
+  public void onStop(final PathAlterationObserver observer) {
+  }
+
+  public void onDirectoryCreate(final Path directory) {
+  }
+
+  public void onDirectoryChange(final Path directory) {
+  }
+
+  public void onDirectoryDelete(final Path directory) {
+  }
+
+  public void onFileDelete(final Path path) {
+  }
+
+  @Override
+  public void onCheckDetectedChange() {
+    log.info("Detecting change in flowgraph files, reloading flowgraph");
+    this.populateFlowGraphAtomically();
+  }
+
+  private void populateFlowGraphAtomically() {
+    FlowGraph newFlowGraph = new BaseFlowGraph();
+    try {
+      List<Path> edges = new ArrayList<>();
+      // All nodes must be added first before edges, otherwise edges may have 
a missing source or destination.
+      Files.walk(this.graphDir.toPath()).forEach(fileName -> {
+        if (!Files.isDirectory(fileName)) {
+          if (checkFileLevelRelativeToRoot(new Path(fileName.toString()), 
NODE_FILE_DEPTH)) {
+            addDataNode(newFlowGraph, fileName.toString());
+          } else if (checkFileLevelRelativeToRoot(new 
Path(fileName.toString()), EDGE_FILE_DEPTH)) {
+            edges.add(new Path(fileName.toString()));
+          }
+        }
+      });
+      for (Path edge: edges) {
+        addFlowEdge(newFlowGraph, edge.toString());
+      }
+      // Reduce the countdown latch
+      this.initComplete.countDown();
+      this.flowGraph.copyGraph(newFlowGraph);

Review Comment:
   the pattern seems to be: prepare another flow graph then use its state to 
reinitialize the active flow graph. why not instead just make the new flow 
graph itself active?  basically an atomic reference swap.  that approach would 
allow flow graphs to be immutable, which is generally a Good Thing: there's no 
need for locking and any in-flight reads continue against the one they started 
with, even if a different FG becomes active in the meanwhile.
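
A minimal sketch of the reference-swap idea, not the PR's implementation. `GraphSnapshot` and `ActiveGraph` are hypothetical names, and a set of node names stands in for the real node, edge and alias maps.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical immutable snapshot; the real FlowGraph holds nodes, edges and alias maps.
final class GraphSnapshot {
  private final Set<String> nodes;

  GraphSnapshot(Set<String> nodes) {
    // Defensive copy so later changes to the caller's set cannot leak in.
    this.nodes = Collections.unmodifiableSet(new HashSet<>(nodes));
  }

  Set<String> nodes() {
    return nodes;
  }
}

// Hypothetical holder shared by readers and by the listener that rebuilds the graph.
final class ActiveGraph {
  private final AtomicReference<GraphSnapshot> current =
      new AtomicReference<>(new GraphSnapshot(Collections.emptySet()));

  // Readers take one snapshot and keep using it for the whole operation.
  GraphSnapshot get() {
    return current.get();
  }

  // The listener builds a complete replacement off to the side, then publishes it in one step.
  void swap(GraphSnapshot replacement) {
    current.set(replacement);
  }
}
```

A reader that called `get()` before a `swap` keeps its old, internally consistent snapshot, so no read/write lock is involved on the read path.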



##########
gobblin-utility/src/main/java/org/apache/gobblin/util/filesystem/PathAlterationObserver.java:
##########
@@ -43,9 +43,9 @@ public class PathAlterationObserver {
   private final PathFilter pathFilter;
   private final Comparator<Path> comparator;
   private final FileSystem fs;
-
   private final Path[] EMPTY_PATH_ARRAY = new Path[0];
 
+  private boolean changeApplied = false;

Review Comment:
   since only `checkAndNotify` is `synchronized`, not the others setting this, 
it should be `volatile`.
   
   that said, the "check-then-act" of `if (this.changeApplied) {...}` may be 
unsafe until those other methods actually are `synchronized`.
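
One way to sidestep both concerns, offered as a sketch and not as what the PR does: hold the flag in an `AtomicBoolean` and read-and-clear it with `getAndSet(false)`. `ChangeTrackingObserver` and `markChanged` are hypothetical names; only the flag handling is modelled.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical, trimmed-down observer; only the change flag is modelled here.
class ChangeTrackingObserver {
  private final AtomicBoolean changeApplied = new AtomicBoolean(false);
  int notifications = 0;

  // Called from the per-file and per-directory handlers, possibly on another thread.
  void markChanged() {
    changeApplied.set(true);
  }

  synchronized void checkAndNotify() {
    // getAndSet reads and clears the flag in one atomic step, so a change recorded
    // concurrently is either seen now or left set for the next check, never dropped.
    if (changeApplied.getAndSet(false)) {
      notifications++; // stand-in for listener.onCheckDetectedChange()
    }
  }
}
```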



##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/FsFlowGraphMonitor.java:
##########
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.monitoring;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.hadoop.fs.Path;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.util.concurrent.AbstractIdleService;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.runtime.api.TopologySpec;
+import 
org.apache.gobblin.service.modules.flowgraph.FSPathAlterationFlowGraphListener;
+import org.apache.gobblin.service.modules.flowgraph.FlowGraph;
+import org.apache.gobblin.service.modules.flowgraph.FlowGraphMonitor;
+import 
org.apache.gobblin.service.modules.template_catalog.FSFlowTemplateCatalog;
+import org.apache.gobblin.util.filesystem.PathAlterationObserver;
+import org.apache.gobblin.util.filesystem.PathAlterationObserverScheduler;
+
+
+@Slf4j
+public class FsFlowGraphMonitor extends AbstractIdleService implements 
FlowGraphMonitor {
+  public static final String FS_FLOWGRAPH_MONITOR_PREFIX = 
"gobblin.service.fsFlowGraphMonitor";
+  private static long DEFAULT_FLOWGRAPH_POLLING_INTERVAL = 60;
+  private static final String DEFAULT_FS_FLOWGRAPH_MONITOR_REPO_DIR = 
"git-flowgraph";
+  private static final String DEFAULT_FS_FLOWGRAPH_MONITOR_FLOWGRAPH_DIR = 
"gobblin-flowgraph";
+  private boolean isActive;
+  private long pollingInterval;
+  private PathAlterationObserverScheduler pathAlterationDetector;
+  private PathAlterationObserver observer;
+  private Path flowGraphPath;
+
+  private static final Config DEFAULT_FALLBACK = 
ConfigFactory.parseMap(ImmutableMap.<String, Object>builder()
+      .put(ConfigurationKeys.FLOWGRAPH_REPO_DIR, 
DEFAULT_FS_FLOWGRAPH_MONITOR_REPO_DIR)
+      .put(ConfigurationKeys.FLOWGRAPH_BASE_DIR, 
DEFAULT_FS_FLOWGRAPH_MONITOR_FLOWGRAPH_DIR)
+      .put(ConfigurationKeys.FLOWGRAPH_POLLING_INTERVAL, 
DEFAULT_FLOWGRAPH_POLLING_INTERVAL)
+      .put(ConfigurationKeys.JAVA_PROPS_EXTENSIONS, 
ConfigurationKeys.DEFAULT_PROPERTIES_EXTENSIONS)
+      .put(ConfigurationKeys.HOCON_FILE_EXTENSIONS, 
ConfigurationKeys.DEFAULT_CONF_EXTENSIONS)
+      .build());
+
+  public FsFlowGraphMonitor(Config config, Optional<? extends 
FSFlowTemplateCatalog> flowTemplateCatalog,
+      FlowGraph graph, Map<URI, TopologySpec> topologySpecMap, CountDownLatch 
initComplete) throws IOException {
+    Config configWithFallbacks = 
config.getConfig(FS_FLOWGRAPH_MONITOR_PREFIX).withFallback(DEFAULT_FALLBACK);
+    this.pollingInterval = 
TimeUnit.SECONDS.toMillis(configWithFallbacks.getLong(ConfigurationKeys.FLOWGRAPH_POLLING_INTERVAL));
+    this.flowGraphPath = new 
Path(configWithFallbacks.getString(ConfigurationKeys.FLOWGRAPH_REPO_DIR));
+    this.observer = new PathAlterationObserver(flowGraphPath);

Review Comment:
   not 100% sure here... may deserve a comment... is this some legacy 
compatibility dual mode that also supports the git repo, so either may be used?
   
   if so, consider a config to disable that once we're ready to move entirely 
to the FS dir





Issue Time Tracking
-------------------

    Worklog Id:     (was: 806898)
    Time Spent: 0.5h  (was: 20m)

> Build a file-based flowgraph that watches for changes and updates
> -----------------------------------------------------------------
>
>                 Key: GOBBLIN-1696
>                 URL: https://issues.apache.org/jira/browse/GOBBLIN-1696
>             Project: Apache Gobblin
>          Issue Type: New Feature
>          Components: gobblin-service
>            Reporter: William Lo
>            Assignee: Abhishek Tiwari
>            Priority: Major
>          Time Spent: 0.5h
>  Remaining Estimate: 0h
>
> Gobblin-as-a-Service only has a Git based flowgraph, which is difficult to 
> build CI/CD around. We can provide an alternate flowgraph that is just based 
> off files. This flowgraph should update atomically.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
