[GOBBLIN-624] Handle dataset retention in multi-hop flow compiler. Closes #2493 from sv2000/retentionConfig
Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/49974214 Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/49974214 Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/49974214 Branch: refs/heads/master Commit: 49974214a088b60681938a73d40b2124b18cd2bc Parents: 602cee7 Author: suvasude <[email protected]> Authored: Sun Nov 4 18:04:26 2018 -0800 Committer: Hung Tran <[email protected]> Committed: Sun Nov 4 18:04:26 2018 -0800 ---------------------------------------------------------------------- .../configuration/ConfigurationKeys.java | 2 + .../modules/dataset/DatasetDescriptor.java | 5 + .../modules/dataset/EncryptionConfig.java | 28 ++- .../modules/dataset/FSDatasetDescriptor.java | 24 ++- .../service/modules/dataset/FormatConfig.java | 11 ++ .../service/modules/flow/FlowGraphPath.java | 39 ++--- .../modules/flow/MultiHopFlowCompiler.java | 3 +- .../gobblin/service/modules/flowgraph/Dag.java | 84 +++++++-- .../flowgraph/DatasetDescriptorConfigKeys.java | 3 +- .../pathfinder/AbstractPathFinder.java | 159 +++++++++++++---- .../service/modules/spec/JobExecutionPlan.java | 10 +- .../service/modules/flow/FlowGraphPathTest.java | 4 - .../modules/flow/MultiHopFlowCompilerTest.java | 171 +++++++++++++------ .../service/modules/flowgraph/DagTest.java | 4 + .../adls-1-to-adls-1-retention-1.properties | 9 + .../adls-1-to-adls-1-retention-2.properties | 9 + .../hdfs-1-to-hdfs-1-retention.properties | 9 + .../hdfs-2-hdfs-2-retention.properties | 9 + .../hdfs-3-to-hdfs-3-retention.properties | 9 + .../hdfs-4-to-hdfs-4-retention.properties | 9 + .../local-to-local-retention.properties | 9 + .../hdfsConvertToJsonAndEncrypt/flow.conf | 1 + .../hdfsSnapshotRetention/flow.conf | 35 ++++ .../jobs/hdfs-snapshot-retention.job | 15 ++ .../flowEdgeTemplates/hdfsToAdl/flow.conf | 1 + .../flowEdgeTemplates/hdfsToHdfs/flow.conf | 2 + .../flowEdgeTemplates/localToHdfs/flow.conf | 1 + .../distcp-push-hdfs-to-adl.template | 2 +- .../multihop/jobTemplates/distcp.template | 2 +- .../hdfs-convert-to-json-and-encrypt.template | 2 +- .../jobTemplates/hdfs-retention.template | 13 ++ 31 files changed, 538 insertions(+), 146 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/49974214/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java ---------------------------------------------------------------------- diff --git a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java index ffc2376..8383c2a 100644 --- a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java +++ b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java @@ -133,6 +133,8 @@ public class ConfigurationKeys { public static final String FLOW_DESCRIPTION_KEY = "flow.description"; public static final String FLOW_EXECUTION_ID_KEY = "flow.executionId"; public static final String FLOW_FAILURE_OPTION = "flow.failureOption"; + public static final String FLOW_APPLY_RETENTION = "flow.applyRetention"; + public static final String FLOW_APPLY_INPUT_RETENTION = "flow.applyInputRetention"; /** * Common topology configuration properties. http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/49974214/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/DatasetDescriptor.java ---------------------------------------------------------------------- diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/DatasetDescriptor.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/DatasetDescriptor.java index e8474e3..33c563c 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/DatasetDescriptor.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/DatasetDescriptor.java @@ -52,6 +52,11 @@ public interface DatasetDescriptor { public FormatConfig getFormatConfig(); /** + * @return true if retention has been applied to the dataset. + */ + public boolean isRetentionApplied(); + + /** * @return a human-readable description of the dataset. */ public String getDescription(); http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/49974214/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/EncryptionConfig.java ---------------------------------------------------------------------- diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/EncryptionConfig.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/EncryptionConfig.java index 21c7c17..52a1a10 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/EncryptionConfig.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/EncryptionConfig.java @@ -18,10 +18,15 @@ package org.apache.gobblin.service.modules.dataset; import com.google.common.base.Joiner; +import com.google.common.collect.ImmutableMap; import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; +import com.typesafe.config.ConfigValueFactory; import lombok.Getter; +import org.apache.gobblin.configuration.ConfigurationKeys; +import org.apache.gobblin.service.modules.core.GitMonitoringService; import org.apache.gobblin.service.modules.flowgraph.DatasetDescriptorConfigKeys; import org.apache.gobblin.util.ConfigUtils; @@ -33,14 +38,29 @@ public class EncryptionConfig { private final String keystoreType; @Getter private final String keystoreEncoding; + @Getter + private final Config rawConfig; + + private static final Config DEFAULT_FALLBACK = + ConfigFactory.parseMap(ImmutableMap.<String, Object>builder() + .put(DatasetDescriptorConfigKeys.ENCRYPTION_ALGORITHM_KEY, DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY) + .put(DatasetDescriptorConfigKeys.ENCRYPTION_KEYSTORE_TYPE_KEY, DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY) + .put(DatasetDescriptorConfigKeys.ENCRYPTION_KEYSTORE_ENCODING_KEY, DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY) + .build()); public EncryptionConfig(Config encryptionConfig) { this.encryptionAlgorithm = ConfigUtils.getString(encryptionConfig, DatasetDescriptorConfigKeys.ENCRYPTION_ALGORITHM_KEY, DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY); - this.keystoreType = ConfigUtils.getString(encryptionConfig, DatasetDescriptorConfigKeys.ENCRYPTION_KEYSTORE_TYPE_KEY, - DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY); - this.keystoreEncoding = ConfigUtils.getString(encryptionConfig, DatasetDescriptorConfigKeys.ENCRYPTION_KEYSTORE_ENCODING_KEY, - DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY); + if (this.encryptionAlgorithm.equalsIgnoreCase(DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_NONE)) { + this.keystoreType = DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_NONE; + this.keystoreEncoding = DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_NONE; + } else { + this.keystoreType = ConfigUtils.getString(encryptionConfig, DatasetDescriptorConfigKeys.ENCRYPTION_KEYSTORE_TYPE_KEY, + DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY); + this.keystoreEncoding = ConfigUtils.getString(encryptionConfig, DatasetDescriptorConfigKeys.ENCRYPTION_KEYSTORE_ENCODING_KEY, + DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY); + } + this.rawConfig = encryptionConfig.withFallback(DEFAULT_FALLBACK); } public boolean contains(EncryptionConfig other) { http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/49974214/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/FSDatasetDescriptor.java ---------------------------------------------------------------------- diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/FSDatasetDescriptor.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/FSDatasetDescriptor.java index a5cb717..e9f4e31 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/FSDatasetDescriptor.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/FSDatasetDescriptor.java @@ -22,7 +22,9 @@ import org.apache.hadoop.fs.Path; import com.google.common.base.Joiner; import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableMap; import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; import lombok.Getter; @@ -44,18 +46,27 @@ public class FSDatasetDescriptor implements DatasetDescriptor { @Getter private final FormatConfig formatConfig; @Getter + private final boolean isRetentionApplied; + @Getter private final String description; @Getter private final Config rawConfig; + private static final Config DEFAULT_FALLBACK = + ConfigFactory.parseMap(ImmutableMap.<String, Object>builder() + .put(DatasetDescriptorConfigKeys.PATH_KEY, DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY) + .put(DatasetDescriptorConfigKeys.IS_RETENTION_APPLIED_KEY, false) + .build()); + public FSDatasetDescriptor(Config config) { Preconditions.checkArgument(config.hasPath(DatasetDescriptorConfigKeys.PLATFORM_KEY), "Dataset descriptor config must specify platform"); this.platform = config.getString(DatasetDescriptorConfigKeys.PLATFORM_KEY); this.path = PathUtils.getPathWithoutSchemeAndAuthority(new Path(ConfigUtils.getString(config, DatasetDescriptorConfigKeys.PATH_KEY, DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY))).toString(); this.formatConfig = new FormatConfig(config); + this.isRetentionApplied = ConfigUtils.getBoolean(config, DatasetDescriptorConfigKeys.IS_RETENTION_APPLIED_KEY, false); this.description = ConfigUtils.getString(config, DatasetDescriptorConfigKeys.DESCRIPTION_KEY, ""); - this.rawConfig = config; + this.rawConfig = config.withFallback(this.formatConfig.getRawConfig()).withFallback(DEFAULT_FALLBACK); } /** @@ -66,7 +77,7 @@ public class FSDatasetDescriptor implements DatasetDescriptor { * @param otherPath a glob pattern that describes a set of paths. * @return true if the glob pattern described by the otherPath matches the path in this {@link DatasetDescriptor}. */ - public boolean isPathContaining(String otherPath) { + private boolean isPathContaining(String otherPath) { if (otherPath == null) { return false; } @@ -97,12 +108,16 @@ public class FSDatasetDescriptor implements DatasetDescriptor { return false; } + if (this.isRetentionApplied() != other.isRetentionApplied()) { + return false; + } + return getFormatConfig().contains(other.getFormatConfig()) && isPathContaining(other.getPath()); } /** * - * @param o + * @param o the other {@link FSDatasetDescriptor} to compare "this" {@link FSDatasetDescriptor} with. * @return true iff "this" dataset descriptor is compatible with the "other" and the "other" dataset descriptor is * compatible with this dataset descriptor. */ @@ -118,6 +133,9 @@ public class FSDatasetDescriptor implements DatasetDescriptor { if (this.getPlatform() == null || other.getPlatform() == null || !this.getPlatform().equalsIgnoreCase(other.getPlatform())) { return false; } + if (this.isRetentionApplied() != other.isRetentionApplied()) { + return false; + } return this.getPath().equals(other.getPath()) && this.getFormatConfig().equals(other.getFormatConfig()); } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/49974214/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/FormatConfig.java ---------------------------------------------------------------------- diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/FormatConfig.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/FormatConfig.java index a36182c..4b45a52 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/FormatConfig.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/FormatConfig.java @@ -18,6 +18,7 @@ package org.apache.gobblin.service.modules.dataset; import com.google.common.base.Joiner; +import com.google.common.collect.ImmutableMap; import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; @@ -45,12 +46,22 @@ public class FormatConfig { private final String codecType; @Getter private final EncryptionConfig encryptionConfig; + @Getter + private final Config rawConfig; + + private static final Config DEFAULT_FALLBACK = + ConfigFactory.parseMap(ImmutableMap.<String, Object>builder() + .put(DatasetDescriptorConfigKeys.FORMAT_KEY, DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY) + .put(DatasetDescriptorConfigKeys.CODEC_KEY, DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY) + .build()); public FormatConfig(Config config) { this.format = ConfigUtils.getString(config, DatasetDescriptorConfigKeys.FORMAT_KEY, DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY); this.codecType = ConfigUtils.getString(config, DatasetDescriptorConfigKeys.CODEC_KEY, DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY); this.encryptionConfig = new EncryptionConfig(ConfigUtils.getConfig(config, DatasetDescriptorConfigKeys.ENCYPTION_PREFIX, ConfigFactory .empty())); + this.rawConfig = config.withFallback(this.encryptionConfig.getRawConfig().atPath(DatasetDescriptorConfigKeys.ENCYPTION_PREFIX)). + withFallback(DEFAULT_FALLBACK); } public boolean contains(FormatConfig other) { http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/49974214/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/FlowGraphPath.java ---------------------------------------------------------------------- diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/FlowGraphPath.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/FlowGraphPath.java index 02004b6..84be3b4 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/FlowGraphPath.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/FlowGraphPath.java @@ -21,18 +21,17 @@ import java.net.URI; import java.net.URISyntaxException; import java.util.ArrayList; import java.util.HashMap; -import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.stream.Collectors; import org.apache.hadoop.fs.Path; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Joiner; import com.google.common.base.Optional; -import com.google.common.collect.Lists; import com.google.common.io.Files; import com.typesafe.config.Config; import com.typesafe.config.ConfigValueFactory; @@ -100,31 +99,20 @@ public class FlowGraphPath { */ @VisibleForTesting static Dag<JobExecutionPlan> concatenate(Dag<JobExecutionPlan> dagLeft, Dag<JobExecutionPlan> dagRight) { - List<DagNode<JobExecutionPlan>> endNodes = dagLeft.getEndNodes(); - List<DagNode<JobExecutionPlan>> startNodes = dagRight.getStartNodes(); - List<String> dependenciesList = Lists.newArrayList(); - //List of nodes with no dependents in the concatenated dag. - Set<DagNode<JobExecutionPlan>> forkNodes = new HashSet<>(); - - for (DagNode<JobExecutionPlan> dagNode: endNodes) { - if (isNodeForkable(dagNode)) { - //If node is forkable, then add its parents (if any) to the dependencies list. - forkNodes.add(dagNode); - List<DagNode<JobExecutionPlan>> parentNodes = dagLeft.getParents(dagNode); - for (DagNode<JobExecutionPlan> parentNode: parentNodes) { - dependenciesList.add(parentNode.getValue().getJobSpec().getConfig().getString(ConfigurationKeys.JOB_NAME_KEY)); - } - } else { - dependenciesList.add(dagNode.getValue().getJobSpec().getConfig().getString(ConfigurationKeys.JOB_NAME_KEY)); - } - } - - if (!dependenciesList.isEmpty()) { + //Compute the fork nodes - set of nodes with no dependents in the concatenated dag. + Set<DagNode<JobExecutionPlan>> forkNodes = dagLeft.getEndNodes().stream(). + filter(endNode -> isNodeForkable(endNode)).collect(Collectors.toSet()); + Set<DagNode<JobExecutionPlan>> dependencyNodes = dagLeft.getDependencyNodes(forkNodes); + + if (!dependencyNodes.isEmpty()) { + List<String> dependenciesList = dependencyNodes.stream() + .map(dagNode -> dagNode.getValue().getJobSpec().getConfig().getString(ConfigurationKeys.JOB_NAME_KEY)) + .collect(Collectors.toList()); String dependencies = Joiner.on(",").join(dependenciesList); - - for (DagNode<JobExecutionPlan> childNode : startNodes) { + for (DagNode<JobExecutionPlan> childNode : dagRight.getStartNodes()) { JobSpec jobSpec = childNode.getValue().getJobSpec(); - jobSpec.setConfig(jobSpec.getConfig().withValue(ConfigurationKeys.JOB_DEPENDENCIES, ConfigValueFactory.fromAnyRef(dependencies))); + jobSpec.setConfig(jobSpec.getConfig().withValue(ConfigurationKeys.JOB_DEPENDENCIES, + ConfigValueFactory.fromAnyRef(dependencies))); } } @@ -136,6 +124,7 @@ public class FlowGraphPath { return ConfigUtils.getBoolean(jobConfig, ConfigurationKeys.JOB_FORK_ON_CONCAT, false); } + /** * Given an instance of {@link FlowEdge}, this method returns a {@link Dag < JobExecutionPlan >} that moves data * from the source of the {@link FlowEdge} to the destination of the {@link FlowEdge}. http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/49974214/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java ---------------------------------------------------------------------- diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java index 52116a2..32418e4 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java @@ -101,7 +101,7 @@ public class MultiHopFlowCompiler extends BaseFlowToJobSpecCompiler { try { this.serviceManager.startAsync().awaitHealthy(5, TimeUnit.SECONDS); } catch (TimeoutException te) { - this.log.error("Timed out while waiting for the service manager to start up", te); + MultiHopFlowCompiler.log.error("Timed out while waiting for the service manager to start up", te); throw new RuntimeException(te); } } @@ -163,7 +163,6 @@ public class MultiHopFlowCompiler extends BaseFlowToJobSpecCompiler { @Override protected void populateEdgeTemplateMap() { log.warn("No population of templates based on edge happen in this implementation"); - return; } /** http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/49974214/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/Dag.java ---------------------------------------------------------------------- diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/Dag.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/Dag.java index 0f5691e..db81c6a 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/Dag.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/Dag.java @@ -21,11 +21,13 @@ import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; +import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Set; import com.google.common.collect.Lists; +import com.google.common.collect.Sets; import lombok.Getter; @@ -86,7 +88,66 @@ public class Dag<T> { } public List<DagNode<T>> getParents(DagNode node) { - return (node.parentNodes != null)? node.parentNodes : Collections.EMPTY_LIST; + return (node.parentNodes != null) ? node.parentNodes : Collections.EMPTY_LIST; + } + + /** + * Get the ancestors of a given set of {@link DagNode}s in the {@link Dag}. + * @param dagNodes set of nodes in the {@link Dag}. + * @return the union of all ancestors of dagNodes in the dag. + */ + private Set<DagNode<T>> getAncestorNodes(Set<DagNode<T>> dagNodes) { + Set<DagNode<T>> ancestorNodes = new HashSet<>(); + for (DagNode<T> dagNode : dagNodes) { + LinkedList<DagNode<T>> nodesToExpand = Lists.newLinkedList(this.getParents(dagNode)); + while (!nodesToExpand.isEmpty()) { + DagNode<T> nextNode = nodesToExpand.poll(); + ancestorNodes.add(nextNode); + nodesToExpand.addAll(this.getParents(nextNode)); + } + } + return ancestorNodes; + } + + /** + * This method computes a set of {@link DagNode}s which are the dependency nodes for concatenating this {@link Dag} + * with any other {@link Dag}. The set of dependency nodes is the union of: + * <p><ul> + * <li> The endNodes of this dag which are not forkable, and </li> + * <li> The parents of forkable nodes, such that no parent is an ancestor of another parent.</li> + * </ul></p> + * + * @param forkNodes set of nodes of this {@link Dag} which are forkable + * @return set of dependency nodes of this dag for concatenation with any other dag. + */ + public Set<DagNode<T>> getDependencyNodes(Set<DagNode<T>> forkNodes) { + Set<DagNode<T>> dependencyNodes = new HashSet<>(); + for (DagNode<T> endNode : endNodes) { + if (!forkNodes.contains(endNode)) { + dependencyNodes.add(endNode); + } + } + + //Get all ancestors of non-forkable nodes + Set<DagNode<T>> ancestorNodes = this.getAncestorNodes(dependencyNodes); + + //Add ancestors of the parents of forkable nodes + for (DagNode<T> dagNode: forkNodes) { + List<DagNode<T>> parentNodes = this.getParents(dagNode); + ancestorNodes.addAll(this.getAncestorNodes(Sets.newHashSet(parentNodes))); + } + + for (DagNode<T> dagNode: forkNodes) { + List<DagNode<T>> parentNodes = this.getParents(dagNode); + for (DagNode<T> parentNode : parentNodes) { + //Add parent node of a forkable node as a dependency, only if it is not already an ancestor of another + // dependency. + if (!ancestorNodes.contains(parentNode)) { + dependencyNodes.add(parentNode); + } + } + } + return dependencyNodes; } public boolean isEmpty() { @@ -126,20 +187,13 @@ public class Dag<T> { return other; } - for (DagNode node : this.endNodes) { - //Create a dependency for non-forkable nodes - if (!forkNodes.contains(node)) { + for (DagNode node : getDependencyNodes(forkNodes)) { + if (!this.parentChildMap.containsKey(node)) { this.parentChildMap.put(node, Lists.newArrayList()); - for (DagNode otherNode : other.startNodes) { - this.parentChildMap.get(node).add(otherNode); - otherNode.addParentNode(node); - } - } else { - for (DagNode otherNode: other.startNodes) { - List<DagNode<T>> parentNodes = this.getParents(node); - parentNodes.forEach(parentNode -> this.parentChildMap.get(parentNode).add(otherNode)); - parentNodes.forEach(otherNode::addParentNode); - } + } + for (DagNode otherNode : other.startNodes) { + this.parentChildMap.get(node).add(otherNode); + otherNode.addParentNode(node); } } //Each node which is a forkable node is added to list of end nodes of the concatenated dag @@ -174,7 +228,7 @@ public class Dag<T> { return other; } //Append all the entries from the other dag's parentChildMap to this dag's parentChildMap - for (Map.Entry<DagNode, List<DagNode<T>>> entry: other.parentChildMap.entrySet()) { + for (Map.Entry<DagNode, List<DagNode<T>>> entry : other.parentChildMap.entrySet()) { this.parentChildMap.put(entry.getKey(), entry.getValue()); } //Append the startNodes, endNodes and nodes from the other dag to this dag. http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/49974214/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/DatasetDescriptorConfigKeys.java ---------------------------------------------------------------------- diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/DatasetDescriptorConfigKeys.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/DatasetDescriptorConfigKeys.java index 23e20c8..6470a1d 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/DatasetDescriptorConfigKeys.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/DatasetDescriptorConfigKeys.java @@ -34,6 +34,7 @@ public class DatasetDescriptorConfigKeys { public static final String FORMAT_KEY = "format"; public static final String CODEC_KEY = "codec"; public static final String DESCRIPTION_KEY = "description"; + public static final String IS_RETENTION_APPLIED_KEY = "isRetentionApplied"; //Dataset encryption related keys public static final String ENCYPTION_PREFIX = "encrypt"; @@ -42,5 +43,5 @@ public class DatasetDescriptorConfigKeys { public static final String ENCRYPTION_KEYSTORE_ENCODING_KEY = "keystore_encoding"; public static final String DATASET_DESCRIPTOR_CONFIG_ANY = "any"; - + public static final String DATASET_DESCRIPTOR_CONFIG_NONE = "none"; } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/49974214/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/pathfinder/AbstractPathFinder.java ---------------------------------------------------------------------- diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/pathfinder/AbstractPathFinder.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/pathfinder/AbstractPathFinder.java index eed5603..f58a6d8 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/pathfinder/AbstractPathFinder.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/pathfinder/AbstractPathFinder.java @@ -19,19 +19,25 @@ package org.apache.gobblin.service.modules.flowgraph.pathfinder; import java.io.IOException; import java.util.ArrayList; +import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.concurrent.ExecutionException; +import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.tuple.Pair; import com.google.common.base.Preconditions; +import com.google.common.base.Strings; import com.typesafe.config.Config; +import com.typesafe.config.ConfigValue; +import com.typesafe.config.ConfigValueFactory; import lombok.extern.slf4j.Slf4j; import org.apache.gobblin.annotation.Alpha; +import org.apache.gobblin.configuration.ConfigurationKeys; import org.apache.gobblin.runtime.api.FlowSpec; import org.apache.gobblin.runtime.api.JobTemplate; import org.apache.gobblin.runtime.api.SpecExecutor; @@ -57,23 +63,24 @@ public abstract class AbstractPathFinder implements PathFinder { private static final String SOURCE_PREFIX = "source"; private static final String DESTINATION_PREFIX = "destination"; - protected FlowGraph flowGraph; - protected FlowSpec flowSpec; - protected Config flowConfig; + private List<DataNode> destNodes; + + FlowGraph flowGraph; - protected DataNode srcNode; - protected List<DataNode> destNodes; + DataNode srcNode; - protected DatasetDescriptor srcDatasetDescriptor; - protected DatasetDescriptor destDatasetDescriptor; + DatasetDescriptor srcDatasetDescriptor; + DatasetDescriptor destDatasetDescriptor; //Maintain path of FlowEdges as parent-child map - protected Map<FlowEdgeContext, FlowEdgeContext> pathMap; + Map<FlowEdgeContext, FlowEdgeContext> pathMap; //Flow Execution Id protected Long flowExecutionId; + protected FlowSpec flowSpec; + protected Config flowConfig; - public AbstractPathFinder(FlowGraph flowGraph, FlowSpec flowSpec) + AbstractPathFinder(FlowGraph flowGraph, FlowSpec flowSpec) throws ReflectiveOperationException { this.flowGraph = flowGraph; this.flowSpec = flowSpec; @@ -94,12 +101,48 @@ public abstract class AbstractPathFinder implements PathFinder { } this.destNodes.add(destNode); } + + //Should apply retention? + boolean shouldApplyRetention = ConfigUtils.getBoolean(flowConfig, ConfigurationKeys.FLOW_APPLY_RETENTION, true); + //Should apply retention on input dataset? + boolean shouldApplyRetentionOnInput = ConfigUtils.getBoolean(flowConfig, ConfigurationKeys.FLOW_APPLY_INPUT_RETENTION, false); + + if ((shouldApplyRetentionOnInput) && (!shouldApplyRetention)) { + //Invalid retention config + throw new RuntimeException("Invalid retention configuration - shouldApplyRetentionOnInput = " + shouldApplyRetentionOnInput + + ", and shouldApplyRetention = " + shouldApplyRetention); + } + //Get src/dest dataset descriptors from the flow config Config srcDatasetDescriptorConfig = flowConfig.getConfig(DatasetDescriptorConfigKeys.FLOW_INPUT_DATASET_DESCRIPTOR_PREFIX); Config destDatasetDescriptorConfig = flowConfig.getConfig(DatasetDescriptorConfigKeys.FLOW_OUTPUT_DATASET_DESCRIPTOR_PREFIX); + //Add retention config for source and destination dataset descriptors. + if (shouldApplyRetentionOnInput) { + // We should run retention on source dataset. To ensure a retention is run, set + // isRetentionApplied=false for source dataset. + srcDatasetDescriptorConfig = srcDatasetDescriptorConfig + .withValue(DatasetDescriptorConfigKeys.IS_RETENTION_APPLIED_KEY, ConfigValueFactory.fromAnyRef(false)); + } else { + // Don't apply retention on source dataset. + // + // If ConfigurationKeys.FLOW_APPLY_RETENTION is true, isRetentionApplied is set to true for the source dataset. + // The PathFinder will therefore treat the source dataset as one on which retention has already been + // applied, preventing retention from running on the source dataset. + // + // On the other hand, if ConfigurationKeys.FLOW_APPLY_RETENTION is false + // we do not apply retention - neither on the source dataset nor anywhere along the path to the destination. + srcDatasetDescriptorConfig = srcDatasetDescriptorConfig + .withValue(DatasetDescriptorConfigKeys.IS_RETENTION_APPLIED_KEY, ConfigValueFactory.fromAnyRef(shouldApplyRetention)); + } + destDatasetDescriptorConfig = destDatasetDescriptorConfig.withValue(DatasetDescriptorConfigKeys.IS_RETENTION_APPLIED_KEY, ConfigValueFactory.fromAnyRef(shouldApplyRetention)); + + //Add the retention configs to the FlowConfig + flowConfig = flowConfig.withValue(ConfigurationKeys.FLOW_APPLY_RETENTION, ConfigValueFactory.fromAnyRef(shouldApplyRetention)); + flowConfig = flowConfig.withValue(ConfigurationKeys.FLOW_APPLY_INPUT_RETENTION, ConfigValueFactory.fromAnyRef(shouldApplyRetentionOnInput)); + Class srcdatasetDescriptorClass = Class.forName(srcDatasetDescriptorConfig.getString(DatasetDescriptorConfigKeys.CLASS_KEY)); this.srcDatasetDescriptor = (DatasetDescriptor) GobblinConstructorUtils @@ -108,25 +151,23 @@ public abstract class AbstractPathFinder implements PathFinder { Class.forName(destDatasetDescriptorConfig.getString(DatasetDescriptorConfigKeys.CLASS_KEY)); this.destDatasetDescriptor = (DatasetDescriptor) GobblinConstructorUtils .invokeLongestConstructor(destDatasetDescriptorClass, destDatasetDescriptorConfig); + } - protected boolean isPathFound(DataNode currentNode, DataNode destNode, DatasetDescriptor currentDatasetDescriptor, + boolean isPathFound(DataNode currentNode, DataNode destNode, DatasetDescriptor currentDatasetDescriptor, DatasetDescriptor destDatasetDescriptor) { - if ((currentNode.equals(destNode)) && (currentDatasetDescriptor.equals(destDatasetDescriptor))) { - return true; - } - return false; + return (currentNode.equals(destNode)) && (currentDatasetDescriptor.equals(destDatasetDescriptor)); } /** * A helper method that sorts the {@link FlowEdge}s incident on srcNode based on whether the FlowEdge has an * output {@link DatasetDescriptor} that is compatible with the targetDatasetDescriptor. - * @param dataNode + * @param dataNode the {@link DataNode} to be expanded for determining candidate edges. * @param currentDatasetDescriptor Output {@link DatasetDescriptor} of the current edge. * @param destDatasetDescriptor Target {@link DatasetDescriptor}. * @return prioritized list of {@link FlowEdge}s to be added to the edge queue for expansion. */ - protected List<FlowEdgeContext> getNextEdges(DataNode dataNode, DatasetDescriptor currentDatasetDescriptor, + List<FlowEdgeContext> getNextEdges(DataNode dataNode, DatasetDescriptor currentDatasetDescriptor, DatasetDescriptor destDatasetDescriptor) { List<FlowEdgeContext> prioritizedEdgeList = new LinkedList<>(); for (FlowEdge flowEdge : this.flowGraph.getEdges(dataNode)) { @@ -146,25 +187,18 @@ public abstract class AbstractPathFinder implements PathFinder { for (Pair<DatasetDescriptor, DatasetDescriptor> datasetDescriptorPair : datasetDescriptorPairs) { DatasetDescriptor inputDatasetDescriptor = datasetDescriptorPair.getLeft(); DatasetDescriptor outputDatasetDescriptor = datasetDescriptorPair.getRight(); + if (inputDatasetDescriptor.contains(currentDatasetDescriptor)) { - FlowEdgeContext flowEdgeContext; - if (outputDatasetDescriptor.contains(currentDatasetDescriptor)) { - //If datasets described by the currentDatasetDescriptor is a subset of the datasets described - // by the outputDatasetDescriptor (i.e. currentDatasetDescriptor is more "specific" than outputDatasetDescriptor, e.g. - // as in the case of a "distcp" edge), we propagate the more "specific" dataset descriptor forward. - flowEdgeContext = - new FlowEdgeContext(flowEdge, currentDatasetDescriptor, currentDatasetDescriptor, mergedConfig, - specExecutor); - } else { - //outputDatasetDescriptor is more specific (e.g. if it is a dataset transformation edge) - flowEdgeContext = - new FlowEdgeContext(flowEdge, currentDatasetDescriptor, outputDatasetDescriptor, mergedConfig, - specExecutor); - } + DatasetDescriptor edgeOutputDescriptor = makeOutputDescriptorSpecific(currentDatasetDescriptor, outputDatasetDescriptor); + FlowEdgeContext flowEdgeContext = new FlowEdgeContext(flowEdge, currentDatasetDescriptor, edgeOutputDescriptor, mergedConfig, + specExecutor); + if (destDatasetDescriptor.getFormatConfig().contains(outputDatasetDescriptor.getFormatConfig())) { - //Add to the front of the edge list if platform-independent properties of the output descriptor is compatible - // with those of destination dataset descriptor. - // In other words, we prioritize edges that perform data transformations as close to the source as possible. + /* + Add to the front of the edge list if platform-independent properties of the output descriptor is compatible + with those of destination dataset descriptor. + In other words, we prioritize edges that perform data transformations as close to the source as possible. + */ prioritizedEdgeList.add(0, flowEdgeContext); } else { prioritizedEdgeList.add(flowEdgeContext); @@ -188,6 +222,65 @@ public abstract class AbstractPathFinder implements PathFinder { } /** + * A helper method to make the output {@link DatasetDescriptor} of a {@link FlowEdge} "specific". More precisely, + * we replace any "placeholder" configurations in the output {@link DatasetDescriptor} with specific configuration + * values obtained from the input {@link DatasetDescriptor}. A placeholder configuration is one which is not + * defined or is set to {@link DatasetDescriptorConfigKeys#DATASET_DESCRIPTOR_CONFIG_ANY}. + * + * Example: Consider a {@link FlowEdge} that applies retention on an input dataset. Further assume that this edge + * is applicable to datasets of all formats. The input and output descriptors of this edge may be described using the following + * configs: + * inputDescriptor = Config(SimpleConfigObject({"class":"org.apache.gobblin.service.modules.dataset.FSDatasetDescriptor", + * "codec":"any","encrypt":{"algorithm":"any","keystore_encoding":"any","keystore_type":"any"},"format":"any", + * "isRetentionApplied":false,"path":"/data/encrypted/testTeam/testDataset","platform":"hdfs"})) + * + * outputDescriptor = Config(SimpleConfigObject({"class":"org.apache.gobblin.service.modules.dataset.FSDatasetDescriptor", + * "codec":"any","encrypt":{"algorithm":"any","keystore_encoding":"any","keystore_type":"any"},"format":"any", + * "isRetentionApplied":true,"path":"/data/encrypted/testTeam/testDataset","platform":"hdfs"})) + * + * Let the intermediate dataset descriptor "arriving" at this edge be described using the following config: + * currentDescriptor = Config(SimpleConfigObject({"class":"org.apache.gobblin.service.modules.dataset.FSDatasetDescriptor", + * "codec":"gzip","encrypt":{"algorithm":"aes_rotating","keystore_encoding":"base64","keystore_type":"json"},"format":"json", + * "isRetentionApplied":false,"path":"/data/encrypted/testTeam/testDataset","platform":"hdfs"})). + * + * This method replaces the placeholder configs in outputDescriptor with specific values from currentDescriptor to return: + * returnedDescriptor = Config(SimpleConfigObject({"class":"org.apache.gobblin.service.modules.dataset.FSDatasetDescriptor", + * "codec":"gzip","encrypt":{"algorithm":"aes_rotating","keystore_encoding":"base64","keystore_type":"json"},"format":"json", + * "isRetentionApplied":<b>true</b>,"path":"/data/encrypted/testTeam/testDataset","platform":"hdfs"})). + * + * @param currentDescriptor intermediate {@link DatasetDescriptor} obtained during path finding. + * @param outputDescriptor output {@link DatasetDescriptor} of a {@link FlowEdge}. + * @return {@link DatasetDescriptor} with placeholder configs in outputDescriptor substituted with specific values + * from the currentDescriptor. + */ + + private DatasetDescriptor makeOutputDescriptorSpecific(DatasetDescriptor currentDescriptor, DatasetDescriptor outputDescriptor) + throws ReflectiveOperationException { + Config config = outputDescriptor.getRawConfig(); + + for (Iterator<Map.Entry<String, ConfigValue>> iterator = currentDescriptor.getRawConfig().entrySet().iterator(); + iterator.hasNext(); ) { + Map.Entry<String, ConfigValue> entry = iterator.next(); + String entryValue = entry.getValue().unwrapped().toString(); + if (!isPlaceHolder(entryValue)) { + String entryValueInOutputDescriptor = ConfigUtils.getString(config, entry.getKey(), StringUtils.EMPTY); + if (isPlaceHolder(entryValueInOutputDescriptor)) { + config = config.withValue(entry.getKey(), ConfigValueFactory.fromAnyRef(entryValue)); + } + } + } + return GobblinConstructorUtils.invokeLongestConstructor(outputDescriptor.getClass(), config); + } + + /** + * A placeholder configuration is one which is not defined or is set to {@link DatasetDescriptorConfigKeys#DATASET_DESCRIPTOR_CONFIG_ANY}. + * @param value to be examined for determining if it is a placeholder. + * @return true if the value is null or empty or equals {@link DatasetDescriptorConfigKeys#DATASET_DESCRIPTOR_CONFIG_ANY}. + */ + private boolean isPlaceHolder(String value) { + return Strings.isNullOrEmpty(value) || value.equals(DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY); + } + /** * Build the merged config for each {@link FlowEdge}, which is a combination of (in the precedence described below): * <ul> * <p> the user provided flow config </p> http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/49974214/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java ---------------------------------------------------------------------- diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java index 5e9f86c..7517c17 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java @@ -35,6 +35,7 @@ import org.apache.gobblin.runtime.api.FlowSpec; import org.apache.gobblin.runtime.api.JobSpec; import org.apache.gobblin.runtime.api.SpecExecutor; import org.apache.gobblin.service.ExecutionStatus; +import org.apache.gobblin.service.modules.flowgraph.FlowGraphConfigurationKeys; import org.apache.gobblin.service.modules.orchestration.DagManager; import org.apache.gobblin.util.ConfigUtils; @@ -71,11 +72,14 @@ public class JobExecutionPlan { String flowName = ConfigUtils.getString(flowConfig, ConfigurationKeys.FLOW_NAME_KEY, ""); String flowGroup = ConfigUtils.getString(flowConfig, ConfigurationKeys.FLOW_GROUP_KEY, ""); - String jobName = ConfigUtils.getString(jobConfig, ConfigurationKeys.JOB_NAME_KEY, ""); String flowFailureOption = ConfigUtils.getString(flowConfig, ConfigurationKeys.FLOW_FAILURE_OPTION, DagManager.DEFAULT_FLOW_FAILURE_OPTION); - //Modify the job name to include the flow group, flow name and a randomly generated integer to make the job name unique. - jobName = Joiner.on(JOB_NAME_COMPONENT_SEPARATION_CHAR).join(flowGroup, flowName, jobName, random.nextInt(Integer.MAX_VALUE)); + String jobName = ConfigUtils.getString(jobConfig, ConfigurationKeys.JOB_NAME_KEY, ""); + String source = ConfigUtils.getString(jobConfig, FlowGraphConfigurationKeys.FLOW_EDGE_SOURCE_KEY, ""); + String destination = ConfigUtils.getString(jobConfig, FlowGraphConfigurationKeys.FLOW_EDGE_DESTINATION_KEY, ""); + + //Modify the job name to include the flow group, flow name and source and destination node ids for the job. + jobName = Joiner.on(JOB_NAME_COMPONENT_SEPARATION_CHAR).join(flowGroup, flowName, jobName, source, destination); JobSpec.Builder jobSpecBuilder = JobSpec.builder(jobSpecURIGenerator(flowGroup, jobName, flowSpec)).withConfig(jobConfig) .withDescription(flowSpec.getDescription()).withVersion(flowSpec.getVersion()); http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/49974214/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/FlowGraphPathTest.java ---------------------------------------------------------------------- diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/FlowGraphPathTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/FlowGraphPathTest.java index 01074f8..bbd863b 100644 --- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/FlowGraphPathTest.java +++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/FlowGraphPathTest.java @@ -127,9 +127,5 @@ public class FlowGraphPathTest { Assert.assertEquals(dagNew.getStartNodes().get(1).getValue().getJobSpec().getConfig().getString(ConfigurationKeys.JOB_NAME_KEY), "job1"); Assert.assertFalse(dagNew.getStartNodes().get(1).getValue().getJobSpec().getConfig().hasPath(ConfigurationKeys.JOB_DEPENDENCIES)); Assert.assertFalse(dagNew.getStartNodes().get(1).getValue().getJobSpec().getConfig().hasPath(ConfigurationKeys.JOB_DEPENDENCIES)); - - } - - } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/49974214/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompilerTest.java ---------------------------------------------------------------------- diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompilerTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompilerTest.java index c04b2b9..54792f9 100644 --- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompilerTest.java +++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompilerTest.java @@ -24,8 +24,11 @@ import java.io.InputStreamReader; import java.net.URI; import java.net.URISyntaxException; import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.HashSet; import java.util.List; import java.util.Properties; +import java.util.Set; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; @@ -49,6 +52,7 @@ import org.testng.annotations.Test; import com.google.common.base.Charsets; import com.google.common.base.Joiner; import com.google.common.base.Optional; +import com.google.common.collect.Lists; import com.google.common.io.Files; import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; @@ -69,6 +73,7 @@ import org.apache.gobblin.service.ServiceConfigKeys; import org.apache.gobblin.service.modules.core.GitFlowGraphMonitor; import org.apache.gobblin.service.modules.flowgraph.BaseFlowGraph; import org.apache.gobblin.service.modules.flowgraph.Dag; +import org.apache.gobblin.service.modules.flowgraph.Dag.DagNode; import org.apache.gobblin.service.modules.flowgraph.DataNode; import org.apache.gobblin.service.modules.flowgraph.FlowEdge; import org.apache.gobblin.service.modules.flowgraph.FlowEdgeFactory; @@ -141,7 +146,8 @@ public class MultiHopFlowCompilerTest { this.specCompiler = new MultiHopFlowCompiler(config, this.flowGraph); } - private FlowSpec createFlowSpec(String flowConfigResource, String source, String destination) throws IOException, URISyntaxException { + private FlowSpec createFlowSpec(String flowConfigResource, String source, String destination, boolean applyRetention, boolean applyRetentionOnInput) + throws IOException, URISyntaxException { //Create a flow spec Properties flowProperties = new Properties(); flowProperties.put(ConfigurationKeys.JOB_SCHEDULE_KEY, "* * * * *"); @@ -149,6 +155,8 @@ public class MultiHopFlowCompilerTest { flowProperties.put(ConfigurationKeys.FLOW_NAME_KEY, "testFlowName"); flowProperties.put(ServiceConfigKeys.FLOW_SOURCE_IDENTIFIER_KEY, source); flowProperties.put(ServiceConfigKeys.FLOW_DESTINATION_IDENTIFIER_KEY, destination); + flowProperties.put(ConfigurationKeys.FLOW_APPLY_RETENTION, Boolean.toString(applyRetention)); + flowProperties.put(ConfigurationKeys.FLOW_APPLY_INPUT_RETENTION, Boolean.toString(applyRetentionOnInput)); Config flowConfig = ConfigUtils.propertiesToConfig(flowProperties); //Get the input/output dataset config from a file @@ -172,14 +180,14 @@ public class MultiHopFlowCompilerTest { @Test public void testCompileFlow() throws URISyntaxException, IOException { - FlowSpec spec = createFlowSpec("flow/flow1.conf", "LocalFS-1", "ADLS-1"); + FlowSpec spec = createFlowSpec("flow/flow1.conf", "LocalFS-1", "ADLS-1", false, false); Dag<JobExecutionPlan> jobDag = this.specCompiler.compileFlow(spec); Assert.assertEquals(jobDag.getNodes().size(), 4); Assert.assertEquals(jobDag.getStartNodes().size(), 1); Assert.assertEquals(jobDag.getEndNodes().size(), 1); //Get the 1st hop - Distcp from "LocalFS-1" to "HDFS-1" - Dag.DagNode<JobExecutionPlan> startNode = jobDag.getStartNodes().get(0); + DagNode<JobExecutionPlan> startNode = jobDag.getStartNodes().get(0); JobExecutionPlan jobSpecWithExecutor = startNode.getValue(); JobSpec jobSpec = jobSpecWithExecutor.getJobSpec(); @@ -188,9 +196,9 @@ public class MultiHopFlowCompilerTest { String flowGroup = "testFlowGroup"; String flowName = "testFlowName"; String expectedJobName1 = Joiner.on(JobExecutionPlan.Factory.JOB_NAME_COMPONENT_SEPARATION_CHAR). - join(flowGroup, flowName, "Distcp-HDFS-HDFS"); + join(flowGroup, flowName, "Distcp", "LocalFS-1", "HDFS-1"); String jobName1 = jobConfig.getString(ConfigurationKeys.JOB_NAME_KEY); - Assert.assertTrue(jobName1.startsWith(expectedJobName1)); + Assert.assertEquals(jobName1, expectedJobName1); String from = jobConfig.getString("from"); String to = jobConfig.getString("to"); Assert.assertEquals(from, "/data/out/testTeam/testDataset"); @@ -212,15 +220,15 @@ public class MultiHopFlowCompilerTest { Assert.assertEquals(specExecutor.getUri().toString(), "fs:///"); Assert.assertEquals(specExecutor.getClass().getCanonicalName(), "org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor"); - //Get the 2nd hop - "HDFS-1 to HDFS-1 : convert avro to json and encrypt" + //Get the 2nd hop - "HDFS-1 to HDFS-1 : convert avro to json and encrypt". Ensure config has correct substitutions. Assert.assertEquals(jobDag.getChildren(startNode).size(), 1); - Dag.DagNode<JobExecutionPlan> secondHopNode = jobDag.getChildren(startNode).get(0); + DagNode<JobExecutionPlan> secondHopNode = jobDag.getChildren(startNode).get(0); jobSpecWithExecutor = secondHopNode.getValue(); jobConfig = jobSpecWithExecutor.getJobSpec().getConfig(); String expectedJobName2 = Joiner.on(JobExecutionPlan.Factory.JOB_NAME_COMPONENT_SEPARATION_CHAR). - join(flowGroup, flowName, "convert-to-json-and-encrypt"); + join(flowGroup, flowName, "ConvertToJsonAndEncrypt", "HDFS-1", "HDFS-1"); String jobName2 = jobConfig.getString(ConfigurationKeys.JOB_NAME_KEY); - Assert.assertTrue(jobName2.startsWith(expectedJobName2)); + Assert.assertEquals(jobName2, expectedJobName2); Assert.assertEquals(jobConfig.getString(ConfigurationKeys.JOB_DEPENDENCIES), jobName1); from = jobConfig.getString("from"); to = jobConfig.getString("to"); @@ -234,13 +242,13 @@ public class MultiHopFlowCompilerTest { //Get the 3rd hop - "Distcp HDFS-1 to HDFS-3" Assert.assertEquals(jobDag.getChildren(secondHopNode).size(), 1); - Dag.DagNode<JobExecutionPlan> thirdHopNode = jobDag.getChildren(secondHopNode).get(0); + DagNode<JobExecutionPlan> thirdHopNode = jobDag.getChildren(secondHopNode).get(0); jobSpecWithExecutor = thirdHopNode.getValue(); jobConfig = jobSpecWithExecutor.getJobSpec().getConfig(); String expectedJobName3 = Joiner.on(JobExecutionPlan.Factory.JOB_NAME_COMPONENT_SEPARATION_CHAR). - join(flowGroup, flowName, "Distcp-HDFS-HDFS"); + join(flowGroup, flowName, "Distcp", "HDFS-1", "HDFS-3"); String jobName3 = jobConfig.getString(ConfigurationKeys.JOB_NAME_KEY); - Assert.assertTrue(jobName3.startsWith(expectedJobName3)); + Assert.assertEquals(jobName3, expectedJobName3); Assert.assertEquals(jobConfig.getString(ConfigurationKeys.JOB_DEPENDENCIES), jobName2); from = jobConfig.getString("from"); to = jobConfig.getString("to"); @@ -256,15 +264,15 @@ public class MultiHopFlowCompilerTest { Assert.assertEquals(specExecutor.getUri().toString(), "https://azkaban01.gobblin.net:8443"); Assert.assertEquals(specExecutor.getClass().getCanonicalName(), "org.apache.gobblin.service.modules.flow.MultiHopFlowCompilerTest.TestAzkabanSpecExecutor"); - //Get the 4th hop - "Distcp from HDFS3 to ADLS-1" + //Get the 4th hop - "Distcp from HDFS-3 to ADLS-1" Assert.assertEquals(jobDag.getChildren(thirdHopNode).size(), 1); - Dag.DagNode<JobExecutionPlan> fourthHopNode = jobDag.getChildren(thirdHopNode).get(0); + DagNode<JobExecutionPlan> fourthHopNode = jobDag.getChildren(thirdHopNode).get(0); jobSpecWithExecutor = fourthHopNode.getValue(); jobConfig = jobSpecWithExecutor.getJobSpec().getConfig(); String expectedJobName4 = Joiner.on(JobExecutionPlan.Factory.JOB_NAME_COMPONENT_SEPARATION_CHAR). - join(flowGroup, flowName, "Distcp-HDFS-ADL"); + join(flowGroup, flowName, "DistcpToADL", "HDFS-3", "ADLS-1"); String jobName4 = jobConfig.getString(ConfigurationKeys.JOB_NAME_KEY); - Assert.assertTrue(jobName4.startsWith(expectedJobName4)); + Assert.assertEquals(jobName4, expectedJobName4); Assert.assertEquals(jobConfig.getString(ConfigurationKeys.JOB_DEPENDENCIES), jobName3); from = jobConfig.getString("from"); to = jobConfig.getString("to"); @@ -287,12 +295,62 @@ public class MultiHopFlowCompilerTest { Assert.assertEquals(jobDag.getEndNodes().get(0), fourthHopNode); } + @Test (dependsOnMethods = "testCompileFlow") + public void testCompileFlowWithRetention() throws URISyntaxException, IOException { + FlowSpec spec = createFlowSpec("flow/flow1.conf", "LocalFS-1", "ADLS-1", true, + true); + Dag<JobExecutionPlan> jobDag = this.specCompiler.compileFlow(spec); + Assert.assertEquals(jobDag.getNodes().size(), 9); + Assert.assertEquals(jobDag.getStartNodes().size(), 2); + Assert.assertEquals(jobDag.getEndNodes().size(), 5); + + String flowGroup = "testFlowGroup"; + String flowName = "testFlowName"; + + List<DagNode<JobExecutionPlan>> currentHopNodes = jobDag.getStartNodes(); + + List<String> expectedJobNames = Lists.newArrayList("SnapshotRetention", "Distcp", "SnapshotRetention", "ConvertToJsonAndEncrypt", "SnapshotRetention" , + "Distcp", "SnapshotRetention", "DistcpToADL", "SnapshotRetention"); + List<String> sourceNodes = Lists.newArrayList("LocalFS-1", "LocalFS-1", "HDFS-1", "HDFS-1", "HDFS-1", "HDFS-1", "HDFS-3", "HDFS-3", "ADLS-1"); + List<String> destinationNodes = Lists.newArrayList("LocalFS-1", "HDFS-1", "HDFS-1", "HDFS-1", "HDFS-1", "HDFS-3", "HDFS-3", "ADLS-1", "ADLS-1"); + + List<DagNode<JobExecutionPlan>> nextHopNodes = new ArrayList<>(); + for (int i = 0; i < 9; i += 2) { + if (i < 8) { + Assert.assertEquals(currentHopNodes.size(), 2); + } else { + Assert.assertEquals(currentHopNodes.size(), 1); + } + Set<String> jobNames = new HashSet<>(); + jobNames.add(Joiner.on(JobExecutionPlan.Factory.JOB_NAME_COMPONENT_SEPARATION_CHAR). + join(flowGroup, flowName, expectedJobNames.get(i), sourceNodes.get(i), destinationNodes.get(i))); + if (i < 8) { + jobNames.add(Joiner.on(JobExecutionPlan.Factory.JOB_NAME_COMPONENT_SEPARATION_CHAR). + join(flowGroup, flowName, expectedJobNames.get(i + 1), sourceNodes.get(i + 1), destinationNodes.get(i + 1))); + } + + for (DagNode<JobExecutionPlan> dagNode : currentHopNodes) { + Config jobConfig = dagNode.getValue().getJobSpec().getConfig(); + String jobName = jobConfig.getString(ConfigurationKeys.JOB_NAME_KEY); + Assert.assertTrue(jobNames.contains(jobName)); + log.warn(jobName); + nextHopNodes.addAll(jobDag.getChildren(dagNode)); + } + + currentHopNodes = nextHopNodes; + nextHopNodes = new ArrayList<>(); + } + Assert.assertEquals(nextHopNodes.size(), 0); + + } + + @Test (dependsOnMethods = "testCompileFlowWithRetention") public void testCompileFlowAfterFirstEdgeDeletion() throws URISyntaxException, IOException { //Delete the self edge on HDFS-1 that performs convert-to-json-and-encrypt. this.flowGraph.deleteFlowEdge("HDFS-1:HDFS-1:hdfsConvertToJsonAndEncrypt"); - FlowSpec spec = createFlowSpec("flow/flow1.conf", "LocalFS-1", "ADLS-1"); + FlowSpec spec = createFlowSpec("flow/flow1.conf", "LocalFS-1", "ADLS-1", false, false); Dag<JobExecutionPlan> jobDag = this.specCompiler.compileFlow(spec); Assert.assertEquals(jobDag.getNodes().size(), 4); @@ -300,7 +358,7 @@ public class MultiHopFlowCompilerTest { Assert.assertEquals(jobDag.getEndNodes().size(), 1); //Get the 1st hop - Distcp from "LocalFS-1" to "HDFS-2" - Dag.DagNode<JobExecutionPlan> startNode = jobDag.getStartNodes().get(0); + DagNode<JobExecutionPlan> startNode = jobDag.getStartNodes().get(0); JobExecutionPlan jobExecutionPlan = startNode.getValue(); JobSpec jobSpec = jobExecutionPlan.getJobSpec(); @@ -309,9 +367,9 @@ public class MultiHopFlowCompilerTest { String flowGroup = "testFlowGroup"; String flowName = "testFlowName"; String expectedJobName1 = Joiner.on(JobExecutionPlan.Factory.JOB_NAME_COMPONENT_SEPARATION_CHAR). - join(flowGroup, flowName, "Distcp-HDFS-HDFS"); + join(flowGroup, flowName, "Distcp", "LocalFS-1", "HDFS-2"); String jobName1 = jobConfig.getString(ConfigurationKeys.JOB_NAME_KEY); - Assert.assertTrue(jobName1.startsWith(expectedJobName1)); + Assert.assertEquals(jobName1, expectedJobName1); String from = jobConfig.getString("from"); String to = jobConfig.getString("to"); Assert.assertEquals(from, "/data/out/testTeam/testDataset"); @@ -335,13 +393,13 @@ public class MultiHopFlowCompilerTest { //Get the 2nd hop - "HDFS-2 to HDFS-2 : convert avro to json and encrypt" Assert.assertEquals(jobDag.getChildren(startNode).size(), 1); - Dag.DagNode<JobExecutionPlan> secondHopNode = jobDag.getChildren(startNode).get(0); + DagNode<JobExecutionPlan> secondHopNode = jobDag.getChildren(startNode).get(0); jobExecutionPlan = secondHopNode.getValue(); jobConfig = jobExecutionPlan.getJobSpec().getConfig(); String expectedJobName2 = Joiner.on(JobExecutionPlan.Factory.JOB_NAME_COMPONENT_SEPARATION_CHAR). - join(flowGroup, flowName, "convert-to-json-and-encrypt"); + join(flowGroup, flowName, "ConvertToJsonAndEncrypt", "HDFS-2", "HDFS-2"); String jobName2 = jobConfig.getString(ConfigurationKeys.JOB_NAME_KEY); - Assert.assertTrue(jobName2.startsWith(expectedJobName2)); + Assert.assertEquals(jobName2, expectedJobName2); Assert.assertEquals(jobConfig.getString(ConfigurationKeys.JOB_DEPENDENCIES), jobName1); from = jobConfig.getString("from"); to = jobConfig.getString("to"); @@ -355,13 +413,13 @@ public class MultiHopFlowCompilerTest { //Get the 3rd hop - "Distcp HDFS-2 to HDFS-4" Assert.assertEquals(jobDag.getChildren(secondHopNode).size(), 1); - Dag.DagNode<JobExecutionPlan> thirdHopNode = jobDag.getChildren(secondHopNode).get(0); + DagNode<JobExecutionPlan> thirdHopNode = jobDag.getChildren(secondHopNode).get(0); jobExecutionPlan = thirdHopNode.getValue(); jobConfig = jobExecutionPlan.getJobSpec().getConfig(); String expectedJobName3 = Joiner.on(JobExecutionPlan.Factory.JOB_NAME_COMPONENT_SEPARATION_CHAR). - join(flowGroup, flowName, "Distcp-HDFS-HDFS"); + join(flowGroup, flowName, "Distcp", "HDFS-2", "HDFS-4"); String jobName3 = jobConfig.getString(ConfigurationKeys.JOB_NAME_KEY); - Assert.assertTrue(jobName3.startsWith(expectedJobName3)); + Assert.assertEquals(jobName3, expectedJobName3); Assert.assertEquals(jobConfig.getString(ConfigurationKeys.JOB_DEPENDENCIES), jobName2); from = jobConfig.getString("from"); to = jobConfig.getString("to"); @@ -377,16 +435,16 @@ public class MultiHopFlowCompilerTest { Assert.assertEquals(specExecutor.getUri().toString(), "https://azkaban02.gobblin.net:8443"); Assert.assertEquals(specExecutor.getClass().getCanonicalName(), "org.apache.gobblin.service.modules.flow.MultiHopFlowCompilerTest.TestAzkabanSpecExecutor"); - //Get the 4th hop - "Distcp from HDFS4 to ADLS-1" + //Get the 4th hop - "Distcp from HDFS-4 to ADLS-1" Assert.assertEquals(jobDag.getChildren(thirdHopNode).size(), 1); - Dag.DagNode<JobExecutionPlan> fourthHopNode = jobDag.getChildren(thirdHopNode).get(0); + DagNode<JobExecutionPlan> fourthHopNode = jobDag.getChildren(thirdHopNode).get(0); jobExecutionPlan = fourthHopNode.getValue(); jobConfig = jobExecutionPlan.getJobSpec().getConfig(); String expectedJobName4 = Joiner.on(JobExecutionPlan.Factory.JOB_NAME_COMPONENT_SEPARATION_CHAR). - join(flowGroup, flowName, "Distcp-HDFS-ADL"); + join(flowGroup, flowName, "DistcpToADL", "HDFS-4", "ADLS-1"); String jobName4 = jobConfig.getString(ConfigurationKeys.JOB_NAME_KEY); - Assert.assertTrue(jobName4.startsWith(expectedJobName4)); + Assert.assertEquals(jobName4, expectedJobName4); Assert.assertEquals(jobConfig.getString(ConfigurationKeys.JOB_DEPENDENCIES), jobName3); from = jobConfig.getString("from"); to = jobConfig.getString("to"); @@ -414,7 +472,7 @@ public class MultiHopFlowCompilerTest { //Delete the self edge on HDFS-2 that performs convert-to-json-and-encrypt. this.flowGraph.deleteFlowEdge("HDFS-2:HDFS-2:hdfsConvertToJsonAndEncrypt"); - FlowSpec spec = createFlowSpec("flow/flow1.conf", "LocalFS-1", "ADLS-1"); + FlowSpec spec = createFlowSpec("flow/flow1.conf", "LocalFS-1", "ADLS-1", false, false); Dag<JobExecutionPlan> jobDag = this.specCompiler.compileFlow(spec); //Ensure no path to destination. @@ -423,50 +481,57 @@ public class MultiHopFlowCompilerTest { @Test (dependsOnMethods = "testCompileFlowAfterSecondEdgeDeletion") public void testCompileFlowSingleHop() throws IOException, URISyntaxException { - FlowSpec spec = createFlowSpec("flow/flow2.conf", "HDFS-1", "HDFS-3"); + FlowSpec spec = createFlowSpec("flow/flow2.conf", "HDFS-1", "HDFS-3", false, false); Dag<JobExecutionPlan> jobDag = this.specCompiler.compileFlow(spec); Assert.assertEquals(jobDag.getNodes().size(), 1); Assert.assertEquals(jobDag.getStartNodes().size(), 1); Assert.assertEquals(jobDag.getEndNodes().size(), 1); Assert.assertEquals(jobDag.getStartNodes().get(0), jobDag.getEndNodes().get(0)); - //Ensure hop is from HDFS-1 to HDFS-3. - Dag.DagNode<JobExecutionPlan> dagNode = jobDag.getStartNodes().get(0); - JobExecutionPlan jobExecutionPlan = dagNode.getValue(); - Config jobConfig = jobExecutionPlan.getJobSpec().getConfig(); - Assert.assertEquals(jobConfig.getString("source.filebased.fs.uri"), "hdfs://hadoopnn01.grid.linkedin.com:8888/"); - Assert.assertEquals(jobConfig.getString("target.filebased.fs.uri"), "hdfs://hadoopnn03.grid.linkedin.com:8888/"); + //Ensure hop is from HDFS-1 to HDFS-3 i.e. jobName == "testFlowGroup_testFlowName_Distcp_HDFS-1_HDFS-3". + DagNode<JobExecutionPlan> dagNode = jobDag.getStartNodes().get(0); + Config jobConfig = dagNode.getValue().getJobSpec().getConfig(); + String expectedJobName = Joiner.on(JobExecutionPlan.Factory.JOB_NAME_COMPONENT_SEPARATION_CHAR). + join("testFlowGroup", "testFlowName", "Distcp", "HDFS-1", "HDFS-3"); + String jobName = jobConfig.getString(ConfigurationKeys.JOB_NAME_KEY); + Assert.assertEquals(jobName, expectedJobName); } @Test (dependsOnMethods = "testCompileFlowSingleHop") public void testMulticastPath() throws IOException, URISyntaxException { - FlowSpec spec = createFlowSpec("flow/flow2.conf", "LocalFS-1", "HDFS-3,HDFS-4"); + FlowSpec spec = createFlowSpec("flow/flow2.conf", "LocalFS-1", "HDFS-3,HDFS-4", false, false); Dag<JobExecutionPlan> jobDag = this.specCompiler.compileFlow(spec); Assert.assertEquals(jobDag.getNodes().size(), 4); Assert.assertEquals(jobDag.getEndNodes().size(), 2); Assert.assertEquals(jobDag.getStartNodes().size(), 2); - int i = 1; //First hop must be from LocalFS to HDFS-1 and HDFS-2 - for (Dag.DagNode<JobExecutionPlan> dagNode : jobDag.getStartNodes()) { - JobExecutionPlan jobExecutionPlan = dagNode.getValue(); - Config jobConfig = jobExecutionPlan.getJobSpec().getConfig(); - Assert.assertEquals(jobConfig.getString("source.filebased.fs.uri"), "file:///"); - Assert.assertEquals(jobConfig.getString("target.filebased.fs.uri"), "hdfs://hadoopnn0" + i++ + ".grid.linkedin.com:8888/"); + Set<String> jobNames = new HashSet<>(); + jobNames.add(Joiner.on(JobExecutionPlan.Factory.JOB_NAME_COMPONENT_SEPARATION_CHAR). + join("testFlowGroup", "testFlowName", "Distcp", "LocalFS-1", "HDFS-1")); + jobNames.add(Joiner.on(JobExecutionPlan.Factory.JOB_NAME_COMPONENT_SEPARATION_CHAR). + join("testFlowGroup", "testFlowName", "Distcp", "LocalFS-1", "HDFS-2")); + + for (DagNode<JobExecutionPlan> dagNode : jobDag.getStartNodes()) { + Config jobConfig = dagNode.getValue().getJobSpec().getConfig(); + String jobName = jobConfig.getString(ConfigurationKeys.JOB_NAME_KEY); + Assert.assertTrue(jobNames.contains(jobName)); } - i = 1; //Second hop must be from HDFS-1/HDFS-2 to HDFS-3/HDFS-4 respectively. - for (Dag.DagNode<JobExecutionPlan> dagNode : jobDag.getStartNodes()) { - List<Dag.DagNode<JobExecutionPlan>> nextNodes = jobDag.getChildren(dagNode); + jobNames = new HashSet<>(); + jobNames.add(Joiner.on(JobExecutionPlan.Factory.JOB_NAME_COMPONENT_SEPARATION_CHAR). + join("testFlowGroup", "testFlowName", "Distcp", "HDFS-1", "HDFS-3")); + jobNames.add(Joiner.on(JobExecutionPlan.Factory.JOB_NAME_COMPONENT_SEPARATION_CHAR). + join("testFlowGroup", "testFlowName", "Distcp", "HDFS-2", "HDFS-4")); + for (DagNode<JobExecutionPlan> dagNode : jobDag.getStartNodes()) { + List<DagNode<JobExecutionPlan>> nextNodes = jobDag.getChildren(dagNode); Assert.assertEquals(nextNodes.size(), 1); - JobExecutionPlan jobExecutionPlan = nextNodes.get(0).getValue(); - Config jobConfig = jobExecutionPlan.getJobSpec().getConfig(); - Assert.assertEquals(jobConfig.getString("source.filebased.fs.uri"), "hdfs://hadoopnn0" + i + ".grid.linkedin.com:8888/"); - Assert.assertEquals(jobConfig.getString("target.filebased.fs.uri"), "hdfs://hadoopnn0" + (i + 2) + ".grid.linkedin.com:8888/"); - i += 1; + Config jobConfig = nextNodes.get(0).getValue().getJobSpec().getConfig(); + String jobName = jobConfig.getString(ConfigurationKeys.JOB_NAME_KEY); + Assert.assertTrue(jobNames.contains(jobName)); } } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/49974214/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flowgraph/DagTest.java ---------------------------------------------------------------------- diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flowgraph/DagTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flowgraph/DagTest.java index 3a7ab10..dc21413 100644 --- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flowgraph/DagTest.java +++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flowgraph/DagTest.java @@ -160,6 +160,10 @@ public class DagTest { forkNodes.add(dagNode3); Dag<String> dagNew = dag1.concatenate(dag2, forkNodes); + Assert.assertEquals(dagNew.getChildren(dagNode2).size(), 1); + Assert.assertEquals(dagNew.getChildren(dagNode2).get(0), dagNode4); + Assert.assertEquals(dagNew.getParents(dagNode4).size(), 1); + Assert.assertEquals(dagNew.getParents(dagNode4).get(0), dagNode2); Assert.assertEquals(dagNew.getEndNodes().size(), 2); Assert.assertEquals(dagNew.getEndNodes().get(0).getValue(), "val4"); Assert.assertEquals(dagNew.getEndNodes().get(1).getValue(), "val3"); http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/49974214/gobblin-service/src/test/resources/flowgraph/flowedges/adls-1-to-adls-1-retention-1.properties ---------------------------------------------------------------------- diff --git a/gobblin-service/src/test/resources/flowgraph/flowedges/adls-1-to-adls-1-retention-1.properties b/gobblin-service/src/test/resources/flowgraph/flowedges/adls-1-to-adls-1-retention-1.properties new file mode 100644 index 0000000..52079d1 --- /dev/null +++ b/gobblin-service/src/test/resources/flowgraph/flowedges/adls-1-to-adls-1-retention-1.properties @@ -0,0 +1,9 @@ +flow.edge.source=ADLS-1 +flow.edge.destination=ADLS-1 +flow.edge.id=ADLS-1:ADLS-1:hdfsRemoteRetention +flow.edge.flowTemplateDirUri=FS:///multihop/flowEdgeTemplates/hdfsSnapshotRetention +flow.edge.specExecutors.0.specExecInstance.class=org.apache.gobblin.service.modules.flow.MultiHopFlowCompilerTest$TestAzkabanSpecExecutor +flow.edge.specExecutors.0.specExecInstance.uri=https://azkaban03.gobblin.net:8443 +flow.edge.specExecutors.0.specExecInstance.job.launcher.class=org.apache.gobblin.azkaban.AzkabanJobLauncher +flow.edge.specExecutors.0.specExecInstance.job.launcher.type=MAPREDUCE +flow.edge.specExecutors.0.specExecInstance.job.type=hadoopJava \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/49974214/gobblin-service/src/test/resources/flowgraph/flowedges/adls-1-to-adls-1-retention-2.properties ---------------------------------------------------------------------- diff --git a/gobblin-service/src/test/resources/flowgraph/flowedges/adls-1-to-adls-1-retention-2.properties b/gobblin-service/src/test/resources/flowgraph/flowedges/adls-1-to-adls-1-retention-2.properties new file mode 100644 index 0000000..7b1a160 --- /dev/null +++ b/gobblin-service/src/test/resources/flowgraph/flowedges/adls-1-to-adls-1-retention-2.properties @@ -0,0 +1,9 @@ +flow.edge.source=ADLS-1 +flow.edge.destination=ADLS-1 +flow.edge.id=ADLS-1:ADLS-1:hdfsRemoteRetention +flow.edge.flowTemplateDirUri=FS:///multihop/flowEdgeTemplates/hdfsSnapshotRetention +flow.edge.specExecutors.0.specExecInstance.class=org.apache.gobblin.service.modules.flow.MultiHopFlowCompilerTest$TestAzkabanSpecExecutor +flow.edge.specExecutors.0.specExecInstance.uri=https://azkaban04.gobblin.net:8443 +flow.edge.specExecutors.0.specExecInstance.job.launcher.class=org.apache.gobblin.azkaban.AzkabanJobLauncher +flow.edge.specExecutors.0.specExecInstance.job.launcher.type=MAPREDUCE +flow.edge.specExecutors.0.specExecInstance.job.type=hadoopJava \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/49974214/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-1-to-hdfs-1-retention.properties ---------------------------------------------------------------------- diff --git a/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-1-to-hdfs-1-retention.properties b/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-1-to-hdfs-1-retention.properties new file mode 100644 index 0000000..926c51e --- /dev/null +++ b/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-1-to-hdfs-1-retention.properties @@ -0,0 +1,9 @@ +flow.edge.source=HDFS-1 +flow.edge.destination=HDFS-1 +flow.edge.id=HDFS-1:HDFS-1:hdfsRetention +flow.edge.flowTemplateDirUri=FS:///multihop/flowEdgeTemplates/hdfsSnapshotRetention +flow.edge.specExecutors.0.specExecInstance.class=org.apache.gobblin.service.modules.flow.MultiHopFlowCompilerTest$TestAzkabanSpecExecutor +flow.edge.specExecutors.0.specExecInstance.uri=https://azkaban01.gobblin.net:8443 +flow.edge.specExecutors.0.specExecInstance.job.launcher.class=org.apache.gobblin.azkaban.AzkabanJobLauncher +flow.edge.specExecutors.0.specExecInstance.job.launcher.type=MAPREDUCE +flow.edge.specExecutors.0.specExecInstance.job.type=hadoopJava \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/49974214/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-2-hdfs-2-retention.properties ---------------------------------------------------------------------- diff --git a/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-2-hdfs-2-retention.properties b/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-2-hdfs-2-retention.properties new file mode 100644 index 0000000..26454e7 --- /dev/null +++ b/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-2-hdfs-2-retention.properties @@ -0,0 +1,9 @@ +flow.edge.source=HDFS-2 +flow.edge.destination=HDFS-2 +flow.edge.id=HDFS-2:HDFS-2:hdfsRetention +flow.edge.flowTemplateDirUri=FS:///multihop/flowEdgeTemplates/hdfsSnapshotRetention +flow.edge.specExecutors.0.specExecInstance.class=org.apache.gobblin.service.modules.flow.MultiHopFlowCompilerTest$TestAzkabanSpecExecutor +flow.edge.specExecutors.0.specExecInstance.uri=https://azkaban02.gobblin.net:8443 +flow.edge.specExecutors.0.specExecInstance.job.launcher.class=org.apache.gobblin.azkaban.AzkabanJobLauncher +flow.edge.specExecutors.0.specExecInstance.job.launcher.type=MAPREDUCE +flow.edge.specExecutors.0.specExecInstance.job.type=hadoopJava \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/49974214/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-3-to-hdfs-3-retention.properties ---------------------------------------------------------------------- diff --git a/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-3-to-hdfs-3-retention.properties b/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-3-to-hdfs-3-retention.properties new file mode 100644 index 0000000..f390546 --- /dev/null +++ b/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-3-to-hdfs-3-retention.properties @@ -0,0 +1,9 @@ +flow.edge.source=HDFS-3 +flow.edge.destination=HDFS-3 +flow.edge.id=HDFS-3:HDFS-3:hdfsRetention +flow.edge.flowTemplateDirUri=FS:///multihop/flowEdgeTemplates/hdfsSnapshotRetention +flow.edge.specExecutors.0.specExecInstance.class=org.apache.gobblin.service.modules.flow.MultiHopFlowCompilerTest$TestAzkabanSpecExecutor +flow.edge.specExecutors.0.specExecInstance.uri=https://azkaban03.gobblin.net:8443 +flow.edge.specExecutors.0.specExecInstance.job.launcher.class=org.apache.gobblin.azkaban.AzkabanJobLauncher +flow.edge.specExecutors.0.specExecInstance.job.launcher.type=MAPREDUCE +flow.edge.specExecutors.0.specExecInstance.job.type=hadoopJava \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/49974214/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-4-to-hdfs-4-retention.properties ---------------------------------------------------------------------- diff --git a/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-4-to-hdfs-4-retention.properties b/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-4-to-hdfs-4-retention.properties new file mode 100644 index 0000000..6afb3d8 --- /dev/null +++ b/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-4-to-hdfs-4-retention.properties @@ -0,0 +1,9 @@ +flow.edge.source=HDFS-4 +flow.edge.destination=HDFS-4 +flow.edge.id=HDFS-4:HDFS-4:hdfsRetention +flow.edge.flowTemplateDirUri=FS:///multihop/flowEdgeTemplates/hdfsSnapshotRetention +flow.edge.specExecutors.0.specExecInstance.class=org.apache.gobblin.service.modules.flow.MultiHopFlowCompilerTest$TestAzkabanSpecExecutor +flow.edge.specExecutors.0.specExecInstance.uri=https://azkaban04.gobblin.net:8443 +flow.edge.specExecutors.0.specExecInstance.job.launcher.class=org.apache.gobblin.azkaban.AzkabanJobLauncher +flow.edge.specExecutors.0.specExecInstance.job.launcher.type=MAPREDUCE +flow.edge.specExecutors.0.specExecInstance.job.type=hadoopJava \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/49974214/gobblin-service/src/test/resources/flowgraph/flowedges/local-to-local-retention.properties ---------------------------------------------------------------------- diff --git a/gobblin-service/src/test/resources/flowgraph/flowedges/local-to-local-retention.properties b/gobblin-service/src/test/resources/flowgraph/flowedges/local-to-local-retention.properties new file mode 100644 index 0000000..76c77fd --- /dev/null +++ b/gobblin-service/src/test/resources/flowgraph/flowedges/local-to-local-retention.properties @@ -0,0 +1,9 @@ +flow.edge.source=LocalFS-1 +flow.edge.destination=LocalFS-1 +flow.edge.id=LocalFS-1:LocalFS-1:localRetention +flow.edge.flowTemplateDirUri=FS:///multihop/flowEdgeTemplates/hdfsSnapshotRetention +flow.edge.specExecutors.0.specExecInstance.class=org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor +flow.edge.specExecutors.0.specExecInstance.uri=fs:/// +flow.edge.specExecutors.0.specExecInstance.job.launcher.class=org.apache.gobblin.runtime.local.LocalJobLauncher +flow.edge.specExecutors.0.specExecInstance.job.launcher.type=LOCAL +flow.edge.specExecutors.0.specExecInstance.job.type=java \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/49974214/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/hdfsConvertToJsonAndEncrypt/flow.conf ---------------------------------------------------------------------- diff --git a/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/hdfsConvertToJsonAndEncrypt/flow.conf b/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/hdfsConvertToJsonAndEncrypt/flow.conf index 0a53e5b..2ae6fb6 100644 --- a/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/hdfsConvertToJsonAndEncrypt/flow.conf +++ b/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/hdfsConvertToJsonAndEncrypt/flow.conf @@ -2,6 +2,7 @@ gobblin.flow.edge.input.dataset.descriptor.0.class=org.apache.gobblin.service.mo gobblin.flow.edge.input.dataset.descriptor.0.platform=hdfs gobblin.flow.edge.input.dataset.descriptor.0.path=/data/out/${team.name}/${dataset.name} gobblin.flow.edge.input.dataset.descriptor.0.format=avro +gobblin.flow.edge.input.dataset.descriptor.0.isRetentionApplied=${flow.applyRetention} ############################################################# # Define input dataset to be uncompressed and unencrypted ############################################################# http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/49974214/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/hdfsSnapshotRetention/flow.conf ---------------------------------------------------------------------- diff --git a/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/hdfsSnapshotRetention/flow.conf b/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/hdfsSnapshotRetention/flow.conf new file mode 100644 index 0000000..8560ebf --- /dev/null +++ b/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/hdfsSnapshotRetention/flow.conf @@ -0,0 +1,35 @@ +gobblin.flow.edge.input.dataset.descriptor.0.class=org.apache.gobblin.service.modules.dataset.FSDatasetDescriptor +gobblin.flow.edge.input.dataset.descriptor.0.platform=hdfs +gobblin.flow.edge.input.dataset.descriptor.0.path=/data/out/${team.name}/${dataset.name} + +gobblin.flow.edge.output.dataset.descriptor.0.class=${gobblin.flow.edge.input.dataset.descriptor.0.class} +gobblin.flow.edge.output.dataset.descriptor.0.platform=${gobblin.flow.edge.input.dataset.descriptor.0.platform} +gobblin.flow.edge.output.dataset.descriptor.0.path=/data/out/${team.name}/${dataset.name} +gobblin.flow.edge.output.dataset.descriptor.0.isRetentionApplied=true + +gobblin.flow.edge.input.dataset.descriptor.1.class=org.apache.gobblin.service.modules.dataset.FSDatasetDescriptor +gobblin.flow.edge.input.dataset.descriptor.1.platform=hdfs +gobblin.flow.edge.input.dataset.descriptor.1.path=/data/encrypted/${team.name}/${dataset.name} + +gobblin.flow.edge.output.dataset.descriptor.1.class=${gobblin.flow.edge.input.dataset.descriptor.1.class} +gobblin.flow.edge.output.dataset.descriptor.1.platform=${gobblin.flow.edge.input.dataset.descriptor.1.platform} +gobblin.flow.edge.output.dataset.descriptor.1.path=/data/encrypted/${team.name}/${dataset.name} +gobblin.flow.edge.output.dataset.descriptor.1.isRetentionApplied=true + +gobblin.flow.edge.input.dataset.descriptor.2.class=org.apache.gobblin.service.modules.dataset.FSDatasetDescriptor +gobblin.flow.edge.input.dataset.descriptor.2.platform=adls +gobblin.flow.edge.input.dataset.descriptor.2.path=/data/out/${team.name}/${dataset.name} + +gobblin.flow.edge.output.dataset.descriptor.2.class=${gobblin.flow.edge.input.dataset.descriptor.2.class} +gobblin.flow.edge.output.dataset.descriptor.2.platform=${gobblin.flow.edge.input.dataset.descriptor.2.platform} +gobblin.flow.edge.output.dataset.descriptor.2.path=/data/out/${team.name}/${dataset.name} +gobblin.flow.edge.output.dataset.descriptor.2.isRetentionApplied=true + +gobblin.flow.edge.input.dataset.descriptor.3.class=org.apache.gobblin.service.modules.dataset.FSDatasetDescriptor +gobblin.flow.edge.input.dataset.descriptor.3.platform=adls +gobblin.flow.edge.input.dataset.descriptor.3.path=/data/encrypted/${team.name}/${dataset.name} + +gobblin.flow.edge.output.dataset.descriptor.3.class=${gobblin.flow.edge.input.dataset.descriptor.3.class} +gobblin.flow.edge.output.dataset.descriptor.3.platform=${gobblin.flow.edge.input.dataset.descriptor.3.platform} +gobblin.flow.edge.output.dataset.descriptor.3.path=/data/encrypted/${team.name}/${dataset.name} +gobblin.flow.edge.output.dataset.descriptor.3.isRetentionApplied=true http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/49974214/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/hdfsSnapshotRetention/jobs/hdfs-snapshot-retention.job ---------------------------------------------------------------------- diff --git a/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/hdfsSnapshotRetention/jobs/hdfs-snapshot-retention.job b/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/hdfsSnapshotRetention/jobs/hdfs-snapshot-retention.job new file mode 100644 index 0000000..46c43c2 --- /dev/null +++ b/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/hdfsSnapshotRetention/jobs/hdfs-snapshot-retention.job @@ -0,0 +1,15 @@ +gobblin.template.uri="resource:///template_catalog/multihop/jobTemplates/hdfs-retention.template" +#Should Retention job run in parallel with the next job? +job.forkOnConcat=true +#Dataset version finder class +gobblin.retention.version.finder.class="org.apache.gobblin.data.management.version.finder.DateTimeDatasetVersionFinder" +#Retention Policy +gobblin.retention.selection.policy.class="org.apache.gobblin.data.management.policy.CombineSelectionPolicy" +gobblin.retention.selection.combine.policy.classes="[org.apache.gobblin.data.management.retention.policy.TimeBasedRetentionPolicy,org.apache.gobblin.data.management.retention.policy.NewestKRetentionPolicy]" +#Num versions to keep +gobblin.retention.selection.newestK.versionsNotSelected=7 +#Maximum lookback time +gobblin.retention.selection.timeBased.lookbackTime=7d +gobblin.retention.selection.combine.operation="INTERSECT" +gobblin.retention.version.globPattern="snapshot/*" +gobblin.retention.version.datetime.pattern="yyyy-MM-dd_HH_mm_ss" http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/49974214/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/hdfsToAdl/flow.conf ---------------------------------------------------------------------- diff --git a/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/hdfsToAdl/flow.conf b/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/hdfsToAdl/flow.conf index 2cbf420..2fc7522 100644 --- a/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/hdfsToAdl/flow.conf +++ b/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/hdfsToAdl/flow.conf @@ -6,6 +6,7 @@ gobblin.flow.edge.input.dataset.descriptor.0.codec=gzip gobblin.flow.edge.input.dataset.descriptor.0.encrypt.algorithm=aes_rotating gobblin.flow.edge.input.dataset.descriptor.0.encrypt.keystore_type=json gobblin.flow.edge.input.dataset.descriptor.0.encrypt.keystore_encoding=base64 +gobblin.flow.edge.input.dataset.descriptor.0.isRetentionApplied=${flow.applyRetention} gobblin.flow.edge.output.dataset.descriptor.0.class=org.apache.gobblin.service.modules.dataset.FSDatasetDescriptor gobblin.flow.edge.output.dataset.descriptor.0.platform=adls http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/49974214/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/hdfsToHdfs/flow.conf ---------------------------------------------------------------------- diff --git a/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/hdfsToHdfs/flow.conf b/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/hdfsToHdfs/flow.conf index abac6b5..4563780 100644 --- a/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/hdfsToHdfs/flow.conf +++ b/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/hdfsToHdfs/flow.conf @@ -1,6 +1,7 @@ gobblin.flow.edge.input.dataset.descriptor.0.class=org.apache.gobblin.service.modules.dataset.FSDatasetDescriptor gobblin.flow.edge.input.dataset.descriptor.0.platform=hdfs gobblin.flow.edge.input.dataset.descriptor.0.path=/data/out/${team.name}/${dataset.name} +gobblin.flow.edge.input.dataset.descriptor.0.isRetentionApplied=${flow.applyRetention} gobblin.flow.edge.output.dataset.descriptor.0.class=${gobblin.flow.edge.input.dataset.descriptor.0.class} gobblin.flow.edge.output.dataset.descriptor.0.platform=${gobblin.flow.edge.input.dataset.descriptor.0.platform} @@ -9,6 +10,7 @@ gobblin.flow.edge.output.dataset.descriptor.0.path=${gobblin.flow.edge.input.dat gobblin.flow.edge.input.dataset.descriptor.1.class=org.apache.gobblin.service.modules.dataset.FSDatasetDescriptor gobblin.flow.edge.input.dataset.descriptor.1.platform=hdfs gobblin.flow.edge.input.dataset.descriptor.1.path=/data/encrypted/${team.name}/${dataset.name} +gobblin.flow.edge.input.dataset.descriptor.1.isRetentionApplied=${flow.applyRetention} gobblin.flow.edge.output.dataset.descriptor.1.class=${gobblin.flow.edge.input.dataset.descriptor.1.class} gobblin.flow.edge.output.dataset.descriptor.1.platform=${gobblin.flow.edge.input.dataset.descriptor.1.platform} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/49974214/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/localToHdfs/flow.conf ---------------------------------------------------------------------- diff --git a/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/localToHdfs/flow.conf b/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/localToHdfs/flow.conf index d0765e1..f4deb46 100644 --- a/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/localToHdfs/flow.conf +++ b/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/localToHdfs/flow.conf @@ -2,6 +2,7 @@ gobblin.flow.edge.input.dataset.descriptor.0.class=org.apache.gobblin.service.mo gobblin.flow.edge.input.dataset.descriptor.0.platform=hdfs gobblin.flow.edge.input.dataset.descriptor.0.path=/data/out/${team.name}/${dataset.name} gobblin.flow.edge.input.dataset.descriptor.0.format=avro +gobblin.flow.edge.input.dataset.descriptor.0.isRetentionApplied=${flow.applyRetention} gobblin.flow.edge.output.dataset.descriptor.0.class=${gobblin.flow.edge.input.dataset.descriptor.0.class} gobblin.flow.edge.output.dataset.descriptor.0.platform=${gobblin.flow.edge.input.dataset.descriptor.0.platform}
