Repository: incubator-gobblin Updated Branches: refs/heads/master 43c561c95 -> f861dca32
[GOBBLIN-659] Ensure MultiHopFlowCompiler is properly initialized before attempting flow orchestration.[] Closes #2529 from sv2000/gaasStartUp Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/f861dca3 Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/f861dca3 Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/f861dca3 Branch: refs/heads/master Commit: f861dca3212e874d64433f573d73927a025e44a5 Parents: 43c561c Author: suvasude <[email protected]> Authored: Thu Jan 3 21:12:22 2019 -0800 Committer: Hung Tran <[email protected]> Committed: Thu Jan 3 21:12:22 2019 -0800 ---------------------------------------------------------------------- .../modules/core/GitFlowGraphMonitor.java | 11 ++++++++-- .../modules/core/GobblinServiceManager.java | 4 ++++ .../modules/flow/BaseFlowToJobSpecCompiler.java | 9 ++++++++ .../modules/flow/MultiHopFlowCompiler.java | 23 +++++++++++++++++--- .../service/modules/flow/SpecCompiler.java | 19 +++++++++++++++- .../modules/orchestration/Orchestrator.java | 3 +++ .../modules/core/GitFlowGraphMonitorTest.java | 3 ++- .../modules/flow/MultiHopFlowCompilerTest.java | 2 ++ 8 files changed, 67 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f861dca3/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitor.java ---------------------------------------------------------------------- diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitor.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitor.java index 54544a1..a0b53fe 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitor.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitor.java @@ -24,6 +24,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.concurrent.CountDownLatch; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.fs.Path; @@ -90,12 +91,14 @@ public class GitFlowGraphMonitor extends GitMonitoringService { private FlowGraph flowGraph; private final Map<URI, TopologySpec> topologySpecMap; private final Config emptyConfig = ConfigFactory.empty(); + private final CountDownLatch initComplete; - public GitFlowGraphMonitor(Config config, FSFlowCatalog flowCatalog, FlowGraph graph, Map<URI, TopologySpec> topologySpecMap) { + public GitFlowGraphMonitor(Config config, FSFlowCatalog flowCatalog, FlowGraph graph, Map<URI, TopologySpec> topologySpecMap, CountDownLatch initComplete) { super(config.getConfig(GIT_FLOWGRAPH_MONITOR_PREFIX).withFallback(DEFAULT_FALLBACK)); this.flowCatalog = flowCatalog; this.flowGraph = graph; this.topologySpecMap = topologySpecMap; + this.initComplete = initComplete; } /** @@ -104,7 +107,7 @@ public class GitFlowGraphMonitor extends GitMonitoringService { */ @Override public boolean shouldPollGit() { - return true; + return this.isActive; } /** @@ -129,7 +132,11 @@ public class GitFlowGraphMonitor extends GitMonitoringService { return o1Depth.compareTo(o2Depth); }); processGitConfigChangesHelper(changes); + //Decrements the latch count. The countdown latch is initialized to 1. So after the first time the latch is decremented, + // the following operation should be a no-op. + this.initComplete.countDown(); } + /** * Add an element (i.e., a {@link DataNode}, or a {@link FlowEdge} to * the {@link FlowGraph} for an added, updated or modified node or edge file. http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f861dca3/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java ---------------------------------------------------------------------- diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java index 4091d21..a77833e 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java @@ -87,6 +87,7 @@ import org.apache.gobblin.service.NoopRequesterService; import org.apache.gobblin.service.RequesterService; import org.apache.gobblin.service.Schedule; import org.apache.gobblin.service.ServiceConfigKeys; +import org.apache.gobblin.service.modules.flow.MultiHopFlowCompiler; import org.apache.gobblin.service.modules.orchestration.DagManager; import org.apache.gobblin.service.modules.orchestration.Orchestrator; import org.apache.gobblin.service.modules.restli.GobblinServiceFlowConfigResourceHandler; @@ -447,6 +448,9 @@ public class GobblinServiceManager implements ApplicationLauncher, StandardMetri // Notify now topologyCatalog has the right information this.topologyCatalog.getInitComplete().countDown(); + + //Activate the SpecCompiler, after the topologyCatalog has been initialized. + this.orchestrator.getSpecCompiler().setActive(true); } @Override http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f861dca3/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java ---------------------------------------------------------------------- diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java index adef5cc..0f51185 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java @@ -95,6 +95,9 @@ public abstract class BaseFlowToJobSpecCompiler implements SpecCompiler { protected Optional<Meter> flowCompilationFailedMeter; @Getter protected Optional<Timer> flowCompilationTimer; + @Getter + @Setter + protected boolean active; public BaseFlowToJobSpecCompiler(Config config){ this(config,true); @@ -150,6 +153,12 @@ public abstract class BaseFlowToJobSpecCompiler implements SpecCompiler { } @Override + public void awaitHealthy() throws InterruptedException { + //Do nothing + return; + } + + @Override public synchronized void onAddSpec(Spec addedSpec) { TopologySpec spec = (TopologySpec) addedSpec; log.info ("Loading topology {}", spec.toLongString()); http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f861dca3/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java ---------------------------------------------------------------------- diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java index c1b3e84..01ed778 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java @@ -20,6 +20,7 @@ package org.apache.gobblin.service.modules.flow; import java.io.IOException; import java.net.URISyntaxException; import java.util.ArrayList; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -67,7 +68,7 @@ public class MultiHopFlowCompiler extends BaseFlowToJobSpecCompiler { @Getter private ServiceManager serviceManager; @Getter - private boolean active; + private CountDownLatch initComplete = new CountDownLatch(1); private GitFlowGraphMonitor gitFlowGraphMonitor; @@ -94,7 +95,7 @@ public class MultiHopFlowCompiler extends BaseFlowToJobSpecCompiler { throw new RuntimeException("Cannot instantiate " + getClass().getName(), e); } this.flowGraph = new BaseFlowGraph(); - this.gitFlowGraphMonitor = new GitFlowGraphMonitor(this.config, flowCatalog, this.flowGraph, this.topologySpecMap); + this.gitFlowGraphMonitor = new GitFlowGraphMonitor(this.config, flowCatalog, this.flowGraph, this.topologySpecMap, this.getInitComplete()); this.serviceManager = new ServiceManager(Lists.newArrayList(this.gitFlowGraphMonitor)); addShutdownHook(); //Start the git flow graph monitor @@ -112,11 +113,27 @@ public class MultiHopFlowCompiler extends BaseFlowToJobSpecCompiler { this.flowGraph = flowGraph; } + /** + * Mark the {@link SpecCompiler} as active. This in turn activates the {@link GitFlowGraphMonitor}, allowing to start polling + * and processing changes + * @param active + */ + @Override public void setActive(boolean active) { - this.active = active; + super.setActive(active); this.gitFlowGraphMonitor.setActive(active); } + @Override + public void awaitHealthy() throws InterruptedException { + if (this.getInitComplete().getCount() > 0) { + log.info("Waiting for the MultiHopFlowCompiler to become healthy.."); + this.getInitComplete().await(); + log.info("The MultihopFlowCompiler is healthy and ready to orchestrate flows."); + } + return; + } + /** * j * @param spec an instance of {@link FlowSpec}. http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f861dca3/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/SpecCompiler.java ---------------------------------------------------------------------- diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/SpecCompiler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/SpecCompiler.java index 53cdf83..ae17e7e 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/SpecCompiler.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/SpecCompiler.java @@ -38,7 +38,7 @@ public interface SpecCompiler extends SpecCatalogListener, Instrumentable { * Take in a logical {@link Spec} and compile corresponding materialized {@link Spec}s * and the mapping to {@link SpecExecutor} that they can be run on. * All the specs generated from the compileFlow must have a - * {@link org.apache.gobblin.configuration.ConfigurationKeys.FLOW_EXECUTION_ID_KEY} + * {@value org.apache.gobblin.configuration.ConfigurationKeys#FLOW_EXECUTION_ID_KEY} * @param spec {@link Spec} to compile. * @return Map of materialized physical {@link Spec} and {@link SpecExecutor}. */ @@ -50,4 +50,21 @@ public interface SpecCompiler extends SpecCatalogListener, Instrumentable { * @return Map of {@link Spec} URI and {@link TopologySpec} */ Map<URI, TopologySpec> getTopologySpecMap(); + + /** + * Mark the {@link SpecCompiler} active/inactive. Useful to trigger the initialization of {@link SpecCompiler}, if + * necessary, before it can start compiling {@link org.apache.gobblin.runtime.api.FlowSpec}s. + * @param active + */ + void setActive(boolean active); + + /** + * Waits for the {@link SpecCompiler} to become healthy. A {@link SpecCompiler} is healthy when all the component + * services it depends on have been successfully initialized. For instance, the {@link MultiHopFlowCompiler} is healthy + * when the {@link org.apache.gobblin.service.modules.flowgraph.DataNode}s and {@link org.apache.gobblin.service.modules.flowgraph.FlowEdge}s + * can be added to the {@link org.apache.gobblin.service.modules.flowgraph.FlowGraph}. The {@link org.apache.gobblin.service.modules.flowgraph.FlowEdge} + * instantiation in turn depends on the successful initialization of {@link org.apache.gobblin.runtime.spec_catalog.TopologyCatalog}, which + * instantiates all the configured {@link SpecExecutor}s. + */ + public void awaitHealthy() throws InterruptedException; } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f861dca3/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java ---------------------------------------------------------------------- diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java index 3dd7538..fbc38e6 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java @@ -197,6 +197,9 @@ public class Orchestrator implements SpecCatalogListener, Instrumentable { // Add below waiting because TopologyCatalog and FlowCatalog service can be launched at the same time this.topologyCatalog.get().getInitComplete().await(); + //Wait for the SpecCompiler to become healthy. + this.getSpecCompiler().awaitHealthy(); + long startTime = System.nanoTime(); if (spec instanceof FlowSpec) { TimingEvent flowCompilationTimer = this.eventSubmitter.isPresent() http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f861dca3/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitorTest.java ---------------------------------------------------------------------- diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitorTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitorTest.java index ab8af70..13a452a 100644 --- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitorTest.java +++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitorTest.java @@ -24,6 +24,7 @@ import java.net.URISyntaxException; import java.util.Map; import java.util.Properties; import java.util.Set; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import org.apache.commons.io.FileUtils; @@ -122,7 +123,7 @@ public class GitFlowGraphMonitorTest { //Create a FlowGraph instance with defaults this.flowGraph = new BaseFlowGraph(); - this.gitFlowGraphMonitor = new GitFlowGraphMonitor(this.config, this.flowCatalog, this.flowGraph, topologySpecMap); + this.gitFlowGraphMonitor = new GitFlowGraphMonitor(this.config, this.flowCatalog, this.flowGraph, topologySpecMap, new CountDownLatch(1)); this.gitFlowGraphMonitor.setActive(true); } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f861dca3/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompilerTest.java ---------------------------------------------------------------------- diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompilerTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompilerTest.java index 6b51a77..5877e66 100644 --- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompilerTest.java +++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompilerTest.java @@ -618,6 +618,8 @@ public class MultiHopFlowCompilerTest { //Create a MultiHopFlowCompiler instance specCompiler = new MultiHopFlowCompiler(config, Optional.absent(), false); + specCompiler.setActive(true); + //Ensure node1 is not present in the graph Assert.assertNull(specCompiler.getFlowGraph().getNode("node1"));
