[GOBBLIN-624] Handle dataset retention in multi-hop flow compiler.

Closes #2493 from sv2000/retentionConfig


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/49974214
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/49974214
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/49974214

Branch: refs/heads/master
Commit: 49974214a088b60681938a73d40b2124b18cd2bc
Parents: 602cee7
Author: suvasude <[email protected]>
Authored: Sun Nov 4 18:04:26 2018 -0800
Committer: Hung Tran <[email protected]>
Committed: Sun Nov 4 18:04:26 2018 -0800

----------------------------------------------------------------------
 .../configuration/ConfigurationKeys.java        |   2 +
 .../modules/dataset/DatasetDescriptor.java      |   5 +
 .../modules/dataset/EncryptionConfig.java       |  28 ++-
 .../modules/dataset/FSDatasetDescriptor.java    |  24 ++-
 .../service/modules/dataset/FormatConfig.java   |  11 ++
 .../service/modules/flow/FlowGraphPath.java     |  39 ++---
 .../modules/flow/MultiHopFlowCompiler.java      |   3 +-
 .../gobblin/service/modules/flowgraph/Dag.java  |  84 +++++++--
 .../flowgraph/DatasetDescriptorConfigKeys.java  |   3 +-
 .../pathfinder/AbstractPathFinder.java          | 159 +++++++++++++----
 .../service/modules/spec/JobExecutionPlan.java  |  10 +-
 .../service/modules/flow/FlowGraphPathTest.java |   4 -
 .../modules/flow/MultiHopFlowCompilerTest.java  | 171 +++++++++++++------
 .../service/modules/flowgraph/DagTest.java      |   4 +
 .../adls-1-to-adls-1-retention-1.properties     |   9 +
 .../adls-1-to-adls-1-retention-2.properties     |   9 +
 .../hdfs-1-to-hdfs-1-retention.properties       |   9 +
 .../hdfs-2-hdfs-2-retention.properties          |   9 +
 .../hdfs-3-to-hdfs-3-retention.properties       |   9 +
 .../hdfs-4-to-hdfs-4-retention.properties       |   9 +
 .../local-to-local-retention.properties         |   9 +
 .../hdfsConvertToJsonAndEncrypt/flow.conf       |   1 +
 .../hdfsSnapshotRetention/flow.conf             |  35 ++++
 .../jobs/hdfs-snapshot-retention.job            |  15 ++
 .../flowEdgeTemplates/hdfsToAdl/flow.conf       |   1 +
 .../flowEdgeTemplates/hdfsToHdfs/flow.conf      |   2 +
 .../flowEdgeTemplates/localToHdfs/flow.conf     |   1 +
 .../distcp-push-hdfs-to-adl.template            |   2 +-
 .../multihop/jobTemplates/distcp.template       |   2 +-
 .../hdfs-convert-to-json-and-encrypt.template   |   2 +-
 .../jobTemplates/hdfs-retention.template        |  13 ++
 31 files changed, 538 insertions(+), 146 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/49974214/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
----------------------------------------------------------------------
diff --git 
a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
 
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
index ffc2376..8383c2a 100644
--- 
a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
+++ 
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
@@ -133,6 +133,8 @@ public class ConfigurationKeys {
   public static final String FLOW_DESCRIPTION_KEY = "flow.description";
   public static final String FLOW_EXECUTION_ID_KEY = "flow.executionId";
   public static final String FLOW_FAILURE_OPTION = "flow.failureOption";
+  public static final String FLOW_APPLY_RETENTION = "flow.applyRetention";
+  public static final String FLOW_APPLY_INPUT_RETENTION = 
"flow.applyInputRetention";
 
   /**
    * Common topology configuration properties.

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/49974214/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/DatasetDescriptor.java
----------------------------------------------------------------------
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/DatasetDescriptor.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/DatasetDescriptor.java
index e8474e3..33c563c 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/DatasetDescriptor.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/DatasetDescriptor.java
@@ -52,6 +52,11 @@ public interface DatasetDescriptor {
   public FormatConfig getFormatConfig();
 
   /**
+   * @return true if retention has been applied to the dataset.
+   */
+  public boolean isRetentionApplied();
+
+  /**
    * @return a human-readable description of the dataset.
    */
   public String getDescription();

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/49974214/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/EncryptionConfig.java
----------------------------------------------------------------------
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/EncryptionConfig.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/EncryptionConfig.java
index 21c7c17..52a1a10 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/EncryptionConfig.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/EncryptionConfig.java
@@ -18,10 +18,15 @@
 package org.apache.gobblin.service.modules.dataset;
 
 import com.google.common.base.Joiner;
+import com.google.common.collect.ImmutableMap;
 import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigValueFactory;
 
 import lombok.Getter;
 
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.service.modules.core.GitMonitoringService;
 import 
org.apache.gobblin.service.modules.flowgraph.DatasetDescriptorConfigKeys;
 import org.apache.gobblin.util.ConfigUtils;
 
@@ -33,14 +38,29 @@ public class EncryptionConfig {
   private final String keystoreType;
   @Getter
   private final String keystoreEncoding;
+  @Getter
+  private final Config rawConfig;
+
+  private static final Config DEFAULT_FALLBACK =
+      ConfigFactory.parseMap(ImmutableMap.<String, Object>builder()
+          .put(DatasetDescriptorConfigKeys.ENCRYPTION_ALGORITHM_KEY, 
DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY)
+          .put(DatasetDescriptorConfigKeys.ENCRYPTION_KEYSTORE_TYPE_KEY, 
DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY)
+          .put(DatasetDescriptorConfigKeys.ENCRYPTION_KEYSTORE_ENCODING_KEY, 
DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY)
+          .build());
 
   public EncryptionConfig(Config encryptionConfig) {
     this.encryptionAlgorithm = ConfigUtils.getString(encryptionConfig, 
DatasetDescriptorConfigKeys.ENCRYPTION_ALGORITHM_KEY,
         DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY);
-    this.keystoreType = ConfigUtils.getString(encryptionConfig, 
DatasetDescriptorConfigKeys.ENCRYPTION_KEYSTORE_TYPE_KEY,
-        DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY);
-    this.keystoreEncoding = ConfigUtils.getString(encryptionConfig, 
DatasetDescriptorConfigKeys.ENCRYPTION_KEYSTORE_ENCODING_KEY,
-        DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY);
+    if 
(this.encryptionAlgorithm.equalsIgnoreCase(DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_NONE))
 {
+      this.keystoreType = 
DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_NONE;
+      this.keystoreEncoding = 
DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_NONE;
+    } else {
+      this.keystoreType = ConfigUtils.getString(encryptionConfig, 
DatasetDescriptorConfigKeys.ENCRYPTION_KEYSTORE_TYPE_KEY,
+          DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY);
+      this.keystoreEncoding = ConfigUtils.getString(encryptionConfig, 
DatasetDescriptorConfigKeys.ENCRYPTION_KEYSTORE_ENCODING_KEY,
+          DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY);
+    }
+    this.rawConfig = encryptionConfig.withFallback(DEFAULT_FALLBACK);
   }
 
   public boolean contains(EncryptionConfig other) {

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/49974214/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/FSDatasetDescriptor.java
----------------------------------------------------------------------
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/FSDatasetDescriptor.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/FSDatasetDescriptor.java
index a5cb717..e9f4e31 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/FSDatasetDescriptor.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/FSDatasetDescriptor.java
@@ -22,7 +22,9 @@ import org.apache.hadoop.fs.Path;
 
 import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
 import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
 
 import lombok.Getter;
 
@@ -44,18 +46,27 @@ public class FSDatasetDescriptor implements 
DatasetDescriptor {
   @Getter
   private final FormatConfig formatConfig;
   @Getter
+  private final boolean isRetentionApplied;
+  @Getter
   private final String description;
   @Getter
   private final Config rawConfig;
 
+  private static final Config DEFAULT_FALLBACK =
+      ConfigFactory.parseMap(ImmutableMap.<String, Object>builder()
+          .put(DatasetDescriptorConfigKeys.PATH_KEY, 
DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY)
+          .put(DatasetDescriptorConfigKeys.IS_RETENTION_APPLIED_KEY, false)
+          .build());
+
   public FSDatasetDescriptor(Config config) {
     
Preconditions.checkArgument(config.hasPath(DatasetDescriptorConfigKeys.PLATFORM_KEY),
 "Dataset descriptor config must specify platform");
     this.platform = config.getString(DatasetDescriptorConfigKeys.PLATFORM_KEY);
     this.path = PathUtils.getPathWithoutSchemeAndAuthority(new 
Path(ConfigUtils.getString(config, DatasetDescriptorConfigKeys.PATH_KEY,
         
DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY))).toString();
     this.formatConfig = new FormatConfig(config);
+    this.isRetentionApplied = ConfigUtils.getBoolean(config, 
DatasetDescriptorConfigKeys.IS_RETENTION_APPLIED_KEY, false);
     this.description = ConfigUtils.getString(config, 
DatasetDescriptorConfigKeys.DESCRIPTION_KEY, "");
-    this.rawConfig = config;
+    this.rawConfig = 
config.withFallback(this.formatConfig.getRawConfig()).withFallback(DEFAULT_FALLBACK);
   }
 
   /**
@@ -66,7 +77,7 @@ public class FSDatasetDescriptor implements DatasetDescriptor 
{
    * @param otherPath a glob pattern that describes a set of paths.
    * @return true if the glob pattern described by the otherPath matches the 
path in this {@link DatasetDescriptor}.
    */
-  public boolean isPathContaining(String otherPath) {
+  private boolean isPathContaining(String otherPath) {
     if (otherPath == null) {
       return false;
     }
@@ -97,12 +108,16 @@ public class FSDatasetDescriptor implements 
DatasetDescriptor {
       return false;
     }
 
+    if (this.isRetentionApplied() != other.isRetentionApplied()) {
+      return false;
+    }
+
     return getFormatConfig().contains(other.getFormatConfig()) && 
isPathContaining(other.getPath());
   }
 
   /**
    *
-   * @param o
+   * @param o the other {@link FSDatasetDescriptor} to compare "this" {@link 
FSDatasetDescriptor} with.
    * @return true iff  "this" dataset descriptor is compatible with the 
"other" and the "other" dataset descriptor is
    * compatible with this dataset descriptor.
    */
@@ -118,6 +133,9 @@ public class FSDatasetDescriptor implements 
DatasetDescriptor {
     if (this.getPlatform() == null || other.getPlatform() == null || 
!this.getPlatform().equalsIgnoreCase(other.getPlatform())) {
       return false;
     }
+    if (this.isRetentionApplied() != other.isRetentionApplied()) {
+      return false;
+    }
     return this.getPath().equals(other.getPath()) && 
this.getFormatConfig().equals(other.getFormatConfig());
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/49974214/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/FormatConfig.java
----------------------------------------------------------------------
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/FormatConfig.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/FormatConfig.java
index a36182c..4b45a52 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/FormatConfig.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/FormatConfig.java
@@ -18,6 +18,7 @@
 package org.apache.gobblin.service.modules.dataset;
 
 import com.google.common.base.Joiner;
+import com.google.common.collect.ImmutableMap;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
 
@@ -45,12 +46,22 @@ public class FormatConfig {
   private final String codecType;
   @Getter
   private final EncryptionConfig encryptionConfig;
+  @Getter
+  private final Config rawConfig;
+
+  private static final Config DEFAULT_FALLBACK =
+      ConfigFactory.parseMap(ImmutableMap.<String, Object>builder()
+          .put(DatasetDescriptorConfigKeys.FORMAT_KEY, 
DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY)
+          .put(DatasetDescriptorConfigKeys.CODEC_KEY, 
DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY)
+          .build());
 
   public FormatConfig(Config config) {
     this.format = ConfigUtils.getString(config, 
DatasetDescriptorConfigKeys.FORMAT_KEY, 
DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY);
     this.codecType = ConfigUtils.getString(config, 
DatasetDescriptorConfigKeys.CODEC_KEY, 
DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY);
     this.encryptionConfig = new EncryptionConfig(ConfigUtils.getConfig(config, 
DatasetDescriptorConfigKeys.ENCYPTION_PREFIX, ConfigFactory
         .empty()));
+    this.rawConfig = 
config.withFallback(this.encryptionConfig.getRawConfig().atPath(DatasetDescriptorConfigKeys.ENCYPTION_PREFIX)).
+        withFallback(DEFAULT_FALLBACK);
   }
 
   public boolean contains(FormatConfig other) {

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/49974214/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/FlowGraphPath.java
----------------------------------------------------------------------
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/FlowGraphPath.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/FlowGraphPath.java
index 02004b6..84be3b4 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/FlowGraphPath.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/FlowGraphPath.java
@@ -21,18 +21,17 @@ import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.stream.Collectors;
 
 import org.apache.hadoop.fs.Path;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Joiner;
 import com.google.common.base.Optional;
-import com.google.common.collect.Lists;
 import com.google.common.io.Files;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigValueFactory;
@@ -100,31 +99,20 @@ public class FlowGraphPath {
    */
   @VisibleForTesting
   static Dag<JobExecutionPlan> concatenate(Dag<JobExecutionPlan> dagLeft, 
Dag<JobExecutionPlan> dagRight) {
-    List<DagNode<JobExecutionPlan>> endNodes = dagLeft.getEndNodes();
-    List<DagNode<JobExecutionPlan>> startNodes = dagRight.getStartNodes();
-    List<String> dependenciesList = Lists.newArrayList();
-    //List of nodes with no dependents in the concatenated dag.
-    Set<DagNode<JobExecutionPlan>> forkNodes = new HashSet<>();
-
-    for (DagNode<JobExecutionPlan> dagNode: endNodes) {
-      if (isNodeForkable(dagNode)) {
-        //If node is forkable, then add its parents (if any) to the 
dependencies list.
-        forkNodes.add(dagNode);
-        List<DagNode<JobExecutionPlan>> parentNodes = 
dagLeft.getParents(dagNode);
-        for (DagNode<JobExecutionPlan> parentNode: parentNodes) {
-          
dependenciesList.add(parentNode.getValue().getJobSpec().getConfig().getString(ConfigurationKeys.JOB_NAME_KEY));
-        }
-      } else {
-        
dependenciesList.add(dagNode.getValue().getJobSpec().getConfig().getString(ConfigurationKeys.JOB_NAME_KEY));
-      }
-    }
-
-    if (!dependenciesList.isEmpty()) {
+    //Compute the fork nodes - set of nodes with no dependents in the 
concatenated dag.
+    Set<DagNode<JobExecutionPlan>> forkNodes = dagLeft.getEndNodes().stream().
+        filter(endNode -> isNodeForkable(endNode)).collect(Collectors.toSet());
+    Set<DagNode<JobExecutionPlan>> dependencyNodes = 
dagLeft.getDependencyNodes(forkNodes);
+
+    if (!dependencyNodes.isEmpty()) {
+      List<String> dependenciesList = dependencyNodes.stream()
+          .map(dagNode -> 
dagNode.getValue().getJobSpec().getConfig().getString(ConfigurationKeys.JOB_NAME_KEY))
+          .collect(Collectors.toList());
       String dependencies = Joiner.on(",").join(dependenciesList);
-
-      for (DagNode<JobExecutionPlan> childNode : startNodes) {
+      for (DagNode<JobExecutionPlan> childNode : dagRight.getStartNodes()) {
         JobSpec jobSpec = childNode.getValue().getJobSpec();
-        
jobSpec.setConfig(jobSpec.getConfig().withValue(ConfigurationKeys.JOB_DEPENDENCIES,
 ConfigValueFactory.fromAnyRef(dependencies)));
+        
jobSpec.setConfig(jobSpec.getConfig().withValue(ConfigurationKeys.JOB_DEPENDENCIES,
+            ConfigValueFactory.fromAnyRef(dependencies)));
       }
     }
 
@@ -136,6 +124,7 @@ public class FlowGraphPath {
     return ConfigUtils.getBoolean(jobConfig, 
ConfigurationKeys.JOB_FORK_ON_CONCAT, false);
   }
 
+
   /**
    * Given an instance of {@link FlowEdge}, this method returns a {@link Dag < 
JobExecutionPlan >} that moves data
    * from the source of the {@link FlowEdge} to the destination of the {@link 
FlowEdge}.

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/49974214/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java
----------------------------------------------------------------------
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java
index 52116a2..32418e4 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java
@@ -101,7 +101,7 @@ public class MultiHopFlowCompiler extends 
BaseFlowToJobSpecCompiler {
     try {
       this.serviceManager.startAsync().awaitHealthy(5, TimeUnit.SECONDS);
     } catch (TimeoutException te) {
-      this.log.error("Timed out while waiting for the service manager to start 
up", te);
+      MultiHopFlowCompiler.log.error("Timed out while waiting for the service 
manager to start up", te);
       throw new RuntimeException(te);
     }
   }
@@ -163,7 +163,6 @@ public class MultiHopFlowCompiler extends 
BaseFlowToJobSpecCompiler {
   @Override
   protected void populateEdgeTemplateMap() {
     log.warn("No population of templates based on edge happen in this 
implementation");
-    return;
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/49974214/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/Dag.java
----------------------------------------------------------------------
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/Dag.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/Dag.java
index 0f5691e..db81c6a 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/Dag.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/Dag.java
@@ -21,11 +21,13 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
 import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
 
 import lombok.Getter;
 
@@ -86,7 +88,66 @@ public class Dag<T> {
   }
 
   public List<DagNode<T>> getParents(DagNode node) {
-    return (node.parentNodes != null)? node.parentNodes : 
Collections.EMPTY_LIST;
+    return (node.parentNodes != null) ? node.parentNodes : 
Collections.EMPTY_LIST;
+  }
+
+  /**
+   * Get the ancestors of a given set of {@link DagNode}s in the {@link Dag}.
+   * @param dagNodes set of nodes in the {@link Dag}.
+   * @return the union of all ancestors of dagNodes in the dag.
+   */
+  private Set<DagNode<T>> getAncestorNodes(Set<DagNode<T>> dagNodes) {
+    Set<DagNode<T>> ancestorNodes = new HashSet<>();
+    for (DagNode<T> dagNode : dagNodes) {
+      LinkedList<DagNode<T>> nodesToExpand = 
Lists.newLinkedList(this.getParents(dagNode));
+      while (!nodesToExpand.isEmpty()) {
+        DagNode<T> nextNode = nodesToExpand.poll();
+        ancestorNodes.add(nextNode);
+        nodesToExpand.addAll(this.getParents(nextNode));
+      }
+    }
+    return ancestorNodes;
+  }
+
+  /**
+   * This method computes a set of {@link DagNode}s which are the dependency 
nodes for concatenating this {@link Dag}
+   * with any other {@link Dag}. The set of dependency nodes is the union of:
+   * <p><ul>
+   *   <li> The endNodes of this dag which are not forkable, and </li>
+   *   <li> The parents of forkable nodes, such that no parent is an ancestor 
of another parent.</li>
+   * </ul></p>
+   *
+   * @param forkNodes set of nodes of this {@link Dag} which are forkable
+   * @return set of dependency nodes of this dag for concatenation with any 
other dag.
+   */
+  public Set<DagNode<T>> getDependencyNodes(Set<DagNode<T>> forkNodes) {
+    Set<DagNode<T>> dependencyNodes = new HashSet<>();
+    for (DagNode<T> endNode : endNodes) {
+      if (!forkNodes.contains(endNode)) {
+        dependencyNodes.add(endNode);
+      }
+    }
+
+    //Get all ancestors of non-forkable nodes
+    Set<DagNode<T>> ancestorNodes = this.getAncestorNodes(dependencyNodes);
+
+    //Add ancestors of the parents of forkable nodes
+    for (DagNode<T> dagNode: forkNodes) {
+      List<DagNode<T>> parentNodes = this.getParents(dagNode);
+      
ancestorNodes.addAll(this.getAncestorNodes(Sets.newHashSet(parentNodes)));
+    }
+
+    for (DagNode<T> dagNode: forkNodes) {
+      List<DagNode<T>> parentNodes = this.getParents(dagNode);
+      for (DagNode<T> parentNode : parentNodes) {
+        //Add parent node of a forkable node as a dependency, only if it is 
not already an ancestor of another
+        // dependency.
+        if (!ancestorNodes.contains(parentNode)) {
+          dependencyNodes.add(parentNode);
+        }
+      }
+    }
+    return dependencyNodes;
   }
 
   public boolean isEmpty() {
@@ -126,20 +187,13 @@ public class Dag<T> {
       return other;
     }
 
-    for (DagNode node : this.endNodes) {
-      //Create a dependency for non-forkable nodes
-      if (!forkNodes.contains(node)) {
+    for (DagNode node : getDependencyNodes(forkNodes)) {
+      if (!this.parentChildMap.containsKey(node)) {
         this.parentChildMap.put(node, Lists.newArrayList());
-        for (DagNode otherNode : other.startNodes) {
-          this.parentChildMap.get(node).add(otherNode);
-          otherNode.addParentNode(node);
-        }
-      } else {
-        for (DagNode otherNode: other.startNodes) {
-          List<DagNode<T>> parentNodes = this.getParents(node);
-          parentNodes.forEach(parentNode -> 
this.parentChildMap.get(parentNode).add(otherNode));
-          parentNodes.forEach(otherNode::addParentNode);
-        }
+      }
+      for (DagNode otherNode : other.startNodes) {
+        this.parentChildMap.get(node).add(otherNode);
+        otherNode.addParentNode(node);
       }
     }
     //Each node which is a forkable node is added to list of end nodes of the 
concatenated dag
@@ -174,7 +228,7 @@ public class Dag<T> {
       return other;
     }
     //Append all the entries from the other dag's parentChildMap to this dag's 
parentChildMap
-    for (Map.Entry<DagNode, List<DagNode<T>>> entry: 
other.parentChildMap.entrySet()) {
+    for (Map.Entry<DagNode, List<DagNode<T>>> entry : 
other.parentChildMap.entrySet()) {
       this.parentChildMap.put(entry.getKey(), entry.getValue());
     }
     //Append the startNodes, endNodes and nodes from the other dag to this dag.

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/49974214/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/DatasetDescriptorConfigKeys.java
----------------------------------------------------------------------
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/DatasetDescriptorConfigKeys.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/DatasetDescriptorConfigKeys.java
index 23e20c8..6470a1d 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/DatasetDescriptorConfigKeys.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/DatasetDescriptorConfigKeys.java
@@ -34,6 +34,7 @@ public class DatasetDescriptorConfigKeys {
   public static final String FORMAT_KEY = "format";
   public static final String CODEC_KEY = "codec";
   public static final String DESCRIPTION_KEY = "description";
+  public static final String IS_RETENTION_APPLIED_KEY = "isRetentionApplied";
 
   //Dataset encryption related keys
   public static final String ENCYPTION_PREFIX = "encrypt";
@@ -42,5 +43,5 @@ public class DatasetDescriptorConfigKeys {
   public static final String ENCRYPTION_KEYSTORE_ENCODING_KEY = 
"keystore_encoding";
 
   public static final String DATASET_DESCRIPTOR_CONFIG_ANY = "any";
-
+  public static final String DATASET_DESCRIPTOR_CONFIG_NONE = "none";
 }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/49974214/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/pathfinder/AbstractPathFinder.java
----------------------------------------------------------------------
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/pathfinder/AbstractPathFinder.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/pathfinder/AbstractPathFinder.java
index eed5603..f58a6d8 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/pathfinder/AbstractPathFinder.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/pathfinder/AbstractPathFinder.java
@@ -19,19 +19,25 @@ package 
org.apache.gobblin.service.modules.flowgraph.pathfinder;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ExecutionException;
 
+import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.tuple.Pair;
 
 import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
 import com.typesafe.config.Config;
+import com.typesafe.config.ConfigValue;
+import com.typesafe.config.ConfigValueFactory;
 
 import lombok.extern.slf4j.Slf4j;
 
 import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.runtime.api.FlowSpec;
 import org.apache.gobblin.runtime.api.JobTemplate;
 import org.apache.gobblin.runtime.api.SpecExecutor;
@@ -57,23 +63,24 @@ public abstract class AbstractPathFinder implements 
PathFinder {
   private static final String SOURCE_PREFIX = "source";
   private static final String DESTINATION_PREFIX = "destination";
 
-  protected FlowGraph flowGraph;
-  protected FlowSpec flowSpec;
-  protected Config flowConfig;
+  private List<DataNode> destNodes;
+
+  FlowGraph flowGraph;
 
-  protected DataNode srcNode;
-  protected List<DataNode> destNodes;
+  DataNode srcNode;
 
-  protected DatasetDescriptor srcDatasetDescriptor;
-  protected DatasetDescriptor destDatasetDescriptor;
+  DatasetDescriptor srcDatasetDescriptor;
+  DatasetDescriptor destDatasetDescriptor;
 
   //Maintain path of FlowEdges as parent-child map
-  protected Map<FlowEdgeContext, FlowEdgeContext> pathMap;
+  Map<FlowEdgeContext, FlowEdgeContext> pathMap;
 
   //Flow Execution Id
   protected Long flowExecutionId;
+  protected FlowSpec flowSpec;
+  protected Config flowConfig;
 
-  public AbstractPathFinder(FlowGraph flowGraph, FlowSpec flowSpec)
+  AbstractPathFinder(FlowGraph flowGraph, FlowSpec flowSpec)
       throws ReflectiveOperationException {
     this.flowGraph = flowGraph;
     this.flowSpec = flowSpec;
@@ -94,12 +101,48 @@ public abstract class AbstractPathFinder implements 
PathFinder {
       }
       this.destNodes.add(destNode);
     }
+
+    //Should apply retention?
+    boolean shouldApplyRetention = ConfigUtils.getBoolean(flowConfig, 
ConfigurationKeys.FLOW_APPLY_RETENTION, true);
+    //Should apply retention on input dataset?
+    boolean shouldApplyRetentionOnInput = ConfigUtils.getBoolean(flowConfig, 
ConfigurationKeys.FLOW_APPLY_INPUT_RETENTION, false);
+
+    if ((shouldApplyRetentionOnInput) && (!shouldApplyRetention)) {
+      //Invalid retention config
+      throw new RuntimeException("Invalid retention configuration - 
shouldApplyRetentionOnInput = " + shouldApplyRetentionOnInput +
+          ", and shouldApplyRetention = " + shouldApplyRetention);
+    }
+
     //Get src/dest dataset descriptors from the flow config
     Config srcDatasetDescriptorConfig =
         
flowConfig.getConfig(DatasetDescriptorConfigKeys.FLOW_INPUT_DATASET_DESCRIPTOR_PREFIX);
     Config destDatasetDescriptorConfig =
         
flowConfig.getConfig(DatasetDescriptorConfigKeys.FLOW_OUTPUT_DATASET_DESCRIPTOR_PREFIX);
 
+    //Add retention config for source and destination dataset descriptors.
+    if (shouldApplyRetentionOnInput) {
+      // We should run retention on source dataset. To ensure a retention is 
run, set
+      // isRetentionApplied=false for source dataset.
+      srcDatasetDescriptorConfig = srcDatasetDescriptorConfig
+          .withValue(DatasetDescriptorConfigKeys.IS_RETENTION_APPLIED_KEY, 
ConfigValueFactory.fromAnyRef(false));
+    } else {
+      // Don't apply retention on source dataset.
+      //
+      // If ConfigurationKeys.FLOW_APPLY_RETENTION is true, isRetentionApplied 
is set to true for the source dataset.
+      // The PathFinder will therefore treat the source dataset as one on 
which retention has already been
+      // applied, preventing retention from running on the source dataset.
+      //
+      // On the other hand, if ConfigurationKeys.FLOW_APPLY_RETENTION is false
+      // we do not apply retention - neither on the source dataset nor 
anywhere along the path to the destination.
+      srcDatasetDescriptorConfig = srcDatasetDescriptorConfig
+          .withValue(DatasetDescriptorConfigKeys.IS_RETENTION_APPLIED_KEY, 
ConfigValueFactory.fromAnyRef(shouldApplyRetention));
+    }
+    destDatasetDescriptorConfig = 
destDatasetDescriptorConfig.withValue(DatasetDescriptorConfigKeys.IS_RETENTION_APPLIED_KEY,
 ConfigValueFactory.fromAnyRef(shouldApplyRetention));
+
+    //Add the retention configs to the FlowConfig
+    flowConfig = flowConfig.withValue(ConfigurationKeys.FLOW_APPLY_RETENTION, 
ConfigValueFactory.fromAnyRef(shouldApplyRetention));
+    flowConfig = 
flowConfig.withValue(ConfigurationKeys.FLOW_APPLY_INPUT_RETENTION, 
ConfigValueFactory.fromAnyRef(shouldApplyRetentionOnInput));
+
     Class srcdatasetDescriptorClass =
         
Class.forName(srcDatasetDescriptorConfig.getString(DatasetDescriptorConfigKeys.CLASS_KEY));
     this.srcDatasetDescriptor = (DatasetDescriptor) GobblinConstructorUtils
@@ -108,25 +151,23 @@ public abstract class AbstractPathFinder implements 
PathFinder {
         
Class.forName(destDatasetDescriptorConfig.getString(DatasetDescriptorConfigKeys.CLASS_KEY));
     this.destDatasetDescriptor = (DatasetDescriptor) GobblinConstructorUtils
         .invokeLongestConstructor(destDatasetDescriptorClass, 
destDatasetDescriptorConfig);
+
   }
 
-  protected boolean isPathFound(DataNode currentNode, DataNode destNode, 
DatasetDescriptor currentDatasetDescriptor,
+  boolean isPathFound(DataNode currentNode, DataNode destNode, 
DatasetDescriptor currentDatasetDescriptor,
       DatasetDescriptor destDatasetDescriptor) {
-    if ((currentNode.equals(destNode)) && 
(currentDatasetDescriptor.equals(destDatasetDescriptor))) {
-      return true;
-    }
-    return false;
+    return (currentNode.equals(destNode)) && 
(currentDatasetDescriptor.equals(destDatasetDescriptor));
   }
 
   /**
   * A helper method that sorts the {@link FlowEdge}s incident on dataNode 
based on whether the FlowEdge has an
   * output {@link DatasetDescriptor} that is compatible with the 
destDatasetDescriptor.
-   * @param dataNode
+   * @param dataNode the {@link DataNode} to be expanded for determining 
candidate edges.
    * @param currentDatasetDescriptor Output {@link DatasetDescriptor} of the 
current edge.
    * @param destDatasetDescriptor Target {@link DatasetDescriptor}.
    * @return prioritized list of {@link FlowEdge}s to be added to the edge 
queue for expansion.
    */
-  protected List<FlowEdgeContext> getNextEdges(DataNode dataNode, 
DatasetDescriptor currentDatasetDescriptor,
+  List<FlowEdgeContext> getNextEdges(DataNode dataNode, DatasetDescriptor 
currentDatasetDescriptor,
       DatasetDescriptor destDatasetDescriptor) {
     List<FlowEdgeContext> prioritizedEdgeList = new LinkedList<>();
     for (FlowEdge flowEdge : this.flowGraph.getEdges(dataNode)) {
@@ -146,25 +187,18 @@ public abstract class AbstractPathFinder implements 
PathFinder {
           for (Pair<DatasetDescriptor, DatasetDescriptor> 
datasetDescriptorPair : datasetDescriptorPairs) {
             DatasetDescriptor inputDatasetDescriptor = 
datasetDescriptorPair.getLeft();
             DatasetDescriptor outputDatasetDescriptor = 
datasetDescriptorPair.getRight();
+
             if (inputDatasetDescriptor.contains(currentDatasetDescriptor)) {
-              FlowEdgeContext flowEdgeContext;
-              if (outputDatasetDescriptor.contains(currentDatasetDescriptor)) {
-                //If datasets described by the currentDatasetDescriptor is a 
subset of the datasets described
-                // by the outputDatasetDescriptor (i.e. 
currentDatasetDescriptor is more "specific" than outputDatasetDescriptor, e.g.
-                // as in the case of a "distcp" edge), we propagate the more 
"specific" dataset descriptor forward.
-                flowEdgeContext =
-                    new FlowEdgeContext(flowEdge, currentDatasetDescriptor, 
currentDatasetDescriptor, mergedConfig,
-                        specExecutor);
-              } else {
-                //outputDatasetDescriptor is more specific (e.g. if it is a 
dataset transformation edge)
-                flowEdgeContext =
-                    new FlowEdgeContext(flowEdge, currentDatasetDescriptor, 
outputDatasetDescriptor, mergedConfig,
-                        specExecutor);
-              }
+              DatasetDescriptor edgeOutputDescriptor = 
makeOutputDescriptorSpecific(currentDatasetDescriptor, outputDatasetDescriptor);
+              FlowEdgeContext flowEdgeContext = new FlowEdgeContext(flowEdge, 
currentDatasetDescriptor, edgeOutputDescriptor, mergedConfig,
+                  specExecutor);
+
               if 
(destDatasetDescriptor.getFormatConfig().contains(outputDatasetDescriptor.getFormatConfig()))
 {
-                //Add to the front of the edge list if platform-independent 
properties of the output descriptor is compatible
-                // with those of destination dataset descriptor.
-                // In other words, we prioritize edges that perform data 
transformations as close to the source as possible.
+                /*
+                Add to the front of the edge list if platform-independent 
properties of the output descriptor are compatible
+                with those of the destination dataset descriptor.
+                In other words, we prioritize edges that perform data 
transformations as close to the source as possible.
+                */
                 prioritizedEdgeList.add(0, flowEdgeContext);
               } else {
                 prioritizedEdgeList.add(flowEdgeContext);
@@ -188,6 +222,65 @@ public abstract class AbstractPathFinder implements 
PathFinder {
   }
 
   /**
+   * A helper method to make the output {@link DatasetDescriptor} of a {@link 
FlowEdge} "specific". More precisely,
+   * we replace any "placeholder" configurations in the output {@link 
DatasetDescriptor} with specific configuration
+   * values obtained from the input {@link DatasetDescriptor}. A placeholder 
configuration is one which is not
+   * defined or is set to {@link 
DatasetDescriptorConfigKeys#DATASET_DESCRIPTOR_CONFIG_ANY}.
+   *
+   * Example: Consider a {@link FlowEdge} that applies retention on an input 
dataset. Further assume that this edge
+   * is applicable to datasets of all formats. The input and output 
descriptors of this edge may be described using the following
+   * configs:
+   * inputDescriptor = 
Config(SimpleConfigObject({"class":"org.apache.gobblin.service.modules.dataset.FSDatasetDescriptor",
+   * 
"codec":"any","encrypt":{"algorithm":"any","keystore_encoding":"any","keystore_type":"any"},"format":"any",
+   * 
"isRetentionApplied":false,"path":"/data/encrypted/testTeam/testDataset","platform":"hdfs"}))
+   *
+   * outputDescriptor = 
Config(SimpleConfigObject({"class":"org.apache.gobblin.service.modules.dataset.FSDatasetDescriptor",
+   * 
"codec":"any","encrypt":{"algorithm":"any","keystore_encoding":"any","keystore_type":"any"},"format":"any",
+   * 
"isRetentionApplied":true,"path":"/data/encrypted/testTeam/testDataset","platform":"hdfs"}))
+   *
+   * Let the intermediate dataset descriptor "arriving" at this edge be 
described using the following config:
+   * currentDescriptor = 
Config(SimpleConfigObject({"class":"org.apache.gobblin.service.modules.dataset.FSDatasetDescriptor",
+   * 
"codec":"gzip","encrypt":{"algorithm":"aes_rotating","keystore_encoding":"base64","keystore_type":"json"},"format":"json",
+   * 
"isRetentionApplied":false,"path":"/data/encrypted/testTeam/testDataset","platform":"hdfs"})).
+   *
+   * This method replaces the placeholder configs in outputDescriptor with 
specific values from currentDescriptor to return:
+   * returnedDescriptor = 
Config(SimpleConfigObject({"class":"org.apache.gobblin.service.modules.dataset.FSDatasetDescriptor",
+   * 
"codec":"gzip","encrypt":{"algorithm":"aes_rotating","keystore_encoding":"base64","keystore_type":"json"},"format":"json",
+   * 
"isRetentionApplied":<b>true</b>,"path":"/data/encrypted/testTeam/testDataset","platform":"hdfs"})).
+   *
+   * @param currentDescriptor intermediate {@link DatasetDescriptor} obtained 
during path finding.
+   * @param outputDescriptor output {@link DatasetDescriptor} of a {@link 
FlowEdge}.
+   * @return {@link DatasetDescriptor} with placeholder configs in 
outputDescriptor substituted with specific values
+   * from the currentDescriptor.
+   */
+
+  private DatasetDescriptor makeOutputDescriptorSpecific(DatasetDescriptor 
currentDescriptor, DatasetDescriptor outputDescriptor)
+      throws ReflectiveOperationException {
+    Config config = outputDescriptor.getRawConfig();
+
+    for (Iterator<Map.Entry<String, ConfigValue>> iterator = 
currentDescriptor.getRawConfig().entrySet().iterator();
+        iterator.hasNext(); ) {
+      Map.Entry<String, ConfigValue> entry = iterator.next();
+      String entryValue = entry.getValue().unwrapped().toString();
+      if (!isPlaceHolder(entryValue)) {
+        String entryValueInOutputDescriptor = ConfigUtils.getString(config, 
entry.getKey(), StringUtils.EMPTY);
+        if (isPlaceHolder(entryValueInOutputDescriptor)) {
+          config = config.withValue(entry.getKey(), 
ConfigValueFactory.fromAnyRef(entryValue));
+        }
+      }
+    }
+    return 
GobblinConstructorUtils.invokeLongestConstructor(outputDescriptor.getClass(), 
config);
+  }
+
+  /**
+   * A placeholder configuration is one which is not defined or is set to 
{@link DatasetDescriptorConfigKeys#DATASET_DESCRIPTOR_CONFIG_ANY}.
+   * @param value to be examined for determining if it is a placeholder.
+   * @return true if the value is null or empty or equals {@link 
DatasetDescriptorConfigKeys#DATASET_DESCRIPTOR_CONFIG_ANY}.
+   */
+  private boolean isPlaceHolder(String value) {
+    return Strings.isNullOrEmpty(value) || 
value.equals(DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY);
+  }
+  /**
    * Build the merged config for each {@link FlowEdge}, which is a combination 
of (in the precedence described below):
    * <ul>
    *   <p> the user provided flow config </p>

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/49974214/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java
----------------------------------------------------------------------
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java
index 5e9f86c..7517c17 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java
@@ -35,6 +35,7 @@ import org.apache.gobblin.runtime.api.FlowSpec;
 import org.apache.gobblin.runtime.api.JobSpec;
 import org.apache.gobblin.runtime.api.SpecExecutor;
 import org.apache.gobblin.service.ExecutionStatus;
+import org.apache.gobblin.service.modules.flowgraph.FlowGraphConfigurationKeys;
 import org.apache.gobblin.service.modules.orchestration.DagManager;
 import org.apache.gobblin.util.ConfigUtils;
 
@@ -71,11 +72,14 @@ public class JobExecutionPlan {
 
       String flowName = ConfigUtils.getString(flowConfig, 
ConfigurationKeys.FLOW_NAME_KEY, "");
       String flowGroup = ConfigUtils.getString(flowConfig, 
ConfigurationKeys.FLOW_GROUP_KEY, "");
-      String jobName = ConfigUtils.getString(jobConfig, 
ConfigurationKeys.JOB_NAME_KEY, "");
       String flowFailureOption = ConfigUtils.getString(flowConfig, 
ConfigurationKeys.FLOW_FAILURE_OPTION, DagManager.DEFAULT_FLOW_FAILURE_OPTION);
 
-      //Modify the job name to include the flow group, flow name and a 
randomly generated integer to make the job name unique.
-      jobName = Joiner.on(JOB_NAME_COMPONENT_SEPARATION_CHAR).join(flowGroup, 
flowName, jobName, random.nextInt(Integer.MAX_VALUE));
+      String jobName = ConfigUtils.getString(jobConfig, 
ConfigurationKeys.JOB_NAME_KEY, "");
+      String source = ConfigUtils.getString(jobConfig, 
FlowGraphConfigurationKeys.FLOW_EDGE_SOURCE_KEY, "");
+      String destination = ConfigUtils.getString(jobConfig, 
FlowGraphConfigurationKeys.FLOW_EDGE_DESTINATION_KEY, "");
+
+      //Modify the job name to include the flow group, flow name and source 
and destination node ids for the job.
+      jobName = Joiner.on(JOB_NAME_COMPONENT_SEPARATION_CHAR).join(flowGroup, 
flowName, jobName, source, destination);
 
       JobSpec.Builder jobSpecBuilder = 
JobSpec.builder(jobSpecURIGenerator(flowGroup, jobName, 
flowSpec)).withConfig(jobConfig)
           
.withDescription(flowSpec.getDescription()).withVersion(flowSpec.getVersion());

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/49974214/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/FlowGraphPathTest.java
----------------------------------------------------------------------
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/FlowGraphPathTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/FlowGraphPathTest.java
index 01074f8..bbd863b 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/FlowGraphPathTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/FlowGraphPathTest.java
@@ -127,9 +127,5 @@ public class FlowGraphPathTest {
     
Assert.assertEquals(dagNew.getStartNodes().get(1).getValue().getJobSpec().getConfig().getString(ConfigurationKeys.JOB_NAME_KEY),
 "job1");
     
Assert.assertFalse(dagNew.getStartNodes().get(1).getValue().getJobSpec().getConfig().hasPath(ConfigurationKeys.JOB_DEPENDENCIES));
     
Assert.assertFalse(dagNew.getStartNodes().get(1).getValue().getJobSpec().getConfig().hasPath(ConfigurationKeys.JOB_DEPENDENCIES));
-
-
   }
-
-
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/49974214/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompilerTest.java
----------------------------------------------------------------------
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompilerTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompilerTest.java
index c04b2b9..54792f9 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompilerTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompilerTest.java
@@ -24,8 +24,11 @@ import java.io.InputStreamReader;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Properties;
+import java.util.Set;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 
@@ -49,6 +52,7 @@ import org.testng.annotations.Test;
 import com.google.common.base.Charsets;
 import com.google.common.base.Joiner;
 import com.google.common.base.Optional;
+import com.google.common.collect.Lists;
 import com.google.common.io.Files;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
@@ -69,6 +73,7 @@ import org.apache.gobblin.service.ServiceConfigKeys;
 import org.apache.gobblin.service.modules.core.GitFlowGraphMonitor;
 import org.apache.gobblin.service.modules.flowgraph.BaseFlowGraph;
 import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.flowgraph.Dag.DagNode;
 import org.apache.gobblin.service.modules.flowgraph.DataNode;
 import org.apache.gobblin.service.modules.flowgraph.FlowEdge;
 import org.apache.gobblin.service.modules.flowgraph.FlowEdgeFactory;
@@ -141,7 +146,8 @@ public class MultiHopFlowCompilerTest {
     this.specCompiler = new MultiHopFlowCompiler(config, this.flowGraph);
   }
 
-  private FlowSpec createFlowSpec(String flowConfigResource, String source, 
String destination) throws IOException, URISyntaxException {
+  private FlowSpec createFlowSpec(String flowConfigResource, String source, 
String destination, boolean applyRetention, boolean applyRetentionOnInput)
+      throws IOException, URISyntaxException {
     //Create a flow spec
     Properties flowProperties = new Properties();
     flowProperties.put(ConfigurationKeys.JOB_SCHEDULE_KEY, "* * * * *");
@@ -149,6 +155,8 @@ public class MultiHopFlowCompilerTest {
     flowProperties.put(ConfigurationKeys.FLOW_NAME_KEY, "testFlowName");
     flowProperties.put(ServiceConfigKeys.FLOW_SOURCE_IDENTIFIER_KEY, source);
     flowProperties.put(ServiceConfigKeys.FLOW_DESTINATION_IDENTIFIER_KEY, 
destination);
+    flowProperties.put(ConfigurationKeys.FLOW_APPLY_RETENTION, 
Boolean.toString(applyRetention));
+    flowProperties.put(ConfigurationKeys.FLOW_APPLY_INPUT_RETENTION, 
Boolean.toString(applyRetentionOnInput));
     Config flowConfig = ConfigUtils.propertiesToConfig(flowProperties);
 
     //Get the input/output dataset config from a file
@@ -172,14 +180,14 @@ public class MultiHopFlowCompilerTest {
 
   @Test
   public void testCompileFlow() throws URISyntaxException, IOException {
-    FlowSpec spec = createFlowSpec("flow/flow1.conf", "LocalFS-1", "ADLS-1");
+    FlowSpec spec = createFlowSpec("flow/flow1.conf", "LocalFS-1", "ADLS-1", 
false, false);
     Dag<JobExecutionPlan> jobDag = this.specCompiler.compileFlow(spec);
     Assert.assertEquals(jobDag.getNodes().size(), 4);
     Assert.assertEquals(jobDag.getStartNodes().size(), 1);
     Assert.assertEquals(jobDag.getEndNodes().size(), 1);
 
     //Get the 1st hop - Distcp from "LocalFS-1" to "HDFS-1"
-    Dag.DagNode<JobExecutionPlan> startNode = jobDag.getStartNodes().get(0);
+    DagNode<JobExecutionPlan> startNode = jobDag.getStartNodes().get(0);
     JobExecutionPlan jobSpecWithExecutor = startNode.getValue();
     JobSpec jobSpec = jobSpecWithExecutor.getJobSpec();
 
@@ -188,9 +196,9 @@ public class MultiHopFlowCompilerTest {
     String flowGroup = "testFlowGroup";
     String flowName = "testFlowName";
     String expectedJobName1 = 
Joiner.on(JobExecutionPlan.Factory.JOB_NAME_COMPONENT_SEPARATION_CHAR).
-        join(flowGroup, flowName, "Distcp-HDFS-HDFS");
+        join(flowGroup, flowName, "Distcp", "LocalFS-1", "HDFS-1");
     String jobName1 = jobConfig.getString(ConfigurationKeys.JOB_NAME_KEY);
-    Assert.assertTrue(jobName1.startsWith(expectedJobName1));
+    Assert.assertEquals(jobName1, expectedJobName1);
     String from = jobConfig.getString("from");
     String to = jobConfig.getString("to");
     Assert.assertEquals(from, "/data/out/testTeam/testDataset");
@@ -212,15 +220,15 @@ public class MultiHopFlowCompilerTest {
     Assert.assertEquals(specExecutor.getUri().toString(), "fs:///");
     Assert.assertEquals(specExecutor.getClass().getCanonicalName(), 
"org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor");
 
-    //Get the 2nd hop - "HDFS-1 to HDFS-1 : convert avro to json and encrypt"
+    //Get the 2nd hop - "HDFS-1 to HDFS-1 : convert avro to json and encrypt". 
Ensure config has correct substitutions.
     Assert.assertEquals(jobDag.getChildren(startNode).size(), 1);
-    Dag.DagNode<JobExecutionPlan> secondHopNode = 
jobDag.getChildren(startNode).get(0);
+    DagNode<JobExecutionPlan> secondHopNode = 
jobDag.getChildren(startNode).get(0);
     jobSpecWithExecutor = secondHopNode.getValue();
     jobConfig = jobSpecWithExecutor.getJobSpec().getConfig();
     String expectedJobName2 = 
Joiner.on(JobExecutionPlan.Factory.JOB_NAME_COMPONENT_SEPARATION_CHAR).
-        join(flowGroup, flowName, "convert-to-json-and-encrypt");
+        join(flowGroup, flowName, "ConvertToJsonAndEncrypt", "HDFS-1", 
"HDFS-1");
     String jobName2 = jobConfig.getString(ConfigurationKeys.JOB_NAME_KEY);
-    Assert.assertTrue(jobName2.startsWith(expectedJobName2));
+    Assert.assertEquals(jobName2, expectedJobName2);
     
Assert.assertEquals(jobConfig.getString(ConfigurationKeys.JOB_DEPENDENCIES), 
jobName1);
     from = jobConfig.getString("from");
     to = jobConfig.getString("to");
@@ -234,13 +242,13 @@ public class MultiHopFlowCompilerTest {
 
     //Get the 3rd hop - "Distcp HDFS-1 to HDFS-3"
     Assert.assertEquals(jobDag.getChildren(secondHopNode).size(), 1);
-    Dag.DagNode<JobExecutionPlan> thirdHopNode = 
jobDag.getChildren(secondHopNode).get(0);
+    DagNode<JobExecutionPlan> thirdHopNode = 
jobDag.getChildren(secondHopNode).get(0);
     jobSpecWithExecutor = thirdHopNode.getValue();
     jobConfig = jobSpecWithExecutor.getJobSpec().getConfig();
     String expectedJobName3 = 
Joiner.on(JobExecutionPlan.Factory.JOB_NAME_COMPONENT_SEPARATION_CHAR).
-        join(flowGroup, flowName, "Distcp-HDFS-HDFS");
+        join(flowGroup, flowName, "Distcp", "HDFS-1", "HDFS-3");
     String jobName3 = jobConfig.getString(ConfigurationKeys.JOB_NAME_KEY);
-    Assert.assertTrue(jobName3.startsWith(expectedJobName3));
+    Assert.assertEquals(jobName3, expectedJobName3);
     
Assert.assertEquals(jobConfig.getString(ConfigurationKeys.JOB_DEPENDENCIES), 
jobName2);
     from = jobConfig.getString("from");
     to = jobConfig.getString("to");
@@ -256,15 +264,15 @@ public class MultiHopFlowCompilerTest {
     Assert.assertEquals(specExecutor.getUri().toString(), 
"https://azkaban01.gobblin.net:8443");
     Assert.assertEquals(specExecutor.getClass().getCanonicalName(), 
"org.apache.gobblin.service.modules.flow.MultiHopFlowCompilerTest.TestAzkabanSpecExecutor");
 
-    //Get the 4th hop - "Distcp from HDFS3 to ADLS-1"
+    //Get the 4th hop - "Distcp from HDFS-3 to ADLS-1"
     Assert.assertEquals(jobDag.getChildren(thirdHopNode).size(), 1);
-    Dag.DagNode<JobExecutionPlan> fourthHopNode = 
jobDag.getChildren(thirdHopNode).get(0);
+    DagNode<JobExecutionPlan> fourthHopNode = 
jobDag.getChildren(thirdHopNode).get(0);
     jobSpecWithExecutor = fourthHopNode.getValue();
     jobConfig = jobSpecWithExecutor.getJobSpec().getConfig();
     String expectedJobName4 = 
Joiner.on(JobExecutionPlan.Factory.JOB_NAME_COMPONENT_SEPARATION_CHAR).
-        join(flowGroup, flowName, "Distcp-HDFS-ADL");
+        join(flowGroup, flowName, "DistcpToADL", "HDFS-3", "ADLS-1");
     String jobName4 = jobConfig.getString(ConfigurationKeys.JOB_NAME_KEY);
-    Assert.assertTrue(jobName4.startsWith(expectedJobName4));
+    Assert.assertEquals(jobName4, expectedJobName4);
     
Assert.assertEquals(jobConfig.getString(ConfigurationKeys.JOB_DEPENDENCIES), 
jobName3);
     from = jobConfig.getString("from");
     to = jobConfig.getString("to");
@@ -287,12 +295,62 @@ public class MultiHopFlowCompilerTest {
     Assert.assertEquals(jobDag.getEndNodes().get(0), fourthHopNode);
   }
 
+
   @Test (dependsOnMethods = "testCompileFlow")
+  public void testCompileFlowWithRetention() throws URISyntaxException, 
IOException {
+    FlowSpec spec = createFlowSpec("flow/flow1.conf", "LocalFS-1", "ADLS-1", 
true,
+        true);
+    Dag<JobExecutionPlan> jobDag = this.specCompiler.compileFlow(spec);
+    Assert.assertEquals(jobDag.getNodes().size(), 9);
+    Assert.assertEquals(jobDag.getStartNodes().size(), 2);
+    Assert.assertEquals(jobDag.getEndNodes().size(), 5);
+
+    String flowGroup = "testFlowGroup";
+    String flowName = "testFlowName";
+
+    List<DagNode<JobExecutionPlan>> currentHopNodes = jobDag.getStartNodes();
+
+    List<String> expectedJobNames = Lists.newArrayList("SnapshotRetention", 
"Distcp", "SnapshotRetention", "ConvertToJsonAndEncrypt", "SnapshotRetention" ,
+        "Distcp", "SnapshotRetention", "DistcpToADL", "SnapshotRetention");
+    List<String> sourceNodes = Lists.newArrayList("LocalFS-1", "LocalFS-1", 
"HDFS-1", "HDFS-1", "HDFS-1", "HDFS-1", "HDFS-3", "HDFS-3", "ADLS-1");
+    List<String> destinationNodes = Lists.newArrayList("LocalFS-1", "HDFS-1", 
"HDFS-1", "HDFS-1", "HDFS-1", "HDFS-3", "HDFS-3", "ADLS-1", "ADLS-1");
+
+    List<DagNode<JobExecutionPlan>> nextHopNodes = new ArrayList<>();
+    for (int i = 0; i < 9; i += 2) {
+      if (i < 8) {
+        Assert.assertEquals(currentHopNodes.size(), 2);
+      } else {
+        Assert.assertEquals(currentHopNodes.size(), 1);
+      }
+      Set<String> jobNames = new HashSet<>();
+      
jobNames.add(Joiner.on(JobExecutionPlan.Factory.JOB_NAME_COMPONENT_SEPARATION_CHAR).
+          join(flowGroup, flowName, expectedJobNames.get(i), 
sourceNodes.get(i), destinationNodes.get(i)));
+      if (i < 8) {
+        
jobNames.add(Joiner.on(JobExecutionPlan.Factory.JOB_NAME_COMPONENT_SEPARATION_CHAR).
+            join(flowGroup, flowName, expectedJobNames.get(i + 1), 
sourceNodes.get(i + 1), destinationNodes.get(i + 1)));
+      }
+
+      for (DagNode<JobExecutionPlan> dagNode : currentHopNodes) {
+        Config jobConfig = dagNode.getValue().getJobSpec().getConfig();
+        String jobName = jobConfig.getString(ConfigurationKeys.JOB_NAME_KEY);
+        Assert.assertTrue(jobNames.contains(jobName));
+        log.warn(jobName);
+        nextHopNodes.addAll(jobDag.getChildren(dagNode));
+      }
+
+      currentHopNodes = nextHopNodes;
+      nextHopNodes = new ArrayList<>();
+    }
+    Assert.assertEquals(nextHopNodes.size(), 0);
+
+  }
+
+  @Test (dependsOnMethods = "testCompileFlowWithRetention")
   public void testCompileFlowAfterFirstEdgeDeletion() throws 
URISyntaxException, IOException {
     //Delete the self edge on HDFS-1 that performs convert-to-json-and-encrypt.
     this.flowGraph.deleteFlowEdge("HDFS-1:HDFS-1:hdfsConvertToJsonAndEncrypt");
 
-    FlowSpec spec = createFlowSpec("flow/flow1.conf", "LocalFS-1", "ADLS-1");
+    FlowSpec spec = createFlowSpec("flow/flow1.conf", "LocalFS-1", "ADLS-1", 
false, false);
     Dag<JobExecutionPlan> jobDag = this.specCompiler.compileFlow(spec);
 
     Assert.assertEquals(jobDag.getNodes().size(), 4);
@@ -300,7 +358,7 @@ public class MultiHopFlowCompilerTest {
     Assert.assertEquals(jobDag.getEndNodes().size(), 1);
 
     //Get the 1st hop - Distcp from "LocalFS-1" to "HDFS-2"
-    Dag.DagNode<JobExecutionPlan> startNode = jobDag.getStartNodes().get(0);
+    DagNode<JobExecutionPlan> startNode = jobDag.getStartNodes().get(0);
     JobExecutionPlan jobExecutionPlan = startNode.getValue();
     JobSpec jobSpec = jobExecutionPlan.getJobSpec();
 
@@ -309,9 +367,9 @@ public class MultiHopFlowCompilerTest {
     String flowGroup = "testFlowGroup";
     String flowName = "testFlowName";
     String expectedJobName1 = 
Joiner.on(JobExecutionPlan.Factory.JOB_NAME_COMPONENT_SEPARATION_CHAR).
-        join(flowGroup, flowName, "Distcp-HDFS-HDFS");
+        join(flowGroup, flowName, "Distcp", "LocalFS-1", "HDFS-2");
     String jobName1 = jobConfig.getString(ConfigurationKeys.JOB_NAME_KEY);
-    Assert.assertTrue(jobName1.startsWith(expectedJobName1));
+    Assert.assertEquals(jobName1, expectedJobName1);
     String from = jobConfig.getString("from");
     String to = jobConfig.getString("to");
     Assert.assertEquals(from, "/data/out/testTeam/testDataset");
@@ -335,13 +393,13 @@ public class MultiHopFlowCompilerTest {
 
     //Get the 2nd hop - "HDFS-2 to HDFS-2 : convert avro to json and encrypt"
     Assert.assertEquals(jobDag.getChildren(startNode).size(), 1);
-    Dag.DagNode<JobExecutionPlan> secondHopNode = 
jobDag.getChildren(startNode).get(0);
+    DagNode<JobExecutionPlan> secondHopNode = 
jobDag.getChildren(startNode).get(0);
     jobExecutionPlan = secondHopNode.getValue();
     jobConfig = jobExecutionPlan.getJobSpec().getConfig();
     String expectedJobName2 = 
Joiner.on(JobExecutionPlan.Factory.JOB_NAME_COMPONENT_SEPARATION_CHAR).
-        join(flowGroup, flowName, "convert-to-json-and-encrypt");
+        join(flowGroup, flowName, "ConvertToJsonAndEncrypt", "HDFS-2", 
"HDFS-2");
     String jobName2 = jobConfig.getString(ConfigurationKeys.JOB_NAME_KEY);
-    Assert.assertTrue(jobName2.startsWith(expectedJobName2));
+    Assert.assertEquals(jobName2, expectedJobName2);
     
Assert.assertEquals(jobConfig.getString(ConfigurationKeys.JOB_DEPENDENCIES), 
jobName1);
     from = jobConfig.getString("from");
     to = jobConfig.getString("to");
@@ -355,13 +413,13 @@ public class MultiHopFlowCompilerTest {
 
     //Get the 3rd hop - "Distcp HDFS-2 to HDFS-4"
     Assert.assertEquals(jobDag.getChildren(secondHopNode).size(), 1);
-    Dag.DagNode<JobExecutionPlan> thirdHopNode = 
jobDag.getChildren(secondHopNode).get(0);
+    DagNode<JobExecutionPlan> thirdHopNode = 
jobDag.getChildren(secondHopNode).get(0);
     jobExecutionPlan = thirdHopNode.getValue();
     jobConfig = jobExecutionPlan.getJobSpec().getConfig();
     String expectedJobName3 = 
Joiner.on(JobExecutionPlan.Factory.JOB_NAME_COMPONENT_SEPARATION_CHAR).
-        join(flowGroup, flowName, "Distcp-HDFS-HDFS");
+        join(flowGroup, flowName, "Distcp", "HDFS-2", "HDFS-4");
     String jobName3 = jobConfig.getString(ConfigurationKeys.JOB_NAME_KEY);
-    Assert.assertTrue(jobName3.startsWith(expectedJobName3));
+    Assert.assertEquals(jobName3, expectedJobName3);
     
Assert.assertEquals(jobConfig.getString(ConfigurationKeys.JOB_DEPENDENCIES), 
jobName2);
     from = jobConfig.getString("from");
     to = jobConfig.getString("to");
@@ -377,16 +435,16 @@ public class MultiHopFlowCompilerTest {
     Assert.assertEquals(specExecutor.getUri().toString(), 
"https://azkaban02.gobblin.net:8443");
     Assert.assertEquals(specExecutor.getClass().getCanonicalName(), 
"org.apache.gobblin.service.modules.flow.MultiHopFlowCompilerTest.TestAzkabanSpecExecutor");
 
-    //Get the 4th hop - "Distcp from HDFS4 to ADLS-1"
+    //Get the 4th hop - "Distcp from HDFS-4 to ADLS-1"
     Assert.assertEquals(jobDag.getChildren(thirdHopNode).size(), 1);
-    Dag.DagNode<JobExecutionPlan> fourthHopNode = 
jobDag.getChildren(thirdHopNode).get(0);
+    DagNode<JobExecutionPlan> fourthHopNode = 
jobDag.getChildren(thirdHopNode).get(0);
     jobExecutionPlan = fourthHopNode.getValue();
     jobConfig = jobExecutionPlan.getJobSpec().getConfig();
 
     String expectedJobName4 = 
Joiner.on(JobExecutionPlan.Factory.JOB_NAME_COMPONENT_SEPARATION_CHAR).
-        join(flowGroup, flowName, "Distcp-HDFS-ADL");
+        join(flowGroup, flowName, "DistcpToADL", "HDFS-4", "ADLS-1");
     String jobName4 = jobConfig.getString(ConfigurationKeys.JOB_NAME_KEY);
-    Assert.assertTrue(jobName4.startsWith(expectedJobName4));
+    Assert.assertEquals(jobName4, expectedJobName4);
     
Assert.assertEquals(jobConfig.getString(ConfigurationKeys.JOB_DEPENDENCIES), 
jobName3);
     from = jobConfig.getString("from");
     to = jobConfig.getString("to");
@@ -414,7 +472,7 @@ public class MultiHopFlowCompilerTest {
     //Delete the self edge on HDFS-2 that performs convert-to-json-and-encrypt.
     this.flowGraph.deleteFlowEdge("HDFS-2:HDFS-2:hdfsConvertToJsonAndEncrypt");
 
-    FlowSpec spec = createFlowSpec("flow/flow1.conf", "LocalFS-1", "ADLS-1");
+    FlowSpec spec = createFlowSpec("flow/flow1.conf", "LocalFS-1", "ADLS-1", 
false, false);
     Dag<JobExecutionPlan> jobDag = this.specCompiler.compileFlow(spec);
 
     //Ensure no path to destination.
@@ -423,50 +481,57 @@ public class MultiHopFlowCompilerTest {
 
   @Test (dependsOnMethods = "testCompileFlowAfterSecondEdgeDeletion")
   public void testCompileFlowSingleHop() throws IOException, 
URISyntaxException {
-    FlowSpec spec = createFlowSpec("flow/flow2.conf", "HDFS-1", "HDFS-3");
+    FlowSpec spec = createFlowSpec("flow/flow2.conf", "HDFS-1", "HDFS-3", 
false, false);
     Dag<JobExecutionPlan> jobDag = this.specCompiler.compileFlow(spec);
     Assert.assertEquals(jobDag.getNodes().size(), 1);
     Assert.assertEquals(jobDag.getStartNodes().size(), 1);
     Assert.assertEquals(jobDag.getEndNodes().size(), 1);
     Assert.assertEquals(jobDag.getStartNodes().get(0), 
jobDag.getEndNodes().get(0));
 
-    //Ensure hop is from HDFS-1 to HDFS-3.
-    Dag.DagNode<JobExecutionPlan> dagNode = jobDag.getStartNodes().get(0);
-    JobExecutionPlan jobExecutionPlan = dagNode.getValue();
-    Config jobConfig = jobExecutionPlan.getJobSpec().getConfig();
-    Assert.assertEquals(jobConfig.getString("source.filebased.fs.uri"), 
"hdfs://hadoopnn01.grid.linkedin.com:8888/");
-    Assert.assertEquals(jobConfig.getString("target.filebased.fs.uri"), 
"hdfs://hadoopnn03.grid.linkedin.com:8888/");
+    //Ensure hop is from HDFS-1 to HDFS-3 i.e. jobName == 
"testFlowGroup_testFlowName_Distcp_HDFS-1_HDFS-3".
+    DagNode<JobExecutionPlan> dagNode = jobDag.getStartNodes().get(0);
+    Config jobConfig = dagNode.getValue().getJobSpec().getConfig();
+    String expectedJobName = 
Joiner.on(JobExecutionPlan.Factory.JOB_NAME_COMPONENT_SEPARATION_CHAR).
+        join("testFlowGroup", "testFlowName", "Distcp", "HDFS-1", "HDFS-3");
+    String jobName = jobConfig.getString(ConfigurationKeys.JOB_NAME_KEY);
+    Assert.assertEquals(jobName, expectedJobName);
   }
 
 
   @Test (dependsOnMethods = "testCompileFlowSingleHop")
   public void testMulticastPath() throws IOException, URISyntaxException {
-    FlowSpec spec = createFlowSpec("flow/flow2.conf", "LocalFS-1", 
"HDFS-3,HDFS-4");
+    FlowSpec spec = createFlowSpec("flow/flow2.conf", "LocalFS-1", 
"HDFS-3,HDFS-4", false, false);
     Dag<JobExecutionPlan> jobDag = this.specCompiler.compileFlow(spec);
 
     Assert.assertEquals(jobDag.getNodes().size(), 4);
     Assert.assertEquals(jobDag.getEndNodes().size(), 2);
     Assert.assertEquals(jobDag.getStartNodes().size(), 2);
 
-    int i = 1;
     //First hop must be from LocalFS to HDFS-1 and HDFS-2
-    for (Dag.DagNode<JobExecutionPlan> dagNode : jobDag.getStartNodes()) {
-      JobExecutionPlan jobExecutionPlan = dagNode.getValue();
-      Config jobConfig = jobExecutionPlan.getJobSpec().getConfig();
-      Assert.assertEquals(jobConfig.getString("source.filebased.fs.uri"), 
"file:///");
-      Assert.assertEquals(jobConfig.getString("target.filebased.fs.uri"), 
"hdfs://hadoopnn0" + i++ + ".grid.linkedin.com:8888/");
+    Set<String> jobNames = new HashSet<>();
+    
jobNames.add(Joiner.on(JobExecutionPlan.Factory.JOB_NAME_COMPONENT_SEPARATION_CHAR).
+        join("testFlowGroup", "testFlowName", "Distcp", "LocalFS-1", 
"HDFS-1"));
+    
jobNames.add(Joiner.on(JobExecutionPlan.Factory.JOB_NAME_COMPONENT_SEPARATION_CHAR).
+        join("testFlowGroup", "testFlowName", "Distcp", "LocalFS-1", 
"HDFS-2"));
+
+    for (DagNode<JobExecutionPlan> dagNode : jobDag.getStartNodes()) {
+      Config jobConfig = dagNode.getValue().getJobSpec().getConfig();
+      String jobName = jobConfig.getString(ConfigurationKeys.JOB_NAME_KEY);
+      Assert.assertTrue(jobNames.contains(jobName));
     }
 
-    i = 1;
     //Second hop must be from HDFS-1/HDFS-2 to HDFS-3/HDFS-4 respectively.
-    for (Dag.DagNode<JobExecutionPlan> dagNode : jobDag.getStartNodes()) {
-      List<Dag.DagNode<JobExecutionPlan>> nextNodes = 
jobDag.getChildren(dagNode);
+    jobNames = new HashSet<>();
+    
jobNames.add(Joiner.on(JobExecutionPlan.Factory.JOB_NAME_COMPONENT_SEPARATION_CHAR).
+        join("testFlowGroup", "testFlowName", "Distcp", "HDFS-1", "HDFS-3"));
+    
jobNames.add(Joiner.on(JobExecutionPlan.Factory.JOB_NAME_COMPONENT_SEPARATION_CHAR).
+        join("testFlowGroup", "testFlowName", "Distcp", "HDFS-2", "HDFS-4"));
+    for (DagNode<JobExecutionPlan> dagNode : jobDag.getStartNodes()) {
+      List<DagNode<JobExecutionPlan>> nextNodes = jobDag.getChildren(dagNode);
       Assert.assertEquals(nextNodes.size(), 1);
-      JobExecutionPlan jobExecutionPlan = nextNodes.get(0).getValue();
-      Config jobConfig = jobExecutionPlan.getJobSpec().getConfig();
-      Assert.assertEquals(jobConfig.getString("source.filebased.fs.uri"), 
"hdfs://hadoopnn0" + i + ".grid.linkedin.com:8888/");
-      Assert.assertEquals(jobConfig.getString("target.filebased.fs.uri"), 
"hdfs://hadoopnn0" + (i + 2) + ".grid.linkedin.com:8888/");
-      i += 1;
+      Config jobConfig = nextNodes.get(0).getValue().getJobSpec().getConfig();
+      String jobName = jobConfig.getString(ConfigurationKeys.JOB_NAME_KEY);
+      Assert.assertTrue(jobNames.contains(jobName));
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/49974214/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flowgraph/DagTest.java
----------------------------------------------------------------------
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flowgraph/DagTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flowgraph/DagTest.java
index 3a7ab10..dc21413 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flowgraph/DagTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flowgraph/DagTest.java
@@ -160,6 +160,10 @@ public class DagTest {
     forkNodes.add(dagNode3);
     Dag<String> dagNew = dag1.concatenate(dag2, forkNodes);
 
+    Assert.assertEquals(dagNew.getChildren(dagNode2).size(), 1);
+    Assert.assertEquals(dagNew.getChildren(dagNode2).get(0), dagNode4);
+    Assert.assertEquals(dagNew.getParents(dagNode4).size(), 1);
+    Assert.assertEquals(dagNew.getParents(dagNode4).get(0), dagNode2);
     Assert.assertEquals(dagNew.getEndNodes().size(), 2);
     Assert.assertEquals(dagNew.getEndNodes().get(0).getValue(), "val4");
     Assert.assertEquals(dagNew.getEndNodes().get(1).getValue(), "val3");

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/49974214/gobblin-service/src/test/resources/flowgraph/flowedges/adls-1-to-adls-1-retention-1.properties
----------------------------------------------------------------------
diff --git 
a/gobblin-service/src/test/resources/flowgraph/flowedges/adls-1-to-adls-1-retention-1.properties
 
b/gobblin-service/src/test/resources/flowgraph/flowedges/adls-1-to-adls-1-retention-1.properties
new file mode 100644
index 0000000..52079d1
--- /dev/null
+++ 
b/gobblin-service/src/test/resources/flowgraph/flowedges/adls-1-to-adls-1-retention-1.properties
@@ -0,0 +1,9 @@
+flow.edge.source=ADLS-1
+flow.edge.destination=ADLS-1
+flow.edge.id=ADLS-1:ADLS-1:hdfsRemoteRetention
+flow.edge.flowTemplateDirUri=FS:///multihop/flowEdgeTemplates/hdfsSnapshotRetention
+flow.edge.specExecutors.0.specExecInstance.class=org.apache.gobblin.service.modules.flow.MultiHopFlowCompilerTest$TestAzkabanSpecExecutor
+flow.edge.specExecutors.0.specExecInstance.uri=https://azkaban03.gobblin.net:8443
+flow.edge.specExecutors.0.specExecInstance.job.launcher.class=org.apache.gobblin.azkaban.AzkabanJobLauncher
+flow.edge.specExecutors.0.specExecInstance.job.launcher.type=MAPREDUCE
+flow.edge.specExecutors.0.specExecInstance.job.type=hadoopJava
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/49974214/gobblin-service/src/test/resources/flowgraph/flowedges/adls-1-to-adls-1-retention-2.properties
----------------------------------------------------------------------
diff --git 
a/gobblin-service/src/test/resources/flowgraph/flowedges/adls-1-to-adls-1-retention-2.properties
 
b/gobblin-service/src/test/resources/flowgraph/flowedges/adls-1-to-adls-1-retention-2.properties
new file mode 100644
index 0000000..7b1a160
--- /dev/null
+++ 
b/gobblin-service/src/test/resources/flowgraph/flowedges/adls-1-to-adls-1-retention-2.properties
@@ -0,0 +1,9 @@
+flow.edge.source=ADLS-1
+flow.edge.destination=ADLS-1
+flow.edge.id=ADLS-1:ADLS-1:hdfsRemoteRetention
+flow.edge.flowTemplateDirUri=FS:///multihop/flowEdgeTemplates/hdfsSnapshotRetention
+flow.edge.specExecutors.0.specExecInstance.class=org.apache.gobblin.service.modules.flow.MultiHopFlowCompilerTest$TestAzkabanSpecExecutor
+flow.edge.specExecutors.0.specExecInstance.uri=https://azkaban04.gobblin.net:8443
+flow.edge.specExecutors.0.specExecInstance.job.launcher.class=org.apache.gobblin.azkaban.AzkabanJobLauncher
+flow.edge.specExecutors.0.specExecInstance.job.launcher.type=MAPREDUCE
+flow.edge.specExecutors.0.specExecInstance.job.type=hadoopJava
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/49974214/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-1-to-hdfs-1-retention.properties
----------------------------------------------------------------------
diff --git 
a/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-1-to-hdfs-1-retention.properties
 
b/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-1-to-hdfs-1-retention.properties
new file mode 100644
index 0000000..926c51e
--- /dev/null
+++ 
b/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-1-to-hdfs-1-retention.properties
@@ -0,0 +1,9 @@
+flow.edge.source=HDFS-1
+flow.edge.destination=HDFS-1
+flow.edge.id=HDFS-1:HDFS-1:hdfsRetention
+flow.edge.flowTemplateDirUri=FS:///multihop/flowEdgeTemplates/hdfsSnapshotRetention
+flow.edge.specExecutors.0.specExecInstance.class=org.apache.gobblin.service.modules.flow.MultiHopFlowCompilerTest$TestAzkabanSpecExecutor
+flow.edge.specExecutors.0.specExecInstance.uri=https://azkaban01.gobblin.net:8443
+flow.edge.specExecutors.0.specExecInstance.job.launcher.class=org.apache.gobblin.azkaban.AzkabanJobLauncher
+flow.edge.specExecutors.0.specExecInstance.job.launcher.type=MAPREDUCE
+flow.edge.specExecutors.0.specExecInstance.job.type=hadoopJava
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/49974214/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-2-hdfs-2-retention.properties
----------------------------------------------------------------------
diff --git 
a/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-2-hdfs-2-retention.properties
 
b/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-2-hdfs-2-retention.properties
new file mode 100644
index 0000000..26454e7
--- /dev/null
+++ 
b/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-2-hdfs-2-retention.properties
@@ -0,0 +1,9 @@
+flow.edge.source=HDFS-2
+flow.edge.destination=HDFS-2
+flow.edge.id=HDFS-2:HDFS-2:hdfsRetention
+flow.edge.flowTemplateDirUri=FS:///multihop/flowEdgeTemplates/hdfsSnapshotRetention
+flow.edge.specExecutors.0.specExecInstance.class=org.apache.gobblin.service.modules.flow.MultiHopFlowCompilerTest$TestAzkabanSpecExecutor
+flow.edge.specExecutors.0.specExecInstance.uri=https://azkaban02.gobblin.net:8443
+flow.edge.specExecutors.0.specExecInstance.job.launcher.class=org.apache.gobblin.azkaban.AzkabanJobLauncher
+flow.edge.specExecutors.0.specExecInstance.job.launcher.type=MAPREDUCE
+flow.edge.specExecutors.0.specExecInstance.job.type=hadoopJava
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/49974214/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-3-to-hdfs-3-retention.properties
----------------------------------------------------------------------
diff --git 
a/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-3-to-hdfs-3-retention.properties
 
b/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-3-to-hdfs-3-retention.properties
new file mode 100644
index 0000000..f390546
--- /dev/null
+++ 
b/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-3-to-hdfs-3-retention.properties
@@ -0,0 +1,9 @@
+flow.edge.source=HDFS-3
+flow.edge.destination=HDFS-3
+flow.edge.id=HDFS-3:HDFS-3:hdfsRetention
+flow.edge.flowTemplateDirUri=FS:///multihop/flowEdgeTemplates/hdfsSnapshotRetention
+flow.edge.specExecutors.0.specExecInstance.class=org.apache.gobblin.service.modules.flow.MultiHopFlowCompilerTest$TestAzkabanSpecExecutor
+flow.edge.specExecutors.0.specExecInstance.uri=https://azkaban03.gobblin.net:8443
+flow.edge.specExecutors.0.specExecInstance.job.launcher.class=org.apache.gobblin.azkaban.AzkabanJobLauncher
+flow.edge.specExecutors.0.specExecInstance.job.launcher.type=MAPREDUCE
+flow.edge.specExecutors.0.specExecInstance.job.type=hadoopJava
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/49974214/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-4-to-hdfs-4-retention.properties
----------------------------------------------------------------------
diff --git 
a/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-4-to-hdfs-4-retention.properties
 
b/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-4-to-hdfs-4-retention.properties
new file mode 100644
index 0000000..6afb3d8
--- /dev/null
+++ 
b/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-4-to-hdfs-4-retention.properties
@@ -0,0 +1,9 @@
+flow.edge.source=HDFS-4
+flow.edge.destination=HDFS-4
+flow.edge.id=HDFS-4:HDFS-4:hdfsRetention
+flow.edge.flowTemplateDirUri=FS:///multihop/flowEdgeTemplates/hdfsSnapshotRetention
+flow.edge.specExecutors.0.specExecInstance.class=org.apache.gobblin.service.modules.flow.MultiHopFlowCompilerTest$TestAzkabanSpecExecutor
+flow.edge.specExecutors.0.specExecInstance.uri=https://azkaban04.gobblin.net:8443
+flow.edge.specExecutors.0.specExecInstance.job.launcher.class=org.apache.gobblin.azkaban.AzkabanJobLauncher
+flow.edge.specExecutors.0.specExecInstance.job.launcher.type=MAPREDUCE
+flow.edge.specExecutors.0.specExecInstance.job.type=hadoopJava
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/49974214/gobblin-service/src/test/resources/flowgraph/flowedges/local-to-local-retention.properties
----------------------------------------------------------------------
diff --git 
a/gobblin-service/src/test/resources/flowgraph/flowedges/local-to-local-retention.properties
 
b/gobblin-service/src/test/resources/flowgraph/flowedges/local-to-local-retention.properties
new file mode 100644
index 0000000..76c77fd
--- /dev/null
+++ 
b/gobblin-service/src/test/resources/flowgraph/flowedges/local-to-local-retention.properties
@@ -0,0 +1,9 @@
+flow.edge.source=LocalFS-1
+flow.edge.destination=LocalFS-1
+flow.edge.id=LocalFS-1:LocalFS-1:localRetention
+flow.edge.flowTemplateDirUri=FS:///multihop/flowEdgeTemplates/hdfsSnapshotRetention
+flow.edge.specExecutors.0.specExecInstance.class=org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor
+flow.edge.specExecutors.0.specExecInstance.uri=fs:///
+flow.edge.specExecutors.0.specExecInstance.job.launcher.class=org.apache.gobblin.runtime.local.LocalJobLauncher
+flow.edge.specExecutors.0.specExecInstance.job.launcher.type=LOCAL
+flow.edge.specExecutors.0.specExecInstance.job.type=java
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/49974214/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/hdfsConvertToJsonAndEncrypt/flow.conf
----------------------------------------------------------------------
diff --git 
a/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/hdfsConvertToJsonAndEncrypt/flow.conf
 
b/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/hdfsConvertToJsonAndEncrypt/flow.conf
index 0a53e5b..2ae6fb6 100644
--- 
a/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/hdfsConvertToJsonAndEncrypt/flow.conf
+++ 
b/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/hdfsConvertToJsonAndEncrypt/flow.conf
@@ -2,6 +2,7 @@ 
gobblin.flow.edge.input.dataset.descriptor.0.class=org.apache.gobblin.service.mo
 gobblin.flow.edge.input.dataset.descriptor.0.platform=hdfs
 
gobblin.flow.edge.input.dataset.descriptor.0.path=/data/out/${team.name}/${dataset.name}
 gobblin.flow.edge.input.dataset.descriptor.0.format=avro
+gobblin.flow.edge.input.dataset.descriptor.0.isRetentionApplied=${flow.applyRetention}
 #############################################################
 # Define input dataset to be uncompressed and unencrypted
 #############################################################

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/49974214/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/hdfsSnapshotRetention/flow.conf
----------------------------------------------------------------------
diff --git 
a/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/hdfsSnapshotRetention/flow.conf
 
b/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/hdfsSnapshotRetention/flow.conf
new file mode 100644
index 0000000..8560ebf
--- /dev/null
+++ 
b/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/hdfsSnapshotRetention/flow.conf
@@ -0,0 +1,35 @@
+gobblin.flow.edge.input.dataset.descriptor.0.class=org.apache.gobblin.service.modules.dataset.FSDatasetDescriptor
+gobblin.flow.edge.input.dataset.descriptor.0.platform=hdfs
+gobblin.flow.edge.input.dataset.descriptor.0.path=/data/out/${team.name}/${dataset.name}
+
+gobblin.flow.edge.output.dataset.descriptor.0.class=${gobblin.flow.edge.input.dataset.descriptor.0.class}
+gobblin.flow.edge.output.dataset.descriptor.0.platform=${gobblin.flow.edge.input.dataset.descriptor.0.platform}
+gobblin.flow.edge.output.dataset.descriptor.0.path=/data/out/${team.name}/${dataset.name}
+gobblin.flow.edge.output.dataset.descriptor.0.isRetentionApplied=true
+
+gobblin.flow.edge.input.dataset.descriptor.1.class=org.apache.gobblin.service.modules.dataset.FSDatasetDescriptor
+gobblin.flow.edge.input.dataset.descriptor.1.platform=hdfs
+gobblin.flow.edge.input.dataset.descriptor.1.path=/data/encrypted/${team.name}/${dataset.name}
+
+gobblin.flow.edge.output.dataset.descriptor.1.class=${gobblin.flow.edge.input.dataset.descriptor.1.class}
+gobblin.flow.edge.output.dataset.descriptor.1.platform=${gobblin.flow.edge.input.dataset.descriptor.1.platform}
+gobblin.flow.edge.output.dataset.descriptor.1.path=/data/encrypted/${team.name}/${dataset.name}
+gobblin.flow.edge.output.dataset.descriptor.1.isRetentionApplied=true
+
+gobblin.flow.edge.input.dataset.descriptor.2.class=org.apache.gobblin.service.modules.dataset.FSDatasetDescriptor
+gobblin.flow.edge.input.dataset.descriptor.2.platform=adls
+gobblin.flow.edge.input.dataset.descriptor.2.path=/data/out/${team.name}/${dataset.name}
+
+gobblin.flow.edge.output.dataset.descriptor.2.class=${gobblin.flow.edge.input.dataset.descriptor.2.class}
+gobblin.flow.edge.output.dataset.descriptor.2.platform=${gobblin.flow.edge.input.dataset.descriptor.2.platform}
+gobblin.flow.edge.output.dataset.descriptor.2.path=/data/out/${team.name}/${dataset.name}
+gobblin.flow.edge.output.dataset.descriptor.2.isRetentionApplied=true
+
+gobblin.flow.edge.input.dataset.descriptor.3.class=org.apache.gobblin.service.modules.dataset.FSDatasetDescriptor
+gobblin.flow.edge.input.dataset.descriptor.3.platform=adls
+gobblin.flow.edge.input.dataset.descriptor.3.path=/data/encrypted/${team.name}/${dataset.name}
+
+gobblin.flow.edge.output.dataset.descriptor.3.class=${gobblin.flow.edge.input.dataset.descriptor.3.class}
+gobblin.flow.edge.output.dataset.descriptor.3.platform=${gobblin.flow.edge.input.dataset.descriptor.3.platform}
+gobblin.flow.edge.output.dataset.descriptor.3.path=/data/encrypted/${team.name}/${dataset.name}
+gobblin.flow.edge.output.dataset.descriptor.3.isRetentionApplied=true

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/49974214/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/hdfsSnapshotRetention/jobs/hdfs-snapshot-retention.job
----------------------------------------------------------------------
diff --git 
a/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/hdfsSnapshotRetention/jobs/hdfs-snapshot-retention.job
 
b/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/hdfsSnapshotRetention/jobs/hdfs-snapshot-retention.job
new file mode 100644
index 0000000..46c43c2
--- /dev/null
+++ 
b/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/hdfsSnapshotRetention/jobs/hdfs-snapshot-retention.job
@@ -0,0 +1,15 @@
+gobblin.template.uri="resource:///template_catalog/multihop/jobTemplates/hdfs-retention.template"
+#Should Retention job run in parallel with the next job?
+job.forkOnConcat=true
+#Dataset version finder class
+gobblin.retention.version.finder.class="org.apache.gobblin.data.management.version.finder.DateTimeDatasetVersionFinder"
+#Retention Policy
+gobblin.retention.selection.policy.class="org.apache.gobblin.data.management.policy.CombineSelectionPolicy"
+gobblin.retention.selection.combine.policy.classes="[org.apache.gobblin.data.management.retention.policy.TimeBasedRetentionPolicy,org.apache.gobblin.data.management.retention.policy.NewestKRetentionPolicy]"
+#Num versions to keep
+gobblin.retention.selection.newestK.versionsNotSelected=7
+#Maximum lookback time
+gobblin.retention.selection.timeBased.lookbackTime=7d
+gobblin.retention.selection.combine.operation="INTERSECT"
+gobblin.retention.version.globPattern="snapshot/*"
+gobblin.retention.version.datetime.pattern="yyyy-MM-dd_HH_mm_ss"

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/49974214/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/hdfsToAdl/flow.conf
----------------------------------------------------------------------
diff --git 
a/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/hdfsToAdl/flow.conf
 
b/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/hdfsToAdl/flow.conf
index 2cbf420..2fc7522 100644
--- 
a/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/hdfsToAdl/flow.conf
+++ 
b/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/hdfsToAdl/flow.conf
@@ -6,6 +6,7 @@ gobblin.flow.edge.input.dataset.descriptor.0.codec=gzip
 gobblin.flow.edge.input.dataset.descriptor.0.encrypt.algorithm=aes_rotating
 gobblin.flow.edge.input.dataset.descriptor.0.encrypt.keystore_type=json
 gobblin.flow.edge.input.dataset.descriptor.0.encrypt.keystore_encoding=base64
+gobblin.flow.edge.input.dataset.descriptor.0.isRetentionApplied=${flow.applyRetention}
 
 
gobblin.flow.edge.output.dataset.descriptor.0.class=org.apache.gobblin.service.modules.dataset.FSDatasetDescriptor
 gobblin.flow.edge.output.dataset.descriptor.0.platform=adls

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/49974214/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/hdfsToHdfs/flow.conf
----------------------------------------------------------------------
diff --git 
a/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/hdfsToHdfs/flow.conf
 
b/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/hdfsToHdfs/flow.conf
index abac6b5..4563780 100644
--- 
a/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/hdfsToHdfs/flow.conf
+++ 
b/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/hdfsToHdfs/flow.conf
@@ -1,6 +1,7 @@
 
gobblin.flow.edge.input.dataset.descriptor.0.class=org.apache.gobblin.service.modules.dataset.FSDatasetDescriptor
 gobblin.flow.edge.input.dataset.descriptor.0.platform=hdfs
 
gobblin.flow.edge.input.dataset.descriptor.0.path=/data/out/${team.name}/${dataset.name}
+gobblin.flow.edge.input.dataset.descriptor.0.isRetentionApplied=${flow.applyRetention}
 
 
gobblin.flow.edge.output.dataset.descriptor.0.class=${gobblin.flow.edge.input.dataset.descriptor.0.class}
 
gobblin.flow.edge.output.dataset.descriptor.0.platform=${gobblin.flow.edge.input.dataset.descriptor.0.platform}
@@ -9,6 +10,7 @@ 
gobblin.flow.edge.output.dataset.descriptor.0.path=${gobblin.flow.edge.input.dat
 
gobblin.flow.edge.input.dataset.descriptor.1.class=org.apache.gobblin.service.modules.dataset.FSDatasetDescriptor
 gobblin.flow.edge.input.dataset.descriptor.1.platform=hdfs
 
gobblin.flow.edge.input.dataset.descriptor.1.path=/data/encrypted/${team.name}/${dataset.name}
+gobblin.flow.edge.input.dataset.descriptor.1.isRetentionApplied=${flow.applyRetention}
 
 
gobblin.flow.edge.output.dataset.descriptor.1.class=${gobblin.flow.edge.input.dataset.descriptor.1.class}
 
gobblin.flow.edge.output.dataset.descriptor.1.platform=${gobblin.flow.edge.input.dataset.descriptor.1.platform}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/49974214/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/localToHdfs/flow.conf
----------------------------------------------------------------------
diff --git 
a/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/localToHdfs/flow.conf
 
b/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/localToHdfs/flow.conf
index d0765e1..f4deb46 100644
--- 
a/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/localToHdfs/flow.conf
+++ 
b/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/localToHdfs/flow.conf
@@ -2,6 +2,7 @@ 
gobblin.flow.edge.input.dataset.descriptor.0.class=org.apache.gobblin.service.mo
 gobblin.flow.edge.input.dataset.descriptor.0.platform=hdfs
 
gobblin.flow.edge.input.dataset.descriptor.0.path=/data/out/${team.name}/${dataset.name}
 gobblin.flow.edge.input.dataset.descriptor.0.format=avro
+gobblin.flow.edge.input.dataset.descriptor.0.isRetentionApplied=${flow.applyRetention}
 
 
gobblin.flow.edge.output.dataset.descriptor.0.class=${gobblin.flow.edge.input.dataset.descriptor.0.class}
 
gobblin.flow.edge.output.dataset.descriptor.0.platform=${gobblin.flow.edge.input.dataset.descriptor.0.platform}

Reply via email to