Repository: incubator-gobblin
Updated Branches:
  refs/heads/master 8f72514c6 -> 55a19bbfe


[GOBBLIN-670] Ensure MultiHopFlowCompiler is initialized when job template catalog location is not provided.[]

Closes #2540 from sv2000/startUp


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/55a19bbf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/55a19bbf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/55a19bbf

Branch: refs/heads/master
Commit: 55a19bbfe693e68828ff2a85e6e0666b754b6a9b
Parents: 8f72514
Author: suvasude <[email protected]>
Authored: Wed Jan 23 06:38:08 2019 -0800
Committer: Hung Tran <[email protected]>
Committed: Wed Jan 23 06:38:08 2019 -0800

----------------------------------------------------------------------
 .../service/modules/core/GitFlowGraphMonitor.java | 17 +++++++++++------
 .../modules/flow/MultiHopFlowCompiler.java | 18 +++++++++++-------
 .../modules/core/GitFlowGraphMonitorTest.java | 5 +++--
 3 files changed, 25 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/55a19bbf/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitor.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitor.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitor.java
index a0b53fe..978875f 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitor.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitor.java
@@ -32,6 +32,7 @@ import org.eclipse.jgit.api.errors.GitAPIException;
 import org.eclipse.jgit.diff.DiffEntry;
 
 import com.google.common.base.Joiner;
+import com.google.common.base.Optional;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.io.Files;
 import com.typesafe.config.Config;
@@ -87,13 +88,13 @@ public class GitFlowGraphMonitor extends GitMonitoringService {
           .put(SHOULD_CHECKPOINT_HASHES, false)
           .build());
 
-  private FSFlowCatalog flowCatalog;
+  private Optional<FSFlowCatalog> flowCatalog;
   private FlowGraph flowGraph;
   private final Map<URI, TopologySpec> topologySpecMap;
   private final Config emptyConfig = ConfigFactory.empty();
   private final CountDownLatch initComplete;
 
-  public GitFlowGraphMonitor(Config config, FSFlowCatalog flowCatalog, FlowGraph graph, Map<URI, TopologySpec> topologySpecMap, CountDownLatch initComplete) {
+  public GitFlowGraphMonitor(Config config, Optional<FSFlowCatalog> flowCatalog, FlowGraph graph, Map<URI, TopologySpec> topologySpecMap, CountDownLatch initComplete) {
     super(config.getConfig(GIT_FLOWGRAPH_MONITOR_PREFIX).withFallback(DEFAULT_FALLBACK));
     this.flowCatalog = flowCatalog;
     this.flowGraph = graph;
@@ -223,11 +224,15 @@ public class GitFlowGraphMonitor extends GitMonitoringService {
         Class flowEdgeFactoryClass = Class.forName(ConfigUtils.getString(edgeConfig, FlowGraphConfigurationKeys.FLOW_EDGE_FACTORY_CLASS,
             FlowGraphConfigurationKeys.DEFAULT_FLOW_EDGE_FACTORY_CLASS));
         FlowEdgeFactory flowEdgeFactory = (FlowEdgeFactory) GobblinConstructorUtils.invokeLongestConstructor(flowEdgeFactoryClass, edgeConfig);
-        FlowEdge edge = flowEdgeFactory.createFlowEdge(edgeConfig, flowCatalog, specExecutors);
-        if (!this.flowGraph.addFlowEdge(edge)) {
-          log.warn("Could not add edge {} to FlowGraph; skipping", edge.getId());
+        if (flowCatalog.isPresent()) {
+          FlowEdge edge = flowEdgeFactory.createFlowEdge(edgeConfig, flowCatalog.get(), specExecutors);
+          if (!this.flowGraph.addFlowEdge(edge)) {
+            log.warn("Could not add edge {} to FlowGraph; skipping", edge.getId());
+          } else {
+            log.info("Added edge {} to FlowGraph", edge.getId());
+          }
         } else {
-          log.info("Added edge {} to FlowGraph", edge.getId());
+          log.warn("Could not add edge defined in {} to FlowGraph as FlowCatalog is absent", change.getNewPath());
         }
       } catch (Exception e) {
         log.warn("Could not add edge defined in {} due to exception {}", change.getNewPath(), e.getMessage());

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/55a19bbf/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java
index 3f412f4..49915ef 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java
@@ -24,6 +24,7 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
+import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -86,13 +87,16 @@ public class MultiHopFlowCompiler extends BaseFlowToJobSpecCompiler {
 
   public MultiHopFlowCompiler(Config config, Optional<Logger> log, boolean instrumentationEnabled) {
     super(config, log, instrumentationEnabled);
-    Config templateCatalogCfg = config.withValue(ConfigurationKeys.JOB_CONFIG_FILE_GENERAL_PATH_KEY,
-        config.getValue(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY));
-    FSFlowCatalog flowCatalog;
-    try {
-      flowCatalog = new FSFlowCatalog(templateCatalogCfg);
-    } catch (IOException e) {
-      throw new RuntimeException("Cannot instantiate " + getClass().getName(), e);
+    Optional<FSFlowCatalog> flowCatalog;
+    if (config.hasPath(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY)
+        && StringUtils.isNotBlank(config.getString(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY))) {
+      try {
+        flowCatalog = Optional.of(new FSFlowCatalog(config));
+      } catch (IOException e) {
+        throw new RuntimeException("Cannot instantiate " + getClass().getName(), e);
+      }
+    } else {
+      flowCatalog = Optional.absent();
     }
     this.flowGraph = new BaseFlowGraph();
     Config gitFlowGraphConfig = this.config;

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/55a19bbf/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitorTest.java
----------------------------------------------------------------------
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitorTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitorTest.java
index 13a452a..26f2180 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitorTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitorTest.java
@@ -46,6 +46,7 @@ import org.testng.annotations.Test;
 
 import com.google.common.base.Charsets;
 import com.google.common.base.Joiner;
+import com.google.common.base.Optional;
 import com.google.common.io.Files;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
@@ -80,7 +81,7 @@ public class GitFlowGraphMonitorTest {
   private final File edge1File = new File(edge1Dir, "edge1.properties");
 
   private RefSpec masterRefSpec = new RefSpec("master");
-  private FSFlowCatalog flowCatalog;
+  private Optional<FSFlowCatalog> flowCatalog;
   private Config config;
   private BaseFlowGraph flowGraph;
   private GitFlowGraphMonitor gitFlowGraphMonitor;
@@ -118,7 +119,7 @@ public class GitFlowGraphMonitorTest {
     Config templateCatalogCfg = config
         .withValue(ConfigurationKeys.JOB_CONFIG_FILE_GENERAL_PATH_KEY,
             config.getValue(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY));
-    this.flowCatalog = new FSFlowCatalog(templateCatalogCfg);
+    this.flowCatalog = Optional.of(new FSFlowCatalog(templateCatalogCfg));
 
     //Create a FlowGraph instance with defaults
     this.flowGraph = new BaseFlowGraph();
