http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/datanodes/fs/AdlsDataNode.java
----------------------------------------------------------------------
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/datanodes/fs/AdlsDataNode.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/datanodes/fs/AdlsDataNode.java
new file mode 100644
index 0000000..c6f5c74
--- /dev/null
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/datanodes/fs/AdlsDataNode.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.service.modules.flowgraph.datanodes.fs;
+
+import java.net.URI;
+
+import com.google.common.base.Strings;
+import com.typesafe.config.Config;
+
+
+/**
+ * An implementation of an ADL (Azure Data Lake) {@link 
org.apache.gobblin.service.modules.flowgraph.DataNode}.
+ */
+public class AdlsDataNode extends FileSystemDataNode {
+  public static final String ADLS_SCHEME = "adl";
+
+  public AdlsDataNode(Config nodeProps) throws DataNodeCreationException {
+    super(nodeProps);
+  }
+
+  /**
+    * @param fsUri FileSystem URI
+    * @return true if the scheme is "adl" and authority is not empty.
+    */
+  @Override
+  public boolean isUriValid(URI fsUri) {
+    String scheme = fsUri.getScheme();
+    //Check that the scheme is "adl"
+    if (!scheme.equals(ADLS_SCHEME)) {
+      return false;
+    }
+    //Ensure that the authority is not empty
+    if (Strings.isNullOrEmpty(fsUri.getAuthority())) {
+      return false;
+    }
+    return true;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/datanodes/fs/FileSystemDataNode.java
----------------------------------------------------------------------
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/datanodes/fs/FileSystemDataNode.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/datanodes/fs/FileSystemDataNode.java
new file mode 100644
index 0000000..72f1a66
--- /dev/null
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/datanodes/fs/FileSystemDataNode.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.flowgraph.datanodes.fs;
+
+import java.io.IOException;
+import java.net.URI;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.typesafe.config.Config;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.service.modules.flowgraph.BaseDataNode;
+import org.apache.gobblin.service.modules.flowgraph.FlowGraphConfigurationKeys;
+import org.apache.gobblin.util.ConfigUtils;
+
+import joptsimple.internal.Strings;
+import lombok.Getter;
+
+
+/**
+ * An abstract {@link FileSystemDataNode} implementation. In addition to the 
required properties of a {@link BaseDataNode}, an {@link FileSystemDataNode}
+ * must have a FS URI specified. Example implementations of {@link 
FileSystemDataNode} include {@link HdfsDataNode}, {@link LocalFSDataNode}.
+ */
+@Alpha
+public abstract class FileSystemDataNode extends BaseDataNode {
+  public static final String FS_URI_KEY = 
FlowGraphConfigurationKeys.DATA_NODE_PREFIX + "fs.uri";
+
+  @Getter
+  private String fsUri;
+
+  /**
+   * Constructor. An HDFS DataNode must have fs.uri property specified in 
addition to a node Id.
+   */
+  public FileSystemDataNode(Config nodeProps) throws DataNodeCreationException 
{
+    super(nodeProps);
+    try {
+      this.fsUri = ConfigUtils.getString(nodeProps, FS_URI_KEY, "");
+      Preconditions.checkArgument(!Strings.isNullOrEmpty(this.fsUri), "fs.uri 
cannot be null or empty.");
+
+      //Validate the srcFsUri and destFsUri of the DataNode.
+      if (!isUriValid(new URI(this.fsUri))) {
+        throw new IOException("Invalid FS URI " + this.fsUri);
+      }
+    } catch (Exception e) {
+      throw new DataNodeCreationException(e);
+    }
+  }
+
+  public abstract boolean isUriValid(URI fsUri);
+  /**
+   * Two HDFS DataNodes are the same if they have the same id and the same 
fsUri.
+   */
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+
+    FileSystemDataNode that = (FileSystemDataNode) o;
+
+    return this.getId().equals(that.getId()) && 
this.fsUri.equals(that.getFsUri());
+  }
+
+  @Override
+  public int hashCode() {
+    return Joiner.on("-").join(this.getId(), this.fsUri).hashCode();
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/datanodes/fs/HdfsDataNode.java
----------------------------------------------------------------------
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/datanodes/fs/HdfsDataNode.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/datanodes/fs/HdfsDataNode.java
new file mode 100644
index 0000000..5402074
--- /dev/null
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/datanodes/fs/HdfsDataNode.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.flowgraph.datanodes.fs;
+
+import java.net.URI;
+
+import com.google.common.base.Strings;
+import com.typesafe.config.Config;
+
+import org.apache.gobblin.annotation.Alpha;
+
+
+/**
+ * An implementation of {@link HdfsDataNode}. All the properties specific to a 
HDFS based data node (e.g. fs.uri)
+ * are validated here.
+ */
+@Alpha
+public class HdfsDataNode extends FileSystemDataNode {
+  public static final String HDFS_SCHEME = "hdfs";
+
+  public HdfsDataNode(Config nodeProps) throws DataNodeCreationException {
+    super(nodeProps);
+  }
+
+  /**
+   *
+   * @param fsUri FileSystem URI
+   * @return true if the scheme is "hdfs" and authority is not empty.
+   */
+  @Override
+  public boolean isUriValid(URI fsUri) {
+    String scheme = fsUri.getScheme();
+    //Check that the scheme is "hdfs"
+    if (!scheme.equals(HDFS_SCHEME)) {
+      return false;
+    }
+    //Ensure that the authority is not empty
+    if (Strings.isNullOrEmpty(fsUri.getAuthority())) {
+      return false;
+    }
+    return true;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/datanodes/fs/LocalFSDataNode.java
----------------------------------------------------------------------
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/datanodes/fs/LocalFSDataNode.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/datanodes/fs/LocalFSDataNode.java
new file mode 100644
index 0000000..757d4a0
--- /dev/null
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/datanodes/fs/LocalFSDataNode.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.flowgraph.datanodes.fs;
+
+import java.net.URI;
+
+import org.apache.gobblin.annotation.Alpha;
+
+import com.typesafe.config.Config;
+
+/**
+ * An implementation of {@link LocalFSDataNode}. All the properties specific 
to a LocalFS based data node (e.g. fs.uri)
+ * are validated here.
+ */
+@Alpha
+public class LocalFSDataNode extends FileSystemDataNode {
+  public static final String LOCAL_FS_SCHEME = "file";
+
+  public LocalFSDataNode(Config nodeProps) throws DataNodeCreationException {
+    super(nodeProps);
+  }
+
+  /**
+   *
+   * @param fsUri FileSystem URI
+   * @return true if the scheme of fsUri equals "file"
+   */
+  @Override
+  public boolean isUriValid(URI fsUri) {
+    String scheme = fsUri.getScheme();
+    if (scheme.equals(LOCAL_FS_SCHEME)) {
+      return true;
+    }
+    return false;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java
----------------------------------------------------------------------
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java
new file mode 100644
index 0000000..c0c9297
--- /dev/null
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.spec;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+
+import org.apache.commons.lang3.StringUtils;
+
+import com.google.common.base.Joiner;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigValueFactory;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.runtime.api.FlowSpec;
+import org.apache.gobblin.runtime.api.JobSpec;
+import org.apache.gobblin.runtime.api.SpecExecutor;
+import org.apache.gobblin.util.ConfigUtils;
+
+
+/**
+ * A data class that encapsulates information for executing a job. This 
includes a {@link JobSpec} and a {@link SpecExecutor}
+ * where the {@link JobSpec} will be executed.
+ */
+@Data
+@AllArgsConstructor
+public class JobExecutionPlan {
+  private JobSpec jobSpec;
+  private SpecExecutor specExecutor;
+
+  public static class Factory {
+
+    public JobExecutionPlan createPlan(FlowSpec flowSpec, Config jobConfig, 
SpecExecutor specExecutor, Long flowExecutionId)
+        throws URISyntaxException {
+        JobSpec jobSpec = buildJobSpec(flowSpec, jobConfig, flowExecutionId);
+        return new JobExecutionPlan(jobSpec, specExecutor);
+    }
+
+    /**
+     * Given a resolved job config, this helper method converts the config to 
a {@link JobSpec}.
+     * @param jobConfig resolved job config.
+     * @param flowSpec input FlowSpec.
+     * @return a {@link JobSpec} corresponding to the resolved job config.
+     */
+    private static JobSpec buildJobSpec(FlowSpec flowSpec, Config jobConfig, 
Long flowExecutionId) throws URISyntaxException {
+      Config flowConfig = flowSpec.getConfig();
+
+      String flowName = ConfigUtils.getString(flowConfig, 
ConfigurationKeys.FLOW_NAME_KEY, "");
+      String flowGroup = ConfigUtils.getString(flowConfig, 
ConfigurationKeys.FLOW_GROUP_KEY, "");
+      String jobName = ConfigUtils.getString(jobConfig, 
ConfigurationKeys.JOB_NAME_KEY, "");
+
+      //Modify the job name to include the flow group:flow name.
+      jobName = Joiner.on(":").join(flowGroup, flowName, jobName);
+
+      JobSpec.Builder jobSpecBuilder = 
JobSpec.builder(jobSpecURIGenerator(flowGroup, jobName, 
flowSpec)).withConfig(jobConfig)
+          
.withDescription(flowSpec.getDescription()).withVersion(flowSpec.getVersion());
+
+      //Get job template uri
+      URI jobTemplateUri = new 
URI(jobConfig.getString(ConfigurationKeys.JOB_TEMPLATE_PATH));
+      JobSpec jobSpec = jobSpecBuilder.withTemplate(jobTemplateUri).build();
+
+      //Add flowName to job spec
+      
jobSpec.setConfig(jobSpec.getConfig().withValue(ConfigurationKeys.FLOW_NAME_KEY,
 ConfigValueFactory.fromAnyRef(flowName)));
+
+      //Add job name
+      
jobSpec.setConfig(jobSpec.getConfig().withValue(ConfigurationKeys.JOB_NAME_KEY, 
ConfigValueFactory.fromAnyRef(jobName)));
+
+      //Add flow execution id
+      
jobSpec.setConfig(jobSpec.getConfig().withValue(ConfigurationKeys.FLOW_EXECUTION_ID_KEY,
 ConfigValueFactory.fromAnyRef(flowExecutionId)));
+
+      // Remove schedule
+      
jobSpec.setConfig(jobSpec.getConfig().withoutPath(ConfigurationKeys.JOB_SCHEDULE_KEY));
+
+      // Add job.name and job.group
+      
jobSpec.setConfig(jobSpec.getConfig().withValue(ConfigurationKeys.JOB_NAME_KEY, 
ConfigValueFactory.fromAnyRef(jobName)));
+      
jobSpec.setConfig(jobSpec.getConfig().withValue(ConfigurationKeys.JOB_GROUP_KEY,
 ConfigValueFactory.fromAnyRef(flowGroup)));
+
+      //Enable job lock for each job to prevent concurrent executions.
+      
jobSpec.setConfig(jobSpec.getConfig().withValue(ConfigurationKeys.JOB_LOCK_ENABLED_KEY,
 ConfigValueFactory.fromAnyRef(true)));
+
+      // Reset properties in Spec from Config
+      
jobSpec.setConfigAsProperties(ConfigUtils.configToProperties(jobSpec.getConfig()));
+
+      return jobSpec;
+    }
+
+
+    /**
+     * A naive implementation of generating a jobSpec's URI within a multi-hop 
flow that follows the convention:
+     * <JOB_CATALOG_SCHEME>/{@link ConfigurationKeys#JOB_GROUP_KEY}/{@link 
ConfigurationKeys#JOB_NAME_KEY}.
+     */
+    public static URI jobSpecURIGenerator(String jobGroup, String jobName, 
FlowSpec flowSpec)
+        throws URISyntaxException {
+      return new URI(JobSpec.Builder.DEFAULT_JOB_CATALOG_SCHEME, 
flowSpec.getUri().getAuthority(),
+          
StringUtils.appendIfMissing(StringUtils.prependIfMissing(flowSpec.getUri().getPath(),
 "/"), "/") + jobGroup
+              + "/" + jobName, null);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlanDagFactory.java
----------------------------------------------------------------------
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlanDagFactory.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlanDagFactory.java
new file mode 100644
index 0000000..f942f8d
--- /dev/null
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlanDagFactory.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.spec;
+
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.fs.Path;
+
+import com.google.common.base.Optional;
+import com.google.common.io.Files;
+import com.typesafe.config.Config;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.runtime.api.JobSpec;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+
+
+/**
+ * A Factory class used for constructing a {@link Dag} of {@link 
JobExecutionPlan}s from
+ * a {@link List} of {@link JobExecutionPlan}s.
+ */
+@Alpha
+@Slf4j
+public class JobExecutionPlanDagFactory {
+
+  public Dag<JobExecutionPlan> createDag(List<JobExecutionPlan> 
jobExecutionPlans) {
+    //Maintain a mapping between job name and the corresponding 
JobExecutionPlan.
+    Map<String, Dag.DagNode<JobExecutionPlan>> JobExecutionPlanMap = new 
HashMap<>();
+    List<Dag.DagNode<JobExecutionPlan>> dagNodeList = new ArrayList<>();
+    /**
+     * Create a {@link Dag.DagNode<JobExecutionPlan>} for every {@link 
JobSpec} in the flow. Add this node
+     * to a HashMap.
+     */
+    for (JobExecutionPlan jobExecutionPlan : jobExecutionPlans) {
+      Dag.DagNode<JobExecutionPlan> dagNode = new 
Dag.DagNode<>(jobExecutionPlan);
+      dagNodeList.add(dagNode);
+      String jobName = getJobName(jobExecutionPlan);
+      if (jobName != null) {
+        JobExecutionPlanMap.put(jobName, dagNode);
+      }
+    }
+
+    /**
+     * Iterate over each {@link JobSpec} to get the dependencies of each 
{@link JobSpec}.
+     * For each {@link JobSpec}, get the corresponding {@link Dag.DagNode} and
+     * set the {@link Dag.DagNode}s corresponding to its dependencies as its 
parent nodes.
+     *
+     * TODO: we likely do not need 2 for loops and we can do this in 1 pass.
+     */
+    for (JobExecutionPlan jobExecutionPlan : jobExecutionPlans) {
+      String jobName = getJobName(jobExecutionPlan);
+      if (jobName == null) {
+        continue;
+      }
+      Dag.DagNode<JobExecutionPlan> node = JobExecutionPlanMap.get(jobName);
+      Collection<String> dependencies = 
getDependencies(jobExecutionPlan.getJobSpec().getConfig());
+      for (String dependency : dependencies) {
+        Dag.DagNode<JobExecutionPlan> parentNode = 
JobExecutionPlanMap.get(dependency);
+        node.addParentNode(parentNode);
+      }
+    }
+    Dag<JobExecutionPlan> dag = new Dag<>(dagNodeList);
+    return dag;
+  }
+
+  /**
+   * Get job dependencies of a given job from its config.
+   * @param config of a job.
+   * @return a list of dependencies of the job.
+   */
+  private static List<String> getDependencies(Config config) {
+    return config.hasPath(ConfigurationKeys.JOB_DEPENDENCIES) ? Arrays
+        
.asList(config.getString(ConfigurationKeys.JOB_DEPENDENCIES).split(",")) : new 
ArrayList<>();
+  }
+
+  /**
+   * The job name is derived from the {@link 
org.apache.gobblin.runtime.api.JobTemplate} URI. It is the
+   * simple name of the path component of the URI.
+   * @param jobExecutionPlan
+   * @return the simple name from the URI path.
+   */
+  private static String getJobName(JobExecutionPlan jobExecutionPlan) {
+    Optional<URI> jobTemplateUri = 
jobExecutionPlan.getJobSpec().getTemplateURI();
+    if (jobTemplateUri.isPresent()) {
+      return Files.getNameWithoutExtension(new 
Path(jobTemplateUri.get()).getName());
+    } else {
+      return null;
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template/FlowTemplate.java
----------------------------------------------------------------------
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template/FlowTemplate.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template/FlowTemplate.java
index 30d8309..a8350fd 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template/FlowTemplate.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template/FlowTemplate.java
@@ -20,16 +20,16 @@ package org.apache.gobblin.service.modules.template;
 import java.io.IOException;
 import java.util.Collection;
 import java.util.List;
-import java.util.Map;
+
+import org.apache.commons.lang3.tuple.Pair;
 
 import com.typesafe.config.Config;
 
-import org.apache.commons.lang3.tuple.Pair;
 import org.apache.gobblin.annotation.Alpha;
 import org.apache.gobblin.runtime.api.JobTemplate;
 import org.apache.gobblin.runtime.api.Spec;
+import org.apache.gobblin.runtime.api.SpecNotFoundException;
 import org.apache.gobblin.service.modules.dataset.DatasetDescriptor;
-import org.apache.gobblin.service.modules.flowgraph.Dag;
 
 /**
  * An interface primarily for representing a flow of {@link JobTemplate}s. It 
also has
@@ -45,18 +45,37 @@ public interface FlowTemplate extends Spec {
 
   /**
    *
-   * @return the {@link Dag<JobTemplate>} that backs the {@link FlowTemplate}.
+   * @return all configuration inside pre-written template.
    */
-  Dag<JobTemplate> getDag() throws IOException;
+  Config getRawTemplateConfig();
 
   /**
-   *
-   * @return all configuration inside pre-written template.
+   * @param userConfig a list of user customized attributes.
+   * @return list of input/output {@link DatasetDescriptor}s that fully 
resolve the {@link FlowTemplate} using the
+   * provided userConfig.
    */
-  Config getRawTemplateConfig();
+  List<Pair<DatasetDescriptor, DatasetDescriptor>> 
getResolvingDatasetDescriptors(Config userConfig)
+      throws IOException, ReflectiveOperationException, SpecNotFoundException, 
JobTemplate.TemplateException;
+
+  /**
+   * Checks if the {@link FlowTemplate} is resolvable using the provided 
{@link Config} object. A {@link FlowTemplate}
+   * is resolvable only if each of the {@link JobTemplate}s in the flow is 
resolvable
+   * @param userConfig User supplied Config
+   * @param inputDescriptor input {@link DatasetDescriptor}
+   * @param outputDescriptor output {@link DatasetDescriptor}
+   * @return true if the {@link FlowTemplate} is resolvable
+   */
+  boolean isResolvable(Config userConfig, DatasetDescriptor inputDescriptor, 
DatasetDescriptor outputDescriptor)
+      throws SpecNotFoundException, JobTemplate.TemplateException;
 
   /**
-   * @return list of input/output {@link DatasetDescriptor}s for the {@link 
FlowTemplate}.
+   * Resolves the {@link JobTemplate}s underlying this {@link FlowTemplate} 
and returns a {@link List} of resolved
+   * job {@link Config}s.
+   * @param userConfig User supplied Config
+   * @param inputDescriptor input {@link DatasetDescriptor}
+   * @param outputDescriptor output {@link DatasetDescriptor}
+   * @return a list of resolved job {@link Config}s.
    */
-  List<Pair<DatasetDescriptor, DatasetDescriptor>> 
getInputOutputDatasetDescriptors();
+  List<Config> getResolvedJobConfigs(Config userConfig, DatasetDescriptor 
inputDescriptor, DatasetDescriptor outputDescriptor)
+      throws SpecNotFoundException, JobTemplate.TemplateException;
 }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template/HOCONInputStreamFlowTemplate.java
----------------------------------------------------------------------
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template/HOCONInputStreamFlowTemplate.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template/HOCONInputStreamFlowTemplate.java
index 553f067..ba9c091 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template/HOCONInputStreamFlowTemplate.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template/HOCONInputStreamFlowTemplate.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.InputStreamReader;
 import java.net.URI;
+import java.net.URISyntaxException;
 
 import com.google.common.base.Charsets;
 import com.typesafe.config.Config;
@@ -43,15 +44,15 @@ public class HOCONInputStreamFlowTemplate extends 
StaticFlowTemplate {
   public static final String VERSION_KEY = "gobblin.flow.template.version";
   private static final String DEFAULT_VERSION = "1";
 
-  public HOCONInputStreamFlowTemplate(InputStream inputStream, URI uri, 
FlowCatalogWithTemplates catalog)
-      throws SpecNotFoundException, IOException, ReflectiveOperationException, 
JobTemplate.TemplateException {
+  public HOCONInputStreamFlowTemplate(InputStream inputStream, URI 
flowTemplateDirUri, FlowCatalogWithTemplates catalog)
+      throws SpecNotFoundException, IOException, 
JobTemplate.TemplateException, URISyntaxException {
     this(ConfigFactory.parseReader(new InputStreamReader(inputStream, 
Charsets.UTF_8)).resolve(
-        ConfigResolveOptions.defaults().setAllowUnresolved(true)), uri, 
catalog);
+        ConfigResolveOptions.defaults().setAllowUnresolved(true)), 
flowTemplateDirUri, catalog);
   }
 
-  public HOCONInputStreamFlowTemplate(Config config, URI uri, 
FlowCatalogWithTemplates catalog)
-      throws SpecNotFoundException, IOException, ReflectiveOperationException, 
JobTemplate.TemplateException {
-    super(uri, ConfigUtils.getString(config, VERSION_KEY, DEFAULT_VERSION),
+  public HOCONInputStreamFlowTemplate(Config config, URI flowTemplateDirUri, 
FlowCatalogWithTemplates catalog)
+      throws SpecNotFoundException, IOException, 
JobTemplate.TemplateException, URISyntaxException {
+    super(flowTemplateDirUri, config.hasPath(VERSION_KEY) ? 
config.getString(VERSION_KEY) : DEFAULT_VERSION,
         config.hasPath(ConfigurationKeys.FLOW_DESCRIPTION_KEY) ? config
             .getString(ConfigurationKeys.FLOW_DESCRIPTION_KEY) : "", config, 
catalog);
   }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template/JobTemplateDagFactory.java
----------------------------------------------------------------------
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template/JobTemplateDagFactory.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template/JobTemplateDagFactory.java
deleted file mode 100644
index b89da4d..0000000
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template/JobTemplateDagFactory.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gobblin.service.modules.template;
-
-import java.net.URI;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.gobblin.annotation.Alpha;
-import org.apache.hadoop.fs.Path;
-
-import org.apache.gobblin.runtime.api.JobTemplate;
-import org.apache.gobblin.service.modules.flowgraph.Dag;
-
-import lombok.extern.slf4j.Slf4j;
-
-
-/**
- * A Factory class used for constructing a {@link Dag} of {@link 
org.apache.gobblin.runtime.api.JobTemplate}s from
- * a {@link URI} of a {@link FlowTemplate}.
- */
-@Alpha
-@Slf4j
-public class JobTemplateDagFactory {
-  public static final String JOB_TEMPLATE_FILE_SUFFIX = ".conf";
-
-  public static Dag<JobTemplate> createDagFromJobTemplates(List<JobTemplate> 
jobTemplates) {
-    Map<URI, Dag.DagNode<JobTemplate>> uriJobTemplateMap = new HashMap<>();
-    List<Dag.DagNode<JobTemplate>> dagNodeList = new ArrayList<>();
-    /**
-     * Create a {@link Dag.DagNode<JobTemplate>} for every {@link JobTemplate} 
in the flow. Add this node
-     * to a {@link Map<URI,JobTemplate>}.
-     */
-    for (JobTemplate template : jobTemplates) {
-      Dag.DagNode<JobTemplate> dagNode = new Dag.DagNode<>(template);
-      dagNodeList.add(dagNode);
-      uriJobTemplateMap.put(template.getUri(), dagNode);
-    }
-
-    /**
-     * Iterate over each {@link JobTemplate} to get the dependencies of each 
{@link JobTemplate}.
-     * For each {@link JobTemplate}, get the corresponding {@link Dag.DagNode} 
and
-     * set the {@link Dag.DagNode}s corresponding to the dependencies as its 
parent nodes.
-     *
-     * TODO: we likely do not need 2 for loops and we can do this in 1 pass.
-     */
-    Path templateDirPath = new Path(jobTemplates.get(0).getUri()).getParent();
-    for (JobTemplate template : jobTemplates) {
-      URI templateUri = template.getUri();
-      Dag.DagNode<JobTemplate> node = uriJobTemplateMap.get(templateUri);
-      Collection<String> dependencies = template.getDependencies();
-      for (String dependency : dependencies) {
-        URI dependencyUri = new Path(templateDirPath, 
dependency).suffix(JOB_TEMPLATE_FILE_SUFFIX).toUri();
-        Dag.DagNode<JobTemplate> parentNode = 
uriJobTemplateMap.get(dependencyUri);
-        node.addParentNode(parentNode);
-      }
-    }
-    Dag<JobTemplate> dag = new Dag<>(dagNodeList);
-    return dag;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template/StaticFlowTemplate.java
----------------------------------------------------------------------
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template/StaticFlowTemplate.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template/StaticFlowTemplate.java
index 46f99d3..5f8dfda 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template/StaticFlowTemplate.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template/StaticFlowTemplate.java
@@ -19,38 +19,40 @@ package org.apache.gobblin.service.modules.template;
 
 import java.io.IOException;
 import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
+
 import com.google.common.base.Joiner;
 import com.google.common.collect.Lists;
 import com.typesafe.config.Config;
+import com.typesafe.config.ConfigResolveOptions;
+import com.typesafe.config.ConfigValueFactory;
 
-import org.apache.commons.lang3.tuple.ImmutablePair;
-import org.apache.commons.lang3.tuple.Pair;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
 
 import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.runtime.api.JobTemplate;
 import org.apache.gobblin.runtime.api.SpecNotFoundException;
-import org.apache.gobblin.service.modules.flowgraph.Dag;
 import org.apache.gobblin.service.modules.dataset.DatasetDescriptor;
+import 
org.apache.gobblin.service.modules.flowgraph.DatasetDescriptorConfigKeys;
 import 
org.apache.gobblin.service.modules.template_catalog.FlowCatalogWithTemplates;
 import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
-import org.apache.hadoop.fs.Path;
-
-import lombok.Getter;
 
 
 /**
  * A {@link FlowTemplate} using a static {@link Config} as the raw 
configuration for the template.
  */
 @Alpha
+@Slf4j
 public class StaticFlowTemplate implements FlowTemplate {
   private static final long serialVersionUID = 84641624233978L;
 
-  public static final String INPUT_DATASET_DESCRIPTOR_PREFIX = 
"gobblin.flow.dataset.descriptor.input";
-  public static final String OUTPUT_DATASET_DESCRIPTOR_PREFIX = 
"gobblin.flow.dataset.descriptor.output";
-  public static final String DATASET_DESCRIPTOR_CLASS_KEY = "class";
-
   @Getter
   private URI uri;
   @Getter
@@ -60,69 +62,74 @@ public class StaticFlowTemplate implements FlowTemplate {
   @Getter
   private transient FlowCatalogWithTemplates catalog;
   @Getter
-  private List<Pair<DatasetDescriptor, DatasetDescriptor>> 
inputOutputDatasetDescriptors;
-  @Getter
   private List<JobTemplate> jobTemplates;
 
-  private transient Dag<JobTemplate> dag;
-
   private transient Config rawConfig;
-  private boolean isTemplateMaterialized;
 
-  public StaticFlowTemplate(URI uri, String version, String description, 
Config config,
+  public StaticFlowTemplate(URI flowTemplateDirUri, String version, String 
description, Config config,
       FlowCatalogWithTemplates catalog)
-      throws IOException, ReflectiveOperationException, SpecNotFoundException, 
JobTemplate.TemplateException {
-    this.uri = uri;
+      throws IOException, SpecNotFoundException, 
JobTemplate.TemplateException, URISyntaxException {
+    this.uri = flowTemplateDirUri;
     this.version = version;
     this.description = description;
-    this.inputOutputDatasetDescriptors = buildInputOutputDescriptors(config);
     this.rawConfig = config;
     this.catalog = catalog;
-    URI flowTemplateDir = new Path(this.uri).getParent().toUri();
-    this.jobTemplates = this.catalog.getJobTemplatesForFlow(flowTemplateDir);
+    this.jobTemplates = 
this.catalog.getJobTemplatesForFlow(flowTemplateDirUri);
   }
 
   //Constructor for testing purposes
-  public StaticFlowTemplate(URI uri, String version, String description, 
Config config,
-      FlowCatalogWithTemplates catalog, List<Pair<DatasetDescriptor, 
DatasetDescriptor>> inputOutputDatasetDescriptors, List<JobTemplate> 
jobTemplates)
-      throws IOException, ReflectiveOperationException, SpecNotFoundException, 
JobTemplate.TemplateException {
+  public StaticFlowTemplate(URI uri, String version, String description, 
Config config, FlowCatalogWithTemplates catalog, List<JobTemplate> 
jobTemplates) {
     this.uri = uri;
     this.version = version;
     this.description = description;
-    this.inputOutputDatasetDescriptors = inputOutputDatasetDescriptors;
     this.rawConfig = config;
     this.catalog = catalog;
     this.jobTemplates = jobTemplates;
   }
 
+
   /**
    * Generate the input/output dataset descriptors for the {@link 
FlowTemplate}.
+   * @param userConfig
+   * @return a List of Input/Output DatasetDescriptors that resolve this 
{@link FlowTemplate}.
    */
-  private List<Pair<DatasetDescriptor, DatasetDescriptor>> 
buildInputOutputDescriptors(Config config)
-      throws IOException, ReflectiveOperationException {
-    if (!config.hasPath(INPUT_DATASET_DESCRIPTOR_PREFIX) || 
!config.hasPath(OUTPUT_DATASET_DESCRIPTOR_PREFIX)) {
+  @Override
+  public List<Pair<DatasetDescriptor, DatasetDescriptor>> 
getResolvingDatasetDescriptors(Config userConfig)
+      throws IOException, SpecNotFoundException, JobTemplate.TemplateException 
{
+    Config config = 
this.getResolvedFlowConfig(userConfig).resolve(ConfigResolveOptions.defaults().setAllowUnresolved(true));
+
+    if 
(!config.hasPath(DatasetDescriptorConfigKeys.FLOW_EDGE_INPUT_DATASET_DESCRIPTOR_PREFIX)
+        || 
!config.hasPath(DatasetDescriptorConfigKeys.FLOW_EDGE_OUTPUT_DATASET_DESCRIPTOR_PREFIX))
 {
       throw new IOException("Flow template must specify at least one 
input/output dataset descriptor");
     }
+
     int i = 0;
-    String inputPrefix = Joiner.on(".").join(INPUT_DATASET_DESCRIPTOR_PREFIX, 
Integer.toString(i));
+    String inputPrefix = 
Joiner.on(".").join(DatasetDescriptorConfigKeys.FLOW_EDGE_INPUT_DATASET_DESCRIPTOR_PREFIX,
 Integer.toString(i));
     List<Pair<DatasetDescriptor, DatasetDescriptor>> result = 
Lists.newArrayList();
     while (config.hasPath(inputPrefix)) {
-      Config inputDescriptorConfig = config.getConfig(inputPrefix);
-      DatasetDescriptor inputDescriptor = 
getDatasetDescriptor(inputDescriptorConfig);
-      String outputPrefix = 
Joiner.on(".").join(OUTPUT_DATASET_DESCRIPTOR_PREFIX, Integer.toString(i++));
-      Config outputDescriptorConfig = config.getConfig(outputPrefix);
-      DatasetDescriptor outputDescriptor = 
getDatasetDescriptor(outputDescriptorConfig);
-      result.add(ImmutablePair.of(inputDescriptor, outputDescriptor));
-      inputPrefix = Joiner.on(".").join(INPUT_DATASET_DESCRIPTOR_PREFIX, 
Integer.toString(i));
+      try {
+        Config inputDescriptorConfig = config.getConfig(inputPrefix);
+        DatasetDescriptor inputDescriptor = 
getDatasetDescriptor(inputDescriptorConfig);
+        String outputPrefix = Joiner.on(".")
+            
.join(DatasetDescriptorConfigKeys.FLOW_EDGE_OUTPUT_DATASET_DESCRIPTOR_PREFIX, 
Integer.toString(i));
+        Config outputDescriptorConfig = config.getConfig(outputPrefix);
+        DatasetDescriptor outputDescriptor = 
getDatasetDescriptor(outputDescriptorConfig);
+
+        if (isResolvable(userConfig, inputDescriptor, outputDescriptor)) {
+          result.add(ImmutablePair.of(inputDescriptor, outputDescriptor));
+        }
+      } catch (ReflectiveOperationException e) {
+        //Cannot instantiate I/O dataset descriptor due to missing config; skip and try the next one.
+        log.debug("Skipping dataset descriptor at index {}: could not instantiate due to missing config", i, e);
+      }
+      inputPrefix = 
Joiner.on(".").join(DatasetDescriptorConfigKeys.FLOW_EDGE_INPUT_DATASET_DESCRIPTOR_PREFIX,
 Integer.toString(++i));
     }
     return result;
   }
 
   private DatasetDescriptor getDatasetDescriptor(Config descriptorConfig)
       throws ReflectiveOperationException {
-    Class datasetDescriptorClass = 
Class.forName(descriptorConfig.getString(DATASET_DESCRIPTOR_CLASS_KEY));
-    return (DatasetDescriptor) GobblinConstructorUtils
-        .invokeLongestConstructor(datasetDescriptorClass, descriptorConfig);
+    Class datasetDescriptorClass = 
Class.forName(descriptorConfig.getString(DatasetDescriptorConfigKeys.CLASS_KEY));
+    return (DatasetDescriptor) 
GobblinConstructorUtils.invokeLongestConstructor(datasetDescriptorClass, 
descriptorConfig);
   }
 
   @Override
@@ -130,27 +137,53 @@ public class StaticFlowTemplate implements FlowTemplate {
     return this.rawConfig;
   }
 
-  private void ensureTemplateMaterialized()
-      throws IOException {
-    try {
-      if (!isTemplateMaterialized) {
-        this.dag = 
JobTemplateDagFactory.createDagFromJobTemplates(this.jobTemplates);
-      }
-      this.isTemplateMaterialized = true;
-    } catch (Exception e) {
-      throw new IOException(e);
-    }
-  }
-
   @Override
   public List<JobTemplate> getJobTemplates() {
     return this.jobTemplates;
   }
 
+  private Config getResolvedFlowConfig(Config userConfig) {
+    return userConfig.withFallback(this.rawConfig);
+  }
+
+  /**
+   * Checks if the {@link FlowTemplate} is resolvable using the provided 
{@link Config} object. A {@link FlowTemplate}
+   * is resolvable only if each of the {@link JobTemplate}s in the flow is 
resolvable
+   * @param userConfig User supplied Config
+   * @return true if the {@link FlowTemplate} is resolvable
+   */
   @Override
-  public Dag<JobTemplate> getDag()
-      throws IOException {
-    ensureTemplateMaterialized();
-    return this.dag;
+  public boolean isResolvable(Config userConfig, DatasetDescriptor 
inputDescriptor, DatasetDescriptor outputDescriptor)
+      throws SpecNotFoundException, JobTemplate.TemplateException {
+    Config inputDescriptorConfig = 
inputDescriptor.getRawConfig().atPath(DatasetDescriptorConfigKeys.FLOW_EDGE_INPUT_DATASET_DESCRIPTOR_PREFIX);
+    Config outputDescriptorConfig = 
outputDescriptor.getRawConfig().atPath(DatasetDescriptorConfigKeys.FLOW_EDGE_OUTPUT_DATASET_DESCRIPTOR_PREFIX);
+    userConfig = 
userConfig.withFallback(inputDescriptorConfig).withFallback(outputDescriptorConfig);
+
+    ConfigResolveOptions resolveOptions = 
ConfigResolveOptions.defaults().setAllowUnresolved(true);
+
+    for (JobTemplate template: this.jobTemplates) {
+      Config templateConfig = template.getResolvedConfig(userConfig).resolve(resolveOptions);
+      if (!templateConfig.isResolved()) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  @Override
+  public List<Config> getResolvedJobConfigs(Config userConfig, 
DatasetDescriptor inputDescriptor,
+      DatasetDescriptor outputDescriptor)
+      throws SpecNotFoundException, JobTemplate.TemplateException {
+    Config inputDescriptorConfig = 
inputDescriptor.getRawConfig().atPath(DatasetDescriptorConfigKeys.FLOW_EDGE_INPUT_DATASET_DESCRIPTOR_PREFIX);
+    Config outputDescriptorConfig = 
outputDescriptor.getRawConfig().atPath(DatasetDescriptorConfigKeys.FLOW_EDGE_OUTPUT_DATASET_DESCRIPTOR_PREFIX);
+    userConfig = 
userConfig.withFallback(inputDescriptorConfig).withFallback(outputDescriptorConfig);
+
+    List<Config> resolvedJobConfigs = new ArrayList<>();
+    for (JobTemplate jobTemplate: getJobTemplates()) {
+      Config resolvedJobConfig = 
jobTemplate.getResolvedConfig(userConfig).resolve().withValue(
+          ConfigurationKeys.JOB_TEMPLATE_PATH, 
ConfigValueFactory.fromAnyRef(jobTemplate.getUri().toString()));
+      resolvedJobConfigs.add(resolvedJobConfig);
+    }
+    return resolvedJobConfigs;
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template_catalog/FSFlowCatalog.java
----------------------------------------------------------------------
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template_catalog/FSFlowCatalog.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template_catalog/FSFlowCatalog.java
index 5dba91c..59c3c87 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template_catalog/FSFlowCatalog.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template_catalog/FSFlowCatalog.java
@@ -19,16 +19,23 @@ package org.apache.gobblin.service.modules.template_catalog;
 
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.InputStreamReader;
 import java.net.URI;
+import java.net.URISyntaxException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
 
+import com.google.common.base.Charsets;
 import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigResolveOptions;
 
 import org.apache.gobblin.annotation.Alpha;
 import org.apache.gobblin.configuration.ConfigurationKeys;
@@ -41,17 +48,22 @@ import 
org.apache.gobblin.service.modules.template.FlowTemplate;
 import 
org.apache.gobblin.service.modules.template.HOCONInputStreamFlowTemplate;
 import org.apache.gobblin.util.PathUtils;
 
+
 /**
  * An implementation of a catalog for {@link FlowTemplate}s. Provides basic 
API for retrieving a {@link FlowTemplate}
  * from the catalog and for retrieving {@link JobTemplate}s that are part of a 
{@link FlowTemplate}.
  * The flow and job configuration files are assumed to have the following path 
structure:
- * <p> /path/to/template/catalog/flowName/flow.(conf|pull) </p>
- * <p> /path/to/template/catalog/flowName/jobs/job1.(conf|pull) </p>
- * <p> /path/to/template/catalog/flowName/jobs/job2.(conf|pull) </p>
+ * <p> /path/to/template/catalog/flowName/flow.conf </p>
+ * <p> /path/to/template/catalog/flowName/jobs/job1.(job|template) </p>
+ * <p> /path/to/template/catalog/flowName/jobs/job2.(job|template) </p>
  */
 @Alpha
 public class FSFlowCatalog extends FSJobCatalog implements 
FlowCatalogWithTemplates {
-  public static final String JOB_TEMPLATE_DIR_NAME="jobs";
+  public static final String JOBS_DIR_NAME = "jobs";
+  public static final String FLOW_CONF_FILE_NAME = "flow.conf";
+  public static final List<String> JOB_FILE_EXTENSIONS = Arrays.asList(".job", 
".template");
+  public static final String JOB_TEMPLATE_KEY = "gobblin.template.uri";
+
   protected static final String FS_SCHEME = "FS";
 
   /**
@@ -59,64 +71,94 @@ public class FSFlowCatalog extends FSJobCatalog implements 
FlowCatalogWithTempla
    * @param sysConfig that must contain the fully qualified path of the flow 
template catalog
    * @throws IOException
    */
-  public FSFlowCatalog(Config sysConfig) throws IOException {
-    
super(sysConfig.withValue(ConfigurationKeys.JOB_CONFIG_FILE_GENERAL_PATH_KEY, 
sysConfig.getValue(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY)));
+  public FSFlowCatalog(Config sysConfig)
+      throws IOException {
+    
super(sysConfig.withValue(ConfigurationKeys.JOB_CONFIG_FILE_GENERAL_PATH_KEY,
+        
sysConfig.getValue(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY)));
   }
 
   /**
    *
-   * @param flowUri URI of the flow configuration file
+   * @param flowTemplateDirURI URI of the flow template directory
    * @return a {@link FlowTemplate}
    * @throws SpecNotFoundException
    * @throws JobTemplate.TemplateException
    * @throws IOException
    */
-  public FlowTemplate getFlowTemplate(URI flowUri) throws 
SpecNotFoundException, JobTemplate.TemplateException, IOException {
+  public FlowTemplate getFlowTemplate(URI flowTemplateDirURI)
+      throws SpecNotFoundException, JobTemplate.TemplateException, 
IOException, URISyntaxException {
     if 
(!this.sysConfig.hasPath(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY))
 {
       throw new RuntimeException("Missing config " + 
ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY);
     }
-    if (!flowUri.getScheme().equals(FS_SCHEME)) {
-      throw new RuntimeException("Expected scheme " + FS_SCHEME + " got 
unsupported scheme " + flowUri.getScheme());
+    if (!flowTemplateDirURI.getScheme().equals(FS_SCHEME)) {
+      throw new RuntimeException(
+          "Expected scheme " + FS_SCHEME + " got unsupported scheme " + 
flowTemplateDirURI.getScheme());
     }
     String templateCatalogDir = 
this.sysConfig.getString(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY);
     // path of uri is location of template file relative to the job 
configuration root directory
-    Path templateFullPath = PathUtils.mergePaths(new Path(templateCatalogDir), 
new Path(flowUri.getPath()));
+    Path templateDirPath = PathUtils.mergePaths(new Path(templateCatalogDir), 
new Path(flowTemplateDirURI.getPath()));
+    Path templateFullPath = PathUtils.mergePaths(templateDirPath, new 
Path(FLOW_CONF_FILE_NAME));
     FileSystem fs = FileSystem.get(templateFullPath.toUri(), new 
Configuration());
 
     try (InputStream is = fs.open(templateFullPath)) {
-      return new HOCONInputStreamFlowTemplate(is, flowUri, this);
-    } catch (ReflectiveOperationException e) {
-      throw new RuntimeException(e);
+      return new HOCONInputStreamFlowTemplate(is, flowTemplateDirURI, this);
     }
   }
 
   /**
    *
-   * @param flowTemplateDirUri URI of the flow template directory
+   * @param flowTemplateDirURI URI of the flow template directory
    * @return a list of {@link JobTemplate}s for a given flow identified by its 
{@link URI}.
    * @throws IOException
    * @throws SpecNotFoundException
    * @throws JobTemplate.TemplateException
    */
-  public List<JobTemplate> getJobTemplatesForFlow(URI flowTemplateDirUri)
-      throws IOException, SpecNotFoundException, JobTemplate.TemplateException 
{
+  public List<JobTemplate> getJobTemplatesForFlow(URI flowTemplateDirURI)
+      throws IOException, SpecNotFoundException, 
JobTemplate.TemplateException, URISyntaxException {
+
+    PathFilter extensionFilter = file -> {
+      for (String extension : JOB_FILE_EXTENSIONS) {
+        if (file.getName().endsWith(extension)) {
+          return true;
+        }
+      }
+      return false;
+    };
+
     if 
(!this.sysConfig.hasPath(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY))
 {
       throw new RuntimeException("Missing config " + 
ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY);
     }
-    if (!flowTemplateDirUri.getScheme().equals(FS_SCHEME)) {
-      throw new RuntimeException("Expected scheme " + FS_SCHEME + " got 
unsupported scheme " + flowTemplateDirUri.getScheme());
+    if (!flowTemplateDirURI.getScheme().equals(FS_SCHEME)) {
+      throw new RuntimeException(
+          "Expected scheme " + FS_SCHEME + " got unsupported scheme " + 
flowTemplateDirURI.getScheme());
     }
     List<JobTemplate> jobTemplates = new ArrayList<>();
 
     String templateCatalogDir = 
this.sysConfig.getString(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY);
-    Path templateDirPath = PathUtils.mergePaths(new Path(templateCatalogDir), 
new Path(flowTemplateDirUri));
-    Path jobTemplatePath = new Path(templateDirPath, JOB_TEMPLATE_DIR_NAME);
+    Path templateDirPath = PathUtils.mergePaths(new Path(templateCatalogDir), 
new Path(flowTemplateDirURI));
+    Path jobTemplatePath = new Path(templateDirPath, JOBS_DIR_NAME);
     FileSystem fs = FileSystem.get(jobTemplatePath.toUri(), new 
Configuration());
-    for (FileStatus fileStatus : fs.listStatus(jobTemplatePath)) {
-      try (InputStream is = fs.open(fileStatus.getPath())) {
-        jobTemplates.add(new HOCONInputStreamJobTemplate(is, 
fileStatus.getPath().toUri(), this));
+
+    for (FileStatus fileStatus : fs.listStatus(jobTemplatePath, 
extensionFilter)) {
+      Config templateConfig = loadHoconFileAtPath(fileStatus.getPath(), true);
+      if (templateConfig.hasPath(JOB_TEMPLATE_KEY)) {
+        URI templateUri = new URI(templateConfig.getString(JOB_TEMPLATE_KEY));
+        //Strip out the initial "/"
+        URI actualResourceUri = new URI(templateUri.getPath().substring(1));
+        Path fullTemplatePath =
+            new 
Path(FSFlowCatalog.class.getClassLoader().getResource(actualResourceUri.getPath()).toURI());
+        templateConfig = 
templateConfig.withFallback(loadHoconFileAtPath(fullTemplatePath, true));
       }
+      jobTemplates.add(new HOCONInputStreamJobTemplate(templateConfig, 
fileStatus.getPath().toUri(), this));
     }
     return jobTemplates;
   }
+
+  private Config loadHoconFileAtPath(Path filePath, boolean allowUnresolved)
+      throws IOException {
+    ConfigResolveOptions options = 
ConfigResolveOptions.defaults().setAllowUnresolved(allowUnresolved);
+    try (InputStream is = fs.open(filePath)) {
+      return ConfigFactory.parseReader(new InputStreamReader(is, 
Charsets.UTF_8)).resolve(options);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template_catalog/FlowCatalogWithTemplates.java
----------------------------------------------------------------------
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template_catalog/FlowCatalogWithTemplates.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template_catalog/FlowCatalogWithTemplates.java
index 41c0c9e..8bafe97 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template_catalog/FlowCatalogWithTemplates.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template_catalog/FlowCatalogWithTemplates.java
@@ -19,6 +19,7 @@ package org.apache.gobblin.service.modules.template_catalog;
 
 import java.io.IOException;
 import java.net.URI;
+import java.net.URISyntaxException;
 import java.util.List;
 
 import org.apache.gobblin.runtime.api.JobTemplate;
@@ -34,16 +35,18 @@ public interface FlowCatalogWithTemplates {
    * Get {@link FlowTemplate} with given {@link URI}.
    * @throws SpecNotFoundException if a {@link JobTemplate} with given {@link 
URI} cannot be found.
    */
-  FlowTemplate getFlowTemplate(URI uri) throws SpecNotFoundException, 
IOException, JobTemplate.TemplateException;
+  FlowTemplate getFlowTemplate(URI uri)
+      throws SpecNotFoundException, IOException, 
JobTemplate.TemplateException, URISyntaxException;
 
   /**
    *
-   * @param flowUri
+   * @param flowTemplateDirURI URI of the flow template directory.
    * @return a list of {@link JobTemplate}s for a given flow identified by its 
{@link URI}.
    * @throws IOException
    * @throws SpecNotFoundException
    * @throws JobTemplate.TemplateException
    */
-  public List<JobTemplate> getJobTemplatesForFlow(URI flowUri) throws 
IOException, SpecNotFoundException, JobTemplate.TemplateException;
+  public List<JobTemplate> getJobTemplatesForFlow(URI flowTemplateDirURI)
+      throws IOException, SpecNotFoundException, 
JobTemplate.TemplateException, URISyntaxException;
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/22a951f0/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitorTest.java
----------------------------------------------------------------------
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitorTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitorTest.java
new file mode 100644
index 0000000..b5451bc
--- /dev/null
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitorTest.java
@@ -0,0 +1,314 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.core;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.SystemUtils;
+import org.eclipse.jgit.api.Git;
+import org.eclipse.jgit.api.errors.GitAPIException;
+import org.eclipse.jgit.dircache.DirCache;
+import org.eclipse.jgit.lib.Repository;
+import org.eclipse.jgit.lib.RepositoryCache;
+import org.eclipse.jgit.revwalk.RevCommit;
+import org.eclipse.jgit.transport.RefSpec;
+import org.eclipse.jgit.util.FS;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import com.google.common.base.Charsets;
+import com.google.common.io.Files;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+import org.apache.gobblin.config.ConfigBuilder;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.service.ServiceConfigKeys;
+import org.apache.gobblin.service.modules.flowgraph.BaseFlowGraph;
+import org.apache.gobblin.service.modules.flowgraph.DataNode;
+import org.apache.gobblin.service.modules.template_catalog.FSFlowCatalog;
+import org.apache.gobblin.service.modules.flowgraph.FlowEdge;
+import org.apache.gobblin.service.modules.flowgraph.FlowGraphConfigurationKeys;
+
+
+public class GitFlowGraphMonitorTest {
+  private static final Logger logger = 
LoggerFactory.getLogger(GitFlowGraphMonitor.class);
+  private Repository remoteRepo;
+  private Git gitForPush;
+  private static final String TEST_DIR = "/tmp/gitFlowGraphTestDir";
+  private final File remoteDir = new File(TEST_DIR + "/remote");
+  private final File cloneDir = new File(TEST_DIR + "/clone");
+  private final File flowGraphDir = new File(cloneDir, "/gobblin-flowgraph");
+  private static final String NODE_1_FILE = "node1.properties";
+  private final File node1Dir = new File(flowGraphDir, "node1");
+  private final File node1File = new File(node1Dir, NODE_1_FILE);
+  private static final String NODE_2_FILE = "node2.properties";
+  private final File node2Dir = new File(flowGraphDir, "node2");
+  private final File node2File = new File(node2Dir, NODE_2_FILE);
+  private final File edge1Dir = new File(node1Dir, "node2");
+  private final File edge1File = new File(edge1Dir, "edge1.properties");
+
+  private RefSpec masterRefSpec = new RefSpec("master");
+  private FSFlowCatalog flowCatalog;
+  private Config config;
+  private BaseFlowGraph flowGraph;
+  private GitFlowGraphMonitor gitFlowGraphMonitor;
+
+  @BeforeClass
+  public void setUp() throws Exception {
+    cleanUpDir(TEST_DIR);
+
+    // Create a bare repository
+    RepositoryCache.FileKey fileKey = RepositoryCache.FileKey.exact(remoteDir, 
FS.DETECTED);
+    this.remoteRepo = fileKey.open(false);
+    this.remoteRepo.create(true);
+
+    this.gitForPush = 
Git.cloneRepository().setURI(this.remoteRepo.getDirectory().getAbsolutePath()).setDirectory(cloneDir).call();
+
+    // push an empty commit as a base for detecting changes
+    this.gitForPush.commit().setMessage("First commit").call();
+    
this.gitForPush.push().setRemote("origin").setRefSpecs(this.masterRefSpec).call();
+
+    this.config = ConfigBuilder.create()
+        .addPrimitive(GitFlowGraphMonitor.GIT_FLOWGRAPH_MONITOR_PREFIX + "."
+            + ConfigurationKeys.GIT_MONITOR_REPO_URI, 
this.remoteRepo.getDirectory().getAbsolutePath())
+        .addPrimitive(GitFlowGraphMonitor.GIT_FLOWGRAPH_MONITOR_PREFIX + "." + 
ConfigurationKeys.GIT_MONITOR_REPO_DIR, TEST_DIR + "/git-flowgraph")
+        .addPrimitive(GitFlowGraphMonitor.GIT_FLOWGRAPH_MONITOR_PREFIX + "." + 
ConfigurationKeys.GIT_MONITOR_POLLING_INTERVAL, 5)
+        .build();
+
+    // Create a FSFlowCatalog instance
+    URI flowTemplateCatalogUri = 
this.getClass().getClassLoader().getResource("template_catalog").toURI();
+    Properties properties = new Properties();
+    
properties.put(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY, 
flowTemplateCatalogUri.toString());
+    Config config = ConfigFactory.parseProperties(properties);
+    Config templateCatalogCfg = config
+        .withValue(ConfigurationKeys.JOB_CONFIG_FILE_GENERAL_PATH_KEY,
+            
config.getValue(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY));
+    this.flowCatalog = new FSFlowCatalog(templateCatalogCfg);
+
+    //Create a FlowGraph instance with defaults
+    this.flowGraph = new BaseFlowGraph();
+
+    this.gitFlowGraphMonitor = new GitFlowGraphMonitor(this.config, 
this.flowCatalog, this.flowGraph);
+    this.gitFlowGraphMonitor.setActive(true);
+  }
+
+  private void testAddNodeHelper(File nodeDir, File nodeFile, String nodeId, 
String paramValue)
+      throws IOException, GitAPIException {
+    // push a new node file
+    nodeDir.mkdirs();
+    nodeFile.createNewFile();
+    Files.write(FlowGraphConfigurationKeys.DATA_NODE_IS_ACTIVE_KEY + 
"=true\nparam1=" + paramValue + "\n", nodeFile, Charsets.UTF_8);
+
+    // add, commit, push node
+    this.gitForPush.add().addFilepattern(formNodeFilePath(nodeDir.getName(), 
nodeFile.getName())).call();
+    this.gitForPush.commit().setMessage("Node commit").call();
+    
this.gitForPush.push().setRemote("origin").setRefSpecs(this.masterRefSpec).call();
+
+    this.gitFlowGraphMonitor.processGitConfigChanges();
+
+    //Check if node1 has been added to the FlowGraph
+    DataNode dataNode = this.flowGraph.getNode(nodeId);
+    Assert.assertEquals(dataNode.getId(), nodeId);
+    Assert.assertTrue(dataNode.isActive());
+    Assert.assertEquals(dataNode.getRawConfig().getString("param1"), 
paramValue);
+  }
+
+  @Test
+  public void testAddNode()
+      throws IOException, GitAPIException, URISyntaxException, 
ExecutionException, InterruptedException {
+    testAddNodeHelper(this.node1Dir, this.node1File, "node1", "value1");
+    testAddNodeHelper(this.node2Dir, this.node2File, "node2", "value2");
+  }
+
+  @Test (dependsOnMethods = "testAddNode")
+  public void testAddEdge()
+      throws IOException, GitAPIException, URISyntaxException, 
ExecutionException, InterruptedException {
+    // push a new node file
+    this.edge1Dir.mkdirs();
+    this.edge1File.createNewFile();
+
+    Files.write(FlowGraphConfigurationKeys.FLOW_EDGE_SOURCE_KEY + "=node1\n"
+        + FlowGraphConfigurationKeys.FLOW_EDGE_DESTINATION_KEY + "=node2\n"
+        + FlowGraphConfigurationKeys.FLOW_EDGE_NAME_KEY + "=edge1\n"
+        + FlowGraphConfigurationKeys.FLOW_EDGE_IS_ACTIVE_KEY + "=true\n"
+        + FlowGraphConfigurationKeys.FLOW_EDGE_TEMPLATE_DIR_URI_KEY + 
"=FS:///flowEdgeTemplate\n"
+        + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY + ".0."
+        + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTOR_CLASS_KEY + 
"=org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor\n"
+        + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY + 
".0.specStore.fs.dir=/tmp1\n"
+        + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY + 
".0.specExecInstance.capabilities=s1:d1\n"
+        + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY + ".1."
+        + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTOR_CLASS_KEY + 
"=org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor\n"
+        + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY + 
".1.specStore.fs.dir=/tmp2\n"
+        + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY + 
".1.specExecInstance.capabilities=s2:d2\n", edge1File, Charsets.UTF_8);
+
+    // add, commit, push
+    
this.gitForPush.add().addFilepattern(formEdgeFilePath(this.edge1Dir.getParentFile().getName(),
 this.edge1Dir.getName(), this.edge1File.getName())).call();
+    this.gitForPush.commit().setMessage("Edge commit").call();
+    
this.gitForPush.push().setRemote("origin").setRefSpecs(this.masterRefSpec).call();
+
+    this.gitFlowGraphMonitor.processGitConfigChanges();
+
+    //Check if edge1 has been added to the FlowGraph
+    Set<FlowEdge> edgeSet = this.flowGraph.getEdges("node1");
+    Assert.assertEquals(edgeSet.size(), 1);
+    FlowEdge flowEdge = edgeSet.iterator().next();
+    Assert.assertEquals(flowEdge.getSrc(), "node1");
+    Assert.assertEquals(flowEdge.getDest(), "node2");
+    
Assert.assertEquals(flowEdge.getExecutors().get(0).getConfig().get().getString("specStore.fs.dir"),
 "/tmp1");
+    
Assert.assertEquals(flowEdge.getExecutors().get(0).getConfig().get().getString("specExecInstance.capabilities"),
 "s1:d1");
+    
Assert.assertEquals(flowEdge.getExecutors().get(0).getClass().getSimpleName(), 
"InMemorySpecExecutor");
+    
Assert.assertEquals(flowEdge.getExecutors().get(1).getConfig().get().getString("specStore.fs.dir"),
 "/tmp2");
+    
Assert.assertEquals(flowEdge.getExecutors().get(1).getConfig().get().getString("specExecInstance.capabilities"),
 "s2:d2");
+    
Assert.assertEquals(flowEdge.getExecutors().get(1).getClass().getSimpleName(), 
"InMemorySpecExecutor");
+  }
+
+  @Test (dependsOnMethods = "testAddNode")
+  public void testUpdateEdge()
+      throws IOException, GitAPIException, URISyntaxException, 
ExecutionException, InterruptedException {
+    //Update edge1 file
+    Files.write(FlowGraphConfigurationKeys.FLOW_EDGE_SOURCE_KEY + "=node1\n"
+        + FlowGraphConfigurationKeys.FLOW_EDGE_DESTINATION_KEY + "=node2\n"
+        + FlowGraphConfigurationKeys.FLOW_EDGE_NAME_KEY + "=edge1\n"
+        + FlowGraphConfigurationKeys.FLOW_EDGE_IS_ACTIVE_KEY + "=true\n"
+        + FlowGraphConfigurationKeys.FLOW_EDGE_TEMPLATE_DIR_URI_KEY + 
"=FS:///flowEdgeTemplate\n"
+        + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY + ".0."
+        + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTOR_CLASS_KEY + 
"=org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor\n"
+        + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY + 
".0.specStore.fs.dir=/tmp1\n"
+        + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY + 
".0.specExecInstance.capabilities=s1:d1\n"
+        + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY + ".1."
+        + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTOR_CLASS_KEY + 
"=org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor\n"
+        + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY + 
".1.specStore.fs.dir=/tmp2\n"
+        + FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY + 
".1.specExecInstance.capabilities=s2:d2\n"
+        + "key1=value1\n", edge1File, Charsets.UTF_8);
+
+    // add, commit, push
+    
this.gitForPush.add().addFilepattern(formEdgeFilePath(this.edge1Dir.getParentFile().getName(),
 this.edge1Dir.getName(), this.edge1File.getName())).call();
+    this.gitForPush.commit().setMessage("Edge commit").call();
+    
this.gitForPush.push().setRemote("origin").setRefSpecs(this.masterRefSpec).call();
+
+    this.gitFlowGraphMonitor.processGitConfigChanges();
+
+    //Check if new edge1 has been added to the FlowGraph
+    Set<FlowEdge> edgeSet = this.flowGraph.getEdges("node1");
+    Assert.assertEquals(edgeSet.size(), 1);
+    FlowEdge flowEdge = edgeSet.iterator().next();
+    Assert.assertEquals(flowEdge.getSrc(), "node1");
+    Assert.assertEquals(flowEdge.getDest(), "node2");
+    
Assert.assertEquals(flowEdge.getExecutors().get(0).getConfig().get().getString("specStore.fs.dir"),
 "/tmp1");
+    
Assert.assertEquals(flowEdge.getExecutors().get(0).getConfig().get().getString("specExecInstance.capabilities"),
 "s1:d1");
+    
Assert.assertEquals(flowEdge.getExecutors().get(0).getClass().getSimpleName(), 
"InMemorySpecExecutor");
+    
Assert.assertEquals(flowEdge.getExecutors().get(1).getConfig().get().getString("specStore.fs.dir"),
 "/tmp2");
+    
Assert.assertEquals(flowEdge.getExecutors().get(1).getConfig().get().getString("specExecInstance.capabilities"),
 "s2:d2");
+    
Assert.assertEquals(flowEdge.getExecutors().get(1).getClass().getSimpleName(), 
"InMemorySpecExecutor");
+    Assert.assertEquals(flowEdge.getConfig().getString("key1"), "value1");
+  }
+
  @Test (dependsOnMethods = "testUpdateEdge")
  public void testUpdateNode()
      throws IOException, GitAPIException, URISyntaxException, ExecutionException, InterruptedException {
    // Update param1's value in node1's config file (value1 -> value3) and verify
    // the monitor replaces the node in the graph with the updated version.
    testAddNodeHelper(this.node1Dir, this.node1File, "node1", "value3");
  }
+
+
+  @Test (dependsOnMethods = "testUpdateNode")
+  public void testRemoveEdge() throws GitAPIException, IOException {
+    // delete a config file
+    edge1File.delete();
+
+    //Node1 has 1 edge before delete
+    Set<FlowEdge> edgeSet = this.flowGraph.getEdges("node1");
+    Assert.assertEquals(edgeSet.size(), 1);
+
+    // delete, commit, push
+    DirCache ac = 
this.gitForPush.rm().addFilepattern(formEdgeFilePath(this.edge1Dir.getParentFile().getName(),
+        this.edge1Dir.getName(), this.edge1File.getName())).call();
+    RevCommit cc = this.gitForPush.commit().setMessage("Edge remove 
commit").call();
+    
this.gitForPush.push().setRemote("origin").setRefSpecs(this.masterRefSpec).call();
+
+    this.gitFlowGraphMonitor.processGitConfigChanges();
+
+    //Check if edge1 has been deleted from the graph
+    edgeSet = this.flowGraph.getEdges("node1");
+    Assert.assertTrue(edgeSet.size() == 0);
+  }
+
+  @Test (dependsOnMethods = "testRemoveEdge")
+  public void testRemoveNode() throws GitAPIException, IOException {
+    //delete node file
+    node1File.delete();
+
+    //node1 is present in the graph before delete
+    DataNode node1 = this.flowGraph.getNode("node1");
+    Assert.assertNotNull(node1);
+
+    // delete, commit, push
+    DirCache ac = 
this.gitForPush.rm().addFilepattern(formNodeFilePath(this.node1Dir.getName(), 
this.node1File.getName())).call();
+    RevCommit cc = this.gitForPush.commit().setMessage("Node remove 
commit").call();
+    
this.gitForPush.push().setRemote("origin").setRefSpecs(this.masterRefSpec).call();
+
+    this.gitFlowGraphMonitor.processGitConfigChanges();
+
+    //Check if node1 has been deleted from the graph
+    node1 = this.flowGraph.getNode("node1");
+    Assert.assertNull(node1);
+  }
+
+
+  private void cleanUpDir(String dir) {
+    File specStoreDir = new File(dir);
+
+    // cleanup is flaky on Travis, so retry a few times and then suppress the 
error if unsuccessful
+    for (int i = 0; i < 5; i++) {
+      try {
+        if (specStoreDir.exists()) {
+          FileUtils.deleteDirectory(specStoreDir);
+        }
+        // if delete succeeded then break out of loop
+        break;
+      } catch (IOException e) {
+        logger.warn("Cleanup delete directory failed for directory: " + dir, 
e);
+      }
+    }
+  }
+
+  private String formNodeFilePath(String groupDir, String fileName) {
+    return this.flowGraphDir.getName() + SystemUtils.FILE_SEPARATOR + groupDir 
+ SystemUtils.FILE_SEPARATOR + fileName;
+  }
+
+  private String formEdgeFilePath(String parentDir, String groupDir, String 
fileName) {
+    return this.flowGraphDir.getName() + SystemUtils.FILE_SEPARATOR + 
parentDir + SystemUtils.FILE_SEPARATOR + groupDir + SystemUtils.FILE_SEPARATOR 
+ fileName;
+  }
+
  @AfterClass
  public void tearDown() throws Exception {
    // Remove the test working directory (local/remote git repos, node and edge
    // config files) once all tests in this class have run.
    cleanUpDir(TEST_DIR);
  }
+}
\ No newline at end of file

Reply via email to