Repository: incubator-gobblin
Updated Branches:
  refs/heads/master 2665bdbce -> f121bb2c8


[GOBBLIN-603] Add ServiceManager to manage GitFlowGraphMonitor in multihop flow compiler.

Closes #2469 from sv2000/specCompilerRefactor


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/f121bb2c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/f121bb2c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/f121bb2c

Branch: refs/heads/master
Commit: f121bb2c8ab4e5af37c2419e1417a1420eb2379e
Parents: 2665bdb
Author: sv2000 <[email protected]>
Authored: Mon Oct 8 09:06:14 2018 -0700
Committer: Hung Tran <[email protected]>
Committed: Mon Oct 8 09:06:14 2018 -0700

----------------------------------------------------------------------
 .../modules/core/GitFlowGraphMonitor.java       |  2 +-
 .../modules/core/GitMonitoringService.java      |  4 +-
 .../modules/flow/MultiHopFlowCompiler.java      | 36 +++++++-
 .../modules/flow/MultiHopFlowCompilerTest.java  | 95 +++++++++++++++++++-
 4 files changed, 130 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f121bb2c/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitor.java
----------------------------------------------------------------------
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitor.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitor.java
index c7dd226..4a60d35 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitor.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitor.java
@@ -91,7 +91,7 @@ public class GitFlowGraphMonitor extends GitMonitoringService 
{
    */
   @Override
   public boolean shouldPollGit() {
-    return this.isActive;
+    return true; // Poll unconditionally; before this change polling was gated on this.isActive.
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f121bb2c/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitMonitoringService.java
----------------------------------------------------------------------
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitMonitoringService.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitMonitoringService.java
index c4d3656..936460d 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitMonitoringService.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitMonitoringService.java
@@ -120,8 +120,8 @@ public abstract class GitMonitoringService extends 
AbstractIdleService {
   /** Start the service. */
   @Override
   protected void startUp() throws Exception {
-    log.info("Starting the " + GitConfigMonitor.class.getSimpleName());
-    log.info("Polling git with inteval {} ", this.pollingInterval);
+    log.info("Starting the " + getClass().getSimpleName());
+    log.info("Polling git with interval {} ", this.pollingInterval);
 
     // Schedule the job config fetch task
     this.scheduledExecutor.scheduleAtFixedRate(new Runnable() {

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f121bb2c/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java
----------------------------------------------------------------------
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java
index 1ab8312..52116a2 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java
@@ -21,12 +21,15 @@ import java.io.IOException;
 import java.net.URISyntaxException;
 import java.util.ArrayList;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 import org.slf4j.Logger;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.ServiceManager;
 import com.typesafe.config.Config;
 
 import lombok.Getter;
@@ -59,13 +62,15 @@ import org.apache.gobblin.util.ConfigUtils;
 @Alpha
 @Slf4j
 public class MultiHopFlowCompiler extends BaseFlowToJobSpecCompiler {
-
   @Getter
   private final FlowGraph flowGraph;
-  private GitFlowGraphMonitor gitFlowGraphMonitor;
+  @Getter
+  private ServiceManager serviceManager;
   @Getter
   private boolean active;
 
+  private GitFlowGraphMonitor gitFlowGraphMonitor;
+
   public MultiHopFlowCompiler(Config config) {
     this(config, true);
   }
@@ -90,6 +95,15 @@ public class MultiHopFlowCompiler extends 
BaseFlowToJobSpecCompiler {
     }
     this.flowGraph = new BaseFlowGraph();
     this.gitFlowGraphMonitor = new GitFlowGraphMonitor(this.config, 
flowCatalog, this.flowGraph);
+    this.serviceManager = new 
ServiceManager(Lists.newArrayList(this.gitFlowGraphMonitor));
+    addShutdownHook();
+    //Start the git flow graph monitor
+    try {
+      this.serviceManager.startAsync().awaitHealthy(5, TimeUnit.SECONDS);
+    } catch (TimeoutException te) {
+      this.log.error("Timed out while waiting for the service manager to start 
up", te);
+      throw new RuntimeException(te);
+    }
   }
 
   @VisibleForTesting
@@ -151,4 +165,22 @@ public class MultiHopFlowCompiler extends 
BaseFlowToJobSpecCompiler {
     log.warn("No population of templates based on edge happen in this 
implementation");
     return;
   }
+
+  /**
+   * Register a JVM shutdown hook that stops every service held by the {@link ServiceManager}.
+   * The hook waits at most 5 seconds for the services to stop so that JVM shutdown stays
+   * responsive; if they do not stop in time, a warning is logged and shutdown proceeds.
+   */
+  private void addShutdownHook() {
+    final ServiceManager manager = this.serviceManager;
+    Runtime.getRuntime().addShutdownHook(new Thread("MultiHopFlowCompiler-shutdown-hook") {
+      @Override
+      public void run() {
+        // Give the services 5 seconds to stop to ensure that we are responsive to shutdown
+        // requests.
+        try {
+          manager.stopAsync().awaitStopped(5, TimeUnit.SECONDS);
+        } catch (TimeoutException timeout) {
+          // Best effort: never block JVM exit, but leave a trace that shutdown was not clean.
+          log.warn("Timed out while waiting for the service manager to stop", timeout);
+        }
+      }
+    });
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f121bb2c/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompilerTest.java
----------------------------------------------------------------------
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompilerTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompilerTest.java
index c214b35..955205b 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompilerTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompilerTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.gobblin.service.modules.flow;
 
+import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.InputStreamReader;
@@ -26,17 +27,28 @@ import java.nio.charset.Charset;
 import java.util.List;
 import java.util.Properties;
 import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
 
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.SystemUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.eclipse.jgit.api.Git;
+import org.eclipse.jgit.api.errors.GitAPIException;
+import org.eclipse.jgit.lib.Repository;
+import org.eclipse.jgit.lib.RepositoryCache;
+import org.eclipse.jgit.transport.RefSpec;
+import org.eclipse.jgit.util.FS;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
 import com.google.common.base.Charsets;
+import com.google.common.base.Optional;
+import com.google.common.io.Files;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
 import com.typesafe.config.ConfigParseOptions;
@@ -44,6 +56,7 @@ import com.typesafe.config.ConfigSyntax;
 
 import lombok.extern.slf4j.Slf4j;
 
+import org.apache.gobblin.config.ConfigBuilder;
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.runtime.api.FlowSpec;
 import org.apache.gobblin.runtime.api.JobSpec;
@@ -52,6 +65,7 @@ import org.apache.gobblin.runtime.api.SpecExecutor;
 import org.apache.gobblin.runtime.api.SpecProducer;
 import org.apache.gobblin.runtime.spec_executorInstance.AbstractSpecExecutor;
 import org.apache.gobblin.service.ServiceConfigKeys;
+import org.apache.gobblin.service.modules.core.GitFlowGraphMonitor;
 import org.apache.gobblin.service.modules.flowgraph.BaseFlowGraph;
 import org.apache.gobblin.service.modules.flowgraph.Dag;
 import org.apache.gobblin.service.modules.flowgraph.DataNode;
@@ -69,7 +83,8 @@ import 
org.apache.gobblin.util.reflection.GobblinConstructorUtils;
 @Slf4j
 public class MultiHopFlowCompilerTest {
   private FlowGraph flowGraph;
-  private SpecCompiler specCompiler;
+  private MultiHopFlowCompiler specCompiler;
+  private final String TESTDIR = "/tmp/mhCompiler/gitFlowGraphTestDir";
 
   @BeforeClass
   public void setUp()
@@ -400,8 +415,84 @@ public class MultiHopFlowCompilerTest {
     }
   }
 
+  @Test (dependsOnMethods = "testMulticastPath")
+  public void testGitFlowGraphMonitorService()
+      throws IOException, GitAPIException, URISyntaxException, 
InterruptedException {
+    File remoteDir = new File(TESTDIR + "/remote");
+    File cloneDir = new File(TESTDIR + "/clone");
+    File flowGraphDir = new File(cloneDir, "/gobblin-flowgraph");
+
+    //Clean up
+    cleanUpDir(TESTDIR);
+
+    // Create a bare repository
+    RepositoryCache.FileKey fileKey = RepositoryCache.FileKey.exact(remoteDir, 
FS.DETECTED);
+    Repository remoteRepo = fileKey.open(false);
+    remoteRepo.create(true);
+
+    Git gitForPush = 
Git.cloneRepository().setURI(remoteRepo.getDirectory().getAbsolutePath()).setDirectory(cloneDir).call();
+
+    // push an empty commit as a base for detecting changes
+    gitForPush.commit().setMessage("First commit").call();
+    RefSpec masterRefSpec = new RefSpec("master");
+    gitForPush.push().setRemote("origin").setRefSpecs(masterRefSpec).call();
+
+    URI flowTemplateCatalogUri = 
this.getClass().getClassLoader().getResource("template_catalog").toURI();
+
+    Config config = ConfigBuilder.create()
+        .addPrimitive(GitFlowGraphMonitor.GIT_FLOWGRAPH_MONITOR_PREFIX + "."
+            + ConfigurationKeys.GIT_MONITOR_REPO_URI, 
remoteRepo.getDirectory().getAbsolutePath())
+        .addPrimitive(GitFlowGraphMonitor.GIT_FLOWGRAPH_MONITOR_PREFIX + "." + 
ConfigurationKeys.GIT_MONITOR_REPO_DIR, TESTDIR + "/git-flowgraph")
+        .addPrimitive(GitFlowGraphMonitor.GIT_FLOWGRAPH_MONITOR_PREFIX + "." + 
ConfigurationKeys.GIT_MONITOR_POLLING_INTERVAL, 5)
+        
.addPrimitive(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY, 
flowTemplateCatalogUri.toString())
+        .build();
+
+    //Create a MultiHopFlowCompiler instance
+    specCompiler = new MultiHopFlowCompiler(config, Optional.absent(), false);
+
+    //Ensure node1 is not present in the graph
+    Assert.assertNull(specCompiler.getFlowGraph().getNode("node1"));
+
+    // push a new node file
+    File nodeDir = new File(flowGraphDir, "node1");
+    File nodeFile = new File(nodeDir, "node1.properties");
+    nodeDir.mkdirs();
+    nodeFile.createNewFile();
+    Files.write(FlowGraphConfigurationKeys.DATA_NODE_IS_ACTIVE_KEY + 
"=true\nparam1=val1" + "\n", nodeFile, Charsets.UTF_8);
+
+    // add, commit, push node
+    gitForPush.add().addFilepattern(formNodeFilePath(flowGraphDir, 
nodeDir.getName(), nodeFile.getName())).call();
+    gitForPush.commit().setMessage("Node commit").call();
+    gitForPush.push().setRemote("origin").setRefSpecs(masterRefSpec).call();
+
+    // polling is every 5 seconds, so wait twice as long and check
+    TimeUnit.SECONDS.sleep(10);
+
+    //Test that a DataNode is added to FlowGraph
+    DataNode dataNode = specCompiler.getFlowGraph().getNode("node1");
+    Assert.assertEquals(dataNode.getId(), "node1");
+    Assert.assertEquals(dataNode.getRawConfig().getString("param1"), "val1");
+  }
+
+  /**
+   * Build the repository-relative path of a node file, suitable for {@code git add}.
+   * JGit file patterns always use '/' as the separator regardless of the host OS, so the
+   * platform file separator must not be used here.
+   */
+  private String formNodeFilePath(File flowGraphDir, String groupDir, String fileName) {
+    return flowGraphDir.getName() + "/" + groupDir + "/" + fileName;
+  }
+
+  /**
+   * Recursively delete the given directory; a directory that does not exist is left alone.
+   */
+  private void cleanUpDir(String dir) throws IOException {
+    File target = new File(dir);
+    if (!target.exists()) {
+      return;
+    }
+    FileUtils.deleteDirectory(target);
+  }
+
   @AfterClass
-  public void tearDown() {
+  public void tearDown() throws IOException {
+    // Stop the git flow graph monitor first so it is no longer polling into TESTDIR while the
+    // directory is being deleted; clean up regardless of whether stopping succeeded.
+    try {
+      this.specCompiler.getServiceManager().stopAsync().awaitStopped(5, TimeUnit.SECONDS);
+    } catch (Exception e) {
+      log.warn("Could not stop Service Manager", e);
+    }
+    cleanUpDir(TESTDIR);
   }
 
   public static class TestAzkabanSpecExecutor extends AbstractSpecExecutor {

Reply via email to