This is an automated email from the ASF dual-hosted git repository.
suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new baf2abe [GOBBLIN-894] Add option to combine datasets into a single flow
baf2abe is described below
commit baf2abe35cedeb7dec01528cbcca659c4fd73fa9
Author: Jack Moseley <[email protected]>
AuthorDate: Thu Oct 10 20:52:37 2019 -0700
[GOBBLIN-894] Add option to combine datasets into a single flow
Closes #2749 from jack-moseley/combine-flows
---
.../gobblin/configuration/ConfigurationKeys.java | 1 +
.../modules/dataset/BaseDatasetDescriptor.java | 6 ++--
.../modules/dataset/FSDatasetDescriptor.java | 33 ++++++++++++++++-
.../modules/dataset/SqlDatasetDescriptor.java | 7 ++--
.../service/modules/flow/MultiHopFlowCompiler.java | 42 +++++++++++++++++-----
.../flowgraph/DatasetDescriptorConfigKeys.java | 1 +
.../template_catalog/FSFlowTemplateCatalog.java | 10 +++---
.../modules/dataset/FSDatasetDescriptorTest.java | 8 +++++
.../modules/flow/MultiHopFlowCompilerTest.java | 31 ++++++++++++++--
gobblin-service/src/test/resources/flow/flow4.conf | 14 ++++++++
.../hdfsSnapshotRetention/flow.conf | 11 +++++-
.../flowEdgeTemplates/hdfsToHdfs/flow.conf | 11 +++++-
.../multihop/jobTemplates/distcp.template | 2 +-
.../multihop/jobTemplates/hdfs-retention.template | 2 +-
14 files changed, 151 insertions(+), 28 deletions(-)
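Before the patch body, a quick sketch of the knob this commit adds: when gobblin.flow.dataset.combine is set to true alongside gobblin.flow.dataset.subPaths, the compiler keeps all subPaths in one flow instead of splitting them into one flow per subPath. The keys are the ones introduced below; the wrapper class and inline config are illustrative only.

import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;

public class CombineFlowSketch {
  public static void main(String[] args) {
    // Inline config mirroring the flow4.conf test resource added by this commit.
    Config flowConfig = ConfigFactory.parseString(
        "gobblin.flow.dataset.subPaths = \"dataset0,dataset1,dataset2\"\n"
            + "gobblin.flow.dataset.baseInputPath = /data/input\n"
            + "gobblin.flow.dataset.baseOutputPath = /data/output\n"
            + "gobblin.flow.dataset.combine = true");
    // combine=true  -> one flow whose descriptors carry subPaths {dataset0,dataset1,dataset2}
    // combine=false -> three flows, one per subPath (the previous behavior)
    System.out.println(flowConfig.getBoolean("gobblin.flow.dataset.combine"));
  }
}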
diff --git a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
index dc8b8cb..c6572a9 100644
--- a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
+++ b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
@@ -898,6 +898,7 @@ public class ConfigurationKeys {
public static final String DATASET_SUBPATHS_KEY = "gobblin.flow.dataset.subPaths";
public static final String DATASET_BASE_INPUT_PATH_KEY = "gobblin.flow.dataset.baseInputPath";
public static final String DATASET_BASE_OUTPUT_PATH_KEY = "gobblin.flow.dataset.baseOutputPath";
+ public static final String DATASET_COMBINE_KEY = "gobblin.flow.dataset.combine";
/***
* Configuration properties related to TopologySpec Store
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/BaseDatasetDescriptor.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/BaseDatasetDescriptor.java
index c9ad8b5..cd98c8b 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/BaseDatasetDescriptor.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/BaseDatasetDescriptor.java
@@ -62,7 +62,7 @@ public abstract class BaseDatasetDescriptor implements DatasetDescriptor {
/**
* {@inheritDoc}
*/
- protected abstract boolean isPathContaining(String otherPath);
+ protected abstract boolean isPathContaining(DatasetDescriptor other);
/**
* @return true if this {@link DatasetDescriptor} contains the other {@link DatasetDescriptor} i.e. the
@@ -75,7 +75,7 @@ public abstract class BaseDatasetDescriptor implements DatasetDescriptor {
if (this == other) {
return true;
}
-
+
if (other == null || !getClass().equals(other.getClass())) {
return false;
}
@@ -88,6 +88,6 @@ public abstract class BaseDatasetDescriptor implements DatasetDescriptor {
return false;
}
- return isPathContaining(other.getPath()) && getFormatConfig().contains(other.getFormatConfig());
+ return isPathContaining(other) && getFormatConfig().contains(other.getFormatConfig());
}
}
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/FSDatasetDescriptor.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/FSDatasetDescriptor.java
index 3b2611e..57f23b1 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/FSDatasetDescriptor.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/FSDatasetDescriptor.java
@@ -18,10 +18,13 @@
package org.apache.gobblin.service.modules.dataset;
import java.io.IOException;
+import java.util.List;
+import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.fs.GlobPattern;
import org.apache.hadoop.fs.Path;
+import com.google.common.base.Splitter;
import com.google.common.collect.ImmutableMap;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
@@ -46,6 +49,8 @@ public class FSDatasetDescriptor extends BaseDatasetDescriptor implements Datase
@Getter
private final String path;
@Getter
+ private final String subPaths;
+ @Getter
private final boolean isCompacted;
@Getter
private final boolean isCompactedAndDeduped;
@@ -65,6 +70,7 @@ public class FSDatasetDescriptor extends BaseDatasetDescriptor implements Datase
this.path = PathUtils
.getPathWithoutSchemeAndAuthority(new Path(ConfigUtils.getString(config, DatasetDescriptorConfigKeys.PATH_KEY, DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY))).toString();
+ this.subPaths = ConfigUtils.getString(config, DatasetDescriptorConfigKeys.SUBPATHS_KEY, null);
this.isCompacted = ConfigUtils.getBoolean(config, DatasetDescriptorConfigKeys.IS_COMPACTED_KEY, false);
this.isCompactedAndDeduped = ConfigUtils.getBoolean(config, DatasetDescriptorConfigKeys.IS_COMPACTED_AND_DEDUPED_KEY, false);
this.partitionConfig = new FSDatasetPartitionConfig(ConfigUtils.getConfigOrEmpty(config, DatasetDescriptorConfigKeys.PARTITION_PREFIX));
@@ -72,6 +78,31 @@ public class FSDatasetDescriptor extends BaseDatasetDescriptor implements Datase
}
/**
+ * If the other descriptor has subPaths, this method checks that each concatenation of path + subPath is matched by this
+ * path. Otherwise, it just checks the path.
+ *
+ * @param other descriptor whose path/subPaths to check
+ * @return true if all subPaths are matched by this {@link DatasetDescriptor}'s path, or if subPaths is null and
+ * the other's path matches this path.
+ */
+ @Override
+ protected boolean isPathContaining(DatasetDescriptor other) {
+ String otherPath = other.getPath();
+ String otherSubPaths = ((FSDatasetDescriptor) other).getSubPaths();
+ if (otherSubPaths != null) {
+ List<String> subPaths = Splitter.on(",").splitToList(StringUtils.stripEnd(StringUtils.stripStart(otherSubPaths, "{"), "}"));
+ for (String subPath : subPaths) {
+ if (!isPathContaining(new Path(otherPath, subPath).toString())) {
+ return false;
+ }
+ }
+ return true;
+ } else {
+ return isPathContaining(otherPath);
+ }
+ }
+
+ /**
* A helper to determine if the path description of this {@link DatasetDescriptor} is a superset of paths
* accepted by the other {@link DatasetDescriptor}. If the path description of the other {@link DatasetDescriptor}
* is a glob pattern, we return false.
@@ -79,7 +110,7 @@ public class FSDatasetDescriptor extends BaseDatasetDescriptor implements Datase
* @param otherPath a glob pattern that describes a set of paths.
* @return true if the glob pattern described by the otherPath matches the path in this {@link DatasetDescriptor}.
*/
- protected boolean isPathContaining(String otherPath) {
+ private boolean isPathContaining(String otherPath) {
if (otherPath == null) {
return false;
}
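The containment check above reduces to glob matching on each path + subPath combination. A self-contained sketch of that idea using Hadoop's GlobPattern; this is an illustration under simplified assumptions (class name and paths are hypothetical), not the class's full logic, which also compares format and partition config:

import org.apache.hadoop.fs.GlobPattern;
import org.apache.hadoop.fs.Path;

public class SubPathGlobSketch {
  public static void main(String[] args) {
    // A descriptor whose path is the glob /a/b/c/* ...
    GlobPattern descriptorPath = new GlobPattern("/a/b/c/*");
    // ... contains a descriptor with path /a/b/c and subPaths {e,f,g},
    // because every path + subPath combination matches the glob.
    for (String subPath : new String[] {"e", "f", "g"}) {
      String candidate = new Path("/a/b/c", subPath).toString();
      System.out.println(candidate + " matched: " + descriptorPath.matches(candidate));
    }
  }
}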
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/SqlDatasetDescriptor.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/SqlDatasetDescriptor.java
index 2811992..a334ab6 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/SqlDatasetDescriptor.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/SqlDatasetDescriptor.java
@@ -89,7 +89,7 @@ public class SqlDatasetDescriptor extends BaseDatasetDescriptor implements Datas
return Enums.getIfPresent(Platform.class, getPlatform().toUpperCase()).isPresent();
}
/**
- * Check if the dbName and tableName specified in {@param otherPath} are accepted by the set of dbName.tableName
+ * Check if the dbName and tableName specified in {@param other}'s path are accepted by the set of dbName.tableName
* combinations defined by the current {@link SqlDatasetDescriptor}. For example, let:
* this.path = "test_.*;test_table_.*". Then:
* isPathContaining("test_db1;test_table_1") = true
@@ -98,10 +98,11 @@ public class SqlDatasetDescriptor extends BaseDatasetDescriptor implements Datas
* NOTE: otherPath cannot be a globPattern. So:
* isPathContaining("test_db.*;test_table_*") = false
*
- * @param otherPath which should be in the format of dbName.tableName
+ * @param other whose path should be in the format of dbName.tableName
*/
@Override
- protected boolean isPathContaining(String otherPath) {
+ protected boolean isPathContaining(DatasetDescriptor other) {
+ String otherPath = other.getPath();
if (otherPath == null) {
return false;
}
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java
index 67d1f19..dbc01ba 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Joiner;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
@@ -212,20 +213,34 @@ public class MultiHopFlowCompiler extends BaseFlowToJobSpecCompiler {
private static List<FlowSpec> splitFlowSpec(FlowSpec flowSpec) {
long flowExecutionId = FlowUtils.getOrCreateFlowExecutionId(flowSpec);
List<FlowSpec> flowSpecs = new ArrayList<>();
+ Config flowConfig = flowSpec.getConfig();
- if (flowSpec.getConfig().hasPath(ConfigurationKeys.DATASET_SUBPATHS_KEY)) {
- List<String> datasetSubpaths = ConfigUtils.getStringList(flowSpec.getConfig(), ConfigurationKeys.DATASET_SUBPATHS_KEY);
- String baseInputPath = ConfigUtils.getString(flowSpec.getConfig(), ConfigurationKeys.DATASET_BASE_INPUT_PATH_KEY, "/");
- String baseOutputPath = ConfigUtils.getString(flowSpec.getConfig(), ConfigurationKeys.DATASET_BASE_OUTPUT_PATH_KEY, "/");
+ if (flowConfig.hasPath(ConfigurationKeys.DATASET_SUBPATHS_KEY)) {
+ List<String> datasetSubpaths = ConfigUtils.getStringList(flowConfig, ConfigurationKeys.DATASET_SUBPATHS_KEY);
+ String baseInputPath = ConfigUtils.getString(flowConfig, ConfigurationKeys.DATASET_BASE_INPUT_PATH_KEY, "/");
+ String baseOutputPath = ConfigUtils.getString(flowConfig, ConfigurationKeys.DATASET_BASE_OUTPUT_PATH_KEY, "/");
- for (String subPath : datasetSubpaths) {
- Config newConfig = flowSpec.getConfig().withoutPath(ConfigurationKeys.DATASET_SUBPATHS_KEY)
- .withValue(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, ConfigValueFactory.fromAnyRef(flowExecutionId))
+ if (ConfigUtils.getBoolean(flowConfig, ConfigurationKeys.DATASET_COMBINE_KEY, false)) {
+ Config newConfig = flowConfig.withoutPath(ConfigurationKeys.DATASET_SUBPATHS_KEY)
.withValue(DatasetDescriptorConfigKeys.FLOW_INPUT_DATASET_DESCRIPTOR_PREFIX + "." + DatasetDescriptorConfigKeys.PATH_KEY,
- ConfigValueFactory.fromAnyRef(new Path(baseInputPath, subPath).toString()))
+ ConfigValueFactory.fromAnyRef(baseInputPath))
.withValue(DatasetDescriptorConfigKeys.FLOW_OUTPUT_DATASET_DESCRIPTOR_PREFIX + "." + DatasetDescriptorConfigKeys.PATH_KEY,
- ConfigValueFactory.fromAnyRef(new Path(baseOutputPath, subPath).toString()));
+ ConfigValueFactory.fromAnyRef(baseOutputPath))
+ .withValue(DatasetDescriptorConfigKeys.FLOW_INPUT_DATASET_DESCRIPTOR_PREFIX + ".subPaths",
+ ConfigValueFactory.fromAnyRef(convertStringListToGlobPattern(datasetSubpaths)))
+ .withValue(DatasetDescriptorConfigKeys.FLOW_OUTPUT_DATASET_DESCRIPTOR_PREFIX + ".subPaths",
+ ConfigValueFactory.fromAnyRef(convertStringListToGlobPattern(datasetSubpaths)));
flowSpecs.add(copyFlowSpecWithNewConfig(flowSpec, newConfig));
+ } else {
+ for (String subPath : datasetSubpaths) {
+ Config newConfig = flowConfig.withoutPath(ConfigurationKeys.DATASET_SUBPATHS_KEY)
+ .withValue(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, ConfigValueFactory.fromAnyRef(flowExecutionId))
+ .withValue(DatasetDescriptorConfigKeys.FLOW_INPUT_DATASET_DESCRIPTOR_PREFIX + "." + DatasetDescriptorConfigKeys.PATH_KEY,
+ ConfigValueFactory.fromAnyRef(new Path(baseInputPath, subPath).toString()))
+ .withValue(DatasetDescriptorConfigKeys.FLOW_OUTPUT_DATASET_DESCRIPTOR_PREFIX + "." + DatasetDescriptorConfigKeys.PATH_KEY,
+ ConfigValueFactory.fromAnyRef(new Path(baseOutputPath, subPath).toString()));
+ flowSpecs.add(copyFlowSpecWithNewConfig(flowSpec, newConfig));
+ }
}
} else {
flowSpecs.add(flowSpec);
@@ -234,6 +249,15 @@ public class MultiHopFlowCompiler extends BaseFlowToJobSpecCompiler {
return flowSpecs;
}
+ /**
+ * Convert a string list to a string pattern that will work for globs.
+ *
+ * e.g. ["test1", "test2", "test3"] -> "{test1,test2,test3}"
+ */
+ private static String convertStringListToGlobPattern(List<String> stringList) {
+ return "{" + Joiner.on(",").join(stringList) + "}";
+ }
+
private static FlowSpec copyFlowSpecWithNewConfig(FlowSpec flowSpec, Config newConfig) {
FlowSpec.Builder builder = FlowSpec.builder(flowSpec.getUri()).withVersion(flowSpec.getVersion())
.withDescription(flowSpec.getDescription()).withConfig(newConfig);
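The combined branch above leans on convertStringListToGlobPattern: the subPaths list is collapsed into a single brace glob that one dataset descriptor can carry. A minimal standalone sketch of the same Joiner idiom (the wrapper class is illustrative):

import java.util.Arrays;
import java.util.List;

import com.google.common.base.Joiner;

public class GlobPatternJoinSketch {
  // Same shape as the helper added above: join on commas, wrap in braces.
  static String convertStringListToGlobPattern(List<String> stringList) {
    return "{" + Joiner.on(",").join(stringList) + "}";
  }

  public static void main(String[] args) {
    // Prints {dataset0,dataset1,dataset2}
    System.out.println(convertStringListToGlobPattern(Arrays.asList("dataset0", "dataset1", "dataset2")));
  }
}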
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/DatasetDescriptorConfigKeys.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/DatasetDescriptorConfigKeys.java
index 905ef81..6de577a 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/DatasetDescriptorConfigKeys.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/DatasetDescriptorConfigKeys.java
@@ -31,6 +31,7 @@ public class DatasetDescriptorConfigKeys {
public static final String CLASS_KEY = "class";
public static final String PLATFORM_KEY = "platform";
public static final String PATH_KEY = "path";
+ public static final String SUBPATHS_KEY = "subPaths";
public static final String DATABASE_KEY = "databaseName";
public static final String TABLE_KEY = "tableName";
public static final String FORMAT_KEY = "format";
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template_catalog/FSFlowTemplateCatalog.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template_catalog/FSFlowTemplateCatalog.java
index 0dafc2d..e01f90a 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template_catalog/FSFlowTemplateCatalog.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template_catalog/FSFlowTemplateCatalog.java
@@ -35,7 +35,6 @@ import org.apache.hadoop.fs.PathFilter;
import com.google.common.base.Charsets;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
-import com.typesafe.config.ConfigResolveOptions;
import org.apache.gobblin.annotation.Alpha;
import org.apache.gobblin.configuration.ConfigurationKeys;
@@ -144,7 +143,7 @@ public class FSFlowTemplateCatalog extends FSJobCatalog implements FlowCatalogWi
FileSystem fs = FileSystem.get(jobFilePath.toUri(), new Configuration());
for (FileStatus fileStatus : fs.listStatus(jobFilePath, extensionFilter)) {
- Config jobConfig = loadHoconFileAtPath(fileStatus.getPath(), true);
+ Config jobConfig = loadHoconFileAtPath(fileStatus.getPath());
//Check if the .job file has an underlying job template
if (jobConfig.hasPath(JOB_TEMPLATE_KEY)) {
URI jobTemplateRelativeUri = new URI(jobConfig.getString(JOB_TEMPLATE_KEY));
@@ -153,18 +152,17 @@ public class FSFlowTemplateCatalog extends FSJobCatalog implements FlowCatalogWi
"Expected scheme " + FS_SCHEME + " got unsupported scheme " +
flowTemplateDirURI.getScheme());
}
Path fullJobTemplatePath = PathUtils.mergePaths(new Path(templateCatalogDir), new Path(jobTemplateRelativeUri));
- jobConfig = jobConfig.withFallback(loadHoconFileAtPath(fullJobTemplatePath, true));
+ jobConfig = jobConfig.withFallback(loadHoconFileAtPath(fullJobTemplatePath));
}
jobTemplates.add(new HOCONInputStreamJobTemplate(jobConfig, fileStatus.getPath().toUri(), this));
}
return jobTemplates;
}
- private Config loadHoconFileAtPath(Path filePath, boolean allowUnresolved)
+ private Config loadHoconFileAtPath(Path filePath)
throws IOException {
- ConfigResolveOptions options = ConfigResolveOptions.defaults().setAllowUnresolved(allowUnresolved);
try (InputStream is = fs.open(filePath)) {
- return ConfigFactory.parseReader(new InputStreamReader(is, Charsets.UTF_8)).resolve(options);
+ return ConfigFactory.parseReader(new InputStreamReader(is, Charsets.UTF_8));
}
}
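Dropping resolve() here means templates are parsed with substitutions left intact, so a template may reference keys (such as ${from}) that only appear once the .job config is layered on via withFallback and resolved later. A minimal sketch of that deferred-resolution pattern with typesafe-config; the keys and class name are illustrative:

import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;

public class DeferredResolutionSketch {
  public static void main(String[] args) {
    // Parsing leaves ${from} unresolved; resolving the template alone would
    // throw ConfigException.UnresolvedSubstitution.
    Config template = ConfigFactory.parseString("gobblin.dataset.pattern = ${from}");
    Config job = ConfigFactory.parseString("from = /data/input");
    // Resolving after the merge succeeds: prints /data/input
    Config merged = job.withFallback(template).resolve();
    System.out.println(merged.getString("gobblin.dataset.pattern"));
  }
}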
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/dataset/FSDatasetDescriptorTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/dataset/FSDatasetDescriptorTest.java
index d065cba..88e3759 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/dataset/FSDatasetDescriptorTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/dataset/FSDatasetDescriptorTest.java
@@ -72,6 +72,14 @@ public class FSDatasetDescriptorTest {
Assert.assertFalse(descriptor3.contains(descriptor5));
Assert.assertFalse(descriptor2.contains(descriptor5));
Assert.assertFalse(descriptor1.contains(descriptor5));
+
+ // Test subpaths
+ Config subPathConfig = ConfigFactory.empty().withValue(DatasetDescriptorConfigKeys.PATH_KEY, ConfigValueFactory.fromAnyRef("/a/b/c"))
+ .withValue(DatasetDescriptorConfigKeys.SUBPATHS_KEY, ConfigValueFactory.fromAnyRef("{e,f,g}"))
+ .withValue(DatasetDescriptorConfigKeys.PLATFORM_KEY, ConfigValueFactory.fromAnyRef("hdfs"));
+ FSDatasetDescriptor descriptor6 = new FSDatasetDescriptor(subPathConfig);
+ Assert.assertTrue(descriptor1.contains(descriptor6));
+ Assert.assertFalse(descriptor2.contains(descriptor6));
}
@Test
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompilerTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompilerTest.java
index b0faa6d..176c2d6 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompilerTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompilerTest.java
@@ -66,6 +66,7 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.config.ConfigBuilder;
import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.data.management.retention.profile.ConfigurableGlobDatasetFinder;
import org.apache.gobblin.runtime.api.FlowSpec;
import org.apache.gobblin.runtime.api.JobSpec;
import org.apache.gobblin.runtime.api.Spec;
@@ -258,7 +259,7 @@ public class MultiHopFlowCompilerTest {
String targetFsUri = jobConfig.getString("target.filebased.fs.uri");
Assert.assertEquals(targetFsUri, "hdfs://hadoopnn01.grid.linkedin.com:8888/");
Assert.assertEquals(jobConfig.getString("writer.fs.uri"), targetFsUri);
- Assert.assertEquals(jobConfig.getString("gobblin.dataset.pattern"), from);
+ Assert.assertEquals(new Path(jobConfig.getString("gobblin.dataset.pattern")), new Path(from));
Assert.assertEquals(jobConfig.getString("data.publisher.final.dir"), to);
Assert.assertEquals(jobConfig.getString("type"), "java");
Assert.assertEquals(jobConfig.getString("job.class"), "org.apache.gobblin.runtime.local.LocalJobLauncher");
@@ -431,7 +432,7 @@ public class MultiHopFlowCompilerTest {
String targetFsUri = jobConfig.getString("target.filebased.fs.uri");
Assert.assertEquals(targetFsUri, "hdfs://hadoopnn02.grid.linkedin.com:8888/");
Assert.assertEquals(jobConfig.getString("writer.fs.uri"), targetFsUri);
- Assert.assertEquals(jobConfig.getString("gobblin.dataset.pattern"), from);
+ Assert.assertEquals(new Path(jobConfig.getString("gobblin.dataset.pattern")), new Path(from));
Assert.assertEquals(jobConfig.getString("data.publisher.final.dir"), to);
Assert.assertEquals(jobConfig.getString("type"), "java");
Assert.assertEquals(jobConfig.getString("job.class"), "org.apache.gobblin.runtime.local.LocalJobLauncher");
@@ -614,6 +615,32 @@ public class MultiHopFlowCompilerTest {
}
@Test (dependsOnMethods = "testCompileMultiDatasetFlow")
+ public void testCompileCombinedDatasetFlow() throws Exception {
+ FlowSpec spec = createFlowSpec("flow/flow4.conf", "HDFS-1", "HDFS-3", true, false);
+
+ Dag<JobExecutionPlan> dag = specCompiler.compileFlow(spec);
+
+ // Should be 2 jobs, each containing 3 datasets
+ Assert.assertEquals(dag.getNodes().size(), 2);
+ Assert.assertEquals(dag.getEndNodes().size(), 1);
+ Assert.assertEquals(dag.getStartNodes().size(), 1);
+
+ String copyJobName = Joiner.on(JobExecutionPlan.Factory.JOB_NAME_COMPONENT_SEPARATION_CHAR).
+ join("testFlowGroup", "testFlowName", "Distcp", "HDFS-1", "HDFS-3", "hdfsToHdfs");
+ Config jobConfig = dag.getStartNodes().get(0).getValue().getJobSpec().getConfig();
+ String jobName = jobConfig.getString(ConfigurationKeys.JOB_NAME_KEY);
+ Assert.assertTrue(jobName.startsWith(copyJobName));
+ Assert.assertTrue(jobConfig.getString(ConfigurableGlobDatasetFinder.DATASET_FINDER_PATTERN_KEY).endsWith("{dataset0,dataset1,dataset2}"));
+
+ String retentionJobName = Joiner.on(JobExecutionPlan.Factory.JOB_NAME_COMPONENT_SEPARATION_CHAR).
+ join("testFlowGroup", "testFlowName", "SnapshotRetention", "HDFS-3", "HDFS-3", "hdfsRetention");
+ Config jobConfig2 = dag.getEndNodes().get(0).getValue().getJobSpec().getConfig();
+ String jobName2 = jobConfig2.getString(ConfigurationKeys.JOB_NAME_KEY);
+ Assert.assertTrue(jobName2.startsWith(retentionJobName));
+ Assert.assertTrue(jobConfig2.getString(ConfigurableGlobDatasetFinder.DATASET_FINDER_PATTERN_KEY).endsWith("{dataset0,dataset1,dataset2}"));
+ }
+
+ @Test (dependsOnMethods = "testCompileMultiDatasetFlow")
public void testGitFlowGraphMonitorService()
throws IOException, GitAPIException, URISyntaxException, InterruptedException {
File remoteDir = new File(TESTDIR + "/remote");
diff --git a/gobblin-service/src/test/resources/flow/flow4.conf b/gobblin-service/src/test/resources/flow/flow4.conf
new file mode 100644
index 0000000..5ec8e69
--- /dev/null
+++ b/gobblin-service/src/test/resources/flow/flow4.conf
@@ -0,0 +1,14 @@
+user.to.proxy=testUser
+
+#Input dataset - uncompressed and unencrypted
+gobblin.flow.input.dataset.descriptor.class=org.apache.gobblin.service.modules.dataset.FSDatasetDescriptor
+gobblin.flow.input.dataset.descriptor.platform=hdfs
+gobblin.flow.dataset.baseInputPath=/data/input
+
+#Output dataset - same as input dataset
+gobblin.flow.output.dataset.descriptor.class=${gobblin.flow.input.dataset.descriptor.class}
+gobblin.flow.output.dataset.descriptor.platform=${gobblin.flow.input.dataset.descriptor.platform}
+gobblin.flow.dataset.baseOutputPath=/data/output
+
+gobblin.flow.dataset.subPaths="dataset0,dataset1,dataset2"
+gobblin.flow.dataset.combine=true
\ No newline at end of file
diff --git a/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/hdfsSnapshotRetention/flow.conf b/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/hdfsSnapshotRetention/flow.conf
index 09a16f0..633fab2 100644
--- a/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/hdfsSnapshotRetention/flow.conf
+++ b/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/hdfsSnapshotRetention/flow.conf
@@ -50,4 +50,13 @@
gobblin.flow.edge.input.dataset.descriptor.5.path=${gobblin.flow.output.dataset.
gobblin.flow.edge.output.dataset.descriptor.5.class=${gobblin.flow.edge.input.dataset.descriptor.5.class}
gobblin.flow.edge.output.dataset.descriptor.5.platform=${gobblin.flow.edge.input.dataset.descriptor.5.platform}
gobblin.flow.edge.output.dataset.descriptor.5.path=${gobblin.flow.output.dataset.descriptor.path}
-gobblin.flow.edge.output.dataset.descriptor.5.isRetentionApplied=true
\ No newline at end of file
+gobblin.flow.edge.output.dataset.descriptor.5.isRetentionApplied=true
+
+gobblin.flow.edge.input.dataset.descriptor.6.class=org.apache.gobblin.service.modules.dataset.FSDatasetDescriptor
+gobblin.flow.edge.input.dataset.descriptor.6.platform=hdfs
+gobblin.flow.edge.input.dataset.descriptor.6.path="/data/output/*"
+
+gobblin.flow.edge.output.dataset.descriptor.6.class=${gobblin.flow.edge.input.dataset.descriptor.6.class}
+gobblin.flow.edge.output.dataset.descriptor.6.platform=${gobblin.flow.edge.input.dataset.descriptor.6.platform}
+gobblin.flow.edge.output.dataset.descriptor.6.path=${gobblin.flow.output.dataset.descriptor.path}
+gobblin.flow.edge.output.dataset.descriptor.6.isRetentionApplied=true
\ No newline at end of file
diff --git a/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/hdfsToHdfs/flow.conf b/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/hdfsToHdfs/flow.conf
index d029a31..a532839 100644
--- a/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/hdfsToHdfs/flow.conf
+++ b/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/hdfsToHdfs/flow.conf
@@ -23,4 +23,13 @@ gobblin.flow.edge.input.dataset.descriptor.2.isRetentionApplied=${flow.applyRete
gobblin.flow.edge.output.dataset.descriptor.2.class=${gobblin.flow.edge.input.dataset.descriptor.2.class}
gobblin.flow.edge.output.dataset.descriptor.2.platform=${gobblin.flow.edge.input.dataset.descriptor.2.platform}
-gobblin.flow.edge.output.dataset.descriptor.2.path=${gobblin.flow.output.dataset.descriptor.path}
\ No newline at end of file
+gobblin.flow.edge.output.dataset.descriptor.2.path=${gobblin.flow.output.dataset.descriptor.path}
+
+gobblin.flow.edge.input.dataset.descriptor.3.class=org.apache.gobblin.service.modules.dataset.FSDatasetDescriptor
+gobblin.flow.edge.input.dataset.descriptor.3.platform=hdfs
+gobblin.flow.edge.input.dataset.descriptor.3.path="/data/input/*"
+gobblin.flow.edge.input.dataset.descriptor.3.isRetentionApplied=${flow.applyRetention}
+
+gobblin.flow.edge.output.dataset.descriptor.3.class=${gobblin.flow.edge.input.dataset.descriptor.3.class}
+gobblin.flow.edge.output.dataset.descriptor.3.platform=${gobblin.flow.edge.input.dataset.descriptor.3.platform}
+gobblin.flow.edge.output.dataset.descriptor.3.path=${gobblin.flow.output.dataset.descriptor.path}
\ No newline at end of file
diff --git a/gobblin-service/src/test/resources/template_catalog/multihop/jobTemplates/distcp.template b/gobblin-service/src/test/resources/template_catalog/multihop/jobTemplates/distcp.template
index 63a4f89..df47020 100644
--- a/gobblin-service/src/test/resources/template_catalog/multihop/jobTemplates/distcp.template
+++ b/gobblin-service/src/test/resources/template_catalog/multihop/jobTemplates/distcp.template
@@ -36,7 +36,7 @@ gobblin.dataset.profile.class="org.apache.gobblin.data.management.copy.CopyableG
# target location for copy
data.publisher.final.dir=${to}
-gobblin.dataset.pattern=${from}
+gobblin.dataset.pattern=${from}/${?gobblin.flow.edge.input.dataset.descriptor.subPaths}
data.publisher.type="org.apache.gobblin.data.management.copy.publisher.CopyDataPublisher"
source.class="org.apache.gobblin.data.management.copy.CopySource"
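The ${?...} above is HOCON's optional substitution: if the subPaths key is absent, the reference simply disappears from the string concatenation instead of failing resolution, so the template still works for non-combined flows. A small sketch of both cases (a shortened subPaths key stands in for the full descriptor key); the same idiom appears in the hdfs-retention.template change below:

import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;

public class OptionalSubstitutionSketch {
  public static void main(String[] args) {
    // subPaths set: prints /data/input/{dataset0,dataset1}
    Config withSubPaths = ConfigFactory.parseString(
        "from = /data/input\n"
            + "subPaths = \"{dataset0,dataset1}\"\n"
            + "pattern = ${from}/${?subPaths}").resolve();
    System.out.println(withSubPaths.getString("pattern"));

    // subPaths unset: ${?subPaths} becomes empty, prints /data/input/
    Config withoutSubPaths = ConfigFactory.parseString(
        "from = /data/input\n"
            + "pattern = ${from}/${?subPaths}").resolve();
    System.out.println(withoutSubPaths.getString("pattern"));
  }
}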
diff --git a/gobblin-service/src/test/resources/template_catalog/multihop/jobTemplates/hdfs-retention.template b/gobblin-service/src/test/resources/template_catalog/multihop/jobTemplates/hdfs-retention.template
index 23c8b36..1a9465d 100644
--- a/gobblin-service/src/test/resources/template_catalog/multihop/jobTemplates/hdfs-retention.template
+++ b/gobblin-service/src/test/resources/template_catalog/multihop/jobTemplates/hdfs-retention.template
@@ -4,7 +4,7 @@
job.name=SnapshotRetention
job.class=gobblin.data.management.retention.DatasetCleanerJob
#Dataset pattern
-gobblin.dataset.pattern=${gobblin.flow.edge.input.dataset.descriptor.path}
+gobblin.dataset.pattern=${gobblin.flow.edge.input.dataset.descriptor.path}/${?gobblin.flow.edge.input.dataset.descriptor.subPaths}
#Dataset finder class
gobblin.dataset.profile.class="org.apache.gobblin.data.management.retention.profile.SnapshotDatasetProfile"
# Dataset version finder