This is an automated email from the ASF dual-hosted git repository.

hutran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 58eca28  [GOBBLIN-675] Enhance FSDatasetDescriptor definition to include partit…
58eca28 is described below

commit 58eca2810eff3829821309af29744fa08c92bad0
Author: suvasude <[email protected]>
AuthorDate: Tue Feb 5 10:37:42 2019 -0800

    [GOBBLIN-675] Enhance FSDatasetDescriptor definition to include partit…
    
    Closes #2544 from sv2000/fsDescriptor
---
 .../service/modules/dataset/EncryptionConfig.java  |  89 +++++++++--
 .../modules/dataset/FSDatasetDescriptor.java       |  37 ++++-
 .../service/modules/dataset/FormatConfig.java      |   4 +-
 .../modules/dataset/FsDatasetPartitionConfig.java  | 165 +++++++++++++++++++++
 .../flowgraph/DatasetDescriptorConfigKeys.java     |   9 ++
 .../modules/dataset/FSDatasetDescriptorTest.java   | 163 ++++++++++++++++++++
 gobblin-service/src/test/resources/flow/flow1.conf |   3 +-
 .../hdfsConvertToJsonAndEncrypt/flow.conf          |   3 +-
 8 files changed, 452 insertions(+), 21 deletions(-)

diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/EncryptionConfig.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/EncryptionConfig.java
index 52a1a10..08b96ac 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/EncryptionConfig.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/EncryptionConfig.java
@@ -17,52 +17,110 @@
 
 package org.apache.gobblin.service.modules.dataset;
 
+import java.io.IOException;
+
+import com.google.common.base.Enums;
 import com.google.common.base.Joiner;
 import com.google.common.collect.ImmutableMap;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
-import com.typesafe.config.ConfigValueFactory;
 
 import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
 
-import org.apache.gobblin.configuration.ConfigurationKeys;
-import org.apache.gobblin.service.modules.core.GitMonitoringService;
 import 
org.apache.gobblin.service.modules.flowgraph.DatasetDescriptorConfigKeys;
 import org.apache.gobblin.util.ConfigUtils;
 
-
+@Slf4j
 public class EncryptionConfig {
   @Getter
   private final String encryptionAlgorithm;
   @Getter
+  private final String encryptionLevel;
+  @Getter
+  private final String encryptedFields;
+  @Getter
   private final String keystoreType;
   @Getter
   private final String keystoreEncoding;
   @Getter
   private final Config rawConfig;
 
+  public enum  EncryptionLevel {
+    FILE("file"),
+    ROW("row"),
+    FIELD("field"),
+    NONE(DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_NONE),
+    ANY(DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY);
+
+    private final String level;
+
+    EncryptionLevel(final String level) {
+      this.level = level;
+    }
+
+    @Override
+    public String toString() {
+      return this.level;
+    }
+
+  }
+
   private static final Config DEFAULT_FALLBACK =
       ConfigFactory.parseMap(ImmutableMap.<String, Object>builder()
           .put(DatasetDescriptorConfigKeys.ENCRYPTION_ALGORITHM_KEY, 
DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY)
           .put(DatasetDescriptorConfigKeys.ENCRYPTION_KEYSTORE_TYPE_KEY, 
DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY)
           .put(DatasetDescriptorConfigKeys.ENCRYPTION_KEYSTORE_ENCODING_KEY, 
DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY)
+          .put(DatasetDescriptorConfigKeys.ENCRYPTION_LEVEL_KEY, 
DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY)
+          .put(DatasetDescriptorConfigKeys.ENCRYPTED_FIELDS, 
DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY)
           .build());
 
-  public EncryptionConfig(Config encryptionConfig) {
+  public EncryptionConfig(Config encryptionConfig) throws IOException {
     this.encryptionAlgorithm = ConfigUtils.getString(encryptionConfig, 
DatasetDescriptorConfigKeys.ENCRYPTION_ALGORITHM_KEY,
         DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY);
     if 
(this.encryptionAlgorithm.equalsIgnoreCase(DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_NONE))
 {
       this.keystoreType = 
DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_NONE;
       this.keystoreEncoding = 
DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_NONE;
+      this.encryptionLevel = 
DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_NONE;
+      this.encryptedFields = 
DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_NONE;
     } else {
       this.keystoreType = ConfigUtils.getString(encryptionConfig, 
DatasetDescriptorConfigKeys.ENCRYPTION_KEYSTORE_TYPE_KEY,
           DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY);
       this.keystoreEncoding = ConfigUtils.getString(encryptionConfig, 
DatasetDescriptorConfigKeys.ENCRYPTION_KEYSTORE_ENCODING_KEY,
           DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY);
+      this.encryptionLevel = ConfigUtils.getString(encryptionConfig, 
DatasetDescriptorConfigKeys.ENCRYPTION_LEVEL_KEY,
+          DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY);
+      this.encryptedFields = ConfigUtils.getString(encryptionConfig, 
DatasetDescriptorConfigKeys.ENCRYPTED_FIELDS,
+          DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY);
+      validate(this.encryptionLevel, this.encryptedFields);
     }
     this.rawConfig = encryptionConfig.withFallback(DEFAULT_FALLBACK);
   }
 
+  private void validate(String encryptionLevel, String encryptedFields) throws 
IOException {
+    if (!Enums.getIfPresent(EncryptionLevel.class, 
encryptionLevel.toUpperCase()).isPresent()) {
+      throw new IOException("Invalid encryption level " + encryptionLevel);
+    }
+    switch (EncryptionLevel.valueOf(encryptionLevel.toUpperCase())) {
+      case FIELD:
+        if 
((encryptedFields.equalsIgnoreCase(DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY))
 ||
+            
(encryptedFields.equalsIgnoreCase(DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_NONE)))
 {
+          log.error("Invalid input for encryptedFields {}", encryptedFields);
+          throw new IOException("Invalid encryptedFields");
+        }
+        break;
+      case NONE:
+        if 
(!encryptedFields.equalsIgnoreCase(DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_NONE))
 {
+          log.error("Invalid input for encryptedFields {}", encryptedFields);
+          throw new IOException("Invalid encryptedFields");
+        }
+        break;
+      default:
+        break;
+    }
+    return;
+  }
+
   public boolean contains(EncryptionConfig other) {
     if (other == null) {
       return false;
@@ -71,13 +129,19 @@ public class EncryptionConfig {
     String otherEncryptionAlgorithm = other.getEncryptionAlgorithm();
     String otherKeystoreType = other.getKeystoreType();
     String otherKeystoreEncoding = other.getKeystoreEncoding();
+    String otherEncryptionLevel = other.getEncryptionLevel();
+    String otherEncryptedFields = other.getEncryptedFields();
 
-    return 
(DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY.equals(this.getEncryptionAlgorithm())
+    return 
(DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY.equalsIgnoreCase(this.getEncryptionAlgorithm())
         || this.encryptionAlgorithm.equalsIgnoreCase(otherEncryptionAlgorithm))
-        && 
(DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY.equals(this.getKeystoreType())
+        && 
(DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY.equalsIgnoreCase(this.getKeystoreType())
         || this.keystoreType.equalsIgnoreCase(otherKeystoreType))
-        && 
(DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY.equals(this.getKeystoreEncoding())
-        || this.keystoreEncoding.equalsIgnoreCase(otherKeystoreEncoding));
+        && 
(DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY.equalsIgnoreCase(this.getKeystoreEncoding())
+        || this.keystoreEncoding.equalsIgnoreCase(otherKeystoreEncoding))
+        && 
(DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY.equalsIgnoreCase(this.getEncryptionLevel())
+        || this.encryptionLevel.equalsIgnoreCase(otherEncryptionLevel))
+        && 
(DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY.equalsIgnoreCase(this.getEncryptedFields())
+        || this.encryptedFields.equalsIgnoreCase(otherEncryptedFields));
   }
 
   @Override
@@ -91,12 +155,13 @@ public class EncryptionConfig {
     }
     EncryptionConfig other = (EncryptionConfig) o;
     return 
this.getEncryptionAlgorithm().equalsIgnoreCase(other.getEncryptionAlgorithm()) 
&& this.keystoreEncoding.equalsIgnoreCase(other.getKeystoreEncoding())
-        && this.getKeystoreType().equalsIgnoreCase(other.getKeystoreType());
+        && this.getKeystoreType().equalsIgnoreCase(other.getKeystoreType()) && 
this.getEncryptionLevel().equals(other.getEncryptionLevel())
+        && 
this.getEncryptedFields().equalsIgnoreCase(other.getEncryptedFields());
   }
 
   @Override
   public String toString() {
-    return "(" + Joiner.on(",").join(this.encryptionAlgorithm, 
this.keystoreType, this.keystoreEncoding) + ")";
+    return "(" + Joiner.on(",").join(this.encryptionAlgorithm, 
this.encryptionLevel, this.encryptedFields, this.keystoreType, 
this.keystoreEncoding) + ")";
   }
 
   @Override
@@ -105,6 +170,8 @@ public class EncryptionConfig {
     result = 31 * result + encryptionAlgorithm.toLowerCase().hashCode();
     result = 31 * result + keystoreType.toLowerCase().hashCode();
     result = 31 * result + keystoreEncoding.toLowerCase().hashCode();
+    result = 31 * result + encryptionLevel.toLowerCase().hashCode();
+    result = 31 * result + encryptedFields.toLowerCase().hashCode();
     return result;
   }
 }
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/FSDatasetDescriptor.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/FSDatasetDescriptor.java
index e9f4e31..8b5d7a7 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/FSDatasetDescriptor.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/FSDatasetDescriptor.java
@@ -17,6 +17,8 @@
 
 package org.apache.gobblin.service.modules.dataset;
 
+import java.io.IOException;
+
 import org.apache.hadoop.fs.GlobPattern;
 import org.apache.hadoop.fs.Path;
 
@@ -46,8 +48,14 @@ public class FSDatasetDescriptor implements DatasetDescriptor {
   @Getter
   private final FormatConfig formatConfig;
   @Getter
+  private final FsDatasetPartitionConfig partitionConfig;
+  @Getter
   private final boolean isRetentionApplied;
   @Getter
+  private final boolean isCompacted;
+  @Getter
+  private final boolean isCompactedAndDeduped;
+  @Getter
   private final String description;
   @Getter
   private final Config rawConfig;
@@ -56,17 +64,22 @@ public class FSDatasetDescriptor implements DatasetDescriptor {
       ConfigFactory.parseMap(ImmutableMap.<String, Object>builder()
           .put(DatasetDescriptorConfigKeys.PATH_KEY, 
DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY)
           .put(DatasetDescriptorConfigKeys.IS_RETENTION_APPLIED_KEY, false)
+          .put(DatasetDescriptorConfigKeys.IS_COMPACTED_KEY, false)
+          .put(DatasetDescriptorConfigKeys.IS_COMPACTED_AND_DEDUPED_KEY, false)
           .build());
 
-  public FSDatasetDescriptor(Config config) {
+  public FSDatasetDescriptor(Config config) throws IOException {
     
Preconditions.checkArgument(config.hasPath(DatasetDescriptorConfigKeys.PLATFORM_KEY),
 "Dataset descriptor config must specify platform");
     this.platform = config.getString(DatasetDescriptorConfigKeys.PLATFORM_KEY);
     this.path = PathUtils.getPathWithoutSchemeAndAuthority(new 
Path(ConfigUtils.getString(config, DatasetDescriptorConfigKeys.PATH_KEY,
         
DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY))).toString();
     this.formatConfig = new FormatConfig(config);
+    this.partitionConfig = new 
FsDatasetPartitionConfig(ConfigUtils.getConfigOrEmpty(config, 
DatasetDescriptorConfigKeys.PARTITION_PREFIX));
     this.isRetentionApplied = ConfigUtils.getBoolean(config, 
DatasetDescriptorConfigKeys.IS_RETENTION_APPLIED_KEY, false);
+    this.isCompacted = ConfigUtils.getBoolean(config, 
DatasetDescriptorConfigKeys.IS_COMPACTED_KEY, false);
+    this.isCompactedAndDeduped = ConfigUtils.getBoolean(config, 
DatasetDescriptorConfigKeys.IS_COMPACTED_AND_DEDUPED_KEY, false);
     this.description = ConfigUtils.getString(config, 
DatasetDescriptorConfigKeys.DESCRIPTION_KEY, "");
-    this.rawConfig = 
config.withFallback(this.formatConfig.getRawConfig()).withFallback(DEFAULT_FALLBACK);
+    this.rawConfig = 
config.withFallback(this.formatConfig.getRawConfig()).withFallback(this.partitionConfig.getRawConfig()).withFallback(DEFAULT_FALLBACK);
   }
 
   /**
@@ -108,11 +121,13 @@ public class FSDatasetDescriptor implements DatasetDescriptor {
       return false;
     }
 
-    if (this.isRetentionApplied() != other.isRetentionApplied()) {
+    if ((this.isRetentionApplied() != other.isRetentionApplied()) || 
(this.isCompacted() != other.isCompacted()) ||
+        (this.isCompactedAndDeduped() != other.isCompactedAndDeduped())) {
       return false;
     }
 
-    return getFormatConfig().contains(other.getFormatConfig()) && 
isPathContaining(other.getPath());
+    return getFormatConfig().contains(other.getFormatConfig()) && 
getPartitionConfig().contains(other.getPartitionConfig())
+        && isPathContaining(other.getPath());
   }
 
   /**
@@ -133,15 +148,19 @@ public class FSDatasetDescriptor implements DatasetDescriptor {
     if (this.getPlatform() == null || other.getPlatform() == null || 
!this.getPlatform().equalsIgnoreCase(other.getPlatform())) {
       return false;
     }
-    if (this.isRetentionApplied() != other.isRetentionApplied()) {
+    if ((this.isRetentionApplied() != other.isRetentionApplied()) || 
(this.isCompacted() != other.isCompacted()) ||
+        (this.isCompactedAndDeduped() != other.isCompactedAndDeduped())) {
       return false;
     }
-    return this.getPath().equals(other.getPath()) && 
this.getFormatConfig().equals(other.getFormatConfig());
+    return this.getPath().equals(other.getPath()) && 
this.getPartitionConfig().equals(other.getPartitionConfig()) &&
+        this.getFormatConfig().equals(other.getFormatConfig());
   }
 
   @Override
   public String toString() {
-     return "(" + Joiner.on(",").join(this.getPlatform(), this.getPath(), 
this.getFormatConfig().toString()) + ")";
+     return "(" + Joiner.on(",").join(this.getPlatform(), this.getPath(), 
this.getFormatConfig().toString(), this.getPartitionConfig().toString(),
+         String.valueOf(isRetentionApplied()), String.valueOf(isCompacted()), 
String.valueOf(isCompactedAndDeduped()))
+         + ")";
   }
 
   @Override
@@ -149,7 +168,11 @@ public class FSDatasetDescriptor implements DatasetDescriptor {
     int result = 17;
     result = 31 * result + platform.toLowerCase().hashCode();
     result = 31 * result + path.hashCode();
+    result = 31 * result + Boolean.hashCode(isRetentionApplied);
+    result = 31 * result + Boolean.hashCode(isCompacted);
+    result = 31 * result + Boolean.hashCode(isCompactedAndDeduped);
     result = 31 * result + getFormatConfig().hashCode();
+    result = 31 * result + getPartitionConfig().hashCode();
     return result;
   }
 
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/FormatConfig.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/FormatConfig.java
index 4b45a52..3485ec5 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/FormatConfig.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/FormatConfig.java
@@ -17,6 +17,8 @@
 
 package org.apache.gobblin.service.modules.dataset;
 
+import java.io.IOException;
+
 import com.google.common.base.Joiner;
 import com.google.common.collect.ImmutableMap;
 import com.typesafe.config.Config;
@@ -55,7 +57,7 @@ public class FormatConfig {
           .put(DatasetDescriptorConfigKeys.CODEC_KEY, 
DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY)
           .build());
 
-  public FormatConfig(Config config) {
+  public FormatConfig(Config config) throws IOException {
     this.format = ConfigUtils.getString(config, 
DatasetDescriptorConfigKeys.FORMAT_KEY, 
DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY);
     this.codecType = ConfigUtils.getString(config, 
DatasetDescriptorConfigKeys.CODEC_KEY, 
DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY);
     this.encryptionConfig = new EncryptionConfig(ConfigUtils.getConfig(config, 
DatasetDescriptorConfigKeys.ENCYPTION_PREFIX, ConfigFactory
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/FsDatasetPartitionConfig.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/FsDatasetPartitionConfig.java
new file mode 100644
index 0000000..86eec99
--- /dev/null
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/FsDatasetPartitionConfig.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.dataset;
+
+import java.io.IOException;
+import java.text.SimpleDateFormat;
+import java.util.regex.Pattern;
+import java.util.regex.PatternSyntaxException;
+
+import com.google.common.base.Enums;
+import com.google.common.base.Joiner;
+import com.google.common.collect.ImmutableMap;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+import 
org.apache.gobblin.service.modules.flowgraph.DatasetDescriptorConfigKeys;
+import org.apache.gobblin.util.ConfigUtils;
+
+
+/**
+ * A class that is used to describe partition configuration of a 
filesystem-based dataset. Common partitioning
+ * types include "datetime" and "regex". For each partition type, the 
corresponding partition pattern (e.g. date pattern or
+ * the regex pattern) is validated.
+ */
+@Slf4j
+public class FsDatasetPartitionConfig {
+  @Getter
+  private final String partitionType;
+  @Getter
+  private final String partitionPattern;
+  @Getter
+  private final Config rawConfig;
+
+  public enum PartitionType {
+    DATETIME("datetime"),
+    REGEX("regex"),
+    NONE(DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_NONE),
+    ANY(DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY);
+
+    private final String type;
+
+    PartitionType(final String type) {
+      this.type = type;
+    }
+
+    @Override
+    public String toString() {
+      return this.type;
+    }
+  }
+
+  private static final Config DEFAULT_FALLBACK =
+      ConfigFactory.parseMap(ImmutableMap.<String, Object>builder()
+          .put(DatasetDescriptorConfigKeys.PARTITION_TYPE_KEY, 
DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY)
+          .put(DatasetDescriptorConfigKeys.PARTITION_PATTERN_KEY, 
DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY)
+          .build());
+
+  public FsDatasetPartitionConfig(Config config) throws IOException {
+    String partitionType = ConfigUtils.getString(config, 
DatasetDescriptorConfigKeys.PARTITION_TYPE_KEY, 
DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY);
+    String partitionPattern = ConfigUtils.getString(config, 
DatasetDescriptorConfigKeys.PARTITION_PATTERN_KEY, 
DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY);
+    if (partitionType.equalsIgnoreCase(PartitionType.NONE.name())) {
+      partitionPattern = 
DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_NONE;
+    } else if(partitionType.equalsIgnoreCase(PartitionType.ANY.name())) {
+      partitionPattern = 
DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY;
+    }
+    validatePartitionConfig(partitionType, partitionPattern);
+    this.partitionType = partitionType;
+    this.partitionPattern = partitionPattern;
+    this.rawConfig = config.withFallback(DEFAULT_FALLBACK);
+  }
+
+  private void validatePartitionConfig(String partitionType, String 
partitionPattern)
+      throws IOException {
+    if (!Enums.getIfPresent(PartitionType.class, 
partitionType.toUpperCase()).isPresent()) {
+      log.error("Invalid partition type {}", partitionType);
+      throw new IOException("Invalid partition type");
+    }
+    switch (PartitionType.valueOf(partitionType.toUpperCase())) {
+      case DATETIME:
+        try {
+          new SimpleDateFormat(partitionPattern);
+        } catch (Exception e) {
+          log.error("Invalid datetime partition pattern {}", partitionPattern);
+          throw new IOException(e);
+        }
+        break;
+      case REGEX:
+        try {
+          Pattern.compile(partitionPattern);
+        } catch (PatternSyntaxException e) {
+          log.error("Invalid regex partition pattern {}", partitionPattern);
+          throw new IOException(e);
+        }
+        break;
+      case NONE:
+        if 
(!partitionPattern.equalsIgnoreCase(DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_NONE))
 {
+          log.error("Partition pattern {} incompatible with partition type 
{}", partitionPattern, partitionType);
+          throw new IOException("Incompatible partition pattern/type");
+        }
+        break;
+      case ANY:
+        if 
(!partitionPattern.equalsIgnoreCase(DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY))
 {
+          log.error("Partition pattern {} incompatible with partition type 
{}", partitionPattern, partitionType);
+          throw new IOException("Incompatible partition pattern/type");
+        }
+        break;
+    }
+  }
+
+  public boolean contains(FsDatasetPartitionConfig other) {
+    if (other == null) {
+      return false;
+    }
+
+    return 
((DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY.equalsIgnoreCase(getPartitionType())
+        || this.getPartitionType().equalsIgnoreCase(partitionType)))
+        && 
((DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY.equalsIgnoreCase(getPartitionPattern())
+        || 
this.getPartitionPattern().equalsIgnoreCase(other.getPartitionPattern())));
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+
+    if (!(o instanceof FsDatasetPartitionConfig)) {
+      return false;
+    }
+    FsDatasetPartitionConfig other = (FsDatasetPartitionConfig) o;
+    return this.getPartitionType().equalsIgnoreCase(other.getPartitionType()) 
&&
+        
this.getPartitionPattern().equalsIgnoreCase(other.getPartitionPattern());
+  }
+
+  @Override
+  public String toString() {
+    return "(" + Joiner.on(",").join(this.getPartitionType(), 
this.getPartitionPattern()) + ")";
+  }
+
+  @Override
+  public int hashCode() {
+    int result = 17;
+    result = 31 * result + this.getPartitionType().hashCode();
+    result = 31 * result + this.getPartitionPattern().toLowerCase().hashCode();
+    return result;
+  }
+}
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/DatasetDescriptorConfigKeys.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/DatasetDescriptorConfigKeys.java
index 6470a1d..a144598 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/DatasetDescriptorConfigKeys.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/DatasetDescriptorConfigKeys.java
@@ -35,12 +35,21 @@ public class DatasetDescriptorConfigKeys {
   public static final String CODEC_KEY = "codec";
   public static final String DESCRIPTION_KEY = "description";
   public static final String IS_RETENTION_APPLIED_KEY = "isRetentionApplied";
+  public static final String IS_COMPACTED_KEY = "isCompacted";
+  public static final String IS_COMPACTED_AND_DEDUPED_KEY = 
"isCompatedAndDeduped";
 
   //Dataset encryption related keys
   public static final String ENCYPTION_PREFIX = "encrypt";
   public static final String ENCRYPTION_ALGORITHM_KEY = "algorithm";
   public static final String ENCRYPTION_KEYSTORE_TYPE_KEY = "keystore_type";
   public static final String ENCRYPTION_KEYSTORE_ENCODING_KEY = 
"keystore_encoding";
+  public static final String ENCRYPTION_LEVEL_KEY = "level";
+  public static final String ENCRYPTED_FIELDS = "encryptedFields";
+
+  //Dataset partition related keys
+  public static final String PARTITION_PREFIX = "partition";
+  public static final String PARTITION_TYPE_KEY = "type";
+  public static final String PARTITION_PATTERN_KEY = "pattern";
 
   public static final String DATASET_DESCRIPTOR_CONFIG_ANY = "any";
   public static final String DATASET_DESCRIPTOR_CONFIG_NONE = "none";
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/dataset/FSDatasetDescriptorTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/dataset/FSDatasetDescriptorTest.java
new file mode 100644
index 0000000..d065cba
--- /dev/null
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/dataset/FSDatasetDescriptorTest.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.service.modules.dataset;
+
+import java.io.IOException;
+
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigValueFactory;
+
+import 
org.apache.gobblin.service.modules.flowgraph.DatasetDescriptorConfigKeys;
+
+
+public class FSDatasetDescriptorTest {
+  @Test
+  public void testContains() throws IOException {
+    //Ensure descriptor2's path is matched by the regular expression in 
descriptor1's path
+    Config config1 = 
ConfigFactory.empty().withValue(DatasetDescriptorConfigKeys.PATH_KEY, 
ConfigValueFactory.fromAnyRef("/a/b/c/*"))
+        .withValue(DatasetDescriptorConfigKeys.PLATFORM_KEY, 
ConfigValueFactory.fromAnyRef("hdfs"));
+    FSDatasetDescriptor descriptor1 = new FSDatasetDescriptor(config1);
+
+    Config config2 = 
ConfigFactory.empty().withValue(DatasetDescriptorConfigKeys.PATH_KEY, 
ConfigValueFactory.fromAnyRef("/a/b/c/d"))
+        .withValue(DatasetDescriptorConfigKeys.PLATFORM_KEY, 
ConfigValueFactory.fromAnyRef("hdfs"))
+        .withValue(DatasetDescriptorConfigKeys.FORMAT_KEY, 
ConfigValueFactory.fromAnyRef("avro"))
+        .withValue(DatasetDescriptorConfigKeys.CODEC_KEY, 
ConfigValueFactory.fromAnyRef("gzip"));
+
+    FSDatasetDescriptor descriptor2 = new FSDatasetDescriptor(config2);
+    Assert.assertTrue(descriptor1.contains(descriptor2));
+
+    //Add encryption config
+    Config encConfig = 
ConfigFactory.empty().withValue(DatasetDescriptorConfigKeys.ENCRYPTION_LEVEL_KEY,
 ConfigValueFactory.fromAnyRef("file"))
+        .withValue(DatasetDescriptorConfigKeys.ENCRYPTION_ALGORITHM_KEY, 
ConfigValueFactory.fromAnyRef("aes_rotating"))
+        .atPath(DatasetDescriptorConfigKeys.ENCYPTION_PREFIX);
+    Config config3 = config2.withFallback(encConfig);
+    FSDatasetDescriptor descriptor3 = new FSDatasetDescriptor(config3);
+    Assert.assertTrue(descriptor2.contains(descriptor3));
+    Assert.assertTrue(descriptor1.contains(descriptor3));
+
+    //Add partition config
+    Config partitionConfig = 
ConfigFactory.empty().withValue(DatasetDescriptorConfigKeys.PARTITION_TYPE_KEY, 
ConfigValueFactory.fromAnyRef("datetime"))
+        .withValue(DatasetDescriptorConfigKeys.PARTITION_PATTERN_KEY, 
ConfigValueFactory.fromAnyRef("yyyy/MM/dd"))
+        .atPath(DatasetDescriptorConfigKeys.PARTITION_PREFIX);
+    Config config4 = config3.withFallback(partitionConfig);
+    FSDatasetDescriptor descriptor4 = new FSDatasetDescriptor(config4);
+    Assert.assertTrue(descriptor3.contains(descriptor4));
+    Assert.assertTrue(descriptor2.contains(descriptor4));
+    Assert.assertTrue(descriptor1.contains(descriptor4));
+
+    //Add compaction/retention config
+    Config miscConfig = 
ConfigFactory.empty().withValue(DatasetDescriptorConfigKeys.IS_COMPACTED_AND_DEDUPED_KEY,
 ConfigValueFactory.fromAnyRef("true"))
+        .withValue(DatasetDescriptorConfigKeys.IS_RETENTION_APPLIED_KEY, 
ConfigValueFactory.fromAnyRef("true"));
+    Config config5 = config4.withFallback(miscConfig);
+    FSDatasetDescriptor descriptor5 = new FSDatasetDescriptor(config5);
+    Assert.assertFalse(descriptor4.contains(descriptor5));
+    Assert.assertFalse(descriptor3.contains(descriptor5));
+    Assert.assertFalse(descriptor2.contains(descriptor5));
+    Assert.assertFalse(descriptor1.contains(descriptor5));
+  }
+
+  @Test
+  public void testEquals() throws IOException { //Checks equals() and hashCode() of FSDatasetDescriptor instances built from path/platform configs.
+    Config config1 = 
ConfigFactory.empty().withValue(DatasetDescriptorConfigKeys.PATH_KEY, 
ConfigValueFactory.fromAnyRef("/a/b/c/*"))
+        .withValue(DatasetDescriptorConfigKeys.PLATFORM_KEY, 
ConfigValueFactory.fromAnyRef("hdfs"));
+    FSDatasetDescriptor descriptor1 = new FSDatasetDescriptor(config1);
+
+    Config config2 = 
ConfigFactory.empty().withValue(DatasetDescriptorConfigKeys.PATH_KEY, 
ConfigValueFactory.fromAnyRef("/a/b/c/*"))
+        .withValue(DatasetDescriptorConfigKeys.PLATFORM_KEY, 
ConfigValueFactory.fromAnyRef("hdfs"));
+    FSDatasetDescriptor descriptor2 = new FSDatasetDescriptor(config2);
+
+    Assert.assertTrue(descriptor1.equals(descriptor2));
+    Assert.assertTrue(descriptor2.equals(descriptor1));
+    Assert.assertEquals(descriptor1.hashCode(), descriptor2.hashCode());
+    //Explicitly setting format "any" and retention-applied "false" must keep equality and hash code with descriptor1 (presumably these are the defaults - TODO confirm).
+    Config config3 = 
ConfigFactory.empty().withValue(DatasetDescriptorConfigKeys.PATH_KEY, 
ConfigValueFactory.fromAnyRef("/a/b/c/*"))
+        .withValue(DatasetDescriptorConfigKeys.PLATFORM_KEY, 
ConfigValueFactory.fromAnyRef("hdfs"))
+        .withValue(DatasetDescriptorConfigKeys.FORMAT_KEY, 
ConfigValueFactory.fromAnyRef("any"))
+        .withValue(DatasetDescriptorConfigKeys.IS_RETENTION_APPLIED_KEY, 
ConfigValueFactory.fromAnyRef("false"));
+    FSDatasetDescriptor descriptor3 = new FSDatasetDescriptor(config3);
+    Assert.assertTrue(descriptor1.equals(descriptor3));
+    Assert.assertEquals(descriptor1.hashCode(), descriptor3.hashCode());
+
+    //Ensure switching booleans between 2 boolean member variables does not 
produce the same hashcode.
+    Config config4 = 
ConfigFactory.empty().withValue(DatasetDescriptorConfigKeys.PATH_KEY, 
ConfigValueFactory.fromAnyRef("/a/b/c/*"))
+        .withValue(DatasetDescriptorConfigKeys.PLATFORM_KEY, 
ConfigValueFactory.fromAnyRef("hdfs"))
+        .withValue(DatasetDescriptorConfigKeys.FORMAT_KEY, 
ConfigValueFactory.fromAnyRef("any"))
+        .withValue(DatasetDescriptorConfigKeys.IS_RETENTION_APPLIED_KEY, 
ConfigValueFactory.fromAnyRef("false"))
+        .withValue(DatasetDescriptorConfigKeys.IS_COMPACTED_KEY, 
ConfigValueFactory.fromAnyRef("true"));
+    FSDatasetDescriptor descriptor4 = new FSDatasetDescriptor(config4);
+
+    Config config5 = 
ConfigFactory.empty().withValue(DatasetDescriptorConfigKeys.PATH_KEY, 
ConfigValueFactory.fromAnyRef("/a/b/c/*"))
+        .withValue(DatasetDescriptorConfigKeys.PLATFORM_KEY, 
ConfigValueFactory.fromAnyRef("hdfs"))
+        .withValue(DatasetDescriptorConfigKeys.FORMAT_KEY, 
ConfigValueFactory.fromAnyRef("any"))
+        .withValue(DatasetDescriptorConfigKeys.IS_RETENTION_APPLIED_KEY, 
ConfigValueFactory.fromAnyRef("true"))
+        .withValue(DatasetDescriptorConfigKeys.IS_COMPACTED_KEY, 
ConfigValueFactory.fromAnyRef("false"));
+    FSDatasetDescriptor descriptor5 = new FSDatasetDescriptor(config5);
+
+    Assert.assertFalse(descriptor4.equals(descriptor5));
+    Assert.assertNotEquals(descriptor4.hashCode(), descriptor5.hashCode());
+  }
+
+  @Test
+  public void testInitFails() { //Each case below expects the FSDatasetDescriptor constructor to throw IOException for the given config.
+    //Datetime partition type, invalid datetime pattern
+    Config config = 
ConfigFactory.empty().withValue(DatasetDescriptorConfigKeys.PATH_KEY, 
ConfigValueFactory.fromAnyRef("/a/b/c/*"))
+        .withValue(DatasetDescriptorConfigKeys.PLATFORM_KEY, 
ConfigValueFactory.fromAnyRef("hdfs"));
+    Config partitionConfig = ConfigFactory.empty()
+        .withValue(DatasetDescriptorConfigKeys.PARTITION_TYPE_KEY, 
ConfigValueFactory.fromAnyRef("datetime"))
+        .withValue(DatasetDescriptorConfigKeys.PARTITION_PATTERN_KEY, 
ConfigValueFactory.fromAnyRef("BBBB/MM/dd"))
+        .atPath(DatasetDescriptorConfigKeys.PARTITION_PREFIX);
+    Config config1 = config.withFallback(partitionConfig);
+    Assert.assertThrows(IOException.class, () -> new 
FSDatasetDescriptor(config1));
+
+    //Regex partition type, invalid regular expression
+    partitionConfig = 
ConfigFactory.empty().withValue(DatasetDescriptorConfigKeys.PARTITION_TYPE_KEY, 
ConfigValueFactory.fromAnyRef("regex"))
+        .withValue(DatasetDescriptorConfigKeys.PARTITION_PATTERN_KEY, 
ConfigValueFactory.fromAnyRef("["))
+        .atPath(DatasetDescriptorConfigKeys.PARTITION_PREFIX);
+    Config config2 = config.withFallback(partitionConfig);
+    Assert.assertThrows(IOException.class, () -> new 
FSDatasetDescriptor(config2));
+
+    //Partition Config with invalid partition type
+    partitionConfig = 
ConfigFactory.empty().withValue(DatasetDescriptorConfigKeys.PARTITION_TYPE_KEY, 
ConfigValueFactory.fromAnyRef("invalidType"))
+        .withValue(DatasetDescriptorConfigKeys.PARTITION_PATTERN_KEY, 
ConfigValueFactory.fromAnyRef("aaaa"))
+        .atPath(DatasetDescriptorConfigKeys.PARTITION_PREFIX);
+    Config config3 = config.withFallback(partitionConfig);
+    Assert.assertThrows(IOException.class, () -> new 
FSDatasetDescriptor(config3));
+
+    //Encryption config with invalid encryption level
+    Config encryptionConfig = 
ConfigFactory.empty().withValue(DatasetDescriptorConfigKeys.ENCRYPTION_LEVEL_KEY,
 ConfigValueFactory.fromAnyRef("aaaa"))
+        .atPath(DatasetDescriptorConfigKeys.ENCYPTION_PREFIX);
+    Config config4 = config.withFallback(encryptionConfig);
+    Assert.assertThrows(IOException.class, () -> new 
FSDatasetDescriptor(config4));
+    //Encryption level "field" with no encrypted fields specified
+    encryptionConfig = 
ConfigFactory.empty().withValue(DatasetDescriptorConfigKeys.ENCRYPTION_LEVEL_KEY,
 ConfigValueFactory.fromAnyRef("field"))
+        .atPath(DatasetDescriptorConfigKeys.ENCYPTION_PREFIX);
+    Config config5 = config.withFallback(encryptionConfig);
+    Assert.assertThrows(IOException.class, () -> new 
FSDatasetDescriptor(config5));
+    //Encryption level "none" combined with an encrypted fields entry ("field1")
+    encryptionConfig = 
ConfigFactory.empty().withValue(DatasetDescriptorConfigKeys.ENCRYPTION_LEVEL_KEY,
 ConfigValueFactory.fromAnyRef("none"))
+        .withValue(DatasetDescriptorConfigKeys.ENCRYPTED_FIELDS, 
ConfigValueFactory.fromAnyRef("field1")).atPath(DatasetDescriptorConfigKeys.ENCYPTION_PREFIX);
+    Config config6 = config.withFallback(encryptionConfig);
+    Assert.assertThrows(IOException.class, () -> new 
FSDatasetDescriptor(config6));
+
+
+
+  }
+}
\ No newline at end of file
diff --git a/gobblin-service/src/test/resources/flow/flow1.conf 
b/gobblin-service/src/test/resources/flow/flow1.conf
index f818df6..cad2aa8 100644
--- a/gobblin-service/src/test/resources/flow/flow1.conf
+++ b/gobblin-service/src/test/resources/flow/flow1.conf
@@ -21,4 +21,5 @@ gobblin.flow.output.dataset.descriptor.format=json
 gobblin.flow.output.dataset.descriptor.codec=gzip
 gobblin.flow.output.dataset.descriptor.encrypt.algorithm=aes_rotating
 gobblin.flow.output.dataset.descriptor.encrypt.keystore_type=json
-gobblin.flow.output.dataset.descriptor.encrypt.keystore_encoding=base64
\ No newline at end of file
+gobblin.flow.output.dataset.descriptor.encrypt.keystore_encoding=base64
+gobblin.flow.output.dataset.descriptor.encrypt.level=file
\ No newline at end of file
diff --git 
a/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/hdfsConvertToJsonAndEncrypt/flow.conf
 
b/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/hdfsConvertToJsonAndEncrypt/flow.conf
index 2ae6fb6..a9be4ff 100644
--- 
a/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/hdfsConvertToJsonAndEncrypt/flow.conf
+++ 
b/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/hdfsConvertToJsonAndEncrypt/flow.conf
@@ -16,4 +16,5 @@ gobblin.flow.edge.output.dataset.descriptor.0.format=json
 gobblin.flow.edge.output.dataset.descriptor.0.codec=gzip
 gobblin.flow.edge.output.dataset.descriptor.0.encrypt.algorithm=aes_rotating
 gobblin.flow.edge.output.dataset.descriptor.0.encrypt.keystore_type=json
-gobblin.flow.edge.output.dataset.descriptor.0.encrypt.keystore_encoding=base64
\ No newline at end of file
+gobblin.flow.edge.output.dataset.descriptor.0.encrypt.keystore_encoding=base64
+gobblin.flow.edge.output.dataset.descriptor.0.encrypt.level=file
\ No newline at end of file

Reply via email to