Repository: incubator-gobblin
Updated Branches:
  refs/heads/master 2509f3a3d -> 4f2f2b3a5


[GOBBLIN-611] Ensure node events are processed before edge events in 
GitFlowGraphMonitor.

Closes #2477 from
sv2000/flowGraphMonitorChangeDepth


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/4f2f2b3a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/4f2f2b3a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/4f2f2b3a

Branch: refs/heads/master
Commit: 4f2f2b3a5c3d354d6e13d91e93bda7a3ade096a9
Parents: 2509f3a
Author: sv2000 <[email protected]>
Authored: Fri Oct 12 10:19:40 2018 -0700
Committer: Hung Tran <[email protected]>
Committed: Fri Oct 12 10:19:40 2018 -0700

----------------------------------------------------------------------
 .../modules/core/GitFlowGraphMonitor.java       |  38 ++-
 .../modules/core/GitMonitoringService.java      |  15 +-
 .../modules/core/GitFlowGraphMonitorTest.java   | 268 ++++++++++++-------
 3 files changed, 215 insertions(+), 106 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/4f2f2b3a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitor.java
----------------------------------------------------------------------
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitor.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitor.java
index 4a60d35..e12e839 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitor.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitor.java
@@ -18,9 +18,12 @@
 package org.apache.gobblin.service.modules.core;
 
 import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
 
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.fs.Path;
+import org.eclipse.jgit.api.errors.GitAPIException;
 import org.eclipse.jgit.diff.DiffEntry;
 
 import com.google.common.base.Joiner;
@@ -95,6 +98,29 @@ public class GitFlowGraphMonitor extends 
GitMonitoringService {
   }
 
   /**
+   * Sort the changes in a commit so that changes to node files appear before 
changes to edge files. This is done so that
+   * node related changes are applied to the FlowGraph before edge related 
changes. An example where the order matters
+   * is the case when a commit adds a new node n2 as well as adds an edge from 
an existing node n1 to n2. To ensure that the
+   * addition of edge n1->n2 is successful, node n2 must exist in the graph 
and so needs to be added first. For deletions,
+   * the order does not matter and ordering the changes in the commit will 
result in the same FlowGraph state as if the changes
+   * were unordered. In other words, deletion of a node deletes all its 
incident edges from the FlowGraph. So processing an
+   * edge deletion later results in a no-op. Note that node and edge files do 
not change depth in case of modifications.
+   *
+   * If there are multiple commits between successive polls to Git, the 
re-ordering of changes across commits should not
+   * affect the final state of the FlowGraph. This is because, the order of 
changes for a given file type (i.e. node or edge)
+   * is preserved.
+   */
+  @Override
+  void processGitConfigChanges() throws GitAPIException, IOException {
+    List<DiffEntry> changes = this.gitRepo.getChanges();
+    Collections.sort(changes, (o1, o2) -> {
+      Integer o1Depth = (o1.getNewPath() != null) ? (new 
Path(o1.getNewPath())).depth() : (new Path(o1.getOldPath())).depth();
+      Integer o2Depth = (o2.getNewPath() != null) ? (new 
Path(o2.getNewPath())).depth() : (new Path(o2.getOldPath())).depth();
+      return o1Depth.compareTo(o2Depth);
+    });
+    processGitConfigChangesHelper(changes);
+  }
+  /**
    * Add an element (i.e., a {@link DataNode}, or a {@link FlowEdge} to
    * the {@link FlowGraph} for an added, updated or modified node or edge file.
    * @param change
@@ -139,6 +165,8 @@ public class GitFlowGraphMonitor extends 
GitMonitoringService {
         DataNode dataNode = (DataNode) 
GobblinConstructorUtils.invokeLongestConstructor(dataNodeClass, config);
         if (!this.flowGraph.addDataNode(dataNode)) {
           log.warn("Could not add DataNode {} to FlowGraph; skipping", 
dataNode.getId());
+        } else {
+          log.info("Added Datanode {} to FlowGraph", dataNode.getId());
         }
       } catch (Exception e) {
         log.warn("Could not add DataNode defined in {} due to exception {}", 
change.getNewPath(), e.getMessage());
@@ -158,6 +186,8 @@ public class GitFlowGraphMonitor extends 
GitMonitoringService {
       String nodeId = 
config.getString(FlowGraphConfigurationKeys.DATA_NODE_ID_KEY);
       if (!this.flowGraph.deleteDataNode(nodeId)) {
         log.warn("Could not remove DataNode {} from FlowGraph; skipping", 
nodeId);
+      } else {
+        log.info("Removed DataNode {} from FlowGraph", nodeId);
       }
     }
   }
@@ -178,6 +208,8 @@ public class GitFlowGraphMonitor extends 
GitMonitoringService {
         FlowEdge edge = flowEdgeFactory.createFlowEdge(config, flowCatalog);
         if (!this.flowGraph.addFlowEdge(edge)) {
           log.warn("Could not add edge {} to FlowGraph; skipping", 
edge.getId());
+        } else {
+          log.info("Added edge {} to FlowGraph", edge.getId());
         }
       } catch (Exception e) {
         log.warn("Could not add edge defined in {} due to exception {}", 
change.getNewPath(), e.getMessage());
@@ -198,7 +230,9 @@ public class GitFlowGraphMonitor extends 
GitMonitoringService {
         Config config = getEdgeConfigWithOverrides(ConfigFactory.empty(), 
edgeFilePath);
         String edgeId = 
config.getString(FlowGraphConfigurationKeys.FLOW_EDGE_ID_KEY);
         if (!this.flowGraph.deleteFlowEdge(edgeId)) {
-          log.warn("Could not remove FlowEdge {} from FlowGraph; skipping", 
edgeId);
+          log.warn("Could not remove edge {} from FlowGraph; skipping", 
edgeId);
+        } else {
+          log.info("Removed edge {} from FlowGraph", edgeId);
         }
       } catch (Exception e) {
         log.warn("Could not remove edge defined in {} due to exception {}", 
edgeFilePath, e.getMessage());
@@ -234,7 +268,7 @@ public class GitFlowGraphMonitor extends 
GitMonitoringService {
    * @return true if the file conforms to the expected hierarchy
    */
   private boolean checkFileLevelRelativeToRoot(Path filePath, int depth) {
-    if(filePath == null) {
+    if (filePath == null) {
       return false;
     }
     Path path = filePath;

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/4f2f2b3a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitMonitoringService.java
----------------------------------------------------------------------
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitMonitoringService.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitMonitoringService.java
index 936460d..123f9b0 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitMonitoringService.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitMonitoringService.java
@@ -52,7 +52,6 @@ import com.typesafe.config.Config;
 import lombok.extern.slf4j.Slf4j;
 
 import org.apache.gobblin.configuration.ConfigurationKeys;
-import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
 import org.apache.gobblin.util.ExecutorsUtils;
 import org.apache.gobblin.util.PullFileLoader;
 
@@ -140,14 +139,24 @@ public abstract class GitMonitoringService extends 
AbstractIdleService {
   }
 
   /**
-   * Fetch the list of changes since the last refresh of the repository and 
apply the changes to the {@link FlowCatalog}
+   * Fetch the list of changes since the last refresh of the repository
    * @throws GitAPIException
    * @throws IOException
    */
   @VisibleForTesting
-  public void processGitConfigChanges() throws GitAPIException, IOException {
+  void processGitConfigChanges() throws GitAPIException, IOException {
     List<DiffEntry> changes = this.gitRepo.getChanges();
+    if (!changes.isEmpty()) {
+      processGitConfigChangesHelper(changes);
+    }
+  }
 
+  /**
+   * A helper method where actual processing of the list of changes since the 
last refresh of the repository takes place
+   * and the changes applied.
+   * @throws IOException
+   */
+  void processGitConfigChangesHelper(List<DiffEntry> changes) throws 
IOException {
     for (DiffEntry change : changes) {
       switch (change.getChangeType()) {
         case ADD:

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/4f2f2b3a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitorTest.java
----------------------------------------------------------------------
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitorTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitorTest.java
index b5451bc..bd7d3cf 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitorTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitorTest.java
@@ -43,6 +43,7 @@ import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
 import com.google.common.base.Charsets;
+import com.google.common.base.Joiner;
 import com.google.common.io.Files;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
@@ -119,94 +120,48 @@ public class GitFlowGraphMonitorTest {
     this.gitFlowGraphMonitor.setActive(true);
   }
 
-  private void testAddNodeHelper(File nodeDir, File nodeFile, String nodeId, 
String paramValue)
-      throws IOException, GitAPIException {
-    // push a new node file
-    nodeDir.mkdirs();
-    nodeFile.createNewFile();
-    Files.write(FlowGraphConfigurationKeys.DATA_NODE_IS_ACTIVE_KEY + 
"=true\nparam1=" + paramValue + "\n", nodeFile, Charsets.UTF_8);
+  @Test
+  public void testAddNode() throws IOException, GitAPIException {
+    String file1Contents = FlowGraphConfigurationKeys.DATA_NODE_IS_ACTIVE_KEY 
+ "=true\nparam1=value1\n";
+    String file2Contents = FlowGraphConfigurationKeys.DATA_NODE_IS_ACTIVE_KEY 
+ "=true\nparam2=value2\n";
 
-    // add, commit, push node
-    this.gitForPush.add().addFilepattern(formNodeFilePath(nodeDir.getName(), 
nodeFile.getName())).call();
-    this.gitForPush.commit().setMessage("Node commit").call();
-    
this.gitForPush.push().setRemote("origin").setRefSpecs(this.masterRefSpec).call();
+    addNode(this.node1Dir, this.node1File, file1Contents);
+    addNode(this.node2Dir, this.node2File, file2Contents);
 
     this.gitFlowGraphMonitor.processGitConfigChanges();
 
-    //Check if node1 has been added to the FlowGraph
-    DataNode dataNode = this.flowGraph.getNode(nodeId);
-    Assert.assertEquals(dataNode.getId(), nodeId);
-    Assert.assertTrue(dataNode.isActive());
-    Assert.assertEquals(dataNode.getRawConfig().getString("param1"), 
paramValue);
-  }
-
-  @Test
-  public void testAddNode()
-      throws IOException, GitAPIException, URISyntaxException, 
ExecutionException, InterruptedException {
-    testAddNodeHelper(this.node1Dir, this.node1File, "node1", "value1");
-    testAddNodeHelper(this.node2Dir, this.node2File, "node2", "value2");
+    for (int i = 0; i < 1; i++) {
+      String nodeId = "node" + (i + 1);
+      String paramKey = "param" + (i + 1);
+      String paramValue = "value" + (i + 1);
+      //Check if nodes have been added to the FlowGraph
+      DataNode dataNode = this.flowGraph.getNode(nodeId);
+      Assert.assertEquals(dataNode.getId(), nodeId);
+      Assert.assertTrue(dataNode.isActive());
+      Assert.assertEquals(dataNode.getRawConfig().getString(paramKey), 
paramValue);
+    }
   }
 
   @Test (dependsOnMethods = "testAddNode")
   public void testAddEdge()
-      throws IOException, GitAPIException, URISyntaxException, 
ExecutionException, InterruptedException {
-    // push a new node file
-    this.edge1Dir.mkdirs();
-    this.edge1File.createNewFile();
-
-    Files.write(FlowGraphConfigurationKeys.FLOW_EDGE_SOURCE_KEY + "=node1\n"
-        + FlowGraphConfigurationKeys.FLOW_EDGE_DESTINATION_KEY + "=node2\n"
-        + FlowGraphConfigurationKeys.FLOW_EDGE_NAME_KEY + "=edge1\n"
-        + FlowGraphConfigurationKeys.FLOW_EDGE_IS_ACTIVE_KEY + "=true\n"
-        + FlowGraphConfigurationKeys.FLOW_EDGE_TEMPLATE_DIR_URI_KEY + 
"=FS:///flowEdgeTemplate\n"
-        + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY + ".0."
-        + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTOR_CLASS_KEY + 
"=org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor\n"
-        + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY + 
".0.specStore.fs.dir=/tmp1\n"
-        + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY + 
".0.specExecInstance.capabilities=s1:d1\n"
-        + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY + ".1."
-        + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTOR_CLASS_KEY + 
"=org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor\n"
-        + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY + 
".1.specStore.fs.dir=/tmp2\n"
-        + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY + 
".1.specExecInstance.capabilities=s2:d2\n", edge1File, Charsets.UTF_8);
-
-    // add, commit, push
-    
this.gitForPush.add().addFilepattern(formEdgeFilePath(this.edge1Dir.getParentFile().getName(),
 this.edge1Dir.getName(), this.edge1File.getName())).call();
-    this.gitForPush.commit().setMessage("Edge commit").call();
-    
this.gitForPush.push().setRemote("origin").setRefSpecs(this.masterRefSpec).call();
+      throws IOException, GitAPIException, ExecutionException, 
InterruptedException {
+    //Build contents of edge file
+    String fileContents = buildEdgeFileContents("node1", "node2", "edge1", 
"value1");
+    addEdge(this.edge1Dir, this.edge1File, fileContents);
 
     this.gitFlowGraphMonitor.processGitConfigChanges();
 
     //Check if edge1 has been added to the FlowGraph
-    Set<FlowEdge> edgeSet = this.flowGraph.getEdges("node1");
-    Assert.assertEquals(edgeSet.size(), 1);
-    FlowEdge flowEdge = edgeSet.iterator().next();
-    Assert.assertEquals(flowEdge.getSrc(), "node1");
-    Assert.assertEquals(flowEdge.getDest(), "node2");
-    
Assert.assertEquals(flowEdge.getExecutors().get(0).getConfig().get().getString("specStore.fs.dir"),
 "/tmp1");
-    
Assert.assertEquals(flowEdge.getExecutors().get(0).getConfig().get().getString("specExecInstance.capabilities"),
 "s1:d1");
-    
Assert.assertEquals(flowEdge.getExecutors().get(0).getClass().getSimpleName(), 
"InMemorySpecExecutor");
-    
Assert.assertEquals(flowEdge.getExecutors().get(1).getConfig().get().getString("specStore.fs.dir"),
 "/tmp2");
-    
Assert.assertEquals(flowEdge.getExecutors().get(1).getConfig().get().getString("specExecInstance.capabilities"),
 "s2:d2");
-    
Assert.assertEquals(flowEdge.getExecutors().get(1).getClass().getSimpleName(), 
"InMemorySpecExecutor");
+    testIfEdgeSuccessfullyAdded("node1", "node2", "edge1", "value1");
   }
 
   @Test (dependsOnMethods = "testAddNode")
   public void testUpdateEdge()
       throws IOException, GitAPIException, URISyntaxException, 
ExecutionException, InterruptedException {
     //Update edge1 file
-    Files.write(FlowGraphConfigurationKeys.FLOW_EDGE_SOURCE_KEY + "=node1\n"
-        + FlowGraphConfigurationKeys.FLOW_EDGE_DESTINATION_KEY + "=node2\n"
-        + FlowGraphConfigurationKeys.FLOW_EDGE_NAME_KEY + "=edge1\n"
-        + FlowGraphConfigurationKeys.FLOW_EDGE_IS_ACTIVE_KEY + "=true\n"
-        + FlowGraphConfigurationKeys.FLOW_EDGE_TEMPLATE_DIR_URI_KEY + 
"=FS:///flowEdgeTemplate\n"
-        + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY + ".0."
-        + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTOR_CLASS_KEY + 
"=org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor\n"
-        + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY + 
".0.specStore.fs.dir=/tmp1\n"
-        + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY + 
".0.specExecInstance.capabilities=s1:d1\n"
-        + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY + ".1."
-        + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTOR_CLASS_KEY + 
"=org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor\n"
-        + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY + 
".1.specStore.fs.dir=/tmp2\n"
-        + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY + 
".1.specExecInstance.capabilities=s2:d2\n"
-        + "key1=value1\n", edge1File, Charsets.UTF_8);
+    String fileContents = buildEdgeFileContents("node1", "node2", "edge1", 
"value2");
+
+    addEdge(this.edge1Dir, this.edge1File, fileContents);
 
     // add, commit, push
     
this.gitForPush.add().addFilepattern(formEdgeFilePath(this.edge1Dir.getParentFile().getName(),
 this.edge1Dir.getName(), this.edge1File.getName())).call();
@@ -216,25 +171,22 @@ public class GitFlowGraphMonitorTest {
     this.gitFlowGraphMonitor.processGitConfigChanges();
 
     //Check if new edge1 has been added to the FlowGraph
-    Set<FlowEdge> edgeSet = this.flowGraph.getEdges("node1");
-    Assert.assertEquals(edgeSet.size(), 1);
-    FlowEdge flowEdge = edgeSet.iterator().next();
-    Assert.assertEquals(flowEdge.getSrc(), "node1");
-    Assert.assertEquals(flowEdge.getDest(), "node2");
-    
Assert.assertEquals(flowEdge.getExecutors().get(0).getConfig().get().getString("specStore.fs.dir"),
 "/tmp1");
-    
Assert.assertEquals(flowEdge.getExecutors().get(0).getConfig().get().getString("specExecInstance.capabilities"),
 "s1:d1");
-    
Assert.assertEquals(flowEdge.getExecutors().get(0).getClass().getSimpleName(), 
"InMemorySpecExecutor");
-    
Assert.assertEquals(flowEdge.getExecutors().get(1).getConfig().get().getString("specStore.fs.dir"),
 "/tmp2");
-    
Assert.assertEquals(flowEdge.getExecutors().get(1).getConfig().get().getString("specExecInstance.capabilities"),
 "s2:d2");
-    
Assert.assertEquals(flowEdge.getExecutors().get(1).getClass().getSimpleName(), 
"InMemorySpecExecutor");
-    Assert.assertEquals(flowEdge.getConfig().getString("key1"), "value1");
+    testIfEdgeSuccessfullyAdded("node1", "node2", "edge1", "value2");
   }
 
   @Test (dependsOnMethods = "testUpdateEdge")
   public void testUpdateNode()
       throws IOException, GitAPIException, URISyntaxException, 
ExecutionException, InterruptedException {
     //Update param1 value in node1 and check if updated node is added to the 
graph
-    testAddNodeHelper(this.node1Dir, this.node1File, "node1", "value3");
+    String fileContents = FlowGraphConfigurationKeys.DATA_NODE_IS_ACTIVE_KEY + 
"=true\nparam1=value3\n";
+    addNode(this.node1Dir, this.node1File, fileContents);
+
+    this.gitFlowGraphMonitor.processGitConfigChanges();
+    //Check if node has been updated in the FlowGraph
+    DataNode dataNode = this.flowGraph.getNode("node1");
+    Assert.assertEquals(dataNode.getId(), "node1");
+    Assert.assertTrue(dataNode.isActive());
+    Assert.assertEquals(dataNode.getRawConfig().getString("param1"), "value3");
   }
 
 
@@ -262,25 +214,152 @@ public class GitFlowGraphMonitorTest {
 
   @Test (dependsOnMethods = "testRemoveEdge")
   public void testRemoveNode() throws GitAPIException, IOException {
-    //delete node file
+    //delete node files
     node1File.delete();
+    node2File.delete();
 
-    //node1 is present in the graph before delete
+    //Ensure node1 and node2 are present in the graph before delete
     DataNode node1 = this.flowGraph.getNode("node1");
     Assert.assertNotNull(node1);
+    DataNode node2 = this.flowGraph.getNode("node2");
+    Assert.assertNotNull(node2);
 
     // delete, commit, push
-    DirCache ac = 
this.gitForPush.rm().addFilepattern(formNodeFilePath(this.node1Dir.getName(), 
this.node1File.getName())).call();
-    RevCommit cc = this.gitForPush.commit().setMessage("Node remove 
commit").call();
+    
this.gitForPush.rm().addFilepattern(formNodeFilePath(this.node1Dir.getName(), 
this.node1File.getName())).call();
+    
this.gitForPush.rm().addFilepattern(formNodeFilePath(this.node2Dir.getName(), 
this.node2File.getName())).call();
+    this.gitForPush.commit().setMessage("Node remove commit").call();
     
this.gitForPush.push().setRemote("origin").setRefSpecs(this.masterRefSpec).call();
 
     this.gitFlowGraphMonitor.processGitConfigChanges();
 
-    //Check if node1 has been deleted from the graph
+    //Check if node1 and node 2 have been deleted from the graph
     node1 = this.flowGraph.getNode("node1");
     Assert.assertNull(node1);
+    node2 = this.flowGraph.getNode("node2");
+    Assert.assertNull(node2);
   }
 
+  @Test (dependsOnMethods = "testRemoveNode")
+  public void testChangesReorder() throws GitAPIException, IOException, 
ExecutionException, InterruptedException {
+    String node1FileContents = 
FlowGraphConfigurationKeys.DATA_NODE_IS_ACTIVE_KEY + "=true\nparam1=value1\n";
+    String node2FileContents = 
FlowGraphConfigurationKeys.DATA_NODE_IS_ACTIVE_KEY + "=true\nparam2=value2\n";
+
+    String edgeFileContents = buildEdgeFileContents("node1", "node2", "edge1", 
"value1");
+
+    createNewFile(this.node1Dir, this.node1File, node1FileContents);
+    createNewFile(this.node2Dir, this.node2File, node2FileContents);
+    createNewFile(this.edge1Dir, this.edge1File, edgeFileContents);
+
+    // add, commit, push
+    
this.gitForPush.add().addFilepattern(formNodeFilePath(this.node1Dir.getName(), 
this.node1File.getName())).call();
+    
this.gitForPush.add().addFilepattern(formNodeFilePath(this.node2Dir.getName(), 
this.node2File.getName())).call();
+    this.gitForPush.commit().setMessage("Add nodes commit").call();
+    
this.gitForPush.push().setRemote("origin").setRefSpecs(this.masterRefSpec).call();
+
+    
this.gitForPush.add().addFilepattern(formEdgeFilePath(this.edge1Dir.getParentFile().getName(),
 this.edge1Dir.getName(), this.edge1File.getName())).call();
+    this.gitForPush.commit().setMessage("Add nodes and edges commit").call();
+    
this.gitForPush.push().setRemote("origin").setRefSpecs(this.masterRefSpec).call();
+
+    this.gitFlowGraphMonitor.processGitConfigChanges();
+    //Ensure node1 and node2 are present in the graph
+    DataNode node1 = this.flowGraph.getNode("node1");
+    Assert.assertNotNull(node1);
+    DataNode node2 = this.flowGraph.getNode("node2");
+    Assert.assertNotNull(node2);
+    testIfEdgeSuccessfullyAdded("node1", "node2", "edge1", "value1");
+
+    //Delete node1, edge node1->node2 files
+    node1File.delete();
+    edge1File.delete();
+
+    //Commit1: delete node1 and edge node1->node2
+    
this.gitForPush.rm().addFilepattern(formNodeFilePath(this.node1Dir.getName(), 
this.node1File.getName())).call();
+    
this.gitForPush.rm().addFilepattern(formEdgeFilePath(this.edge1Dir.getParentFile().getName(),
 this.edge1Dir.getName(), this.edge1File.getName())).call();
+    this.gitForPush.commit().setMessage("Delete node1 and edge1 
commit").call();
+    
this.gitForPush.push().setRemote("origin").setRefSpecs(this.masterRefSpec).call();
+
+    //Commit2: add node1 back
+    createNewFile(this.node1Dir, this.node1File, node1FileContents);
+    
this.gitForPush.add().addFilepattern(formNodeFilePath(this.node1Dir.getName(), 
this.node1File.getName())).call();
+    this.gitForPush.commit().setMessage("Add node1 commit").call();
+    
this.gitForPush.push().setRemote("origin").setRefSpecs(this.masterRefSpec).call();
+
+    this.gitFlowGraphMonitor.processGitConfigChanges();
+    node1 = this.flowGraph.getNode("node1");
+    Assert.assertNotNull(node1);
+    Assert.assertEquals(this.flowGraph.getEdges(node1).size(), 0);
+  }
+
+  @AfterClass
+  public void tearDown() throws Exception {
+    cleanUpDir(TEST_DIR);
+  }
+
+  private void createNewFile(File dir, File file, String fileContents) throws 
IOException {
+    dir.mkdirs();
+    file.createNewFile();
+    Files.write(fileContents, file, Charsets.UTF_8);
+  }
+
+  private void addNode(File nodeDir, File nodeFile, String fileContents) 
throws IOException, GitAPIException {
+    createNewFile(nodeDir, nodeFile, fileContents);
+
+    // add, commit, push node
+    this.gitForPush.add().addFilepattern(formNodeFilePath(nodeDir.getName(), 
nodeFile.getName())).call();
+    this.gitForPush.commit().setMessage("Node commit").call();
+    
this.gitForPush.push().setRemote("origin").setRefSpecs(this.masterRefSpec).call();
+  }
+
+  private void addEdge(File edgeDir, File edgeFile, String fileContents) 
throws IOException, GitAPIException {
+    createNewFile(edgeDir, edgeFile, fileContents);
+
+    // add, commit, push edge
+    
this.gitForPush.add().addFilepattern(formEdgeFilePath(edgeDir.getParentFile().getName(),
 edgeDir.getName(), edgeFile.getName())).call();
+    this.gitForPush.commit().setMessage("Edge commit").call();
+    
this.gitForPush.push().setRemote("origin").setRefSpecs(this.masterRefSpec).call();
+  }
+
+  private String buildEdgeFileContents(String node1, String node2, String 
edgeName, String value) {
+    String fileContents = FlowGraphConfigurationKeys.FLOW_EDGE_SOURCE_KEY + 
"=" + node1 + "\n"
+        + FlowGraphConfigurationKeys.FLOW_EDGE_DESTINATION_KEY + "=" + node2 + 
"\n"
+        + FlowGraphConfigurationKeys.FLOW_EDGE_NAME_KEY + "=" + edgeName + "\n"
+        + FlowGraphConfigurationKeys.FLOW_EDGE_IS_ACTIVE_KEY + "=true\n"
+        + FlowGraphConfigurationKeys.FLOW_EDGE_TEMPLATE_DIR_URI_KEY + 
"=FS:///flowEdgeTemplate\n"
+        + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY + ".0."
+        + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTOR_CLASS_KEY + 
"=org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor\n"
+        + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY + 
".0.specStore.fs.dir=/tmp1\n"
+        + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY + 
".0.specExecInstance.capabilities=s1:d1\n"
+        + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY + ".1."
+        + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTOR_CLASS_KEY + 
"=org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor\n"
+        + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY + 
".1.specStore.fs.dir=/tmp2\n"
+        + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY + 
".1.specExecInstance.capabilities=s2:d2\n"
+        + "key1=" + value + "\n";
+    return fileContents;
+  }
+
+  private void testIfEdgeSuccessfullyAdded(String node1, String node2, String 
edgeName, String value) throws ExecutionException, InterruptedException {
+    Set<FlowEdge> edgeSet = this.flowGraph.getEdges(node1);
+    Assert.assertEquals(edgeSet.size(), 1);
+    FlowEdge flowEdge = edgeSet.iterator().next();
+    Assert.assertEquals(flowEdge.getId(), Joiner.on(":").join(node1, node2, 
edgeName));
+    Assert.assertEquals(flowEdge.getSrc(), node1);
+    Assert.assertEquals(flowEdge.getDest(), node2);
+    
Assert.assertEquals(flowEdge.getExecutors().get(0).getConfig().get().getString("specStore.fs.dir"),
 "/tmp1");
+    
Assert.assertEquals(flowEdge.getExecutors().get(0).getConfig().get().getString("specExecInstance.capabilities"),
 "s1:d1");
+    
Assert.assertEquals(flowEdge.getExecutors().get(0).getClass().getSimpleName(), 
"InMemorySpecExecutor");
+    
Assert.assertEquals(flowEdge.getExecutors().get(1).getConfig().get().getString("specStore.fs.dir"),
 "/tmp2");
+    
Assert.assertEquals(flowEdge.getExecutors().get(1).getConfig().get().getString("specExecInstance.capabilities"),
 "s2:d2");
+    
Assert.assertEquals(flowEdge.getExecutors().get(1).getClass().getSimpleName(), 
"InMemorySpecExecutor");
+    Assert.assertEquals(flowEdge.getConfig().getString("key1"), value);
+  }
+
+  private String formNodeFilePath(String groupDir, String fileName) {
+    return this.flowGraphDir.getName() + SystemUtils.FILE_SEPARATOR + groupDir 
+ SystemUtils.FILE_SEPARATOR + fileName;
+  }
+
+  private String formEdgeFilePath(String parentDir, String groupDir, String 
fileName) {
+    return this.flowGraphDir.getName() + SystemUtils.FILE_SEPARATOR + 
parentDir + SystemUtils.FILE_SEPARATOR + groupDir + SystemUtils.FILE_SEPARATOR 
+ fileName;
+  }
 
   private void cleanUpDir(String dir) {
     File specStoreDir = new File(dir);
@@ -298,17 +377,4 @@ public class GitFlowGraphMonitorTest {
       }
     }
   }
-
-  private String formNodeFilePath(String groupDir, String fileName) {
-    return this.flowGraphDir.getName() + SystemUtils.FILE_SEPARATOR + groupDir 
+ SystemUtils.FILE_SEPARATOR + fileName;
-  }
-
-  private String formEdgeFilePath(String parentDir, String groupDir, String 
fileName) {
-    return this.flowGraphDir.getName() + SystemUtils.FILE_SEPARATOR + 
parentDir + SystemUtils.FILE_SEPARATOR + groupDir + SystemUtils.FILE_SEPARATOR 
+ fileName;
-  }
-
-  @AfterClass
-  public void tearDown() throws Exception {
-    cleanUpDir(TEST_DIR);
-  }
 }
\ No newline at end of file

Reply via email to