This is an automated email from the ASF dual-hosted git repository.
suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 76d692e [GOBBLIN-754] Clean old version of multi-hop compiler
76d692e is described below
commit 76d692e253ef8a5b3d5524cf566398cf62d42c29
Author: autumnust <[email protected]>
AuthorDate: Wed Jun 5 16:04:17 2019 -0700
[GOBBLIN-754] Clean old version of multi-hop compiler
Closes #2660 from autumnust/craftsmanshipCleaning
---
.../modules/flow/BaseFlowToJobSpecCompiler.java | 26 --
.../service/modules/flow/FlowEdgeProps.java | 67 ----
.../flow/IdentityFlowToJobSpecCompiler.java | 6 -
.../modules/flow/LoadBasedFlowEdgeImpl.java | 180 -----------
.../service/modules/flow/MultiHopFlowCompiler.java | 5 -
.../flow/MultiHopsFlowToJobSpecCompiler.java | 358 ---------------------
.../service/modules/policy/ServicePolicy.java | 51 ---
.../modules/policy/StaticServicePolicy.java | 98 ------
.../service/modules/utils/DistancedNode.java | 77 -----
.../service/modules/utils/FindPathUtils.java | 109 -------
.../core/MultiHopsFlowToJobSpecCompilerTest.java | 326 -------------------
11 files changed, 1303 deletions(-)
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java
index 8374211..257ad3d 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java
@@ -49,7 +49,6 @@ import org.apache.gobblin.runtime.api.FlowSpec;
import org.apache.gobblin.runtime.api.JobSpec;
import org.apache.gobblin.runtime.api.JobTemplate;
import org.apache.gobblin.runtime.api.Spec;
-import org.apache.gobblin.runtime.api.SpecExecutor;
import org.apache.gobblin.runtime.api.SpecNotFoundException;
import org.apache.gobblin.runtime.api.TopologySpec;
import org.apache.gobblin.runtime.job_catalog.FSJobCatalog;
@@ -71,20 +70,6 @@ public abstract class BaseFlowToJobSpecCompiler implements
SpecCompiler {
@Setter
protected final Map<URI, TopologySpec> topologySpecMap;
-
- /**
- * Mapping between each FlowEdge and a list of applicable Templates.
- * Compiler should obtain this Map info from higher level component.
- * since {@link TopologySpec} doesn't contain Templates.
- * Key: EdgeIdentifier from {@link
org.apache.gobblin.runtime.api.FlowEdge#getEdgeIdentity()}
- * Value: List of template URI.
- */
- // TODO: Define how template info are instantiated. ETL-6217
- @Getter
- @Setter
- protected final Map<String, List<URI>> edgeTemplateMap;
-
-
protected final Config config;
protected final Logger log;
protected final Optional<FSJobCatalog> templateCatalog;
@@ -128,7 +113,6 @@ public abstract class BaseFlowToJobSpecCompiler implements
SpecCompiler {
}
this.topologySpecMap = Maps.newConcurrentMap();
- this.edgeTemplateMap = Maps.newConcurrentMap();
this.config = config;
/***
@@ -295,14 +279,4 @@ public abstract class BaseFlowToJobSpecCompiler implements
SpecCompiler {
// For now only first template uri will be honored for Identity
return flowSpec.getTemplateURIs().get().iterator().next();
}
-
- /**
- * Ideally each edge has its own eligible template repository(Based on
{@link SpecExecutor})
- * to pick templates from.
- *
- * This function is to transform from all mixed templates ({@link
#templateCatalog})
- * into categorized {@link #edgeTemplateMap}.
- *
- */
- abstract protected void populateEdgeTemplateMap();
}
\ No newline at end of file
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/FlowEdgeProps.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/FlowEdgeProps.java
deleted file mode 100644
index 751bb09..0000000
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/FlowEdgeProps.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gobblin.service.modules.flow;
-
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-import com.typesafe.config.ConfigObject;
-import java.io.Serializable;
-import java.util.Properties;
-
-import static org.apache.gobblin.service.ServiceConfigKeys.*;
-
-import lombok.Data;
-import lombok.Getter;
-import lombok.Setter;
-import lombok.extern.slf4j.Slf4j;
-
-@Slf4j
-public class FlowEdgeProps {
- protected static final boolean DEFAULT_EDGE_SAFETY = true;
-
- /**
- * Contains read-only properties that users want to package in.
- */
- @Getter
- protected Config config;
-
- /**
- * One of the mutable properties of an edge.
- */
- @Getter
- @Setter
- private boolean isEdgeSecure;
-
- public FlowEdgeProps(Config config) {
- this.config = config;
- isEdgeSecure = getInitialEdgeSafety();
- }
-
- public FlowEdgeProps() {
- this(ConfigFactory.empty());
- }
-
- /**
- * When initializing an edge, load and security value from properties will
be used
- * but could be overriden afterwards.
- */
- private boolean getInitialEdgeSafety() {
- return
- config.hasPath(EDGE_SECURITY_KEY) ?
config.getBoolean(EDGE_SECURITY_KEY) : DEFAULT_EDGE_SAFETY;
- }
-}
\ No newline at end of file
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/IdentityFlowToJobSpecCompiler.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/IdentityFlowToJobSpecCompiler.java
index 42c4926..d3b25f5 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/IdentityFlowToJobSpecCompiler.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/IdentityFlowToJobSpecCompiler.java
@@ -117,10 +117,4 @@ public class IdentityFlowToJobSpecCompiler extends
BaseFlowToJobSpecCompiler {
}
return jobExecutionPlans;
}
-
- @Override
- protected void populateEdgeTemplateMap() {
- log.warn("No population of templates based on edge happen in this
implementation");
- return;
- }
}
\ No newline at end of file
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/LoadBasedFlowEdgeImpl.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/LoadBasedFlowEdgeImpl.java
deleted file mode 100644
index 83b94e3..0000000
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/LoadBasedFlowEdgeImpl.java
+++ /dev/null
@@ -1,180 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gobblin.service.modules.flow;
-
-import com.typesafe.config.Config;
-
-import org.jgrapht.graph.DefaultWeightedEdge;
-import org.apache.gobblin.annotation.Alpha;
-import org.apache.gobblin.runtime.api.FlowEdge;
-import org.apache.gobblin.runtime.api.ServiceNode;
-import org.apache.gobblin.runtime.api.SpecExecutor;
-
-import lombok.Getter;
-
-/**
- * A base implementation of a flowEdge in the weight multi-edge graph.
- * For a weightedMultiGraph there could be multiple edges between two vertices.
- * Recall that a triplet of <SourceNode, targetNode, specExecutor> determines
one edge.
- * It is expected that {@link
org.jgrapht.graph.DirectedWeightedMultigraph#getAllEdges(Object, Object)}
- * can return multiple edges with the same pair of source and destination but
different SpecExecutor.
- *
- * Each edge has a {@FlowEdgeProp} which contains mutable and immutable
properties.
- * The {@link LoadBasedFlowEdgeImpl} exposes two mutable properties: Load and
Security.
- *
- * Load of an edge is equivalent to weight defined in {@link
DefaultWeightedEdge}.
- * Since {@link #getWeight()} method is protected, {@link #getEdgeLoad()} will
return the load.
- * There's no setLoad, which is logically supposed to happen by invoking
- * {@link org.jgrapht.graph.DirectedWeightedMultigraph#setEdgeWeight(Object,
double)}.
- *
- * Security of an edge describes if an edge is secure to be part of data
movement path at current stage.
- *
- */
-@Alpha
-public class LoadBasedFlowEdgeImpl extends DefaultWeightedEdge implements
FlowEdge {
-
- /**
- * In our cases {@link LoadBasedFlowEdgeImpl} is not likely to be serialized.
- * While as it extends {@link DefaultWeightedEdge} for best practice we made
all fields transient,
- * and specify serialVersionUID.
- */
- private static final long serialVersionUID = 1L;
-
- @Getter
- private transient ServiceNode sourceNode;
- @Getter
- private transient ServiceNode targetNode;
- @Getter
- private transient SpecExecutor specExecutorInstance;
-
- /**
- * Contains both read-only and mutable attributes of properties of an edge.
- * Mutable properties in{@link FlowEdgeProps} expose their Setter & Getter
- * thru. either the {@link FlowEdgeProps}
- * or graph-level api, e.g. {@link
org.jgrapht.graph.DirectedWeightedMultigraph#setEdgeWeight(Object, double)}
- *
- * Typical mutable properties of an edge includes:
- * Load(Weight), Security.
- */
- private final transient FlowEdgeProps flowEdgeProps;
-
- public LoadBasedFlowEdgeImpl(ServiceNode sourceNode, ServiceNode targetNode,
- FlowEdgeProps flowEdgeProps, SpecExecutor specExecutorInstance) {
- this.sourceNode = sourceNode;
- this.targetNode = targetNode;
- this.flowEdgeProps = flowEdgeProps;
- this.specExecutorInstance = specExecutorInstance;
- }
-
- public LoadBasedFlowEdgeImpl(ServiceNode sourceNode, ServiceNode targetNode,
- SpecExecutor specExecutor) {
- this(sourceNode, targetNode, new FlowEdgeProps(specExecutor.getAttrs()),
- specExecutor);
- }
-
- // Load: Directly using {@link DefaultWeightedEdge}'s weight field.
- /**
- * Load:
- * Initialization: super's default constructor
- * Getter: {@link #getEdgeLoad()}} thru. {@link DefaultWeightedEdge}'s
{@link #getWeight()}.
- * Setter:Thru. {@link
org.jgrapht.graph.DirectedWeightedMultigraph#setEdgeWeight(Object, double)}
- */
- public double getEdgeLoad() {
- return getWeight();
- }
-
- // Security: Get/Set thru. FlowEdgeProps
- /**
- * Initialization\Getter\Setter: By {@link FlowEdgeProps}
- */
- public boolean getIsEdgeSecure() {
- return flowEdgeProps.isEdgeSecure();
- }
- public void setIsEdgeSecure(boolean isEdgeSecure) {
- this.flowEdgeProps.setEdgeSecure(isEdgeSecure);
- }
-
-
- @Override
- public String getEdgeIdentity() {
- return this.calculateEdgeIdentity(this.sourceNode, this.targetNode,
this.specExecutorInstance);
- }
-
- @Override
- public Config getEdgeProperties() {
- return this.flowEdgeProps.getConfig();
- }
-
- @Override
- /**
- * Naive rule: If edge is secure, then it is qualified to be considered in
path-finding.
- */
- public boolean isEdgeEnabled() {
- return this.flowEdgeProps.isEdgeSecure();
- }
-
-
- /**
- * A naive implementation of edge identity calculation.
- * @return
- */
- public static String calculateEdgeIdentity(ServiceNode sourceNode,
ServiceNode targetNode, SpecExecutor specExecutorInstance){
- return sourceNode.getNodeName() + "-" + specExecutorInstance.getUri() +
"-" + targetNode.getNodeName();
- }
-
- /**
- * Recall that we need a triplet to uniquely define a {@link FlowEdge}:
- * - {@link ServiceNode} sourceNode
- * - {@link ServiceNode} targetNode
- * - {@link SpecExecutor} SpecExecutor
- *
- * We DO NOT distinguish between two edges by other props like weight,
- * as the load should be an attribute of an edge.
- * These are IntelliJ-generated methods for equals and hashCode().
- *
- * @param o The object that being compared
- * @return If two {@link LoadBasedFlowEdgeImpl} are equivalent.
- */
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
-
- LoadBasedFlowEdgeImpl that = (LoadBasedFlowEdgeImpl) o;
-
- if (!sourceNode.equals(that.sourceNode)) {
- return false;
- }
- if (!targetNode.equals(that.targetNode)) {
- return false;
- }
- return specExecutorInstance.equals(that.specExecutorInstance);
- }
-
- @Override
- public int hashCode() {
- int result = sourceNode.hashCode();
- result = 31 * result + targetNode.hashCode();
- result = 31 * result + specExecutorInstance.hashCode();
- return result;
- }
-}
\ No newline at end of file
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java
index f067344..d9a3812 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java
@@ -197,11 +197,6 @@ public class MultiHopFlowCompiler extends
BaseFlowToJobSpecCompiler {
return jobExecutionPlanDag;
}
- @Override
- protected void populateEdgeTemplateMap() {
- log.warn("No population of templates based on edge happen in this
implementation");
- }
-
/**
* Register a shutdown hook for this thread.
*/
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopsFlowToJobSpecCompiler.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopsFlowToJobSpecCompiler.java
deleted file mode 100644
index 6b0b14a..0000000
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopsFlowToJobSpecCompiler.java
+++ /dev/null
@@ -1,358 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gobblin.service.modules.flow;
-
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ExecutionException;
-import java.util.stream.Collectors;
-
-import org.apache.commons.lang3.StringUtils;
-import org.jgrapht.graph.DirectedWeightedMultigraph;
-import org.slf4j.Logger;
-
-import com.google.common.base.Optional;
-import com.google.common.base.Splitter;
-import com.google.common.collect.Maps;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigValueFactory;
-
-import lombok.Getter;
-import lombok.extern.slf4j.Slf4j;
-
-import org.apache.gobblin.configuration.ConfigurationKeys;
-import org.apache.gobblin.instrumented.Instrumented;
-import org.apache.gobblin.runtime.api.FlowEdge;
-import org.apache.gobblin.runtime.api.FlowSpec;
-import org.apache.gobblin.runtime.api.JobSpec;
-import org.apache.gobblin.runtime.api.JobTemplate;
-import org.apache.gobblin.runtime.api.ServiceNode;
-import org.apache.gobblin.runtime.api.Spec;
-import org.apache.gobblin.runtime.api.SpecExecutor;
-import org.apache.gobblin.runtime.api.SpecNotFoundException;
-import org.apache.gobblin.runtime.api.TopologySpec;
-import org.apache.gobblin.runtime.job_spec.ResolvedJobSpec;
-import org.apache.gobblin.runtime.spec_executorInstance.BaseServiceNodeImpl;
-import org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor;
-import org.apache.gobblin.service.ServiceConfigKeys;
-import org.apache.gobblin.service.modules.flowgraph.Dag;
-import org.apache.gobblin.service.modules.policy.ServicePolicy;
-import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
-import org.apache.gobblin.service.modules.spec.JobExecutionPlanDagFactory;
-import org.apache.gobblin.util.ClassAliasResolver;
-import org.apache.gobblin.util.ConfigUtils;
-
-import static org.apache.gobblin.service.ServiceConfigKeys.SERVICE_POLICY_NAME;
-import static
org.apache.gobblin.service.modules.utils.FindPathUtils.dijkstraBasedPathFindingHelper;
-
-// Users are capable to inject hints/prioritization into route selection, in
two forms:
-// 1. PolicyBasedBlockedConnection: Define some undesired routes
-// 2. Specified a complete path. FlowCompiler is responsible to verify if the
path given is valid.
-
-// TODO: Flow monitoring, injecting weight for flowEdge:ETL-6213
-@Slf4j
-public class MultiHopsFlowToJobSpecCompiler extends BaseFlowToJobSpecCompiler {
-
- private static final Splitter SPLIT_BY_COMMA =
Splitter.on(",").omitEmptyStrings().trimResults();
-
- @Getter
- private DirectedWeightedMultigraph<ServiceNode, FlowEdge> weightedGraph =
- new DirectedWeightedMultigraph<>(LoadBasedFlowEdgeImpl.class);
-
- public ServicePolicy servicePolicy;
-
- // Contains user-specified complete path of how the data movement is
executed from source to sink.
- private Optional<String> optionalUserSpecifiedPath;
-
- private FlowEdgeProps defaultFlowEdgeProps = new FlowEdgeProps();
-
- public MultiHopsFlowToJobSpecCompiler(Config config) {
- this(config, Optional.absent(), true);
- }
-
- public MultiHopsFlowToJobSpecCompiler(Config config, Optional<Logger> log) {
- this(config, log, true);
- }
-
- public MultiHopsFlowToJobSpecCompiler(Config config, Optional<Logger> log,
boolean instrumentationEnabled) {
- super(config, log, instrumentationEnabled);
- String policyClassName = config.hasPath(SERVICE_POLICY_NAME) ?
config.getString(SERVICE_POLICY_NAME)
- : ServiceConfigKeys.DEFAULT_SERVICE_POLICY;
- ClassAliasResolver<ServicePolicy> classResolver = new
ClassAliasResolver<>(ServicePolicy.class);
- try {
- servicePolicy =
classResolver.resolveClass(policyClassName).newInstance();
- } catch (ClassNotFoundException | InstantiationException |
IllegalAccessException e) {
- throw new RuntimeException("Error happen when resolving class for :" +
policyClassName, e);
- }
-
- if (config.hasPath(ServiceConfigKeys.POLICY_BASED_BLOCKED_CONNECTION)
- &&
config.getStringList(ServiceConfigKeys.POLICY_BASED_BLOCKED_CONNECTION).size()
> 0) {
- try {
- for (String sourceSinkPair :
config.getStringList(ServiceConfigKeys.POLICY_BASED_BLOCKED_CONNECTION)) {
- BaseServiceNodeImpl source = new
BaseServiceNodeImpl(sourceSinkPair.split(":")[0]);
- BaseServiceNodeImpl sink = new
BaseServiceNodeImpl(sourceSinkPair.split(":")[1]);
- URI specExecutorURI = new URI(sourceSinkPair.split(":")[2]);
- servicePolicy.addFlowEdge(
- new LoadBasedFlowEdgeImpl(source, sink,
InMemorySpecExecutor.createDummySpecExecutor(specExecutorURI)));
- }
- } catch (URISyntaxException e) {
- this.log.warn("Constructing of FlowEdge in ServicePolicy Failed");
- }
- }
-
- if (config.hasPath(ServiceConfigKeys.POLICY_BASED_BLOCKED_NODES) &&
-
StringUtils.isNotBlank(config.getString(ServiceConfigKeys.POLICY_BASED_BLOCKED_NODES)))
{
- for (String blacklistedNode : SPLIT_BY_COMMA.splitToList(
- config.getString(ServiceConfigKeys.POLICY_BASED_BLOCKED_NODES))) {
- servicePolicy.addServiceNode(new BaseServiceNodeImpl(blacklistedNode));
- }
- }
-
- if (config.hasPath(ServiceConfigKeys.POLICY_BASED_DATA_MOVEMENT_PATH) &&
StringUtils.isNotBlank(
- config.getString(ServiceConfigKeys.POLICY_BASED_DATA_MOVEMENT_PATH))) {
- optionalUserSpecifiedPath =
Optional.of(config.getString(ServiceConfigKeys.POLICY_BASED_DATA_MOVEMENT_PATH));
- } else {
- optionalUserSpecifiedPath = Optional.absent();
- }
- }
-
- @Override
- public Dag<JobExecutionPlan> compileFlow(Spec spec) {
- // A Map from JobSpec to SpexExecutor, as the output of Flow Compiler.
- Map<Spec, SpecExecutor> specExecutorInstanceMap = Maps.newLinkedHashMap();
- findPath(specExecutorInstanceMap, spec);
- List<JobExecutionPlan> jobExecutionPlans = new ArrayList<>();
- for (Map.Entry<Spec, SpecExecutor> entry:
specExecutorInstanceMap.entrySet()) {
- JobExecutionPlan jobExecutionPlan = new JobExecutionPlan((JobSpec)
entry.getKey(), entry.getValue());
- jobExecutionPlans.add(jobExecutionPlan);
- }
- return new JobExecutionPlanDagFactory().createDag(jobExecutionPlans);
- }
-
- /**
- * @return Transform a set of {@link TopologySpec} into a instance of {@link
org.jgrapht.graph.WeightedMultigraph}
- * and filter out connections between blacklisted vertices that user
specified.
- * The output of this function only stays in memory, so each time a logical
flow is compiled, the multigraph will
- * be re-calculated.
- *
- */
- private void inMemoryWeightGraphGenerator() {
- for (TopologySpec topologySpec : topologySpecMap.values()) {
- weightGraphGenerateHelper(topologySpec);
- }
-
- // Filter out connection appearing in servicePolicy.
- // This is where servicePolicy is enforced.
- servicePolicy.populateBlackListedEdges(this.weightedGraph);
- if (servicePolicy.getBlacklistedEdges().size() > 0) {
- for (FlowEdge toDeletedEdge : servicePolicy.getBlacklistedEdges()) {
- weightedGraph.removeEdge(toDeletedEdge);
- }
- }
- }
-
- // Basically a dijkstra path finding for connecting source and sink by
multiple hops in between.
- // If there's any user-specified prioritization, conduct the DFS and see if
the user-specified path is available.
-
- // there's no updates on TopologySpec, or user should be aware of the
possibility
- // that a topologySpec not being reflected in findPath.
- private void findPath(Map<Spec, SpecExecutor> specExecutorInstanceMap, Spec
spec) {
- inMemoryWeightGraphGenerator();
- FlowSpec flowSpec = (FlowSpec) spec;
- if (optionalUserSpecifiedPath.isPresent()) {
- log.info("Starting to evaluate user's specified path ... ");
- if (userSpecifiedPathVerificator(specExecutorInstanceMap, flowSpec)) {
- log.info("User specified path[ " + optionalUserSpecifiedPath.get() +
"] successfully verified.");
- return;
- } else {
- log.error("Will not execute user specified path[ " +
optionalUserSpecifiedPath.get() + "]");
- log.info("Start to execute FlowCompiler's algorithm for valid data
movement path");
- }
- }
-
- ServiceNode sourceNode =
- new
BaseServiceNodeImpl(flowSpec.getConfig().getString(ServiceConfigKeys.FLOW_SOURCE_IDENTIFIER_KEY));
-
- ServiceNode targetNode =
- new
BaseServiceNodeImpl(flowSpec.getConfig().getString(ServiceConfigKeys.FLOW_DESTINATION_IDENTIFIER_KEY));
-
- List<FlowEdge> resultEdgePath = dijkstraBasedPathFindingHelper(sourceNode,
targetNode, this.weightedGraph);
- for (int i = 0; i < resultEdgePath.size() ; i++) {
- FlowEdge tmpFlowEdge = resultEdgePath.get(i);
- ServiceNode edgeSrcNode = ((LoadBasedFlowEdgeImpl)
tmpFlowEdge).getSourceNode();
- ServiceNode edgeTgtNode = ((LoadBasedFlowEdgeImpl)
tmpFlowEdge).getTargetNode();
- specExecutorInstanceMap.put(convertHopToJobSpec(edgeSrcNode,
edgeTgtNode, flowSpec),
- ((LoadBasedFlowEdgeImpl)
(resultEdgePath.get(i))).getSpecExecutorInstance());
- }
- }
-
- /**
- * As the base implementation here, all templates will be considered for
each edge.
- */
- @Override
- protected void populateEdgeTemplateMap() {
- if (templateCatalog.isPresent()) {
- for (FlowEdge flowEdge : this.weightedGraph.edgeSet()) {
- edgeTemplateMap.put(flowEdge.getEdgeIdentity(), templateCatalog.get().
- getAllTemplates().
- stream().map(jobTemplate ->
jobTemplate.getUri()).collect(Collectors.toList()));
- }
- }
- }
-
- // If path specified not existed, return false;
- // else return true.
- private boolean userSpecifiedPathVerificator(Map<Spec, SpecExecutor>
specExecutorInstanceMap, FlowSpec flowSpec) {
- Map<Spec, SpecExecutor> tmpSpecExecutorInstanceMap = new HashMap<>();
- List<String> userSpecfiedPath =
Arrays.asList(optionalUserSpecifiedPath.get().split(","));
- for (int i = 0; i < userSpecfiedPath.size() - 1; i++) {
- ServiceNode sourceNode = new
BaseServiceNodeImpl(userSpecfiedPath.get(i));
- ServiceNode targetNode = new BaseServiceNodeImpl(userSpecfiedPath.get(i
+ 1));
- if (weightedGraph.containsVertex(sourceNode) &&
weightedGraph.containsVertex(targetNode)
- && weightedGraph.containsEdge(sourceNode, targetNode)) {
- tmpSpecExecutorInstanceMap.put(convertHopToJobSpec(sourceNode,
targetNode, flowSpec),
- (((LoadBasedFlowEdgeImpl) weightedGraph.getEdge(sourceNode,
targetNode)).getSpecExecutorInstance()));
- } else {
- log.error("User Specified Path is invalid");
- return false;
- }
- }
- specExecutorInstanceMap.putAll(tmpSpecExecutorInstanceMap);
- return true;
- }
-
- // Helper function for transform TopologySpecMap into a
weightedDirectedGraph.
- private void weightGraphGenerateHelper(TopologySpec topologySpec) {
- try {
- Map<ServiceNode, ServiceNode> capabilities =
topologySpec.getSpecExecutor().getCapabilities().get();
- for (Map.Entry<ServiceNode, ServiceNode> capability :
capabilities.entrySet()) {
-
- BaseServiceNodeImpl sourceNode = new
BaseServiceNodeImpl(capability.getKey().getNodeName());
- BaseServiceNodeImpl targetNode = new
BaseServiceNodeImpl(capability.getValue().getNodeName());
-
- if (!weightedGraph.containsVertex(sourceNode)) {
- weightedGraph.addVertex(sourceNode);
- }
- if (!weightedGraph.containsVertex(targetNode)) {
- weightedGraph.addVertex(targetNode);
- }
-
- FlowEdge flowEdge =
- new LoadBasedFlowEdgeImpl(sourceNode, targetNode,
defaultFlowEdgeProps, topologySpec.getSpecExecutor());
-
- // In Multi-Graph if flowEdge existed, just skip it.
- if (!weightedGraph.containsEdge(flowEdge)) {
- weightedGraph.addEdge(sourceNode, targetNode, flowEdge);
- }
- }
- } catch (InterruptedException | ExecutionException e) {
- Instrumented.markMeter(this.flowCompilationFailedMeter);
- throw new RuntimeException("Cannot determine topology capabilities", e);
- }
- }
-
- /**
- * Generate JobSpec based on the #templateURI that user specified.
- */
- private JobSpec buildJobSpec (ServiceNode sourceNode, ServiceNode
targetNode, URI templateURI, FlowSpec flowSpec) {
- JobSpec jobSpec;
- JobSpec.Builder jobSpecBuilder =
JobSpec.builder(jobSpecURIGenerator(flowSpec, sourceNode, targetNode))
- .withConfig(flowSpec.getConfig())
- .withDescription(flowSpec.getDescription())
- .withVersion(flowSpec.getVersion());
- if (templateURI != null) {
- jobSpecBuilder.withTemplate(templateURI);
- try {
- jobSpec = new ResolvedJobSpec(jobSpecBuilder.build(),
templateCatalog.get());
- log.info("Resolved JobSpec properties are: " +
jobSpec.getConfigAsProperties());
- } catch (SpecNotFoundException | JobTemplate.TemplateException e) {
- throw new RuntimeException("Could not resolve template in JobSpec from
TemplateCatalog", e);
- }
- } else {
- jobSpec = jobSpecBuilder.build();
- log.info("Unresolved JobSpec properties are: " +
jobSpec.getConfigAsProperties());
- }
-
- // Remove schedule
-
jobSpec.setConfig(jobSpec.getConfig().withoutPath(ConfigurationKeys.JOB_SCHEDULE_KEY));
-
- // Add job.name and job.group
- if (flowSpec.getConfig().hasPath(ConfigurationKeys.FLOW_NAME_KEY)) {
- jobSpec.setConfig(jobSpec.getConfig()
- .withValue(ConfigurationKeys.JOB_NAME_KEY,
ConfigValueFactory.fromAnyRef(
-
flowSpec.getConfig().getValue(ConfigurationKeys.FLOW_NAME_KEY).unwrapped().toString()
- + "-" + sourceNode.getNodeName()
- + "-" + targetNode.getNodeName())));
- }
- if (flowSpec.getConfig().hasPath(ConfigurationKeys.FLOW_GROUP_KEY)) {
- jobSpec.setConfig(jobSpec.getConfig()
- .withValue(ConfigurationKeys.JOB_GROUP_KEY,
flowSpec.getConfig().getValue(ConfigurationKeys.FLOW_GROUP_KEY)));
- }
-
- // Add flow execution id for this compilation
- long flowExecutionId = FlowUtils.getOrCreateFlowExecutionId(flowSpec);
-
jobSpec.setConfig(jobSpec.getConfig().withValue(ConfigurationKeys.FLOW_EXECUTION_ID_KEY,
- ConfigValueFactory.fromAnyRef(flowExecutionId)));
-
- // Reset properties in Spec from Config
-
jobSpec.setConfigAsProperties(ConfigUtils.configToProperties(jobSpec.getConfig()));
- return jobSpec;
- }
-
- /**
- * A naive implementation of resolving templates in each JobSpec among
Multi-hop FlowSpec.
- * Handle the case when edge is not specified.
- * Always select the first available template.
- */
- private JobSpec convertHopToJobSpec (ServiceNode sourceNode, ServiceNode
targetNode, FlowSpec flowSpec) {
- FlowEdge flowEdge = weightedGraph.getAllEdges(sourceNode,
targetNode).iterator().next();
- URI templateURI = getTemplateURI (sourceNode, targetNode, flowSpec,
flowEdge);
- return buildJobSpec(sourceNode, targetNode, templateURI, flowSpec);
- }
-
- private URI getTemplateURI (ServiceNode sourceNode, ServiceNode targetNode,
FlowSpec flowSpec, FlowEdge flowEdge) {
- URI firstTemplateURI =
- (edgeTemplateMap != null &&
edgeTemplateMap.containsKey(flowEdge.getEdgeIdentity())) ? edgeTemplateMap.get(
- flowEdge.getEdgeIdentity()).get(0) :
jobSpecTemplateURIGenerator(flowSpec);
- return firstTemplateURI;
- }
-
- /**
- * A naive implementation of generating a jobSpec's URI within a multi-hop
logical Flow.
- */
- @Override
- public URI jobSpecURIGenerator(Object... objects) {
- FlowSpec flowSpec = (FlowSpec) objects[0];
- ServiceNode sourceNode = (ServiceNode) objects[1];
- ServiceNode targetNode = (ServiceNode) objects[2];
- try {
- return new URI(JobSpec.Builder.DEFAULT_JOB_CATALOG_SCHEME,
flowSpec.getUri().getAuthority(),
-
StringUtils.appendIfMissing(StringUtils.prependIfMissing(flowSpec.getUri().getPath(),
"/"),"/")
- + sourceNode.getNodeName() + "-" + targetNode.getNodeName(),
null);
- } catch (URISyntaxException e) {
- log.error(
- "URI construction failed when jobSpec from " +
sourceNode.getNodeName() + " to " + targetNode.getNodeName());
- throw new RuntimeException();
- }
- }
-}
\ No newline at end of file
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/policy/ServicePolicy.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/policy/ServicePolicy.java
deleted file mode 100644
index 540b13d..0000000
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/policy/ServicePolicy.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gobblin.service.modules.policy;
-
-import java.util.Set;
-import org.apache.gobblin.runtime.api.FlowEdge;
-import org.apache.gobblin.runtime.api.ServiceNode;
-import org.jgrapht.graph.DirectedWeightedMultigraph;
-
-
-/**
- * ServicePolicy will be firstly checked before the compilation happen.
- * unexpcted edges will not be considered in compilation process.
- */
-public interface ServicePolicy {
-
- /**
- * After initialization of {@link ServicePolicy}, the populating method need
to invoked before
- * {@link #getBlacklistedEdges()} can return the expected result.
- *
- * This requirement exists because when {@link ServicePolicy} is initialized
it is not necessary that a
- * {@link org.jgrapht.graph.WeightedMultigraph} has been constructed,
neither we cannot know if user-specified edges
- * and nodes exist in {@link org.jgrapht.graph.WeightedMultigraph}.
- * The population of blacklisted Edges make sense after graph has been
constructed.
- */
- public void populateBlackListedEdges(DirectedWeightedMultigraph<ServiceNode,
FlowEdge> graph);
-
- /**
- * Should return all edges that being blacklisted by this policy.
- */
- public Set<FlowEdge> getBlacklistedEdges();
-
- public void addServiceNode(ServiceNode serviceNode);
-
- public void addFlowEdge(FlowEdge flowEdge);
-}
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/policy/StaticServicePolicy.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/policy/StaticServicePolicy.java
deleted file mode 100644
index aabc67e..0000000
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/policy/StaticServicePolicy.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gobblin.service.modules.policy;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-
-import com.google.common.base.Preconditions;
-
-import org.apache.gobblin.annotation.Alias;
-import org.apache.gobblin.runtime.api.FlowEdge;
-import org.apache.gobblin.runtime.api.ServiceNode;
-import org.jgrapht.graph.DirectedWeightedMultigraph;
-
-import lombok.Getter;
-import lombok.extern.slf4j.Slf4j;
-
-/**
- * Defines {@link ServiceNode}s or {@link FlowEdge}s that should be
blacklisted from the Flow-compiler process,
- * obtained only from configuration, which is the reason it is named as static.
- *
- * TODO: DynamicServicePolicy can obtain new blacklist candidate through Flow
monitoring process, which is responsible
- * for monitoring the flow execution and react accordingly.
- *
- * Either user specify {@link ServiceNode} or {@link FlowEdge} to blacklist
will all end up with a list of
- * {@link FlowEdge}s that won't be considered when selecting path for data
transformation.
- */
-@Slf4j
-@Alias("static")
-public class StaticServicePolicy implements ServicePolicy {
-
- @Getter
- Set<FlowEdge> blacklistedEdges;
-
- List<ServiceNode> serviceNodes;
- List<FlowEdge> flowEdges;
-
- public StaticServicePolicy() {
- serviceNodes = new ArrayList<>();
- flowEdges = new ArrayList<>();
- blacklistedEdges = new HashSet<>();
- }
-
- public StaticServicePolicy(List<ServiceNode> serviceNodes, List<FlowEdge>
flowEdges) {
- Preconditions.checkNotNull(serviceNodes);
- Preconditions.checkNotNull(flowEdges);
- blacklistedEdges = new HashSet<>();
- this.serviceNodes = serviceNodes;
- this.flowEdges = flowEdges;
- }
-
- public void addServiceNode(ServiceNode serviceNode) {
- this.serviceNodes.add(serviceNode);
- }
-
- public void addFlowEdge(FlowEdge flowEdge){
- this.flowEdges.add(flowEdge);
- }
-
- @Override
- public void populateBlackListedEdges(DirectedWeightedMultigraph<ServiceNode,
FlowEdge> graph) {
- for (ServiceNode node: serviceNodes) {
- if (graph.containsVertex(node)) {
- blacklistedEdges.addAll(graph.incomingEdgesOf(node));
- blacklistedEdges.addAll(graph.outgoingEdgesOf(node));
- } else {
- log.info("The graph " + graph + " doesn't contains node " +
node.toString());
- }
- }
-
- for( FlowEdge flowEdge: flowEdges) {
- if (graph.containsEdge(flowEdge)) {
- blacklistedEdges.add(flowEdge);
- } else {
- log.info("The graph " + graph + "doesn't contains edge " +
flowEdge.toString());
- }
- }
- }
-}
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/DistancedNode.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/DistancedNode.java
deleted file mode 100644
index 946fe10..0000000
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/DistancedNode.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gobblin.service.modules.utils;
-import org.apache.gobblin.runtime.api.ServiceNode;
-
-/**
- * This is a helping class(Basically a wrapper) for Shortest path finding
process
- * to keep the shortest distance from source to an arbitrary node.
- */
-public class DistancedNode<T extends ServiceNode> {
-
- /**
- * The distance between {@link this} node to the src node in the
shortest-distance finding problem.
- */
- private double distToSrc;
-
- private T _serviceNode;
-
-
- /**
- * Max_Value represents no-connection.
- */
- public DistancedNode(T _serviceNode){
- this(_serviceNode, Double.MAX_VALUE);
- }
-
- public DistancedNode(T _serviceNode, double dist){
- this._serviceNode = _serviceNode;
- this.distToSrc = dist;
- }
-
- public double getDistToSrc(){
- return this.distToSrc;
- }
-
- public void setDistToSrc(double distToSrc){
- this.distToSrc = distToSrc;
- }
-
- public T getNode(){
- return this._serviceNode;
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
-
- DistancedNode<?> that = (DistancedNode<?>) o;
-
- return _serviceNode.equals(that._serviceNode);
- }
-
- @Override
- public int hashCode() {
- return _serviceNode.hashCode();
- }
-}
\ No newline at end of file
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/FindPathUtils.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/FindPathUtils.java
deleted file mode 100644
index 1481f78..0000000
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/FindPathUtils.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gobblin.service.modules.utils;
-
-import java.util.ArrayList;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.PriorityQueue;
-import java.util.Set;
-
-import org.apache.gobblin.runtime.api.FlowEdge;
-import org.apache.gobblin.runtime.api.ServiceNode;
-import org.apache.gobblin.service.modules.flow.LoadBasedFlowEdgeImpl;
-import org.jgrapht.graph.DirectedWeightedMultigraph;
-
-import lombok.extern.slf4j.Slf4j;
-
-import avro.shaded.com.google.common.annotations.VisibleForTesting;
-
-@Slf4j
-public class FindPathUtils {
- // Since Author{[email protected]} couldn't find the proper way to conduct
Library provided by JGraphT
- // on the customized-edge Graph, here is the raw implementation of Dijkstra
algorithm for finding shortest path.
-
- /**
- * Given sourceNode and targetNode, find the shortest path and return
shortest path.
- * @return Each edge on this shortest path, in order.
- *
- */
- @VisibleForTesting
- public static List<FlowEdge> dijkstraBasedPathFindingHelper(ServiceNode
sourceNode, ServiceNode targetNode,
- DirectedWeightedMultigraph<ServiceNode, FlowEdge> weightedGraph) {
- Map<DistancedNode, ArrayList<FlowEdge>> shortestPath = new HashMap<>();
- Map<DistancedNode, Double> shortestDist = new HashMap<>();
- PriorityQueue<DistancedNode> pq = new PriorityQueue<>(new
Comparator<DistancedNode>() {
- @Override
- public int compare(DistancedNode o1, DistancedNode o2) {
- if (o1.getDistToSrc() < o2.getDistToSrc()) {
- return -1;
- } else {
- return 1;
- }
- }
- });
- pq.add(new DistancedNode(sourceNode, 0.0));
-
- Set<FlowEdge> visitedEdge = new HashSet<>();
-
- while(!pq.isEmpty()) {
- DistancedNode node = pq.poll();
- if (node.getNode().getNodeName().equals(targetNode.getNodeName())) {
- // Searching finished
- return shortestPath.get(node);
- }
-
- Set<FlowEdge> outgoingEdges =
weightedGraph.outgoingEdgesOf(node.getNode());
- for (FlowEdge outGoingEdge:outgoingEdges) {
- // Since it is a multi-graph problem, should use edge for
deduplicaiton instead of vertex.
- if (visitedEdge.contains(outGoingEdge)) {
- continue;
- }
-
- DistancedNode adjacentNode = new
DistancedNode(weightedGraph.getEdgeTarget(outGoingEdge));
- if (shortestDist.containsKey(adjacentNode)) {
- adjacentNode.setDistToSrc(shortestDist.get(adjacentNode));
- }
-
- double newDist = node.getDistToSrc() + ((LoadBasedFlowEdgeImpl)
outGoingEdge).getEdgeLoad();
-
- if (newDist < adjacentNode.getDistToSrc()) {
- if (pq.contains(adjacentNode)) {
- pq.remove(adjacentNode);
- }
-
- // Update the shortest path.
- ArrayList<FlowEdge> path = shortestPath.containsKey(node)
- ? new ArrayList<>(shortestPath.get(node)) : new ArrayList<>();
- path.add(outGoingEdge);
- shortestPath.put(adjacentNode, path);
- shortestDist.put(adjacentNode, newDist);
-
- adjacentNode.setDistToSrc(newDist);
- pq.add(adjacentNode);
- }
- visitedEdge.add(outGoingEdge);
- }
- }
- log.error("No path found");
- return new ArrayList<>();
- }
-}
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/MultiHopsFlowToJobSpecCompilerTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/MultiHopsFlowToJobSpecCompilerTest.java
deleted file mode 100644
index 8b08628..0000000
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/MultiHopsFlowToJobSpecCompilerTest.java
+++ /dev/null
@@ -1,326 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gobblin.service.modules.core;
-
-import java.io.File;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Set;
-
-import com.typesafe.config.Config;
-
-import org.apache.commons.io.FileUtils;
-import org.apache.hadoop.fs.Path;
-import org.jgrapht.graph.DirectedWeightedMultigraph;
-import org.jgrapht.graph.WeightedMultigraph;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.testng.Assert;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.Test;
-import org.apache.gobblin.configuration.ConfigurationKeys;
-import org.apache.gobblin.runtime.spec_executorInstance.BaseServiceNodeImpl;
-import org.apache.gobblin.runtime.api.FlowEdge;
-import org.apache.gobblin.runtime.api.FlowSpec;
-import org.apache.gobblin.runtime.api.ServiceNode;
-import org.apache.gobblin.runtime.api.SpecExecutor;
-import org.apache.gobblin.runtime.api.TopologySpec;
-import org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor;
-import org.apache.gobblin.service.ServiceConfigKeys;
-import org.apache.gobblin.service.modules.flow.LoadBasedFlowEdgeImpl;
-import org.apache.gobblin.service.modules.flow.MultiHopsFlowToJobSpecCompiler;
-import org.apache.gobblin.util.ConfigUtils;
-import org.apache.gobblin.util.PathUtils;
-
-import static org.apache.gobblin.service.modules.utils.FindPathUtils.*;
-
-
-// All unit tests here will be with templateCatelogue.
-public class MultiHopsFlowToJobSpecCompilerTest {
- private static final Logger logger =
LoggerFactory.getLogger(MultiHopsFlowToJobSpecCompilerTest.class);
-
- private static final String TEST_TEMPLATE_CATALOG_PATH =
"/tmp/gobblinTestTemplateCatalog_" + System.currentTimeMillis();
- private static final String TEST_TEMPLATE_CATALOG_URI = "file://" +
TEST_TEMPLATE_CATALOG_PATH;
-
- private static final String TEST_TEMPLATE_NAME = "test.template";
- private static final String TEST_TEMPLATE_URI = "FS:///test.template";
-
- // The path to be discovered is TEST_SOURCE_NAME -> TEST_HOP_NAME_A ->
TEST_HOP_NAME_B -> TEST_SINK_NAME
- private static final String TEST_SOURCE_NAME = "testSource";
- private static final String TEST_HOP_NAME_A = "testHopA";
- private static final String TEST_HOP_NAME_B = "testHopB";
- private static final String TEST_HOP_NAME_C = "testHopC";
- private static final String TEST_SINK_NAME = "testSink";
- private static final String TEST_FLOW_GROUP = "testFlowGroup";
- private static final String TEST_FLOW_NAME = "testFlowName";
-
- private static final String SPEC_STORE_PARENT_DIR = "/tmp/orchestrator/";
- private static final String SPEC_DESCRIPTION = "Test Orchestrator";
- private static final String SPEC_VERSION = FlowSpec.Builder.DEFAULT_VERSION;
- private static final String TOPOLOGY_SPEC_STORE_DIR =
"/tmp/orchestrator/topologyTestSpecStore_" + System.currentTimeMillis();
- private static final String TOPOLOGY_SPEC_STORE_DIR_SECOND =
"/tmp/orchestrator/topologyTestSpecStore_" + System.currentTimeMillis() + "_2";
- private static final String FLOW_SPEC_STORE_DIR =
"/tmp/orchestrator/flowTestSpecStore_" + System.currentTimeMillis();
-
- private ServiceNode vertexSource;
- private ServiceNode vertexHopA;
- private ServiceNode vertexHopB;
- private ServiceNode vertexHopC;
- private ServiceNode vertexSink;
-
- private MultiHopsFlowToJobSpecCompiler compilerWithTemplateCalague;
- private Map<String, List<URI>> edgeTemplateMap;
-
-
- @BeforeClass
- public void setUp() throws Exception{
- // Create dir for template catalog
- FileUtils.forceMkdir(new File(TEST_TEMPLATE_CATALOG_PATH));
-
- // Create template to use in test
- List<String> templateEntries = new ArrayList<>();
- templateEntries.add("testProperty1 = \"testValue1\"");
- templateEntries.add("testProperty2 = \"test.Value1\"");
- templateEntries.add("testProperty3 = 100");
- FileUtils.writeLines(new File(TEST_TEMPLATE_CATALOG_PATH + "/" +
TEST_TEMPLATE_NAME), templateEntries);
-
- // Initialize complier with template catalog
- Properties compilerWithTemplateCatalogProperties = new Properties();
-
compilerWithTemplateCatalogProperties.setProperty(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY,
TEST_TEMPLATE_CATALOG_URI);
-
- // Initialize compiler with common useful properties
- String testPath = TEST_SOURCE_NAME + "," + TEST_HOP_NAME_A + "," +
TEST_HOP_NAME_B + "," + TEST_SINK_NAME;
-
compilerWithTemplateCatalogProperties.setProperty(ServiceConfigKeys.POLICY_BASED_DATA_MOVEMENT_PATH,
testPath);
-
- this.compilerWithTemplateCalague = new
MultiHopsFlowToJobSpecCompiler(ConfigUtils.propertiesToConfig(compilerWithTemplateCatalogProperties));
-
- vertexSource = new BaseServiceNodeImpl(TEST_SOURCE_NAME);
- vertexHopA = new BaseServiceNodeImpl(TEST_HOP_NAME_A);
- vertexHopB = new BaseServiceNodeImpl(TEST_HOP_NAME_B);
- vertexHopC = new BaseServiceNodeImpl(TEST_HOP_NAME_C);
- vertexSink = new BaseServiceNodeImpl(TEST_SINK_NAME);
-
- }
-
- @AfterClass
- public void cleanUp() throws Exception {
- // Cleanup Template Catalog
- try {
- cleanUpDir(TEST_TEMPLATE_CATALOG_PATH);
- } catch (Exception e) {
- logger.warn("Could not completely cleanup Template catalog dir");
- }
-
- // Cleanup ToplogySpec Dir
- try {
- cleanUpDir(TOPOLOGY_SPEC_STORE_DIR);
- } catch (Exception e) {
- logger.warn("Could not completely cleanup ToplogySpec catalog dir");
- }
-
- // Cleanup FlowSpec Dir
- try {
- cleanUpDir(FLOW_SPEC_STORE_DIR);
- } catch (Exception e) {
- logger.warn("Could not completely cleanup FlowSpec catalog dir");
- }
- }
-
- @Test
- public void testWeightedGraphConstruction(){
- FlowSpec flowSpec = initFlowSpec();
- TopologySpec topologySpec = initTopologySpec(TOPOLOGY_SPEC_STORE_DIR,
TEST_SOURCE_NAME, TEST_HOP_NAME_A, TEST_HOP_NAME_B, TEST_SINK_NAME);
- this.compilerWithTemplateCalague.onAddSpec(topologySpec);
-
- // invocation of compileFlow trigger the weighedGraph construction
- this.compilerWithTemplateCalague.compileFlow(flowSpec);
- DirectedWeightedMultigraph<ServiceNode, FlowEdge> weightedGraph =
compilerWithTemplateCalague.getWeightedGraph();
-
- Assert.assertTrue(weightedGraph.containsVertex(vertexSource));
- Assert.assertTrue(weightedGraph.containsVertex(vertexHopA));
- Assert.assertTrue(weightedGraph.containsVertex(vertexHopB));
- Assert.assertTrue(weightedGraph.containsVertex(vertexSink));
-
- FlowEdge edgeSrc2A = new LoadBasedFlowEdgeImpl(vertexSource, vertexHopA,
topologySpec.getSpecExecutor());
- FlowEdge edgeA2B = new LoadBasedFlowEdgeImpl(vertexHopA, vertexHopB,
topologySpec.getSpecExecutor());
- FlowEdge edgeB2Sink = new LoadBasedFlowEdgeImpl(vertexHopB, vertexSink,
topologySpec.getSpecExecutor());
-
- Assert.assertTrue(weightedGraph.containsEdge(edgeSrc2A));
- Assert.assertTrue(weightedGraph.containsEdge(edgeA2B));
- Assert.assertTrue(weightedGraph.containsEdge(edgeB2Sink));
-
- Assert.assertTrue(edgeEqual(weightedGraph.getEdge(vertexSource,
vertexHopA), edgeSrc2A));
- Assert.assertTrue(edgeEqual(weightedGraph.getEdge(vertexHopA, vertexHopB),
edgeA2B));
- Assert.assertTrue(edgeEqual(weightedGraph.getEdge(vertexHopB, vertexSink),
edgeB2Sink));
-
- this.compilerWithTemplateCalague.onDeleteSpec(topologySpec.getUri(), "");
- }
-
- @Test
- public void testUserSpecifiedPathCompilation(){
- // TODO
- }
-
- @Test
- public void testServicePolicy(){
- // Initialize compiler with some blacklist properties
- Properties properties = new Properties();
-
properties.setProperty(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY,
TEST_TEMPLATE_CATALOG_URI);
- String testPath = TEST_SOURCE_NAME + "," + TEST_HOP_NAME_A + "," +
TEST_HOP_NAME_B + "," + TEST_SINK_NAME;
- properties.setProperty(ServiceConfigKeys.POLICY_BASED_DATA_MOVEMENT_PATH,
testPath);
- properties.setProperty(ServiceConfigKeys.POLICY_BASED_BLOCKED_NODES,
- "testHopA");
- MultiHopsFlowToJobSpecCompiler compiler = new
MultiHopsFlowToJobSpecCompiler(ConfigUtils.propertiesToConfig(properties));
-
-
- FlowSpec flowSpec = initFlowSpec();
- TopologySpec topologySpec = initTopologySpec(TOPOLOGY_SPEC_STORE_DIR,
TEST_SOURCE_NAME, TEST_HOP_NAME_A, TEST_HOP_NAME_B, TEST_SINK_NAME);
- compiler.onAddSpec(topologySpec);
-
- // invocation of compileFlow trigger the weighedGraph construction
- compiler.compileFlow(flowSpec);
-
-
compiler.servicePolicy.populateBlackListedEdges(compiler.getWeightedGraph());
- Assert.assertEquals(compiler.servicePolicy.getBlacklistedEdges().size(),
2);
-
- FlowEdge edgeSrc2A = new LoadBasedFlowEdgeImpl(vertexSource, vertexHopA,
topologySpec.getSpecExecutor());
- FlowEdge edgeA2B = new LoadBasedFlowEdgeImpl(vertexHopA, vertexHopB,
topologySpec.getSpecExecutor());
-
-
Assert.assertTrue(compiler.servicePolicy.getBlacklistedEdges().contains(edgeSrc2A));
-
Assert.assertTrue(compiler.servicePolicy.getBlacklistedEdges().contains(edgeA2B));
-
- }
-
- @Test (dependsOnMethods = "testWeightedGraphConstruction")
- public void testDijkstraPathFinding(){
-
- FlowSpec flowSpec = initFlowSpec();
- TopologySpec topologySpec_1 = initTopologySpec(TOPOLOGY_SPEC_STORE_DIR,
TEST_SOURCE_NAME, TEST_HOP_NAME_A, TEST_HOP_NAME_B, TEST_SINK_NAME);
- TopologySpec topologySpec_2 =
initTopologySpec(TOPOLOGY_SPEC_STORE_DIR_SECOND, TEST_SOURCE_NAME,
TEST_HOP_NAME_B, TEST_HOP_NAME_C, TEST_SINK_NAME);
- this.compilerWithTemplateCalague.onAddSpec(topologySpec_1);
- this.compilerWithTemplateCalague.onAddSpec(topologySpec_2);
-
- // Get the edge -> Change the weight -> Materialized the edge change back
to graph -> compile again -> Assertion
- this.compilerWithTemplateCalague.compileFlow(flowSpec);
- DirectedWeightedMultigraph<ServiceNode, FlowEdge> weightedGraph =
compilerWithTemplateCalague.getWeightedGraph();
- FlowEdge a2b= weightedGraph.getEdge(vertexHopA, vertexHopB);
- FlowEdge b2c = weightedGraph.getEdge(vertexHopB, vertexHopC);
- FlowEdge c2s = weightedGraph.getEdge(vertexHopC, vertexSink);
- weightedGraph.setEdgeWeight(a2b, 1.99);
- weightedGraph.setEdgeWeight(b2c, 0.1);
- weightedGraph.setEdgeWeight(c2s, 0.2);
-
- // Best route: Src - B(1) - C(0.1) - sink (0.2)
- this.compilerWithTemplateCalague.compileFlow(flowSpec);
- List<FlowEdge> edgeList = dijkstraBasedPathFindingHelper(vertexSource,
vertexSink, weightedGraph);
-
- FlowEdge src2b = weightedGraph.getEdge(vertexSource, vertexHopB);
- FlowEdge b2C = weightedGraph.getEdge(vertexHopB, vertexHopC);
- FlowEdge c2sink = weightedGraph.getEdge(vertexHopC, vertexSink);
- Assert.assertEquals(edgeList.get(0).getEdgeIdentity(),
src2b.getEdgeIdentity());
- Assert.assertEquals(edgeList.get(1).getEdgeIdentity(),
b2C.getEdgeIdentity());
- Assert.assertEquals(edgeList.get(2).getEdgeIdentity(),
c2sink.getEdgeIdentity());
-
- this.compilerWithTemplateCalague.onDeleteSpec(topologySpec_1.getUri(), "");
- this.compilerWithTemplateCalague.onDeleteSpec(topologySpec_2.getUri(), "");
- }
-
- // The topology is: Src - A - B - Dest
- private TopologySpec initTopologySpec(String storeDir, String ... args) {
- Properties properties = new Properties();
- properties.put("specStore.fs.dir", storeDir);
- String capabilitiesString = "";
- for(int i =0 ; i < args.length - 1 ; i ++ ) {
- capabilitiesString = capabilitiesString + ( args[i] + ":" + args[i+1] +
",");
- }
- Assert.assertEquals(capabilitiesString.charAt(capabilitiesString.length()
- 1) , ',');
- capabilitiesString = capabilitiesString.substring(0,
capabilitiesString.length() - 1 );
- properties.put("specExecInstance.capabilities", capabilitiesString);
- properties.put("executorAttrs", new Properties());
- Config config = ConfigUtils.propertiesToConfig(properties);
- SpecExecutor specExecutorInstance = new InMemorySpecExecutor(config);
-
- TopologySpec.Builder topologySpecBuilder = TopologySpec.builder(
-
IdentityFlowToJobSpecCompilerTest.computeTopologySpecURI(SPEC_STORE_PARENT_DIR,
- storeDir))
- .withConfig(config)
- .withDescription(SPEC_DESCRIPTION)
- .withVersion(SPEC_VERSION)
- .withSpecExecutor(specExecutorInstance);
- return topologySpecBuilder.build();
- }
-
- private FlowSpec initFlowSpec() {
- return initFlowSpec(TEST_FLOW_GROUP, TEST_FLOW_NAME, TEST_SOURCE_NAME,
TEST_SINK_NAME);
- }
-
- private FlowSpec initFlowSpec(String flowGroup, String flowName, String
source, String destination) {
- Properties properties = new Properties();
- properties.put(ConfigurationKeys.JOB_SCHEDULE_KEY, "* * * * *");
- properties.put(ConfigurationKeys.FLOW_GROUP_KEY, flowGroup);
- properties.put(ConfigurationKeys.FLOW_NAME_KEY, flowName);
- properties.put(ServiceConfigKeys.FLOW_SOURCE_IDENTIFIER_KEY, source);
- properties.put(ServiceConfigKeys.FLOW_DESTINATION_IDENTIFIER_KEY,
destination);
- Config config = ConfigUtils.propertiesToConfig(properties);
-
- FlowSpec.Builder flowSpecBuilder = null;
- try {
- flowSpecBuilder =
FlowSpec.builder(computeTopologySpecURI(SPEC_STORE_PARENT_DIR,
- FLOW_SPEC_STORE_DIR))
- .withConfig(config)
- .withDescription("dummy description")
- .withVersion(SPEC_VERSION)
- .withTemplate(new URI(TEST_TEMPLATE_URI));
- } catch (URISyntaxException e) {
- throw new RuntimeException(e);
- }
- return flowSpecBuilder.build();
- }
-
- private void cleanUpDir(String dir) throws Exception {
- File specStoreDir = new File(dir);
- if (specStoreDir.exists()) {
- FileUtils.deleteDirectory(specStoreDir);
- }
- }
-
- // Criteria for FlowEdge to be equal in testing context
- private boolean edgeEqual(FlowEdge a, FlowEdge b){
- return (a.getEdgeIdentity().equals(b.getEdgeIdentity()) &&
- ((LoadBasedFlowEdgeImpl)a).getEdgeLoad() ==
((LoadBasedFlowEdgeImpl)b).getEdgeLoad());
- }
-
- // Use this function for
- private void populateTemplateMap(WeightedMultigraph<ServiceNode, FlowEdge>
weightedGraph, URI exempliedURI){
- this.edgeTemplateMap.clear();
- Set<FlowEdge> allEdges = weightedGraph.edgeSet();
- for ( FlowEdge edge : allEdges ) {
- this.edgeTemplateMap.put(edge.getEdgeIdentity(),
Arrays.asList(exempliedURI)) ;
- }
- }
-
- public static URI computeTopologySpecURI(String parent, String current) {
- // Make sure this is relative
- return PathUtils.relativizePath(new Path(current), new
Path(parent)).toUri();
- }
-
-}
\ No newline at end of file