Repository: incubator-gobblin Updated Branches: refs/heads/master eb2c128dc -> 5a6bfea9f
[GOBBLIN-559] Implement FlowGraph as a concurrent data structure. Closes #2423 from sv2000/multithread Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/5a6bfea9 Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/5a6bfea9 Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/5a6bfea9 Branch: refs/heads/master Commit: 5a6bfea9f689360ffb7c5a5c4fef95487b5b4e4c Parents: eb2c128 Author: sv2000 <[email protected]> Authored: Fri Aug 17 23:22:54 2018 -0700 Committer: Hung Tran <[email protected]> Committed: Fri Aug 17 23:22:54 2018 -0700 ---------------------------------------------------------------------- .../modules/flow/FlowGraphPathFinder.java | 345 ------------------- .../modules/flow/MultiHopFlowCompiler.java | 38 +- .../modules/flowgraph/BaseFlowGraph.java | 146 +++++--- .../service/modules/flowgraph/FlowGraph.java | 19 + .../flowgraph/FlowGraphConfigurationKeys.java | 8 +- .../pathfinder/AbstractPathFinder.java | 259 ++++++++++++++ .../flowgraph/pathfinder/BFSPathFinder.java | 138 ++++++++ .../flowgraph/pathfinder/PathFinder.java | 44 +++ 8 files changed, 587 insertions(+), 410 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5a6bfea9/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/FlowGraphPathFinder.java ---------------------------------------------------------------------- diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/FlowGraphPathFinder.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/FlowGraphPathFinder.java deleted file mode 100644 index 59c831d..0000000 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/FlowGraphPathFinder.java +++ /dev/null @@ -1,345 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.gobblin.service.modules.flow; - -import java.io.IOException; -import java.net.URISyntaxException; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ExecutionException; - -import org.apache.commons.lang3.tuple.Pair; - -import com.google.common.base.Preconditions; -import com.typesafe.config.Config; - -import lombok.extern.slf4j.Slf4j; - -import org.apache.gobblin.annotation.Alpha; -import org.apache.gobblin.runtime.api.FlowSpec; -import org.apache.gobblin.runtime.api.JobTemplate; -import org.apache.gobblin.runtime.api.SpecExecutor; -import org.apache.gobblin.runtime.api.SpecNotFoundException; -import org.apache.gobblin.service.ServiceConfigKeys; -import org.apache.gobblin.service.modules.dataset.DatasetDescriptor; -import org.apache.gobblin.service.modules.flowgraph.Dag; -import org.apache.gobblin.service.modules.flowgraph.DataNode; -import org.apache.gobblin.service.modules.flowgraph.DatasetDescriptorConfigKeys; -import org.apache.gobblin.service.modules.flowgraph.FlowEdge; -import org.apache.gobblin.service.modules.flowgraph.FlowGraph; -import org.apache.gobblin.service.modules.spec.JobExecutionPlan; -import org.apache.gobblin.util.ConfigUtils; -import org.apache.gobblin.util.reflection.GobblinConstructorUtils; - - -@Alpha -@Slf4j -public class FlowGraphPathFinder { - private static final String SOURCE_PREFIX = "source"; - private static final String DESTINATION_PREFIX = "destination"; - - private FlowGraph flowGraph; - private FlowSpec flowSpec; - private Config flowConfig; - - private DataNode srcNode; - private List<DataNode> destNodes; - - private DatasetDescriptor srcDatasetDescriptor; - private DatasetDescriptor destDatasetDescriptor; - - //Maintain path of FlowEdges as parent-child map - private Map<FlowEdgeContext, FlowEdgeContext> pathMap; - - //Flow Execution Id - private Long flowExecutionId; - - /** - * Constructor. - * @param flowGraph - */ - public FlowGraphPathFinder(FlowGraph flowGraph, FlowSpec flowSpec) { - this.flowGraph = flowGraph; - this.flowSpec = flowSpec; - this.flowConfig = flowSpec.getConfig(); - - //Get src/dest DataNodes from the flow config - String srcNodeId = ConfigUtils.getString(flowConfig, ServiceConfigKeys.FLOW_SOURCE_IDENTIFIER_KEY, ""); - - List<String> destNodeIds = ConfigUtils.getStringList(flowConfig, ServiceConfigKeys.FLOW_DESTINATION_IDENTIFIER_KEY); - this.srcNode = this.flowGraph.getNode(srcNodeId); - Preconditions.checkArgument(srcNode != null, "Flowgraph does not have a node with id " + srcNodeId); - for (String destNodeId : destNodeIds) { - DataNode destNode = this.flowGraph.getNode(destNodeId); - Preconditions.checkArgument(destNode != null, "Flowgraph does not have a node with id " + destNodeId); - if (this.destNodes == null) { - this.destNodes = new ArrayList<>(); - } - this.destNodes.add(destNode); - } - //Get src/dest dataset descriptors from the flow config - Config srcDatasetDescriptorConfig = - flowConfig.getConfig(DatasetDescriptorConfigKeys.FLOW_INPUT_DATASET_DESCRIPTOR_PREFIX); - Config destDatasetDescriptorConfig = - flowConfig.getConfig(DatasetDescriptorConfigKeys.FLOW_OUTPUT_DATASET_DESCRIPTOR_PREFIX); - - try { - Class srcdatasetDescriptorClass = - Class.forName(srcDatasetDescriptorConfig.getString(DatasetDescriptorConfigKeys.CLASS_KEY)); - this.srcDatasetDescriptor = (DatasetDescriptor) GobblinConstructorUtils - .invokeLongestConstructor(srcdatasetDescriptorClass, srcDatasetDescriptorConfig); - Class destDatasetDescriptorClass = - Class.forName(destDatasetDescriptorConfig.getString(DatasetDescriptorConfigKeys.CLASS_KEY)); - this.destDatasetDescriptor = (DatasetDescriptor) GobblinConstructorUtils - .invokeLongestConstructor(destDatasetDescriptorClass, destDatasetDescriptorConfig); - } catch (Exception e) { - throw new RuntimeException(e); - } - } - - public FlowGraphPath findPath() throws PathFinderException { - // Generate flow execution id for this compilation - this.flowExecutionId = System.currentTimeMillis(); - - FlowGraphPath flowGraphPath = new FlowGraphPath(flowSpec, flowExecutionId); - //Path computation must be thread-safe to guarantee read consistency. In other words, we prevent concurrent read/write access to the - // flow graph. - // TODO: we can easily improve the performance by using a ReentrantReadWriteLock associated with the FlowGraph. This will - // allow multiple concurrent readers to not be blocked on each other, as long as there are no writers. - synchronized (this.flowGraph) { - for (DataNode destNode : this.destNodes) { - List<FlowEdgeContext> path = findPathBFS(destNode); - if (path != null) { - flowGraphPath.addPath(path); - } else { - //No path to at least one of the destination nodes. - return null; - } - } - } - return flowGraphPath; - } - - /** - * A simple path finding algorithm based on Breadth-First Search. At every step the algorithm adds the adjacent {@link FlowEdge}s - * to a queue. The {@link FlowEdge}s whose output {@link DatasetDescriptor} matches the destDatasetDescriptor are - * added first to the queue. This ensures that dataset transformations are always performed closest to the source. - * @return a path of {@link FlowEdgeContext}s starting at the srcNode and ending at the destNode. - */ - private List<FlowEdgeContext> findPathBFS(DataNode destNode) - throws PathFinderException { - try { - //Initialization of auxiliary data structures used for path computation - this.pathMap = new HashMap<>(); - - //Base condition 1: Source Node or Dest Node is inactive; return null - if (!srcNode.isActive() || !destNode.isActive()) { - log.warn("Either source node {} or destination node {} is inactive; skipping path computation.", - this.srcNode.getId(), destNode.getId()); - return null; - } - - //Base condition 2: Check if we are already at the target. If so, return an empty path. - if ((srcNode.equals(destNode)) && destDatasetDescriptor.contains(srcDatasetDescriptor)) { - return new ArrayList<>(); - } - - LinkedList<FlowEdgeContext> edgeQueue = new LinkedList<>(); - edgeQueue.addAll(getNextEdges(srcNode, srcDatasetDescriptor, destDatasetDescriptor)); - for (FlowEdgeContext flowEdgeContext : edgeQueue) { - this.pathMap.put(flowEdgeContext, flowEdgeContext); - } - - //At every step, pop an edge E from the edge queue. Mark the edge E as visited. Generate the list of adjacent edges - // to the edge E. For each adjacent edge E', do the following: - // 1. check if the FlowTemplate described by E' is resolvable using the flowConfig, and - // 2. check if the output dataset descriptor of edge E is compatible with the input dataset descriptor of the - // edge E'. If yes, add the edge E' to the edge queue. - // If the edge E' satisfies 1 and 2, add it to the edge queue for further consideration. - while (!edgeQueue.isEmpty()) { - FlowEdgeContext flowEdgeContext = edgeQueue.pop(); - - DataNode currentNode = this.flowGraph.getNode(flowEdgeContext.getEdge().getDest()); - DatasetDescriptor currentOutputDatasetDescriptor = flowEdgeContext.getOutputDatasetDescriptor(); - - //Are we done? - if (isPathFound(currentNode, destNode, currentOutputDatasetDescriptor, destDatasetDescriptor)) { - return constructPath(flowEdgeContext); - } - - //Expand the currentNode to its adjacent edges and add them to the queue. - List<FlowEdgeContext> nextEdges = - getNextEdges(currentNode, currentOutputDatasetDescriptor, destDatasetDescriptor); - for (FlowEdgeContext childFlowEdgeContext : nextEdges) { - //Add a pointer from the child edge to the parent edge, if the child edge is not already in the - // queue. - if (!this.pathMap.containsKey(childFlowEdgeContext)) { - edgeQueue.add(childFlowEdgeContext); - this.pathMap.put(childFlowEdgeContext, flowEdgeContext); - } - } - } - //No path found. Return null. - return null; - } catch (SpecNotFoundException | JobTemplate.TemplateException | IOException | URISyntaxException e) { - throw new PathFinderException( - "Exception encountered when computing path from src: " + this.srcNode.getId() + " to dest: " + destNode - .getId(), e); - } - } - - private boolean isPathFound(DataNode currentNode, DataNode destNode, DatasetDescriptor currentDatasetDescriptor, - DatasetDescriptor destDatasetDescriptor) { - if ((currentNode.equals(destNode)) && (currentDatasetDescriptor.equals(destDatasetDescriptor))) { - return true; - } - return false; - } - - /** - * A helper method that sorts the {@link FlowEdge}s incident on srcNode based on whether the FlowEdge has an - * output {@link DatasetDescriptor} that is compatible with the targetDatasetDescriptor. - * @param dataNode - * @param currentDatasetDescriptor Output {@link DatasetDescriptor} of the current edge. - * @param destDatasetDescriptor Target {@link DatasetDescriptor}. - * @return prioritized list of {@link FlowEdge}s to be added to the edge queue for expansion. - */ - private List<FlowEdgeContext> getNextEdges(DataNode dataNode, DatasetDescriptor currentDatasetDescriptor, - DatasetDescriptor destDatasetDescriptor) { - List<FlowEdgeContext> prioritizedEdgeList = new LinkedList<>(); - for (FlowEdge flowEdge : this.flowGraph.getEdges(dataNode)) { - try { - DataNode edgeDestination = this.flowGraph.getNode(flowEdge.getDest()); - //Base condition: Skip this FLowEdge, if it is inactive or if the destination of this edge is inactive. - if (!edgeDestination.isActive() || !flowEdge.isActive()) { - continue; - } - - boolean foundExecutor = false; - //Iterate over all executors for this edge. Find the first one that resolves the underlying flow template. - for (SpecExecutor specExecutor : flowEdge.getExecutors()) { - Config mergedConfig = getMergedConfig(flowEdge, specExecutor); - List<Pair<DatasetDescriptor, DatasetDescriptor>> datasetDescriptorPairs = - flowEdge.getFlowTemplate().getResolvingDatasetDescriptors(mergedConfig); - for (Pair<DatasetDescriptor, DatasetDescriptor> datasetDescriptorPair : datasetDescriptorPairs) { - DatasetDescriptor inputDatasetDescriptor = datasetDescriptorPair.getLeft(); - DatasetDescriptor outputDatasetDescriptor = datasetDescriptorPair.getRight(); - if (inputDatasetDescriptor.contains(currentDatasetDescriptor)) { - FlowEdgeContext flowEdgeContext; - if (outputDatasetDescriptor.contains(currentDatasetDescriptor)) { - //If datasets described by the currentDatasetDescriptor is a subset of the datasets described - // by the outputDatasetDescriptor (i.e. currentDatasetDescriptor is more "specific" than outputDatasetDescriptor, e.g. - // as in the case of a "distcp" edge), we propagate the more "specific" dataset descriptor forward. - flowEdgeContext = - new FlowEdgeContext(flowEdge, currentDatasetDescriptor, currentDatasetDescriptor, mergedConfig, - specExecutor); - } else { - //outputDatasetDescriptor is more specific (e.g. if it is a dataset transformation edge) - flowEdgeContext = - new FlowEdgeContext(flowEdge, currentDatasetDescriptor, outputDatasetDescriptor, mergedConfig, - specExecutor); - } - if (destDatasetDescriptor.getFormatConfig().contains(outputDatasetDescriptor.getFormatConfig())) { - //Add to the front of the edge list if platform-independent properties of the output descriptor is compatible - // with those of destination dataset descriptor. - // In other words, we prioritize edges that perform data transformations as close to the source as possible. - prioritizedEdgeList.add(0, flowEdgeContext); - } else { - prioritizedEdgeList.add(flowEdgeContext); - } - foundExecutor = true; - } - } - // Found a SpecExecutor. Proceed to the next FlowEdge. - // TODO: Choose the min-cost executor for the FlowEdge as opposed to the first one that resolves. - if (foundExecutor) { - break; - } - } - } catch (IOException | ReflectiveOperationException | InterruptedException | ExecutionException | SpecNotFoundException - | JobTemplate.TemplateException e) { - //Skip the edge; and continue - log.warn("Skipping edge {} with config {} due to exception: {}", flowEdge.getId(), flowConfig.toString(), e); - } - } - return prioritizedEdgeList; - } - - /** - * Build the merged config for each {@link FlowEdge}, which is a combination of (in the precedence described below): - * <ul> - * <p> the user provided flow config </p> - * <p> edge specific properties/overrides </p> - * <p> spec executor config/overrides </p> - * <p> source node config </p> - * <p> destination node config </p> - * </ul> - * Each {@link JobTemplate}'s config will eventually be resolved against this merged config. - * @param flowEdge An instance of {@link FlowEdge}. - * @param specExecutor A {@link SpecExecutor}. - * @return the merged config derived as described above. - */ - private Config getMergedConfig(FlowEdge flowEdge, SpecExecutor specExecutor) - throws ExecutionException, InterruptedException { - Config srcNodeConfig = this.flowGraph.getNode(flowEdge.getSrc()).getRawConfig().atPath(SOURCE_PREFIX); - Config destNodeConfig = this.flowGraph.getNode(flowEdge.getDest()).getRawConfig().atPath(DESTINATION_PREFIX); - Config mergedConfig = flowConfig.withFallback(specExecutor.getConfig().get()).withFallback(flowEdge.getConfig()) - .withFallback(srcNodeConfig).withFallback(destNodeConfig); - return mergedConfig; - } - - /** - * - * @param flowEdgeContext of the last {@link FlowEdge} in the path. - * @return a {@link Dag} of {@link JobExecutionPlan}s for the input {@link FlowSpec}. - * @throws IOException - * @throws SpecNotFoundException - * @throws JobTemplate.TemplateException - * @throws URISyntaxException - */ - private List<FlowEdgeContext> constructPath(FlowEdgeContext flowEdgeContext) - throws IOException, SpecNotFoundException, JobTemplate.TemplateException, URISyntaxException { - //Backtrace from the last edge using the path map and push each edge into a LIFO data structure. - List<FlowEdgeContext> path = new LinkedList<>(); - path.add(flowEdgeContext); - FlowEdgeContext currentFlowEdgeContext = flowEdgeContext; - while (true) { - path.add(0, this.pathMap.get(currentFlowEdgeContext)); - currentFlowEdgeContext = this.pathMap.get(currentFlowEdgeContext); - //Are we at the first edge in the path? - if (this.pathMap.get(currentFlowEdgeContext).equals(currentFlowEdgeContext)) { - break; - } - } - return path; - } - - public static class PathFinderException extends Exception { - public PathFinderException(String message, Throwable cause) { - super(message, cause); - } - - public PathFinderException(String message) { - super(message); - } - } -} - http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5a6bfea9/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java ---------------------------------------------------------------------- diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java index 83951a0..1ab8312 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java @@ -20,7 +20,6 @@ package org.apache.gobblin.service.modules.flow; import java.io.IOException; import java.net.URISyntaxException; import java.util.ArrayList; -import java.util.Map; import java.util.concurrent.TimeUnit; import org.slf4j.Logger; @@ -28,7 +27,6 @@ import org.slf4j.Logger; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Optional; import com.google.common.base.Preconditions; -import com.google.common.collect.Maps; import com.typesafe.config.Config; import lombok.Getter; @@ -42,15 +40,16 @@ import org.apache.gobblin.runtime.api.JobTemplate; import org.apache.gobblin.runtime.api.Spec; import org.apache.gobblin.runtime.api.SpecExecutor; import org.apache.gobblin.runtime.api.SpecNotFoundException; -import org.apache.gobblin.runtime.spec_catalog.FlowCatalog; import org.apache.gobblin.service.ServiceConfigKeys; import org.apache.gobblin.service.modules.core.GitFlowGraphMonitor; import org.apache.gobblin.service.modules.flowgraph.BaseFlowGraph; import org.apache.gobblin.service.modules.flowgraph.Dag; import org.apache.gobblin.service.modules.flowgraph.FlowGraph; +import org.apache.gobblin.service.modules.flowgraph.pathfinder.PathFinder; import org.apache.gobblin.service.modules.spec.JobExecutionPlan; import org.apache.gobblin.service.modules.spec.JobExecutionPlanDagFactory; import org.apache.gobblin.service.modules.template_catalog.FSFlowCatalog; +import org.apache.gobblin.util.ConfigUtils; /*** @@ -60,8 +59,9 @@ import org.apache.gobblin.service.modules.template_catalog.FSFlowCatalog; @Alpha @Slf4j public class MultiHopFlowCompiler extends BaseFlowToJobSpecCompiler { + @Getter - private FlowGraph flowGraph; + private final FlowGraph flowGraph; private GitFlowGraphMonitor gitFlowGraphMonitor; @Getter private boolean active; @@ -80,9 +80,8 @@ public class MultiHopFlowCompiler extends BaseFlowToJobSpecCompiler { public MultiHopFlowCompiler(Config config, Optional<Logger> log, boolean instrumentationEnabled) { super(config, log, instrumentationEnabled); - Config templateCatalogCfg = config - .withValue(ConfigurationKeys.JOB_CONFIG_FILE_GENERAL_PATH_KEY, - config.getValue(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY)); + Config templateCatalogCfg = config.withValue(ConfigurationKeys.JOB_CONFIG_FILE_GENERAL_PATH_KEY, + config.getValue(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY)); FSFlowCatalog flowCatalog; try { flowCatalog = new FSFlowCatalog(templateCatalogCfg); @@ -118,32 +117,32 @@ public class MultiHopFlowCompiler extends BaseFlowToJobSpecCompiler { long startTime = System.nanoTime(); FlowSpec flowSpec = (FlowSpec) spec; - String source = flowSpec.getConfig().getString(ServiceConfigKeys.FLOW_SOURCE_IDENTIFIER_KEY); - String destination = flowSpec.getConfig().getString(ServiceConfigKeys.FLOW_DESTINATION_IDENTIFIER_KEY); + String source = ConfigUtils.getString(flowSpec.getConfig(), ServiceConfigKeys.FLOW_SOURCE_IDENTIFIER_KEY, ""); + String destination = + ConfigUtils.getString(flowSpec.getConfig(), ServiceConfigKeys.FLOW_DESTINATION_IDENTIFIER_KEY, ""); log.info(String.format("Compiling flow for source: %s and destination: %s", source, destination)); Dag<JobExecutionPlan> jobExecutionPlanDag; - FlowGraphPathFinder pathFinder = new FlowGraphPathFinder(this.flowGraph, flowSpec); try { //Compute the path from source to destination. - FlowGraphPath flowGraphPath = pathFinder.findPath(); - + FlowGraphPath flowGraphPath = flowGraph.findPath(flowSpec); //Convert the path into a Dag of JobExecutionPlans. if (flowGraphPath != null) { jobExecutionPlanDag = flowGraphPath.asDag(); } else { - Instrumented.markMeter(this.flowCompilationFailedMeter); + Instrumented.markMeter(flowCompilationFailedMeter); log.info(String.format("No path found from source: %s and destination: %s", source, destination)); return new JobExecutionPlanDagFactory().createDag(new ArrayList<>()); } - } catch (FlowGraphPathFinder.PathFinderException | SpecNotFoundException | JobTemplate.TemplateException | URISyntaxException e) { - Instrumented.markMeter(this.flowCompilationFailedMeter); - log.error(String.format("Exception encountered while compiling flow for source: %s and destination: %s", source, destination), e); + } catch (PathFinder.PathFinderException | SpecNotFoundException | JobTemplate.TemplateException | URISyntaxException | ReflectiveOperationException e) { + Instrumented.markMeter(flowCompilationFailedMeter); + log.error(String + .format("Exception encountered while compiling flow for source: %s and destination: %s", source, destination), + e); return null; } - Instrumented.markMeter(this.flowCompilationSuccessFulMeter); - Instrumented.updateTimer(this.flowCompilationTimer, System.nanoTime() - startTime, TimeUnit.NANOSECONDS); - + Instrumented.markMeter(flowCompilationSuccessFulMeter); + Instrumented.updateTimer(flowCompilationTimer, System.nanoTime() - startTime, TimeUnit.NANOSECONDS); return jobExecutionPlanDag; } @@ -152,5 +151,4 @@ public class MultiHopFlowCompiler extends BaseFlowToJobSpecCompiler { log.warn("No population of templates based on edge happen in this implementation"); return; } - } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5a6bfea9/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowGraph.java ---------------------------------------------------------------------- diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowGraph.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowGraph.java index edf40cc..e2b256f 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowGraph.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowGraph.java @@ -21,8 +21,15 @@ import java.util.HashMap; import java.util.HashSet; import java.util.Map; import java.util.Set; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.gobblin.annotation.Alpha; +import org.apache.gobblin.runtime.api.FlowSpec; +import org.apache.gobblin.service.modules.flow.FlowGraphPath; +import org.apache.gobblin.service.modules.flowgraph.pathfinder.PathFinder; +import org.apache.gobblin.util.ConfigUtils; +import org.apache.gobblin.util.reflection.GobblinConstructorUtils; import lombok.extern.slf4j.Slf4j; @@ -32,10 +39,14 @@ import lombok.extern.slf4j.Slf4j; * <p>dataNodeMap - the mapping from a node identifier to the {@link DataNode} instance</p> * <p>nodesToEdges - the mapping from each {@link DataNode} to its outgoing {@link FlowEdge}s</p> * <p>flowEdgeMap - the mapping from a edge label to the {@link FlowEdge} instance</p> + * + * Read/Write Access to the {@link FlowGraph} is synchronized via a {@link ReentrantReadWriteLock}. */ @Alpha @Slf4j public class BaseFlowGraph implements FlowGraph { + private final ReadWriteLock rwLock = new ReentrantReadWriteLock(true); + private Map<DataNode, Set<FlowEdge>> nodesToEdges = new HashMap<>(); private Map<String, DataNode> dataNodeMap = new HashMap<>(); private Map<String, FlowEdge> flowEdgeMap = new HashMap<>(); @@ -58,11 +69,16 @@ public class BaseFlowGraph implements FlowGraph { * @return true if node is successfully added to the {@link FlowGraph}. */ @Override - public synchronized boolean addDataNode(DataNode node) { - //Get edges adjacent to the node if it already exists - Set<FlowEdge> edges = this.nodesToEdges.getOrDefault(node, new HashSet<>()); - this.nodesToEdges.put(node, edges); - this.dataNodeMap.put(node.getId(), node); + public boolean addDataNode(DataNode node) { + try { + rwLock.writeLock().lock(); + //Get edges adjacent to the node if it already exists + Set<FlowEdge> edges = this.nodesToEdges.getOrDefault(node, new HashSet<>()); + this.nodesToEdges.put(node, edges); + this.dataNodeMap.put(node.getId(), node); + } finally { + rwLock.writeLock().unlock(); + } return true; } @@ -74,16 +90,20 @@ public class BaseFlowGraph implements FlowGraph { * @return true if addition of {@FlowEdge} is successful. */ @Override - public synchronized boolean addFlowEdge(FlowEdge edge) { - String srcNode = edge.getSrc(); - String dstNode = edge.getDest(); - if(!dataNodeMap.containsKey(srcNode) || !dataNodeMap.containsKey(dstNode)) { - return false; - } - DataNode dataNode = getNode(srcNode); - if(dataNode != null) { + public boolean addFlowEdge(FlowEdge edge) { + try { + rwLock.writeLock().lock(); + String srcNode = edge.getSrc(); + String dstNode = edge.getDest(); + if (!dataNodeMap.containsKey(srcNode) || !dataNodeMap.containsKey(dstNode)) { + return false; + } + DataNode dataNode = getNode(srcNode); + if (dataNode == null) { + return false; + } Set<FlowEdge> adjacentEdges = this.nodesToEdges.get(dataNode); - if(!adjacentEdges.add(edge)) { + if (!adjacentEdges.add(edge)) { adjacentEdges.remove(edge); adjacentEdges.add(edge); } @@ -91,8 +111,8 @@ public class BaseFlowGraph implements FlowGraph { String edgeId = edge.getId(); this.flowEdgeMap.put(edgeId, edge); return true; - } else { - return false; + } finally { + rwLock.writeLock().unlock(); } } @@ -102,11 +122,12 @@ public class BaseFlowGraph implements FlowGraph { * @return true if {@link DataNode} is successfully deleted. */ @Override - public synchronized boolean deleteDataNode(String nodeId) { - if(this.dataNodeMap.containsKey(nodeId) && deleteDataNode(this.dataNodeMap.get(nodeId))) { - return true; - } else { - return false; + public boolean deleteDataNode(String nodeId) { + try { + rwLock.writeLock().lock(); + return this.dataNodeMap.containsKey(nodeId) && deleteDataNode(this.dataNodeMap.get(nodeId)); + } finally { + rwLock.writeLock().unlock(); } } @@ -115,20 +136,25 @@ public class BaseFlowGraph implements FlowGraph { * @param node to be deleted. * @return true if {@link DataNode} is successfully deleted. */ - public synchronized boolean deleteDataNode(DataNode node) { - if(dataNodeMap.containsKey(node.getId())) { + public boolean deleteDataNode(DataNode node) { + try { + rwLock.writeLock().lock(); + if (!dataNodeMap.containsKey(node.getId())) { + return false; + } //Delete node from dataNodeMap dataNodeMap.remove(node.getId()); //Delete all the edges adjacent to the node. First, delete edges from flowEdgeMap and next, remove the edges // from nodesToEdges - for(FlowEdge edge: nodesToEdges.get(node)) { + for (FlowEdge edge : nodesToEdges.get(node)) { flowEdgeMap.remove(edge.getId()); } nodesToEdges.remove(node); return true; - } else { - return false; + + } finally { + rwLock.writeLock().unlock(); } } @@ -138,11 +164,12 @@ public class BaseFlowGraph implements FlowGraph { * @return true if {@link FlowEdge} is successfully deleted. */ @Override - public synchronized boolean deleteFlowEdge(String edgeId) { - if(flowEdgeMap.containsKey(edgeId) && deleteFlowEdge(flowEdgeMap.get(edgeId))) { - return true; - } else { - return false; + public boolean deleteFlowEdge(String edgeId) { + try { + rwLock.writeLock().lock(); + return flowEdgeMap.containsKey(edgeId) && deleteFlowEdge(flowEdgeMap.get(edgeId)); + } finally { + rwLock.writeLock().unlock(); } } @@ -152,17 +179,22 @@ public class BaseFlowGraph implements FlowGraph { * @return true if {@link FlowEdge} is successfully deleted. If the source of a {@link FlowEdge} does not exist or * if the {@link FlowEdge} is not in the graph, return false. */ - public synchronized boolean deleteFlowEdge(FlowEdge edge) { - if(!dataNodeMap.containsKey(edge.getSrc())) { - return false; - } - DataNode node = dataNodeMap.get(edge.getSrc()); - if(!nodesToEdges.get(node).contains(edge)) { - return false; + public boolean deleteFlowEdge(FlowEdge edge) { + try { + rwLock.writeLock().lock(); + if (!dataNodeMap.containsKey(edge.getSrc())) { + return false; + } + DataNode node = dataNodeMap.get(edge.getSrc()); + if (!nodesToEdges.get(node).contains(edge)) { + return false; + } + this.nodesToEdges.get(node).remove(edge); + this.flowEdgeMap.remove(edge.getId()); + return true; + } finally { + rwLock.writeLock().unlock(); } - this.nodesToEdges.get(node).remove(edge); - this.flowEdgeMap.remove(edge.getId()); - return true; } /** @@ -172,8 +204,13 @@ public class BaseFlowGraph implements FlowGraph { */ @Override public Set<FlowEdge> getEdges(String nodeId) { - DataNode dataNode = this.dataNodeMap.getOrDefault(nodeId, null); - return getEdges(dataNode); + try { + rwLock.readLock().lock(); + DataNode dataNode = this.dataNodeMap.getOrDefault(nodeId, null); + return getEdges(dataNode); + } finally { + rwLock.readLock().unlock(); + } } /** @@ -183,7 +220,28 @@ public class BaseFlowGraph implements FlowGraph { */ @Override public Set<FlowEdge> getEdges(DataNode node) { - return (node != null)? this.nodesToEdges.getOrDefault(node, null) : null; + try { + rwLock.readLock().lock(); + return (node != null) ? this.nodesToEdges.getOrDefault(node, null) : null; + } finally { + rwLock.readLock().unlock(); + } } + /**{@inheritDoc}**/ + @Override + public FlowGraphPath findPath(FlowSpec flowSpec) throws PathFinder.PathFinderException, ReflectiveOperationException { + try { + rwLock.readLock().lock(); + //Instantiate a PathFinder. + Class pathFinderClass = Class.forName(ConfigUtils + .getString(flowSpec.getConfig(), FlowGraphConfigurationKeys.FLOW_GRAPH_PATH_FINDER_CLASS, + FlowGraphConfigurationKeys.DEFAULT_FLOW_GRAPH_PATH_FINDER_CLASS)); + PathFinder pathFinder = + (PathFinder) GobblinConstructorUtils.invokeLongestConstructor(pathFinderClass, this, flowSpec); + return pathFinder.findPath(); + } finally { + rwLock.readLock().unlock(); + } + } } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5a6bfea9/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FlowGraph.java ---------------------------------------------------------------------- diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FlowGraph.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FlowGraph.java index b4aa7bf..1d8eb23 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FlowGraph.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FlowGraph.java @@ -19,7 +19,13 @@ package org.apache.gobblin.service.modules.flowgraph; import java.util.Collection; +import com.typesafe.config.Config; + import org.apache.gobblin.annotation.Alpha; +import org.apache.gobblin.runtime.api.FlowSpec; +import org.apache.gobblin.runtime.api.Spec; +import org.apache.gobblin.service.modules.flow.FlowGraphPath; +import org.apache.gobblin.service.modules.flowgraph.pathfinder.PathFinder; /** @@ -80,4 +86,17 @@ public interface FlowGraph { * @return a collection of edges adjacent to the {@link DataNode} */ public Collection<FlowEdge> getEdges(DataNode node); + + /** + * A method that takes a {@link FlowSpec} containing the source and destination {@link DataNode}s, as well as the + * source and target {@link org.apache.gobblin.service.modules.dataset.DatasetDescriptor}s, and returns a sequence + * of fully resolved {@link org.apache.gobblin.runtime.api.JobSpec}s that will move the source dataset + * from the source datanode, perform any necessary transformations and land the dataset at the destination node + * in the format described by the target {@link org.apache.gobblin.service.modules.dataset.DatasetDescriptor}. + * + * @param flowSpec a {@link org.apache.gobblin.runtime.api.Spec} containing a high-level description of input flow. + * @return an instance of {@link FlowGraphPath} that encapsulates a sequence of {@link org.apache.gobblin.runtime.api.JobSpec}s + * satisfying flowSpec. + */ + public FlowGraphPath findPath(FlowSpec flowSpec) throws PathFinder.PathFinderException, ReflectiveOperationException; } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5a6bfea9/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FlowGraphConfigurationKeys.java ---------------------------------------------------------------------- diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FlowGraphConfigurationKeys.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FlowGraphConfigurationKeys.java index 8a49ec0..5a43a83 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FlowGraphConfigurationKeys.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FlowGraphConfigurationKeys.java @@ -20,7 +20,7 @@ package org.apache.gobblin.service.modules.flowgraph; public class FlowGraphConfigurationKeys { public static final String DATA_NODE_PREFIX = "data.node."; public static final String FLOW_EDGE_PREFIX = "flow.edge."; - + public static final String FLOW_GRAPH_PREFIX = "flow.graph."; /** * {@link DataNode} related configuration keys. */ @@ -42,4 +42,10 @@ public class FlowGraphConfigurationKeys { public static final String FLOW_EDGE_TEMPLATE_DIR_URI_KEY = FLOW_EDGE_PREFIX + "flowTemplateDirUri"; public static final String FLOW_EDGE_SPEC_EXECUTORS_KEY = FLOW_EDGE_PREFIX + "specExecutors"; public static final String FLOW_EDGE_SPEC_EXECUTOR_CLASS_KEY = "specExecInstance.class"; + + /** + * {@link org.apache.gobblin.service.modules.flowgraph.pathfinder.PathFinder} related configuration keys. + */ + public static final String FLOW_GRAPH_PATH_FINDER_CLASS = FLOW_GRAPH_PREFIX + "pathfinder.class"; + public static final String DEFAULT_FLOW_GRAPH_PATH_FINDER_CLASS = "org.apache.gobblin.service.modules.flowgraph.pathfinder.BFSPathFinder"; } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5a6bfea9/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/pathfinder/AbstractPathFinder.java ---------------------------------------------------------------------- diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/pathfinder/AbstractPathFinder.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/pathfinder/AbstractPathFinder.java new file mode 100644 index 0000000..9901c2f --- /dev/null +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/pathfinder/AbstractPathFinder.java @@ -0,0 +1,259 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.service.modules.flowgraph.pathfinder; + +import java.io.IOException; +import java.net.URISyntaxException; +import java.util.ArrayList; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutionException; + +import org.apache.commons.lang3.tuple.Pair; + +import com.google.common.base.Preconditions; +import com.typesafe.config.Config; + +import lombok.extern.slf4j.Slf4j; + +import org.apache.gobblin.annotation.Alpha; +import org.apache.gobblin.runtime.api.FlowSpec; +import org.apache.gobblin.runtime.api.JobTemplate; +import org.apache.gobblin.runtime.api.SpecExecutor; +import org.apache.gobblin.runtime.api.SpecNotFoundException; +import org.apache.gobblin.service.ServiceConfigKeys; +import org.apache.gobblin.service.modules.dataset.DatasetDescriptor; +import org.apache.gobblin.service.modules.flow.FlowEdgeContext; +import org.apache.gobblin.service.modules.flow.FlowGraphPath; +import org.apache.gobblin.service.modules.flowgraph.Dag; +import org.apache.gobblin.service.modules.flowgraph.DataNode; +import org.apache.gobblin.service.modules.flowgraph.DatasetDescriptorConfigKeys; +import org.apache.gobblin.service.modules.flowgraph.FlowEdge; +import org.apache.gobblin.service.modules.flowgraph.FlowGraph; +import org.apache.gobblin.service.modules.spec.JobExecutionPlan; +import org.apache.gobblin.util.ConfigUtils; +import org.apache.gobblin.util.reflection.GobblinConstructorUtils; + + +@Alpha +@Slf4j +public abstract class AbstractPathFinder implements PathFinder { + private static final String SOURCE_PREFIX = "source"; + private static final String DESTINATION_PREFIX = "destination"; + + protected FlowGraph flowGraph; + protected FlowSpec flowSpec; + protected Config flowConfig; + + protected DataNode srcNode; + protected List<DataNode> destNodes; + + protected DatasetDescriptor srcDatasetDescriptor; + protected DatasetDescriptor destDatasetDescriptor; + + //Maintain path of FlowEdges as parent-child map + protected Map<FlowEdgeContext, FlowEdgeContext> pathMap; + + //Flow Execution Id + protected Long flowExecutionId; + + public AbstractPathFinder(FlowGraph flowGraph, FlowSpec flowSpec) + throws ReflectiveOperationException { + this.flowGraph = flowGraph; + this.flowSpec = flowSpec; + this.flowConfig = flowSpec.getConfig(); + + //Get src/dest DataNodes from the flow config + String srcNodeId = ConfigUtils.getString(flowConfig, ServiceConfigKeys.FLOW_SOURCE_IDENTIFIER_KEY, ""); + + List<String> destNodeIds = ConfigUtils.getStringList(flowConfig, ServiceConfigKeys.FLOW_DESTINATION_IDENTIFIER_KEY); + this.srcNode = this.flowGraph.getNode(srcNodeId); + Preconditions.checkArgument(srcNode != null, "Flowgraph does not have a node with id " + srcNodeId); + for (String destNodeId : destNodeIds) { + DataNode destNode = this.flowGraph.getNode(destNodeId); + Preconditions.checkArgument(destNode != null, "Flowgraph does not have a node with id " + destNodeId); + if (this.destNodes == null) { + this.destNodes = new ArrayList<>(); + } + this.destNodes.add(destNode); + } + //Get src/dest dataset descriptors from the flow config + Config srcDatasetDescriptorConfig = + flowConfig.getConfig(DatasetDescriptorConfigKeys.FLOW_INPUT_DATASET_DESCRIPTOR_PREFIX); + Config destDatasetDescriptorConfig = + flowConfig.getConfig(DatasetDescriptorConfigKeys.FLOW_OUTPUT_DATASET_DESCRIPTOR_PREFIX); + + Class srcdatasetDescriptorClass = + Class.forName(srcDatasetDescriptorConfig.getString(DatasetDescriptorConfigKeys.CLASS_KEY)); + this.srcDatasetDescriptor = (DatasetDescriptor) GobblinConstructorUtils + .invokeLongestConstructor(srcdatasetDescriptorClass, srcDatasetDescriptorConfig); + Class destDatasetDescriptorClass = + Class.forName(destDatasetDescriptorConfig.getString(DatasetDescriptorConfigKeys.CLASS_KEY)); + this.destDatasetDescriptor = (DatasetDescriptor) GobblinConstructorUtils + .invokeLongestConstructor(destDatasetDescriptorClass, destDatasetDescriptorConfig); + } + + protected boolean isPathFound(DataNode currentNode, DataNode destNode, DatasetDescriptor currentDatasetDescriptor, + DatasetDescriptor destDatasetDescriptor) { + if ((currentNode.equals(destNode)) && (currentDatasetDescriptor.equals(destDatasetDescriptor))) { + return true; + } + return false; + } + + /** + * A helper method that sorts the {@link FlowEdge}s incident on srcNode based on whether the FlowEdge has an + * output {@link DatasetDescriptor} that is compatible with the targetDatasetDescriptor. + * @param dataNode + * @param currentDatasetDescriptor Output {@link DatasetDescriptor} of the current edge. + * @param destDatasetDescriptor Target {@link DatasetDescriptor}. + * @return prioritized list of {@link FlowEdge}s to be added to the edge queue for expansion. + */ + protected List<FlowEdgeContext> getNextEdges(DataNode dataNode, DatasetDescriptor currentDatasetDescriptor, + DatasetDescriptor destDatasetDescriptor) { + List<FlowEdgeContext> prioritizedEdgeList = new LinkedList<>(); + for (FlowEdge flowEdge : this.flowGraph.getEdges(dataNode)) { + try { + DataNode edgeDestination = this.flowGraph.getNode(flowEdge.getDest()); + //Base condition: Skip this FLowEdge, if it is inactive or if the destination of this edge is inactive. + if (!edgeDestination.isActive() || !flowEdge.isActive()) { + continue; + } + + boolean foundExecutor = false; + //Iterate over all executors for this edge. Find the first one that resolves the underlying flow template. + for (SpecExecutor specExecutor : flowEdge.getExecutors()) { + Config mergedConfig = getMergedConfig(flowEdge, specExecutor); + List<Pair<DatasetDescriptor, DatasetDescriptor>> datasetDescriptorPairs = + flowEdge.getFlowTemplate().getResolvingDatasetDescriptors(mergedConfig); + for (Pair<DatasetDescriptor, DatasetDescriptor> datasetDescriptorPair : datasetDescriptorPairs) { + DatasetDescriptor inputDatasetDescriptor = datasetDescriptorPair.getLeft(); + DatasetDescriptor outputDatasetDescriptor = datasetDescriptorPair.getRight(); + if (inputDatasetDescriptor.contains(currentDatasetDescriptor)) { + FlowEdgeContext flowEdgeContext; + if (outputDatasetDescriptor.contains(currentDatasetDescriptor)) { + //If datasets described by the currentDatasetDescriptor is a subset of the datasets described + // by the outputDatasetDescriptor (i.e. currentDatasetDescriptor is more "specific" than outputDatasetDescriptor, e.g. + // as in the case of a "distcp" edge), we propagate the more "specific" dataset descriptor forward. + flowEdgeContext = + new FlowEdgeContext(flowEdge, currentDatasetDescriptor, currentDatasetDescriptor, mergedConfig, + specExecutor); + } else { + //outputDatasetDescriptor is more specific (e.g. if it is a dataset transformation edge) + flowEdgeContext = + new FlowEdgeContext(flowEdge, currentDatasetDescriptor, outputDatasetDescriptor, mergedConfig, + specExecutor); + } + if (destDatasetDescriptor.getFormatConfig().contains(outputDatasetDescriptor.getFormatConfig())) { + //Add to the front of the edge list if platform-independent properties of the output descriptor is compatible + // with those of destination dataset descriptor. + // In other words, we prioritize edges that perform data transformations as close to the source as possible. + prioritizedEdgeList.add(0, flowEdgeContext); + } else { + prioritizedEdgeList.add(flowEdgeContext); + } + foundExecutor = true; + } + } + // Found a SpecExecutor. Proceed to the next FlowEdge. + // TODO: Choose the min-cost executor for the FlowEdge as opposed to the first one that resolves. + if (foundExecutor) { + break; + } + } + } catch (IOException | ReflectiveOperationException | InterruptedException | ExecutionException | SpecNotFoundException + | JobTemplate.TemplateException e) { + //Skip the edge; and continue + log.warn("Skipping edge {} with config {} due to exception: {}", flowEdge.getId(), flowConfig.toString(), e); + } + } + return prioritizedEdgeList; + } + + /** + * Build the merged config for each {@link FlowEdge}, which is a combination of (in the precedence described below): + * <ul> + * <p> the user provided flow config </p> + * <p> edge specific properties/overrides </p> + * <p> spec executor config/overrides </p> + * <p> source node config </p> + * <p> destination node config </p> + * </ul> + * Each {@link JobTemplate}'s config will eventually be resolved against this merged config. + * @param flowEdge An instance of {@link FlowEdge}. + * @param specExecutor A {@link SpecExecutor}. + * @return the merged config derived as described above. + */ + private Config getMergedConfig(FlowEdge flowEdge, SpecExecutor specExecutor) + throws ExecutionException, InterruptedException { + Config srcNodeConfig = this.flowGraph.getNode(flowEdge.getSrc()).getRawConfig().atPath(SOURCE_PREFIX); + Config destNodeConfig = this.flowGraph.getNode(flowEdge.getDest()).getRawConfig().atPath(DESTINATION_PREFIX); + Config mergedConfig = flowConfig.withFallback(specExecutor.getConfig().get()).withFallback(flowEdge.getConfig()) + .withFallback(srcNodeConfig).withFallback(destNodeConfig); + return mergedConfig; + } + + /** + * + * @param flowEdgeContext of the last {@link FlowEdge} in the path. + * @return a {@link Dag} of {@link JobExecutionPlan}s for the input {@link FlowSpec}. + * @throws IOException + * @throws SpecNotFoundException + * @throws JobTemplate.TemplateException + * @throws URISyntaxException + */ + protected List<FlowEdgeContext> constructPath(FlowEdgeContext flowEdgeContext) + throws IOException, SpecNotFoundException, JobTemplate.TemplateException, URISyntaxException { + //Backtrace from the last edge using the path map and push each edge into a LIFO data structure. + List<FlowEdgeContext> path = new LinkedList<>(); + path.add(flowEdgeContext); + FlowEdgeContext currentFlowEdgeContext = flowEdgeContext; + while (true) { + path.add(0, this.pathMap.get(currentFlowEdgeContext)); + currentFlowEdgeContext = this.pathMap.get(currentFlowEdgeContext); + //Are we at the first edge in the path? + if (this.pathMap.get(currentFlowEdgeContext).equals(currentFlowEdgeContext)) { + break; + } + } + return path; + } + + @Override + public FlowGraphPath findPath() throws PathFinderException { + // Generate flow execution id for this compilation + this.flowExecutionId = System.currentTimeMillis(); + + FlowGraphPath flowGraphPath = new FlowGraphPath(flowSpec, flowExecutionId); + //Path computation must be thread-safe to guarantee read consistency. In other words, we prevent concurrent read/write access to the + // flow graph. + for (DataNode destNode : this.destNodes) { + List<FlowEdgeContext> path = findPathUnicast(destNode); + if (path != null) { + flowGraphPath.addPath(path); + } else { + //No path to at least one of the destination nodes. + return null; + } + } + return flowGraphPath; + } + + public abstract List<FlowEdgeContext> findPathUnicast(DataNode destNode) throws PathFinderException; +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5a6bfea9/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/pathfinder/BFSPathFinder.java ---------------------------------------------------------------------- diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/pathfinder/BFSPathFinder.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/pathfinder/BFSPathFinder.java new file mode 100644 index 0000000..02dd67b --- /dev/null +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/pathfinder/BFSPathFinder.java @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.service.modules.flowgraph.pathfinder; + +import java.io.IOException; +import java.net.URISyntaxException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; + +import lombok.extern.slf4j.Slf4j; + +import org.apache.gobblin.annotation.Alpha; +import org.apache.gobblin.runtime.api.FlowSpec; +import org.apache.gobblin.runtime.api.JobTemplate; +import org.apache.gobblin.runtime.api.SpecNotFoundException; +import org.apache.gobblin.service.modules.dataset.DatasetDescriptor; +import org.apache.gobblin.service.modules.flow.FlowEdgeContext; +import org.apache.gobblin.service.modules.flowgraph.DataNode; +import org.apache.gobblin.service.modules.flowgraph.FlowEdge; +import org.apache.gobblin.service.modules.flowgraph.FlowGraph; + + +/** + * An implementation of {@link PathFinder} that assumes an unweighted {@link FlowGraph} and computes the + * shortest path using a variant of the BFS path-finding algorithm. This implementation has two key differences from the + * traditional BFS implementations: + * <ul> + * <p> the input graph is a multi-graph i.e. there could be multiple edges between each pair of nodes, and </p> + * <p> each edge has a label associated with it. In our case, the label corresponds to the set of input/output + * dataset descriptors that are accepted by the edge.</p> + * </ul> + * Given these differences, we maintain: + * <p> a {@link HashMap} of list of visited edges, as opposed to list of visited + * vertices as in the case of traditional BFS, and </p> + * <p> for each edge, we maintain additional state that includes the input/output dataset descriptor + * associated with the particular visitation of that edge. </p> + * This additional information allows us to accurately mark edges as visited and guarantee termination of the algorithm. + */ +@Alpha +@Slf4j +public class BFSPathFinder extends AbstractPathFinder { + /** + * Constructor. + * @param flowGraph + */ + public BFSPathFinder(FlowGraph flowGraph, FlowSpec flowSpec) throws ReflectiveOperationException { + super(flowGraph, flowSpec); + } + + /** + * A simple path finding algorithm based on Breadth-First Search. At every step the algorithm adds the adjacent {@link FlowEdge}s + * to a queue. The {@link FlowEdge}s whose output {@link DatasetDescriptor} matches the destDatasetDescriptor are + * added first to the queue. This ensures that dataset transformations are always performed closest to the source. + * @return a path of {@link FlowEdgeContext}s starting at the srcNode and ending at the destNode. + */ + public List<FlowEdgeContext> findPathUnicast(DataNode destNode) throws PathFinderException { + try { + //Initialization of auxiliary data structures used for path computation + this.pathMap = new HashMap<>(); + + // Generate flow execution id for this compilation + this.flowExecutionId = System.currentTimeMillis(); + + //Path computation must be thread-safe to guarantee read consistency. In other words, we prevent concurrent read/write access to the + // flow graph. + //Base condition 1: Source Node or Dest Node is inactive; return null + if (!srcNode.isActive() || !destNode.isActive()) { + log.warn("Either source node {} or destination node {} is inactive; skipping path computation.", this.srcNode.getId(), + destNode.getId()); + return null; + } + + //Base condition 2: Check if we are already at the target. If so, return an empty path. + if ((srcNode.equals(destNode)) && destDatasetDescriptor.contains(srcDatasetDescriptor)) { + return new ArrayList<>(); + } + + LinkedList<FlowEdgeContext> edgeQueue = new LinkedList<>(); + edgeQueue.addAll(getNextEdges(srcNode, srcDatasetDescriptor, destDatasetDescriptor)); + for (FlowEdgeContext flowEdgeContext : edgeQueue) { + this.pathMap.put(flowEdgeContext, flowEdgeContext); + } + + //At every step, pop an edge E from the edge queue. Mark the edge E as visited. Generate the list of adjacent edges + // to the edge E. For each adjacent edge E', do the following: + // 1. check if the FlowTemplate described by E' is resolvable using the flowConfig, and + // 2. check if the output dataset descriptor of edge E is compatible with the input dataset descriptor of the + // edge E'. If yes, add the edge E' to the edge queue. + // If the edge E' satisfies 1 and 2, add it to the edge queue for further consideration. + while (!edgeQueue.isEmpty()) { + FlowEdgeContext flowEdgeContext = edgeQueue.pop(); + + DataNode currentNode = this.flowGraph.getNode(flowEdgeContext.getEdge().getDest()); + DatasetDescriptor currentOutputDatasetDescriptor = flowEdgeContext.getOutputDatasetDescriptor(); + + //Are we done? + if (isPathFound(currentNode, destNode, currentOutputDatasetDescriptor, destDatasetDescriptor)) { + return constructPath(flowEdgeContext); + } + + //Expand the currentNode to its adjacent edges and add them to the queue. + List<FlowEdgeContext> nextEdges = + getNextEdges(currentNode, currentOutputDatasetDescriptor, destDatasetDescriptor); + for (FlowEdgeContext childFlowEdgeContext : nextEdges) { + //Add a pointer from the child edge to the parent edge, if the child edge is not already in the + // queue. + if (!this.pathMap.containsKey(childFlowEdgeContext)) { + edgeQueue.add(childFlowEdgeContext); + this.pathMap.put(childFlowEdgeContext, flowEdgeContext); + } + } + } + //No path found. Return null. + return null; + } catch (SpecNotFoundException | JobTemplate.TemplateException | IOException | URISyntaxException e) { + throw new PathFinder.PathFinderException( + "Exception encountered when computing path from src: " + this.srcNode.getId() + " to dest: " + destNode.getId(), e); + } + } +} + http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5a6bfea9/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/pathfinder/PathFinder.java ---------------------------------------------------------------------- diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/pathfinder/PathFinder.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/pathfinder/PathFinder.java new file mode 100644 index 0000000..982d7f6 --- /dev/null +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/pathfinder/PathFinder.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.service.modules.flowgraph.pathfinder; + +import org.apache.gobblin.annotation.Alpha; +import org.apache.gobblin.service.modules.flow.FlowGraphPath; + + +/** + * An interface for computing a path in a {@link org.apache.gobblin.service.modules.flowgraph.FlowGraph}. Each + * implementation of {@link PathFinder} implements a specific path finding algorithm such as Breadth-First Search (BFS), + * Dijkstra's shortest-path algorithm etc. + */ +@Alpha +public interface PathFinder { + + public FlowGraphPath findPath() throws PathFinderException; + + public static class PathFinderException extends Exception { + public PathFinderException(String message, Throwable cause) { + super(message, cause); + } + + public PathFinderException(String message) { + super(message); + } + } + +}
