Repository: incubator-gobblin
Updated Branches:
  refs/heads/master 2c5e25d98 -> 6b1201852


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/6b120185/gobblin-service/src/test/resources/org/apache/gobblin/service/modules/core/GitFlowGraphMonitorTest.java
----------------------------------------------------------------------
diff --git 
a/gobblin-service/src/test/resources/org/apache/gobblin/service/modules/core/GitFlowGraphMonitorTest.java
 
b/gobblin-service/src/test/resources/org/apache/gobblin/service/modules/core/GitFlowGraphMonitorTest.java
new file mode 100644
index 0000000..43fa9a3
--- /dev/null
+++ 
b/gobblin-service/src/test/resources/org/apache/gobblin/service/modules/core/GitFlowGraphMonitorTest.java
@@ -0,0 +1,314 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.core;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.SystemUtils;
+import org.eclipse.jgit.api.Git;
+import org.eclipse.jgit.api.errors.GitAPIException;
+import org.eclipse.jgit.dircache.DirCache;
+import org.eclipse.jgit.lib.Repository;
+import org.eclipse.jgit.lib.RepositoryCache;
+import org.eclipse.jgit.revwalk.RevCommit;
+import org.eclipse.jgit.transport.RefSpec;
+import org.eclipse.jgit.util.FS;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import com.google.common.base.Charsets;
+import com.google.common.io.Files;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+import org.apache.gobblin.config.ConfigBuilder;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.service.ServiceConfigKeys;
+import org.apache.gobblin.service.modules.flowgraph.BaseFlowGraph;
+import org.apache.gobblin.service.modules.flowgraph.DataNode;
+import org.apache.gobblin.service.modules.template_catalog.FSFlowCatalog;
+import org.apache.gobblin.service.modules.flowgraph.FlowEdge;
+import org.apache.gobblin.service.modules.flowgraph.FlowGraphConfigurationKeys;
+
+
+public class GitFlowGraphMonitorTest {
+  private static final Logger logger = 
LoggerFactory.getLogger(GitFlowGraphMonitor.class);
+  private Repository remoteRepo;
+  private Git gitForPush;
+  private static final String TEST_DIR = "/tmp/gitFlowGraphTestDir";
+  private final File remoteDir = new File(TEST_DIR + "/remote");
+  private final File cloneDir = new File(TEST_DIR + "/clone");
+  private final File flowGraphDir = new File(cloneDir, "/gobblin-flowgraph");
+  private static final String NODE_1_FILE = "node1.properties";
+  private final File node1Dir = new File(flowGraphDir, "node1");
+  private final File node1File = new File(node1Dir, NODE_1_FILE);
+  private static final String NODE_2_FILE = "node2.properties";
+  private final File node2Dir = new File(flowGraphDir, "node2");
+  private final File node2File = new File(node2Dir, NODE_2_FILE);
+  private final File edge1Dir = new File(node1Dir, "node2");
+  private final File edge1File = new File(edge1Dir, "edge1.properties");
+
+  private RefSpec masterRefSpec = new RefSpec("master");
+  private FSFlowCatalog flowCatalog;
+  private Config config;
+  private BaseFlowGraph flowGraph;
+  private GitFlowGraphMonitor gitFlowGraphMonitor;
+
+  @BeforeClass
+  public void setUp() throws Exception {
+    cleanUpDir(TEST_DIR);
+
+    // Create a bare repository
+    RepositoryCache.FileKey fileKey = RepositoryCache.FileKey.exact(remoteDir, 
FS.DETECTED);
+    this.remoteRepo = fileKey.open(false);
+    this.remoteRepo.create(true);
+
+    this.gitForPush = 
Git.cloneRepository().setURI(this.remoteRepo.getDirectory().getAbsolutePath()).setDirectory(cloneDir).call();
+
+    // push an empty commit as a base for detecting changes
+    this.gitForPush.commit().setMessage("First commit").call();
+    
this.gitForPush.push().setRemote("origin").setRefSpecs(this.masterRefSpec).call();
+
+    this.config = ConfigBuilder.create()
+        .addPrimitive(GitFlowGraphMonitor.GIT_FLOWGRAPH_MONITOR_PREFIX + "."
+            + ConfigurationKeys.GIT_MONITOR_REPO_URI, 
this.remoteRepo.getDirectory().getAbsolutePath())
+        .addPrimitive(GitFlowGraphMonitor.GIT_FLOWGRAPH_MONITOR_PREFIX + "." + 
ConfigurationKeys.GIT_MONITOR_REPO_DIR, TEST_DIR + "/git-flowgraph")
+        .addPrimitive(GitFlowGraphMonitor.GIT_FLOWGRAPH_MONITOR_PREFIX + "." + 
ConfigurationKeys.GIT_MONITOR_POLLING_INTERVAL, 5)
+        .build();
+
+    // Create a FSFlowCatalog instance
+    URI flowTemplateCatalogUri = 
this.getClass().getClassLoader().getResource("template_catalog").toURI();
+    Properties properties = new Properties();
+    
properties.put(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY, 
flowTemplateCatalogUri.toString());
+    Config config = ConfigFactory.parseProperties(properties);
+    Config templateCatalogCfg = config
+        .withValue(ConfigurationKeys.JOB_CONFIG_FILE_GENERAL_PATH_KEY,
+            
config.getValue(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY));
+    this.flowCatalog = new FSFlowCatalog(templateCatalogCfg);
+
+    //Create a FlowGraph instance with defaults
+    this.flowGraph = new BaseFlowGraph();
+
+    this.gitFlowGraphMonitor = new GitFlowGraphMonitor(this.config, 
this.flowCatalog, this.flowGraph);
+    this.gitFlowGraphMonitor.setActive(true);
+  }
+
+  private void testAddNodeHelper(File nodeDir, File nodeFile, String nodeId, 
String paramValue)
+      throws IOException, GitAPIException {
+    // push a new node file
+    nodeDir.mkdirs();
+    nodeFile.createNewFile();
+    Files.write(FlowGraphConfigurationKeys.DATA_NODE_IS_ACTIVE_KEY + 
"=true\nparam1=" + paramValue + "\n", nodeFile, Charsets.UTF_8);
+
+    // add, commit, push node
+    this.gitForPush.add().addFilepattern(formNodeFilePath(nodeDir.getName(), 
nodeFile.getName())).call();
+    this.gitForPush.commit().setMessage("Node commit").call();
+    
this.gitForPush.push().setRemote("origin").setRefSpecs(this.masterRefSpec).call();
+
+    this.gitFlowGraphMonitor.processGitConfigChanges();
+
+    //Check if node1 has been added to the FlowGraph
+    DataNode dataNode = this.flowGraph.getNode(nodeId);
+    Assert.assertEquals(dataNode.getId(), nodeId);
+    Assert.assertTrue(dataNode.isActive());
+    Assert.assertEquals(dataNode.getProps().getString("param1"), paramValue);
+  }
+
+  @Test
+  public void testAddNode()
+      throws IOException, GitAPIException, URISyntaxException, 
ExecutionException, InterruptedException {
+    testAddNodeHelper(this.node1Dir, this.node1File, "node1", "value1");
+    testAddNodeHelper(this.node2Dir, this.node2File, "node2", "value2");
+  }
+
+  @Test (dependsOnMethods = "testAddNode")
+  public void testAddEdge()
+      throws IOException, GitAPIException, URISyntaxException, 
ExecutionException, InterruptedException {
+    // push a new node file
+    this.edge1Dir.mkdirs();
+    this.edge1File.createNewFile();
+
+    Files.write(FlowGraphConfigurationKeys.FLOW_EDGE_SOURCE_KEY + "=node1\n"
+        + FlowGraphConfigurationKeys.FLOW_EDGE_DESTINATION_KEY + "=node2\n"
+        + FlowGraphConfigurationKeys.FLOW_EDGE_NAME_KEY + "=edge1\n"
+        + FlowGraphConfigurationKeys.FLOW_EDGE_IS_ACTIVE_KEY + "=true\n"
+        + FlowGraphConfigurationKeys.FLOW_EDGE_TEMPLATE_URI_KEY + 
"=FS:///test-template/flow.conf\n"
+        + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY + ".0."
+        + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTOR_CLASS_KEY + 
"=org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor\n"
+        + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY + 
".0.specStore.fs.dir=/tmp1\n"
+        + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY + 
".0.specExecInstance.capabilities=s1:d1\n"
+        + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY + ".1."
+        + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTOR_CLASS_KEY + 
"=org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor\n"
+        + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY + 
".1.specStore.fs.dir=/tmp2\n"
+        + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY + 
".1.specExecInstance.capabilities=s2:d2\n", edge1File, Charsets.UTF_8);
+
+    // add, commit, push
+    
this.gitForPush.add().addFilepattern(formEdgeFilePath(this.edge1Dir.getParentFile().getName(),
 this.edge1Dir.getName(), this.edge1File.getName())).call();
+    this.gitForPush.commit().setMessage("Edge commit").call();
+    
this.gitForPush.push().setRemote("origin").setRefSpecs(this.masterRefSpec).call();
+
+    this.gitFlowGraphMonitor.processGitConfigChanges();
+
+    //Check if edge1 has been added to the FlowGraph
+    Set<FlowEdge> edgeSet = this.flowGraph.getEdges("node1");
+    Assert.assertEquals(edgeSet.size(), 1);
+    FlowEdge flowEdge = edgeSet.iterator().next();
+    Assert.assertEquals(flowEdge.getEndPoints().get(0), "node1");
+    Assert.assertEquals(flowEdge.getEndPoints().get(1), "node2");
+    
Assert.assertEquals(flowEdge.getExecutors().get(0).getConfig().get().getString("specStore.fs.dir"),
 "/tmp1");
+    
Assert.assertEquals(flowEdge.getExecutors().get(0).getConfig().get().getString("specExecInstance.capabilities"),
 "s1:d1");
+    
Assert.assertEquals(flowEdge.getExecutors().get(0).getClass().getSimpleName(), 
"InMemorySpecExecutor");
+    
Assert.assertEquals(flowEdge.getExecutors().get(1).getConfig().get().getString("specStore.fs.dir"),
 "/tmp2");
+    
Assert.assertEquals(flowEdge.getExecutors().get(1).getConfig().get().getString("specExecInstance.capabilities"),
 "s2:d2");
+    
Assert.assertEquals(flowEdge.getExecutors().get(1).getClass().getSimpleName(), 
"InMemorySpecExecutor");
+  }
+
+  @Test (dependsOnMethods = "testAddNode")
+  public void testUpdateEdge()
+      throws IOException, GitAPIException, URISyntaxException, 
ExecutionException, InterruptedException {
+    //Update edge1 file
+    Files.write(FlowGraphConfigurationKeys.FLOW_EDGE_SOURCE_KEY + "=node1\n"
+        + FlowGraphConfigurationKeys.FLOW_EDGE_DESTINATION_KEY + "=node2\n"
+        + FlowGraphConfigurationKeys.FLOW_EDGE_NAME_KEY + "=edge1\n"
+        + FlowGraphConfigurationKeys.FLOW_EDGE_IS_ACTIVE_KEY + "=true\n"
+        + FlowGraphConfigurationKeys.FLOW_EDGE_TEMPLATE_URI_KEY + 
"=FS:///test-template/flow.conf\n"
+        + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY + ".0."
+        + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTOR_CLASS_KEY + 
"=org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor\n"
+        + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY + 
".0.specStore.fs.dir=/tmp1\n"
+        + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY + 
".0.specExecInstance.capabilities=s1:d1\n"
+        + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY + ".1."
+        + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTOR_CLASS_KEY + 
"=org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor\n"
+        + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY + 
".1.specStore.fs.dir=/tmp2\n"
+        + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY + 
".1.specExecInstance.capabilities=s2:d2\n"
+        + "key1=value1\n", edge1File, Charsets.UTF_8);
+
+    // add, commit, push
+    
this.gitForPush.add().addFilepattern(formEdgeFilePath(this.edge1Dir.getParentFile().getName(),
 this.edge1Dir.getName(), this.edge1File.getName())).call();
+    this.gitForPush.commit().setMessage("Edge commit").call();
+    
this.gitForPush.push().setRemote("origin").setRefSpecs(this.masterRefSpec).call();
+
+    this.gitFlowGraphMonitor.processGitConfigChanges();
+
+    //Check if new edge1 has been added to the FlowGraph
+    Set<FlowEdge> edgeSet = this.flowGraph.getEdges("node1");
+    Assert.assertEquals(edgeSet.size(), 1);
+    FlowEdge flowEdge = edgeSet.iterator().next();
+    Assert.assertEquals(flowEdge.getEndPoints().get(0), "node1");
+    Assert.assertEquals(flowEdge.getEndPoints().get(1), "node2");
+    
Assert.assertEquals(flowEdge.getExecutors().get(0).getConfig().get().getString("specStore.fs.dir"),
 "/tmp1");
+    
Assert.assertEquals(flowEdge.getExecutors().get(0).getConfig().get().getString("specExecInstance.capabilities"),
 "s1:d1");
+    
Assert.assertEquals(flowEdge.getExecutors().get(0).getClass().getSimpleName(), 
"InMemorySpecExecutor");
+    
Assert.assertEquals(flowEdge.getExecutors().get(1).getConfig().get().getString("specStore.fs.dir"),
 "/tmp2");
+    
Assert.assertEquals(flowEdge.getExecutors().get(1).getConfig().get().getString("specExecInstance.capabilities"),
 "s2:d2");
+    
Assert.assertEquals(flowEdge.getExecutors().get(1).getClass().getSimpleName(), 
"InMemorySpecExecutor");
+    Assert.assertEquals(flowEdge.getProps().getString("key1"), "value1");
+  }
+
+  @Test (dependsOnMethods = "testUpdateEdge")
+  public void testUpdateNode()
+      throws IOException, GitAPIException, URISyntaxException, 
ExecutionException, InterruptedException {
+    //Update param1 value in node1 and check if updated node is added to the 
graph
+    testAddNodeHelper(this.node1Dir, this.node1File, "node1", "value3");
+  }
+
+
+  @Test (dependsOnMethods = "testUpdateNode")
+  public void testRemoveEdge() throws GitAPIException, IOException {
+    // delete a config file
+    edge1File.delete();
+
+    //Node1 has 1 edge before delete
+    Set<FlowEdge> edgeSet = this.flowGraph.getEdges("node1");
+    Assert.assertEquals(edgeSet.size(), 1);
+
+    // delete, commit, push
+    DirCache ac = 
this.gitForPush.rm().addFilepattern(formEdgeFilePath(this.edge1Dir.getParentFile().getName(),
+        this.edge1Dir.getName(), this.edge1File.getName())).call();
+    RevCommit cc = this.gitForPush.commit().setMessage("Edge remove 
commit").call();
+    
this.gitForPush.push().setRemote("origin").setRefSpecs(this.masterRefSpec).call();
+
+    this.gitFlowGraphMonitor.processGitConfigChanges();
+
+    //Check if edge1 has been deleted from the graph
+    edgeSet = this.flowGraph.getEdges("node1");
+    Assert.assertTrue(edgeSet.size() == 0);
+  }
+
+  @Test (dependsOnMethods = "testRemoveEdge")
+  public void testRemoveNode() throws GitAPIException, IOException {
+    //delete node file
+    node1File.delete();
+
+    //node1 is present in the graph before delete
+    DataNode node1 = this.flowGraph.getNode("node1");
+    Assert.assertNotNull(node1);
+
+    // delete, commit, push
+    DirCache ac = 
this.gitForPush.rm().addFilepattern(formNodeFilePath(this.node1Dir.getName(), 
this.node1File.getName())).call();
+    RevCommit cc = this.gitForPush.commit().setMessage("Node remove 
commit").call();
+    
this.gitForPush.push().setRemote("origin").setRefSpecs(this.masterRefSpec).call();
+
+    this.gitFlowGraphMonitor.processGitConfigChanges();
+
+    //Check if node1 has been deleted from the graph
+    node1 = this.flowGraph.getNode("node1");
+    Assert.assertNull(node1);
+  }
+
+
+  private void cleanUpDir(String dir) {
+    File specStoreDir = new File(dir);
+
+    // cleanup is flaky on Travis, so retry a few times and then suppress the 
error if unsuccessful
+    for (int i = 0; i < 5; i++) {
+      try {
+        if (specStoreDir.exists()) {
+          FileUtils.deleteDirectory(specStoreDir);
+        }
+        // if delete succeeded then break out of loop
+        break;
+      } catch (IOException e) {
+        logger.warn("Cleanup delete directory failed for directory: " + dir, 
e);
+      }
+    }
+  }
+
+  private String formNodeFilePath(String groupDir, String fileName) {
+    return this.flowGraphDir.getName() + SystemUtils.FILE_SEPARATOR + groupDir 
+ SystemUtils.FILE_SEPARATOR + fileName;
+  }
+
+  private String formEdgeFilePath(String parentDir, String groupDir, String 
fileName) {
+    return this.flowGraphDir.getName() + SystemUtils.FILE_SEPARATOR + 
parentDir + SystemUtils.FILE_SEPARATOR + groupDir + SystemUtils.FILE_SEPARATOR 
+ fileName;
+  }
+
+  @AfterClass
+  public void tearDown() throws Exception {
+    cleanUpDir(TEST_DIR);
+  }
+}
\ No newline at end of file

Reply via email to