[GOBBLIN-554] Change signature of SpecCompiler#compileFlow() to return a DAG of JobSpecs instead of a HashMap.
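In short: compileFlow() previously returned a Map<Spec, SpecExecutor>, which flattens the compiled jobs and cannot express dependencies between them; it now returns a Dag<JobExecutionPlan>, where each DAG node pairs a materialized JobSpec with the SpecExecutor it should run on. A minimal caller-side sketch of the new contract (the wrapper class and method are hypothetical, for illustration only; the individual calls mirror the Orchestrator changes in this commit, with the try/catch around the producer elided):

    import org.apache.gobblin.runtime.api.Spec;
    import org.apache.gobblin.runtime.api.SpecProducer;
    import org.apache.gobblin.service.modules.flow.SpecCompiler;
    import org.apache.gobblin.service.modules.flowgraph.Dag;
    import org.apache.gobblin.service.modules.spec.JobExecutionPlan;

    class CompileFlowSketch { // hypothetical wrapper, not part of this commit
      static void runFlow(SpecCompiler compiler, Spec flowSpec) throws Exception {
        // Old signature: Map<Spec, SpecExecutor> compiled = compiler.compileFlow(flowSpec);
        Dag<JobExecutionPlan> dag = compiler.compileFlow(flowSpec);
        if (dag == null || dag.isEmpty()) {
          return; // compilation failed or no executor can run this flow
        }
        for (Dag.DagNode<JobExecutionPlan> node : dag.getNodes()) {
          JobExecutionPlan plan = node.getValue();
          // Each node carries a materialized JobSpec plus the SpecExecutor to run it on.
          SpecProducer producer = plan.getSpecExecutor().getProducer().get();
          producer.addSpec(plan.getJobSpec());
        }
      }
    }

Job dependencies are exposed through dag.getStartNodes() and dag.getChildren(node), which the new MultiHopFlowCompilerTest below walks hop by hop.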
Closes #2415 from sv2000/dag1

Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/ef26d287
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/ef26d287
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/ef26d287

Branch: refs/heads/master
Commit: ef26d287d95edcc5208daed651e8ef8e09efa5d0
Parents: 25530f0
Author: suvasude <[email protected]>
Authored: Thu Aug 9 09:40:43 2018 -0700
Committer: Hung Tran <[email protected]>
Committed: Thu Aug 9 09:40:43 2018 -0700

----------------------------------------------------------------------
 .../gobblin/runtime/api/SpecCompiler.java       |  44 --
 .../modules/flow/BaseFlowToJobSpecCompiler.java |   6 +-
 .../flow/IdentityFlowToJobSpecCompiler.java     |  71 ++--
 .../modules/flow/MultiHopFlowCompiler.java      |  39 +-
 .../flow/MultiHopsFlowToJobSpecCompiler.java    |  13 +-
 .../service/modules/flow/SpecCompiler.java      |  51 +++
 .../modules/orchestration/Orchestrator.java     |  31 +-
 .../core/IdentityFlowToJobSpecCompilerTest.java |  40 +-
 .../modules/flow/FlowGraphPathFinderTest.java   | 417 ------------------
 .../modules/flow/MultiHopFlowCompilerTest.java  | 419 +++++++++++++++++++
 .../hdfs-1-to-hdfs-1-encrypt.properties         |   2 +-
 .../flowedges/hdfs-1-to-hdfs-3.properties       |   2 +-
 .../hdfs-2-to-hdfs-2-encrypt.properties         |   2 +-
 .../flowedges/hdfs-2-to-hdfs-4.properties       |   2 +-
 .../flowedges/hdfs-3-to-adls-1.properties       |   2 +-
 .../flowedges/hdfs-4-to-adls-1.properties       |   2 +-
 16 files changed, 593 insertions(+), 550 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ef26d287/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecCompiler.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecCompiler.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecCompiler.java deleted file mode 100644 index aceb5dd..0000000 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecCompiler.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.gobblin.runtime.api; - import java.net.URI; -import java.util.Map; - -import org.apache.gobblin.instrumented.Instrumentable; - -/*** - * Take in a logical {@link Spec} and compile corresponding materialized {@link Spec}s - * and the mapping to {@link SpecExecutor} that they can be run on.
- */ -public interface SpecCompiler extends SpecCatalogListener, Instrumentable { - /*** - * Take in a logical {@link Spec} and compile corresponding materialized {@link Spec}s - * and the mapping to {@link SpecExecutor} that they can be run on. - * @param spec {@link Spec} to compile. - * @return Map of materialized physical {@link Spec} and {@link SpecExecutor}. - */ - Map<Spec, SpecExecutor> compileFlow(Spec spec); - - /*** - * Map of {@link Spec} URI and {@link TopologySpec} the {@link SpecCompiler} - * is aware about. - * @return Map of {@link Spec} URI and {@link TopologySpec} - */ - Map<URI, TopologySpec> getTopologySpecMap(); -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ef26d287/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java ---------------------------------------------------------------------- diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java index e4c9ae3..6604676 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java @@ -40,20 +40,20 @@ import org.slf4j.LoggerFactory; import org.apache.gobblin.metrics.MetricContext; import org.apache.gobblin.metrics.Tag; import org.apache.gobblin.runtime.api.Spec; -import org.apache.gobblin.runtime.api.SpecCompiler; import org.apache.gobblin.runtime.api.TopologySpec; import org.apache.gobblin.configuration.ConfigurationKeys; import org.apache.gobblin.instrumented.Instrumented; import org.apache.gobblin.runtime.job_catalog.FSJobCatalog; import org.apache.gobblin.service.ServiceConfigKeys; import org.apache.gobblin.service.ServiceMetricNames; +import org.apache.gobblin.service.modules.flowgraph.Dag; +import org.apache.gobblin.service.modules.spec.JobExecutionPlan; import org.apache.gobblin.util.ConfigUtils; import org.apache.gobblin.annotation.Alpha; import org.apache.gobblin.runtime.api.FlowSpec; import org.apache.gobblin.runtime.api.JobSpec; import org.apache.gobblin.runtime.api.JobTemplate; import org.apache.gobblin.runtime.api.SpecExecutor; -import org.apache.gobblin.runtime.api.SpecProducer; import org.apache.gobblin.runtime.api.SpecNotFoundException; import org.apache.gobblin.runtime.job_spec.ResolvedJobSpec; @@ -207,7 +207,7 @@ public abstract class BaseFlowToJobSpecCompiler implements SpecCompiler { return this.topologySpecMap; } - public abstract Map<Spec, SpecExecutor> compileFlow(Spec spec); + public abstract Dag<JobExecutionPlan> compileFlow(Spec spec); /** * Naive implementation of generating jobSpec, which fetch the first available template, http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ef26d287/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/IdentityFlowToJobSpecCompiler.java ---------------------------------------------------------------------- diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/IdentityFlowToJobSpecCompiler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/IdentityFlowToJobSpecCompiler.java index 3fb20a2..fa843c5 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/IdentityFlowToJobSpecCompiler.java +++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/IdentityFlowToJobSpecCompiler.java @@ -17,27 +17,30 @@ package org.apache.gobblin.service.modules.flow; - +import java.util.ArrayList; +import java.util.List; import java.util.Map; import java.util.concurrent.ExecutionException; - import java.util.concurrent.TimeUnit; + import org.slf4j.Logger; import com.google.common.base.Optional; import com.google.common.base.Preconditions; -import com.google.common.collect.Maps; import com.typesafe.config.Config; import org.apache.gobblin.annotation.Alpha; import org.apache.gobblin.instrumented.Instrumented; import org.apache.gobblin.runtime.api.FlowSpec; import org.apache.gobblin.runtime.api.JobSpec; +import org.apache.gobblin.runtime.api.ServiceNode; import org.apache.gobblin.runtime.api.Spec; +import org.apache.gobblin.runtime.api.SpecExecutor; import org.apache.gobblin.runtime.api.TopologySpec; import org.apache.gobblin.service.ServiceConfigKeys; -import org.apache.gobblin.runtime.api.ServiceNode; -import org.apache.gobblin.runtime.api.SpecExecutor; +import org.apache.gobblin.service.modules.flowgraph.Dag; +import org.apache.gobblin.service.modules.spec.JobExecutionPlan; +import org.apache.gobblin.service.modules.spec.JobExecutionPlanDagFactory; /*** @@ -64,12 +67,11 @@ public class IdentityFlowToJobSpecCompiler extends BaseFlowToJobSpecCompiler { } @Override - public Map<Spec, SpecExecutor> compileFlow(Spec spec) { + public Dag<JobExecutionPlan> compileFlow(Spec spec) { Preconditions.checkNotNull(spec); Preconditions.checkArgument(spec instanceof FlowSpec, "IdentityFlowToJobSpecCompiler only converts FlowSpec to JobSpec"); long startTime = System.nanoTime(); - Map<Spec, SpecExecutor> specExecutorMap = Maps.newLinkedHashMap(); FlowSpec flowSpec = (FlowSpec) spec; String source = flowSpec.getConfig().getString(ServiceConfigKeys.FLOW_SOURCE_IDENTIFIER_KEY); @@ -77,33 +79,44 @@ public class IdentityFlowToJobSpecCompiler extends BaseFlowToJobSpecCompiler { log.info(String.format("Compiling flow for source: %s and destination: %s", source, destination)); JobSpec jobSpec = jobSpecGenerator(flowSpec); + Instrumented.markMeter(this.flowCompilationSuccessFulMeter); + Instrumented.updateTimer(this.flowCompilationTimer, System.nanoTime() - startTime, TimeUnit.NANOSECONDS); + List<JobExecutionPlan> jobExecutionPlans; + try { + jobExecutionPlans = getJobExecutionPlans(source, destination, jobSpec); + } catch (InterruptedException | ExecutionException e) { + Instrumented.markMeter(this.flowCompilationFailedMeter); + throw new RuntimeException("Cannot determine topology capabilities", e); + } + return new JobExecutionPlanDagFactory().createDag(jobExecutionPlans); + } + + private List<JobExecutionPlan> getJobExecutionPlans(String source, String destination, JobSpec jobSpec) + throws ExecutionException, InterruptedException { + List<JobExecutionPlan> jobExecutionPlans = new ArrayList<>(); for (TopologySpec topologySpec : topologySpecMap.values()) { - try { - Map<ServiceNode, ServiceNode> capabilities = (Map<ServiceNode, ServiceNode>) topologySpec.getSpecExecutor().getCapabilities().get(); - for (Map.Entry<ServiceNode, ServiceNode> capability : capabilities.entrySet()) { - log.info(String.format("Evaluating current JobSpec: %s against TopologySpec: %s with " - + "capability of source: %s and destination: %s ", jobSpec.getUri(), - topologySpec.getUri(), capability.getKey(), capability.getValue())); - if (source.equals(capability.getKey().getNodeName()) && 
destination.equals(capability.getValue().getNodeName())) { - specExecutorMap.put(jobSpec, topologySpec.getSpecExecutor()); - log.info(String.format("Current JobSpec: %s is executable on TopologySpec: %s. Added TopologySpec as candidate.", - jobSpec.getUri(), topologySpec.getUri())); - - log.info("Since we found a candidate executor, we will not try to compute more. " - + "(Intended limitation for IdentityFlowToJobSpecCompiler)"); - return specExecutorMap; - } + Map<ServiceNode, ServiceNode> capabilities = + (Map<ServiceNode, ServiceNode>) topologySpec.getSpecExecutor().getCapabilities().get(); + for (Map.Entry<ServiceNode, ServiceNode> capability : capabilities.entrySet()) { + log.info(String.format("Evaluating current JobSpec: %s against TopologySpec: %s with " + + "capability of source: %s and destination: %s ", jobSpec.getUri(), topologySpec.getUri(), + capability.getKey(), capability.getValue())); + if (source.equals(capability.getKey().getNodeName()) && destination + .equals(capability.getValue().getNodeName())) { + JobExecutionPlan jobExecutionPlan = new JobExecutionPlan(jobSpec, topologySpec.getSpecExecutor()); + log.info(String + .format("Current JobSpec: %s is executable on TopologySpec: %s. Added TopologySpec as candidate.", + jobSpec.getUri(), topologySpec.getUri())); + + log.info("Since we found a candidate executor, we will not try to compute more. " + + "(Intended limitation for IdentityFlowToJobSpecCompiler)"); + jobExecutionPlans.add(jobExecutionPlan); + return jobExecutionPlans; } - } catch (InterruptedException | ExecutionException e) { - Instrumented.markMeter(this.flowCompilationFailedMeter); - throw new RuntimeException("Cannot determine topology capabilities", e); } } - Instrumented.markMeter(this.flowCompilationSuccessFulMeter); - Instrumented.updateTimer(this.flowCompilationTimer, System.nanoTime() - startTime, TimeUnit.NANOSECONDS); - - return specExecutorMap; + return jobExecutionPlans; } @Override http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ef26d287/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java ---------------------------------------------------------------------- diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java index 8b14b10..a281aa4 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java @@ -25,6 +25,7 @@ import java.util.concurrent.TimeUnit; import org.slf4j.Logger; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.common.collect.Maps; @@ -41,12 +42,14 @@ import org.apache.gobblin.runtime.api.JobTemplate; import org.apache.gobblin.runtime.api.Spec; import org.apache.gobblin.runtime.api.SpecExecutor; import org.apache.gobblin.runtime.api.SpecNotFoundException; +import org.apache.gobblin.runtime.spec_catalog.FlowCatalog; import org.apache.gobblin.service.ServiceConfigKeys; import org.apache.gobblin.service.modules.core.GitFlowGraphMonitor; import org.apache.gobblin.service.modules.flowgraph.BaseFlowGraph; import org.apache.gobblin.service.modules.flowgraph.Dag; import org.apache.gobblin.service.modules.flowgraph.FlowGraph; import 
org.apache.gobblin.service.modules.spec.JobExecutionPlan; +import org.apache.gobblin.service.modules.spec.JobExecutionPlanDagFactory; import org.apache.gobblin.service.modules.template_catalog.FSFlowCatalog; @@ -90,53 +93,50 @@ public class MultiHopFlowCompiler extends BaseFlowToJobSpecCompiler { this.gitFlowGraphMonitor = new GitFlowGraphMonitor(this.config, flowCatalog, this.flowGraph); } + @VisibleForTesting + MultiHopFlowCompiler(Config config, FlowGraph flowGraph) { + super(config, Optional.absent(), true); + this.flowGraph = flowGraph; + } + public void setActive(boolean active) { this.active = active; this.gitFlowGraphMonitor.setActive(active); } /** - * TODO: We need to change signature of compileFlow to return a Dag instead of a HashMap to capture - * job dependencies. - * @param spec - * @return + * Compiles the input {@link FlowSpec} into a DAG of {@link JobExecutionPlan}s. + * @param spec an instance of {@link FlowSpec}. + * @return A DAG of {@link JobExecutionPlan}s, which encapsulates the compiled {@link org.apache.gobblin.runtime.api.JobSpec}s + * together with the {@link SpecExecutor} where the job can be executed. */ @Override - public Map<Spec, SpecExecutor> compileFlow(Spec spec) { + public Dag<JobExecutionPlan> compileFlow(Spec spec) { Preconditions.checkNotNull(spec); - Preconditions.checkArgument(spec instanceof FlowSpec, "MultiHopFlowToJobSpecCompiler only accepts FlowSpecs"); + Preconditions.checkArgument(spec instanceof FlowSpec, "MultiHopFlowCompiler only accepts FlowSpecs"); long startTime = System.nanoTime(); - Map<Spec, SpecExecutor> specExecutorMap = Maps.newLinkedHashMap(); FlowSpec flowSpec = (FlowSpec) spec; String source = flowSpec.getConfig().getString(ServiceConfigKeys.FLOW_SOURCE_IDENTIFIER_KEY); String destination = flowSpec.getConfig().getString(ServiceConfigKeys.FLOW_DESTINATION_IDENTIFIER_KEY); log.info(String.format("Compiling flow for source: %s and destination: %s", source, destination)); + Dag<JobExecutionPlan> jobExecutionPlanDag; FlowGraphPathFinder pathFinder = new FlowGraphPathFinder(this.flowGraph, flowSpec); try { //Compute the path from source to destination. FlowGraphPath flowGraphPath = pathFinder.findPath(); //Convert the path into a Dag of JobExecutionPlans. - Dag<JobExecutionPlan> jobExecutionPlanDag; if (flowGraphPath != null) { jobExecutionPlanDag = flowGraphPath.asDag(); } else { Instrumented.markMeter(this.flowCompilationFailedMeter); log.info(String.format("No path found from source: %s and destination: %s", source, destination)); - return null; + return new JobExecutionPlanDagFactory().createDag(new ArrayList<>()); } - - //TODO: Just a dummy return value for now. compileFlow() signature needs to be modified to return a Dag instead - // of a Map. For now just add all specs into the map.
- for (Dag.DagNode<JobExecutionPlan> node: jobExecutionPlanDag.getNodes()) { - JobExecutionPlan jobExecutionPlan = node.getValue(); - specExecutorMap.put(jobExecutionPlan.getJobSpec(), jobExecutionPlan.getSpecExecutor()); - } - } catch (FlowGraphPathFinder.PathFinderException | SpecNotFoundException | JobTemplate.TemplateException | IOException - | URISyntaxException e) { + } catch (FlowGraphPathFinder.PathFinderException | SpecNotFoundException | JobTemplate.TemplateException | URISyntaxException | IOException e) { Instrumented.markMeter(this.flowCompilationFailedMeter); log.error(String.format("Exception encountered while compiling flow for source: %s and destination: %s", source, destination), e); return null; @@ -144,7 +144,7 @@ public class MultiHopFlowCompiler extends BaseFlowToJobSpecCompiler { Instrumented.markMeter(this.flowCompilationSuccessFulMeter); Instrumented.updateTimer(this.flowCompilationTimer, System.nanoTime() - startTime, TimeUnit.NANOSECONDS); - return specExecutorMap; + return jobExecutionPlanDag; } @Override @@ -153,5 +153,4 @@ public class MultiHopFlowCompiler extends BaseFlowToJobSpecCompiler { return; } - } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ef26d287/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopsFlowToJobSpecCompiler.java ---------------------------------------------------------------------- diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopsFlowToJobSpecCompiler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopsFlowToJobSpecCompiler.java index 544ca42..236f927 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopsFlowToJobSpecCompiler.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopsFlowToJobSpecCompiler.java @@ -20,6 +20,7 @@ package org.apache.gobblin.service.modules.flow; import com.google.common.base.Splitter; import java.net.URI; import java.net.URISyntaxException; +import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; import java.util.List; @@ -35,7 +36,10 @@ import com.typesafe.config.ConfigValueFactory; import org.apache.commons.lang3.StringUtils; import org.apache.gobblin.configuration.ConfigurationKeys; import org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor; +import org.apache.gobblin.service.modules.flowgraph.Dag; import org.apache.gobblin.service.modules.policy.ServicePolicy; +import org.apache.gobblin.service.modules.spec.JobExecutionPlan; +import org.apache.gobblin.service.modules.spec.JobExecutionPlanDagFactory; import org.apache.gobblin.util.ClassAliasResolver; import org.apache.gobblin.util.ConfigUtils; import org.jgrapht.graph.DirectedWeightedMultigraph; @@ -132,11 +136,16 @@ public class MultiHopsFlowToJobSpecCompiler extends BaseFlowToJobSpecCompiler { } @Override - public Map<Spec, SpecExecutor> compileFlow(Spec spec) { + public Dag<JobExecutionPlan> compileFlow(Spec spec) { // A Map from JobSpec to SpecExecutor, as the output of Flow Compiler.
Map<Spec, SpecExecutor> specExecutorInstanceMap = Maps.newLinkedHashMap(); findPath(specExecutorInstanceMap, spec); - return specExecutorInstanceMap; + List<JobExecutionPlan> jobExecutionPlans = new ArrayList<>(); + for (Map.Entry<Spec, SpecExecutor> entry: specExecutorInstanceMap.entrySet()) { + JobExecutionPlan jobExecutionPlan = new JobExecutionPlan((JobSpec) entry.getKey(), entry.getValue()); + jobExecutionPlans.add(jobExecutionPlan); + } + return new JobExecutionPlanDagFactory().createDag(jobExecutionPlans); } /** http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ef26d287/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/SpecCompiler.java ---------------------------------------------------------------------- diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/SpecCompiler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/SpecCompiler.java new file mode 100644 index 0000000..3ce3b70 --- /dev/null +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/SpecCompiler.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.service.modules.flow; + +import java.net.URI; +import java.util.Map; + +import org.apache.gobblin.instrumented.Instrumentable; +import org.apache.gobblin.runtime.api.Spec; +import org.apache.gobblin.runtime.api.SpecCatalogListener; +import org.apache.gobblin.runtime.api.SpecExecutor; +import org.apache.gobblin.runtime.api.TopologySpec; +import org.apache.gobblin.service.modules.flowgraph.Dag; +import org.apache.gobblin.service.modules.spec.JobExecutionPlan; + + +/*** + * Take in a logical {@link Spec} and compile corresponding materialized {@link Spec}s + * and the mapping to {@link SpecExecutor} that they can be run on. + */ +public interface SpecCompiler extends SpecCatalogListener, Instrumentable { + /*** + * Take in a logical {@link Spec} and compile corresponding materialized {@link Spec}s + * and the mapping to {@link SpecExecutor} that they can be run on. + * @param spec {@link Spec} to compile. + * @return A DAG of {@link JobExecutionPlan}s, pairing each materialized physical {@link Spec} with the {@link SpecExecutor} it can run on. + */ + Dag<JobExecutionPlan> compileFlow(Spec spec); + + /*** + * Map of {@link Spec} URI and {@link TopologySpec} the {@link SpecCompiler} + * is aware of.
+ * @return Map of {@link Spec} URI and {@link TopologySpec} + */ + Map<URI, TopologySpec> getTopologySpecMap(); +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ef26d287/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java ---------------------------------------------------------------------- diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java index 1b3907d..4959b1a 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java @@ -38,7 +38,7 @@ import org.apache.gobblin.instrumented.Instrumentable; import org.apache.gobblin.metrics.MetricContext; import org.apache.gobblin.metrics.Tag; import org.apache.gobblin.runtime.api.FlowSpec; -import org.apache.gobblin.runtime.api.SpecCompiler; +import org.apache.gobblin.service.modules.flow.SpecCompiler; import org.apache.gobblin.runtime.api.TopologySpec; import org.apache.gobblin.runtime.api.Spec; import org.apache.gobblin.runtime.api.SpecCatalogListener; @@ -46,6 +46,8 @@ import org.apache.gobblin.runtime.spec_catalog.TopologyCatalog; import org.apache.gobblin.service.ServiceConfigKeys; import org.apache.gobblin.service.ServiceMetricNames; import org.apache.gobblin.service.modules.flow.IdentityFlowToJobSpecCompiler; +import org.apache.gobblin.service.modules.flowgraph.Dag; +import org.apache.gobblin.service.modules.spec.JobExecutionPlan; import org.apache.gobblin.util.ClassAliasResolver; import org.apache.gobblin.util.ConfigUtils; import org.slf4j.LoggerFactory; @@ -186,25 +188,27 @@ public class Orchestrator implements SpecCatalogListener, Instrumentable { long startTime = System.nanoTime(); if (spec instanceof FlowSpec) { - Map<Spec, SpecExecutor> specExecutorInstanceMap = specCompiler.compileFlow(spec); + Dag<JobExecutionPlan> jobExecutionPlanDag = specCompiler.compileFlow(spec); - if (specExecutorInstanceMap.isEmpty()) { + if (jobExecutionPlanDag == null || jobExecutionPlanDag.isEmpty()) { _log.warn("Cannot determine an executor to run on for Spec: " + spec); return; } // Schedule all compiled JobSpecs on their respective Executor - for (Map.Entry<Spec, SpecExecutor> specsToExecute : specExecutorInstanceMap.entrySet()) { + for (Dag.DagNode<JobExecutionPlan> dagNode: jobExecutionPlanDag.getNodes()) { + JobExecutionPlan jobExecutionPlan = dagNode.getValue(); + // Run this spec on selected executor SpecProducer producer = null; try { - producer = specsToExecute.getValue().getProducer().get(); - Spec jobSpec = specsToExecute.getKey(); + producer = jobExecutionPlan.getSpecExecutor().getProducer().get(); + Spec jobSpec = jobExecutionPlan.getJobSpec(); _log.info(String.format("Going to orchestrate JobSpec: %s on Executor: %s", jobSpec, producer)); producer.addSpec(jobSpec); } catch(Exception e) { - _log.error("Cannot successfully setup spec: " + specsToExecute.getKey() + " on executor: " + producer + + _log.error("Cannot successfully setup spec: " + jobExecutionPlan.getJobSpec() + " on executor: " + producer + " for flow: " + spec, e); } } @@ -221,25 +225,26 @@ public class Orchestrator implements SpecCatalogListener, Instrumentable { // .. this will work for Identity compiler but not always for multi-hop. 
// Note: Current logic assumes compilation is consistent between all executions if (spec instanceof FlowSpec) { - Map<Spec, SpecExecutor> specExecutorInstanceMap = specCompiler.compileFlow(spec); + Dag<JobExecutionPlan> jobExecutionPlanDag = specCompiler.compileFlow(spec); - if (specExecutorInstanceMap.isEmpty()) { + if (jobExecutionPlanDag.isEmpty()) { _log.warn("Cannot determine an executor to delete Spec: " + spec); return; } // Delete all compiled JobSpecs on their respective Executor - for (Map.Entry<Spec, SpecExecutor> specsToDelete : specExecutorInstanceMap.entrySet()) { + for (Dag.DagNode<JobExecutionPlan> dagNode: jobExecutionPlanDag.getNodes()) { + JobExecutionPlan jobExecutionPlan = dagNode.getValue(); // Delete this spec on selected executor SpecProducer producer = null; try { - producer = specsToDelete.getValue().getProducer().get(); - Spec jobSpec = specsToDelete.getKey(); + producer = jobExecutionPlan.getSpecExecutor().getProducer().get(); + Spec jobSpec = jobExecutionPlan.getJobSpec(); _log.info(String.format("Going to delete JobSpec: %s on Executor: %s", jobSpec, producer)); producer.deleteSpec(jobSpec.getUri(), headers); } catch(Exception e) { - _log.error("Cannot successfully delete spec: " + specsToDelete.getKey() + " on executor: " + producer + + _log.error("Cannot successfully delete spec: " + jobExecutionPlan.getJobSpec() + " on executor: " + producer + " for flow: " + spec, e); } } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ef26d287/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/IdentityFlowToJobSpecCompilerTest.java ---------------------------------------------------------------------- diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/IdentityFlowToJobSpecCompilerTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/IdentityFlowToJobSpecCompilerTest.java index 2dbe790..c1134a8 100644 --- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/IdentityFlowToJobSpecCompilerTest.java +++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/IdentityFlowToJobSpecCompilerTest.java @@ -16,13 +16,11 @@ */ package org.apache.gobblin.service.modules.core; - import java.io.File; import java.net.URI; import java.net.URISyntaxException; import java.util.ArrayList; import java.util.List; -import java.util.Map; import java.util.Properties; import org.apache.commons.io.FileUtils; @@ -40,14 +38,15 @@ import org.apache.gobblin.configuration.ConfigurationKeys; import org.apache.gobblin.runtime.api.FlowSpec; import org.apache.gobblin.runtime.api.JobSpec; import org.apache.gobblin.runtime.api.Spec; +import org.apache.gobblin.runtime.api.SpecExecutor; import org.apache.gobblin.runtime.api.TopologySpec; +import org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor; import org.apache.gobblin.service.ServiceConfigKeys; import org.apache.gobblin.service.modules.flow.IdentityFlowToJobSpecCompiler; +import org.apache.gobblin.service.modules.flowgraph.Dag; +import org.apache.gobblin.service.modules.spec.JobExecutionPlan; import org.apache.gobblin.util.ConfigUtils; import org.apache.gobblin.util.PathUtils; -import org.apache.gobblin.runtime.api.SpecExecutor; -import org.apache.gobblin.runtime.api.SpecProducer; -import org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor; public class IdentityFlowToJobSpecCompilerTest { private static final Logger logger = LoggerFactory.getLogger(IdentityFlowToJobSpecCompilerTest.class); @@ 
-187,14 +186,15 @@ public class IdentityFlowToJobSpecCompilerTest { FlowSpec flowSpec = initFlowSpec(); // Run compiler on flowSpec - Map<Spec, SpecExecutor> specExecutorMapping = this.compilerWithTemplateCalague.compileFlow(flowSpec); + Dag<JobExecutionPlan> jobExecutionPlanDag = this.compilerWithTemplateCalague.compileFlow(flowSpec); // Assert pre-requisites - Assert.assertNotNull(specExecutorMapping, "Expected non null mapping."); - Assert.assertTrue(specExecutorMapping.size() == 1, "Exepected 1 executor for FlowSpec."); + Assert.assertNotNull(jobExecutionPlanDag, "Expected non null dag."); + Assert.assertTrue(jobExecutionPlanDag.getNodes().size() == 1, "Expected 1 executor for FlowSpec."); // Assert FlowSpec compilation - Spec spec = specExecutorMapping.keySet().iterator().next(); + Dag.DagNode<JobExecutionPlan> dagNode = jobExecutionPlanDag.getStartNodes().get(0); + Spec spec = dagNode.getValue().getJobSpec(); Assert.assertTrue(spec instanceof JobSpec, "Expected JobSpec compiled from FlowSpec."); // Assert JobSpec properties @@ -209,6 +209,9 @@ public class IdentityFlowToJobSpecCompilerTest { Assert.assertEquals(jobSpec.getConfig().getString(ConfigurationKeys.FLOW_NAME_KEY), TEST_FLOW_NAME); Assert.assertEquals(jobSpec.getConfig().getString(ConfigurationKeys.FLOW_GROUP_KEY), TEST_FLOW_GROUP); Assert.assertTrue(jobSpec.getConfig().hasPath(ConfigurationKeys.FLOW_EXECUTION_ID_KEY)); + + //Assert the start node has no children. + Assert.assertNull(jobExecutionPlanDag.getChildren(dagNode)); } @Test @@ -216,14 +219,16 @@ public class IdentityFlowToJobSpecCompilerTest { FlowSpec flowSpec = initFlowSpec(); // Run compiler on flowSpec - Map<Spec, SpecExecutor> specExecutorMapping = this.compilerWithoutTemplateCalague.compileFlow(flowSpec); + Dag<JobExecutionPlan> jobExecutionPlanDag = this.compilerWithoutTemplateCalague.compileFlow(flowSpec); // Assert pre-requisites - Assert.assertNotNull(specExecutorMapping, "Expected non null mapping."); - Assert.assertTrue(specExecutorMapping.size() == 1, "Exepected 1 executor for FlowSpec."); + Assert.assertNotNull(jobExecutionPlanDag, "Expected non null dag."); + Assert.assertTrue(jobExecutionPlanDag.getNodes().size() == 1, "Expected 1 executor for FlowSpec."); // Assert FlowSpec compilation - Spec spec = specExecutorMapping.keySet().iterator().next(); + Assert.assertEquals(jobExecutionPlanDag.getStartNodes().size(), 1); + Dag.DagNode<JobExecutionPlan> dagNode = jobExecutionPlanDag.getStartNodes().get(0); + Spec spec = dagNode.getValue().getJobSpec(); Assert.assertTrue(spec instanceof JobSpec, "Expected JobSpec compiled from FlowSpec."); // Assert JobSpec properties @@ -238,6 +243,9 @@ public class IdentityFlowToJobSpecCompilerTest { Assert.assertEquals(jobSpec.getConfig().getString(ConfigurationKeys.FLOW_NAME_KEY), TEST_FLOW_NAME); Assert.assertEquals(jobSpec.getConfig().getString(ConfigurationKeys.FLOW_GROUP_KEY), TEST_FLOW_GROUP); Assert.assertTrue(jobSpec.getConfig().hasPath(ConfigurationKeys.FLOW_EXECUTION_ID_KEY)); + + //Assert the start node has no children.
+ Assert.assertNull(jobExecutionPlanDag.getChildren(dagNode)); } @Test @@ -245,10 +253,10 @@ public class IdentityFlowToJobSpecCompilerTest { FlowSpec flowSpec = initFlowSpec(TEST_FLOW_GROUP, TEST_FLOW_NAME, "unsupportedSource", "unsupportedSink"); // Run compiler on flowSpec - Map<Spec, SpecExecutor> specExecutorMapping = this.compilerWithTemplateCalague.compileFlow(flowSpec); + Dag<JobExecutionPlan> jobExecutionPlanDag = this.compilerWithTemplateCalague.compileFlow(flowSpec); // Assert pre-requisites - Assert.assertNotNull(specExecutorMapping, "Expected non null mapping."); - Assert.assertTrue(specExecutorMapping.size() == 0, "Exepected 1 executor for FlowSpec."); + Assert.assertNotNull(jobExecutionPlanDag, "Expected non null dag."); + Assert.assertTrue(jobExecutionPlanDag.getNodes().size() == 0, "Expected 0 executors for FlowSpec."); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ef26d287/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/FlowGraphPathFinderTest.java ---------------------------------------------------------------------- diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/FlowGraphPathFinderTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/FlowGraphPathFinderTest.java deleted file mode 100644 index 5d0500c..0000000 --- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/FlowGraphPathFinderTest.java +++ /dev/null @@ -1,417 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License.
- */ - -package org.apache.gobblin.service.modules.flow; - -import java.io.IOException; -import java.io.InputStream; -import java.io.InputStreamReader; -import java.net.URI; -import java.net.URISyntaxException; -import java.nio.charset.Charset; -import java.util.Properties; -import java.util.concurrent.Future; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.testng.Assert; -import org.testng.annotations.AfterClass; -import org.testng.annotations.BeforeClass; -import org.testng.annotations.Test; - -import com.google.common.base.Charsets; -import com.typesafe.config.Config; -import com.typesafe.config.ConfigFactory; -import com.typesafe.config.ConfigParseOptions; -import com.typesafe.config.ConfigSyntax; - -import lombok.extern.slf4j.Slf4j; - -import org.apache.gobblin.configuration.ConfigurationKeys; -import org.apache.gobblin.runtime.api.FlowSpec; -import org.apache.gobblin.runtime.api.JobSpec; -import org.apache.gobblin.runtime.api.JobTemplate; -import org.apache.gobblin.runtime.api.Spec; -import org.apache.gobblin.runtime.api.SpecExecutor; -import org.apache.gobblin.runtime.api.SpecNotFoundException; -import org.apache.gobblin.runtime.api.SpecProducer; -import org.apache.gobblin.runtime.spec_executorInstance.AbstractSpecExecutor; -import org.apache.gobblin.service.ServiceConfigKeys; -import org.apache.gobblin.service.modules.flowgraph.BaseFlowGraph; -import org.apache.gobblin.service.modules.flowgraph.Dag; -import org.apache.gobblin.service.modules.flowgraph.DataNode; -import org.apache.gobblin.service.modules.flowgraph.FlowEdge; -import org.apache.gobblin.service.modules.flowgraph.FlowEdgeFactory; -import org.apache.gobblin.service.modules.flowgraph.FlowGraph; -import org.apache.gobblin.service.modules.flowgraph.FlowGraphConfigurationKeys; -import org.apache.gobblin.service.modules.spec.JobExecutionPlan; -import org.apache.gobblin.service.modules.template_catalog.FSFlowCatalog; -import org.apache.gobblin.util.CompletedFuture; -import org.apache.gobblin.util.ConfigUtils; -import org.apache.gobblin.util.reflection.GobblinConstructorUtils; - - -@Slf4j -public class FlowGraphPathFinderTest { - private FlowGraph flowGraph; - private FlowGraphPathFinder pathFinder; - - @BeforeClass - public void setUp() - throws URISyntaxException, IOException, ReflectiveOperationException, FlowEdgeFactory.FlowEdgeCreationException { - //Create a FlowGraph - this.flowGraph = new BaseFlowGraph(); - - //Add DataNodes to the graph from the node properties files - URI dataNodesUri = FlowGraphPathFinderTest.class.getClassLoader().getResource("flowgraph/datanodes").toURI(); - FileSystem fs = FileSystem.get(dataNodesUri, new Configuration()); - Path dataNodesPath = new Path(dataNodesUri); - ConfigParseOptions options = ConfigParseOptions.defaults() - .setSyntax(ConfigSyntax.PROPERTIES) - .setAllowMissing(false); - - for (FileStatus fileStatus: fs.listStatus(dataNodesPath)) { - try (InputStream is = fs.open(fileStatus.getPath())) { - Config nodeConfig = ConfigFactory.parseReader(new InputStreamReader(is, Charsets.UTF_8), options); - Class dataNodeClass = Class.forName(ConfigUtils - .getString(nodeConfig, FlowGraphConfigurationKeys.DATA_NODE_CLASS, FlowGraphConfigurationKeys.DEFAULT_DATA_NODE_CLASS)); - DataNode dataNode = (DataNode) GobblinConstructorUtils.invokeLongestConstructor(dataNodeClass, nodeConfig); - this.flowGraph.addDataNode(dataNode); - } - } - - //Create a FSFlowCatalog 
instance - URI flowTemplateCatalogUri = this.getClass().getClassLoader().getResource("template_catalog").toURI(); - // Create a FSFlowCatalog instance - Properties properties = new Properties(); - properties.put(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY, flowTemplateCatalogUri.toString()); - Config config = ConfigFactory.parseProperties(properties); - Config templateCatalogCfg = config - .withValue(ConfigurationKeys.JOB_CONFIG_FILE_GENERAL_PATH_KEY, - config.getValue(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY)); - FSFlowCatalog flowCatalog = new FSFlowCatalog(templateCatalogCfg); - - - //Add FlowEdges from the edge properties files - URI flowEdgesURI = FlowGraphPathFinderTest.class.getClassLoader().getResource("flowgraph/flowedges").toURI(); - fs = FileSystem.get(flowEdgesURI, new Configuration()); - Path flowEdgesPath = new Path(flowEdgesURI); - for (FileStatus fileStatus: fs.listStatus(flowEdgesPath)) { - log.warn(fileStatus.getPath().toString()); - try (InputStream is = fs.open(fileStatus.getPath())) { - Config flowEdgeConfig = ConfigFactory.parseReader(new InputStreamReader(is, Charsets.UTF_8), options); - Class flowEdgeFactoryClass = Class.forName(ConfigUtils.getString(flowEdgeConfig, FlowGraphConfigurationKeys.FLOW_EDGE_FACTORY_CLASS, - FlowGraphConfigurationKeys.DEFAULT_FLOW_EDGE_FACTORY_CLASS)); - FlowEdgeFactory flowEdgeFactory = (FlowEdgeFactory) GobblinConstructorUtils.invokeLongestConstructor(flowEdgeFactoryClass, config); - FlowEdge edge = flowEdgeFactory.createFlowEdge(flowEdgeConfig, flowCatalog); - this.flowGraph.addFlowEdge(edge); - } - } - - //Create a flow spec - Properties flowProperties = new Properties(); - flowProperties.put(ConfigurationKeys.JOB_SCHEDULE_KEY, "* * * * *"); - flowProperties.put(ConfigurationKeys.FLOW_GROUP_KEY, "testFlowGroup"); - flowProperties.put(ConfigurationKeys.FLOW_NAME_KEY, "testFlowName"); - flowProperties.put(ServiceConfigKeys.FLOW_SOURCE_IDENTIFIER_KEY, "LocalFS-1"); - flowProperties.put(ServiceConfigKeys.FLOW_DESTINATION_IDENTIFIER_KEY, "ADLS-1"); - Config flowConfig = ConfigUtils.propertiesToConfig(flowProperties); - - //Get the input/output dataset config from a file - URI flowConfigUri = FlowGraphPathFinderTest.class.getClassLoader().getResource("flow/flow.conf").toURI(); - Path flowConfigPath = new Path(flowConfigUri); - FileSystem fs1 = FileSystem.get(flowConfigUri, new Configuration()); - try (InputStream is = fs1.open(flowConfigPath)) { - Config datasetConfig = ConfigFactory.parseReader(new InputStreamReader(is, Charset.defaultCharset())); - flowConfig = flowConfig.withFallback(datasetConfig).resolve(); - } - - FlowSpec.Builder flowSpecBuilder = null; - flowSpecBuilder = FlowSpec.builder(new Path("/tmp/flowSpecCatalog").toUri()) - .withConfig(flowConfig) - .withDescription("dummy description") - .withVersion(FlowSpec.Builder.DEFAULT_VERSION); - - FlowSpec spec = flowSpecBuilder.build(); - this.pathFinder = new FlowGraphPathFinder(this.flowGraph, spec); - } - - @Test - public void testFindPath() - throws FlowGraphPathFinder.PathFinderException, URISyntaxException, JobTemplate.TemplateException, - SpecNotFoundException, IOException { - Dag<JobExecutionPlan> jobDag = pathFinder.findPath().asDag(); - Assert.assertEquals(jobDag.getNodes().size(), 4); - Assert.assertEquals(jobDag.getStartNodes().size(), 1); - Assert.assertEquals(jobDag.getEndNodes().size(), 1); - - //Get the 1st hop - Distcp from "LocalFS-1" to "HDFS-1" - Dag.DagNode<JobExecutionPlan> startNode = jobDag.getStartNodes().get(0); - 
JobExecutionPlan jobSpecWithExecutor = startNode.getValue(); - JobSpec jobSpec = jobSpecWithExecutor.getJobSpec(); - - //Ensure the resolved job config for the first hop has the correct substitutions. - Config jobConfig = jobSpec.getConfig(); - Assert.assertEquals(jobConfig.getString("job.name"), "testFlowGroup:testFlowName:Distcp-HDFS-HDFS"); - String from = jobConfig.getString("from"); - String to = jobConfig.getString("to"); - Assert.assertEquals(from, "/data/out/testTeam/testDataset"); - Assert.assertEquals(to, "/data/out/testTeam/testDataset"); - String sourceFsUri = jobConfig.getString("fs.uri"); - Assert.assertEquals(sourceFsUri, "file:///"); - Assert.assertEquals(jobConfig.getString("source.filebased.fs.uri"), sourceFsUri); - Assert.assertEquals(jobConfig.getString("state.store.fs.uri"), sourceFsUri); - String targetFsUri = jobConfig.getString("target.filebased.fs.uri"); - Assert.assertEquals(targetFsUri, "hdfs://hadoopnn01.grid.linkedin.com:8888/"); - Assert.assertEquals(jobConfig.getString("writer.fs.uri"), targetFsUri); - Assert.assertEquals(jobConfig.getString("gobblin.dataset.pattern"), from); - Assert.assertEquals(jobConfig.getString("data.publisher.final.dir"), to); - Assert.assertEquals(jobConfig.getString("type"), "java"); - Assert.assertEquals(jobConfig.getString("job.class"), "org.apache.gobblin.runtime.local.LocalJobLauncher"); - Assert.assertEquals(jobConfig.getString("launcher.type"), "LOCAL"); - //Ensure the spec executor has the correct configurations - SpecExecutor specExecutor = jobSpecWithExecutor.getSpecExecutor(); - Assert.assertEquals(specExecutor.getUri().toString(), "fs:///"); - Assert.assertEquals(specExecutor.getClass().getCanonicalName(), "org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor"); - - //Get the 2nd hop - "HDFS-1 to HDFS-1 : convert avro to json and encrypt" - Assert.assertEquals(jobDag.getChildren(startNode).size(), 1); - Dag.DagNode<JobExecutionPlan> secondHopNode = jobDag.getChildren(startNode).get(0); - jobSpecWithExecutor = secondHopNode.getValue(); - jobConfig = jobSpecWithExecutor.getJobSpec().getConfig(); - Assert.assertEquals(jobConfig.getString("job.name"), "testFlowGroup:testFlowName:convert-to-json-and-encrypt"); - from = jobConfig.getString("from"); - to = jobConfig.getString("to"); - Assert.assertEquals(from, "/data/out/testTeam/testDataset"); - Assert.assertEquals(to, "/data/encrypted/testTeam/testDataset"); - Assert.assertEquals(jobConfig.getString("source.filebased.data.directory"), from); - Assert.assertEquals(jobConfig.getString("data.publisher.final.dir"), to); - specExecutor = jobSpecWithExecutor.getSpecExecutor(); - Assert.assertEquals(specExecutor.getUri().toString(), "https://azkaban01.gobblin.net:8443"); - Assert.assertEquals(specExecutor.getClass().getCanonicalName(), "org.apache.gobblin.service.modules.flow.FlowGraphPathFinderTest.TestAzkabanSpecExecutor"); - - //Get the 3rd hop - "Distcp HDFS-1 to HDFS-3" - Assert.assertEquals(jobDag.getChildren(secondHopNode).size(), 1); - Dag.DagNode<JobExecutionPlan> thirdHopNode = jobDag.getChildren(secondHopNode).get(0); - jobSpecWithExecutor = thirdHopNode.getValue(); - jobConfig = jobSpecWithExecutor.getJobSpec().getConfig(); - Assert.assertEquals(jobConfig.getString("job.name"), "testFlowGroup:testFlowName:Distcp-HDFS-HDFS"); - from = jobConfig.getString("from"); - to = jobConfig.getString("to"); - Assert.assertEquals(from, "/data/encrypted/testTeam/testDataset"); - Assert.assertEquals(to, "/data/encrypted/testTeam/testDataset"); - 
Assert.assertEquals(jobConfig.getString("source.filebased.fs.uri"), "hdfs://hadoopnn01.grid.linkedin.com:8888/"); - Assert.assertEquals(jobConfig.getString("target.filebased.fs.uri"), "hdfs://hadoopnn03.grid.linkedin.com:8888/"); - Assert.assertEquals(jobConfig.getString("type"), "hadoopJava"); - Assert.assertEquals(jobConfig.getString("job.class"), "org.apache.gobblin.azkaban.AzkabanJobLauncher"); - Assert.assertEquals(jobConfig.getString("launcher.type"), "MAPREDUCE"); - //Ensure the spec executor has the correct configurations - specExecutor = jobSpecWithExecutor.getSpecExecutor(); - Assert.assertEquals(specExecutor.getUri().toString(), "https://azkaban01.gobblin.net:8443"); - Assert.assertEquals(specExecutor.getClass().getCanonicalName(), "org.apache.gobblin.service.modules.flow.FlowGraphPathFinderTest.TestAzkabanSpecExecutor"); - - //Get the 4th hop - "Distcp from HDFS3 to ADLS-1" - Assert.assertEquals(jobDag.getChildren(thirdHopNode).size(), 1); - Dag.DagNode<JobExecutionPlan> fourthHopNode = jobDag.getChildren(thirdHopNode).get(0); - jobSpecWithExecutor = fourthHopNode.getValue(); - jobConfig = jobSpecWithExecutor.getJobSpec().getConfig(); - Assert.assertEquals(jobConfig.getString("job.name"), "testFlowGroup:testFlowName:Distcp-HDFS-ADL"); - from = jobConfig.getString("from"); - to = jobConfig.getString("to"); - Assert.assertEquals(from, "/data/encrypted/testTeam/testDataset"); - Assert.assertEquals(to, "/data/encrypted/testTeam/testDataset"); - Assert.assertEquals(jobConfig.getString("source.filebased.fs.uri"), "hdfs://hadoopnn03.grid.linkedin.com:8888/"); - Assert.assertEquals(jobConfig.getString("target.filebased.fs.uri"), "adl://azuredatalakestore.net/"); - Assert.assertEquals(jobConfig.getString("type"), "hadoopJava"); - Assert.assertEquals(jobConfig.getString("job.class"), "org.apache.gobblin.azkaban.AzkabanJobLauncher"); - Assert.assertEquals(jobConfig.getString("launcher.type"), "MAPREDUCE"); - Assert.assertEquals(jobConfig.getString("dfs.adls.oauth2.client.id"), "1234"); - Assert.assertEquals(jobConfig.getString("writer.encrypted.dfs.adls.oauth2.credential"), "credential"); - Assert.assertEquals(jobConfig.getString("encrypt.key.loc"), "/user/testUser/master.password"); - //Ensure the spec executor has the correct configurations - specExecutor = jobSpecWithExecutor.getSpecExecutor(); - Assert.assertEquals(specExecutor.getUri().toString(), "https://azkaban03.gobblin.net:8443"); - Assert.assertEquals(specExecutor.getClass().getCanonicalName(), "org.apache.gobblin.service.modules.flow.FlowGraphPathFinderTest.TestAzkabanSpecExecutor"); - - //Ensure the fourth hop is the last - Assert.assertEquals(jobDag.getEndNodes().get(0), fourthHopNode); - } - - @Test (dependsOnMethods = "testFindPath") - public void testFindPathAfterFirstEdgeDeletion() - throws FlowGraphPathFinder.PathFinderException, URISyntaxException, JobTemplate.TemplateException, - SpecNotFoundException, IOException { - //Delete the self edge on HDFS-1 that performs convert-to-json-and-encrypt. 
- this.flowGraph.deleteFlowEdge("HDFS-1:HDFS-1:hdfsConvertToJsonAndEncrypt"); - - Dag<JobExecutionPlan> jobDag = pathFinder.findPath().asDag(); - Assert.assertEquals(jobDag.getNodes().size(), 4); - Assert.assertEquals(jobDag.getStartNodes().size(), 1); - Assert.assertEquals(jobDag.getEndNodes().size(), 1); - - //Get the 1st hop - Distcp from "LocalFS-1" to "HDFS-2" - Dag.DagNode<JobExecutionPlan> startNode = jobDag.getStartNodes().get(0); - JobExecutionPlan jobExecutionPlan = startNode.getValue(); - JobSpec jobSpec = jobExecutionPlan.getJobSpec(); - - //Ensure the resolved job config for the first hop has the correct substitutions. - Config jobConfig = jobSpec.getConfig(); - Assert.assertEquals(jobConfig.getString("job.name"), "testFlowGroup:testFlowName:Distcp-HDFS-HDFS"); - String from = jobConfig.getString("from"); - String to = jobConfig.getString("to"); - Assert.assertEquals(from, "/data/out/testTeam/testDataset"); - Assert.assertEquals(to, "/data/out/testTeam/testDataset"); - String sourceFsUri = jobConfig.getString("fs.uri"); - Assert.assertEquals(sourceFsUri, "file:///"); - Assert.assertEquals(jobConfig.getString("source.filebased.fs.uri"), sourceFsUri); - Assert.assertEquals(jobConfig.getString("state.store.fs.uri"), sourceFsUri); - String targetFsUri = jobConfig.getString("target.filebased.fs.uri"); - Assert.assertEquals(targetFsUri, "hdfs://hadoopnn02.grid.linkedin.com:8888/"); - Assert.assertEquals(jobConfig.getString("writer.fs.uri"), targetFsUri); - Assert.assertEquals(jobConfig.getString("gobblin.dataset.pattern"), from); - Assert.assertEquals(jobConfig.getString("data.publisher.final.dir"), to); - Assert.assertEquals(jobConfig.getString("type"), "java"); - Assert.assertEquals(jobConfig.getString("job.class"), "org.apache.gobblin.runtime.local.LocalJobLauncher"); - Assert.assertEquals(jobConfig.getString("launcher.type"), "LOCAL"); - //Ensure the spec executor has the correct configurations - SpecExecutor specExecutor = jobExecutionPlan.getSpecExecutor(); - Assert.assertEquals(specExecutor.getUri().toString(), "fs:///"); - Assert.assertEquals(specExecutor.getClass().getCanonicalName(), "org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor"); - - //Get the 2nd hop - "HDFS-2 to HDFS-2 : convert avro to json and encrypt" - Assert.assertEquals(jobDag.getChildren(startNode).size(), 1); - Dag.DagNode<JobExecutionPlan> secondHopNode = jobDag.getChildren(startNode).get(0); - jobExecutionPlan = secondHopNode.getValue(); - jobConfig = jobExecutionPlan.getJobSpec().getConfig(); - Assert.assertEquals(jobConfig.getString("job.name"), "testFlowGroup:testFlowName:convert-to-json-and-encrypt"); - from = jobConfig.getString("from"); - to = jobConfig.getString("to"); - Assert.assertEquals(from, "/data/out/testTeam/testDataset"); - Assert.assertEquals(to, "/data/encrypted/testTeam/testDataset"); - Assert.assertEquals(jobConfig.getString("source.filebased.data.directory"), from); - Assert.assertEquals(jobConfig.getString("data.publisher.final.dir"), to); - specExecutor = jobExecutionPlan.getSpecExecutor(); - Assert.assertEquals(specExecutor.getUri().toString(), "https://azkaban02.gobblin.net:8443"); - Assert.assertEquals(specExecutor.getClass().getCanonicalName(), "org.apache.gobblin.service.modules.flow.FlowGraphPathFinderTest.TestAzkabanSpecExecutor"); - - //Get the 3rd hop - "Distcp HDFS-2 to HDFS-4" - Assert.assertEquals(jobDag.getChildren(secondHopNode).size(), 1); - Dag.DagNode<JobExecutionPlan> thirdHopNode = jobDag.getChildren(secondHopNode).get(0); - jobExecutionPlan = 
thirdHopNode.getValue(); - jobConfig = jobExecutionPlan.getJobSpec().getConfig(); - Assert.assertEquals(jobConfig.getString("job.name"), "testFlowGroup:testFlowName:Distcp-HDFS-HDFS"); - from = jobConfig.getString("from"); - to = jobConfig.getString("to"); - Assert.assertEquals(from, "/data/encrypted/testTeam/testDataset"); - Assert.assertEquals(to, "/data/encrypted/testTeam/testDataset"); - Assert.assertEquals(jobConfig.getString("source.filebased.fs.uri"), "hdfs://hadoopnn02.grid.linkedin.com:8888/"); - Assert.assertEquals(jobConfig.getString("target.filebased.fs.uri"), "hdfs://hadoopnn04.grid.linkedin.com:8888/"); - Assert.assertEquals(jobConfig.getString("type"), "hadoopJava"); - Assert.assertEquals(jobConfig.getString("job.class"), "org.apache.gobblin.azkaban.AzkabanJobLauncher"); - Assert.assertEquals(jobConfig.getString("launcher.type"), "MAPREDUCE"); - //Ensure the spec executor has the correct configurations - specExecutor = jobExecutionPlan.getSpecExecutor(); - Assert.assertEquals(specExecutor.getUri().toString(), "https://azkaban02.gobblin.net:8443"); - Assert.assertEquals(specExecutor.getClass().getCanonicalName(), "org.apache.gobblin.service.modules.flow.FlowGraphPathFinderTest.TestAzkabanSpecExecutor"); - - //Get the 4th hop - "Distcp from HDFS4 to ADLS-1" - Assert.assertEquals(jobDag.getChildren(thirdHopNode).size(), 1); - Dag.DagNode<JobExecutionPlan> fourthHopNode = jobDag.getChildren(thirdHopNode).get(0); - jobExecutionPlan = fourthHopNode.getValue(); - jobConfig = jobExecutionPlan.getJobSpec().getConfig(); - Assert.assertEquals(jobConfig.getString("job.name"), "testFlowGroup:testFlowName:Distcp-HDFS-ADL"); - from = jobConfig.getString("from"); - to = jobConfig.getString("to"); - Assert.assertEquals(from, "/data/encrypted/testTeam/testDataset"); - Assert.assertEquals(to, "/data/encrypted/testTeam/testDataset"); - Assert.assertEquals(jobConfig.getString("source.filebased.fs.uri"), "hdfs://hadoopnn04.grid.linkedin.com:8888/"); - Assert.assertEquals(jobConfig.getString("target.filebased.fs.uri"), "adl://azuredatalakestore.net/"); - Assert.assertEquals(jobConfig.getString("type"), "hadoopJava"); - Assert.assertEquals(jobConfig.getString("job.class"), "org.apache.gobblin.azkaban.AzkabanJobLauncher"); - Assert.assertEquals(jobConfig.getString("launcher.type"), "MAPREDUCE"); - Assert.assertEquals(jobConfig.getString("dfs.adls.oauth2.client.id"), "1234"); - Assert.assertEquals(jobConfig.getString("writer.encrypted.dfs.adls.oauth2.credential"), "credential"); - Assert.assertEquals(jobConfig.getString("encrypt.key.loc"), "/user/testUser/master.password"); - //Ensure the spec executor has the correct configurations - specExecutor = jobExecutionPlan.getSpecExecutor(); - Assert.assertEquals(specExecutor.getUri().toString(), "https://azkaban04.gobblin.net:8443"); - Assert.assertEquals(specExecutor.getClass().getCanonicalName(), "org.apache.gobblin.service.modules.flow.FlowGraphPathFinderTest.TestAzkabanSpecExecutor"); - - //Ensure the fourth hop is the last - Assert.assertEquals(jobDag.getEndNodes().get(0), fourthHopNode); - } - - @Test (dependsOnMethods = "testFindPathAfterFirstEdgeDeletion") - public void testFindPathAfterSecondEdgeDeletion() - throws FlowGraphPathFinder.PathFinderException, URISyntaxException, JobTemplate.TemplateException, - SpecNotFoundException, IOException { - //Delete the self edge on HDFS-2 that performs convert-to-json-and-encrypt. - this.flowGraph.deleteFlowEdge("HDFS-2:HDFS-2:hdfsConvertToJsonAndEncrypt"); - - //Ensure no path to destination. 
- Assert.assertNull(pathFinder.findPath()); - } - - @AfterClass - public void tearDown() { - } - - public static class TestAzkabanSpecExecutor extends AbstractSpecExecutor { - // Executor Instance - protected final Config config; - - private SpecProducer<Spec> azkabanSpecProducer; - - public TestAzkabanSpecExecutor(Config config) { - super(config); - this.config = config; - } - - @Override - protected void startUp() throws Exception { - //Do nothing - } - - @Override - protected void shutDown() throws Exception { - //Do nothing - } - - @Override - public Future<String> getDescription() { - return new CompletedFuture<>("SimpleSpecExecutorInstance with URI: " + specExecutorInstanceUri, null); - } - - @Override - public Future<? extends SpecProducer> getProducer() { - return new CompletedFuture<>(this.azkabanSpecProducer, null); - } - - @Override - public Future<Config> getConfig() { - return new CompletedFuture<>(config, null); - } - - @Override - public Future<String> getHealth() { - return new CompletedFuture<>("Healthy", null); - } - - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ef26d287/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompilerTest.java ---------------------------------------------------------------------- diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompilerTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompilerTest.java new file mode 100644 index 0000000..b8bac02 --- /dev/null +++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompilerTest.java @@ -0,0 +1,419 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gobblin.service.modules.flow; + +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.net.URI; +import java.net.URISyntaxException; +import java.nio.charset.Charset; +import java.util.Properties; +import java.util.concurrent.Future; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import com.google.common.base.Charsets; +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; +import com.typesafe.config.ConfigParseOptions; +import com.typesafe.config.ConfigSyntax; + +import lombok.extern.slf4j.Slf4j; + +import org.apache.gobblin.configuration.ConfigurationKeys; +import org.apache.gobblin.runtime.api.FlowSpec; +import org.apache.gobblin.runtime.api.JobSpec; +import org.apache.gobblin.runtime.api.Spec; +import org.apache.gobblin.runtime.api.SpecExecutor; +import org.apache.gobblin.runtime.api.SpecProducer; +import org.apache.gobblin.runtime.spec_executorInstance.AbstractSpecExecutor; +import org.apache.gobblin.service.ServiceConfigKeys; +import org.apache.gobblin.service.modules.flowgraph.BaseFlowGraph; +import org.apache.gobblin.service.modules.flowgraph.Dag; +import org.apache.gobblin.service.modules.flowgraph.DataNode; +import org.apache.gobblin.service.modules.flowgraph.FlowEdge; +import org.apache.gobblin.service.modules.flowgraph.FlowEdgeFactory; +import org.apache.gobblin.service.modules.flowgraph.FlowGraph; +import org.apache.gobblin.service.modules.flowgraph.FlowGraphConfigurationKeys; +import org.apache.gobblin.service.modules.spec.JobExecutionPlan; +import org.apache.gobblin.service.modules.template_catalog.FSFlowCatalog; +import org.apache.gobblin.util.CompletedFuture; +import org.apache.gobblin.util.ConfigUtils; +import org.apache.gobblin.util.reflection.GobblinConstructorUtils; + + +@Slf4j +public class MultiHopFlowCompilerTest { + private FlowGraph flowGraph; + private SpecCompiler specCompiler; + + @BeforeClass + public void setUp() + throws URISyntaxException, IOException, ReflectiveOperationException, FlowEdgeFactory.FlowEdgeCreationException { + //Create a FlowGraph + this.flowGraph = new BaseFlowGraph(); + + //Add DataNodes to the graph from the node properties files + URI dataNodesUri = MultiHopFlowCompilerTest.class.getClassLoader().getResource("flowgraph/datanodes").toURI(); + FileSystem fs = FileSystem.get(dataNodesUri, new Configuration()); + Path dataNodesPath = new Path(dataNodesUri); + ConfigParseOptions options = ConfigParseOptions.defaults() + .setSyntax(ConfigSyntax.PROPERTIES) + .setAllowMissing(false); + + for (FileStatus fileStatus: fs.listStatus(dataNodesPath)) { + try (InputStream is = fs.open(fileStatus.getPath())) { + Config nodeConfig = ConfigFactory.parseReader(new InputStreamReader(is, Charsets.UTF_8), options); + Class dataNodeClass = Class.forName(ConfigUtils + .getString(nodeConfig, FlowGraphConfigurationKeys.DATA_NODE_CLASS, FlowGraphConfigurationKeys.DEFAULT_DATA_NODE_CLASS)); + DataNode dataNode = (DataNode) GobblinConstructorUtils.invokeLongestConstructor(dataNodeClass, nodeConfig); + this.flowGraph.addDataNode(dataNode); + } + } + + //Create a FSFlowCatalog instance + URI flowTemplateCatalogUri = this.getClass().getClassLoader().getResource("template_catalog").toURI(); + // 
Build the config for constructing the FSFlowCatalog + Properties properties = new Properties(); + properties.put(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY, flowTemplateCatalogUri.toString()); + Config config = ConfigFactory.parseProperties(properties); + Config templateCatalogCfg = config + .withValue(ConfigurationKeys.JOB_CONFIG_FILE_GENERAL_PATH_KEY, + config.getValue(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY)); + FSFlowCatalog flowCatalog = new FSFlowCatalog(templateCatalogCfg); + + + //Add FlowEdges from the edge properties files + URI flowEdgesURI = MultiHopFlowCompilerTest.class.getClassLoader().getResource("flowgraph/flowedges").toURI(); + fs = FileSystem.get(flowEdgesURI, new Configuration()); + Path flowEdgesPath = new Path(flowEdgesURI); + for (FileStatus fileStatus: fs.listStatus(flowEdgesPath)) { + log.warn(fileStatus.getPath().toString()); + try (InputStream is = fs.open(fileStatus.getPath())) { + Config flowEdgeConfig = ConfigFactory.parseReader(new InputStreamReader(is, Charsets.UTF_8), options); + Class flowEdgeFactoryClass = Class.forName(ConfigUtils.getString(flowEdgeConfig, FlowGraphConfigurationKeys.FLOW_EDGE_FACTORY_CLASS, + FlowGraphConfigurationKeys.DEFAULT_FLOW_EDGE_FACTORY_CLASS)); + FlowEdgeFactory flowEdgeFactory = (FlowEdgeFactory) GobblinConstructorUtils.invokeLongestConstructor(flowEdgeFactoryClass, config); + FlowEdge edge = flowEdgeFactory.createFlowEdge(flowEdgeConfig, flowCatalog); + this.flowGraph.addFlowEdge(edge); + } + } + + this.specCompiler = new MultiHopFlowCompiler(config, this.flowGraph); + } + + private FlowSpec createFlowSpec(String flowConfigResource, String source, String destination) throws IOException, URISyntaxException { + //Create a flow spec + Properties flowProperties = new Properties(); + flowProperties.put(ConfigurationKeys.JOB_SCHEDULE_KEY, "* * * * *"); + flowProperties.put(ConfigurationKeys.FLOW_GROUP_KEY, "testFlowGroup"); + flowProperties.put(ConfigurationKeys.FLOW_NAME_KEY, "testFlowName"); + flowProperties.put(ServiceConfigKeys.FLOW_SOURCE_IDENTIFIER_KEY, source); + flowProperties.put(ServiceConfigKeys.FLOW_DESTINATION_IDENTIFIER_KEY, destination); + Config flowConfig = ConfigUtils.propertiesToConfig(flowProperties); + + //Get the input/output dataset config from a file + URI flowConfigUri = MultiHopFlowCompilerTest.class.getClassLoader().getResource(flowConfigResource).toURI(); + Path flowConfigPath = new Path(flowConfigUri); + FileSystem fs1 = FileSystem.get(flowConfigUri, new Configuration()); + try (InputStream is = fs1.open(flowConfigPath)) { + Config datasetConfig = ConfigFactory.parseReader(new InputStreamReader(is, Charset.defaultCharset())); + flowConfig = flowConfig.withFallback(datasetConfig).resolve(); + } + + FlowSpec.Builder flowSpecBuilder = FlowSpec.builder(new Path("/tmp/flowSpecCatalog").toUri()) + .withConfig(flowConfig) + .withDescription("dummy description") + .withVersion(FlowSpec.Builder.DEFAULT_VERSION); + + FlowSpec spec = flowSpecBuilder.build(); + return spec; + } + + @Test + public void testCompileFlow() throws URISyntaxException, IOException { + FlowSpec spec = createFlowSpec("flow/flow.conf", "LocalFS-1", "ADLS-1"); + Dag<JobExecutionPlan> jobDag = this.specCompiler.compileFlow(spec); + Assert.assertEquals(jobDag.getNodes().size(), 4); + Assert.assertEquals(jobDag.getStartNodes().size(), 1); + Assert.assertEquals(jobDag.getEndNodes().size(), 1); + + //Get the 1st hop - Distcp from "LocalFS-1" to "HDFS-1" + Dag.DagNode<JobExecutionPlan> startNode = 
jobDag.getStartNodes().get(0); + JobExecutionPlan jobSpecWithExecutor = startNode.getValue(); + JobSpec jobSpec = jobSpecWithExecutor.getJobSpec(); + + //Ensure the resolved job config for the first hop has the correct substitutions. + Config jobConfig = jobSpec.getConfig(); + Assert.assertEquals(jobConfig.getString("job.name"), "testFlowGroup:testFlowName:Distcp-HDFS-HDFS"); + String from = jobConfig.getString("from"); + String to = jobConfig.getString("to"); + Assert.assertEquals(from, "/data/out/testTeam/testDataset"); + Assert.assertEquals(to, "/data/out/testTeam/testDataset"); + String sourceFsUri = jobConfig.getString("fs.uri"); + Assert.assertEquals(sourceFsUri, "file:///"); + Assert.assertEquals(jobConfig.getString("source.filebased.fs.uri"), sourceFsUri); + Assert.assertEquals(jobConfig.getString("state.store.fs.uri"), sourceFsUri); + String targetFsUri = jobConfig.getString("target.filebased.fs.uri"); + Assert.assertEquals(targetFsUri, "hdfs://hadoopnn01.grid.linkedin.com:8888/"); + Assert.assertEquals(jobConfig.getString("writer.fs.uri"), targetFsUri); + Assert.assertEquals(jobConfig.getString("gobblin.dataset.pattern"), from); + Assert.assertEquals(jobConfig.getString("data.publisher.final.dir"), to); + Assert.assertEquals(jobConfig.getString("type"), "java"); + Assert.assertEquals(jobConfig.getString("job.class"), "org.apache.gobblin.runtime.local.LocalJobLauncher"); + Assert.assertEquals(jobConfig.getString("launcher.type"), "LOCAL"); + //Ensure the spec executor has the correct configurations + SpecExecutor specExecutor = jobSpecWithExecutor.getSpecExecutor(); + Assert.assertEquals(specExecutor.getUri().toString(), "fs:///"); + Assert.assertEquals(specExecutor.getClass().getCanonicalName(), "org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor"); + + //Get the 2nd hop - "HDFS-1 to HDFS-1 : convert avro to json and encrypt" + Assert.assertEquals(jobDag.getChildren(startNode).size(), 1); + Dag.DagNode<JobExecutionPlan> secondHopNode = jobDag.getChildren(startNode).get(0); + jobSpecWithExecutor = secondHopNode.getValue(); + jobConfig = jobSpecWithExecutor.getJobSpec().getConfig(); + Assert.assertEquals(jobConfig.getString("job.name"), "testFlowGroup:testFlowName:convert-to-json-and-encrypt"); + from = jobConfig.getString("from"); + to = jobConfig.getString("to"); + Assert.assertEquals(from, "/data/out/testTeam/testDataset"); + Assert.assertEquals(to, "/data/encrypted/testTeam/testDataset"); + Assert.assertEquals(jobConfig.getString("source.filebased.data.directory"), from); + Assert.assertEquals(jobConfig.getString("data.publisher.final.dir"), to); + specExecutor = jobSpecWithExecutor.getSpecExecutor(); + Assert.assertEquals(specExecutor.getUri().toString(), "https://azkaban01.gobblin.net:8443"); + Assert.assertEquals(specExecutor.getClass().getCanonicalName(), "org.apache.gobblin.service.modules.flow.MultiHopFlowCompilerTest.TestAzkabanSpecExecutor"); + + //Get the 3rd hop - "Distcp HDFS-1 to HDFS-3" + Assert.assertEquals(jobDag.getChildren(secondHopNode).size(), 1); + Dag.DagNode<JobExecutionPlan> thirdHopNode = jobDag.getChildren(secondHopNode).get(0); + jobSpecWithExecutor = thirdHopNode.getValue(); + jobConfig = jobSpecWithExecutor.getJobSpec().getConfig(); + Assert.assertEquals(jobConfig.getString("job.name"), "testFlowGroup:testFlowName:Distcp-HDFS-HDFS"); + from = jobConfig.getString("from"); + to = jobConfig.getString("to"); + Assert.assertEquals(from, "/data/encrypted/testTeam/testDataset"); + Assert.assertEquals(to, 
"/data/encrypted/testTeam/testDataset"); + Assert.assertEquals(jobConfig.getString("source.filebased.fs.uri"), "hdfs://hadoopnn01.grid.linkedin.com:8888/"); + Assert.assertEquals(jobConfig.getString("target.filebased.fs.uri"), "hdfs://hadoopnn03.grid.linkedin.com:8888/"); + Assert.assertEquals(jobConfig.getString("type"), "hadoopJava"); + Assert.assertEquals(jobConfig.getString("job.class"), "org.apache.gobblin.azkaban.AzkabanJobLauncher"); + Assert.assertEquals(jobConfig.getString("launcher.type"), "MAPREDUCE"); + //Ensure the spec executor has the correct configurations + specExecutor = jobSpecWithExecutor.getSpecExecutor(); + Assert.assertEquals(specExecutor.getUri().toString(), "https://azkaban01.gobblin.net:8443"); + Assert.assertEquals(specExecutor.getClass().getCanonicalName(), "org.apache.gobblin.service.modules.flow.MultiHopFlowCompilerTest.TestAzkabanSpecExecutor"); + + //Get the 4th hop - "Distcp from HDFS3 to ADLS-1" + Assert.assertEquals(jobDag.getChildren(thirdHopNode).size(), 1); + Dag.DagNode<JobExecutionPlan> fourthHopNode = jobDag.getChildren(thirdHopNode).get(0); + jobSpecWithExecutor = fourthHopNode.getValue(); + jobConfig = jobSpecWithExecutor.getJobSpec().getConfig(); + Assert.assertEquals(jobConfig.getString("job.name"), "testFlowGroup:testFlowName:Distcp-HDFS-ADL"); + from = jobConfig.getString("from"); + to = jobConfig.getString("to"); + Assert.assertEquals(from, "/data/encrypted/testTeam/testDataset"); + Assert.assertEquals(to, "/data/encrypted/testTeam/testDataset"); + Assert.assertEquals(jobConfig.getString("source.filebased.fs.uri"), "hdfs://hadoopnn03.grid.linkedin.com:8888/"); + Assert.assertEquals(jobConfig.getString("target.filebased.fs.uri"), "adl://azuredatalakestore.net/"); + Assert.assertEquals(jobConfig.getString("type"), "hadoopJava"); + Assert.assertEquals(jobConfig.getString("job.class"), "org.apache.gobblin.azkaban.AzkabanJobLauncher"); + Assert.assertEquals(jobConfig.getString("launcher.type"), "MAPREDUCE"); + Assert.assertEquals(jobConfig.getString("dfs.adls.oauth2.client.id"), "1234"); + Assert.assertEquals(jobConfig.getString("writer.encrypted.dfs.adls.oauth2.credential"), "credential"); + Assert.assertEquals(jobConfig.getString("encrypt.key.loc"), "/user/testUser/master.password"); + //Ensure the spec executor has the correct configurations + specExecutor = jobSpecWithExecutor.getSpecExecutor(); + Assert.assertEquals(specExecutor.getUri().toString(), "https://azkaban03.gobblin.net:8443"); + Assert.assertEquals(specExecutor.getClass().getCanonicalName(), "org.apache.gobblin.service.modules.flow.MultiHopFlowCompilerTest.TestAzkabanSpecExecutor"); + + //Ensure the fourth hop is the last + Assert.assertEquals(jobDag.getEndNodes().get(0), fourthHopNode); + } + + @Test (dependsOnMethods = "testCompileFlow") + public void testCompileFlowAfterFirstEdgeDeletion() throws URISyntaxException, IOException { + //Delete the self edge on HDFS-1 that performs convert-to-json-and-encrypt. 
+ this.flowGraph.deleteFlowEdge("HDFS-1:HDFS-1:hdfsConvertToJsonAndEncrypt"); + + FlowSpec spec = createFlowSpec("flow/flow.conf", "LocalFS-1", "ADLS-1"); + Dag<JobExecutionPlan> jobDag = this.specCompiler.compileFlow(spec); + + Assert.assertEquals(jobDag.getNodes().size(), 4); + Assert.assertEquals(jobDag.getStartNodes().size(), 1); + Assert.assertEquals(jobDag.getEndNodes().size(), 1); + + //Get the 1st hop - Distcp from "LocalFS-1" to "HDFS-2" + Dag.DagNode<JobExecutionPlan> startNode = jobDag.getStartNodes().get(0); + JobExecutionPlan jobExecutionPlan = startNode.getValue(); + JobSpec jobSpec = jobExecutionPlan.getJobSpec(); + + //Ensure the resolved job config for the first hop has the correct substitutions. + Config jobConfig = jobSpec.getConfig(); + Assert.assertEquals(jobConfig.getString("job.name"), "testFlowGroup:testFlowName:Distcp-HDFS-HDFS"); + String from = jobConfig.getString("from"); + String to = jobConfig.getString("to"); + Assert.assertEquals(from, "/data/out/testTeam/testDataset"); + Assert.assertEquals(to, "/data/out/testTeam/testDataset"); + String sourceFsUri = jobConfig.getString("fs.uri"); + Assert.assertEquals(sourceFsUri, "file:///"); + Assert.assertEquals(jobConfig.getString("source.filebased.fs.uri"), sourceFsUri); + Assert.assertEquals(jobConfig.getString("state.store.fs.uri"), sourceFsUri); + String targetFsUri = jobConfig.getString("target.filebased.fs.uri"); + Assert.assertEquals(targetFsUri, "hdfs://hadoopnn02.grid.linkedin.com:8888/"); + Assert.assertEquals(jobConfig.getString("writer.fs.uri"), targetFsUri); + Assert.assertEquals(jobConfig.getString("gobblin.dataset.pattern"), from); + Assert.assertEquals(jobConfig.getString("data.publisher.final.dir"), to); + Assert.assertEquals(jobConfig.getString("type"), "java"); + Assert.assertEquals(jobConfig.getString("job.class"), "org.apache.gobblin.runtime.local.LocalJobLauncher"); + Assert.assertEquals(jobConfig.getString("launcher.type"), "LOCAL"); + //Ensure the spec executor has the correct configurations + SpecExecutor specExecutor = jobExecutionPlan.getSpecExecutor(); + Assert.assertEquals(specExecutor.getUri().toString(), "fs:///"); + Assert.assertEquals(specExecutor.getClass().getCanonicalName(), "org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor"); + + //Get the 2nd hop - "HDFS-2 to HDFS-2 : convert avro to json and encrypt" + Assert.assertEquals(jobDag.getChildren(startNode).size(), 1); + Dag.DagNode<JobExecutionPlan> secondHopNode = jobDag.getChildren(startNode).get(0); + jobExecutionPlan = secondHopNode.getValue(); + jobConfig = jobExecutionPlan.getJobSpec().getConfig(); + Assert.assertEquals(jobConfig.getString("job.name"), "testFlowGroup:testFlowName:convert-to-json-and-encrypt"); + from = jobConfig.getString("from"); + to = jobConfig.getString("to"); + Assert.assertEquals(from, "/data/out/testTeam/testDataset"); + Assert.assertEquals(to, "/data/encrypted/testTeam/testDataset"); + Assert.assertEquals(jobConfig.getString("source.filebased.data.directory"), from); + Assert.assertEquals(jobConfig.getString("data.publisher.final.dir"), to); + specExecutor = jobExecutionPlan.getSpecExecutor(); + Assert.assertEquals(specExecutor.getUri().toString(), "https://azkaban02.gobblin.net:8443"); + Assert.assertEquals(specExecutor.getClass().getCanonicalName(), "org.apache.gobblin.service.modules.flow.MultiHopFlowCompilerTest.TestAzkabanSpecExecutor"); + + //Get the 3rd hop - "Distcp HDFS-2 to HDFS-4" + Assert.assertEquals(jobDag.getChildren(secondHopNode).size(), 1); + 
Dag.DagNode<JobExecutionPlan> thirdHopNode = jobDag.getChildren(secondHopNode).get(0); + jobExecutionPlan = thirdHopNode.getValue(); + jobConfig = jobExecutionPlan.getJobSpec().getConfig(); + Assert.assertEquals(jobConfig.getString("job.name"), "testFlowGroup:testFlowName:Distcp-HDFS-HDFS"); + from = jobConfig.getString("from"); + to = jobConfig.getString("to"); + Assert.assertEquals(from, "/data/encrypted/testTeam/testDataset"); + Assert.assertEquals(to, "/data/encrypted/testTeam/testDataset"); + Assert.assertEquals(jobConfig.getString("source.filebased.fs.uri"), "hdfs://hadoopnn02.grid.linkedin.com:8888/"); + Assert.assertEquals(jobConfig.getString("target.filebased.fs.uri"), "hdfs://hadoopnn04.grid.linkedin.com:8888/"); + Assert.assertEquals(jobConfig.getString("type"), "hadoopJava"); + Assert.assertEquals(jobConfig.getString("job.class"), "org.apache.gobblin.azkaban.AzkabanJobLauncher"); + Assert.assertEquals(jobConfig.getString("launcher.type"), "MAPREDUCE"); + //Ensure the spec executor has the correct configurations + specExecutor = jobExecutionPlan.getSpecExecutor(); + Assert.assertEquals(specExecutor.getUri().toString(), "https://azkaban02.gobblin.net:8443"); + Assert.assertEquals(specExecutor.getClass().getCanonicalName(), "org.apache.gobblin.service.modules.flow.MultiHopFlowCompilerTest.TestAzkabanSpecExecutor"); + + //Get the 4th hop - "Distcp from HDFS4 to ADLS-1" + Assert.assertEquals(jobDag.getChildren(thirdHopNode).size(), 1); + Dag.DagNode<JobExecutionPlan> fourthHopNode = jobDag.getChildren(thirdHopNode).get(0); + jobExecutionPlan = fourthHopNode.getValue(); + jobConfig = jobExecutionPlan.getJobSpec().getConfig(); + Assert.assertEquals(jobConfig.getString("job.name"), "testFlowGroup:testFlowName:Distcp-HDFS-ADL"); + from = jobConfig.getString("from"); + to = jobConfig.getString("to"); + Assert.assertEquals(from, "/data/encrypted/testTeam/testDataset"); + Assert.assertEquals(to, "/data/encrypted/testTeam/testDataset"); + Assert.assertEquals(jobConfig.getString("source.filebased.fs.uri"), "hdfs://hadoopnn04.grid.linkedin.com:8888/"); + Assert.assertEquals(jobConfig.getString("target.filebased.fs.uri"), "adl://azuredatalakestore.net/"); + Assert.assertEquals(jobConfig.getString("type"), "hadoopJava"); + Assert.assertEquals(jobConfig.getString("job.class"), "org.apache.gobblin.azkaban.AzkabanJobLauncher"); + Assert.assertEquals(jobConfig.getString("launcher.type"), "MAPREDUCE"); + Assert.assertEquals(jobConfig.getString("dfs.adls.oauth2.client.id"), "1234"); + Assert.assertEquals(jobConfig.getString("writer.encrypted.dfs.adls.oauth2.credential"), "credential"); + Assert.assertEquals(jobConfig.getString("encrypt.key.loc"), "/user/testUser/master.password"); + //Ensure the spec executor has the correct configurations + specExecutor = jobExecutionPlan.getSpecExecutor(); + Assert.assertEquals(specExecutor.getUri().toString(), "https://azkaban04.gobblin.net:8443"); + Assert.assertEquals(specExecutor.getClass().getCanonicalName(), "org.apache.gobblin.service.modules.flow.MultiHopFlowCompilerTest.TestAzkabanSpecExecutor"); + + //Ensure the fourth hop is the last + Assert.assertEquals(jobDag.getEndNodes().get(0), fourthHopNode); + } + + @Test (dependsOnMethods = "testCompileFlowAfterFirstEdgeDeletion") + public void testCompileFlowAfterSecondEdgeDeletion() throws URISyntaxException, IOException { + //Delete the self edge on HDFS-2 that performs convert-to-json-and-encrypt. 
+ this.flowGraph.deleteFlowEdge("HDFS-2:HDFS-2:hdfsConvertToJsonAndEncrypt"); + + FlowSpec spec = createFlowSpec("flow/flow.conf", "LocalFS-1", "ADLS-1"); + Dag<JobExecutionPlan> jobDag = this.specCompiler.compileFlow(spec); + + //Ensure no path to destination. + Assert.assertTrue(jobDag.isEmpty()); + } + + @AfterClass + public void tearDown() { + } + + public static class TestAzkabanSpecExecutor extends AbstractSpecExecutor { + // Executor Instance + protected final Config config; + + private SpecProducer<Spec> azkabanSpecProducer; + + public TestAzkabanSpecExecutor(Config config) { + super(config); + this.config = config; + } + + @Override + protected void startUp() throws Exception { + //Do nothing + } + + @Override + protected void shutDown() throws Exception { + //Do nothing + } + + @Override + public Future<String> getDescription() { + return new CompletedFuture<>("SimpleSpecExecutorInstance with URI: " + specExecutorInstanceUri, null); + } + + @Override + public Future<? extends SpecProducer> getProducer() { + return new CompletedFuture<>(this.azkabanSpecProducer, null); + } + + @Override + public Future<Config> getConfig() { + return new CompletedFuture<>(config, null); + } + + @Override + public Future<String> getHealth() { + return new CompletedFuture<>("Healthy", null); + } + + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ef26d287/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-1-to-hdfs-1-encrypt.properties ---------------------------------------------------------------------- diff --git a/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-1-to-hdfs-1-encrypt.properties b/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-1-to-hdfs-1-encrypt.properties index bcf6d44..44d3c44 100644 --- a/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-1-to-hdfs-1-encrypt.properties +++ b/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-1-to-hdfs-1-encrypt.properties @@ -2,7 +2,7 @@ flow.edge.source=HDFS-1 flow.edge.destination=HDFS-1 flow.edge.id=HDFS-1:HDFS-1:hdfsConvertToJsonAndEncrypt flow.edge.flowTemplateDirUri=FS:///multihop/flowEdgeTemplates/hdfsConvertToJsonAndEncrypt -flow.edge.specExecutors.0.specExecInstance.class=org.apache.gobblin.service.modules.flow.FlowGraphPathFinderTest$TestAzkabanSpecExecutor +flow.edge.specExecutors.0.specExecInstance.class=org.apache.gobblin.service.modules.flow.MultiHopFlowCompilerTest$TestAzkabanSpecExecutor flow.edge.specExecutors.0.specExecInstance.uri=https://azkaban01.gobblin.net:8443 flow.edge.specExecutors.0.specExecInstance.job.launcher.class=org.apache.gobblin.azkaban.AzkabanJobLauncher flow.edge.specExecutors.0.specExecInstance.job.launcher.type=MAPREDUCE http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ef26d287/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-1-to-hdfs-3.properties ---------------------------------------------------------------------- diff --git a/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-1-to-hdfs-3.properties b/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-1-to-hdfs-3.properties index 99d1ed7..897f003 100644 --- a/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-1-to-hdfs-3.properties +++ b/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-1-to-hdfs-3.properties @@ -2,7 +2,7 @@ flow.edge.source=HDFS-1 flow.edge.destination=HDFS-3 flow.edge.id=HDFS-1:HDFS-3:hdfsToHdfs flow.edge.flowTemplateDirUri=FS:///multihop/flowEdgeTemplates/hdfsToHdfs 
-flow.edge.specExecutors.0.specExecInstance.class=org.apache.gobblin.service.modules.flow.FlowGraphPathFinderTest$TestAzkabanSpecExecutor +flow.edge.specExecutors.0.specExecInstance.class=org.apache.gobblin.service.modules.flow.MultiHopFlowCompilerTest$TestAzkabanSpecExecutor flow.edge.specExecutors.0.specExecInstance.uri=https://azkaban01.gobblin.net:8443 flow.edge.specExecutors.0.specExecInstance.job.launcher.class=org.apache.gobblin.azkaban.AzkabanJobLauncher flow.edge.specExecutors.0.specExecInstance.job.launcher.type=MAPREDUCE http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ef26d287/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-2-to-hdfs-2-encrypt.properties ---------------------------------------------------------------------- diff --git a/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-2-to-hdfs-2-encrypt.properties b/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-2-to-hdfs-2-encrypt.properties index 537cbfa..db0ea48 100644 --- a/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-2-to-hdfs-2-encrypt.properties +++ b/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-2-to-hdfs-2-encrypt.properties @@ -2,7 +2,7 @@ flow.edge.source=HDFS-2 flow.edge.destination=HDFS-2 flow.edge.id=HDFS-2:HDFS-2:hdfsConvertToJsonAndEncrypt flow.edge.flowTemplateDirUri=FS:///multihop/flowEdgeTemplates/hdfsConvertToJsonAndEncrypt -flow.edge.specExecutors.0.specExecInstance.class=org.apache.gobblin.service.modules.flow.FlowGraphPathFinderTest$TestAzkabanSpecExecutor +flow.edge.specExecutors.0.specExecInstance.class=org.apache.gobblin.service.modules.flow.MultiHopFlowCompilerTest$TestAzkabanSpecExecutor flow.edge.specExecutors.0.specExecInstance.uri=https://azkaban02.gobblin.net:8443 flow.edge.specExecutors.0.specExecInstance.job.launcher.class=org.apache.gobblin.azkaban.AzkabanJobLauncher flow.edge.specExecutors.0.specExecInstance.job.launcher.type=MAPREDUCE http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ef26d287/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-2-to-hdfs-4.properties ---------------------------------------------------------------------- diff --git a/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-2-to-hdfs-4.properties b/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-2-to-hdfs-4.properties index 6ec2ea5..44f0408 100644 --- a/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-2-to-hdfs-4.properties +++ b/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-2-to-hdfs-4.properties @@ -2,7 +2,7 @@ flow.edge.source=HDFS-2 flow.edge.destination=HDFS-4 flow.edge.id=HDFS-2:HDFS-4:hdfsToHdfs flow.edge.flowTemplateDirUri=FS:///multihop/flowEdgeTemplates/hdfsToHdfs -flow.edge.specExecutors.0.specExecInstance.class=org.apache.gobblin.service.modules.flow.FlowGraphPathFinderTest$TestAzkabanSpecExecutor +flow.edge.specExecutors.0.specExecInstance.class=org.apache.gobblin.service.modules.flow.MultiHopFlowCompilerTest$TestAzkabanSpecExecutor flow.edge.specExecutors.0.specExecInstance.uri=https://azkaban02.gobblin.net:8443 flow.edge.specExecutors.0.specExecInstance.job.launcher.class=org.apache.gobblin.azkaban.AzkabanJobLauncher flow.edge.specExecutors.0.specExecInstance.job.launcher.type=MAPREDUCE
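----------------------------------------------------------------------

Note on the new contract exercised by the tests above: compileFlow() now returns a Dag<JobExecutionPlan> instead of a Map<Spec, SpecExecutor>, and "no path to destination" is signaled by an empty Dag rather than null (testCompileFlowAfterSecondEdgeDeletion asserts jobDag.isEmpty() where the deleted FlowGraphPathFinderTest asserted Assert.assertNull(pathFinder.findPath())). The sketch below shows how a caller might walk a compiled Dag and hand each hop's materialized JobSpec to its resolved SpecExecutor. It is illustrative only and not part of this commit: JobDagWalker is a hypothetical class, and the submission call assumes SpecProducer#addSpec(Spec); the Dag, JobExecutionPlan, and SpecExecutor accessors used are the ones exercised by MultiHopFlowCompilerTest.

import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashSet;
import java.util.Set;

import org.apache.gobblin.runtime.api.JobSpec;
import org.apache.gobblin.runtime.api.SpecExecutor;
import org.apache.gobblin.runtime.api.SpecProducer;
import org.apache.gobblin.service.modules.flowgraph.Dag;
import org.apache.gobblin.service.modules.spec.JobExecutionPlan;

/** Hypothetical consumer of the new compileFlow() return type; not part of this commit. */
public class JobDagWalker {
  public static void walk(Dag<JobExecutionPlan> jobDag) throws Exception {
    if (jobDag.isEmpty()) {
      // An empty Dag means the compiler found no path from source to destination.
      return;
    }
    // Breadth-first walk starting from the Dag's root nodes.
    Deque<Dag.DagNode<JobExecutionPlan>> queue = new ArrayDeque<>(jobDag.getStartNodes());
    Set<Dag.DagNode<JobExecutionPlan>> visited = new HashSet<>();
    while (!queue.isEmpty()) {
      Dag.DagNode<JobExecutionPlan> node = queue.poll();
      if (!visited.add(node)) {
        continue; // already dispatched via another parent
      }
      JobExecutionPlan plan = node.getValue();
      JobSpec jobSpec = plan.getJobSpec();
      SpecExecutor executor = plan.getSpecExecutor();
      // Submit this hop's JobSpec to the executor chosen for it; assumes
      // SpecProducer#addSpec(Spec) as the submission entry point.
      SpecProducer producer = executor.getProducer().get();
      producer.addSpec(jobSpec);
      queue.addAll(jobDag.getChildren(node));
    }
  }
}

A production scheduler would additionally wait for a node's parents to complete before dispatching it; the breadth-first enumeration here is sufficient only for linear dags like the four-hop LocalFS-1 -> ADLS-1 paths compiled in these tests.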
