Repository: incubator-gobblin Updated Branches: refs/heads/master 2665bdbce -> f121bb2c8
[GOBBLIN-603] Add ServiceManager to manage GitFlowGraphMonitor in multihop flow compiler.[] Closes #2469 from sv2000/specCompilerRefactor Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/f121bb2c Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/f121bb2c Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/f121bb2c Branch: refs/heads/master Commit: f121bb2c8ab4e5af37c2419e1417a1420eb2379e Parents: 2665bdb Author: sv2000 <[email protected]> Authored: Mon Oct 8 09:06:14 2018 -0700 Committer: Hung Tran <[email protected]> Committed: Mon Oct 8 09:06:14 2018 -0700 ---------------------------------------------------------------------- .../modules/core/GitFlowGraphMonitor.java | 2 +- .../modules/core/GitMonitoringService.java | 4 +- .../modules/flow/MultiHopFlowCompiler.java | 36 +++++++- .../modules/flow/MultiHopFlowCompilerTest.java | 95 +++++++++++++++++++- 4 files changed, 130 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f121bb2c/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitor.java ---------------------------------------------------------------------- diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitor.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitor.java index c7dd226..4a60d35 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitor.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitor.java @@ -91,7 +91,7 @@ public class GitFlowGraphMonitor extends GitMonitoringService { */ @Override public boolean shouldPollGit() { - return this.isActive; + return true; } /** http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f121bb2c/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitMonitoringService.java ---------------------------------------------------------------------- diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitMonitoringService.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitMonitoringService.java index c4d3656..936460d 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitMonitoringService.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitMonitoringService.java @@ -120,8 +120,8 @@ public abstract class GitMonitoringService extends AbstractIdleService { /** Start the service. */ @Override protected void startUp() throws Exception { - log.info("Starting the " + GitConfigMonitor.class.getSimpleName()); - log.info("Polling git with inteval {} ", this.pollingInterval); + log.info("Starting the " + getClass().getSimpleName()); + log.info("Polling git with interval {} ", this.pollingInterval); // Schedule the job config fetch task this.scheduledExecutor.scheduleAtFixedRate(new Runnable() { http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f121bb2c/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java ---------------------------------------------------------------------- diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java index 1ab8312..52116a2 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java @@ -21,12 +21,15 @@ import java.io.IOException; import java.net.URISyntaxException; import java.util.ArrayList; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import org.slf4j.Logger; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Optional; import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import com.google.common.util.concurrent.ServiceManager; import com.typesafe.config.Config; import lombok.Getter; @@ -59,13 +62,15 @@ import org.apache.gobblin.util.ConfigUtils; @Alpha @Slf4j public class MultiHopFlowCompiler extends BaseFlowToJobSpecCompiler { - @Getter private final FlowGraph flowGraph; - private GitFlowGraphMonitor gitFlowGraphMonitor; + @Getter + private ServiceManager serviceManager; @Getter private boolean active; + private GitFlowGraphMonitor gitFlowGraphMonitor; + public MultiHopFlowCompiler(Config config) { this(config, true); } @@ -90,6 +95,15 @@ public class MultiHopFlowCompiler extends BaseFlowToJobSpecCompiler { } this.flowGraph = new BaseFlowGraph(); this.gitFlowGraphMonitor = new GitFlowGraphMonitor(this.config, flowCatalog, this.flowGraph); + this.serviceManager = new ServiceManager(Lists.newArrayList(this.gitFlowGraphMonitor)); + addShutdownHook(); + //Start the git flow graph monitor + try { + this.serviceManager.startAsync().awaitHealthy(5, TimeUnit.SECONDS); + } catch (TimeoutException te) { + this.log.error("Timed out while waiting for the service manager to start up", te); + throw new RuntimeException(te); + } } @VisibleForTesting @@ -151,4 +165,22 @@ public class MultiHopFlowCompiler extends BaseFlowToJobSpecCompiler { log.warn("No population of templates based on edge happen in this implementation"); return; } + + /** + * Register a shutdown hook for this thread. + */ + private void addShutdownHook() { + ServiceManager manager = this.serviceManager; + Runtime.getRuntime().addShutdownHook(new Thread() { + public void run() { + // Give the services 5 seconds to stop to ensure that we are responsive to shutdown + // requests. + try { + manager.stopAsync().awaitStopped(5, TimeUnit.SECONDS); + } catch (TimeoutException timeout) { + // stopping timed out + } + } + }); + } } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f121bb2c/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompilerTest.java ---------------------------------------------------------------------- diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompilerTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompilerTest.java index c214b35..955205b 100644 --- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompilerTest.java +++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompilerTest.java @@ -17,6 +17,7 @@ package org.apache.gobblin.service.modules.flow; +import java.io.File; import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; @@ -26,17 +27,28 @@ import java.nio.charset.Charset; import java.util.List; import java.util.Properties; import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import org.apache.commons.io.FileUtils; +import org.apache.commons.lang3.SystemUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.eclipse.jgit.api.Git; +import org.eclipse.jgit.api.errors.GitAPIException; +import org.eclipse.jgit.lib.Repository; +import org.eclipse.jgit.lib.RepositoryCache; +import org.eclipse.jgit.transport.RefSpec; +import org.eclipse.jgit.util.FS; import org.testng.Assert; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; import com.google.common.base.Charsets; +import com.google.common.base.Optional; +import com.google.common.io.Files; import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; import com.typesafe.config.ConfigParseOptions; @@ -44,6 +56,7 @@ import com.typesafe.config.ConfigSyntax; import lombok.extern.slf4j.Slf4j; +import org.apache.gobblin.config.ConfigBuilder; import org.apache.gobblin.configuration.ConfigurationKeys; import org.apache.gobblin.runtime.api.FlowSpec; import org.apache.gobblin.runtime.api.JobSpec; @@ -52,6 +65,7 @@ import org.apache.gobblin.runtime.api.SpecExecutor; import org.apache.gobblin.runtime.api.SpecProducer; import org.apache.gobblin.runtime.spec_executorInstance.AbstractSpecExecutor; import org.apache.gobblin.service.ServiceConfigKeys; +import org.apache.gobblin.service.modules.core.GitFlowGraphMonitor; import org.apache.gobblin.service.modules.flowgraph.BaseFlowGraph; import org.apache.gobblin.service.modules.flowgraph.Dag; import org.apache.gobblin.service.modules.flowgraph.DataNode; @@ -69,7 +83,8 @@ import org.apache.gobblin.util.reflection.GobblinConstructorUtils; @Slf4j public class MultiHopFlowCompilerTest { private FlowGraph flowGraph; - private SpecCompiler specCompiler; + private MultiHopFlowCompiler specCompiler; + private final String TESTDIR = "/tmp/mhCompiler/gitFlowGraphTestDir"; @BeforeClass public void setUp() @@ -400,8 +415,84 @@ public class MultiHopFlowCompilerTest { } } + @Test (dependsOnMethods = "testMulticastPath") + public void testGitFlowGraphMonitorService() + throws IOException, GitAPIException, URISyntaxException, InterruptedException { + File remoteDir = new File(TESTDIR + "/remote"); + File cloneDir = new File(TESTDIR + "/clone"); + File flowGraphDir = new File(cloneDir, "/gobblin-flowgraph"); + + //Clean up + cleanUpDir(TESTDIR); + + // Create a bare repository + RepositoryCache.FileKey fileKey = RepositoryCache.FileKey.exact(remoteDir, FS.DETECTED); + Repository remoteRepo = fileKey.open(false); + remoteRepo.create(true); + + Git gitForPush = Git.cloneRepository().setURI(remoteRepo.getDirectory().getAbsolutePath()).setDirectory(cloneDir).call(); + + // push an empty commit as a base for detecting changes + gitForPush.commit().setMessage("First commit").call(); + RefSpec masterRefSpec = new RefSpec("master"); + gitForPush.push().setRemote("origin").setRefSpecs(masterRefSpec).call(); + + URI flowTemplateCatalogUri = this.getClass().getClassLoader().getResource("template_catalog").toURI(); + + Config config = ConfigBuilder.create() + .addPrimitive(GitFlowGraphMonitor.GIT_FLOWGRAPH_MONITOR_PREFIX + "." + + ConfigurationKeys.GIT_MONITOR_REPO_URI, remoteRepo.getDirectory().getAbsolutePath()) + .addPrimitive(GitFlowGraphMonitor.GIT_FLOWGRAPH_MONITOR_PREFIX + "." + ConfigurationKeys.GIT_MONITOR_REPO_DIR, TESTDIR + "/git-flowgraph") + .addPrimitive(GitFlowGraphMonitor.GIT_FLOWGRAPH_MONITOR_PREFIX + "." + ConfigurationKeys.GIT_MONITOR_POLLING_INTERVAL, 5) + .addPrimitive(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY, flowTemplateCatalogUri.toString()) + .build(); + + //Create a MultiHopFlowCompiler instance + specCompiler = new MultiHopFlowCompiler(config, Optional.absent(), false); + + //Ensure node1 is not present in the graph + Assert.assertNull(specCompiler.getFlowGraph().getNode("node1")); + + // push a new node file + File nodeDir = new File(flowGraphDir, "node1"); + File nodeFile = new File(nodeDir, "node1.properties"); + nodeDir.mkdirs(); + nodeFile.createNewFile(); + Files.write(FlowGraphConfigurationKeys.DATA_NODE_IS_ACTIVE_KEY + "=true\nparam1=val1" + "\n", nodeFile, Charsets.UTF_8); + + // add, commit, push node + gitForPush.add().addFilepattern(formNodeFilePath(flowGraphDir, nodeDir.getName(), nodeFile.getName())).call(); + gitForPush.commit().setMessage("Node commit").call(); + gitForPush.push().setRemote("origin").setRefSpecs(masterRefSpec).call(); + + // polling is every 5 seconds, so wait twice as long and check + TimeUnit.SECONDS.sleep(10); + + //Test that a DataNode is added to FlowGraph + DataNode dataNode = specCompiler.getFlowGraph().getNode("node1"); + Assert.assertEquals(dataNode.getId(), "node1"); + Assert.assertEquals(dataNode.getRawConfig().getString("param1"), "val1"); + } + + private String formNodeFilePath(File flowGraphDir, String groupDir, String fileName) { + return flowGraphDir.getName() + SystemUtils.FILE_SEPARATOR + groupDir + SystemUtils.FILE_SEPARATOR + fileName; + } + + private void cleanUpDir(String dir) throws IOException { + File dirToDelete = new File(dir); + if (dirToDelete.exists()) { + FileUtils.deleteDirectory(new File(dir)); + } + } + @AfterClass - public void tearDown() { + public void tearDown() throws IOException { + cleanUpDir(TESTDIR); + try { + this.specCompiler.getServiceManager().stopAsync().awaitStopped(5, TimeUnit.SECONDS); + } catch (Exception e) { + log.warn("Could not stop Service Manager"); + } } public static class TestAzkabanSpecExecutor extends AbstractSpecExecutor {
