This is an automated email from the ASF dual-hosted git repository.

suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new baf2abe  [GOBBLIN-894] Add option to combine datasets into a single flow
baf2abe is described below

commit baf2abe35cedeb7dec01528cbcca659c4fd73fa9
Author: Jack Moseley <[email protected]>
AuthorDate: Thu Oct 10 20:52:37 2019 -0700

    [GOBBLIN-894] Add option to combine datasets into a single flow
    
    Closes #2749 from jack-moseley/combine-flows
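    
    For illustration, a minimal flow config sketch that exercises the new option
    (key names mirror the test resource flow/flow4.conf added below; the paths are
    hypothetical):
    
        gobblin.flow.dataset.baseInputPath=/data/input
        gobblin.flow.dataset.baseOutputPath=/data/output
        gobblin.flow.dataset.subPaths="dataset0,dataset1,dataset2"
        gobblin.flow.dataset.combine=true
    
    With combine=true the compiler keeps all subPaths in a single flow instead of
    splitting them into one flow per subPath.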
---
 .../gobblin/configuration/ConfigurationKeys.java   |  1 +
 .../modules/dataset/BaseDatasetDescriptor.java     |  6 ++--
 .../modules/dataset/FSDatasetDescriptor.java       | 33 ++++++++++++++++-
 .../modules/dataset/SqlDatasetDescriptor.java      |  7 ++--
 .../service/modules/flow/MultiHopFlowCompiler.java | 42 +++++++++++++++++-----
 .../flowgraph/DatasetDescriptorConfigKeys.java     |  1 +
 .../template_catalog/FSFlowTemplateCatalog.java    | 10 +++---
 .../modules/dataset/FSDatasetDescriptorTest.java   |  8 +++++
 .../modules/flow/MultiHopFlowCompilerTest.java     | 31 ++++++++++++++--
 gobblin-service/src/test/resources/flow/flow4.conf | 14 ++++++++
 .../hdfsSnapshotRetention/flow.conf                | 11 +++++-
 .../flowEdgeTemplates/hdfsToHdfs/flow.conf         | 11 +++++-
 .../multihop/jobTemplates/distcp.template          |  2 +-
 .../multihop/jobTemplates/hdfs-retention.template  |  2 +-
 14 files changed, 151 insertions(+), 28 deletions(-)
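
With gobblin.flow.dataset.combine set, splitFlowSpec() now emits a single FlowSpec
whose dataset descriptors carry the base paths plus a subPaths glob, rather than one
FlowSpec per subPath. For the flow4.conf values in this patch, the resulting descriptor
config is roughly the following sketch (descriptor key prefix assumed from the
gobblin.flow.*.dataset.descriptor keys used elsewhere in the patch):

    gobblin.flow.input.dataset.descriptor.path=/data/input
    gobblin.flow.input.dataset.descriptor.subPaths={dataset0,dataset1,dataset2}
    gobblin.flow.output.dataset.descriptor.path=/data/output
    gobblin.flow.output.dataset.descriptor.subPaths={dataset0,dataset1,dataset2}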

diff --git a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
index dc8b8cb..c6572a9 100644
--- a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
+++ b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
@@ -898,6 +898,7 @@ public class ConfigurationKeys {
   public static final String DATASET_SUBPATHS_KEY = "gobblin.flow.dataset.subPaths";
   public static final String DATASET_BASE_INPUT_PATH_KEY = "gobblin.flow.dataset.baseInputPath";
   public static final String DATASET_BASE_OUTPUT_PATH_KEY = "gobblin.flow.dataset.baseOutputPath";
+  public static final String DATASET_COMBINE_KEY = "gobblin.flow.dataset.combine";
 
   /***
    * Configuration properties related to TopologySpec Store
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/BaseDatasetDescriptor.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/BaseDatasetDescriptor.java
index c9ad8b5..cd98c8b 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/BaseDatasetDescriptor.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/BaseDatasetDescriptor.java
@@ -62,7 +62,7 @@ public abstract class BaseDatasetDescriptor implements DatasetDescriptor {
   /**
    * {@inheritDoc}
    */
-  protected abstract boolean isPathContaining(String otherPath);
+  protected abstract boolean isPathContaining(DatasetDescriptor other);
 
   /**
    * @return true if this {@link DatasetDescriptor} contains the other {@link DatasetDescriptor} i.e. the
@@ -75,7 +75,7 @@ public abstract class BaseDatasetDescriptor implements DatasetDescriptor {
     if (this == other) {
       return true;
     }
-    
+
     if (other == null || !getClass().equals(other.getClass())) {
       return false;
     }
@@ -88,6 +88,6 @@ public abstract class BaseDatasetDescriptor implements DatasetDescriptor {
       return false;
     }
 
-    return isPathContaining(other.getPath()) && getFormatConfig().contains(other.getFormatConfig());
+    return isPathContaining(other) && getFormatConfig().contains(other.getFormatConfig());
   }
 }
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/FSDatasetDescriptor.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/FSDatasetDescriptor.java
index 3b2611e..57f23b1 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/FSDatasetDescriptor.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/FSDatasetDescriptor.java
@@ -18,10 +18,13 @@
 package org.apache.gobblin.service.modules.dataset;
 
 import java.io.IOException;
+import java.util.List;
 
+import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.fs.GlobPattern;
 import org.apache.hadoop.fs.Path;
 
+import com.google.common.base.Splitter;
 import com.google.common.collect.ImmutableMap;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
@@ -46,6 +49,8 @@ public class FSDatasetDescriptor extends BaseDatasetDescriptor implements Datase
   @Getter
   private final String path;
   @Getter
+  private final String subPaths;
+  @Getter
   private final boolean isCompacted;
   @Getter
   private final boolean isCompactedAndDeduped;
@@ -65,6 +70,7 @@ public class FSDatasetDescriptor extends BaseDatasetDescriptor implements Datase
     this.path = PathUtils
         .getPathWithoutSchemeAndAuthority(new Path(ConfigUtils.getString(config, DatasetDescriptorConfigKeys.PATH_KEY,
             DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY))).toString();
+    this.subPaths = ConfigUtils.getString(config, DatasetDescriptorConfigKeys.SUBPATHS_KEY, null);
     this.isCompacted = ConfigUtils.getBoolean(config, DatasetDescriptorConfigKeys.IS_COMPACTED_KEY, false);
     this.isCompactedAndDeduped = ConfigUtils.getBoolean(config, DatasetDescriptorConfigKeys.IS_COMPACTED_AND_DEDUPED_KEY, false);
     this.partitionConfig = new FSDatasetPartitionConfig(ConfigUtils.getConfigOrEmpty(config, DatasetDescriptorConfigKeys.PARTITION_PREFIX));
@@ -72,6 +78,31 @@ public class FSDatasetDescriptor extends BaseDatasetDescriptor implements Datase
   }
 
   /**
+   * If the other descriptor has subPaths, this method checks that each concatenation of path + subPath is matched by this
+   * path. Otherwise, it just checks the path.
+   *
+   * @param other descriptor whose path/subPaths to check
+   * @return true if all subPaths are matched by this {@link DatasetDescriptor}'s path, or if subPaths is null and
+   * the other's path matches this path.
+   */
+  @Override
+  protected boolean isPathContaining(DatasetDescriptor other) {
+    String otherPath = other.getPath();
+    String otherSubPaths = ((FSDatasetDescriptor) other).getSubPaths();
+    if (otherSubPaths != null) {
+      List<String> subPaths = Splitter.on(",").splitToList(StringUtils.stripEnd(StringUtils.stripStart(otherSubPaths, "{"), "}"));
+      for (String subPath : subPaths) {
+        if (!isPathContaining(new Path(otherPath, subPath).toString())) {
+          return false;
+        }
+      }
+      return true;
+    } else {
+      return isPathContaining(otherPath);
+    }
+  }
+
+  /**
    * A helper to determine if the path description of this {@link DatasetDescriptor} is a superset of paths
    * accepted by the other {@link DatasetDescriptor}. If the path description of the other {@link DatasetDescriptor}
    * is a glob pattern, we return false.
@@ -79,7 +110,7 @@ public class FSDatasetDescriptor extends BaseDatasetDescriptor implements Datase
    * @param otherPath a glob pattern that describes a set of paths.
    * @return true if the glob pattern described by the otherPath matches the path in this {@link DatasetDescriptor}.
    */
-  protected boolean isPathContaining(String otherPath) {
+  private boolean isPathContaining(String otherPath) {
     if (otherPath == null) {
       return false;
     }
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/SqlDatasetDescriptor.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/SqlDatasetDescriptor.java
index 2811992..a334ab6 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/SqlDatasetDescriptor.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/SqlDatasetDescriptor.java
@@ -89,7 +89,7 @@ public class SqlDatasetDescriptor extends BaseDatasetDescriptor implements Datas
     return Enums.getIfPresent(Platform.class, getPlatform().toUpperCase()).isPresent();
   }
   /**
-   * Check if the dbName and tableName specified in {@param otherPath} are accepted by the set of dbName.tableName
+   * Check if the dbName and tableName specified in {@param other}'s path are accepted by the set of dbName.tableName
    * combinations defined by the current {@link SqlDatasetDescriptor}. For example, let:
    * this.path = "test_.*;test_table_.*". Then:
    * isPathContaining("test_db1;test_table_1") = true
@@ -98,10 +98,11 @@ public class SqlDatasetDescriptor extends BaseDatasetDescriptor implements Datas
    * NOTE: otherPath cannot be a globPattern. So:
    * isPathContaining("test_db.*;test_table_*") = false
    *
-   * @param otherPath which should be in the format of dbName.tableName
+   * @param other whose path should be in the format of dbName.tableName
    */
   @Override
-  protected boolean isPathContaining(String otherPath) {
+  protected boolean isPathContaining(DatasetDescriptor other) {
+    String otherPath = other.getPath();
     if (otherPath == null) {
       return false;
     }
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java
index 67d1f19..dbc01ba 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.fs.Path;
 import org.slf4j.Logger;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Joiner;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
@@ -212,20 +213,34 @@ public class MultiHopFlowCompiler extends BaseFlowToJobSpecCompiler {
   private static List<FlowSpec> splitFlowSpec(FlowSpec flowSpec) {
     long flowExecutionId = FlowUtils.getOrCreateFlowExecutionId(flowSpec);
     List<FlowSpec> flowSpecs = new ArrayList<>();
+    Config flowConfig = flowSpec.getConfig();
 
-    if (flowSpec.getConfig().hasPath(ConfigurationKeys.DATASET_SUBPATHS_KEY)) {
-      List<String> datasetSubpaths = ConfigUtils.getStringList(flowSpec.getConfig(), ConfigurationKeys.DATASET_SUBPATHS_KEY);
-      String baseInputPath = ConfigUtils.getString(flowSpec.getConfig(), ConfigurationKeys.DATASET_BASE_INPUT_PATH_KEY, "/");
-      String baseOutputPath = ConfigUtils.getString(flowSpec.getConfig(), ConfigurationKeys.DATASET_BASE_OUTPUT_PATH_KEY, "/");
+    if (flowConfig.hasPath(ConfigurationKeys.DATASET_SUBPATHS_KEY)) {
+      List<String> datasetSubpaths = ConfigUtils.getStringList(flowConfig, ConfigurationKeys.DATASET_SUBPATHS_KEY);
+      String baseInputPath = ConfigUtils.getString(flowConfig, ConfigurationKeys.DATASET_BASE_INPUT_PATH_KEY, "/");
+      String baseOutputPath = ConfigUtils.getString(flowConfig, ConfigurationKeys.DATASET_BASE_OUTPUT_PATH_KEY, "/");
 
-      for (String subPath : datasetSubpaths) {
-        Config newConfig = flowSpec.getConfig().withoutPath(ConfigurationKeys.DATASET_SUBPATHS_KEY)
-            .withValue(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, ConfigValueFactory.fromAnyRef(flowExecutionId))
+      if (ConfigUtils.getBoolean(flowConfig, ConfigurationKeys.DATASET_COMBINE_KEY, false)) {
+        Config newConfig = flowConfig.withoutPath(ConfigurationKeys.DATASET_SUBPATHS_KEY)
             .withValue(DatasetDescriptorConfigKeys.FLOW_INPUT_DATASET_DESCRIPTOR_PREFIX + "." + DatasetDescriptorConfigKeys.PATH_KEY,
-                ConfigValueFactory.fromAnyRef(new Path(baseInputPath, subPath).toString()))
+                ConfigValueFactory.fromAnyRef(baseInputPath))
             .withValue(DatasetDescriptorConfigKeys.FLOW_OUTPUT_DATASET_DESCRIPTOR_PREFIX + "." + DatasetDescriptorConfigKeys.PATH_KEY,
-                ConfigValueFactory.fromAnyRef(new Path(baseOutputPath, subPath).toString()));
+                ConfigValueFactory.fromAnyRef(baseOutputPath))
+            .withValue(DatasetDescriptorConfigKeys.FLOW_INPUT_DATASET_DESCRIPTOR_PREFIX + ".subPaths",
+                ConfigValueFactory.fromAnyRef(convertStringListToGlobPattern(datasetSubpaths)))
+            .withValue(DatasetDescriptorConfigKeys.FLOW_OUTPUT_DATASET_DESCRIPTOR_PREFIX + ".subPaths",
+                ConfigValueFactory.fromAnyRef(convertStringListToGlobPattern(datasetSubpaths)));
         flowSpecs.add(copyFlowSpecWithNewConfig(flowSpec, newConfig));
+      } else {
+        for (String subPath : datasetSubpaths) {
+          Config newConfig = flowConfig.withoutPath(ConfigurationKeys.DATASET_SUBPATHS_KEY)
+              .withValue(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, ConfigValueFactory.fromAnyRef(flowExecutionId))
+              .withValue(DatasetDescriptorConfigKeys.FLOW_INPUT_DATASET_DESCRIPTOR_PREFIX + "." + DatasetDescriptorConfigKeys.PATH_KEY,
+                  ConfigValueFactory.fromAnyRef(new Path(baseInputPath, subPath).toString()))
+              .withValue(DatasetDescriptorConfigKeys.FLOW_OUTPUT_DATASET_DESCRIPTOR_PREFIX + "." + DatasetDescriptorConfigKeys.PATH_KEY,
+                  ConfigValueFactory.fromAnyRef(new Path(baseOutputPath, subPath).toString()));
+          flowSpecs.add(copyFlowSpecWithNewConfig(flowSpec, newConfig));
+        }
       }
     } else {
       flowSpecs.add(flowSpec);
@@ -234,6 +249,15 @@ public class MultiHopFlowCompiler extends BaseFlowToJobSpecCompiler {
     return flowSpecs;
   }
 
+  /**
+   * Convert a string list to a string pattern that will work for globs.
+   *
+   * e.g. ["test1", "test2", "test3"] -> "{test1,test2,test3}"
+   */
+  private static String convertStringListToGlobPattern(List<String> stringList) {
+    return "{" + Joiner.on(",").join(stringList) + "}";
+  }
+
   private static FlowSpec copyFlowSpecWithNewConfig(FlowSpec flowSpec, Config newConfig) {
     FlowSpec.Builder builder = FlowSpec.builder(flowSpec.getUri()).withVersion(flowSpec.getVersion())
         .withDescription(flowSpec.getDescription()).withConfig(newConfig);
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/DatasetDescriptorConfigKeys.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/DatasetDescriptorConfigKeys.java
index 905ef81..6de577a 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/DatasetDescriptorConfigKeys.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/DatasetDescriptorConfigKeys.java
@@ -31,6 +31,7 @@ public class DatasetDescriptorConfigKeys {
   public static final String CLASS_KEY = "class";
   public static final String PLATFORM_KEY = "platform";
   public static final String PATH_KEY = "path";
+  public static final String SUBPATHS_KEY = "subPaths";
   public static final String DATABASE_KEY = "databaseName";
   public static final String TABLE_KEY = "tableName";
   public static final String FORMAT_KEY = "format";
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template_catalog/FSFlowTemplateCatalog.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template_catalog/FSFlowTemplateCatalog.java
index 0dafc2d..e01f90a 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template_catalog/FSFlowTemplateCatalog.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template_catalog/FSFlowTemplateCatalog.java
@@ -35,7 +35,6 @@ import org.apache.hadoop.fs.PathFilter;
 import com.google.common.base.Charsets;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
-import com.typesafe.config.ConfigResolveOptions;
 
 import org.apache.gobblin.annotation.Alpha;
 import org.apache.gobblin.configuration.ConfigurationKeys;
@@ -144,7 +143,7 @@ public class FSFlowTemplateCatalog extends FSJobCatalog implements FlowCatalogWi
     FileSystem fs = FileSystem.get(jobFilePath.toUri(), new Configuration());
 
     for (FileStatus fileStatus : fs.listStatus(jobFilePath, extensionFilter)) {
-      Config jobConfig = loadHoconFileAtPath(fileStatus.getPath(), true);
+      Config jobConfig = loadHoconFileAtPath(fileStatus.getPath());
       //Check if the .job file has an underlying job template
       if (jobConfig.hasPath(JOB_TEMPLATE_KEY)) {
         URI jobTemplateRelativeUri = new URI(jobConfig.getString(JOB_TEMPLATE_KEY));
@@ -153,18 +152,17 @@ public class FSFlowTemplateCatalog extends FSJobCatalog implements FlowCatalogWi
               "Expected scheme " + FS_SCHEME + " got unsupported scheme " + flowTemplateDirURI.getScheme());
         }
         Path fullJobTemplatePath = PathUtils.mergePaths(new Path(templateCatalogDir), new Path(jobTemplateRelativeUri));
-        jobConfig = jobConfig.withFallback(loadHoconFileAtPath(fullJobTemplatePath, true));
+        jobConfig = jobConfig.withFallback(loadHoconFileAtPath(fullJobTemplatePath));
       }
       jobTemplates.add(new HOCONInputStreamJobTemplate(jobConfig, fileStatus.getPath().toUri(), this));
     }
     return jobTemplates;
   }
 
-  private Config loadHoconFileAtPath(Path filePath, boolean allowUnresolved)
+  private Config loadHoconFileAtPath(Path filePath)
       throws IOException {
-    ConfigResolveOptions options = ConfigResolveOptions.defaults().setAllowUnresolved(allowUnresolved);
     try (InputStream is = fs.open(filePath)) {
-      return ConfigFactory.parseReader(new InputStreamReader(is, Charsets.UTF_8)).resolve(options);
+      return ConfigFactory.parseReader(new InputStreamReader(is, Charsets.UTF_8));
     }
   }
 
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/dataset/FSDatasetDescriptorTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/dataset/FSDatasetDescriptorTest.java
index d065cba..88e3759 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/dataset/FSDatasetDescriptorTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/dataset/FSDatasetDescriptorTest.java
@@ -72,6 +72,14 @@ public class FSDatasetDescriptorTest {
     Assert.assertFalse(descriptor3.contains(descriptor5));
     Assert.assertFalse(descriptor2.contains(descriptor5));
     Assert.assertFalse(descriptor1.contains(descriptor5));
+
+    // Test subpaths
+    Config subPathConfig = ConfigFactory.empty().withValue(DatasetDescriptorConfigKeys.PATH_KEY, ConfigValueFactory.fromAnyRef("/a/b/c"))
+        .withValue(DatasetDescriptorConfigKeys.SUBPATHS_KEY, ConfigValueFactory.fromAnyRef("{e,f,g}"))
+        .withValue(DatasetDescriptorConfigKeys.PLATFORM_KEY, ConfigValueFactory.fromAnyRef("hdfs"));
+    FSDatasetDescriptor descriptor6 = new FSDatasetDescriptor(subPathConfig);
+    Assert.assertTrue(descriptor1.contains(descriptor6));
+    Assert.assertFalse(descriptor2.contains(descriptor6));
   }
 
   @Test
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompilerTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompilerTest.java
index b0faa6d..176c2d6 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompilerTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompilerTest.java
@@ -66,6 +66,7 @@ import lombok.extern.slf4j.Slf4j;
 
 import org.apache.gobblin.config.ConfigBuilder;
 import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.data.management.retention.profile.ConfigurableGlobDatasetFinder;
 import org.apache.gobblin.runtime.api.FlowSpec;
 import org.apache.gobblin.runtime.api.JobSpec;
 import org.apache.gobblin.runtime.api.Spec;
@@ -258,7 +259,7 @@ public class MultiHopFlowCompilerTest {
     String targetFsUri = jobConfig.getString("target.filebased.fs.uri");
     Assert.assertEquals(targetFsUri, "hdfs://hadoopnn01.grid.linkedin.com:8888/");
     Assert.assertEquals(jobConfig.getString("writer.fs.uri"), targetFsUri);
-    Assert.assertEquals(jobConfig.getString("gobblin.dataset.pattern"), from);
+    Assert.assertEquals(new Path(jobConfig.getString("gobblin.dataset.pattern")), new Path(from));
     Assert.assertEquals(jobConfig.getString("data.publisher.final.dir"), to);
     Assert.assertEquals(jobConfig.getString("type"), "java");
     Assert.assertEquals(jobConfig.getString("job.class"), "org.apache.gobblin.runtime.local.LocalJobLauncher");
@@ -431,7 +432,7 @@ public class MultiHopFlowCompilerTest {
     String targetFsUri = jobConfig.getString("target.filebased.fs.uri");
     Assert.assertEquals(targetFsUri, "hdfs://hadoopnn02.grid.linkedin.com:8888/");
     Assert.assertEquals(jobConfig.getString("writer.fs.uri"), targetFsUri);
-    Assert.assertEquals(jobConfig.getString("gobblin.dataset.pattern"), from);
+    Assert.assertEquals(new Path(jobConfig.getString("gobblin.dataset.pattern")), new Path(from));
     Assert.assertEquals(jobConfig.getString("data.publisher.final.dir"), to);
     Assert.assertEquals(jobConfig.getString("type"), "java");
     Assert.assertEquals(jobConfig.getString("job.class"), "org.apache.gobblin.runtime.local.LocalJobLauncher");
@@ -614,6 +615,32 @@ public class MultiHopFlowCompilerTest {
   }
 
   @Test (dependsOnMethods = "testCompileMultiDatasetFlow")
+  public void testCompileCombinedDatasetFlow() throws Exception {
+    FlowSpec spec = createFlowSpec("flow/flow4.conf", "HDFS-1", "HDFS-3", true, false);
+
+    Dag<JobExecutionPlan> dag = specCompiler.compileFlow(spec);
+
+    // Should be 2 jobs, each containing 3 datasets
+    Assert.assertEquals(dag.getNodes().size(), 2);
+    Assert.assertEquals(dag.getEndNodes().size(), 1);
+    Assert.assertEquals(dag.getStartNodes().size(), 1);
+
+    String copyJobName = Joiner.on(JobExecutionPlan.Factory.JOB_NAME_COMPONENT_SEPARATION_CHAR).
+        join("testFlowGroup", "testFlowName", "Distcp", "HDFS-1", "HDFS-3", "hdfsToHdfs");
+    Config jobConfig = dag.getStartNodes().get(0).getValue().getJobSpec().getConfig();
+    String jobName = jobConfig.getString(ConfigurationKeys.JOB_NAME_KEY);
+    Assert.assertTrue(jobName.startsWith(copyJobName));
+    Assert.assertTrue(jobConfig.getString(ConfigurableGlobDatasetFinder.DATASET_FINDER_PATTERN_KEY).endsWith("{dataset0,dataset1,dataset2}"));
+
+    String retentionJobName = Joiner.on(JobExecutionPlan.Factory.JOB_NAME_COMPONENT_SEPARATION_CHAR).
+        join("testFlowGroup", "testFlowName", "SnapshotRetention", "HDFS-3", "HDFS-3", "hdfsRetention");
+    Config jobConfig2 = dag.getEndNodes().get(0).getValue().getJobSpec().getConfig();
+    String jobName2 = jobConfig2.getString(ConfigurationKeys.JOB_NAME_KEY);
+    Assert.assertTrue(jobName2.startsWith(retentionJobName));
+    Assert.assertTrue(jobConfig2.getString(ConfigurableGlobDatasetFinder.DATASET_FINDER_PATTERN_KEY).endsWith("{dataset0,dataset1,dataset2}"));
+  }
+
+  @Test (dependsOnMethods = "testCompileMultiDatasetFlow")
   public void testGitFlowGraphMonitorService()
       throws IOException, GitAPIException, URISyntaxException, InterruptedException {
     File remoteDir = new File(TESTDIR + "/remote");
diff --git a/gobblin-service/src/test/resources/flow/flow4.conf b/gobblin-service/src/test/resources/flow/flow4.conf
new file mode 100644
index 0000000..5ec8e69
--- /dev/null
+++ b/gobblin-service/src/test/resources/flow/flow4.conf
@@ -0,0 +1,14 @@
+user.to.proxy=testUser
+
+#Input dataset - uncompressed and unencrypted
+gobblin.flow.input.dataset.descriptor.class=org.apache.gobblin.service.modules.dataset.FSDatasetDescriptor
+gobblin.flow.input.dataset.descriptor.platform=hdfs
+gobblin.flow.dataset.baseInputPath=/data/input
+
+#Output dataset - same as input dataset
+gobblin.flow.output.dataset.descriptor.class=${gobblin.flow.input.dataset.descriptor.class}
+gobblin.flow.output.dataset.descriptor.platform=${gobblin.flow.input.dataset.descriptor.platform}
+gobblin.flow.dataset.baseOutputPath=/data/output
+
+gobblin.flow.dataset.subPaths="dataset0,dataset1,dataset2"
+gobblin.flow.dataset.combine=true
\ No newline at end of file
diff --git a/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/hdfsSnapshotRetention/flow.conf b/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/hdfsSnapshotRetention/flow.conf
index 09a16f0..633fab2 100644
--- a/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/hdfsSnapshotRetention/flow.conf
+++ b/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/hdfsSnapshotRetention/flow.conf
@@ -50,4 +50,13 @@ gobblin.flow.edge.input.dataset.descriptor.5.path=${gobblin.flow.output.dataset.
 gobblin.flow.edge.output.dataset.descriptor.5.class=${gobblin.flow.edge.input.dataset.descriptor.5.class}
 gobblin.flow.edge.output.dataset.descriptor.5.platform=${gobblin.flow.edge.input.dataset.descriptor.5.platform}
 gobblin.flow.edge.output.dataset.descriptor.5.path=${gobblin.flow.output.dataset.descriptor.path}
-gobblin.flow.edge.output.dataset.descriptor.5.isRetentionApplied=true
\ No newline at end of file
+gobblin.flow.edge.output.dataset.descriptor.5.isRetentionApplied=true
+
+gobblin.flow.edge.input.dataset.descriptor.6.class=org.apache.gobblin.service.modules.dataset.FSDatasetDescriptor
+gobblin.flow.edge.input.dataset.descriptor.6.platform=hdfs
+gobblin.flow.edge.input.dataset.descriptor.6.path="/data/output/*"
+
+gobblin.flow.edge.output.dataset.descriptor.6.class=${gobblin.flow.edge.input.dataset.descriptor.6.class}
+gobblin.flow.edge.output.dataset.descriptor.6.platform=${gobblin.flow.edge.input.dataset.descriptor.6.platform}
+gobblin.flow.edge.output.dataset.descriptor.6.path=${gobblin.flow.output.dataset.descriptor.path}
+gobblin.flow.edge.output.dataset.descriptor.6.isRetentionApplied=true
\ No newline at end of file
diff --git a/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/hdfsToHdfs/flow.conf b/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/hdfsToHdfs/flow.conf
index d029a31..a532839 100644
--- a/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/hdfsToHdfs/flow.conf
+++ b/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/hdfsToHdfs/flow.conf
@@ -23,4 +23,13 @@ gobblin.flow.edge.input.dataset.descriptor.2.isRetentionApplied=${flow.applyRete
 
 gobblin.flow.edge.output.dataset.descriptor.2.class=${gobblin.flow.edge.input.dataset.descriptor.2.class}
 gobblin.flow.edge.output.dataset.descriptor.2.platform=${gobblin.flow.edge.input.dataset.descriptor.2.platform}
-gobblin.flow.edge.output.dataset.descriptor.2.path=${gobblin.flow.output.dataset.descriptor.path}
\ No newline at end of file
+gobblin.flow.edge.output.dataset.descriptor.2.path=${gobblin.flow.output.dataset.descriptor.path}
+
+gobblin.flow.edge.input.dataset.descriptor.3.class=org.apache.gobblin.service.modules.dataset.FSDatasetDescriptor
+gobblin.flow.edge.input.dataset.descriptor.3.platform=hdfs
+gobblin.flow.edge.input.dataset.descriptor.3.path="/data/input/*"
+gobblin.flow.edge.input.dataset.descriptor.3.isRetentionApplied=${flow.applyRetention}
+
+gobblin.flow.edge.output.dataset.descriptor.3.class=${gobblin.flow.edge.input.dataset.descriptor.3.class}
+gobblin.flow.edge.output.dataset.descriptor.3.platform=${gobblin.flow.edge.input.dataset.descriptor.3.platform}
+gobblin.flow.edge.output.dataset.descriptor.3.path=${gobblin.flow.output.dataset.descriptor.path}
\ No newline at end of file
diff --git a/gobblin-service/src/test/resources/template_catalog/multihop/jobTemplates/distcp.template b/gobblin-service/src/test/resources/template_catalog/multihop/jobTemplates/distcp.template
index 63a4f89..df47020 100644
--- a/gobblin-service/src/test/resources/template_catalog/multihop/jobTemplates/distcp.template
+++ b/gobblin-service/src/test/resources/template_catalog/multihop/jobTemplates/distcp.template
@@ -36,7 +36,7 @@ gobblin.dataset.profile.class="org.apache.gobblin.data.management.copy.CopyableG
 
 # target location for copy
 data.publisher.final.dir=${to}
-gobblin.dataset.pattern=${from}
+gobblin.dataset.pattern=${from}/${?gobblin.flow.edge.input.dataset.descriptor.subPaths}
 
 data.publisher.type="org.apache.gobblin.data.management.copy.publisher.CopyDataPublisher"
 source.class="org.apache.gobblin.data.management.copy.CopySource"
diff --git a/gobblin-service/src/test/resources/template_catalog/multihop/jobTemplates/hdfs-retention.template b/gobblin-service/src/test/resources/template_catalog/multihop/jobTemplates/hdfs-retention.template
index 23c8b36..1a9465d 100644
--- a/gobblin-service/src/test/resources/template_catalog/multihop/jobTemplates/hdfs-retention.template
+++ b/gobblin-service/src/test/resources/template_catalog/multihop/jobTemplates/hdfs-retention.template
@@ -4,7 +4,7 @@
 job.name=SnapshotRetention
 job.class=gobblin.data.management.retention.DatasetCleanerJob
 #Dataset pattern
-gobblin.dataset.pattern=${gobblin.flow.edge.input.dataset.descriptor.path}
+gobblin.dataset.pattern=${gobblin.flow.edge.input.dataset.descriptor.path}/${?gobblin.flow.edge.input.dataset.descriptor.subPaths}
 #Dataset finder class
 gobblin.dataset.profile.class="org.apache.gobblin.data.management.retention.profile.SnapshotDatasetProfile"
 # Dataset version finder
