Repository: incubator-gobblin Updated Branches: refs/heads/master 312e768f5 -> bd17f1384
[GOBBLIN-276] Change setActive order to prevent flow spec loss Closes #2129 from yukuai518/topologyOrder Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/bd17f138 Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/bd17f138 Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/bd17f138 Branch: refs/heads/master Commit: bd17f1384736a70a26a0808ca406608436373812 Parents: 312e768 Author: Kuai Yu <[email protected]> Authored: Thu Oct 5 13:35:11 2017 -0700 Committer: Hung Tran <[email protected]> Committed: Thu Oct 5 13:35:11 2017 -0700 ---------------------------------------------------------------------- .../runtime/spec_catalog/TopologyCatalog.java | 5 +++++ .../modules/core/GobblinServiceManager.java | 15 ++++++++++++++- .../modules/flow/BaseFlowToJobSpecCompiler.java | 6 ++++++ .../modules/orchestration/Orchestrator.java | 5 +++-- .../scheduler/GobblinServiceJobScheduler.java | 17 +++++++++++------ 5 files changed, 39 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/bd17f138/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/TopologyCatalog.java ---------------------------------------------------------------------- diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/TopologyCatalog.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/TopologyCatalog.java index 2122014..c6e02d2 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/TopologyCatalog.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/TopologyCatalog.java @@ -23,7 +23,10 @@ import java.net.URI; import java.util.Collection; import java.util.Collections; import java.util.List; +import java.util.concurrent.CountDownLatch; + import javax.annotation.Nonnull; +import lombok.Getter; import org.apache.commons.lang3.SerializationUtils; import org.apache.commons.lang3.reflect.ConstructorUtils; @@ -64,6 +67,8 @@ public class TopologyCatalog extends AbstractIdleService implements SpecCatalog, protected final MetricContext metricContext; protected final TopologyCatalog.StandardMetrics metrics; protected final SpecStore specStore; + @Getter + protected CountDownLatch initComplete = new CountDownLatch(1); private final ClassAliasResolver<SpecStore> aliasResolver; http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/bd17f138/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java ---------------------------------------------------------------------- diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java index 598371d..a13ed28 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java @@ -223,7 +223,6 @@ public class GobblinServiceManager implements ApplicationLauncher { // Register Scheduler to listen to changes in Flows if (isSchedulerEnabled) { this.flowCatalog.addListener(this.scheduler); - this.topologyCatalog.addListener(this.orchestrator); } // Initialize TopologySpecFactory @@ -356,12 +355,22 @@ public class GobblinServiceManager implements ApplicationLauncher { } // Populate TopologyCatalog with all Topologies generated by TopologySpecFactory + // This has to be done after the topologyCatalog service is launched if (this.isTopologySpecFactoryEnabled) { Collection<TopologySpec> topologySpecs = this.topologySpecFactory.getTopologies(); for (TopologySpec topologySpec : topologySpecs) { this.topologyCatalog.put(topologySpec); } } + + // Register Orchestrator to listen to changes in topology + // This has to be done after topologySpecFactory has updated spec store, so that listeners will have the latest updates. + if (isSchedulerEnabled) { + this.topologyCatalog.addListener(this.orchestrator); + } + + // Notify now topologyCatalog has the right information + this.topologyCatalog.getInitComplete().countDown(); } @Override @@ -479,6 +488,7 @@ public class GobblinServiceManager implements ApplicationLauncher { public HelixTaskResult handleMessage() throws InterruptedException { if (jobScheduler.isActive()) { String flowSpecUri = _message.getAttribute(Message.Attributes.INNER_MESSAGE); + LOGGER.info ("ControllerUserDefinedMessage received : {}, type {}", flowSpecUri, _message.getMsgSubType()); try { if (_message.getMsgSubType().equals(ServiceConfigKeys.HELIX_FLOWSPEC_ADD)) { Spec spec = flowCatalog.getSpec(new URI(flowSpecUri)); @@ -493,6 +503,9 @@ public class GobblinServiceManager implements ApplicationLauncher { } catch (SpecNotFoundException | URISyntaxException e) { LOGGER.error("Cannot process Helix message for flowSpecUri: " + flowSpecUri, e); } + } else { + String flowSpecUri = _message.getAttribute(Message.Attributes.INNER_MESSAGE); + LOGGER.info ("ControllerUserDefinedMessage received but ignored due to not in active mode: {}, type {}", flowSpecUri, _message.getMsgSubType()); } HelixTaskResult helixTaskResult = new HelixTaskResult(); helixTaskResult.setSuccess(true); http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/bd17f138/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java ---------------------------------------------------------------------- diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java index 26f4463..db92ef9 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java @@ -149,6 +149,12 @@ public abstract class BaseFlowToJobSpecCompiler implements SpecCompiler { @Override public synchronized void onAddSpec(Spec addedSpec) { + TopologySpec spec = (TopologySpec) addedSpec; + log.info ("Loading topology {}", spec.toLongString()); + for (Map.Entry entry: spec.getConfigAsProperties().entrySet()) { + log.info ("topo: {} --> {}", entry.getKey(), entry.getValue()); + } + topologySpecMap.put(addedSpec.getUri(), (TopologySpec) addedSpec); } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/bd17f138/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java ---------------------------------------------------------------------- diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java index 261ce6e..286911b 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java @@ -138,8 +138,6 @@ public class Orchestrator implements SpecCatalogListener, Instrumentable { /** {@inheritDoc} */ @Override public void onAddSpec(Spec addedSpec) { - _log.info("New Spec detected: " + addedSpec); - if (addedSpec instanceof TopologySpec) { _log.info("New Spec detected of type TopologySpec: " + addedSpec); this.specCompiler.onAddSpec(addedSpec); @@ -178,6 +176,9 @@ public class Orchestrator implements SpecCatalogListener, Instrumentable { } public void orchestrate(Spec spec) throws Exception { + // Add below waiting because TopologyCatalog and FlowCatalog service can be launched at the same time + this.topologyCatalog.get().getInitComplete().await(); + long startTime = System.nanoTime(); if (spec instanceof FlowSpec) { Map<Spec, SpecExecutor> specExecutorInstanceMap = specCompiler.compileFlow(spec); http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/bd17f138/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java ---------------------------------------------------------------------- diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java index a625f36..0eabf2c 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java @@ -105,6 +105,9 @@ public class GobblinServiceJobScheduler extends JobScheduler implements SpecCata // Since we are going to change status to isActive=true, schedule all flows if (isActive) { + // Need to set active first; otherwise in the STANDBY->ACTIVE transition, + // the onAddSpec will forward specs to the leader, which is itself. + this.isActive = isActive; if (this.flowCatalog.isPresent()) { Collection<Spec> specs = this.flowCatalog.get().getSpecs(); for (Spec spec : specs) { @@ -117,11 +120,10 @@ public class GobblinServiceJobScheduler extends JobScheduler implements SpecCata for (Spec spec : this.scheduledFlowSpecs.values()) { onDeleteSpec(spec.getUri(), spec.getVersion()); } + // Need to set active at the end; otherwise in the ACTIVE->STANDBY transition, + // the onDeleteSpec will forward specs to the leader, which is itself. + this.isActive = isActive; } - - // Change status after invoking addition / removal of specs, or else they will use isActive - // .. to exhibit behavior for updated iActive value - this.isActive = isActive; } @Override @@ -129,8 +131,11 @@ public class GobblinServiceJobScheduler extends JobScheduler implements SpecCata super.startUp(); } + /** + * Synchronize the job scheduling because the same flowSpec can be scheduled by different threads. + */ @Override - public void scheduleJob(Properties jobProps, JobListener jobListener) throws JobException { + public synchronized void scheduleJob(Properties jobProps, JobListener jobListener) throws JobException { Map<String, Object> additionalJobDataMap = Maps.newHashMap(); additionalJobDataMap.put(ServiceConfigKeys.GOBBLIN_SERVICE_FLOWSPEC, this.scheduledFlowSpecs.get(jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY))); @@ -162,7 +167,7 @@ public class GobblinServiceJobScheduler extends JobScheduler implements SpecCata return; } - _log.info("New Spec detected: " + addedSpec); + _log.info("New Flow Spec detected: " + addedSpec); if (addedSpec instanceof FlowSpec) { if (!isActive && helixManager.isPresent()) {
