Repository: incubator-gobblin Updated Branches: refs/heads/master 2509f3a3d -> 4f2f2b3a5
[GOBBLIN-611] Ensure node events are processed before edge events in GitFlowGraphMonitor.[] Closes #2477 from sv2000/flowGraphMonitorChangeDepth Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/4f2f2b3a Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/4f2f2b3a Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/4f2f2b3a Branch: refs/heads/master Commit: 4f2f2b3a5c3d354d6e13d91e93bda7a3ade096a9 Parents: 2509f3a Author: sv2000 <[email protected]> Authored: Fri Oct 12 10:19:40 2018 -0700 Committer: Hung Tran <[email protected]> Committed: Fri Oct 12 10:19:40 2018 -0700 ---------------------------------------------------------------------- .../modules/core/GitFlowGraphMonitor.java | 38 ++- .../modules/core/GitMonitoringService.java | 15 +- .../modules/core/GitFlowGraphMonitorTest.java | 268 ++++++++++++------- 3 files changed, 215 insertions(+), 106 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/4f2f2b3a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitor.java ---------------------------------------------------------------------- diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitor.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitor.java index 4a60d35..e12e839 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitor.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitor.java @@ -18,9 +18,12 @@ package org.apache.gobblin.service.modules.core; import java.io.IOException; +import java.util.Collections; +import java.util.List; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.fs.Path; +import org.eclipse.jgit.api.errors.GitAPIException; import org.eclipse.jgit.diff.DiffEntry; import com.google.common.base.Joiner; @@ -95,6 +98,29 @@ public class GitFlowGraphMonitor extends GitMonitoringService { } /** + * Sort the changes in a commit so that changes to node files appear before changes to edge files. This is done so that + * node related changes are applied to the FlowGraph before edge related changes. An example where the order matters + * is the case when a commit adds a new node n2 as well as adds an edge from an existing node n1 to n2. To ensure that the + * addition of edge n1->n2 is successful, node n2 must exist in the graph and so needs to be added first. For deletions, + * the order does not matter and ordering the changes in the commit will result in the same FlowGraph state as if the changes + * were unordered. In other words, deletion of a node deletes all its incident edges from the FlowGraph. So processing an + * edge deletion later results in a no-op. Note that node and edge files do not change depth in case of modifications. + * + * If there are multiple commits between successive polls to Git, the re-ordering of changes across commits should not + * affect the final state of the FlowGraph. This is because, the order of changes for a given file type (i.e. node or edge) + * is preserved. + */ + @Override + void processGitConfigChanges() throws GitAPIException, IOException { + List<DiffEntry> changes = this.gitRepo.getChanges(); + Collections.sort(changes, (o1, o2) -> { + Integer o1Depth = (o1.getNewPath() != null) ? (new Path(o1.getNewPath())).depth() : (new Path(o1.getOldPath())).depth(); + Integer o2Depth = (o2.getNewPath() != null) ? (new Path(o2.getNewPath())).depth() : (new Path(o2.getOldPath())).depth(); + return o1Depth.compareTo(o2Depth); + }); + processGitConfigChangesHelper(changes); + } + /** * Add an element (i.e., a {@link DataNode}, or a {@link FlowEdge} to * the {@link FlowGraph} for an added, updated or modified node or edge file. * @param change @@ -139,6 +165,8 @@ public class GitFlowGraphMonitor extends GitMonitoringService { DataNode dataNode = (DataNode) GobblinConstructorUtils.invokeLongestConstructor(dataNodeClass, config); if (!this.flowGraph.addDataNode(dataNode)) { log.warn("Could not add DataNode {} to FlowGraph; skipping", dataNode.getId()); + } else { + log.info("Added Datanode {} to FlowGraph", dataNode.getId()); } } catch (Exception e) { log.warn("Could not add DataNode defined in {} due to exception {}", change.getNewPath(), e.getMessage()); @@ -158,6 +186,8 @@ public class GitFlowGraphMonitor extends GitMonitoringService { String nodeId = config.getString(FlowGraphConfigurationKeys.DATA_NODE_ID_KEY); if (!this.flowGraph.deleteDataNode(nodeId)) { log.warn("Could not remove DataNode {} from FlowGraph; skipping", nodeId); + } else { + log.info("Removed DataNode {} from FlowGraph", nodeId); } } } @@ -178,6 +208,8 @@ public class GitFlowGraphMonitor extends GitMonitoringService { FlowEdge edge = flowEdgeFactory.createFlowEdge(config, flowCatalog); if (!this.flowGraph.addFlowEdge(edge)) { log.warn("Could not add edge {} to FlowGraph; skipping", edge.getId()); + } else { + log.info("Added edge {} to FlowGraph", edge.getId()); } } catch (Exception e) { log.warn("Could not add edge defined in {} due to exception {}", change.getNewPath(), e.getMessage()); @@ -198,7 +230,9 @@ public class GitFlowGraphMonitor extends GitMonitoringService { Config config = getEdgeConfigWithOverrides(ConfigFactory.empty(), edgeFilePath); String edgeId = config.getString(FlowGraphConfigurationKeys.FLOW_EDGE_ID_KEY); if (!this.flowGraph.deleteFlowEdge(edgeId)) { - log.warn("Could not remove FlowEdge {} from FlowGraph; skipping", edgeId); + log.warn("Could not remove edge {} from FlowGraph; skipping", edgeId); + } else { + log.info("Removed edge {} from FlowGraph", edgeId); } } catch (Exception e) { log.warn("Could not remove edge defined in {} due to exception {}", edgeFilePath, e.getMessage()); @@ -234,7 +268,7 @@ public class GitFlowGraphMonitor extends GitMonitoringService { * @return true if the file conforms to the expected hierarchy */ private boolean checkFileLevelRelativeToRoot(Path filePath, int depth) { - if(filePath == null) { + if (filePath == null) { return false; } Path path = filePath; http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/4f2f2b3a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitMonitoringService.java ---------------------------------------------------------------------- diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitMonitoringService.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitMonitoringService.java index 936460d..123f9b0 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitMonitoringService.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitMonitoringService.java @@ -52,7 +52,6 @@ import com.typesafe.config.Config; import lombok.extern.slf4j.Slf4j; import org.apache.gobblin.configuration.ConfigurationKeys; -import org.apache.gobblin.runtime.spec_catalog.FlowCatalog; import org.apache.gobblin.util.ExecutorsUtils; import org.apache.gobblin.util.PullFileLoader; @@ -140,14 +139,24 @@ public abstract class GitMonitoringService extends AbstractIdleService { } /** - * Fetch the list of changes since the last refresh of the repository and apply the changes to the {@link FlowCatalog} + * Fetch the list of changes since the last refresh of the repository * @throws GitAPIException * @throws IOException */ @VisibleForTesting - public void processGitConfigChanges() throws GitAPIException, IOException { + void processGitConfigChanges() throws GitAPIException, IOException { List<DiffEntry> changes = this.gitRepo.getChanges(); + if (!changes.isEmpty()) { + processGitConfigChangesHelper(changes); + } + } + /** + * A helper method where actual processing of the list of changes since the last refresh of the repository takes place + * and the changes applied. + * @throws IOException + */ + void processGitConfigChangesHelper(List<DiffEntry> changes) throws IOException { for (DiffEntry change : changes) { switch (change.getChangeType()) { case ADD: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/4f2f2b3a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitorTest.java ---------------------------------------------------------------------- diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitorTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitorTest.java index b5451bc..bd7d3cf 100644 --- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitorTest.java +++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitorTest.java @@ -43,6 +43,7 @@ import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; import com.google.common.base.Charsets; +import com.google.common.base.Joiner; import com.google.common.io.Files; import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; @@ -119,94 +120,48 @@ public class GitFlowGraphMonitorTest { this.gitFlowGraphMonitor.setActive(true); } - private void testAddNodeHelper(File nodeDir, File nodeFile, String nodeId, String paramValue) - throws IOException, GitAPIException { - // push a new node file - nodeDir.mkdirs(); - nodeFile.createNewFile(); - Files.write(FlowGraphConfigurationKeys.DATA_NODE_IS_ACTIVE_KEY + "=true\nparam1=" + paramValue + "\n", nodeFile, Charsets.UTF_8); + @Test + public void testAddNode() throws IOException, GitAPIException { + String file1Contents = FlowGraphConfigurationKeys.DATA_NODE_IS_ACTIVE_KEY + "=true\nparam1=value1\n"; + String file2Contents = FlowGraphConfigurationKeys.DATA_NODE_IS_ACTIVE_KEY + "=true\nparam2=value2\n"; - // add, commit, push node - this.gitForPush.add().addFilepattern(formNodeFilePath(nodeDir.getName(), nodeFile.getName())).call(); - this.gitForPush.commit().setMessage("Node commit").call(); - this.gitForPush.push().setRemote("origin").setRefSpecs(this.masterRefSpec).call(); + addNode(this.node1Dir, this.node1File, file1Contents); + addNode(this.node2Dir, this.node2File, file2Contents); this.gitFlowGraphMonitor.processGitConfigChanges(); - //Check if node1 has been added to the FlowGraph - DataNode dataNode = this.flowGraph.getNode(nodeId); - Assert.assertEquals(dataNode.getId(), nodeId); - Assert.assertTrue(dataNode.isActive()); - Assert.assertEquals(dataNode.getRawConfig().getString("param1"), paramValue); - } - - @Test - public void testAddNode() - throws IOException, GitAPIException, URISyntaxException, ExecutionException, InterruptedException { - testAddNodeHelper(this.node1Dir, this.node1File, "node1", "value1"); - testAddNodeHelper(this.node2Dir, this.node2File, "node2", "value2"); + for (int i = 0; i < 1; i++) { + String nodeId = "node" + (i + 1); + String paramKey = "param" + (i + 1); + String paramValue = "value" + (i + 1); + //Check if nodes have been added to the FlowGraph + DataNode dataNode = this.flowGraph.getNode(nodeId); + Assert.assertEquals(dataNode.getId(), nodeId); + Assert.assertTrue(dataNode.isActive()); + Assert.assertEquals(dataNode.getRawConfig().getString(paramKey), paramValue); + } } @Test (dependsOnMethods = "testAddNode") public void testAddEdge() - throws IOException, GitAPIException, URISyntaxException, ExecutionException, InterruptedException { - // push a new node file - this.edge1Dir.mkdirs(); - this.edge1File.createNewFile(); - - Files.write(FlowGraphConfigurationKeys.FLOW_EDGE_SOURCE_KEY + "=node1\n" - + FlowGraphConfigurationKeys.FLOW_EDGE_DESTINATION_KEY + "=node2\n" - + FlowGraphConfigurationKeys.FLOW_EDGE_NAME_KEY + "=edge1\n" - + FlowGraphConfigurationKeys.FLOW_EDGE_IS_ACTIVE_KEY + "=true\n" - + FlowGraphConfigurationKeys.FLOW_EDGE_TEMPLATE_DIR_URI_KEY + "=FS:///flowEdgeTemplate\n" - + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY + ".0." - + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTOR_CLASS_KEY + "=org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor\n" - + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY + ".0.specStore.fs.dir=/tmp1\n" - + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY + ".0.specExecInstance.capabilities=s1:d1\n" - + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY + ".1." - + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTOR_CLASS_KEY + "=org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor\n" - + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY + ".1.specStore.fs.dir=/tmp2\n" - + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY + ".1.specExecInstance.capabilities=s2:d2\n", edge1File, Charsets.UTF_8); - - // add, commit, push - this.gitForPush.add().addFilepattern(formEdgeFilePath(this.edge1Dir.getParentFile().getName(), this.edge1Dir.getName(), this.edge1File.getName())).call(); - this.gitForPush.commit().setMessage("Edge commit").call(); - this.gitForPush.push().setRemote("origin").setRefSpecs(this.masterRefSpec).call(); + throws IOException, GitAPIException, ExecutionException, InterruptedException { + //Build contents of edge file + String fileContents = buildEdgeFileContents("node1", "node2", "edge1", "value1"); + addEdge(this.edge1Dir, this.edge1File, fileContents); this.gitFlowGraphMonitor.processGitConfigChanges(); //Check if edge1 has been added to the FlowGraph - Set<FlowEdge> edgeSet = this.flowGraph.getEdges("node1"); - Assert.assertEquals(edgeSet.size(), 1); - FlowEdge flowEdge = edgeSet.iterator().next(); - Assert.assertEquals(flowEdge.getSrc(), "node1"); - Assert.assertEquals(flowEdge.getDest(), "node2"); - Assert.assertEquals(flowEdge.getExecutors().get(0).getConfig().get().getString("specStore.fs.dir"), "/tmp1"); - Assert.assertEquals(flowEdge.getExecutors().get(0).getConfig().get().getString("specExecInstance.capabilities"), "s1:d1"); - Assert.assertEquals(flowEdge.getExecutors().get(0).getClass().getSimpleName(), "InMemorySpecExecutor"); - Assert.assertEquals(flowEdge.getExecutors().get(1).getConfig().get().getString("specStore.fs.dir"), "/tmp2"); - Assert.assertEquals(flowEdge.getExecutors().get(1).getConfig().get().getString("specExecInstance.capabilities"), "s2:d2"); - Assert.assertEquals(flowEdge.getExecutors().get(1).getClass().getSimpleName(), "InMemorySpecExecutor"); + testIfEdgeSuccessfullyAdded("node1", "node2", "edge1", "value1"); } @Test (dependsOnMethods = "testAddNode") public void testUpdateEdge() throws IOException, GitAPIException, URISyntaxException, ExecutionException, InterruptedException { //Update edge1 file - Files.write(FlowGraphConfigurationKeys.FLOW_EDGE_SOURCE_KEY + "=node1\n" - + FlowGraphConfigurationKeys.FLOW_EDGE_DESTINATION_KEY + "=node2\n" - + FlowGraphConfigurationKeys.FLOW_EDGE_NAME_KEY + "=edge1\n" - + FlowGraphConfigurationKeys.FLOW_EDGE_IS_ACTIVE_KEY + "=true\n" - + FlowGraphConfigurationKeys.FLOW_EDGE_TEMPLATE_DIR_URI_KEY + "=FS:///flowEdgeTemplate\n" - + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY + ".0." - + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTOR_CLASS_KEY + "=org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor\n" - + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY + ".0.specStore.fs.dir=/tmp1\n" - + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY + ".0.specExecInstance.capabilities=s1:d1\n" - + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY + ".1." - + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTOR_CLASS_KEY + "=org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor\n" - + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY + ".1.specStore.fs.dir=/tmp2\n" - + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY + ".1.specExecInstance.capabilities=s2:d2\n" - + "key1=value1\n", edge1File, Charsets.UTF_8); + String fileContents = buildEdgeFileContents("node1", "node2", "edge1", "value2"); + + addEdge(this.edge1Dir, this.edge1File, fileContents); // add, commit, push this.gitForPush.add().addFilepattern(formEdgeFilePath(this.edge1Dir.getParentFile().getName(), this.edge1Dir.getName(), this.edge1File.getName())).call(); @@ -216,25 +171,22 @@ public class GitFlowGraphMonitorTest { this.gitFlowGraphMonitor.processGitConfigChanges(); //Check if new edge1 has been added to the FlowGraph - Set<FlowEdge> edgeSet = this.flowGraph.getEdges("node1"); - Assert.assertEquals(edgeSet.size(), 1); - FlowEdge flowEdge = edgeSet.iterator().next(); - Assert.assertEquals(flowEdge.getSrc(), "node1"); - Assert.assertEquals(flowEdge.getDest(), "node2"); - Assert.assertEquals(flowEdge.getExecutors().get(0).getConfig().get().getString("specStore.fs.dir"), "/tmp1"); - Assert.assertEquals(flowEdge.getExecutors().get(0).getConfig().get().getString("specExecInstance.capabilities"), "s1:d1"); - Assert.assertEquals(flowEdge.getExecutors().get(0).getClass().getSimpleName(), "InMemorySpecExecutor"); - Assert.assertEquals(flowEdge.getExecutors().get(1).getConfig().get().getString("specStore.fs.dir"), "/tmp2"); - Assert.assertEquals(flowEdge.getExecutors().get(1).getConfig().get().getString("specExecInstance.capabilities"), "s2:d2"); - Assert.assertEquals(flowEdge.getExecutors().get(1).getClass().getSimpleName(), "InMemorySpecExecutor"); - Assert.assertEquals(flowEdge.getConfig().getString("key1"), "value1"); + testIfEdgeSuccessfullyAdded("node1", "node2", "edge1", "value2"); } @Test (dependsOnMethods = "testUpdateEdge") public void testUpdateNode() throws IOException, GitAPIException, URISyntaxException, ExecutionException, InterruptedException { //Update param1 value in node1 and check if updated node is added to the graph - testAddNodeHelper(this.node1Dir, this.node1File, "node1", "value3"); + String fileContents = FlowGraphConfigurationKeys.DATA_NODE_IS_ACTIVE_KEY + "=true\nparam1=value3\n"; + addNode(this.node1Dir, this.node1File, fileContents); + + this.gitFlowGraphMonitor.processGitConfigChanges(); + //Check if node has been updated in the FlowGraph + DataNode dataNode = this.flowGraph.getNode("node1"); + Assert.assertEquals(dataNode.getId(), "node1"); + Assert.assertTrue(dataNode.isActive()); + Assert.assertEquals(dataNode.getRawConfig().getString("param1"), "value3"); } @@ -262,25 +214,152 @@ public class GitFlowGraphMonitorTest { @Test (dependsOnMethods = "testRemoveEdge") public void testRemoveNode() throws GitAPIException, IOException { - //delete node file + //delete node files node1File.delete(); + node2File.delete(); - //node1 is present in the graph before delete + //Ensure node1 and node2 are present in the graph before delete DataNode node1 = this.flowGraph.getNode("node1"); Assert.assertNotNull(node1); + DataNode node2 = this.flowGraph.getNode("node2"); + Assert.assertNotNull(node2); // delete, commit, push - DirCache ac = this.gitForPush.rm().addFilepattern(formNodeFilePath(this.node1Dir.getName(), this.node1File.getName())).call(); - RevCommit cc = this.gitForPush.commit().setMessage("Node remove commit").call(); + this.gitForPush.rm().addFilepattern(formNodeFilePath(this.node1Dir.getName(), this.node1File.getName())).call(); + this.gitForPush.rm().addFilepattern(formNodeFilePath(this.node2Dir.getName(), this.node2File.getName())).call(); + this.gitForPush.commit().setMessage("Node remove commit").call(); this.gitForPush.push().setRemote("origin").setRefSpecs(this.masterRefSpec).call(); this.gitFlowGraphMonitor.processGitConfigChanges(); - //Check if node1 has been deleted from the graph + //Check if node1 and node 2 have been deleted from the graph node1 = this.flowGraph.getNode("node1"); Assert.assertNull(node1); + node2 = this.flowGraph.getNode("node2"); + Assert.assertNull(node2); } + @Test (dependsOnMethods = "testRemoveNode") + public void testChangesReorder() throws GitAPIException, IOException, ExecutionException, InterruptedException { + String node1FileContents = FlowGraphConfigurationKeys.DATA_NODE_IS_ACTIVE_KEY + "=true\nparam1=value1\n"; + String node2FileContents = FlowGraphConfigurationKeys.DATA_NODE_IS_ACTIVE_KEY + "=true\nparam2=value2\n"; + + String edgeFileContents = buildEdgeFileContents("node1", "node2", "edge1", "value1"); + + createNewFile(this.node1Dir, this.node1File, node1FileContents); + createNewFile(this.node2Dir, this.node2File, node2FileContents); + createNewFile(this.edge1Dir, this.edge1File, edgeFileContents); + + // add, commit, push + this.gitForPush.add().addFilepattern(formNodeFilePath(this.node1Dir.getName(), this.node1File.getName())).call(); + this.gitForPush.add().addFilepattern(formNodeFilePath(this.node2Dir.getName(), this.node2File.getName())).call(); + this.gitForPush.commit().setMessage("Add nodes commit").call(); + this.gitForPush.push().setRemote("origin").setRefSpecs(this.masterRefSpec).call(); + + this.gitForPush.add().addFilepattern(formEdgeFilePath(this.edge1Dir.getParentFile().getName(), this.edge1Dir.getName(), this.edge1File.getName())).call(); + this.gitForPush.commit().setMessage("Add nodes and edges commit").call(); + this.gitForPush.push().setRemote("origin").setRefSpecs(this.masterRefSpec).call(); + + this.gitFlowGraphMonitor.processGitConfigChanges(); + //Ensure node1 and node2 are present in the graph + DataNode node1 = this.flowGraph.getNode("node1"); + Assert.assertNotNull(node1); + DataNode node2 = this.flowGraph.getNode("node2"); + Assert.assertNotNull(node2); + testIfEdgeSuccessfullyAdded("node1", "node2", "edge1", "value1"); + + //Delete node1, edge node1->node2 files + node1File.delete(); + edge1File.delete(); + + //Commit1: delete node1 and edge node1->node2 + this.gitForPush.rm().addFilepattern(formNodeFilePath(this.node1Dir.getName(), this.node1File.getName())).call(); + this.gitForPush.rm().addFilepattern(formEdgeFilePath(this.edge1Dir.getParentFile().getName(), this.edge1Dir.getName(), this.edge1File.getName())).call(); + this.gitForPush.commit().setMessage("Delete node1 and edge1 commit").call(); + this.gitForPush.push().setRemote("origin").setRefSpecs(this.masterRefSpec).call(); + + //Commit2: add node1 back + createNewFile(this.node1Dir, this.node1File, node1FileContents); + this.gitForPush.add().addFilepattern(formNodeFilePath(this.node1Dir.getName(), this.node1File.getName())).call(); + this.gitForPush.commit().setMessage("Add node1 commit").call(); + this.gitForPush.push().setRemote("origin").setRefSpecs(this.masterRefSpec).call(); + + this.gitFlowGraphMonitor.processGitConfigChanges(); + node1 = this.flowGraph.getNode("node1"); + Assert.assertNotNull(node1); + Assert.assertEquals(this.flowGraph.getEdges(node1).size(), 0); + } + + @AfterClass + public void tearDown() throws Exception { + cleanUpDir(TEST_DIR); + } + + private void createNewFile(File dir, File file, String fileContents) throws IOException { + dir.mkdirs(); + file.createNewFile(); + Files.write(fileContents, file, Charsets.UTF_8); + } + + private void addNode(File nodeDir, File nodeFile, String fileContents) throws IOException, GitAPIException { + createNewFile(nodeDir, nodeFile, fileContents); + + // add, commit, push node + this.gitForPush.add().addFilepattern(formNodeFilePath(nodeDir.getName(), nodeFile.getName())).call(); + this.gitForPush.commit().setMessage("Node commit").call(); + this.gitForPush.push().setRemote("origin").setRefSpecs(this.masterRefSpec).call(); + } + + private void addEdge(File edgeDir, File edgeFile, String fileContents) throws IOException, GitAPIException { + createNewFile(edgeDir, edgeFile, fileContents); + + // add, commit, push edge + this.gitForPush.add().addFilepattern(formEdgeFilePath(edgeDir.getParentFile().getName(), edgeDir.getName(), edgeFile.getName())).call(); + this.gitForPush.commit().setMessage("Edge commit").call(); + this.gitForPush.push().setRemote("origin").setRefSpecs(this.masterRefSpec).call(); + } + + private String buildEdgeFileContents(String node1, String node2, String edgeName, String value) { + String fileContents = FlowGraphConfigurationKeys.FLOW_EDGE_SOURCE_KEY + "=" + node1 + "\n" + + FlowGraphConfigurationKeys.FLOW_EDGE_DESTINATION_KEY + "=" + node2 + "\n" + + FlowGraphConfigurationKeys.FLOW_EDGE_NAME_KEY + "=" + edgeName + "\n" + + FlowGraphConfigurationKeys.FLOW_EDGE_IS_ACTIVE_KEY + "=true\n" + + FlowGraphConfigurationKeys.FLOW_EDGE_TEMPLATE_DIR_URI_KEY + "=FS:///flowEdgeTemplate\n" + + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY + ".0." + + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTOR_CLASS_KEY + "=org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor\n" + + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY + ".0.specStore.fs.dir=/tmp1\n" + + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY + ".0.specExecInstance.capabilities=s1:d1\n" + + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY + ".1." + + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTOR_CLASS_KEY + "=org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor\n" + + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY + ".1.specStore.fs.dir=/tmp2\n" + + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY + ".1.specExecInstance.capabilities=s2:d2\n" + + "key1=" + value + "\n"; + return fileContents; + } + + private void testIfEdgeSuccessfullyAdded(String node1, String node2, String edgeName, String value) throws ExecutionException, InterruptedException { + Set<FlowEdge> edgeSet = this.flowGraph.getEdges(node1); + Assert.assertEquals(edgeSet.size(), 1); + FlowEdge flowEdge = edgeSet.iterator().next(); + Assert.assertEquals(flowEdge.getId(), Joiner.on(":").join(node1, node2, edgeName)); + Assert.assertEquals(flowEdge.getSrc(), node1); + Assert.assertEquals(flowEdge.getDest(), node2); + Assert.assertEquals(flowEdge.getExecutors().get(0).getConfig().get().getString("specStore.fs.dir"), "/tmp1"); + Assert.assertEquals(flowEdge.getExecutors().get(0).getConfig().get().getString("specExecInstance.capabilities"), "s1:d1"); + Assert.assertEquals(flowEdge.getExecutors().get(0).getClass().getSimpleName(), "InMemorySpecExecutor"); + Assert.assertEquals(flowEdge.getExecutors().get(1).getConfig().get().getString("specStore.fs.dir"), "/tmp2"); + Assert.assertEquals(flowEdge.getExecutors().get(1).getConfig().get().getString("specExecInstance.capabilities"), "s2:d2"); + Assert.assertEquals(flowEdge.getExecutors().get(1).getClass().getSimpleName(), "InMemorySpecExecutor"); + Assert.assertEquals(flowEdge.getConfig().getString("key1"), value); + } + + private String formNodeFilePath(String groupDir, String fileName) { + return this.flowGraphDir.getName() + SystemUtils.FILE_SEPARATOR + groupDir + SystemUtils.FILE_SEPARATOR + fileName; + } + + private String formEdgeFilePath(String parentDir, String groupDir, String fileName) { + return this.flowGraphDir.getName() + SystemUtils.FILE_SEPARATOR + parentDir + SystemUtils.FILE_SEPARATOR + groupDir + SystemUtils.FILE_SEPARATOR + fileName; + } private void cleanUpDir(String dir) { File specStoreDir = new File(dir); @@ -298,17 +377,4 @@ public class GitFlowGraphMonitorTest { } } } - - private String formNodeFilePath(String groupDir, String fileName) { - return this.flowGraphDir.getName() + SystemUtils.FILE_SEPARATOR + groupDir + SystemUtils.FILE_SEPARATOR + fileName; - } - - private String formEdgeFilePath(String parentDir, String groupDir, String fileName) { - return this.flowGraphDir.getName() + SystemUtils.FILE_SEPARATOR + parentDir + SystemUtils.FILE_SEPARATOR + groupDir + SystemUtils.FILE_SEPARATOR + fileName; - } - - @AfterClass - public void tearDown() throws Exception { - cleanUpDir(TEST_DIR); - } } \ No newline at end of file
