This is an automated email from the ASF dual-hosted git repository.

suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 76d692e  [GOBBLIN-754] Clean old version of multi-hop compiler
76d692e is described below

commit 76d692e253ef8a5b3d5524cf566398cf62d42c29
Author: autumnust <[email protected]>
AuthorDate: Wed Jun 5 16:04:17 2019 -0700

    [GOBBLIN-754] Clean old version of multi-hop compiler
    
    Closes #2660 from autumnust/craftsmanshipCleaning
---
 .../modules/flow/BaseFlowToJobSpecCompiler.java    |  26 --
 .../service/modules/flow/FlowEdgeProps.java        |  67 ----
 .../flow/IdentityFlowToJobSpecCompiler.java        |   6 -
 .../modules/flow/LoadBasedFlowEdgeImpl.java        | 180 -----------
 .../service/modules/flow/MultiHopFlowCompiler.java |   5 -
 .../flow/MultiHopsFlowToJobSpecCompiler.java       | 358 ---------------------
 .../service/modules/policy/ServicePolicy.java      |  51 ---
 .../modules/policy/StaticServicePolicy.java        |  98 ------
 .../service/modules/utils/DistancedNode.java       |  77 -----
 .../service/modules/utils/FindPathUtils.java       | 109 -------
 .../core/MultiHopsFlowToJobSpecCompilerTest.java   | 326 -------------------
 11 files changed, 1303 deletions(-)

diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java
index 8374211..257ad3d 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java
@@ -49,7 +49,6 @@ import org.apache.gobblin.runtime.api.FlowSpec;
 import org.apache.gobblin.runtime.api.JobSpec;
 import org.apache.gobblin.runtime.api.JobTemplate;
 import org.apache.gobblin.runtime.api.Spec;
-import org.apache.gobblin.runtime.api.SpecExecutor;
 import org.apache.gobblin.runtime.api.SpecNotFoundException;
 import org.apache.gobblin.runtime.api.TopologySpec;
 import org.apache.gobblin.runtime.job_catalog.FSJobCatalog;
@@ -71,20 +70,6 @@ public abstract class BaseFlowToJobSpecCompiler implements SpecCompiler {
   @Setter
   protected final Map<URI, TopologySpec> topologySpecMap;
 
-
-  /**
-   * Mapping between each FlowEdge and a list of applicable Templates.
-   * Compiler should obtain this Map info from higher level component.
-   * since {@link TopologySpec} doesn't contain Templates.
-   * Key: EdgeIdentifier from {@link org.apache.gobblin.runtime.api.FlowEdge#getEdgeIdentity()}
-   * Value: List of template URI.
-   */
-  // TODO: Define how template info are instantiated. ETL-6217
-  @Getter
-  @Setter
-  protected final Map<String, List<URI>> edgeTemplateMap;
-
-
   protected final Config config;
   protected final Logger log;
   protected final Optional<FSJobCatalog> templateCatalog;
@@ -128,7 +113,6 @@ public abstract class BaseFlowToJobSpecCompiler implements SpecCompiler {
     }
 
     this.topologySpecMap = Maps.newConcurrentMap();
-    this.edgeTemplateMap = Maps.newConcurrentMap();
     this.config = config;
 
     /***
@@ -295,14 +279,4 @@ public abstract class BaseFlowToJobSpecCompiler implements SpecCompiler {
     // For now only first template uri will be honored for Identity
     return flowSpec.getTemplateURIs().get().iterator().next();
   }
-
-  /**
-   * Ideally each edge has its own eligible template repository(Based on {@link SpecExecutor})
-   * to pick templates from.
-   *
-   * This function is to transform from all mixed templates ({@link #templateCatalog})
-   * into categorized {@link #edgeTemplateMap}.
-   *
-   */
-  abstract protected void populateEdgeTemplateMap();
 }
\ No newline at end of file
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/FlowEdgeProps.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/FlowEdgeProps.java
deleted file mode 100644
index 751bb09..0000000
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/FlowEdgeProps.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gobblin.service.modules.flow;
-
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-import com.typesafe.config.ConfigObject;
-import java.io.Serializable;
-import java.util.Properties;
-
-import static org.apache.gobblin.service.ServiceConfigKeys.*;
-
-import lombok.Data;
-import lombok.Getter;
-import lombok.Setter;
-import lombok.extern.slf4j.Slf4j;
-
-@Slf4j
-public class FlowEdgeProps {
-  protected static final boolean DEFAULT_EDGE_SAFETY = true;
-
-  /**
-   * Contains read-only properties that users want to package in.
-   */
-  @Getter
-  protected Config config;
-
-  /**
-   * One of the mutable properties of an edge.
-   */
-  @Getter
-  @Setter
-  private boolean isEdgeSecure;
-
-  public FlowEdgeProps(Config config) {
-    this.config = config;
-    isEdgeSecure = getInitialEdgeSafety();
-  }
-
-  public FlowEdgeProps() {
-    this(ConfigFactory.empty());
-  }
-
-  /**
-   * When initializing an edge, load and security value from properties will be used
-   * but could be overriden afterwards.
-   */
-  private boolean getInitialEdgeSafety() {
-    return
-        config.hasPath(EDGE_SECURITY_KEY) ? config.getBoolean(EDGE_SECURITY_KEY) : DEFAULT_EDGE_SAFETY;
-  }
-}
\ No newline at end of file
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/IdentityFlowToJobSpecCompiler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/IdentityFlowToJobSpecCompiler.java
index 42c4926..d3b25f5 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/IdentityFlowToJobSpecCompiler.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/IdentityFlowToJobSpecCompiler.java
@@ -117,10 +117,4 @@ public class IdentityFlowToJobSpecCompiler extends BaseFlowToJobSpecCompiler {
     }
     return jobExecutionPlans;
   }
-
-  @Override
-  protected void populateEdgeTemplateMap() {
-    log.warn("No population of templates based on edge happen in this implementation");
-    return;
-  }
 }
\ No newline at end of file
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/LoadBasedFlowEdgeImpl.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/LoadBasedFlowEdgeImpl.java
deleted file mode 100644
index 83b94e3..0000000
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/LoadBasedFlowEdgeImpl.java
+++ /dev/null
@@ -1,180 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gobblin.service.modules.flow;
-
-import com.typesafe.config.Config;
-
-import org.jgrapht.graph.DefaultWeightedEdge;
-import org.apache.gobblin.annotation.Alpha;
-import org.apache.gobblin.runtime.api.FlowEdge;
-import org.apache.gobblin.runtime.api.ServiceNode;
-import org.apache.gobblin.runtime.api.SpecExecutor;
-
-import lombok.Getter;
-
-/**
- * A base implementation of a flowEdge in the weight multi-edge graph.
- * For a weightedMultiGraph there could be multiple edges between two vertices.
- * Recall that a triplet of <SourceNode, targetNode, specExecutor> determines one edge.
- * It is expected that {@link org.jgrapht.graph.DirectedWeightedMultigraph#getAllEdges(Object, Object)}
- * can return multiple edges with the same pair of source and destination but different SpecExecutor.
- *
- * Each edge has a {@FlowEdgeProp} which contains mutable and immutable properties.
- * The {@link LoadBasedFlowEdgeImpl} exposes two mutable properties: Load and Security.
- *
- * Load of an edge is equivalent to weight defined in {@link DefaultWeightedEdge}.
- * Since {@link #getWeight()} method is protected, {@link #getEdgeLoad()} will return the load.
- * There's no setLoad, which is logically supposed to happen by invoking
- * {@link org.jgrapht.graph.DirectedWeightedMultigraph#setEdgeWeight(Object, double)}.
- *
- * Security of an edge describes if an edge is secure to be part of data movement path at current stage.
- *
- */
-@Alpha
-public class LoadBasedFlowEdgeImpl extends DefaultWeightedEdge implements FlowEdge {
-
-  /**
-   * In our cases {@link LoadBasedFlowEdgeImpl} is not likely to be serialized.
-   * While as it extends {@link DefaultWeightedEdge} for best practice we made all fields transient,
-   * and specify serialVersionUID.
-   */
-  private static final long serialVersionUID = 1L;
-
-  @Getter
-  private transient ServiceNode sourceNode;
-  @Getter
-  private transient ServiceNode targetNode;
-  @Getter
-  private transient SpecExecutor specExecutorInstance;
-
-  /**
-   * Contains both read-only and mutable attributes of properties of an edge.
-   * Mutable properties in{@link FlowEdgeProps} expose their Setter & Getter
-   * thru. either the {@link FlowEdgeProps}
-   * or graph-level api, e.g. {@link org.jgrapht.graph.DirectedWeightedMultigraph#setEdgeWeight(Object, double)}
-   *
-   * Typical mutable properties of an edge includes:
-   * Load(Weight), Security.
-   */
-  private final transient FlowEdgeProps flowEdgeProps;
-
-  public LoadBasedFlowEdgeImpl(ServiceNode sourceNode, ServiceNode targetNode,
-      FlowEdgeProps flowEdgeProps, SpecExecutor specExecutorInstance) {
-    this.sourceNode = sourceNode;
-    this.targetNode = targetNode;
-    this.flowEdgeProps = flowEdgeProps;
-    this.specExecutorInstance = specExecutorInstance;
-  }
-
-  public LoadBasedFlowEdgeImpl(ServiceNode sourceNode, ServiceNode targetNode,
-      SpecExecutor specExecutor) {
-    this(sourceNode, targetNode, new FlowEdgeProps(specExecutor.getAttrs()),
-        specExecutor);
-  }
-
-  // Load: Directly using {@link DefaultWeightedEdge}'s weight field.
-  /**
-   * Load:
-   * Initialization: super's default constructor
-   * Getter: {@link #getEdgeLoad()}} thru. {@link DefaultWeightedEdge}'s {@link #getWeight()}.
-   * Setter:Thru. {@link org.jgrapht.graph.DirectedWeightedMultigraph#setEdgeWeight(Object, double)}
-   */
-  public double getEdgeLoad() {
-    return getWeight();
-  }
-
-  // Security: Get/Set thru. FlowEdgeProps
-  /**
-   * Initialization\Getter\Setter: By {@link FlowEdgeProps}
-   */
-  public boolean getIsEdgeSecure() {
-    return flowEdgeProps.isEdgeSecure();
-  }
-  public void setIsEdgeSecure(boolean isEdgeSecure) {
-    this.flowEdgeProps.setEdgeSecure(isEdgeSecure);
-  }
-
-
-  @Override
-  public String getEdgeIdentity() {
-    return this.calculateEdgeIdentity(this.sourceNode, this.targetNode, this.specExecutorInstance);
-  }
-
-  @Override
-  public Config getEdgeProperties() {
-    return this.flowEdgeProps.getConfig();
-  }
-
-  @Override
-  /**
-   * Naive rule: If edge is secure, then it is qualified to be considered in path-finding.
-   */
-  public boolean isEdgeEnabled() {
-    return this.flowEdgeProps.isEdgeSecure();
-  }
-
-
-  /**
-   * A naive implementation of edge identity calculation.
-   * @return
-   */
-  public static String calculateEdgeIdentity(ServiceNode sourceNode, ServiceNode targetNode, SpecExecutor specExecutorInstance){
-    return sourceNode.getNodeName() + "-" + specExecutorInstance.getUri() + "-" + targetNode.getNodeName();
-  }
-
-  /**
-   * Recall that we need a triplet to uniquely define a {@link FlowEdge}:
-   * - {@link ServiceNode} sourceNode
-   * - {@link ServiceNode} targetNode
-   * - {@link SpecExecutor} SpecExecutor
-   *
-   * We DO NOT distinguish between two edges by other props like weight,
-   * as the load should be an attribute of an edge.
-   * These are IntelliJ-generated methods for equals and hashCode().
-   *
-   * @param o The object that being compared
-   * @return If two {@link LoadBasedFlowEdgeImpl} are equivalent.
-   */
-  @Override
-  public boolean equals(Object o) {
-    if (this == o) {
-      return true;
-    }
-    if (o == null || getClass() != o.getClass()) {
-      return false;
-    }
-
-    LoadBasedFlowEdgeImpl that = (LoadBasedFlowEdgeImpl) o;
-
-    if (!sourceNode.equals(that.sourceNode)) {
-      return false;
-    }
-    if (!targetNode.equals(that.targetNode)) {
-      return false;
-    }
-    return specExecutorInstance.equals(that.specExecutorInstance);
-  }
-
-  @Override
-  public int hashCode() {
-    int result = sourceNode.hashCode();
-    result = 31 * result + targetNode.hashCode();
-    result = 31 * result + specExecutorInstance.hashCode();
-    return result;
-  }
-}
\ No newline at end of file
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java
index f067344..d9a3812 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java
@@ -197,11 +197,6 @@ public class MultiHopFlowCompiler extends BaseFlowToJobSpecCompiler {
     return jobExecutionPlanDag;
   }
 
-  @Override
-  protected void populateEdgeTemplateMap() {
-    log.warn("No population of templates based on edge happen in this implementation");
-  }
-
   /**
    * Register a shutdown hook for this thread.
    */
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopsFlowToJobSpecCompiler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopsFlowToJobSpecCompiler.java
deleted file mode 100644
index 6b0b14a..0000000
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopsFlowToJobSpecCompiler.java
+++ /dev/null
@@ -1,358 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gobblin.service.modules.flow;
-
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ExecutionException;
-import java.util.stream.Collectors;
-
-import org.apache.commons.lang3.StringUtils;
-import org.jgrapht.graph.DirectedWeightedMultigraph;
-import org.slf4j.Logger;
-
-import com.google.common.base.Optional;
-import com.google.common.base.Splitter;
-import com.google.common.collect.Maps;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigValueFactory;
-
-import lombok.Getter;
-import lombok.extern.slf4j.Slf4j;
-
-import org.apache.gobblin.configuration.ConfigurationKeys;
-import org.apache.gobblin.instrumented.Instrumented;
-import org.apache.gobblin.runtime.api.FlowEdge;
-import org.apache.gobblin.runtime.api.FlowSpec;
-import org.apache.gobblin.runtime.api.JobSpec;
-import org.apache.gobblin.runtime.api.JobTemplate;
-import org.apache.gobblin.runtime.api.ServiceNode;
-import org.apache.gobblin.runtime.api.Spec;
-import org.apache.gobblin.runtime.api.SpecExecutor;
-import org.apache.gobblin.runtime.api.SpecNotFoundException;
-import org.apache.gobblin.runtime.api.TopologySpec;
-import org.apache.gobblin.runtime.job_spec.ResolvedJobSpec;
-import org.apache.gobblin.runtime.spec_executorInstance.BaseServiceNodeImpl;
-import org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor;
-import org.apache.gobblin.service.ServiceConfigKeys;
-import org.apache.gobblin.service.modules.flowgraph.Dag;
-import org.apache.gobblin.service.modules.policy.ServicePolicy;
-import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
-import org.apache.gobblin.service.modules.spec.JobExecutionPlanDagFactory;
-import org.apache.gobblin.util.ClassAliasResolver;
-import org.apache.gobblin.util.ConfigUtils;
-
-import static org.apache.gobblin.service.ServiceConfigKeys.SERVICE_POLICY_NAME;
-import static org.apache.gobblin.service.modules.utils.FindPathUtils.dijkstraBasedPathFindingHelper;
-
-// Users are capable to inject hints/prioritization into route selection, in 
two forms:
-// 1. PolicyBasedBlockedConnection: Define some undesired routes
-// 2. Specified a complete path. FlowCompiler is responsible to verify if the 
path given is valid.
-
-// TODO: Flow monitoring, injecting weight for flowEdge:ETL-6213
-@Slf4j
-public class MultiHopsFlowToJobSpecCompiler extends BaseFlowToJobSpecCompiler {
-
-  private static final Splitter SPLIT_BY_COMMA = 
Splitter.on(",").omitEmptyStrings().trimResults();
-
-  @Getter
-  private DirectedWeightedMultigraph<ServiceNode, FlowEdge> weightedGraph =
-      new DirectedWeightedMultigraph<>(LoadBasedFlowEdgeImpl.class);
-
-  public ServicePolicy servicePolicy;
-
-  // Contains user-specified complete path of how the data movement is 
executed from source to sink.
-  private Optional<String> optionalUserSpecifiedPath;
-
-  private FlowEdgeProps defaultFlowEdgeProps = new FlowEdgeProps();
-
-  public MultiHopsFlowToJobSpecCompiler(Config config) {
-    this(config, Optional.absent(), true);
-  }
-
-  public MultiHopsFlowToJobSpecCompiler(Config config, Optional<Logger> log) {
-    this(config, log, true);
-  }
-
-  public MultiHopsFlowToJobSpecCompiler(Config config, Optional<Logger> log, 
boolean instrumentationEnabled) {
-    super(config, log, instrumentationEnabled);
-    String policyClassName = config.hasPath(SERVICE_POLICY_NAME) ? 
config.getString(SERVICE_POLICY_NAME)
-        : ServiceConfigKeys.DEFAULT_SERVICE_POLICY;
-    ClassAliasResolver<ServicePolicy> classResolver = new 
ClassAliasResolver<>(ServicePolicy.class);
-    try {
-      servicePolicy = 
classResolver.resolveClass(policyClassName).newInstance();
-    } catch (ClassNotFoundException | InstantiationException | 
IllegalAccessException e) {
-      throw new RuntimeException("Error happen when resolving class for :" + 
policyClassName, e);
-    }
-
-    if (config.hasPath(ServiceConfigKeys.POLICY_BASED_BLOCKED_CONNECTION)
-        && config.getStringList(ServiceConfigKeys.POLICY_BASED_BLOCKED_CONNECTION).size() > 0) {
-      try {
-        for (String sourceSinkPair : 
config.getStringList(ServiceConfigKeys.POLICY_BASED_BLOCKED_CONNECTION)) {
-          BaseServiceNodeImpl source = new 
BaseServiceNodeImpl(sourceSinkPair.split(":")[0]);
-          BaseServiceNodeImpl sink = new 
BaseServiceNodeImpl(sourceSinkPair.split(":")[1]);
-          URI specExecutorURI = new URI(sourceSinkPair.split(":")[2]);
-          servicePolicy.addFlowEdge(
-              new LoadBasedFlowEdgeImpl(source, sink, 
InMemorySpecExecutor.createDummySpecExecutor(specExecutorURI)));
-        }
-      } catch (URISyntaxException e) {
-        this.log.warn("Constructing of FlowEdge in ServicePolicy Failed");
-      }
-    }
-
-    if (config.hasPath(ServiceConfigKeys.POLICY_BASED_BLOCKED_NODES) &&
-        StringUtils.isNotBlank(config.getString(ServiceConfigKeys.POLICY_BASED_BLOCKED_NODES))) {
-      for (String blacklistedNode : SPLIT_BY_COMMA.splitToList(
-          config.getString(ServiceConfigKeys.POLICY_BASED_BLOCKED_NODES))) {
-        servicePolicy.addServiceNode(new BaseServiceNodeImpl(blacklistedNode));
-      }
-    }
-
-    if (config.hasPath(ServiceConfigKeys.POLICY_BASED_DATA_MOVEMENT_PATH) && 
StringUtils.isNotBlank(
-        config.getString(ServiceConfigKeys.POLICY_BASED_DATA_MOVEMENT_PATH))) {
-      optionalUserSpecifiedPath = 
Optional.of(config.getString(ServiceConfigKeys.POLICY_BASED_DATA_MOVEMENT_PATH));
-    } else {
-      optionalUserSpecifiedPath = Optional.absent();
-    }
-  }
-
-  @Override
-  public Dag<JobExecutionPlan> compileFlow(Spec spec) {
-    // A Map from JobSpec to SpexExecutor, as the output of Flow Compiler.
-    Map<Spec, SpecExecutor> specExecutorInstanceMap = Maps.newLinkedHashMap();
-    findPath(specExecutorInstanceMap, spec);
-    List<JobExecutionPlan> jobExecutionPlans = new ArrayList<>();
-    for (Map.Entry<Spec, SpecExecutor> entry: 
specExecutorInstanceMap.entrySet()) {
-      JobExecutionPlan jobExecutionPlan = new JobExecutionPlan((JobSpec) 
entry.getKey(), entry.getValue());
-      jobExecutionPlans.add(jobExecutionPlan);
-    }
-    return new JobExecutionPlanDagFactory().createDag(jobExecutionPlans);
-  }
-
-  /**
-   * @return Transform a set of {@link TopologySpec} into a instance of {@link 
org.jgrapht.graph.WeightedMultigraph}
-   * and filter out connections between blacklisted vertices that user 
specified.
-   * The output of this function only stays in memory, so each time a logical 
flow is compiled, the multigraph will
-   * be re-calculated.
-   *
-   */
-  private void inMemoryWeightGraphGenerator() {
-    for (TopologySpec topologySpec : topologySpecMap.values()) {
-      weightGraphGenerateHelper(topologySpec);
-    }
-
-    // Filter out connection appearing in servicePolicy.
-    // This is where servicePolicy is enforced.
-    servicePolicy.populateBlackListedEdges(this.weightedGraph);
-    if (servicePolicy.getBlacklistedEdges().size() > 0) {
-      for (FlowEdge toDeletedEdge : servicePolicy.getBlacklistedEdges()) {
-        weightedGraph.removeEdge(toDeletedEdge);
-      }
-    }
-  }
-
-  // Basically a dijkstra path finding for connecting source and sink by 
multiple hops in between.
-  // If there's any user-specified prioritization, conduct the DFS and see if 
the user-specified path is available.
-
-  // there's no updates on TopologySpec, or user should be aware of the 
possibility
-  // that a topologySpec not being reflected in findPath.
-  private void findPath(Map<Spec, SpecExecutor> specExecutorInstanceMap, Spec 
spec) {
-    inMemoryWeightGraphGenerator();
-    FlowSpec flowSpec = (FlowSpec) spec;
-    if (optionalUserSpecifiedPath.isPresent()) {
-      log.info("Starting to evaluate user's specified path ... ");
-      if (userSpecifiedPathVerificator(specExecutorInstanceMap, flowSpec)) {
-        log.info("User specified path[ " + optionalUserSpecifiedPath.get() + 
"] successfully verified.");
-        return;
-      } else {
-        log.error("Will not execute user specified path[ " + 
optionalUserSpecifiedPath.get() + "]");
-        log.info("Start to execute FlowCompiler's algorithm for valid data 
movement path");
-      }
-    }
-
-    ServiceNode sourceNode =
-        new 
BaseServiceNodeImpl(flowSpec.getConfig().getString(ServiceConfigKeys.FLOW_SOURCE_IDENTIFIER_KEY));
-
-    ServiceNode targetNode =
-        new 
BaseServiceNodeImpl(flowSpec.getConfig().getString(ServiceConfigKeys.FLOW_DESTINATION_IDENTIFIER_KEY));
-
-    List<FlowEdge> resultEdgePath = dijkstraBasedPathFindingHelper(sourceNode, 
targetNode, this.weightedGraph);
-    for (int i = 0; i < resultEdgePath.size() ; i++) {
-      FlowEdge tmpFlowEdge = resultEdgePath.get(i);
-      ServiceNode edgeSrcNode = ((LoadBasedFlowEdgeImpl) 
tmpFlowEdge).getSourceNode();
-      ServiceNode edgeTgtNode = ((LoadBasedFlowEdgeImpl) 
tmpFlowEdge).getTargetNode();
-      specExecutorInstanceMap.put(convertHopToJobSpec(edgeSrcNode, 
edgeTgtNode, flowSpec),
-          ((LoadBasedFlowEdgeImpl) 
(resultEdgePath.get(i))).getSpecExecutorInstance());
-    }
-  }
-
-  /**
-   * As the base implementation here, all templates will be considered for 
each edge.
-   */
-  @Override
-  protected void populateEdgeTemplateMap() {
-    if (templateCatalog.isPresent()) {
-      for (FlowEdge flowEdge : this.weightedGraph.edgeSet()) {
-        edgeTemplateMap.put(flowEdge.getEdgeIdentity(), templateCatalog.get().
-            getAllTemplates().
-            stream().map(jobTemplate -> 
jobTemplate.getUri()).collect(Collectors.toList()));
-      }
-    }
-  }
-
-  // If path specified not existed, return false;
-  // else return true.
-  private boolean userSpecifiedPathVerificator(Map<Spec, SpecExecutor> 
specExecutorInstanceMap, FlowSpec flowSpec) {
-    Map<Spec, SpecExecutor> tmpSpecExecutorInstanceMap = new HashMap<>();
-    List<String> userSpecfiedPath = 
Arrays.asList(optionalUserSpecifiedPath.get().split(","));
-    for (int i = 0; i < userSpecfiedPath.size() - 1; i++) {
-      ServiceNode sourceNode = new 
BaseServiceNodeImpl(userSpecfiedPath.get(i));
-      ServiceNode targetNode = new BaseServiceNodeImpl(userSpecfiedPath.get(i 
+ 1));
-      if (weightedGraph.containsVertex(sourceNode) && 
weightedGraph.containsVertex(targetNode)
-          && weightedGraph.containsEdge(sourceNode, targetNode)) {
-        tmpSpecExecutorInstanceMap.put(convertHopToJobSpec(sourceNode, 
targetNode, flowSpec),
-            (((LoadBasedFlowEdgeImpl) weightedGraph.getEdge(sourceNode, 
targetNode)).getSpecExecutorInstance()));
-      } else {
-        log.error("User Specified Path is invalid");
-        return false;
-      }
-    }
-    specExecutorInstanceMap.putAll(tmpSpecExecutorInstanceMap);
-    return true;
-  }
-
-  // Helper function for transform TopologySpecMap into a 
weightedDirectedGraph.
-  private void weightGraphGenerateHelper(TopologySpec topologySpec) {
-    try {
-      Map<ServiceNode, ServiceNode> capabilities = 
topologySpec.getSpecExecutor().getCapabilities().get();
-      for (Map.Entry<ServiceNode, ServiceNode> capability : 
capabilities.entrySet()) {
-
-        BaseServiceNodeImpl sourceNode = new 
BaseServiceNodeImpl(capability.getKey().getNodeName());
-        BaseServiceNodeImpl targetNode = new 
BaseServiceNodeImpl(capability.getValue().getNodeName());
-
-        if (!weightedGraph.containsVertex(sourceNode)) {
-          weightedGraph.addVertex(sourceNode);
-        }
-        if (!weightedGraph.containsVertex(targetNode)) {
-          weightedGraph.addVertex(targetNode);
-        }
-
-        FlowEdge flowEdge =
-            new LoadBasedFlowEdgeImpl(sourceNode, targetNode, 
defaultFlowEdgeProps, topologySpec.getSpecExecutor());
-
-        // In Multi-Graph if flowEdge existed, just skip it.
-        if (!weightedGraph.containsEdge(flowEdge)) {
-          weightedGraph.addEdge(sourceNode, targetNode, flowEdge);
-        }
-      }
-    } catch (InterruptedException | ExecutionException e) {
-      Instrumented.markMeter(this.flowCompilationFailedMeter);
-      throw new RuntimeException("Cannot determine topology capabilities", e);
-    }
-  }
-
-  /**
-   * Generate JobSpec based on the #templateURI that user specified.
-   */
-  private JobSpec buildJobSpec (ServiceNode sourceNode, ServiceNode 
targetNode, URI templateURI, FlowSpec flowSpec) {
-    JobSpec jobSpec;
-    JobSpec.Builder jobSpecBuilder = 
JobSpec.builder(jobSpecURIGenerator(flowSpec, sourceNode, targetNode))
-        .withConfig(flowSpec.getConfig())
-        .withDescription(flowSpec.getDescription())
-        .withVersion(flowSpec.getVersion());
-    if (templateURI != null) {
-      jobSpecBuilder.withTemplate(templateURI);
-      try {
-        jobSpec = new ResolvedJobSpec(jobSpecBuilder.build(), 
templateCatalog.get());
-        log.info("Resolved JobSpec properties are: " + 
jobSpec.getConfigAsProperties());
-      } catch (SpecNotFoundException | JobTemplate.TemplateException e) {
-        throw new RuntimeException("Could not resolve template in JobSpec from 
TemplateCatalog", e);
-      }
-    } else {
-      jobSpec = jobSpecBuilder.build();
-      log.info("Unresolved JobSpec properties are: " + 
jobSpec.getConfigAsProperties());
-    }
-
-    // Remove schedule
-    jobSpec.setConfig(jobSpec.getConfig().withoutPath(ConfigurationKeys.JOB_SCHEDULE_KEY));
-
-    // Add job.name and job.group
-    if (flowSpec.getConfig().hasPath(ConfigurationKeys.FLOW_NAME_KEY)) {
-      jobSpec.setConfig(jobSpec.getConfig()
-          .withValue(ConfigurationKeys.JOB_NAME_KEY,  
ConfigValueFactory.fromAnyRef(
-              flowSpec.getConfig().getValue(ConfigurationKeys.FLOW_NAME_KEY).unwrapped().toString()
-                  + "-" + sourceNode.getNodeName()
-                  + "-" + targetNode.getNodeName())));
-    }
-    if (flowSpec.getConfig().hasPath(ConfigurationKeys.FLOW_GROUP_KEY)) {
-      jobSpec.setConfig(jobSpec.getConfig()
-          .withValue(ConfigurationKeys.JOB_GROUP_KEY, 
flowSpec.getConfig().getValue(ConfigurationKeys.FLOW_GROUP_KEY)));
-    }
-
-    // Add flow execution id for this compilation
-    long flowExecutionId = FlowUtils.getOrCreateFlowExecutionId(flowSpec);
-    jobSpec.setConfig(jobSpec.getConfig().withValue(ConfigurationKeys.FLOW_EXECUTION_ID_KEY,
-        ConfigValueFactory.fromAnyRef(flowExecutionId)));
-
-    // Reset properties in Spec from Config
-    jobSpec.setConfigAsProperties(ConfigUtils.configToProperties(jobSpec.getConfig()));
-    return jobSpec;
-  }
-
-  /**
-   * A naive implementation of resolving templates in each JobSpec among 
Multi-hop FlowSpec.
-   * Handle the case when edge is not specified.
-   * Always select the first available template.
-   */
-  private JobSpec convertHopToJobSpec (ServiceNode sourceNode, ServiceNode 
targetNode, FlowSpec flowSpec) {
-    FlowEdge flowEdge = weightedGraph.getAllEdges(sourceNode, 
targetNode).iterator().next();
-    URI templateURI = getTemplateURI (sourceNode, targetNode, flowSpec, 
flowEdge);
-    return buildJobSpec(sourceNode, targetNode, templateURI, flowSpec);
-  }
-
-  private URI getTemplateURI (ServiceNode sourceNode, ServiceNode targetNode, 
FlowSpec flowSpec, FlowEdge flowEdge) {
-    URI firstTemplateURI =
-        (edgeTemplateMap != null && 
edgeTemplateMap.containsKey(flowEdge.getEdgeIdentity())) ? edgeTemplateMap.get(
-            flowEdge.getEdgeIdentity()).get(0) : 
jobSpecTemplateURIGenerator(flowSpec);
-    return firstTemplateURI;
-  }
-
-  /**
-   * A naive implementation of generating a jobSpec's URI within a multi-hop 
logical Flow.
-   */
-  @Override
-  public URI jobSpecURIGenerator(Object... objects) {
-    FlowSpec flowSpec = (FlowSpec) objects[0];
-    ServiceNode sourceNode = (ServiceNode) objects[1];
-    ServiceNode targetNode = (ServiceNode) objects[2];
-    try {
-      return new URI(JobSpec.Builder.DEFAULT_JOB_CATALOG_SCHEME, 
flowSpec.getUri().getAuthority(),
-          StringUtils.appendIfMissing(StringUtils.prependIfMissing(flowSpec.getUri().getPath(), "/"),"/")
-              + sourceNode.getNodeName() + "-" + targetNode.getNodeName(), 
null);
-    } catch (URISyntaxException e) {
-      log.error(
-          "URI construction failed when jobSpec from " + 
sourceNode.getNodeName() + " to " + targetNode.getNodeName());
-      throw new RuntimeException();
-    }
-  }
-}
\ No newline at end of file
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/policy/ServicePolicy.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/policy/ServicePolicy.java
deleted file mode 100644
index 540b13d..0000000
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/policy/ServicePolicy.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gobblin.service.modules.policy;
-
-import java.util.Set;
-import org.apache.gobblin.runtime.api.FlowEdge;
-import org.apache.gobblin.runtime.api.ServiceNode;
-import org.jgrapht.graph.DirectedWeightedMultigraph;
-
-
-/**
- * ServicePolicy will be firstly checked before the compilation happen.
- * unexpcted edges will not be considered in compilation process.
- */
-public interface ServicePolicy {
-
-  /**
-   * After initialization of {@link ServicePolicy}, the populating method need 
to invoked before
-   * {@link #getBlacklistedEdges()} can return the expected result.
-   *
-   * This requirement exists because when {@link ServicePolicy} is initialized 
it is not necessary that a
-   * {@link org.jgrapht.graph.WeightedMultigraph} has been constructed, 
neither we cannot know if user-specified edges
-   * and nodes exist in {@link org.jgrapht.graph.WeightedMultigraph}.
-   * The population of blacklisted Edges make sense after graph has been 
constructed.
-   */
-  public void populateBlackListedEdges(DirectedWeightedMultigraph<ServiceNode, 
FlowEdge> graph);
-
-  /**
-   * Should return all edges that being blacklisted by this policy.
-   */
-  public Set<FlowEdge> getBlacklistedEdges();
-
-  public void addServiceNode(ServiceNode serviceNode);
-
-  public void addFlowEdge(FlowEdge flowEdge);
-}
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/policy/StaticServicePolicy.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/policy/StaticServicePolicy.java
deleted file mode 100644
index aabc67e..0000000
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/policy/StaticServicePolicy.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gobblin.service.modules.policy;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-
-import com.google.common.base.Preconditions;
-
-import org.apache.gobblin.annotation.Alias;
-import org.apache.gobblin.runtime.api.FlowEdge;
-import org.apache.gobblin.runtime.api.ServiceNode;
-import org.jgrapht.graph.DirectedWeightedMultigraph;
-
-import lombok.Getter;
-import lombok.extern.slf4j.Slf4j;
-
-/**
- * Defines {@link ServiceNode}s or {@link FlowEdge}s that should be 
blacklisted from the Flow-compiler process,
- * obtained only from configuration, which is the reason it is named as static.
- *
- * TODO: DynamicServicePolicy can obtain new blacklist candidate through Flow 
monitoring process, which is responsible
- * for monitoring the flow execution and react accordingly.
- *
- * Either user specify {@link ServiceNode} or {@link FlowEdge} to blacklist 
will all end up with a list of
- * {@link FlowEdge}s that won't be considered when selecting path for data 
transformation.
- */
-@Slf4j
-@Alias("static")
-public class StaticServicePolicy implements ServicePolicy {
-
-  @Getter
-  Set<FlowEdge> blacklistedEdges;
-
-  List<ServiceNode> serviceNodes;
-  List<FlowEdge> flowEdges;
-
-  public StaticServicePolicy() {
-    serviceNodes = new ArrayList<>();
-    flowEdges = new ArrayList<>();
-    blacklistedEdges = new HashSet<>();
-  }
-
-  public StaticServicePolicy(List<ServiceNode> serviceNodes, List<FlowEdge> 
flowEdges) {
-    Preconditions.checkNotNull(serviceNodes);
-    Preconditions.checkNotNull(flowEdges);
-    blacklistedEdges = new HashSet<>();
-    this.serviceNodes = serviceNodes;
-    this.flowEdges = flowEdges;
-  }
-
-  public void addServiceNode(ServiceNode serviceNode) {
-    this.serviceNodes.add(serviceNode);
-  }
-
-  public void addFlowEdge(FlowEdge flowEdge){
-    this.flowEdges.add(flowEdge);
-  }
-
-  @Override
-  public void populateBlackListedEdges(DirectedWeightedMultigraph<ServiceNode, 
FlowEdge> graph) {
-    for (ServiceNode node: serviceNodes) {
-      if (graph.containsVertex(node)) {
-        blacklistedEdges.addAll(graph.incomingEdgesOf(node));
-        blacklistedEdges.addAll(graph.outgoingEdgesOf(node));
-      } else {
-        log.info("The graph " + graph + " doesn't contains node " + 
node.toString());
-      }
-    }
-
-    for( FlowEdge flowEdge: flowEdges) {
-      if (graph.containsEdge(flowEdge)) {
-        blacklistedEdges.add(flowEdge);
-      } else {
-        log.info("The graph " + graph + "doesn't contains edge " + 
flowEdge.toString());
-      }
-    }
-  }
-}
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/DistancedNode.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/DistancedNode.java
deleted file mode 100644
index 946fe10..0000000
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/DistancedNode.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gobblin.service.modules.utils;
-import org.apache.gobblin.runtime.api.ServiceNode;
-
-/**
- * This is a helping class(Basically a wrapper) for Shortest path finding 
process
- * to keep the shortest distance from source to an arbitrary node.
- */
-public class DistancedNode<T extends ServiceNode> {
-
-  /**
-   * The distance between {@link this} node to the src node in the 
shortest-distance finding problem.
-   */
-  private double distToSrc;
-
-  private T _serviceNode;
-
-
-  /**
-   * Max_Value represents no-connection.
-   */
-  public DistancedNode(T _serviceNode){
-    this(_serviceNode, Double.MAX_VALUE);
-  }
-
-  public DistancedNode(T _serviceNode, double dist){
-    this._serviceNode = _serviceNode;
-    this.distToSrc = dist;
-  }
-
-  public double getDistToSrc(){
-    return this.distToSrc;
-  }
-
-  public void setDistToSrc(double distToSrc){
-    this.distToSrc = distToSrc;
-  }
-
-  public T getNode(){
-    return this._serviceNode;
-  }
-
-  @Override
-  public boolean equals(Object o) {
-    if (this == o) {
-      return true;
-    }
-    if (o == null || getClass() != o.getClass()) {
-      return false;
-    }
-
-    DistancedNode<?> that = (DistancedNode<?>) o;
-
-    return _serviceNode.equals(that._serviceNode);
-  }
-
-  @Override
-  public int hashCode() {
-    return _serviceNode.hashCode();
-  }
-}
\ No newline at end of file
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/FindPathUtils.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/FindPathUtils.java
deleted file mode 100644
index 1481f78..0000000
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/FindPathUtils.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gobblin.service.modules.utils;
-
-import java.util.ArrayList;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.PriorityQueue;
-import java.util.Set;
-
-import org.apache.gobblin.runtime.api.FlowEdge;
-import org.apache.gobblin.runtime.api.ServiceNode;
-import org.apache.gobblin.service.modules.flow.LoadBasedFlowEdgeImpl;
-import org.jgrapht.graph.DirectedWeightedMultigraph;
-
-import lombok.extern.slf4j.Slf4j;
-
-import avro.shaded.com.google.common.annotations.VisibleForTesting;
-
-@Slf4j
-public class FindPathUtils {
-  // Since Author{[email protected]} couldn't find the proper way to conduct 
Library provided by JGraphT
-  // on the customized-edge Graph, here is the raw implementation of Dijkstra 
algorithm for finding shortest path.
-
-  /**
-   * Given sourceNode and targetNode, find the shortest path and return 
shortest path.
-   * @return Each edge on this shortest path, in order.
-   *
-   */
-  @VisibleForTesting
-  public static List<FlowEdge> dijkstraBasedPathFindingHelper(ServiceNode 
sourceNode, ServiceNode targetNode,
-      DirectedWeightedMultigraph<ServiceNode, FlowEdge> weightedGraph) {
-    Map<DistancedNode, ArrayList<FlowEdge>> shortestPath = new HashMap<>();
-    Map<DistancedNode, Double> shortestDist = new HashMap<>();
-    PriorityQueue<DistancedNode> pq = new PriorityQueue<>(new 
Comparator<DistancedNode>() {
-      @Override
-      public int compare(DistancedNode o1, DistancedNode o2) {
-        if (o1.getDistToSrc() < o2.getDistToSrc()) {
-          return -1;
-        } else {
-          return 1;
-        }
-      }
-    });
-    pq.add(new DistancedNode(sourceNode, 0.0));
-
-    Set<FlowEdge> visitedEdge = new HashSet<>();
-
-    while(!pq.isEmpty()) {
-      DistancedNode node = pq.poll();
-      if (node.getNode().getNodeName().equals(targetNode.getNodeName())) {
-        // Searching finished
-        return shortestPath.get(node);
-      }
-
-      Set<FlowEdge> outgoingEdges = 
weightedGraph.outgoingEdgesOf(node.getNode());
-      for (FlowEdge outGoingEdge:outgoingEdges) {
-        // Since it is a multi-graph problem, should use edge for 
deduplicaiton instead of vertex.
-        if (visitedEdge.contains(outGoingEdge)) {
-          continue;
-        }
-
-        DistancedNode adjacentNode = new 
DistancedNode(weightedGraph.getEdgeTarget(outGoingEdge));
-        if (shortestDist.containsKey(adjacentNode)) {
-          adjacentNode.setDistToSrc(shortestDist.get(adjacentNode));
-        }
-
-        double newDist = node.getDistToSrc() + ((LoadBasedFlowEdgeImpl) 
outGoingEdge).getEdgeLoad();
-
-        if (newDist < adjacentNode.getDistToSrc()) {
-          if (pq.contains(adjacentNode)) {
-            pq.remove(adjacentNode);
-          }
-
-          // Update the shortest path.
-          ArrayList<FlowEdge> path = shortestPath.containsKey(node)
-              ? new ArrayList<>(shortestPath.get(node)) : new ArrayList<>();
-          path.add(outGoingEdge);
-          shortestPath.put(adjacentNode, path);
-          shortestDist.put(adjacentNode, newDist);
-
-          adjacentNode.setDistToSrc(newDist);
-          pq.add(adjacentNode);
-        }
-        visitedEdge.add(outGoingEdge);
-      }
-    }
-    log.error("No path found");
-    return new ArrayList<>();
-  }
-}
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/MultiHopsFlowToJobSpecCompilerTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/MultiHopsFlowToJobSpecCompilerTest.java
deleted file mode 100644
index 8b08628..0000000
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/MultiHopsFlowToJobSpecCompilerTest.java
+++ /dev/null
@@ -1,326 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gobblin.service.modules.core;
-
-import java.io.File;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Set;
-
-import com.typesafe.config.Config;
-
-import org.apache.commons.io.FileUtils;
-import org.apache.hadoop.fs.Path;
-import org.jgrapht.graph.DirectedWeightedMultigraph;
-import org.jgrapht.graph.WeightedMultigraph;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.testng.Assert;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.Test;
-import org.apache.gobblin.configuration.ConfigurationKeys;
-import org.apache.gobblin.runtime.spec_executorInstance.BaseServiceNodeImpl;
-import org.apache.gobblin.runtime.api.FlowEdge;
-import org.apache.gobblin.runtime.api.FlowSpec;
-import org.apache.gobblin.runtime.api.ServiceNode;
-import org.apache.gobblin.runtime.api.SpecExecutor;
-import org.apache.gobblin.runtime.api.TopologySpec;
-import org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor;
-import org.apache.gobblin.service.ServiceConfigKeys;
-import org.apache.gobblin.service.modules.flow.LoadBasedFlowEdgeImpl;
-import org.apache.gobblin.service.modules.flow.MultiHopsFlowToJobSpecCompiler;
-import org.apache.gobblin.util.ConfigUtils;
-import org.apache.gobblin.util.PathUtils;
-
-import static org.apache.gobblin.service.modules.utils.FindPathUtils.*;
-
-
-// All unit tests here will be with templateCatelogue.
-public class MultiHopsFlowToJobSpecCompilerTest {
-  private static final Logger logger = 
LoggerFactory.getLogger(MultiHopsFlowToJobSpecCompilerTest.class);
-
-  private static final String TEST_TEMPLATE_CATALOG_PATH = 
"/tmp/gobblinTestTemplateCatalog_" + System.currentTimeMillis();
-  private static final String TEST_TEMPLATE_CATALOG_URI = "file://" + 
TEST_TEMPLATE_CATALOG_PATH;
-
-  private static final String TEST_TEMPLATE_NAME = "test.template";
-  private static final String TEST_TEMPLATE_URI = "FS:///test.template";
-
-  // The path to be discovered is TEST_SOURCE_NAME -> TEST_HOP_NAME_A -> 
TEST_HOP_NAME_B -> TEST_SINK_NAME
-  private static final String TEST_SOURCE_NAME = "testSource";
-  private static final String TEST_HOP_NAME_A = "testHopA";
-  private static final String TEST_HOP_NAME_B = "testHopB";
-  private static final String TEST_HOP_NAME_C = "testHopC";
-  private static final String TEST_SINK_NAME = "testSink";
-  private static final String TEST_FLOW_GROUP = "testFlowGroup";
-  private static final String TEST_FLOW_NAME = "testFlowName";
-
-  private static final String SPEC_STORE_PARENT_DIR = "/tmp/orchestrator/";
-  private static final String SPEC_DESCRIPTION = "Test Orchestrator";
-  private static final String SPEC_VERSION = FlowSpec.Builder.DEFAULT_VERSION;
-  private static final String TOPOLOGY_SPEC_STORE_DIR = 
"/tmp/orchestrator/topologyTestSpecStore_" + System.currentTimeMillis();
-  private static final String TOPOLOGY_SPEC_STORE_DIR_SECOND = 
"/tmp/orchestrator/topologyTestSpecStore_" + System.currentTimeMillis() + "_2";
-  private static final String FLOW_SPEC_STORE_DIR = 
"/tmp/orchestrator/flowTestSpecStore_" + System.currentTimeMillis();
-
-  private ServiceNode vertexSource;
-  private ServiceNode vertexHopA;
-  private ServiceNode vertexHopB;
-  private ServiceNode vertexHopC;
-  private ServiceNode vertexSink;
-
-  private MultiHopsFlowToJobSpecCompiler compilerWithTemplateCalague;
-  private Map<String, List<URI>> edgeTemplateMap;
-
-
-  @BeforeClass
-  public void setUp() throws Exception{
-    // Create dir for template catalog
-    FileUtils.forceMkdir(new File(TEST_TEMPLATE_CATALOG_PATH));
-
-    // Create template to use in test
-    List<String> templateEntries = new ArrayList<>();
-    templateEntries.add("testProperty1 = \"testValue1\"");
-    templateEntries.add("testProperty2 = \"test.Value1\"");
-    templateEntries.add("testProperty3 = 100");
-    FileUtils.writeLines(new File(TEST_TEMPLATE_CATALOG_PATH + "/" + 
TEST_TEMPLATE_NAME), templateEntries);
-
-    // Initialize complier with template catalog
-    Properties compilerWithTemplateCatalogProperties = new Properties();
-    
compilerWithTemplateCatalogProperties.setProperty(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY,
 TEST_TEMPLATE_CATALOG_URI);
-
-    // Initialize compiler with common useful properties
-    String testPath = TEST_SOURCE_NAME + "," + TEST_HOP_NAME_A + "," + 
TEST_HOP_NAME_B + "," + TEST_SINK_NAME;
-    
compilerWithTemplateCatalogProperties.setProperty(ServiceConfigKeys.POLICY_BASED_DATA_MOVEMENT_PATH,
 testPath);
-
-    this.compilerWithTemplateCalague = new 
MultiHopsFlowToJobSpecCompiler(ConfigUtils.propertiesToConfig(compilerWithTemplateCatalogProperties));
-
-    vertexSource = new BaseServiceNodeImpl(TEST_SOURCE_NAME);
-    vertexHopA = new BaseServiceNodeImpl(TEST_HOP_NAME_A);
-    vertexHopB = new BaseServiceNodeImpl(TEST_HOP_NAME_B);
-    vertexHopC = new BaseServiceNodeImpl(TEST_HOP_NAME_C);
-    vertexSink = new BaseServiceNodeImpl(TEST_SINK_NAME);
-
-  }
-
-  @AfterClass
-  public void cleanUp() throws Exception {
-    // Cleanup Template Catalog
-    try {
-      cleanUpDir(TEST_TEMPLATE_CATALOG_PATH);
-    } catch (Exception e) {
-      logger.warn("Could not completely cleanup Template catalog dir");
-    }
-
-    // Cleanup ToplogySpec Dir
-    try {
-      cleanUpDir(TOPOLOGY_SPEC_STORE_DIR);
-    } catch (Exception e) {
-      logger.warn("Could not completely cleanup ToplogySpec catalog dir");
-    }
-
-    // Cleanup FlowSpec Dir
-    try {
-      cleanUpDir(FLOW_SPEC_STORE_DIR);
-    } catch (Exception e) {
-      logger.warn("Could not completely cleanup FlowSpec catalog dir");
-    }
-  }
-
-  @Test
-  public void testWeightedGraphConstruction(){
-    FlowSpec flowSpec = initFlowSpec();
-    TopologySpec topologySpec = initTopologySpec(TOPOLOGY_SPEC_STORE_DIR, 
TEST_SOURCE_NAME, TEST_HOP_NAME_A, TEST_HOP_NAME_B, TEST_SINK_NAME);
-    this.compilerWithTemplateCalague.onAddSpec(topologySpec);
-
-    // invocation of compileFlow trigger the weighedGraph construction
-    this.compilerWithTemplateCalague.compileFlow(flowSpec);
-    DirectedWeightedMultigraph<ServiceNode, FlowEdge> weightedGraph = 
compilerWithTemplateCalague.getWeightedGraph();
-
-    Assert.assertTrue(weightedGraph.containsVertex(vertexSource));
-    Assert.assertTrue(weightedGraph.containsVertex(vertexHopA));
-    Assert.assertTrue(weightedGraph.containsVertex(vertexHopB));
-    Assert.assertTrue(weightedGraph.containsVertex(vertexSink));
-
-    FlowEdge edgeSrc2A = new LoadBasedFlowEdgeImpl(vertexSource, vertexHopA, 
topologySpec.getSpecExecutor());
-    FlowEdge edgeA2B = new LoadBasedFlowEdgeImpl(vertexHopA, vertexHopB, 
topologySpec.getSpecExecutor());
-    FlowEdge edgeB2Sink = new LoadBasedFlowEdgeImpl(vertexHopB, vertexSink, 
topologySpec.getSpecExecutor());
-
-    Assert.assertTrue(weightedGraph.containsEdge(edgeSrc2A));
-    Assert.assertTrue(weightedGraph.containsEdge(edgeA2B));
-    Assert.assertTrue(weightedGraph.containsEdge(edgeB2Sink));
-
-    Assert.assertTrue(edgeEqual(weightedGraph.getEdge(vertexSource, 
vertexHopA), edgeSrc2A));
-    Assert.assertTrue(edgeEqual(weightedGraph.getEdge(vertexHopA, vertexHopB), 
edgeA2B));
-    Assert.assertTrue(edgeEqual(weightedGraph.getEdge(vertexHopB, vertexSink), 
edgeB2Sink));
-
-    this.compilerWithTemplateCalague.onDeleteSpec(topologySpec.getUri(), "");
-  }
-
-  @Test
-  public void testUserSpecifiedPathCompilation(){
-    // TODO
-  }
-
-  @Test
-  public void testServicePolicy(){
-    // Initialize compiler with some blacklist properties
-    Properties properties = new Properties();
-    
properties.setProperty(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY,
 TEST_TEMPLATE_CATALOG_URI);
-    String testPath = TEST_SOURCE_NAME + "," + TEST_HOP_NAME_A + "," + 
TEST_HOP_NAME_B + "," + TEST_SINK_NAME;
-    properties.setProperty(ServiceConfigKeys.POLICY_BASED_DATA_MOVEMENT_PATH, 
testPath);
-    properties.setProperty(ServiceConfigKeys.POLICY_BASED_BLOCKED_NODES,
-        "testHopA");
-    MultiHopsFlowToJobSpecCompiler compiler = new 
MultiHopsFlowToJobSpecCompiler(ConfigUtils.propertiesToConfig(properties));
-
-
-    FlowSpec flowSpec = initFlowSpec();
-    TopologySpec topologySpec = initTopologySpec(TOPOLOGY_SPEC_STORE_DIR, 
TEST_SOURCE_NAME, TEST_HOP_NAME_A, TEST_HOP_NAME_B, TEST_SINK_NAME);
-    compiler.onAddSpec(topologySpec);
-
-    // invocation of compileFlow trigger the weighedGraph construction
-    compiler.compileFlow(flowSpec);
-
-    
compiler.servicePolicy.populateBlackListedEdges(compiler.getWeightedGraph());
-    Assert.assertEquals(compiler.servicePolicy.getBlacklistedEdges().size(), 
2);
-
-    FlowEdge edgeSrc2A = new LoadBasedFlowEdgeImpl(vertexSource, vertexHopA, 
topologySpec.getSpecExecutor());
-    FlowEdge edgeA2B = new LoadBasedFlowEdgeImpl(vertexHopA, vertexHopB, 
topologySpec.getSpecExecutor());
-
-    
Assert.assertTrue(compiler.servicePolicy.getBlacklistedEdges().contains(edgeSrc2A));
-    
Assert.assertTrue(compiler.servicePolicy.getBlacklistedEdges().contains(edgeA2B));
-
-  }
-
-  @Test (dependsOnMethods = "testWeightedGraphConstruction")
-  public void testDijkstraPathFinding(){
-
-    FlowSpec flowSpec = initFlowSpec();
-    TopologySpec topologySpec_1 = initTopologySpec(TOPOLOGY_SPEC_STORE_DIR, 
TEST_SOURCE_NAME, TEST_HOP_NAME_A, TEST_HOP_NAME_B, TEST_SINK_NAME);
-    TopologySpec topologySpec_2 = 
initTopologySpec(TOPOLOGY_SPEC_STORE_DIR_SECOND, TEST_SOURCE_NAME, 
TEST_HOP_NAME_B, TEST_HOP_NAME_C, TEST_SINK_NAME);
-    this.compilerWithTemplateCalague.onAddSpec(topologySpec_1);
-    this.compilerWithTemplateCalague.onAddSpec(topologySpec_2);
-
-    // Get the edge -> Change the weight -> Materialized the edge change back 
to graph -> compile again -> Assertion
-    this.compilerWithTemplateCalague.compileFlow(flowSpec);
-    DirectedWeightedMultigraph<ServiceNode, FlowEdge> weightedGraph = 
compilerWithTemplateCalague.getWeightedGraph();
-    FlowEdge a2b= weightedGraph.getEdge(vertexHopA, vertexHopB);
-    FlowEdge b2c = weightedGraph.getEdge(vertexHopB, vertexHopC);
-    FlowEdge c2s = weightedGraph.getEdge(vertexHopC, vertexSink);
-    weightedGraph.setEdgeWeight(a2b, 1.99);
-    weightedGraph.setEdgeWeight(b2c, 0.1);
-    weightedGraph.setEdgeWeight(c2s, 0.2);
-
-    // Best route: Src - B(1) - C(0.1) - sink (0.2)
-    this.compilerWithTemplateCalague.compileFlow(flowSpec);
-    List<FlowEdge> edgeList = dijkstraBasedPathFindingHelper(vertexSource, 
vertexSink, weightedGraph);
-
-    FlowEdge src2b = weightedGraph.getEdge(vertexSource, vertexHopB);
-    FlowEdge b2C = weightedGraph.getEdge(vertexHopB, vertexHopC);
-    FlowEdge c2sink = weightedGraph.getEdge(vertexHopC, vertexSink);
-    Assert.assertEquals(edgeList.get(0).getEdgeIdentity(), 
src2b.getEdgeIdentity());
-    Assert.assertEquals(edgeList.get(1).getEdgeIdentity(), 
b2C.getEdgeIdentity());
-    Assert.assertEquals(edgeList.get(2).getEdgeIdentity(), 
c2sink.getEdgeIdentity());
-
-    this.compilerWithTemplateCalague.onDeleteSpec(topologySpec_1.getUri(), "");
-    this.compilerWithTemplateCalague.onDeleteSpec(topologySpec_2.getUri(), "");
-  }
-
-  // The topology is: Src - A - B - Dest
-  private TopologySpec initTopologySpec(String storeDir, String ... args) {
-    Properties properties = new Properties();
-    properties.put("specStore.fs.dir", storeDir);
-    String capabilitiesString = "";
-    for(int i =0 ; i < args.length - 1 ; i ++ ) {
-      capabilitiesString = capabilitiesString + ( args[i] + ":" + args[i+1] + 
",");
-    }
-    Assert.assertEquals(capabilitiesString.charAt(capabilitiesString.length() 
- 1) , ',');
-    capabilitiesString = capabilitiesString.substring(0, 
capabilitiesString.length() - 1 );
-    properties.put("specExecInstance.capabilities", capabilitiesString);
-    properties.put("executorAttrs", new Properties());
-    Config config = ConfigUtils.propertiesToConfig(properties);
-    SpecExecutor specExecutorInstance = new InMemorySpecExecutor(config);
-
-    TopologySpec.Builder topologySpecBuilder = TopologySpec.builder(
-        
IdentityFlowToJobSpecCompilerTest.computeTopologySpecURI(SPEC_STORE_PARENT_DIR,
-            storeDir))
-        .withConfig(config)
-        .withDescription(SPEC_DESCRIPTION)
-        .withVersion(SPEC_VERSION)
-        .withSpecExecutor(specExecutorInstance);
-    return topologySpecBuilder.build();
-  }
-
-  private FlowSpec initFlowSpec() {
-    return initFlowSpec(TEST_FLOW_GROUP, TEST_FLOW_NAME, TEST_SOURCE_NAME, 
TEST_SINK_NAME);
-  }
-
-  private FlowSpec initFlowSpec(String flowGroup, String flowName, String 
source, String destination) {
-    Properties properties = new Properties();
-    properties.put(ConfigurationKeys.JOB_SCHEDULE_KEY, "* * * * *");
-    properties.put(ConfigurationKeys.FLOW_GROUP_KEY, flowGroup);
-    properties.put(ConfigurationKeys.FLOW_NAME_KEY, flowName);
-    properties.put(ServiceConfigKeys.FLOW_SOURCE_IDENTIFIER_KEY, source);
-    properties.put(ServiceConfigKeys.FLOW_DESTINATION_IDENTIFIER_KEY, 
destination);
-    Config config = ConfigUtils.propertiesToConfig(properties);
-
-    FlowSpec.Builder flowSpecBuilder = null;
-    try {
-      flowSpecBuilder = 
FlowSpec.builder(computeTopologySpecURI(SPEC_STORE_PARENT_DIR,
-          FLOW_SPEC_STORE_DIR))
-          .withConfig(config)
-          .withDescription("dummy description")
-          .withVersion(SPEC_VERSION)
-          .withTemplate(new URI(TEST_TEMPLATE_URI));
-    } catch (URISyntaxException e) {
-      throw new RuntimeException(e);
-    }
-    return flowSpecBuilder.build();
-  }
-
-  private void cleanUpDir(String dir) throws Exception {
-    File specStoreDir = new File(dir);
-    if (specStoreDir.exists()) {
-      FileUtils.deleteDirectory(specStoreDir);
-    }
-  }
-
-  // Criteria for FlowEdge to be equal in testing context
-  private boolean edgeEqual(FlowEdge a, FlowEdge b){
-    return (a.getEdgeIdentity().equals(b.getEdgeIdentity()) &&
-        ((LoadBasedFlowEdgeImpl)a).getEdgeLoad() == 
((LoadBasedFlowEdgeImpl)b).getEdgeLoad());
-  }
-
-  // Use this function for
-  private void populateTemplateMap(WeightedMultigraph<ServiceNode, FlowEdge> 
weightedGraph, URI exempliedURI){
-    this.edgeTemplateMap.clear();
-    Set<FlowEdge> allEdges = weightedGraph.edgeSet();
-    for ( FlowEdge edge : allEdges ) {
-      this.edgeTemplateMap.put(edge.getEdgeIdentity(), 
Arrays.asList(exempliedURI)) ;
-    }
-  }
-
-  public static URI computeTopologySpecURI(String parent, String current) {
-    // Make sure this is relative
-    return PathUtils.relativizePath(new Path(current), new 
Path(parent)).toUri();
-  }
-
-}
\ No newline at end of file

Reply via email to