Repository: incubator-gobblin Updated Branches: refs/heads/master ea5047ea2 -> 9402a9037
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9402a903/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopsFlowToJobSpecCompiler.java ---------------------------------------------------------------------- diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopsFlowToJobSpecCompiler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopsFlowToJobSpecCompiler.java new file mode 100644 index 0000000..456f3a3 --- /dev/null +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopsFlowToJobSpecCompiler.java @@ -0,0 +1,313 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gobblin.service.modules.flow; + +import com.google.common.base.Splitter; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutionException; +import java.util.stream.Collectors; + +import com.google.common.base.Optional; +import com.google.common.collect.Maps; +import com.typesafe.config.Config; + +import org.apache.commons.lang3.StringUtils; +import org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor; +import org.apache.gobblin.service.modules.policy.ServicePolicy; +import org.apache.gobblin.util.ClassAliasResolver; +import org.jgrapht.graph.DirectedWeightedMultigraph; +import org.slf4j.Logger; +import org.apache.gobblin.runtime.api.FlowEdge; +import org.apache.gobblin.runtime.api.ServiceNode; +import org.apache.gobblin.runtime.api.FlowSpec; +import org.apache.gobblin.instrumented.Instrumented; +import org.apache.gobblin.runtime.api.Spec; +import org.apache.gobblin.runtime.api.TopologySpec; +import org.apache.gobblin.service.ServiceConfigKeys; +import org.apache.gobblin.runtime.spec_executorInstance.BaseServiceNodeImpl; +import org.apache.gobblin.runtime.api.JobSpec; +import org.apache.gobblin.runtime.api.JobTemplate; +import org.apache.gobblin.runtime.api.SpecExecutor; +import org.apache.gobblin.runtime.api.SpecNotFoundException; +import org.apache.gobblin.runtime.job_spec.ResolvedJobSpec; + +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; + +import static org.apache.gobblin.service.ServiceConfigKeys.*; +import static org.apache.gobblin.service.modules.utils.FindPathUtils.*; + +// Users are capable to inject hints/prioritization into route selection, in two forms: +// 1. PolicyBasedBlockedConnection: Define some undesired routes +// 2. Specified a complete path. FlowCompiler is responsible to verify if the path given is valid. 
+ +// TODO: Flow monitoring, injecting weight for flowEdge:ETL-6213 +@Slf4j +public class MultiHopsFlowToJobSpecCompiler extends BaseFlowToJobSpecCompiler { + + private static final Splitter SPLIT_BY_COMMA = Splitter.on(",").omitEmptyStrings().trimResults(); + + @Getter + private DirectedWeightedMultigraph<ServiceNode, FlowEdge> weightedGraph = + new DirectedWeightedMultigraph<>(LoadBasedFlowEdgeImpl.class); + + public ServicePolicy servicePolicy; + + // Contains user-specified complete path of how the data movement is executed from source to sink. + private Optional<String> optionalUserSpecifiedPath; + + private FlowEdgeProps defaultFlowEdgeProps = new FlowEdgeProps(); + + public MultiHopsFlowToJobSpecCompiler(Config config) { + this(config, Optional.absent(), true); + } + + public MultiHopsFlowToJobSpecCompiler(Config config, Optional<Logger> log) { + this(config, log, true); + } + + public MultiHopsFlowToJobSpecCompiler(Config config, Optional<Logger> log, boolean instrumentationEnabled) { + super(config, log, instrumentationEnabled); + String policyClassName = config.hasPath(SERVICE_POLICY_NAME) ? 
config.getString(SERVICE_POLICY_NAME) + : ServiceConfigKeys.DEFAULT_SERVICE_POLICY; + ClassAliasResolver<ServicePolicy> classResolver = new ClassAliasResolver<>(ServicePolicy.class); + try { + servicePolicy = classResolver.resolveClass(policyClassName).newInstance(); + } catch (ClassNotFoundException | InstantiationException | IllegalAccessException e) { + throw new RuntimeException("Error happen when resolving class for :" + policyClassName, e); + } + + if (config.hasPath(ServiceConfigKeys.POLICY_BASED_BLOCKED_CONNECTION) + && config.getStringList(ServiceConfigKeys.POLICY_BASED_BLOCKED_CONNECTION).size() > 0) { + try { + for (String sourceSinkPair : config.getStringList(ServiceConfigKeys.POLICY_BASED_BLOCKED_CONNECTION)) { + BaseServiceNodeImpl source = new BaseServiceNodeImpl(sourceSinkPair.split(":")[0]); + BaseServiceNodeImpl sink = new BaseServiceNodeImpl(sourceSinkPair.split(":")[1]); + URI specExecutorURI = new URI(sourceSinkPair.split(":")[2]); + servicePolicy.addFlowEdge( + new LoadBasedFlowEdgeImpl(source, sink, InMemorySpecExecutor.createDummySpecExecutor(specExecutorURI))); + } + } catch (URISyntaxException e) { + this.log.warn("Constructing of FlowEdge in ServicePolicy Failed"); + } + } + + if (config.hasPath(ServiceConfigKeys.POLICY_BASED_BLOCKED_NODES) && + StringUtils.isNotBlank(config.getString(ServiceConfigKeys.POLICY_BASED_BLOCKED_NODES))) { + for (String blacklistedNode : SPLIT_BY_COMMA.splitToList( + config.getString(ServiceConfigKeys.POLICY_BASED_BLOCKED_NODES))) { + servicePolicy.addServiceNode(new BaseServiceNodeImpl(blacklistedNode)); + } + } + + if (config.hasPath(ServiceConfigKeys.POLICY_BASED_DATA_MOVEMENT_PATH) && StringUtils.isNotBlank( + config.getString(ServiceConfigKeys.POLICY_BASED_DATA_MOVEMENT_PATH))) { + optionalUserSpecifiedPath = Optional.of(config.getString(ServiceConfigKeys.POLICY_BASED_DATA_MOVEMENT_PATH)); + } else { + optionalUserSpecifiedPath = Optional.absent(); + } + } + + @Override + public Map<Spec, SpecExecutor> 
compileFlow(Spec spec) { + // A Map from JobSpec to SpexExecutor, as the output of Flow Compiler. + Map<Spec, SpecExecutor> specExecutorInstanceMap = Maps.newLinkedHashMap(); + findPath(specExecutorInstanceMap, spec); + return specExecutorInstanceMap; + } + + /** + * @return Transform a set of {@link TopologySpec} into a instance of {@link org.jgrapht.graph.WeightedMultigraph} + * and filter out connections between blacklisted vertices that user specified. + * The output of this function only stays in memory, so each time a logical flow is compiled, the multigraph will + * be re-calculated. + * + */ + private void inMemoryWeightGraphGenerator() { + for (TopologySpec topologySpec : topologySpecMap.values()) { + weightGraphGenerateHelper(topologySpec); + } + + // Filter out connection appearing in servicePolicy. + // This is where servicePolicy is enforced. + servicePolicy.populateBlackListedEdges(this.weightedGraph); + if (servicePolicy.getBlacklistedEdges().size() > 0) { + for (FlowEdge toDeletedEdge : servicePolicy.getBlacklistedEdges()) { + weightedGraph.removeEdge(toDeletedEdge); + } + } + } + + // Basically a dijkstra path finding for connecting source and sink by multiple hops in between. + // If there's any user-specified prioritization, conduct the DFS and see if the user-specified path is available. + + // there's no updates on TopologySpec, or user should be aware of the possibility + // that a topologySpec not being reflected in findPath. + private void findPath(Map<Spec, SpecExecutor> specExecutorInstanceMap, Spec spec) { + inMemoryWeightGraphGenerator(); + FlowSpec flowSpec = (FlowSpec) spec; + if (optionalUserSpecifiedPath.isPresent()) { + log.info("Starting to evaluate user's specified path ... 
"); + if (userSpecifiedPathVerificator(specExecutorInstanceMap, flowSpec)) { + log.info("User specified path[ " + optionalUserSpecifiedPath.get() + "] successfully verified."); + return; + } else { + log.error("Will not execute user specified path[ " + optionalUserSpecifiedPath.get() + "]"); + log.info("Start to execute FlowCompiler's algorithm for valid data movement path"); + } + } + + ServiceNode sourceNode = + new BaseServiceNodeImpl(flowSpec.getConfig().getString(ServiceConfigKeys.FLOW_SOURCE_IDENTIFIER_KEY)); + + ServiceNode targetNode = + new BaseServiceNodeImpl(flowSpec.getConfig().getString(ServiceConfigKeys.FLOW_DESTINATION_IDENTIFIER_KEY)); + + List<FlowEdge> resultEdgePath = dijkstraBasedPathFindingHelper(sourceNode, targetNode, this.weightedGraph); + for (int i = 0; i < resultEdgePath.size() ; i++) { + FlowEdge tmpFlowEdge = resultEdgePath.get(i); + ServiceNode edgeSrcNode = ((LoadBasedFlowEdgeImpl) tmpFlowEdge).getSourceNode(); + ServiceNode edgeTgtNode = ((LoadBasedFlowEdgeImpl) tmpFlowEdge).getTargetNode(); + specExecutorInstanceMap.put(jobSpecGenerator(edgeSrcNode, edgeTgtNode, flowSpec), + ((LoadBasedFlowEdgeImpl) (resultEdgePath.get(i))).getSpecExecutorInstance()); + } + } + + /** + * As the base implementation here, all templates will be considered for each edge. + */ + @Override + protected void populateEdgeTemplateMap() { + if (templateCatalog.isPresent()) { + for (FlowEdge flowEdge : this.weightedGraph.edgeSet()) { + edgeTemplateMap.put(flowEdge.getEdgeIdentity(), templateCatalog.get(). + getAllTemplates(). + stream().map(jobTemplate -> jobTemplate.getUri()).collect(Collectors.toList())); + } + } + } + + // If path specified not existed, return false; + // else return true. 
+ private boolean userSpecifiedPathVerificator(Map<Spec, SpecExecutor> specExecutorInstanceMap, FlowSpec flowSpec) { + Map<Spec, SpecExecutor> tmpSpecExecutorInstanceMap = new HashMap<>(); + List<String> userSpecfiedPath = Arrays.asList(optionalUserSpecifiedPath.get().split(",")); + for (int i = 0; i < userSpecfiedPath.size() - 1; i++) { + ServiceNode sourceNode = new BaseServiceNodeImpl(userSpecfiedPath.get(i)); + ServiceNode targetNode = new BaseServiceNodeImpl(userSpecfiedPath.get(i + 1)); + if (weightedGraph.containsVertex(sourceNode) && weightedGraph.containsVertex(targetNode) + && weightedGraph.containsEdge(sourceNode, targetNode)) { + tmpSpecExecutorInstanceMap.put(jobSpecGenerator(sourceNode, targetNode, flowSpec), + (((LoadBasedFlowEdgeImpl) weightedGraph.getEdge(sourceNode, targetNode)).getSpecExecutorInstance())); + } else { + log.error("User Specified Path is invalid"); + return false; + } + } + specExecutorInstanceMap.putAll(tmpSpecExecutorInstanceMap); + return true; + } + + // Helper function for transform TopologySpecMap into a weightedDirectedGraph. + private void weightGraphGenerateHelper(TopologySpec topologySpec) { + try { + Map<ServiceNode, ServiceNode> capabilities = topologySpec.getSpecExecutor().getCapabilities().get(); + for (Map.Entry<ServiceNode, ServiceNode> capability : capabilities.entrySet()) { + + BaseServiceNodeImpl sourceNode = new BaseServiceNodeImpl(capability.getKey().getNodeName()); + BaseServiceNodeImpl targetNode = new BaseServiceNodeImpl(capability.getValue().getNodeName()); + + if (!weightedGraph.containsVertex(sourceNode)) { + weightedGraph.addVertex(sourceNode); + } + if (!weightedGraph.containsVertex(targetNode)) { + weightedGraph.addVertex(targetNode); + } + + FlowEdge flowEdge = + new LoadBasedFlowEdgeImpl(sourceNode, targetNode, defaultFlowEdgeProps, topologySpec.getSpecExecutor()); + + // In Multi-Graph if flowEdge existed, just skip it. 
+ if (!weightedGraph.containsEdge(flowEdge)) { + weightedGraph.addEdge(sourceNode, targetNode, flowEdge); + } + } + } catch (InterruptedException | ExecutionException e) { + Instrumented.markMeter(this.flowCompilationFailedMeter); + throw new RuntimeException("Cannot determine topology capabilities", e); + } + } + + /** + * Generate JobSpec based on the #templateURI that user specified. + */ + private JobSpec jobSpecGenerator(ServiceNode sourceNode, ServiceNode targetNode, FlowEdge flowEdge, URI templateURI, + FlowSpec flowSpec) { + JobSpec jobSpec; + JobSpec.Builder jobSpecBuilder = JobSpec.builder(jobSpecURIGenerator(flowSpec, sourceNode, targetNode)) + .withConfig(flowSpec.getConfig()) + .withDescription(flowSpec.getDescription()) + .withVersion(flowSpec.getVersion()); + if (edgeTemplateMap.containsKey(flowEdge.getEdgeIdentity()) && edgeTemplateMap.get(flowEdge.getEdgeIdentity()) + .contains(templateURI)) { + jobSpecBuilder.withTemplate(templateURI); + try { + jobSpec = new ResolvedJobSpec(jobSpecBuilder.build(), templateCatalog.get()); + log.info("Resolved JobSpec properties are: " + jobSpec.getConfigAsProperties()); + } catch (SpecNotFoundException | JobTemplate.TemplateException e) { + throw new RuntimeException("Could not resolve template in JobSpec from TemplateCatalog", e); + } + } else { + jobSpec = jobSpecBuilder.build(); + log.info("Unresolved JobSpec properties are: " + jobSpec.getConfigAsProperties()); + } + return jobSpec; + } + + /** + * A naive implementation of resolving templates in each JobSpec among Multi-hop FlowSpec. + * Handle the case when edge is not specified. + * Always select the first available template. + */ + private JobSpec jobSpecGenerator(ServiceNode sourceNode, ServiceNode targetNode, FlowSpec flowSpec) { + FlowEdge flowEdge = weightedGraph.getAllEdges(sourceNode, targetNode).iterator().next(); + URI firstTemplateURI = + (edgeTemplateMap != null && edgeTemplateMap.containsKey(flowEdge.getEdgeIdentity())) ? 
edgeTemplateMap.get( + flowEdge.getEdgeIdentity()).get(0) : jobSpecGenerator(flowSpec).getUri(); + return this.jobSpecGenerator(sourceNode, targetNode, flowEdge, firstTemplateURI, flowSpec); + } + + /** + * A naive implementation of generating a jobSpec's URI within a multi-hop logical Flow. + */ + public static URI jobSpecURIGenerator(FlowSpec flowSpec, ServiceNode sourceNode, ServiceNode targetNode) { + try { + return new URI(flowSpec.getUri().getScheme(), flowSpec.getUri().getAuthority(), + "/" + sourceNode.getNodeName() + "-" + targetNode.getNodeName(), null); + } catch (URISyntaxException e) { + log.error( + "URI construction failed when jobSpec from " + sourceNode.getNodeName() + " to " + targetNode.getNodeName()); + throw new RuntimeException(); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9402a903/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java ---------------------------------------------------------------------- diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java index 65deeec..261ce6e 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java @@ -22,12 +22,8 @@ import java.net.URI; import java.util.Collections; import java.util.List; import java.util.Map; - import java.util.concurrent.TimeUnit; import javax.annotation.Nonnull; -import lombok.Getter; -import org.apache.commons.lang3.reflect.ConstructorUtils; -import org.slf4j.Logger; import com.codahale.metrics.Meter; import com.codahale.metrics.Timer; @@ -40,10 +36,8 @@ import org.apache.gobblin.instrumented.Instrumented; import org.apache.gobblin.instrumented.Instrumentable; import 
org.apache.gobblin.metrics.MetricContext; import org.apache.gobblin.metrics.Tag; - import org.apache.gobblin.runtime.api.FlowSpec; import org.apache.gobblin.runtime.api.SpecCompiler; -import org.apache.gobblin.runtime.api.SpecExecutorInstanceProducer; import org.apache.gobblin.runtime.api.TopologySpec; import org.apache.gobblin.runtime.api.Spec; import org.apache.gobblin.runtime.api.SpecCatalogListener; @@ -54,6 +48,13 @@ import org.apache.gobblin.service.modules.flow.IdentityFlowToJobSpecCompiler; import org.apache.gobblin.util.ClassAliasResolver; import org.apache.gobblin.util.ConfigUtils; import org.slf4j.LoggerFactory; +import org.apache.gobblin.runtime.api.SpecExecutor; +import org.apache.gobblin.runtime.api.SpecProducer; +import org.apache.commons.lang3.reflect.ConstructorUtils; +import org.apache.gobblin.configuration.State; +import org.slf4j.Logger; + +import lombok.Getter; /** @@ -179,7 +180,7 @@ public class Orchestrator implements SpecCatalogListener, Instrumentable { public void orchestrate(Spec spec) throws Exception { long startTime = System.nanoTime(); if (spec instanceof FlowSpec) { - Map<Spec, SpecExecutorInstanceProducer> specExecutorInstanceMap = specCompiler.compileFlow(spec); + Map<Spec, SpecExecutor> specExecutorInstanceMap = specCompiler.compileFlow(spec); if (specExecutorInstanceMap.isEmpty()) { _log.warn("Cannot determine an executor to run on for Spec: " + spec); @@ -187,18 +188,18 @@ public class Orchestrator implements SpecCatalogListener, Instrumentable { } // Schedule all compiled JobSpecs on their respective Executor - for (Map.Entry<Spec, SpecExecutorInstanceProducer> specsToExecute : specExecutorInstanceMap.entrySet()) { + for (Map.Entry<Spec, SpecExecutor> specsToExecute : specExecutorInstanceMap.entrySet()) { // Run this spec on selected executor - SpecExecutorInstanceProducer producer = null; + SpecProducer producer = null; try { - producer = specsToExecute.getValue(); + producer = specsToExecute.getValue().getProducer().get(); 
Spec jobSpec = specsToExecute.getKey(); _log.info(String.format("Going to orchestrate JobSpc: %s on Executor: %s", jobSpec, producer)); producer.addSpec(jobSpec); } catch(Exception e) { _log.error("Cannot successfully setup spec: " + specsToExecute.getKey() + " on executor: " + producer + - " for flow: " + spec, e); + " for flow: " + spec, e); } } } else { @@ -221,7 +222,7 @@ public class Orchestrator implements SpecCatalogListener, Instrumentable { } @Override - public List<Tag<?>> generateTags(org.apache.gobblin.configuration.State state) { + public List<Tag<?>> generateTags(State state) { return Collections.emptyList(); } @@ -234,4 +235,4 @@ public class Orchestrator implements SpecCatalogListener, Instrumentable { public void switchMetricContext(MetricContext context) { throw new UnsupportedOperationException(); } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9402a903/gobblin-service/src/main/java/org/apache/gobblin/service/modules/policy/ServicePolicy.java ---------------------------------------------------------------------- diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/policy/ServicePolicy.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/policy/ServicePolicy.java new file mode 100644 index 0000000..540b13d --- /dev/null +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/policy/ServicePolicy.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.service.modules.policy; + +import java.util.Set; +import org.apache.gobblin.runtime.api.FlowEdge; +import org.apache.gobblin.runtime.api.ServiceNode; +import org.jgrapht.graph.DirectedWeightedMultigraph; + + +/** + * ServicePolicy will be firstly checked before the compilation happen. + * unexpcted edges will not be considered in compilation process. + */ +public interface ServicePolicy { + + /** + * After initialization of {@link ServicePolicy}, the populating method need to invoked before + * {@link #getBlacklistedEdges()} can return the expected result. + * + * This requirement exists because when {@link ServicePolicy} is initialized it is not necessary that a + * {@link org.jgrapht.graph.WeightedMultigraph} has been constructed, neither we cannot know if user-specified edges + * and nodes exist in {@link org.jgrapht.graph.WeightedMultigraph}. + * The population of blacklisted Edges make sense after graph has been constructed. + */ + public void populateBlackListedEdges(DirectedWeightedMultigraph<ServiceNode, FlowEdge> graph); + + /** + * Should return all edges that being blacklisted by this policy. 
+ */ + public Set<FlowEdge> getBlacklistedEdges(); + + public void addServiceNode(ServiceNode serviceNode); + + public void addFlowEdge(FlowEdge flowEdge); +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9402a903/gobblin-service/src/main/java/org/apache/gobblin/service/modules/policy/StaticServicePolicy.java ---------------------------------------------------------------------- diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/policy/StaticServicePolicy.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/policy/StaticServicePolicy.java new file mode 100644 index 0000000..aabc67e --- /dev/null +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/policy/StaticServicePolicy.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gobblin.service.modules.policy; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +import com.google.common.base.Preconditions; + +import org.apache.gobblin.annotation.Alias; +import org.apache.gobblin.runtime.api.FlowEdge; +import org.apache.gobblin.runtime.api.ServiceNode; +import org.jgrapht.graph.DirectedWeightedMultigraph; + +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; + +/** + * Defines {@link ServiceNode}s or {@link FlowEdge}s that should be blacklisted from the Flow-compiler process, + * obtained only from configuration, which is the reason it is named as static. + * + * TODO: DynamicServicePolicy can obtain new blacklist candidate through Flow monitoring process, which is responsible + * for monitoring the flow execution and react accordingly. + * + * Either user specify {@link ServiceNode} or {@link FlowEdge} to blacklist will all end up with a list of + * {@link FlowEdge}s that won't be considered when selecting path for data transformation. 
+ */ +@Slf4j +@Alias("static") +public class StaticServicePolicy implements ServicePolicy { + + @Getter + Set<FlowEdge> blacklistedEdges; + + List<ServiceNode> serviceNodes; + List<FlowEdge> flowEdges; + + public StaticServicePolicy() { + serviceNodes = new ArrayList<>(); + flowEdges = new ArrayList<>(); + blacklistedEdges = new HashSet<>(); + } + + public StaticServicePolicy(List<ServiceNode> serviceNodes, List<FlowEdge> flowEdges) { + Preconditions.checkNotNull(serviceNodes); + Preconditions.checkNotNull(flowEdges); + blacklistedEdges = new HashSet<>(); + this.serviceNodes = serviceNodes; + this.flowEdges = flowEdges; + } + + public void addServiceNode(ServiceNode serviceNode) { + this.serviceNodes.add(serviceNode); + } + + public void addFlowEdge(FlowEdge flowEdge){ + this.flowEdges.add(flowEdge); + } + + @Override + public void populateBlackListedEdges(DirectedWeightedMultigraph<ServiceNode, FlowEdge> graph) { + for (ServiceNode node: serviceNodes) { + if (graph.containsVertex(node)) { + blacklistedEdges.addAll(graph.incomingEdgesOf(node)); + blacklistedEdges.addAll(graph.outgoingEdgesOf(node)); + } else { + log.info("The graph " + graph + " doesn't contains node " + node.toString()); + } + } + + for( FlowEdge flowEdge: flowEdges) { + if (graph.containsEdge(flowEdge)) { + blacklistedEdges.add(flowEdge); + } else { + log.info("The graph " + graph + "doesn't contains edge " + flowEdge.toString()); + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9402a903/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java ---------------------------------------------------------------------- diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java index d223f90..a625f36 100644 --- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java @@ -53,7 +53,7 @@ import org.apache.gobblin.runtime.spec_catalog.TopologyCatalog; import org.apache.gobblin.scheduler.BaseGobblinJob; import org.apache.gobblin.scheduler.JobScheduler; import org.apache.gobblin.scheduler.SchedulerService; -import org.apache.gobblin.service.HelixUtils; +import org.apache.gobblin.service.modules.utils.HelixUtils; import org.apache.gobblin.service.ServiceConfigKeys; import org.apache.gobblin.service.modules.orchestration.Orchestrator; import org.apache.gobblin.util.ConfigUtils; http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9402a903/gobblin-service/src/main/java/org/apache/gobblin/service/modules/topology/ConfigBasedTopologySpecFactory.java ---------------------------------------------------------------------- diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/topology/ConfigBasedTopologySpecFactory.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/topology/ConfigBasedTopologySpecFactory.java index 1a9b8f6..fb7c1b0 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/topology/ConfigBasedTopologySpecFactory.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/topology/ConfigBasedTopologySpecFactory.java @@ -32,11 +32,12 @@ import com.google.common.collect.Lists; import com.typesafe.config.Config; import org.apache.gobblin.annotation.Alpha; -import org.apache.gobblin.runtime.api.SpecExecutorInstanceProducer; import org.apache.gobblin.runtime.api.TopologySpec; import org.apache.gobblin.service.ServiceConfigKeys; import org.apache.gobblin.util.ClassAliasResolver; import org.apache.gobblin.util.ConfigUtils; +import org.apache.gobblin.runtime.api.SpecExecutor; + @Alpha @@ -45,7 +46,7 @@ public class 
ConfigBasedTopologySpecFactory implements TopologySpecFactory { private static final Splitter SPLIT_BY_COMMA = Splitter.on(",").omitEmptyStrings().trimResults(); private final Config _config; private final Logger _log; - private final ClassAliasResolver<SpecExecutorInstanceProducer> _aliasResolver; + private final ClassAliasResolver<SpecExecutor> _aliasResolver; public ConfigBasedTopologySpecFactory(Config config) { this(config, Optional.<Logger>absent()); @@ -55,7 +56,7 @@ public class ConfigBasedTopologySpecFactory implements TopologySpecFactory { Preconditions.checkNotNull(config, "Config should not be null"); _log = log.isPresent() ? log.get() : LoggerFactory.getLogger(getClass()); _config = config; - _aliasResolver = new ClassAliasResolver<>(SpecExecutorInstanceProducer.class); + _aliasResolver = new ClassAliasResolver<>(SpecExecutor.class); } @Override @@ -70,21 +71,21 @@ public class ConfigBasedTopologySpecFactory implements TopologySpecFactory { for (String topologyName : topologyNames) { Preconditions.checkArgument(_config.hasPath(ServiceConfigKeys.TOPOLOGY_FACTORY_PREFIX + topologyName), - "Config does not contain Topology Factory descriptor for Topology" + topologyName); + "Config does not contain Topology Factory descriptor for Topology " + topologyName); Config topologyConfig = _config.getConfig(ServiceConfigKeys.TOPOLOGY_FACTORY_PREFIX + topologyName); String description = ConfigUtils.getString(topologyConfig, ServiceConfigKeys.TOPOLOGYSPEC_DESCRIPTION_KEY, "NA"); String version = ConfigUtils.getString(topologyConfig, ServiceConfigKeys.TOPOLOGYSPEC_VERSION_KEY, "-1"); - String specExecutorInstanceProducerClass = ServiceConfigKeys.DEFAULT_SPEC_EXECUTOR_INSTANCE_PRODUCER; - if (topologyConfig.hasPath(ServiceConfigKeys.SPEC_EXECUTOR_INSTANCE_PRODUCER_KEY)) { - specExecutorInstanceProducerClass = topologyConfig.getString(ServiceConfigKeys.SPEC_EXECUTOR_INSTANCE_PRODUCER_KEY); + String specExecutorClass = ServiceConfigKeys.DEFAULT_SPEC_EXECUTOR; + if 
(topologyConfig.hasPath(ServiceConfigKeys.SPEC_EXECUTOR_KEY)) { + specExecutorClass = topologyConfig.getString(ServiceConfigKeys.SPEC_EXECUTOR_KEY); } - SpecExecutorInstanceProducer specExecutorInstanceProducer; + SpecExecutor specExecutor; try { - _log.info("Using SpecExecutorInstanceProducer class name/alias " + specExecutorInstanceProducerClass); - specExecutorInstanceProducer = (SpecExecutorInstanceProducer) ConstructorUtils + _log.info("Using SpecProducer class name/alias " + specExecutorClass); + specExecutor = (SpecExecutor) ConstructorUtils .invokeConstructor(Class.forName(_aliasResolver - .resolve(specExecutorInstanceProducerClass)), topologyConfig); + .resolve(specExecutorClass)), topologyConfig); } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException | InstantiationException | ClassNotFoundException e) { throw new RuntimeException(e); @@ -95,10 +96,10 @@ public class ConfigBasedTopologySpecFactory implements TopologySpecFactory { .withConfig(topologyConfig) .withDescription(description) .withVersion(version) - .withSpecExecutorInstanceProducer(specExecutorInstanceProducer); + .withSpecExecutor(specExecutor); topologySpecs.add(topologySpecBuilder.build()); } return topologySpecs; } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9402a903/gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/DistancedNode.java ---------------------------------------------------------------------- diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/DistancedNode.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/DistancedNode.java new file mode 100644 index 0000000..946fe10 --- /dev/null +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/DistancedNode.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.service.modules.utils; +import org.apache.gobblin.runtime.api.ServiceNode; + +/** + * This is a helping class(Basically a wrapper) for Shortest path finding process + * to keep the shortest distance from source to an arbitrary node. + */ +public class DistancedNode<T extends ServiceNode> { + + /** + * The distance between {@link this} node to the src node in the shortest-distance finding problem. + */ + private double distToSrc; + + private T _serviceNode; + + + /** + * Max_Value represents no-connection. 
+ */ + public DistancedNode(T _serviceNode){ + this(_serviceNode, Double.MAX_VALUE); + } + + public DistancedNode(T _serviceNode, double dist){ + this._serviceNode = _serviceNode; + this.distToSrc = dist; + } + + public double getDistToSrc(){ + return this.distToSrc; + } + + public void setDistToSrc(double distToSrc){ + this.distToSrc = distToSrc; + } + + public T getNode(){ + return this._serviceNode; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + DistancedNode<?> that = (DistancedNode<?>) o; + + return _serviceNode.equals(that._serviceNode); + } + + @Override + public int hashCode() { + return _serviceNode.hashCode(); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9402a903/gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/FindPathUtils.java ---------------------------------------------------------------------- diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/FindPathUtils.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/FindPathUtils.java new file mode 100644 index 0000000..1481f78 --- /dev/null +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/FindPathUtils.java @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.service.modules.utils; + +import java.util.ArrayList; +import java.util.Comparator; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.PriorityQueue; +import java.util.Set; + +import org.apache.gobblin.runtime.api.FlowEdge; +import org.apache.gobblin.runtime.api.ServiceNode; +import org.apache.gobblin.service.modules.flow.LoadBasedFlowEdgeImpl; +import org.jgrapht.graph.DirectedWeightedMultigraph; + +import lombok.extern.slf4j.Slf4j; + +import avro.shaded.com.google.common.annotations.VisibleForTesting; + +@Slf4j +public class FindPathUtils { + // Since Author{[email protected]} couldn't find the proper way to conduct Library provided by JGraphT + // on the customized-edge Graph, here is the raw implementation of Dijkstra algorithm for finding shortest path. + + /** + * Given sourceNode and targetNode, find the shortest path and return shortest path. + * @return Each edge on this shortest path, in order. 
+ * + */ + @VisibleForTesting + public static List<FlowEdge> dijkstraBasedPathFindingHelper(ServiceNode sourceNode, ServiceNode targetNode, + DirectedWeightedMultigraph<ServiceNode, FlowEdge> weightedGraph) { + Map<DistancedNode, ArrayList<FlowEdge>> shortestPath = new HashMap<>(); + Map<DistancedNode, Double> shortestDist = new HashMap<>(); + PriorityQueue<DistancedNode> pq = new PriorityQueue<>(new Comparator<DistancedNode>() { + @Override + public int compare(DistancedNode o1, DistancedNode o2) { + if (o1.getDistToSrc() < o2.getDistToSrc()) { + return -1; + } else { + return 1; + } + } + }); + pq.add(new DistancedNode(sourceNode, 0.0)); + + Set<FlowEdge> visitedEdge = new HashSet<>(); + + while(!pq.isEmpty()) { + DistancedNode node = pq.poll(); + if (node.getNode().getNodeName().equals(targetNode.getNodeName())) { + // Searching finished + return shortestPath.get(node); + } + + Set<FlowEdge> outgoingEdges = weightedGraph.outgoingEdgesOf(node.getNode()); + for (FlowEdge outGoingEdge:outgoingEdges) { + // Since it is a multi-graph problem, should use edge for deduplication instead of vertex. + if (visitedEdge.contains(outGoingEdge)) { + continue; + } + + DistancedNode adjacentNode = new DistancedNode(weightedGraph.getEdgeTarget(outGoingEdge)); + if (shortestDist.containsKey(adjacentNode)) { + adjacentNode.setDistToSrc(shortestDist.get(adjacentNode)); + } + + double newDist = node.getDistToSrc() + ((LoadBasedFlowEdgeImpl) outGoingEdge).getEdgeLoad(); + + if (newDist < adjacentNode.getDistToSrc()) { + if (pq.contains(adjacentNode)) { + pq.remove(adjacentNode); + } + + // Update the shortest path. + ArrayList<FlowEdge> path = shortestPath.containsKey(node) + ? 
new ArrayList<>(shortestPath.get(node)) : new ArrayList<>(); + path.add(outGoingEdge); + shortestPath.put(adjacentNode, path); + shortestDist.put(adjacentNode, newDist); + + adjacentNode.setDistToSrc(newDist); + pq.add(adjacentNode); + } + visitedEdge.add(outGoingEdge); + } + } + log.error("No path found"); + return new ArrayList<>(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9402a903/gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/HelixUtils.java ---------------------------------------------------------------------- diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/HelixUtils.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/HelixUtils.java new file mode 100644 index 0000000..f2c1c84 --- /dev/null +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/HelixUtils.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gobblin.service.modules.utils; + +import com.google.common.annotations.VisibleForTesting; +import java.util.UUID; +import org.apache.helix.Criteria; +import org.apache.helix.HelixManager; +import org.apache.helix.HelixManagerFactory; +import org.apache.helix.InstanceType; +import org.apache.helix.manager.zk.ZKHelixManager; +import org.apache.helix.model.HelixConfigScope; +import org.apache.helix.model.Message; +import org.apache.helix.tools.ClusterSetup; + +import org.apache.gobblin.annotation.Alpha; +import org.slf4j.Logger; + + +@Alpha +public class HelixUtils { + + /*** + * Build a Helix Manager (Helix Controller instance). + * + * @param helixInstanceName the Helix Instance name. + * @param helixClusterName the Helix Cluster name. + * @param zkConnectionString the ZooKeeper connection string. + * @return HelixManager + */ + public static HelixManager buildHelixManager(String helixInstanceName, String helixClusterName, String zkConnectionString) { + return HelixManagerFactory.getZKHelixManager(helixClusterName, helixInstanceName, + InstanceType.CONTROLLER, zkConnectionString); + } + + /** + * Create a Helix cluster for the Gobblin Cluster application. + * + * @param zkConnectionString the ZooKeeper connection string + * @param clusterName the Helix cluster name + */ + public static void createGobblinHelixCluster(String zkConnectionString, String clusterName) { + createGobblinHelixCluster(zkConnectionString, clusterName, true); + } + + /** + * Create a Helix cluster for the Gobblin Cluster application. 
+ * + * @param zkConnectionString the ZooKeeper connection string + * @param clusterName the Helix cluster name + * @param overwrite true to overwrite exiting cluster, false to reuse existing cluster + */ + public static void createGobblinHelixCluster(String zkConnectionString, String clusterName, boolean overwrite) { + ClusterSetup clusterSetup = new ClusterSetup(zkConnectionString); + // Create the cluster and overwrite if it already exists + clusterSetup.addCluster(clusterName, overwrite); + // Helix 0.6.x requires a configuration property to have the form key=value. + String autoJoinConfig = ZKHelixManager.ALLOW_PARTICIPANT_AUTO_JOIN + "=true"; + clusterSetup.setConfig(HelixConfigScope.ConfigScopeProperty.CLUSTER, clusterName, autoJoinConfig); + } + + /** + * Get a Helix instance name. + * + * @param namePrefix a prefix of Helix instance names + * @param instanceId an integer instance ID + * @return a Helix instance name that is a concatenation of the given prefix and instance ID + */ + public static String getHelixInstanceName(String namePrefix, int instanceId) { + return namePrefix + "_" + instanceId; + } + + @VisibleForTesting + public static void sendUserDefinedMessage(String messageSubType, String messageVal, String messageId, + InstanceType instanceType, HelixManager helixManager, Logger logger) { + Criteria criteria = new Criteria(); + criteria.setInstanceName("%"); + criteria.setResource("%"); + criteria.setPartition("%"); + criteria.setPartitionState("%"); + criteria.setRecipientInstanceType(instanceType); + criteria.setSessionSpecific(true); + + Message message = new Message(Message.MessageType.USER_DEFINE_MSG.toString(), messageId); + message.setMsgSubType(messageSubType); + message.setAttribute(Message.Attributes.INNER_MESSAGE, messageVal); + message.setMsgState(Message.MessageState.NEW); + message.setTgtSessionId("*"); + + int messagesSent = helixManager.getMessagingService().send(criteria, message); + if (messagesSent == 0) { + 
logger.error(String.format("Failed to send the %s message to the participants", message)); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9402a903/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceHATest.java ---------------------------------------------------------------------- diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceHATest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceHATest.java index 01d0285..289e212 100644 --- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceHATest.java +++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceHATest.java @@ -49,8 +49,7 @@ import org.apache.gobblin.runtime.spec_catalog.FlowCatalog; import org.apache.gobblin.runtime.spec_catalog.TopologyCatalog; import org.apache.gobblin.service.FlowConfig; import org.apache.gobblin.service.FlowConfigClient; -import org.apache.gobblin.service.FlowId; -import org.apache.gobblin.service.HelixUtils; +import org.apache.gobblin.service.modules.utils.HelixUtils; import org.apache.gobblin.service.ServiceConfigKeys; import org.apache.gobblin.service.modules.orchestration.Orchestrator; import org.apache.gobblin.util.ConfigUtils; @@ -139,8 +138,8 @@ public class GobblinServiceHATest { "1"); commonServiceCoreProperties.put(ServiceConfigKeys.TOPOLOGY_FACTORY_PREFIX + TEST_GOBBLIN_EXECUTOR_NAME + ".uri", "gobblinExecutor"); - commonServiceCoreProperties.put(ServiceConfigKeys.TOPOLOGY_FACTORY_PREFIX + TEST_GOBBLIN_EXECUTOR_NAME + ".specExecutorInstanceProducer", - "org.apache.gobblin.service.InMemorySpecExecutorInstanceProducer"); + commonServiceCoreProperties.put(ServiceConfigKeys.TOPOLOGY_FACTORY_PREFIX + TEST_GOBBLIN_EXECUTOR_NAME + ".specExecutorInstance", + "org.apache.gobblin.service.InMemorySpecExecutor"); commonServiceCoreProperties.put(ServiceConfigKeys.TOPOLOGY_FACTORY_PREFIX + 
TEST_GOBBLIN_EXECUTOR_NAME + ".specExecInstance.capabilities", TEST_SOURCE_NAME + ":" + TEST_SINK_NAME); @@ -506,4 +505,4 @@ public class GobblinServiceHATest { Assert.assertTrue(assertSuccess, "New master should take over all old master jobs."); } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9402a903/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceManagerTest.java ---------------------------------------------------------------------- diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceManagerTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceManagerTest.java index 314dc66..b40792e 100644 --- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceManagerTest.java +++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceManagerTest.java @@ -103,8 +103,8 @@ public class GobblinServiceManagerTest { "1"); serviceCoreProperties.put(ServiceConfigKeys.TOPOLOGY_FACTORY_PREFIX + TEST_GOBBLIN_EXECUTOR_NAME + ".uri", "gobblinExecutor"); - serviceCoreProperties.put(ServiceConfigKeys.TOPOLOGY_FACTORY_PREFIX + TEST_GOBBLIN_EXECUTOR_NAME + ".specExecutorInstanceProducer", - "org.apache.gobblin.service.InMemorySpecExecutorInstanceProducer"); + serviceCoreProperties.put(ServiceConfigKeys.TOPOLOGY_FACTORY_PREFIX + TEST_GOBBLIN_EXECUTOR_NAME + ".specExecutorInstance", + "org.apache.gobblin.service.InMemorySpecExecutor"); serviceCoreProperties.put(ServiceConfigKeys.TOPOLOGY_FACTORY_PREFIX + TEST_GOBBLIN_EXECUTOR_NAME + ".specExecInstance.capabilities", TEST_SOURCE_NAME + ":" + TEST_SINK_NAME); @@ -339,4 +339,4 @@ public class GobblinServiceManagerTest { Assert.fail("Get should have raised a 404 error"); } -} +} \ No newline at end of file 
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9402a903/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/IdentityFlowToJobSpecCompilerTest.java ---------------------------------------------------------------------- diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/IdentityFlowToJobSpecCompilerTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/IdentityFlowToJobSpecCompilerTest.java index 0b3dc15..864b238 100644 --- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/IdentityFlowToJobSpecCompilerTest.java +++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/IdentityFlowToJobSpecCompilerTest.java @@ -16,6 +16,7 @@ */ package org.apache.gobblin.service.modules.core; + import java.io.File; import java.net.URI; import java.net.URISyntaxException; @@ -39,14 +40,14 @@ import org.apache.gobblin.configuration.ConfigurationKeys; import org.apache.gobblin.runtime.api.FlowSpec; import org.apache.gobblin.runtime.api.JobSpec; import org.apache.gobblin.runtime.api.Spec; -import org.apache.gobblin.runtime.api.SpecExecutorInstanceProducer; import org.apache.gobblin.runtime.api.TopologySpec; -import org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutorInstanceProducer; import org.apache.gobblin.service.ServiceConfigKeys; import org.apache.gobblin.service.modules.flow.IdentityFlowToJobSpecCompiler; import org.apache.gobblin.util.ConfigUtils; import org.apache.gobblin.util.PathUtils; - +import org.apache.gobblin.runtime.api.SpecExecutor; +import org.apache.gobblin.runtime.api.SpecProducer; +import org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor; public class IdentityFlowToJobSpecCompilerTest { private static final Logger logger = LoggerFactory.getLogger(IdentityFlowToJobSpecCompilerTest.class); @@ -114,14 +115,14 @@ public class IdentityFlowToJobSpecCompilerTest { properties.put("specStore.fs.dir", 
TOPOLOGY_SPEC_STORE_DIR); properties.put("specExecInstance.capabilities", TEST_SOURCE_NAME + ":" + TEST_SINK_NAME); Config config = ConfigUtils.propertiesToConfig(properties); - SpecExecutorInstanceProducer specExecutorInstanceProducer = new InMemorySpecExecutorInstanceProducer(config); + SpecExecutor specExecutorInstance = new InMemorySpecExecutor(config); TopologySpec.Builder topologySpecBuilder = TopologySpec.builder(computeTopologySpecURI(SPEC_STORE_PARENT_DIR, TOPOLOGY_SPEC_STORE_DIR)) .withConfig(config) .withDescription(SPEC_DESCRIPTION) .withVersion(SPEC_VERSION) - .withSpecExecutorInstanceProducer(specExecutorInstanceProducer); + .withSpecExecutor(specExecutorInstance); return topologySpecBuilder.build(); } @@ -152,7 +153,7 @@ public class IdentityFlowToJobSpecCompilerTest { return flowSpecBuilder.build(); } - public URI computeTopologySpecURI(String parent, String current) { + public static URI computeTopologySpecURI(String parent, String current) { // Make sure this is relative return PathUtils.relativizePath(new Path(current), new Path(parent)).toUri(); } @@ -186,7 +187,7 @@ public class IdentityFlowToJobSpecCompilerTest { FlowSpec flowSpec = initFlowSpec(); // Run compiler on flowSpec - Map<Spec, SpecExecutorInstanceProducer> specExecutorMapping = this.compilerWithTemplateCalague.compileFlow(flowSpec); + Map<Spec, SpecExecutor> specExecutorMapping = this.compilerWithTemplateCalague.compileFlow(flowSpec); // Assert pre-requisites Assert.assertNotNull(specExecutorMapping, "Expected non null mapping."); @@ -215,7 +216,7 @@ public class IdentityFlowToJobSpecCompilerTest { FlowSpec flowSpec = initFlowSpec(); // Run compiler on flowSpec - Map<Spec, SpecExecutorInstanceProducer> specExecutorMapping = this.compilerWithoutTemplateCalague.compileFlow(flowSpec); + Map<Spec, SpecExecutor> specExecutorMapping = this.compilerWithoutTemplateCalague.compileFlow(flowSpec); // Assert pre-requisites Assert.assertNotNull(specExecutorMapping, "Expected non null mapping."); 
@@ -244,10 +245,10 @@ public class IdentityFlowToJobSpecCompilerTest { FlowSpec flowSpec = initFlowSpec(TEST_FLOW_GROUP, TEST_FLOW_NAME, "unsupportedSource", "unsupportedSink"); // Run compiler on flowSpec - Map<Spec, SpecExecutorInstanceProducer> specExecutorMapping = this.compilerWithTemplateCalague.compileFlow(flowSpec); + Map<Spec, SpecExecutor> specExecutorMapping = this.compilerWithTemplateCalague.compileFlow(flowSpec); // Assert pre-requisites Assert.assertNotNull(specExecutorMapping, "Expected non null mapping."); Assert.assertTrue(specExecutorMapping.size() == 0, "Exepected 1 executor for FlowSpec."); } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9402a903/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/MultiHopsFlowToJobSpecCompilerTest.java ---------------------------------------------------------------------- diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/MultiHopsFlowToJobSpecCompilerTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/MultiHopsFlowToJobSpecCompilerTest.java new file mode 100644 index 0000000..cc722eb --- /dev/null +++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/MultiHopsFlowToJobSpecCompilerTest.java @@ -0,0 +1,326 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gobblin.service.modules.core; + +import java.io.File; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Set; + +import com.typesafe.config.Config; + +import org.apache.commons.io.FileUtils; +import org.apache.hadoop.fs.Path; +import org.jgrapht.graph.DirectedWeightedMultigraph; +import org.jgrapht.graph.WeightedMultigraph; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; +import org.apache.gobblin.configuration.ConfigurationKeys; +import org.apache.gobblin.runtime.spec_executorInstance.BaseServiceNodeImpl; +import org.apache.gobblin.runtime.api.FlowEdge; +import org.apache.gobblin.runtime.api.FlowSpec; +import org.apache.gobblin.runtime.api.ServiceNode; +import org.apache.gobblin.runtime.api.SpecExecutor; +import org.apache.gobblin.runtime.api.TopologySpec; +import org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor; +import org.apache.gobblin.service.ServiceConfigKeys; +import org.apache.gobblin.service.modules.flow.LoadBasedFlowEdgeImpl; +import org.apache.gobblin.service.modules.flow.MultiHopsFlowToJobSpecCompiler; +import org.apache.gobblin.util.ConfigUtils; +import org.apache.gobblin.util.PathUtils; + +import static org.apache.gobblin.service.modules.utils.FindPathUtils.*; + + +// All unit 
tests here will be with templateCatelogue. +public class MultiHopsFlowToJobSpecCompilerTest { + private static final Logger logger = LoggerFactory.getLogger(MultiHopsFlowToJobSpecCompilerTest.class); + + private static final String TEST_TEMPLATE_CATALOG_PATH = "/tmp/gobblinTestTemplateCatalog_" + System.currentTimeMillis(); + private static final String TEST_TEMPLATE_CATALOG_URI = "file://" + TEST_TEMPLATE_CATALOG_PATH; + + private static final String TEST_TEMPLATE_NAME = "test.template"; + private static final String TEST_TEMPLATE_URI = "FS:///test.template"; + + // The path to be discovered is TEST_SOURCE_NAME -> TEST_HOP_NAME_A -> TEST_HOP_NAME_B -> TEST_SINK_NAME + private static final String TEST_SOURCE_NAME = "testSource"; + private static final String TEST_HOP_NAME_A = "testHopA"; + private static final String TEST_HOP_NAME_B = "testHopB"; + private static final String TEST_HOP_NAME_C = "testHopC"; + private static final String TEST_SINK_NAME = "testSink"; + private static final String TEST_FLOW_GROUP = "testFlowGroup"; + private static final String TEST_FLOW_NAME = "testFlowName"; + + private static final String SPEC_STORE_PARENT_DIR = "/tmp/orchestrator/"; + private static final String SPEC_DESCRIPTION = "Test Orchestrator"; + private static final String SPEC_VERSION = "1"; + private static final String TOPOLOGY_SPEC_STORE_DIR = "/tmp/orchestrator/topologyTestSpecStore_" + System.currentTimeMillis(); + private static final String TOPOLOGY_SPEC_STORE_DIR_SECOND = "/tmp/orchestrator/topologyTestSpecStore_" + System.currentTimeMillis() + "_2"; + private static final String FLOW_SPEC_STORE_DIR = "/tmp/orchestrator/flowTestSpecStore_" + System.currentTimeMillis(); + + private ServiceNode vertexSource; + private ServiceNode vertexHopA; + private ServiceNode vertexHopB; + private ServiceNode vertexHopC; + private ServiceNode vertexSink; + + private MultiHopsFlowToJobSpecCompiler compilerWithTemplateCalague; + private Map<String, List<URI>> edgeTemplateMap; + + + 
@BeforeClass + public void setUp() throws Exception{ + // Create dir for template catalog + FileUtils.forceMkdir(new File(TEST_TEMPLATE_CATALOG_PATH)); + + // Create template to use in test + List<String> templateEntries = new ArrayList<>(); + templateEntries.add("testProperty1 = \"testValue1\""); + templateEntries.add("testProperty2 = \"test.Value1\""); + templateEntries.add("testProperty3 = 100"); + FileUtils.writeLines(new File(TEST_TEMPLATE_CATALOG_PATH + "/" + TEST_TEMPLATE_NAME), templateEntries); + + // Initialize compiler with template catalog + Properties compilerWithTemplateCatalogProperties = new Properties(); + compilerWithTemplateCatalogProperties.setProperty(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY, TEST_TEMPLATE_CATALOG_URI); + + // Initialize compiler with common useful properties + String testPath = TEST_SOURCE_NAME + "," + TEST_HOP_NAME_A + "," + TEST_HOP_NAME_B + "," + TEST_SINK_NAME; + compilerWithTemplateCatalogProperties.setProperty(ServiceConfigKeys.POLICY_BASED_DATA_MOVEMENT_PATH, testPath); + + this.compilerWithTemplateCalague = new MultiHopsFlowToJobSpecCompiler(ConfigUtils.propertiesToConfig(compilerWithTemplateCatalogProperties)); + + vertexSource = new BaseServiceNodeImpl(TEST_SOURCE_NAME); + vertexHopA = new BaseServiceNodeImpl(TEST_HOP_NAME_A); + vertexHopB = new BaseServiceNodeImpl(TEST_HOP_NAME_B); + vertexHopC = new BaseServiceNodeImpl(TEST_HOP_NAME_C); + vertexSink = new BaseServiceNodeImpl(TEST_SINK_NAME); + + } + + @AfterClass + public void cleanUp() throws Exception { + // Cleanup Template Catalog + try { + cleanUpDir(TEST_TEMPLATE_CATALOG_PATH); + } catch (Exception e) { + logger.warn("Could not completely cleanup Template catalog dir"); + } + + // Cleanup TopologySpec Dir + try { + cleanUpDir(TOPOLOGY_SPEC_STORE_DIR); + } catch (Exception e) { + logger.warn("Could not completely cleanup ToplogySpec catalog dir"); + } + + // Cleanup FlowSpec Dir + try { + cleanUpDir(FLOW_SPEC_STORE_DIR); + } catch (Exception 
e) { + logger.warn("Could not completely cleanup FlowSpec catalog dir"); + } + } + + @Test + public void testWeightedGraphConstruction(){ + FlowSpec flowSpec = initFlowSpec(); + TopologySpec topologySpec = initTopologySpec(TOPOLOGY_SPEC_STORE_DIR, TEST_SOURCE_NAME, TEST_HOP_NAME_A, TEST_HOP_NAME_B, TEST_SINK_NAME); + this.compilerWithTemplateCalague.onAddSpec(topologySpec); + + // invocation of compileFlow trigger the weighedGraph construction + this.compilerWithTemplateCalague.compileFlow(flowSpec); + DirectedWeightedMultigraph<ServiceNode, FlowEdge> weightedGraph = compilerWithTemplateCalague.getWeightedGraph(); + + Assert.assertTrue(weightedGraph.containsVertex(vertexSource)); + Assert.assertTrue(weightedGraph.containsVertex(vertexHopA)); + Assert.assertTrue(weightedGraph.containsVertex(vertexHopB)); + Assert.assertTrue(weightedGraph.containsVertex(vertexSink)); + + FlowEdge edgeSrc2A = new LoadBasedFlowEdgeImpl(vertexSource, vertexHopA, topologySpec.getSpecExecutor()); + FlowEdge edgeA2B = new LoadBasedFlowEdgeImpl(vertexHopA, vertexHopB, topologySpec.getSpecExecutor()); + FlowEdge edgeB2Sink = new LoadBasedFlowEdgeImpl(vertexHopB, vertexSink, topologySpec.getSpecExecutor()); + + Assert.assertTrue(weightedGraph.containsEdge(edgeSrc2A)); + Assert.assertTrue(weightedGraph.containsEdge(edgeA2B)); + Assert.assertTrue(weightedGraph.containsEdge(edgeB2Sink)); + + Assert.assertTrue(edgeEqual(weightedGraph.getEdge(vertexSource, vertexHopA), edgeSrc2A)); + Assert.assertTrue(edgeEqual(weightedGraph.getEdge(vertexHopA, vertexHopB), edgeA2B)); + Assert.assertTrue(edgeEqual(weightedGraph.getEdge(vertexHopB, vertexSink), edgeB2Sink)); + + this.compilerWithTemplateCalague.onDeleteSpec(topologySpec.getUri(), ""); + } + + @Test + public void testUserSpecifiedPathCompilation(){ + // TODO + } + + @Test + public void testServicePolicy(){ + // Initialize compiler with some blacklist properties + Properties properties = new Properties(); + 
properties.setProperty(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY, TEST_TEMPLATE_CATALOG_URI); + String testPath = TEST_SOURCE_NAME + "," + TEST_HOP_NAME_A + "," + TEST_HOP_NAME_B + "," + TEST_SINK_NAME; + properties.setProperty(ServiceConfigKeys.POLICY_BASED_DATA_MOVEMENT_PATH, testPath); + properties.setProperty(ServiceConfigKeys.POLICY_BASED_BLOCKED_NODES, + "testHopA"); + MultiHopsFlowToJobSpecCompiler compiler = new MultiHopsFlowToJobSpecCompiler(ConfigUtils.propertiesToConfig(properties)); + + + FlowSpec flowSpec = initFlowSpec(); + TopologySpec topologySpec = initTopologySpec(TOPOLOGY_SPEC_STORE_DIR, TEST_SOURCE_NAME, TEST_HOP_NAME_A, TEST_HOP_NAME_B, TEST_SINK_NAME); + compiler.onAddSpec(topologySpec); + + // invocation of compileFlow trigger the weighedGraph construction + compiler.compileFlow(flowSpec); + + compiler.servicePolicy.populateBlackListedEdges(compiler.getWeightedGraph()); + Assert.assertEquals(compiler.servicePolicy.getBlacklistedEdges().size(), 2); + + FlowEdge edgeSrc2A = new LoadBasedFlowEdgeImpl(vertexSource, vertexHopA, topologySpec.getSpecExecutor()); + FlowEdge edgeA2B = new LoadBasedFlowEdgeImpl(vertexHopA, vertexHopB, topologySpec.getSpecExecutor()); + + Assert.assertTrue(compiler.servicePolicy.getBlacklistedEdges().contains(edgeSrc2A)); + Assert.assertTrue(compiler.servicePolicy.getBlacklistedEdges().contains(edgeA2B)); + + } + + @Test (dependsOnMethods = "testWeightedGraphConstruction") + public void testDijkstraPathFinding(){ + + FlowSpec flowSpec = initFlowSpec(); + TopologySpec topologySpec_1 = initTopologySpec(TOPOLOGY_SPEC_STORE_DIR, TEST_SOURCE_NAME, TEST_HOP_NAME_A, TEST_HOP_NAME_B, TEST_SINK_NAME); + TopologySpec topologySpec_2 = initTopologySpec(TOPOLOGY_SPEC_STORE_DIR_SECOND, TEST_SOURCE_NAME, TEST_HOP_NAME_B, TEST_HOP_NAME_C, TEST_SINK_NAME); + this.compilerWithTemplateCalague.onAddSpec(topologySpec_1); + this.compilerWithTemplateCalague.onAddSpec(topologySpec_2); + + // Get the edge -> Change the weight 
-> Materialized the edge change back to graph -> compile again -> Assertion + this.compilerWithTemplateCalague.compileFlow(flowSpec); + DirectedWeightedMultigraph<ServiceNode, FlowEdge> weightedGraph = compilerWithTemplateCalague.getWeightedGraph(); + FlowEdge a2b= weightedGraph.getEdge(vertexHopA, vertexHopB); + FlowEdge b2c = weightedGraph.getEdge(vertexHopB, vertexHopC); + FlowEdge c2s = weightedGraph.getEdge(vertexHopC, vertexSink); + weightedGraph.setEdgeWeight(a2b, 1.99); + weightedGraph.setEdgeWeight(b2c, 0.1); + weightedGraph.setEdgeWeight(c2s, 0.2); + + // Best route: Src - B(1) - C(0.1) - sink (0.2) + this.compilerWithTemplateCalague.compileFlow(flowSpec); + List<FlowEdge> edgeList = dijkstraBasedPathFindingHelper(vertexSource, vertexSink, weightedGraph); + + FlowEdge src2b = weightedGraph.getEdge(vertexSource, vertexHopB); + FlowEdge b2C = weightedGraph.getEdge(vertexHopB, vertexHopC); + FlowEdge c2sink = weightedGraph.getEdge(vertexHopC, vertexSink); + Assert.assertEquals(edgeList.get(0).getEdgeIdentity(), src2b.getEdgeIdentity()); + Assert.assertEquals(edgeList.get(1).getEdgeIdentity(), b2C.getEdgeIdentity()); + Assert.assertEquals(edgeList.get(2).getEdgeIdentity(), c2sink.getEdgeIdentity()); + + this.compilerWithTemplateCalague.onDeleteSpec(topologySpec_1.getUri(), ""); + this.compilerWithTemplateCalague.onDeleteSpec(topologySpec_2.getUri(), ""); + } + + // The topology is: Src - A - B - Dest + private TopologySpec initTopologySpec(String storeDir, String ... 
args) { + Properties properties = new Properties(); + properties.put("specStore.fs.dir", storeDir); + String capabilitiesString = ""; + for(int i =0 ; i < args.length - 1 ; i ++ ) { + capabilitiesString = capabilitiesString + ( args[i] + ":" + args[i+1] + ","); + } + Assert.assertEquals(capabilitiesString.charAt(capabilitiesString.length() - 1) , ','); + capabilitiesString = capabilitiesString.substring(0, capabilitiesString.length() - 1 ); + properties.put("specExecInstance.capabilities", capabilitiesString); + properties.put("executorAttrs", new Properties()); + Config config = ConfigUtils.propertiesToConfig(properties); + SpecExecutor specExecutorInstance = new InMemorySpecExecutor(config); + + TopologySpec.Builder topologySpecBuilder = TopologySpec.builder( + IdentityFlowToJobSpecCompilerTest.computeTopologySpecURI(SPEC_STORE_PARENT_DIR, + storeDir)) + .withConfig(config) + .withDescription(SPEC_DESCRIPTION) + .withVersion(SPEC_VERSION) + .withSpecExecutor(specExecutorInstance); + return topologySpecBuilder.build(); + } + + private FlowSpec initFlowSpec() { + return initFlowSpec(TEST_FLOW_GROUP, TEST_FLOW_NAME, TEST_SOURCE_NAME, TEST_SINK_NAME); + } + + private FlowSpec initFlowSpec(String flowGroup, String flowName, String source, String destination) { + Properties properties = new Properties(); + properties.put(ConfigurationKeys.JOB_SCHEDULE_KEY, "* * * * *"); + properties.put(ConfigurationKeys.FLOW_GROUP_KEY, flowGroup); + properties.put(ConfigurationKeys.FLOW_NAME_KEY, flowName); + properties.put(ServiceConfigKeys.FLOW_SOURCE_IDENTIFIER_KEY, source); + properties.put(ServiceConfigKeys.FLOW_DESTINATION_IDENTIFIER_KEY, destination); + Config config = ConfigUtils.propertiesToConfig(properties); + + FlowSpec.Builder flowSpecBuilder = null; + try { + flowSpecBuilder = FlowSpec.builder(computeTopologySpecURI(SPEC_STORE_PARENT_DIR, + FLOW_SPEC_STORE_DIR)) + .withConfig(config) + .withDescription("dummy description") + .withVersion("1") + .withTemplate(new 
URI(TEST_TEMPLATE_URI)); + } catch (URISyntaxException e) { + throw new RuntimeException(e); + } + return flowSpecBuilder.build(); + } + + private void cleanUpDir(String dir) throws Exception { + File specStoreDir = new File(dir); + if (specStoreDir.exists()) { + FileUtils.deleteDirectory(specStoreDir); + } + } + + // Criteria for FlowEdge to be equal in testing context + private boolean edgeEqual(FlowEdge a, FlowEdge b){ + return (a.getEdgeIdentity().equals(b.getEdgeIdentity()) && + ((LoadBasedFlowEdgeImpl)a).getEdgeLoad() == ((LoadBasedFlowEdgeImpl)b).getEdgeLoad()); + } + + // Use this function for + private void populateTemplateMap(WeightedMultigraph<ServiceNode, FlowEdge> weightedGraph, URI exempliedURI){ + this.edgeTemplateMap.clear(); + Set<FlowEdge> allEdges = weightedGraph.edgeSet(); + for ( FlowEdge edge : allEdges ) { + this.edgeTemplateMap.put(edge.getEdgeIdentity(), Arrays.asList(exempliedURI)) ; + } + } + + public static URI computeTopologySpecURI(String parent, String current) { + // Make sure this is relative + return PathUtils.relativizePath(new Path(current), new Path(parent)).toUri(); + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9402a903/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java ---------------------------------------------------------------------- diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java index eb9974a..a933e85 100644 --- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java +++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java @@ -17,6 +17,8 @@ package org.apache.gobblin.service.modules.orchestration; +import org.apache.gobblin.runtime.api.SpecExecutor; +import 
org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor; import java.io.File; import java.net.URI; import java.net.URISyntaxException; @@ -40,13 +42,10 @@ import com.typesafe.config.Config; import org.apache.gobblin.runtime.api.FlowSpec; import org.apache.gobblin.runtime.api.Spec; -import org.apache.gobblin.runtime.api.SpecCompiler; -import org.apache.gobblin.runtime.api.SpecExecutorInstanceProducer; import org.apache.gobblin.runtime.api.TopologySpec; import org.apache.gobblin.runtime.app.ServiceBasedAppLauncher; import org.apache.gobblin.runtime.spec_catalog.FlowCatalog; import org.apache.gobblin.runtime.spec_catalog.TopologyCatalog; -import org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutorInstanceProducer; import org.apache.gobblin.service.modules.flow.IdentityFlowToJobSpecCompiler; import org.apache.gobblin.util.ConfigUtils; import org.apache.gobblin.util.PathUtils; @@ -120,14 +119,14 @@ public class OrchestratorTest { properties.put("specExecInstance.capabilities", "source:destination"); Config config = ConfigUtils.propertiesToConfig(properties); - SpecExecutorInstanceProducer specExecutorInstanceProducer = new InMemorySpecExecutorInstanceProducer(config); + SpecExecutor specExecutorInstance = new InMemorySpecExecutor(config); TopologySpec.Builder topologySpecBuilder = TopologySpec.builder(computeTopologySpecURI(SPEC_STORE_PARENT_DIR, TOPOLOGY_SPEC_STORE_DIR)) .withConfig(config) .withDescription(SPEC_DESCRIPTION) .withVersion(SPEC_VERSION) - .withSpecExecutorInstanceProducer(specExecutorInstanceProducer); + .withSpecExecutor(specExecutorInstance); return topologySpecBuilder.build(); } @@ -139,8 +138,6 @@ public class OrchestratorTest { properties.put("gobblin.flow.destinationIdentifier", "destination"); Config config = ConfigUtils.propertiesToConfig(properties); - SpecExecutorInstanceProducer specExecutorInstanceProducer = new InMemorySpecExecutorInstanceProducer(config); - FlowSpec.Builder flowSpecBuilder = null; try { 
flowSpecBuilder = FlowSpec.builder(computeTopologySpecURI(SPEC_STORE_PARENT_DIR, @@ -209,10 +206,10 @@ public class OrchestratorTest { @Test (dependsOnMethods = "createTopologySpec") public void createFlowSpec() throws Exception { - // Since only 1 Topology with 1 SpecExecutorInstanceProducer has been added in previous test + // Since only 1 Topology with 1 SpecProducer has been added in previous test // .. it should be available and responsible for our new FlowSpec IdentityFlowToJobSpecCompiler specCompiler = (IdentityFlowToJobSpecCompiler) this.orchestrator.getSpecCompiler(); - SpecExecutorInstanceProducer sei = specCompiler.getTopologySpecMap().values().iterator().next().getSpecExecutorInstanceProducer(); + SpecExecutor sei = specCompiler.getTopologySpecMap().values().iterator().next().getSpecExecutor(); // List Current Specs Collection<Spec> specs = flowCatalog.getSpecs(); @@ -225,7 +222,7 @@ public class OrchestratorTest { // Make sure FlowCatalog is empty Assert.assertTrue(specs.size() == 0, "Spec store should be empty before addition"); // Make sure FlowCatalog Listener is empty - Assert.assertTrue(((List)(sei.listSpecs().get())).size() == 0, "SpecExecutorInstanceProducer should not know about " + Assert.assertTrue(((List)(sei.getProducer().get().listSpecs().get())).size() == 0, "SpecProducer should not know about " + "any Flow before addition"); // Create and add Spec @@ -243,7 +240,7 @@ public class OrchestratorTest { // Make sure FlowCatalog has the added Flow Assert.assertTrue(specs.size() == 1, "Spec store should contain 1 Spec after addition"); // Orchestrator is a no-op listener for any new FlowSpecs - Assert.assertTrue(((List)(sei.listSpecs().get())).size() == 0, "SpecExecutorInstanceProducer should contain 0 " + Assert.assertTrue(((List)(sei.getProducer().get().listSpecs().get())).size() == 0, "SpecProducer should contain 0 " + "Spec after addition"); } @@ -251,7 +248,7 @@ public class OrchestratorTest { public void deleteFlowSpec() throws Exception 
{ // Since only 1 Flow has been added in previous test it should be available IdentityFlowToJobSpecCompiler specCompiler = (IdentityFlowToJobSpecCompiler) this.orchestrator.getSpecCompiler(); - SpecExecutorInstanceProducer sei = specCompiler.getTopologySpecMap().values().iterator().next().getSpecExecutorInstanceProducer(); + SpecExecutor sei = specCompiler.getTopologySpecMap().values().iterator().next().getSpecExecutor(); // List Current Specs Collection<Spec> specs = flowCatalog.getSpecs(); @@ -264,8 +261,8 @@ public class OrchestratorTest { // Make sure FlowCatalog has the previously added Flow Assert.assertTrue(specs.size() == 1, "Spec store should contain 1 Flow that was added in last test"); // Orchestrator is a no-op listener for any new FlowSpecs, so no FlowSpecs should be around - int specsInSEI = ((List)(sei.listSpecs().get())).size(); - Assert.assertTrue(specsInSEI == 0, "SpecExecutorInstanceProducer should contain 0 " + int specsInSEI = ((List)(sei.getProducer().get().listSpecs().get())).size(); + Assert.assertTrue(specsInSEI == 0, "SpecProducer should contain 0 " + "Spec after addition because Orchestrator is a no-op listener for any new FlowSpecs"); // Remove the flow @@ -283,8 +280,8 @@ public class OrchestratorTest { // Make sure FlowCatalog has the Flow removed Assert.assertTrue(specs.size() == 0, "Spec store should not contain Spec after deletion"); // Make sure FlowCatalog Listener knows about the deletion - specsInSEI = ((List)(sei.listSpecs().get())).size(); - Assert.assertTrue(specsInSEI == 0, "SpecExecutorInstanceProducer should not contain " + specsInSEI = ((List)(sei.getProducer().get().listSpecs().get())).size(); + Assert.assertTrue(specsInSEI == 0, "SpecProducer should not contain " + "Spec after deletion"); } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9402a903/gobblin-service/src/test/java/org/apache/gobblin/service/modules/topology/ConfigBasedTopologySpecFactoryTest.java 
---------------------------------------------------------------------- diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/topology/ConfigBasedTopologySpecFactoryTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/topology/ConfigBasedTopologySpecFactoryTest.java index c28e97b..36193dc 100644 --- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/topology/ConfigBasedTopologySpecFactoryTest.java +++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/topology/ConfigBasedTopologySpecFactoryTest.java @@ -54,8 +54,8 @@ public class ConfigBasedTopologySpecFactoryTest { properties.put(topology1Prefix + ServiceConfigKeys.TOPOLOGYSPEC_DESCRIPTION_KEY, "Topology for cluster"); properties.put(topology1Prefix + ServiceConfigKeys.TOPOLOGYSPEC_VERSION_KEY, "1"); properties.put(topology1Prefix + ServiceConfigKeys.TOPOLOGYSPEC_URI_KEY, "/mySpecs/" + topology1); - properties.put(topology1Prefix + ServiceConfigKeys.SPEC_EXECUTOR_INSTANCE_PRODUCER_KEY, - ServiceConfigKeys.DEFAULT_SPEC_EXECUTOR_INSTANCE_PRODUCER); + properties.put(topology1Prefix + ServiceConfigKeys.SPEC_EXECUTOR_KEY, + ServiceConfigKeys.DEFAULT_SPEC_EXECUTOR); properties.put(topology1Prefix + ConfigurationKeys.SPECEXECUTOR_INSTANCE_CAPABILITIES_KEY, "salesforce:nosql"); // Topology Azkaban1 properties @@ -63,8 +63,8 @@ public class ConfigBasedTopologySpecFactoryTest { properties.put(topology2Prefix + ServiceConfigKeys.TOPOLOGYSPEC_DESCRIPTION_KEY, "Topology for Azkaban"); properties.put(topology2Prefix + ServiceConfigKeys.TOPOLOGYSPEC_VERSION_KEY, "2"); properties.put(topology2Prefix + ServiceConfigKeys.TOPOLOGYSPEC_URI_KEY, "/mySpecs/" + topology2); - properties.put(topology2Prefix + ServiceConfigKeys.SPEC_EXECUTOR_INSTANCE_PRODUCER_KEY, - ServiceConfigKeys.DEFAULT_SPEC_EXECUTOR_INSTANCE_PRODUCER); + properties.put(topology2Prefix + ServiceConfigKeys.SPEC_EXECUTOR_KEY, + ServiceConfigKeys.DEFAULT_SPEC_EXECUTOR); 
properties.put(topology2Prefix + ConfigurationKeys.SPECEXECUTOR_INSTANCE_CAPABILITIES_KEY, "nosql:hdfs"); _config = ConfigUtils.propertiesToConfig(properties); @@ -94,4 +94,4 @@ public class ConfigBasedTopologySpecFactoryTest { "Version did not match with construction"); } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9402a903/gradle/scripts/dependencyDefinitions.gradle ---------------------------------------------------------------------- diff --git a/gradle/scripts/dependencyDefinitions.gradle b/gradle/scripts/dependencyDefinitions.gradle index 24b2473..cde546c 100644 --- a/gradle/scripts/dependencyDefinitions.gradle +++ b/gradle/scripts/dependencyDefinitions.gradle @@ -91,6 +91,7 @@ ext.externalDependency = [ "jacksonMapper": "org.codehaus.jackson:jackson-mapper-asl:1.9.13", "jasypt": "org.jasypt:jasypt:1.9.2", "jodaTime": "joda-time:joda-time:2.9.3", + "jgrapht": "org.jgrapht:jgrapht-core:0.9.2", "metricsCore": "io.dropwizard.metrics:metrics-core:" + dropwizardMetricsVersion, "metricsJvm": "io.dropwizard.metrics:metrics-jvm:" + dropwizardMetricsVersion, "metricsGraphite": "io.dropwizard.metrics:metrics-graphite:" + dropwizardMetricsVersion,
