This is an automated email from the ASF dual-hosted git repository.

suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new abc6074  [GOBBLIN-1038] Set default dataset descriptor configs based on the DataNode
abc6074 is described below

commit abc607406562b1ce0b3280e9f2e721db8d39d77c
Author: Jack Moseley <[email protected]>
AuthorDate: Wed Feb 12 10:50:34 2020 -0800

    [GOBBLIN-1038] Set default dataset descriptor configs based on the DataNode
    
    Closes #2880 from jack-moseley/infer-descriptor
---
 .../service/modules/flowgraph/BaseDataNode.java    | 10 +++++++++
 .../service/modules/flowgraph/DataNode.java        | 10 +++++++++
 .../modules/flowgraph/datanodes/HttpDataNode.java  | 12 ++++++++++
 .../modules/flowgraph/datanodes/SqlDataNode.java   |  6 +++++
 .../flowgraph/datanodes/fs/AdlsDataNode.java       |  5 +++++
 .../flowgraph/datanodes/fs/FileSystemDataNode.java |  6 +++++
 .../flowgraph/datanodes/fs/HdfsDataNode.java       |  5 +++++
 .../flowgraph/datanodes/fs/LocalFSDataNode.java    |  9 +++++++-
 .../flowgraph/datanodes/hive/HiveDataNode.java     | 12 ++++++++++
 .../flowgraph/pathfinder/AbstractPathFinder.java   | 26 ++++++++++++++++++++++
 10 files changed, 100 insertions(+), 1 deletion(-)

diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseDataNode.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseDataNode.java
index 23bf83b..a3045fd 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseDataNode.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseDataNode.java
@@ -56,4 +56,14 @@ public class BaseDataNode implements DataNode {
       throw new DataNodeCreationException(e);
     }
   }
+
+  @Override
+  public String getDefaultDatasetDescriptorClass() {
+    return null;
+  }
+
+  @Override
+  public String getDefaultDatasetDescriptorPlatform() {
+    return null;
+  }
 }
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/DataNode.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/DataNode.java
index b7a5274..f616ded 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/DataNode.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/DataNode.java
@@ -39,6 +39,16 @@ public interface DataNode {
   Config getRawConfig();
 
   /**
+   * @return a default dataset descriptor class for this DataNode, or null if a default should not be used.
+   */
+  String getDefaultDatasetDescriptorClass();
+
+  /**
+   * @return a default dataset descriptor platform for this DataNode, or null if a default should not be used.
+   */
+  String getDefaultDatasetDescriptorPlatform();
+
+  /**
    * @return true if the {@link DataNode} is active
    */
   boolean isActive();
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/datanodes/HttpDataNode.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/datanodes/HttpDataNode.java
index 4e84715..efd5299 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/datanodes/HttpDataNode.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/datanodes/HttpDataNode.java
@@ -24,6 +24,7 @@ import joptsimple.internal.Strings;
 import lombok.EqualsAndHashCode;
 import lombok.Getter;
 
+import org.apache.gobblin.service.modules.dataset.HttpDatasetDescriptor;
 import org.apache.gobblin.service.modules.flowgraph.BaseDataNode;
 import org.apache.gobblin.service.modules.flowgraph.DataNode;
 import org.apache.gobblin.service.modules.flowgraph.FlowGraphConfigurationKeys;
@@ -41,6 +42,7 @@ public class HttpDataNode extends BaseDataNode  {
   private String httpDomain;
   @Getter
   private String authenticationType;
+  public static final String PLATFORM = "http";
 
   public HttpDataNode(Config nodeProps) throws DataNode.DataNodeCreationException {
     super(nodeProps);
@@ -58,4 +60,14 @@ public class HttpDataNode extends BaseDataNode  {
       throw new DataNode.DataNodeCreationException(e);
     }
   }
+
+  @Override
+  public String getDefaultDatasetDescriptorClass() {
+    return HttpDatasetDescriptor.class.getCanonicalName();
+  }
+
+  @Override
+  public String getDefaultDatasetDescriptorPlatform() {
+    return PLATFORM;
+  }
 }
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/datanodes/SqlDataNode.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/datanodes/SqlDataNode.java
index 9e05409..00dd0a9 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/datanodes/SqlDataNode.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/datanodes/SqlDataNode.java
@@ -23,6 +23,7 @@ import joptsimple.internal.Strings;
 import lombok.EqualsAndHashCode;
 import lombok.Getter;
 
+import org.apache.gobblin.service.modules.dataset.SqlDatasetDescriptor;
 import org.apache.gobblin.service.modules.flowgraph.BaseDataNode;
 import org.apache.gobblin.service.modules.flowgraph.FlowGraphConfigurationKeys;
 import org.apache.gobblin.util.ConfigUtils;
@@ -53,4 +54,9 @@ public class SqlDataNode extends BaseDataNode {
       throw new DataNodeCreationException(e);
     }
   }
+
+  @Override
+  public String getDefaultDatasetDescriptorClass() {
+    return SqlDatasetDescriptor.class.getCanonicalName();
+  }
 }
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/datanodes/fs/AdlsDataNode.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/datanodes/fs/AdlsDataNode.java
index a01a482..cc830fd 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/datanodes/fs/AdlsDataNode.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/datanodes/fs/AdlsDataNode.java
@@ -50,4 +50,9 @@ public class AdlsDataNode extends FileSystemDataNode {
     }
     return true;
   }
+
+  @Override
+  public String getDefaultDatasetDescriptorPlatform() {
+    return ADLS_SCHEME;
+  }
 }
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/datanodes/fs/FileSystemDataNode.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/datanodes/fs/FileSystemDataNode.java
index 9830f47..b252930 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/datanodes/fs/FileSystemDataNode.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/datanodes/fs/FileSystemDataNode.java
@@ -28,6 +28,7 @@ import lombok.EqualsAndHashCode;
 import lombok.Getter;
 
 import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.service.modules.dataset.FSDatasetDescriptor;
 import org.apache.gobblin.service.modules.flowgraph.BaseDataNode;
 import org.apache.gobblin.service.modules.flowgraph.FlowGraphConfigurationKeys;
 import org.apache.gobblin.util.ConfigUtils;
@@ -64,4 +65,9 @@ public abstract class FileSystemDataNode extends BaseDataNode {
   }
 
   public abstract boolean isUriValid(URI fsUri);
+
+  @Override
+  public String getDefaultDatasetDescriptorClass() {
+    return FSDatasetDescriptor.class.getCanonicalName();
+  }
 }
\ No newline at end of file
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/datanodes/fs/HdfsDataNode.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/datanodes/fs/HdfsDataNode.java
index 5402074..8c0fac5 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/datanodes/fs/HdfsDataNode.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/datanodes/fs/HdfsDataNode.java
@@ -55,4 +55,9 @@ public class HdfsDataNode extends FileSystemDataNode {
     }
     return true;
   }
+
+  @Override
+  public String getDefaultDatasetDescriptorPlatform() {
+    return HDFS_SCHEME;
+  }
 }
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/datanodes/fs/LocalFSDataNode.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/datanodes/fs/LocalFSDataNode.java
index 757d4a0..81f91c8 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/datanodes/fs/LocalFSDataNode.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/datanodes/fs/LocalFSDataNode.java
@@ -19,9 +19,10 @@ package org.apache.gobblin.service.modules.flowgraph.datanodes.fs;
 
 import java.net.URI;
 
+import com.typesafe.config.Config;
+
 import org.apache.gobblin.annotation.Alpha;
 
-import com.typesafe.config.Config;
 
 /**
  * An implementation of {@link LocalFSDataNode}. All the properties specific to a LocalFS based data node (e.g. fs.uri)
@@ -30,6 +31,7 @@ import com.typesafe.config.Config;
 @Alpha
 public class LocalFSDataNode extends FileSystemDataNode {
   public static final String LOCAL_FS_SCHEME = "file";
+  public static final String PLATFORM = "local";
 
   public LocalFSDataNode(Config nodeProps) throws DataNodeCreationException {
     super(nodeProps);
@@ -48,4 +50,9 @@ public class LocalFSDataNode extends FileSystemDataNode {
     }
     return false;
   }
+
+  @Override
+  public String getDefaultDatasetDescriptorPlatform() {
+    return PLATFORM;
+  }
 }
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/datanodes/hive/HiveDataNode.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/datanodes/hive/HiveDataNode.java
index 9eca408..f324ef6 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/datanodes/hive/HiveDataNode.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/datanodes/hive/HiveDataNode.java
@@ -28,6 +28,7 @@ import lombok.EqualsAndHashCode;
 import lombok.Getter;
 
 import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.service.modules.dataset.HiveDatasetDescriptor;
 import org.apache.gobblin.service.modules.flowgraph.BaseDataNode;
 import org.apache.gobblin.service.modules.flowgraph.FlowGraphConfigurationKeys;
 import org.apache.gobblin.util.ConfigUtils;
@@ -41,6 +42,7 @@ import org.apache.gobblin.util.ConfigUtils;
 @EqualsAndHashCode (callSuper = true)
 public class HiveDataNode extends BaseDataNode {
   public static final String METASTORE_URI_KEY = FlowGraphConfigurationKeys.DATA_NODE_PREFIX + "hive.metastore.uri";
+  public static final String PLATFORM = "hive";
 
   @Getter
   private String metastoreUri;
@@ -78,4 +80,14 @@ public class HiveDataNode extends BaseDataNode {
     }
     return true;
   }
+
+  @Override
+  public String getDefaultDatasetDescriptorClass() {
+    return HiveDatasetDescriptor.class.getCanonicalName();
+  }
+
+  @Override
+  public String getDefaultDatasetDescriptorPlatform() {
+    return PLATFORM;
+  }
 }
\ No newline at end of file
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/pathfinder/AbstractPathFinder.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/pathfinder/AbstractPathFinder.java
index 674c187..86e8117 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/pathfinder/AbstractPathFinder.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/pathfinder/AbstractPathFinder.java
@@ -24,6 +24,7 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.tuple.Pair;
@@ -32,6 +33,7 @@ import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigException;
+import com.typesafe.config.ConfigFactory;
 import com.typesafe.config.ConfigValue;
 import com.typesafe.config.ConfigValueFactory;
 
@@ -103,6 +105,11 @@ public abstract class AbstractPathFinder implements PathFinder {
       this.destNodes.add(destNode);
     }
 
+    // All dest nodes should be the same class
+    if (this.destNodes != null && this.destNodes.stream().map(Object::getClass).collect(Collectors.toSet()).size() > 1) {
+      throw new RuntimeException("All destination nodes must use the same DataNode class");
+    }
+
     //Should apply retention?
     boolean shouldApplyRetention = ConfigUtils.getBoolean(flowConfig, ConfigurationKeys.FLOW_APPLY_RETENTION, true);
     //Should apply retention on input dataset?
@@ -144,6 +151,11 @@ public abstract class AbstractPathFinder implements PathFinder {
     flowConfig = flowConfig.withValue(ConfigurationKeys.FLOW_APPLY_RETENTION, ConfigValueFactory.fromAnyRef(shouldApplyRetention));
     flowConfig = flowConfig.withValue(ConfigurationKeys.FLOW_APPLY_INPUT_RETENTION, ConfigValueFactory.fromAnyRef(shouldApplyRetentionOnInput));
 
+    srcDatasetDescriptorConfig = srcDatasetDescriptorConfig.withFallback(getDefaultConfig(this.srcNode));
+    if (this.destNodes != null) {
+      destDatasetDescriptorConfig = destDatasetDescriptorConfig.withFallback(getDefaultConfig(this.destNodes.get(0)));
+    }
+
     Class srcdatasetDescriptorClass =
         Class.forName(srcDatasetDescriptorConfig.getString(DatasetDescriptorConfigKeys.CLASS_KEY));
     this.srcDatasetDescriptor = (DatasetDescriptor) GobblinConstructorUtils
@@ -155,6 +167,20 @@ public abstract class AbstractPathFinder implements PathFinder {
 
   }
 
+  private Config getDefaultConfig(DataNode dataNode) {
+    Config defaultConfig = ConfigFactory.empty();
+
+    if (dataNode.getDefaultDatasetDescriptorClass() != null) {
+      defaultConfig = defaultConfig.withValue(DatasetDescriptorConfigKeys.CLASS_KEY, ConfigValueFactory.fromAnyRef(dataNode.getDefaultDatasetDescriptorClass()));
+    }
+
+    if (dataNode.getDefaultDatasetDescriptorPlatform() != null) {
+      defaultConfig = defaultConfig.withValue(DatasetDescriptorConfigKeys.PLATFORM_KEY, ConfigValueFactory.fromAnyRef(dataNode.getDefaultDatasetDescriptorPlatform()));
+    }
+
+    return defaultConfig;
+  }
+
   boolean isPathFound(DataNode currentNode, DataNode destNode, DatasetDescriptor currentDatasetDescriptor,
       DatasetDescriptor destDatasetDescriptor) {
     return (currentNode.equals(destNode)) && (currentDatasetDescriptor.equals(destDatasetDescriptor));

Reply via email to