This is an automated email from the ASF dual-hosted git repository.

suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 5027659  [GOBBLIN-708] Create SqlDatasetDescriptor for JDBC-sourced 
datasets.
5027659 is described below

commit 5027659ace1f576a715dffaac24b2c30a08f76f4
Author: suvasude <[email protected]>
AuthorDate: Tue Apr 30 17:31:37 2019 -0700

    [GOBBLIN-708] Create SqlDatasetDescriptor for JDBC-sourced datasets.
    
    Closes #2577 from sv2000/sqlDatasetDescriptor
---
 .../modules/dataset/BaseDatasetDescriptor.java     |  93 ++++++++++++++++
 .../service/modules/dataset/EncryptionConfig.java  |  44 ++------
 .../modules/dataset/FSDatasetDescriptor.java       |  99 +++-------------
 ...onConfig.java => FSDatasetPartitionConfig.java} |  43 ++-----
 .../service/modules/dataset/FormatConfig.java      |  37 +-----
 .../modules/dataset/SqlDatasetDescriptor.java      | 124 +++++++++++++++++++++
 .../service/modules/flowgraph/BaseDataNode.java    |  32 +-----
 .../flowgraph/DatasetDescriptorConfigKeys.java     |   4 +-
 .../service/modules/flowgraph/SqlDataNode.java     |  54 +++++++++
 .../flowgraph/datanodes/fs/FileSystemDataNode.java |  30 +----
 .../modules/dataset/SqlDatasetDescriptorTest.java  |  61 ++++++++++
 11 files changed, 385 insertions(+), 236 deletions(-)

diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/BaseDatasetDescriptor.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/BaseDatasetDescriptor.java
new file mode 100644
index 0000000..c9ad8b5
--- /dev/null
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/BaseDatasetDescriptor.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.service.modules.dataset;
+
+import java.io.IOException;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.ToString;
+
+import 
org.apache.gobblin.service.modules.flowgraph.DatasetDescriptorConfigKeys;
+import org.apache.gobblin.util.ConfigUtils;
+
+@EqualsAndHashCode (exclude = {"description", "rawConfig"})
+@ToString (exclude = {"description", "rawConfig"})
+public abstract class BaseDatasetDescriptor implements DatasetDescriptor {
+  @Getter
+  private final String platform;
+  @Getter
+  private final FormatConfig formatConfig;
+  @Getter
+  private final boolean isRetentionApplied;
+  @Getter
+  private final String description;
+  @Getter
+  private final Config rawConfig;
+
+  private static final Config DEFAULT_FALLBACK =
+      ConfigFactory.parseMap(ImmutableMap.<String, Object>builder()
+          .put(DatasetDescriptorConfigKeys.PATH_KEY, 
DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY)
+          .put(DatasetDescriptorConfigKeys.IS_RETENTION_APPLIED_KEY, false)
+          .build());
+
+  public BaseDatasetDescriptor(Config config) throws IOException {
+    
Preconditions.checkArgument(config.hasPath(DatasetDescriptorConfigKeys.PLATFORM_KEY),
 "Dataset descriptor config must specify platform");
+    this.platform = 
config.getString(DatasetDescriptorConfigKeys.PLATFORM_KEY).toLowerCase();
+    this.formatConfig = new FormatConfig(config);
+    this.isRetentionApplied = ConfigUtils.getBoolean(config, 
DatasetDescriptorConfigKeys.IS_RETENTION_APPLIED_KEY, false);
+    this.description = ConfigUtils.getString(config, 
DatasetDescriptorConfigKeys.DESCRIPTION_KEY, "");
+    this.rawConfig = 
config.withFallback(this.formatConfig.getRawConfig()).withFallback(DEFAULT_FALLBACK);
+  }
+
+  /**
+   * Subclass hook for contains(): checks otherPath against this descriptor's path.
+   */
+  protected abstract boolean isPathContaining(String otherPath);
+
+  /**
+   * @return true if this {@link DatasetDescriptor} contains the other {@link 
DatasetDescriptor} i.e. the
+   * datasets described by this {@link DatasetDescriptor} are a subset of the 
datasets described by the other
+   * {@link DatasetDescriptor}. This operation is non-commutative.
+   * @param other
+   */
+  @Override
+  public boolean contains(DatasetDescriptor other) {
+    if (this == other) {
+      return true;
+    }
+    
+    if (other == null || !getClass().equals(other.getClass())) {
+      return false;
+    }
+
+    if (this.getPlatform() == null || 
!this.getPlatform().equalsIgnoreCase(other.getPlatform())) {
+      return false;
+    }
+
+    if ((this.isRetentionApplied() != other.isRetentionApplied())) {
+      return false;
+    }
+
+    return isPathContaining(other.getPath()) && 
getFormatConfig().contains(other.getFormatConfig());
+  }
+}
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/EncryptionConfig.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/EncryptionConfig.java
index 08b96ac..85e5364 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/EncryptionConfig.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/EncryptionConfig.java
@@ -20,18 +20,21 @@ package org.apache.gobblin.service.modules.dataset;
 import java.io.IOException;
 
 import com.google.common.base.Enums;
-import com.google.common.base.Joiner;
 import com.google.common.collect.ImmutableMap;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
 
+import lombok.EqualsAndHashCode;
 import lombok.Getter;
+import lombok.ToString;
 import lombok.extern.slf4j.Slf4j;
 
 import 
org.apache.gobblin.service.modules.flowgraph.DatasetDescriptorConfigKeys;
 import org.apache.gobblin.util.ConfigUtils;
 
 @Slf4j
+@ToString(exclude = {"rawConfig"})
+@EqualsAndHashCode (exclude = {"rawConfig"})
 public class EncryptionConfig {
   @Getter
   private final String encryptionAlgorithm;
@@ -85,13 +88,13 @@ public class EncryptionConfig {
       this.encryptedFields = 
DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_NONE;
     } else {
       this.keystoreType = ConfigUtils.getString(encryptionConfig, 
DatasetDescriptorConfigKeys.ENCRYPTION_KEYSTORE_TYPE_KEY,
-          DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY);
+          
DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY).toLowerCase();
       this.keystoreEncoding = ConfigUtils.getString(encryptionConfig, 
DatasetDescriptorConfigKeys.ENCRYPTION_KEYSTORE_ENCODING_KEY,
-          DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY);
+          
DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY).toLowerCase();
       this.encryptionLevel = ConfigUtils.getString(encryptionConfig, 
DatasetDescriptorConfigKeys.ENCRYPTION_LEVEL_KEY,
-          DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY);
+          
DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY).toLowerCase();
       this.encryptedFields = ConfigUtils.getString(encryptionConfig, 
DatasetDescriptorConfigKeys.ENCRYPTED_FIELDS,
-          DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY);
+          
DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY).toLowerCase();
       validate(this.encryptionLevel, this.encryptedFields);
     }
     this.rawConfig = encryptionConfig.withFallback(DEFAULT_FALLBACK);
@@ -143,35 +146,4 @@ public class EncryptionConfig {
         && 
(DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY.equalsIgnoreCase(this.getEncryptedFields())
         || this.encryptedFields.equalsIgnoreCase(otherEncryptedFields));
   }
-
-  @Override
-  public boolean equals(Object o) {
-    if (this == o) {
-      return true;
-    }
-
-    if (!(o instanceof EncryptionConfig)) {
-      return false;
-    }
-    EncryptionConfig other = (EncryptionConfig) o;
-    return 
this.getEncryptionAlgorithm().equalsIgnoreCase(other.getEncryptionAlgorithm()) 
&& this.keystoreEncoding.equalsIgnoreCase(other.getKeystoreEncoding())
-        && this.getKeystoreType().equalsIgnoreCase(other.getKeystoreType()) && 
this.getEncryptionLevel().equals(other.getEncryptionLevel())
-        && 
this.getEncryptedFields().equalsIgnoreCase(other.getEncryptedFields());
-  }
-
-  @Override
-  public String toString() {
-    return "(" + Joiner.on(",").join(this.encryptionAlgorithm, 
this.encryptionLevel, this.encryptedFields, this.keystoreType, 
this.keystoreEncoding) + ")";
-  }
-
-  @Override
-  public int hashCode() {
-    int result = 17;
-    result = 31 * result + encryptionAlgorithm.toLowerCase().hashCode();
-    result = 31 * result + keystoreType.toLowerCase().hashCode();
-    result = 31 * result + keystoreEncoding.toLowerCase().hashCode();
-    result = 31 * result + encryptionLevel.toLowerCase().hashCode();
-    result = 31 * result + encryptedFields.toLowerCase().hashCode();
-    return result;
-  }
 }
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/FSDatasetDescriptor.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/FSDatasetDescriptor.java
index 8b5d7a7..3b2611e 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/FSDatasetDescriptor.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/FSDatasetDescriptor.java
@@ -22,13 +22,13 @@ import java.io.IOException;
 import org.apache.hadoop.fs.GlobPattern;
 import org.apache.hadoop.fs.Path;
 
-import com.google.common.base.Joiner;
-import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableMap;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
 
+import lombok.EqualsAndHashCode;
 import lombok.Getter;
+import lombok.ToString;
 
 import org.apache.gobblin.annotation.Alpha;
 import 
org.apache.gobblin.service.modules.flowgraph.DatasetDescriptorConfigKeys;
@@ -40,46 +40,35 @@ import org.apache.gobblin.util.PathUtils;
  * An implementation of {@link DatasetDescriptor} with FS-based storage.
  */
 @Alpha
-public class FSDatasetDescriptor implements DatasetDescriptor {
-  @Getter
-  private final String platform;
+@ToString (callSuper = true, exclude = {"rawConfig"})
+@EqualsAndHashCode (callSuper = true, exclude = {"rawConfig"})
+public class FSDatasetDescriptor extends BaseDatasetDescriptor implements 
DatasetDescriptor {
   @Getter
   private final String path;
   @Getter
-  private final FormatConfig formatConfig;
-  @Getter
-  private final FsDatasetPartitionConfig partitionConfig;
-  @Getter
-  private final boolean isRetentionApplied;
-  @Getter
   private final boolean isCompacted;
   @Getter
   private final boolean isCompactedAndDeduped;
   @Getter
-  private final String description;
+  private final FSDatasetPartitionConfig partitionConfig;
   @Getter
   private final Config rawConfig;
 
   private static final Config DEFAULT_FALLBACK =
       ConfigFactory.parseMap(ImmutableMap.<String, Object>builder()
-          .put(DatasetDescriptorConfigKeys.PATH_KEY, 
DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY)
-          .put(DatasetDescriptorConfigKeys.IS_RETENTION_APPLIED_KEY, false)
           .put(DatasetDescriptorConfigKeys.IS_COMPACTED_KEY, false)
           .put(DatasetDescriptorConfigKeys.IS_COMPACTED_AND_DEDUPED_KEY, false)
           .build());
 
   public FSDatasetDescriptor(Config config) throws IOException {
-    
Preconditions.checkArgument(config.hasPath(DatasetDescriptorConfigKeys.PLATFORM_KEY),
 "Dataset descriptor config must specify platform");
-    this.platform = config.getString(DatasetDescriptorConfigKeys.PLATFORM_KEY);
-    this.path = PathUtils.getPathWithoutSchemeAndAuthority(new 
Path(ConfigUtils.getString(config, DatasetDescriptorConfigKeys.PATH_KEY,
-        
DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY))).toString();
-    this.formatConfig = new FormatConfig(config);
-    this.partitionConfig = new 
FsDatasetPartitionConfig(ConfigUtils.getConfigOrEmpty(config, 
DatasetDescriptorConfigKeys.PARTITION_PREFIX));
-    this.isRetentionApplied = ConfigUtils.getBoolean(config, 
DatasetDescriptorConfigKeys.IS_RETENTION_APPLIED_KEY, false);
+    super(config);
+    this.path = PathUtils
+        .getPathWithoutSchemeAndAuthority(new 
Path(ConfigUtils.getString(config, DatasetDescriptorConfigKeys.PATH_KEY,
+            
DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY))).toString();
     this.isCompacted = ConfigUtils.getBoolean(config, 
DatasetDescriptorConfigKeys.IS_COMPACTED_KEY, false);
     this.isCompactedAndDeduped = ConfigUtils.getBoolean(config, 
DatasetDescriptorConfigKeys.IS_COMPACTED_AND_DEDUPED_KEY, false);
-    this.description = ConfigUtils.getString(config, 
DatasetDescriptorConfigKeys.DESCRIPTION_KEY, "");
-    this.rawConfig = 
config.withFallback(this.formatConfig.getRawConfig()).withFallback(this.partitionConfig.getRawConfig()).withFallback(DEFAULT_FALLBACK);
+    this.partitionConfig = new 
FSDatasetPartitionConfig(ConfigUtils.getConfigOrEmpty(config, 
DatasetDescriptorConfigKeys.PARTITION_PREFIX));
+    this.rawConfig = 
config.withFallback(getPartitionConfig().getRawConfig()).withFallback(DEFAULT_FALLBACK).withFallback(super.getRawConfig());
   }
 
   /**
@@ -90,7 +79,7 @@ public class FSDatasetDescriptor implements DatasetDescriptor 
{
    * @param otherPath a glob pattern that describes a set of paths.
    * @return true if the glob pattern described by the otherPath matches the 
path in this {@link DatasetDescriptor}.
    */
-  private boolean isPathContaining(String otherPath) {
+  protected boolean isPathContaining(String otherPath) {
     if (otherPath == null) {
       return false;
     }
@@ -109,71 +98,17 @@ public class FSDatasetDescriptor implements 
DatasetDescriptor {
    */
   @Override
   public boolean contains(DatasetDescriptor o) {
-    if (this == o) {
-      return true;
-    }
-    if (!(o instanceof FSDatasetDescriptor)) {
+    if (!super.contains(o)) {
       return false;
     }
-    FSDatasetDescriptor other = (FSDatasetDescriptor) o;
 
-    if (this.getPlatform() == null || other.getPlatform() == null || 
!this.getPlatform().equalsIgnoreCase(other.getPlatform())) {
-      return false;
-    }
-
-    if ((this.isRetentionApplied() != other.isRetentionApplied()) || 
(this.isCompacted() != other.isCompacted()) ||
-        (this.isCompactedAndDeduped() != other.isCompactedAndDeduped())) {
-      return false;
-    }
-
-    return getFormatConfig().contains(other.getFormatConfig()) && 
getPartitionConfig().contains(other.getPartitionConfig())
-        && isPathContaining(other.getPath());
-  }
-
-  /**
-   *
-   * @param o the other {@link FSDatasetDescriptor} to compare "this" {@link 
FSDatasetDescriptor} with.
-   * @return true iff  "this" dataset descriptor is compatible with the 
"other" and the "other" dataset descriptor is
-   * compatible with this dataset descriptor.
-   */
-  @Override
-  public boolean equals(Object o) {
-    if (this == o) {
-      return true;
-    }
-    if (!(o instanceof FSDatasetDescriptor)) {
-      return false;
-    }
     FSDatasetDescriptor other = (FSDatasetDescriptor) o;
-    if (this.getPlatform() == null || other.getPlatform() == null || 
!this.getPlatform().equalsIgnoreCase(other.getPlatform())) {
-      return false;
-    }
-    if ((this.isRetentionApplied() != other.isRetentionApplied()) || 
(this.isCompacted() != other.isCompacted()) ||
+
+    if ((this.isCompacted() != other.isCompacted()) ||
         (this.isCompactedAndDeduped() != other.isCompactedAndDeduped())) {
       return false;
     }
-    return this.getPath().equals(other.getPath()) && 
this.getPartitionConfig().equals(other.getPartitionConfig()) &&
-        this.getFormatConfig().equals(other.getFormatConfig());
-  }
-
-  @Override
-  public String toString() {
-     return "(" + Joiner.on(",").join(this.getPlatform(), this.getPath(), 
this.getFormatConfig().toString(), this.getPartitionConfig().toString(),
-         String.valueOf(isRetentionApplied()), String.valueOf(isCompacted()), 
String.valueOf(isCompactedAndDeduped()))
-         + ")";
-  }
 
-  @Override
-  public int hashCode() {
-    int result = 17;
-    result = 31 * result + platform.toLowerCase().hashCode();
-    result = 31 * result + path.hashCode();
-    result = 31 * result + Boolean.hashCode(isRetentionApplied);
-    result = 31 * result + Boolean.hashCode(isCompacted);
-    result = 31 * result + Boolean.hashCode(isCompactedAndDeduped);
-    result = 31 * result + getFormatConfig().hashCode();
-    result = 31 * result + getPartitionConfig().hashCode();
-    return result;
+    return this.getPartitionConfig().contains(other.getPartitionConfig());
   }
-
 }
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/FsDatasetPartitionConfig.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/FSDatasetPartitionConfig.java
similarity index 83%
rename from 
gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/FsDatasetPartitionConfig.java
rename to 
gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/FSDatasetPartitionConfig.java
index 86eec99..eb022ae 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/FsDatasetPartitionConfig.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/FSDatasetPartitionConfig.java
@@ -23,12 +23,13 @@ import java.util.regex.Pattern;
 import java.util.regex.PatternSyntaxException;
 
 import com.google.common.base.Enums;
-import com.google.common.base.Joiner;
 import com.google.common.collect.ImmutableMap;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
 
+import lombok.EqualsAndHashCode;
 import lombok.Getter;
+import lombok.ToString;
 import lombok.extern.slf4j.Slf4j;
 
 import 
org.apache.gobblin.service.modules.flowgraph.DatasetDescriptorConfigKeys;
@@ -41,7 +42,9 @@ import org.apache.gobblin.util.ConfigUtils;
  * the regex pattern) is validated.
  */
 @Slf4j
-public class FsDatasetPartitionConfig {
+@ToString (exclude = {"rawConfig"})
+@EqualsAndHashCode (exclude = {"rawConfig"})
+public class FSDatasetPartitionConfig {
   @Getter
   private final String partitionType;
   @Getter
@@ -73,9 +76,9 @@ public class FsDatasetPartitionConfig {
           .put(DatasetDescriptorConfigKeys.PARTITION_PATTERN_KEY, 
DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY)
           .build());
 
-  public FsDatasetPartitionConfig(Config config) throws IOException {
-    String partitionType = ConfigUtils.getString(config, 
DatasetDescriptorConfigKeys.PARTITION_TYPE_KEY, 
DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY);
-    String partitionPattern = ConfigUtils.getString(config, 
DatasetDescriptorConfigKeys.PARTITION_PATTERN_KEY, 
DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY);
+  public FSDatasetPartitionConfig(Config config) throws IOException {
+    String partitionType = ConfigUtils.getString(config, 
DatasetDescriptorConfigKeys.PARTITION_TYPE_KEY, 
DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY).toLowerCase();
+    String partitionPattern = ConfigUtils.getString(config, 
DatasetDescriptorConfigKeys.PARTITION_PATTERN_KEY, 
DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY).toLowerCase();
     if (partitionType.equalsIgnoreCase(PartitionType.NONE.name())) {
       partitionPattern = 
DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_NONE;
     } else if(partitionType.equalsIgnoreCase(PartitionType.ANY.name())) {
@@ -125,41 +128,13 @@ public class FsDatasetPartitionConfig {
     }
   }
 
-  public boolean contains(FsDatasetPartitionConfig other) {
+  public boolean contains(FSDatasetPartitionConfig other) {
     if (other == null) {
       return false;
     }
-
     return 
((DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY.equalsIgnoreCase(getPartitionType())
         || this.getPartitionType().equalsIgnoreCase(partitionType)))
         && 
((DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY.equalsIgnoreCase(getPartitionPattern())
         || 
this.getPartitionPattern().equalsIgnoreCase(other.getPartitionPattern())));
   }
-
-  @Override
-  public boolean equals(Object o) {
-    if (this == o) {
-      return true;
-    }
-
-    if (!(o instanceof FsDatasetPartitionConfig)) {
-      return false;
-    }
-    FsDatasetPartitionConfig other = (FsDatasetPartitionConfig) o;
-    return this.getPartitionType().equalsIgnoreCase(other.getPartitionType()) 
&&
-        
this.getPartitionPattern().equalsIgnoreCase(other.getPartitionPattern());
-  }
-
-  @Override
-  public String toString() {
-    return "(" + Joiner.on(",").join(this.getPartitionType(), 
this.getPartitionPattern()) + ")";
-  }
-
-  @Override
-  public int hashCode() {
-    int result = 17;
-    result = 31 * result + this.getPartitionType().hashCode();
-    result = 31 * result + this.getPartitionPattern().toLowerCase().hashCode();
-    return result;
-  }
 }
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/FormatConfig.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/FormatConfig.java
index 3485ec5..e02940b 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/FormatConfig.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/FormatConfig.java
@@ -19,12 +19,13 @@ package org.apache.gobblin.service.modules.dataset;
 
 import java.io.IOException;
 
-import com.google.common.base.Joiner;
 import com.google.common.collect.ImmutableMap;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
 
+import lombok.EqualsAndHashCode;
 import lombok.Getter;
+import lombok.ToString;
 
 import org.apache.gobblin.annotation.Alpha;
 import 
org.apache.gobblin.service.modules.flowgraph.DatasetDescriptorConfigKeys;
@@ -41,6 +42,8 @@ import org.apache.gobblin.util.ConfigUtils;
  *  </ul>
  */
 @Alpha
+@ToString (exclude = {"rawConfig"})
+@EqualsAndHashCode (exclude = {"rawConfig"})
 public class FormatConfig {
   @Getter
   private final String format;
@@ -58,8 +61,8 @@ public class FormatConfig {
           .build());
 
   public FormatConfig(Config config) throws IOException {
-    this.format = ConfigUtils.getString(config, 
DatasetDescriptorConfigKeys.FORMAT_KEY, 
DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY);
-    this.codecType = ConfigUtils.getString(config, 
DatasetDescriptorConfigKeys.CODEC_KEY, 
DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY);
+    this.format = ConfigUtils.getString(config, 
DatasetDescriptorConfigKeys.FORMAT_KEY, 
DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY).toLowerCase();
+    this.codecType = ConfigUtils.getString(config, 
DatasetDescriptorConfigKeys.CODEC_KEY, 
DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY).toLowerCase();
     this.encryptionConfig = new EncryptionConfig(ConfigUtils.getConfig(config, 
DatasetDescriptorConfigKeys.ENCYPTION_PREFIX, ConfigFactory
         .empty()));
     this.rawConfig = 
config.withFallback(this.encryptionConfig.getRawConfig().atPath(DatasetDescriptorConfigKeys.ENCYPTION_PREFIX)).
@@ -84,32 +87,4 @@ public class FormatConfig {
   private boolean containsEncryptionConfig(EncryptionConfig 
otherEncryptionConfig) {
     return this.getEncryptionConfig().contains(otherEncryptionConfig);
   }
-
-  @Override
-  public boolean equals(Object o) {
-    if (this == o) {
-      return true;
-    }
-
-    if (!(o instanceof FormatConfig)) {
-      return false;
-    }
-    FormatConfig other = (FormatConfig) o;
-    return this.getFormat().equalsIgnoreCase(other.getFormat()) && 
this.getCodecType().equalsIgnoreCase(other.getCodecType())
-        && this.getEncryptionConfig().equals(other.getEncryptionConfig());
-  }
-
-  @Override
-  public String toString() {
-    return "(" + Joiner.on(",").join(this.getFormat(), this.getCodecType(), 
this.getEncryptionConfig().toString()) + ")";
-  }
-
-  @Override
-  public int hashCode() {
-    int result = 17;
-    result = 31 * result + codecType.toLowerCase().hashCode();
-    result = 31 * result + format.toLowerCase().hashCode();
-    result = 31 * result + encryptionConfig.hashCode();
-    return result;
-  }
 }
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/SqlDatasetDescriptor.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/SqlDatasetDescriptor.java
new file mode 100644
index 0000000..2811992
--- /dev/null
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/SqlDatasetDescriptor.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.dataset;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.regex.Pattern;
+
+import com.google.common.base.Enums;
+import com.google.common.base.Joiner;
+import com.google.common.base.Splitter;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigValueFactory;
+
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.ToString;
+import lombok.extern.slf4j.Slf4j;
+
+import 
org.apache.gobblin.service.modules.flowgraph.DatasetDescriptorConfigKeys;
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.PathUtils;
+
+
+@Slf4j
+@ToString (exclude = {"rawConfig"})
+@EqualsAndHashCode (exclude = {"rawConfig"}, callSuper = true)
+public class SqlDatasetDescriptor extends BaseDatasetDescriptor implements 
DatasetDescriptor {
+  private static final String SEPARATION_CHAR = ";";
+
+  private final String databaseName;
+  private final String tableName;
+
+  @Getter
+  private final String path;
+  @Getter
+  private final Config rawConfig;
+
+  public enum  Platform {
+    SQLSERVER("sqlserver"),
+    MYSQL("mysql"),
+    ORACLE("oracle"),
+    POSTGRES("postgres"),
+    TERADARA("teradata");
+
+    private final String platform;
+
+    Platform(final String platform) {
+      this.platform = platform;
+    }
+
+    @Override
+    public String toString() {
+      return this.platform;
+    }
+  }
+
+  public SqlDatasetDescriptor(Config config) throws IOException {
+    super(config);
+    if (!isPlatformValid()) {
+      throw new IOException("Invalid platform specified for 
SqlDatasetDescriptor: " + getPlatform());
+    }
+    this.databaseName = ConfigUtils.getString(config, 
DatasetDescriptorConfigKeys.DATABASE_KEY, ".*");
+    this.tableName = ConfigUtils.getString(config, 
DatasetDescriptorConfigKeys.TABLE_KEY, ".*");
+    this.path = fullyQualifiedTableName(this.databaseName, this.tableName);
+    this.rawConfig = config.withValue(DatasetDescriptorConfigKeys.PATH_KEY, 
ConfigValueFactory.fromAnyRef(this.path)).withFallback(super.getRawConfig());
+  }
+
+  private String fullyQualifiedTableName(String databaseName, String 
tableName) {
+    return Joiner.on(SEPARATION_CHAR).join(databaseName, tableName);
+  }
+
+  private boolean isPlatformValid() {
+    return Enums.getIfPresent(Platform.class, 
getPlatform().toUpperCase()).isPresent();
+  }
+  /**
+   * Check if the dbName and tableName specified in {@code otherPath} are 
accepted by the set of dbName;tableName
+   * combinations defined by the current {@link SqlDatasetDescriptor}. For 
example, let:
+   * this.path = "test_.*;test_table_.*". Then:
+   * isPathContaining("test_db1;test_table_1") = true
+   * isPathContaining("testdb1;test_table_2") = false
+   *
+   * NOTE: otherPath cannot be a globPattern. So:
+   * isPathContaining("test_db.*;test_table_*") = false
+   *
+   * @param otherPath which should be in the format of dbName;tableName
+   */
+  @Override
+  protected boolean isPathContaining(String otherPath) {
+    if (otherPath == null) {
+      return false;
+    }
+
+    if (PathUtils.GLOB_TOKENS.matcher(otherPath).find()) {
+      return false;
+    }
+
+    //Extract the dbName and tableName from otherPath
+    List<String> parts = Splitter.on(SEPARATION_CHAR).splitToList(otherPath);
+    if (parts.size() != 2) {
+      return false;
+    }
+
+    String otherDbName = parts.get(0);
+    String otherTableName = parts.get(1);
+
+    return Pattern.compile(this.databaseName).matcher(otherDbName).matches() 
&& Pattern.compile(this.tableName).matcher(otherTableName).matches();
+  }
+}
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseDataNode.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseDataNode.java
index 4fb9711..23bf83b 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseDataNode.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseDataNode.java
@@ -19,21 +19,22 @@ package org.apache.gobblin.service.modules.flowgraph;
 
 import com.google.common.base.Preconditions;
 import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-
-import org.apache.gobblin.annotation.Alpha;
-import org.apache.gobblin.util.ConfigUtils;
 
 import joptsimple.internal.Strings;
+import lombok.EqualsAndHashCode;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.util.ConfigUtils;
+
 
 /**
  * An implementation of {@link DataNode}.
  */
 @Alpha
 @Slf4j
+@EqualsAndHashCode (exclude = {"rawConfig", "active"})
 public class BaseDataNode implements DataNode {
   @Getter
   private String id;
@@ -55,27 +56,4 @@ public class BaseDataNode implements DataNode {
       throw new DataNodeCreationException(e);
     }
   }
-
-  /**
-   * The comparison between two nodes should involve the configuration.
-   * Node name is the identifier for the node.
-   * */
-  @Override
-  public boolean equals(Object o) {
-    if (this == o) {
-      return true;
-    }
-    if (o == null || getClass() != o.getClass()) {
-      return false;
-    }
-
-    BaseDataNode that = (BaseDataNode) o;
-
-    return id.equals(that.getId());
-  }
-
-  @Override
-  public int hashCode() {
-    return this.id.hashCode();
-  }
 }
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/DatasetDescriptorConfigKeys.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/DatasetDescriptorConfigKeys.java
index a144598..905ef81 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/DatasetDescriptorConfigKeys.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/DatasetDescriptorConfigKeys.java
@@ -31,12 +31,14 @@ public class DatasetDescriptorConfigKeys {
   public static final String CLASS_KEY = "class";
   public static final String PLATFORM_KEY = "platform";
   public static final String PATH_KEY = "path";
+  public static final String DATABASE_KEY = "databaseName";
+  public static final String TABLE_KEY = "tableName";
   public static final String FORMAT_KEY = "format";
   public static final String CODEC_KEY = "codec";
   public static final String DESCRIPTION_KEY = "description";
   public static final String IS_RETENTION_APPLIED_KEY = "isRetentionApplied";
   public static final String IS_COMPACTED_KEY = "isCompacted";
-  public static final String IS_COMPACTED_AND_DEDUPED_KEY = 
"isCompatedAndDeduped";
+  public static final String IS_COMPACTED_AND_DEDUPED_KEY = 
"isCompactedAndDeduped";
 
   //Dataset encryption related keys
   public static final String ENCYPTION_PREFIX = "encrypt";
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/SqlDataNode.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/SqlDataNode.java
new file mode 100644
index 0000000..4d90172
--- /dev/null
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/SqlDataNode.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.service.modules.flowgraph;
+
+import com.google.common.base.Preconditions;
+import com.typesafe.config.Config;
+
+import joptsimple.internal.Strings;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+
+import org.apache.gobblin.util.ConfigUtils;
+
+/**
+ * A {@link BaseDataNode} for a SQL endpoint. On top of whatever the
+ * {@link BaseDataNode} constructor requires, the node configuration must
+ * provide a host name, a non-zero port and a JDBC driver.
+ */
+@EqualsAndHashCode (callSuper = true)
+public class SqlDataNode extends BaseDataNode {
+  /** Config key holding the host name of the SQL endpoint. */
+  public static final String SQL_HOSTNAME = 
FlowGraphConfigurationKeys.DATA_NODE_PREFIX + "sql.hostname";
+  /** Config key holding the port of the SQL endpoint. */
+  public static final String SQL_PORT = 
FlowGraphConfigurationKeys.DATA_NODE_PREFIX + "sql.port";
+  // No leading '.' before "sql.driver": the key is built exactly like the
+  // host name and port keys above, so all three share the same key shape.
+  /** Config key holding the JDBC driver of the SQL endpoint. */
+  public static final String SQL_DRIVER = 
FlowGraphConfigurationKeys.DATA_NODE_PREFIX + "sql.driver";
+
+  @Getter
+  private String hostName;
+  @Getter
+  private Integer port;
+  @Getter
+  private String jdbcDriver;
+
+  /**
+   * Builds the node from its configuration and validates the SQL settings.
+   *
+   * @param nodeProps configuration of the node; read for
+   *        {@link #SQL_HOSTNAME}, {@link #SQL_PORT} and {@link #SQL_DRIVER}
+   * @throws DataNodeCreationException if the parent constructor fails, if
+   *         the host name or driver is null or empty, if the port is 0, or
+   *         if any other exception is raised while reading the values
+   */
+  public SqlDataNode(Config nodeProps) throws DataNodeCreationException {
+    super(nodeProps);
+    try {
+      // "" and 0 are passed as fallbacks, so an absent key ends up failing
+      // the corresponding precondition below.
+      this.hostName = ConfigUtils.getString(nodeProps, SQL_HOSTNAME, "");
+      this.port = ConfigUtils.getInt(nodeProps, SQL_PORT, 0);
+      this.jdbcDriver = ConfigUtils.getString(nodeProps, SQL_DRIVER, "");
+      Preconditions.checkArgument(!Strings.isNullOrEmpty(hostName), 
SQL_HOSTNAME + " cannot be null or empty.");
+      Preconditions.checkArgument(port != 0, SQL_PORT + " cannot be empty.");
+      Preconditions.checkArgument(!Strings.isNullOrEmpty(jdbcDriver), 
SQL_DRIVER + " cannot be null or empty.");
+    } catch (Exception e) {
+      // Every failure is reported as a node creation failure, cause kept.
+      throw new DataNodeCreationException(e);
+    }
+  }
+}
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/datanodes/fs/FileSystemDataNode.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/datanodes/fs/FileSystemDataNode.java
index 72f1a66..9830f47 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/datanodes/fs/FileSystemDataNode.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/datanodes/fs/FileSystemDataNode.java
@@ -20,24 +20,25 @@ package 
org.apache.gobblin.service.modules.flowgraph.datanodes.fs;
 import java.io.IOException;
 import java.net.URI;
 
-import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
 import com.typesafe.config.Config;
 
+import joptsimple.internal.Strings;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+
 import org.apache.gobblin.annotation.Alpha;
 import org.apache.gobblin.service.modules.flowgraph.BaseDataNode;
 import org.apache.gobblin.service.modules.flowgraph.FlowGraphConfigurationKeys;
 import org.apache.gobblin.util.ConfigUtils;
 
-import joptsimple.internal.Strings;
-import lombok.Getter;
-
 
 /**
  * An abstract {@link FileSystemDataNode} implementation. In addition to the 
required properties of a {@link BaseDataNode}, an {@link FileSystemDataNode}
  * must have a FS URI specified. Example implementations of {@link 
FileSystemDataNode} include {@link HdfsDataNode}, {@link LocalFSDataNode}.
  */
 @Alpha
+@EqualsAndHashCode (callSuper = true)
 public abstract class FileSystemDataNode extends BaseDataNode {
   public static final String FS_URI_KEY = 
FlowGraphConfigurationKeys.DATA_NODE_PREFIX + "fs.uri";
 
@@ -63,25 +64,4 @@ public abstract class FileSystemDataNode extends 
BaseDataNode {
   }
 
   public abstract boolean isUriValid(URI fsUri);
-  /**
-   * Two HDFS DataNodes are the same if they have the same id and the same 
fsUri.
-   */
-  @Override
-  public boolean equals(Object o) {
-    if (this == o) {
-      return true;
-    }
-    if (o == null || getClass() != o.getClass()) {
-      return false;
-    }
-
-    FileSystemDataNode that = (FileSystemDataNode) o;
-
-    return this.getId().equals(that.getId()) && 
this.fsUri.equals(that.getFsUri());
-  }
-
-  @Override
-  public int hashCode() {
-    return Joiner.on("-").join(this.getId(), this.fsUri).hashCode();
-  }
 }
\ No newline at end of file
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/dataset/SqlDatasetDescriptorTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/dataset/SqlDatasetDescriptorTest.java
new file mode 100644
index 0000000..75a142a
--- /dev/null
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/dataset/SqlDatasetDescriptorTest.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.dataset;
+
+import java.io.IOException;
+
+import org.junit.Assert;
+import org.testng.annotations.Test;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigValueFactory;
+
+import 
org.apache.gobblin.service.modules.flowgraph.DatasetDescriptorConfigKeys;
+
+import static org.testng.Assert.*;
+
+
+/**
+ * Unit tests for {@link SqlDatasetDescriptor}.
+ */
+public class SqlDatasetDescriptorTest {
+
+  /**
+   * Checks {@link SqlDatasetDescriptor#contains} against a fully specified
+   * descriptor, using a platform-only descriptor, a descriptor whose
+   * database/table patterns match it, and one whose patterns do not.
+   */
+  @Test
+  public void testContains() throws IOException {
+    SqlDatasetDescriptor descriptor1 =
+        new SqlDatasetDescriptor(sqlServerConfig("testDb_Db1", 
"testTable_Table1"));
+
+    // Only the platform is set: no database or table is specified.
+    SqlDatasetDescriptor descriptor2 = new 
SqlDatasetDescriptor(sqlServerConfig());
+    assertTrue(descriptor2.contains(descriptor1));
+
+    SqlDatasetDescriptor descriptor3 =
+        new SqlDatasetDescriptor(sqlServerConfig("testDb_.*", 
"testTable_.*"));
+    assertTrue(descriptor3.contains(descriptor1));
+
+    // These patterns lack the "test" prefix carried by descriptor1's names.
+    SqlDatasetDescriptor descriptor4 =
+        new SqlDatasetDescriptor(sqlServerConfig("Db_.*", "Table_.*"));
+    assertFalse(descriptor4.contains(descriptor1));
+  }
+
+  /** Returns a config that only sets the platform to "sqlserver". */
+  private static Config sqlServerConfig() {
+    return ConfigFactory.empty()
+        .withValue(DatasetDescriptorConfigKeys.PLATFORM_KEY, 
ConfigValueFactory.fromAnyRef("sqlserver"));
+  }
+
+  /** Returns a "sqlserver" config with the given database and table. */
+  private static Config sqlServerConfig(String database, String table) {
+    return sqlServerConfig()
+        .withValue(DatasetDescriptorConfigKeys.DATABASE_KEY, 
ConfigValueFactory.fromAnyRef(database))
+        .withValue(DatasetDescriptorConfigKeys.TABLE_KEY, 
ConfigValueFactory.fromAnyRef(table));
+  }
+}
\ No newline at end of file

Reply via email to