This is an automated email from the ASF dual-hosted git repository.
hutran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 58eca28 [GOBBLIN-675] Enhance FSDatasetDescriptor definition to
include partit…
58eca28 is described below
commit 58eca2810eff3829821309af29744fa08c92bad0
Author: suvasude <[email protected]>
AuthorDate: Tue Feb 5 10:37:42 2019 -0800
[GOBBLIN-675] Enhance FSDatasetDescriptor definition to include partit…
Closes #2544 from sv2000/fsDescriptor
---
.../service/modules/dataset/EncryptionConfig.java | 89 +++++++++--
.../modules/dataset/FSDatasetDescriptor.java | 37 ++++-
.../service/modules/dataset/FormatConfig.java | 4 +-
.../modules/dataset/FsDatasetPartitionConfig.java | 165 +++++++++++++++++++++
.../flowgraph/DatasetDescriptorConfigKeys.java | 9 ++
.../modules/dataset/FSDatasetDescriptorTest.java | 163 ++++++++++++++++++++
gobblin-service/src/test/resources/flow/flow1.conf | 3 +-
.../hdfsConvertToJsonAndEncrypt/flow.conf | 3 +-
8 files changed, 452 insertions(+), 21 deletions(-)
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/EncryptionConfig.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/EncryptionConfig.java
index 52a1a10..08b96ac 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/EncryptionConfig.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/EncryptionConfig.java
@@ -17,52 +17,110 @@
package org.apache.gobblin.service.modules.dataset;
+import java.io.IOException;
+
+import com.google.common.base.Enums;
import com.google.common.base.Joiner;
import com.google.common.collect.ImmutableMap;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
-import com.typesafe.config.ConfigValueFactory;
import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
-import org.apache.gobblin.configuration.ConfigurationKeys;
-import org.apache.gobblin.service.modules.core.GitMonitoringService;
import
org.apache.gobblin.service.modules.flowgraph.DatasetDescriptorConfigKeys;
import org.apache.gobblin.util.ConfigUtils;
-
+@Slf4j
public class EncryptionConfig {
@Getter
private final String encryptionAlgorithm;
@Getter
+ private final String encryptionLevel; // an EncryptionLevel name (validated case-insensitively); "none" when the algorithm is "none"
+ @Getter
+ private final String encryptedFields; // checked against encryptionLevel by validate(); "none" when the algorithm is "none"
+ @Getter
private final String keystoreType;
@Getter
private final String keystoreEncoding;
@Getter
private final Config rawConfig;
+  /**
+   * Granularity at which a dataset is encrypted. {@link #toString()} returns the spelling used in config files;
+   * NONE and ANY reuse the shared "none"/"any" descriptor values.
+   */
+  public enum EncryptionLevel {
+    FILE("file"),
+    ROW("row"),
+    FIELD("field"),
+    NONE(DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_NONE),
+    ANY(DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY);
+
+    /** Config-file spelling of this level. */
+    private final String configValue;
+
+    EncryptionLevel(String configValue) {
+      this.configValue = configValue;
+    }
+
+    @Override
+    public String toString() {
+      return configValue;
+    }
+  }
+
+  // Merged under the caller's config to form rawConfig, so every encryption key (including level and
+  // encryptedFields) is present; each defaults to "any".
private static final Config DEFAULT_FALLBACK =
ConfigFactory.parseMap(ImmutableMap.<String, Object>builder()
.put(DatasetDescriptorConfigKeys.ENCRYPTION_ALGORITHM_KEY,
DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY)
.put(DatasetDescriptorConfigKeys.ENCRYPTION_KEYSTORE_TYPE_KEY,
DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY)
.put(DatasetDescriptorConfigKeys.ENCRYPTION_KEYSTORE_ENCODING_KEY,
DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY)
+ .put(DatasetDescriptorConfigKeys.ENCRYPTION_LEVEL_KEY,
DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY)
+ .put(DatasetDescriptorConfigKeys.ENCRYPTED_FIELDS,
DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY)
.build());
- public EncryptionConfig(Config encryptionConfig) {
+  /**
+   * Parses encryption settings from {@code encryptionConfig}; every absent key defaults to "any".
+   * If the algorithm is "none", keystore type/encoding, level and encrypted fields are all forced to "none"
+   * regardless of the config contents, and no validation is performed.
+   * NOTE(review): rawConfig is built from the input config plus DEFAULT_FALLBACK in both branches, so in the
+   * "none" case it can disagree with the getters (e.g. level "any" in rawConfig vs "none" from getEncryptionLevel).
+   *
+   * @throws IOException if the encryption level or encrypted fields fail validate().
+   */
+ public EncryptionConfig(Config encryptionConfig) throws IOException {
this.encryptionAlgorithm = ConfigUtils.getString(encryptionConfig,
DatasetDescriptorConfigKeys.ENCRYPTION_ALGORITHM_KEY,
DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY);
if
(this.encryptionAlgorithm.equalsIgnoreCase(DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_NONE))
{
this.keystoreType =
DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_NONE;
this.keystoreEncoding =
DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_NONE;
+    // Without encryption there is no level and no encrypted field either.
+ this.encryptionLevel =
DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_NONE;
+ this.encryptedFields =
DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_NONE;
} else {
this.keystoreType = ConfigUtils.getString(encryptionConfig,
DatasetDescriptorConfigKeys.ENCRYPTION_KEYSTORE_TYPE_KEY,
DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY);
this.keystoreEncoding = ConfigUtils.getString(encryptionConfig,
DatasetDescriptorConfigKeys.ENCRYPTION_KEYSTORE_ENCODING_KEY,
DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY);
+ this.encryptionLevel = ConfigUtils.getString(encryptionConfig,
DatasetDescriptorConfigKeys.ENCRYPTION_LEVEL_KEY,
+ DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY);
+ this.encryptedFields = ConfigUtils.getString(encryptionConfig,
DatasetDescriptorConfigKeys.ENCRYPTED_FIELDS,
+ DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY);
+    // Only reached when an algorithm other than "none" is configured.
+ validate(this.encryptionLevel, this.encryptedFields);
}
this.rawConfig = encryptionConfig.withFallback(DEFAULT_FALLBACK);
}
+  /**
+   * Checks that {@code encryptionLevel} names an {@link EncryptionLevel} (case-insensitively) and that
+   * {@code encryptedFields} is compatible with it. FIELD requires an explicit field list (neither "any" nor
+   * "none"), and NONE requires encryptedFields to be "none". FILE, ROW and ANY accept any value.
+   *
+   * @param encryptionLevel the configured encryption level
+   * @param encryptedFields the configured encrypted fields
+   * @throws IOException if the level is unknown or the fields are incompatible with it
+   */
+  private void validate(String encryptionLevel, String encryptedFields) throws IOException {
+    // Upper-case with Locale.ROOT: under e.g. a Turkish default locale "file".toUpperCase() is "FİLE",
+    // which matches no enum constant and would reject a valid level.
+    String normalizedLevel = encryptionLevel.toUpperCase(java.util.Locale.ROOT);
+    if (!Enums.getIfPresent(EncryptionLevel.class, normalizedLevel).isPresent()) {
+      throw new IOException("Invalid encryption level " + encryptionLevel);
+    }
+    switch (EncryptionLevel.valueOf(normalizedLevel)) {
+      case FIELD:
+        if (encryptedFields.equalsIgnoreCase(DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY)
+            || encryptedFields.equalsIgnoreCase(DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_NONE)) {
+          log.error("Invalid input for encryptedFields {}", encryptedFields);
+          throw new IOException("Invalid encryptedFields");
+        }
+        break;
+      case NONE:
+        if (!encryptedFields.equalsIgnoreCase(DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_NONE)) {
+          log.error("Invalid input for encryptedFields {}", encryptedFields);
+          throw new IOException("Invalid encryptedFields");
+        }
+        break;
+      default:
+        // FILE, ROW and ANY place no constraint on encryptedFields.
+        break;
+    }
+  }
+
public boolean contains(EncryptionConfig other) {
if (other == null) {
return false;
@@ -71,13 +129,19 @@ public class EncryptionConfig {
String otherEncryptionAlgorithm = other.getEncryptionAlgorithm();
String otherKeystoreType = other.getKeystoreType();
String otherKeystoreEncoding = other.getKeystoreEncoding();
+ String otherEncryptionLevel = other.getEncryptionLevel();
+ String otherEncryptedFields = other.getEncryptedFields();
- return
(DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY.equals(this.getEncryptionAlgorithm())
+ return
(DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY.equalsIgnoreCase(this.getEncryptionAlgorithm())
|| this.encryptionAlgorithm.equalsIgnoreCase(otherEncryptionAlgorithm))
- &&
(DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY.equals(this.getKeystoreType())
+ &&
(DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY.equalsIgnoreCase(this.getKeystoreType())
|| this.keystoreType.equalsIgnoreCase(otherKeystoreType))
- &&
(DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY.equals(this.getKeystoreEncoding())
- || this.keystoreEncoding.equalsIgnoreCase(otherKeystoreEncoding));
+ &&
(DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY.equalsIgnoreCase(this.getKeystoreEncoding())
+ || this.keystoreEncoding.equalsIgnoreCase(otherKeystoreEncoding))
+ &&
(DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY.equalsIgnoreCase(this.getEncryptionLevel())
+ || this.encryptionLevel.equalsIgnoreCase(otherEncryptionLevel))
+ &&
(DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY.equalsIgnoreCase(this.getEncryptedFields())
+ || this.encryptedFields.equalsIgnoreCase(otherEncryptedFields));
}
@Override
@@ -91,12 +155,13 @@ public class EncryptionConfig {
}
EncryptionConfig other = (EncryptionConfig) o;
return
this.getEncryptionAlgorithm().equalsIgnoreCase(other.getEncryptionAlgorithm())
&& this.keystoreEncoding.equalsIgnoreCase(other.getKeystoreEncoding())
- && this.getKeystoreType().equalsIgnoreCase(other.getKeystoreType());
+ && this.getKeystoreType().equalsIgnoreCase(other.getKeystoreType()) &&
this.getEncryptionLevel().equals(other.getEncryptionLevel())
+ &&
this.getEncryptedFields().equalsIgnoreCase(other.getEncryptedFields());
}
@Override
public String toString() {
- return "(" + Joiner.on(",").join(this.encryptionAlgorithm,
this.keystoreType, this.keystoreEncoding) + ")";
+    // Renders as "(algorithm,level,encryptedFields,keystoreType,keystoreEncoding)".
+    String joined = Joiner.on(",").join(this.encryptionAlgorithm, this.encryptionLevel, this.encryptedFields,
+        this.keystoreType, this.keystoreEncoding);
+    return "(" + joined + ")";
}
@Override
@@ -105,6 +170,8 @@ public class EncryptionConfig {
result = 31 * result + encryptionAlgorithm.toLowerCase().hashCode();
result = 31 * result + keystoreType.toLowerCase().hashCode();
result = 31 * result + keystoreEncoding.toLowerCase().hashCode();
+ result = 31 * result + encryptionLevel.toLowerCase().hashCode();
+ result = 31 * result + encryptedFields.toLowerCase().hashCode();
return result;
}
}
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/FSDatasetDescriptor.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/FSDatasetDescriptor.java
index e9f4e31..8b5d7a7 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/FSDatasetDescriptor.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/FSDatasetDescriptor.java
@@ -17,6 +17,8 @@
package org.apache.gobblin.service.modules.dataset;
+import java.io.IOException;
+
import org.apache.hadoop.fs.GlobPattern;
import org.apache.hadoop.fs.Path;
@@ -46,8 +48,14 @@ public class FSDatasetDescriptor implements
DatasetDescriptor {
@Getter
private final FormatConfig formatConfig;
@Getter
+ private final FsDatasetPartitionConfig partitionConfig; // parsed from the "partition" sub-config
+ @Getter
private final boolean isRetentionApplied;
@Getter
+ private final boolean isCompacted; // defaults to false; must match exactly in contains()/equals()
+ @Getter
+ private final boolean isCompactedAndDeduped; // defaults to false; must match exactly in contains()/equals()
+ @Getter
private final String description;
@Getter
private final Config rawConfig;
@@ -56,17 +64,22 @@ public class FSDatasetDescriptor implements
DatasetDescriptor {
ConfigFactory.parseMap(ImmutableMap.<String, Object>builder()
.put(DatasetDescriptorConfigKeys.PATH_KEY,
DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY)
.put(DatasetDescriptorConfigKeys.IS_RETENTION_APPLIED_KEY, false)
+ .put(DatasetDescriptorConfigKeys.IS_COMPACTED_KEY, false)
+ .put(DatasetDescriptorConfigKeys.IS_COMPACTED_AND_DEDUPED_KEY, false)
.build());
- public FSDatasetDescriptor(Config config) {
+  /**
+   * Builds a filesystem dataset descriptor from {@code config}. The platform is mandatory; the path defaults to
+   * "any", and the retention/compaction flags default to false. Format and encryption settings are parsed by
+   * {@link FormatConfig}, and the sub-config under {@link DatasetDescriptorConfigKeys#PARTITION_PREFIX} is parsed
+   * by {@link FsDatasetPartitionConfig}.
+   *
+   * @param config the dataset descriptor config
+   * @throws IOException if the format, encryption or partition settings fail validation
+   */
+  public FSDatasetDescriptor(Config config) throws IOException {
Preconditions.checkArgument(config.hasPath(DatasetDescriptorConfigKeys.PLATFORM_KEY),
"Dataset descriptor config must specify platform");
this.platform = config.getString(DatasetDescriptorConfigKeys.PLATFORM_KEY);
this.path = PathUtils.getPathWithoutSchemeAndAuthority(new
Path(ConfigUtils.getString(config, DatasetDescriptorConfigKeys.PATH_KEY,
DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY))).toString();
this.formatConfig = new FormatConfig(config);
+    this.partitionConfig = new FsDatasetPartitionConfig(
+        ConfigUtils.getConfigOrEmpty(config, DatasetDescriptorConfigKeys.PARTITION_PREFIX));
this.isRetentionApplied = ConfigUtils.getBoolean(config,
DatasetDescriptorConfigKeys.IS_RETENTION_APPLIED_KEY, false);
+    this.isCompacted = ConfigUtils.getBoolean(config, DatasetDescriptorConfigKeys.IS_COMPACTED_KEY, false);
+    this.isCompactedAndDeduped =
+        ConfigUtils.getBoolean(config, DatasetDescriptorConfigKeys.IS_COMPACTED_AND_DEDUPED_KEY, false);
this.description = ConfigUtils.getString(config,
DatasetDescriptorConfigKeys.DESCRIPTION_KEY, "");
- this.rawConfig =
config.withFallback(this.formatConfig.getRawConfig()).withFallback(DEFAULT_FALLBACK);
+    // The partition config was built from the sub-config at PARTITION_PREFIX, so its raw config holds root-level
+    // "type"/"pattern" keys. Re-root it under the prefix before merging; otherwise those keys (and their "any"
+    // defaults) leak to the top level of the descriptor config.
+    this.rawConfig = config.withFallback(this.formatConfig.getRawConfig())
+        .withFallback(this.partitionConfig.getRawConfig().atPath(DatasetDescriptorConfigKeys.PARTITION_PREFIX))
+        .withFallback(DEFAULT_FALLBACK);
}
/**
@@ -108,11 +121,13 @@ public class FSDatasetDescriptor implements
DatasetDescriptor {
return false;
}
- if (this.isRetentionApplied() != other.isRetentionApplied()) {
+ if ((this.isRetentionApplied() != other.isRetentionApplied()) ||
(this.isCompacted() != other.isCompacted()) ||
+ (this.isCompactedAndDeduped() != other.isCompactedAndDeduped())) {
return false;
}
- return getFormatConfig().contains(other.getFormatConfig()) &&
isPathContaining(other.getPath());
+ return getFormatConfig().contains(other.getFormatConfig()) &&
getPartitionConfig().contains(other.getPartitionConfig())
+ && isPathContaining(other.getPath());
}
/**
@@ -133,15 +148,19 @@ public class FSDatasetDescriptor implements
DatasetDescriptor {
if (this.getPlatform() == null || other.getPlatform() == null ||
!this.getPlatform().equalsIgnoreCase(other.getPlatform())) {
return false;
}
- if (this.isRetentionApplied() != other.isRetentionApplied()) {
+ if ((this.isRetentionApplied() != other.isRetentionApplied()) ||
(this.isCompacted() != other.isCompacted()) ||
+ (this.isCompactedAndDeduped() != other.isCompactedAndDeduped())) {
return false;
}
- return this.getPath().equals(other.getPath()) &&
this.getFormatConfig().equals(other.getFormatConfig());
+ return this.getPath().equals(other.getPath()) &&
this.getPartitionConfig().equals(other.getPartitionConfig()) &&
+ this.getFormatConfig().equals(other.getFormatConfig());
}
@Override
public String toString() {
- return "(" + Joiner.on(",").join(this.getPlatform(), this.getPath(),
this.getFormatConfig().toString()) + ")";
+    // Renders as "(platform,path,formatConfig,partitionConfig,isRetentionApplied,isCompacted,isCompactedAndDeduped)".
+    String joined = Joiner.on(",").join(this.getPlatform(), this.getPath(), this.getFormatConfig().toString(),
+        this.getPartitionConfig().toString(), String.valueOf(isRetentionApplied()), String.valueOf(isCompacted()),
+        String.valueOf(isCompactedAndDeduped()));
+    return "(" + joined + ")";
}
@Override
@@ -149,7 +168,11 @@ public class FSDatasetDescriptor implements
DatasetDescriptor {
int result = 17;
result = 31 * result + platform.toLowerCase().hashCode();
result = 31 * result + path.hashCode();
+ result = 31 * result + Boolean.hashCode(isRetentionApplied);
+ result = 31 * result + Boolean.hashCode(isCompacted);
+ result = 31 * result + Boolean.hashCode(isCompactedAndDeduped);
result = 31 * result + getFormatConfig().hashCode();
+ result = 31 * result + getPartitionConfig().hashCode();
return result;
}
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/FormatConfig.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/FormatConfig.java
index 4b45a52..3485ec5 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/FormatConfig.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/FormatConfig.java
@@ -17,6 +17,8 @@
package org.apache.gobblin.service.modules.dataset;
+import java.io.IOException;
+
import com.google.common.base.Joiner;
import com.google.common.collect.ImmutableMap;
import com.typesafe.config.Config;
@@ -55,7 +57,7 @@ public class FormatConfig {
.put(DatasetDescriptorConfigKeys.CODEC_KEY,
DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY)
.build());
- public FormatConfig(Config config) {
+ public FormatConfig(Config config) throws IOException {
this.format = ConfigUtils.getString(config,
DatasetDescriptorConfigKeys.FORMAT_KEY,
DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY);
this.codecType = ConfigUtils.getString(config,
DatasetDescriptorConfigKeys.CODEC_KEY,
DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY);
this.encryptionConfig = new EncryptionConfig(ConfigUtils.getConfig(config,
DatasetDescriptorConfigKeys.ENCYPTION_PREFIX, ConfigFactory
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/FsDatasetPartitionConfig.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/FsDatasetPartitionConfig.java
new file mode 100644
index 0000000..86eec99
--- /dev/null
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/FsDatasetPartitionConfig.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.dataset;
+
+import java.io.IOException;
+import java.text.SimpleDateFormat;
+import java.util.regex.Pattern;
+import java.util.regex.PatternSyntaxException;
+
+import com.google.common.base.Enums;
+import com.google.common.base.Joiner;
+import com.google.common.collect.ImmutableMap;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+import
org.apache.gobblin.service.modules.flowgraph.DatasetDescriptorConfigKeys;
+import org.apache.gobblin.util.ConfigUtils;
+
+
+/**
+ * Describes how a filesystem-based dataset is partitioned. Common partitioning types are "datetime" and "regex".
+ * For each type, the partition pattern ({@link SimpleDateFormat} date pattern or {@link Pattern} regular
+ * expression) is validated when the config is constructed. For the pseudo-types "none" and "any", the configured
+ * pattern is ignored and replaced with "none" or "any" respectively.
+ */
+@Slf4j
+public class FsDatasetPartitionConfig {
+  @Getter
+  private final String partitionType;
+  @Getter
+  private final String partitionPattern;
+  @Getter
+  private final Config rawConfig;
+
+  public enum PartitionType {
+    DATETIME("datetime"),
+    REGEX("regex"),
+    NONE(DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_NONE),
+    ANY(DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY);
+
+    private final String type;
+
+    PartitionType(final String type) {
+      this.type = type;
+    }
+
+    @Override
+    public String toString() {
+      return this.type;
+    }
+  }
+
+  private static final Config DEFAULT_FALLBACK =
+      ConfigFactory.parseMap(ImmutableMap.<String, Object>builder()
+          .put(DatasetDescriptorConfigKeys.PARTITION_TYPE_KEY, DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY)
+          .put(DatasetDescriptorConfigKeys.PARTITION_PATTERN_KEY, DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY)
+          .build());
+
+  /**
+   * @param config the partition sub-config holding the partition type and pattern keys; absent keys default to "any"
+   * @throws IOException if the partition type is unknown or the pattern is invalid for that type
+   */
+  public FsDatasetPartitionConfig(Config config) throws IOException {
+    String partitionType = ConfigUtils.getString(config, DatasetDescriptorConfigKeys.PARTITION_TYPE_KEY,
+        DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY);
+    String partitionPattern = ConfigUtils.getString(config, DatasetDescriptorConfigKeys.PARTITION_PATTERN_KEY,
+        DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY);
+    // For the "none"/"any" pseudo-types the configured pattern is ignored and replaced by the type's own value.
+    if (partitionType.equalsIgnoreCase(PartitionType.NONE.name())) {
+      partitionPattern = DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_NONE;
+    } else if (partitionType.equalsIgnoreCase(PartitionType.ANY.name())) {
+      partitionPattern = DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY;
+    }
+    validatePartitionConfig(partitionType, partitionPattern);
+    this.partitionType = partitionType;
+    this.partitionPattern = partitionPattern;
+    this.rawConfig = config.withFallback(DEFAULT_FALLBACK);
+  }
+
+  /**
+   * Checks that {@code partitionType} names a {@link PartitionType} (case-insensitively) and that
+   * {@code partitionPattern} is valid for it.
+   *
+   * @throws IOException if the type is unknown, the datetime/regex pattern is invalid, or a "none"/"any" type
+   *     carries a pattern other than "none"/"any"
+   */
+  private void validatePartitionConfig(String partitionType, String partitionPattern)
+      throws IOException {
+    // Locale.ROOT: under e.g. a Turkish default locale "datetime".toUpperCase() is "DATETİME", which matches no
+    // enum constant and would reject a valid type.
+    String normalizedType = partitionType.toUpperCase(java.util.Locale.ROOT);
+    if (!Enums.getIfPresent(PartitionType.class, normalizedType).isPresent()) {
+      log.error("Invalid partition type {}", partitionType);
+      throw new IOException("Invalid partition type");
+    }
+    switch (PartitionType.valueOf(normalizedType)) {
+      case DATETIME:
+        try {
+          new SimpleDateFormat(partitionPattern);
+        } catch (IllegalArgumentException e) {
+          // SimpleDateFormat signals an illegal pattern with IllegalArgumentException.
+          log.error("Invalid datetime partition pattern {}", partitionPattern);
+          throw new IOException(e);
+        }
+        break;
+      case REGEX:
+        try {
+          Pattern.compile(partitionPattern);
+        } catch (PatternSyntaxException e) {
+          log.error("Invalid regex partition pattern {}", partitionPattern);
+          throw new IOException(e);
+        }
+        break;
+      case NONE:
+        // Defensive: the constructor already normalizes the pattern for this type.
+        if (!partitionPattern.equalsIgnoreCase(DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_NONE)) {
+          log.error("Partition pattern {} incompatible with partition type {}", partitionPattern, partitionType);
+          throw new IOException("Incompatible partition pattern/type");
+        }
+        break;
+      case ANY:
+        // Defensive: the constructor already normalizes the pattern for this type.
+        if (!partitionPattern.equalsIgnoreCase(DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY)) {
+          log.error("Partition pattern {} incompatible with partition type {}", partitionPattern, partitionType);
+          throw new IOException("Incompatible partition pattern/type");
+        }
+        break;
+      default:
+        break;
+    }
+  }
+
+  /**
+   * Returns true if this config subsumes {@code other}. The partition type and the partition pattern of this
+   * config must each be either "any" or equal (ignoring case) to the corresponding value of {@code other}.
+   * Returns false if {@code other} is null.
+   */
+  public boolean contains(FsDatasetPartitionConfig other) {
+    if (other == null) {
+      return false;
+    }
+
+    // Both attributes are compared against other's values (not this config's own fields).
+    return (DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY.equalsIgnoreCase(getPartitionType())
+        || getPartitionType().equalsIgnoreCase(other.getPartitionType()))
+        && (DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY.equalsIgnoreCase(getPartitionPattern())
+        || getPartitionPattern().equalsIgnoreCase(other.getPartitionPattern()));
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+
+    if (!(o instanceof FsDatasetPartitionConfig)) {
+      return false;
+    }
+    FsDatasetPartitionConfig other = (FsDatasetPartitionConfig) o;
+    return this.getPartitionType().equalsIgnoreCase(other.getPartitionType())
+        && this.getPartitionPattern().equalsIgnoreCase(other.getPartitionPattern());
+  }
+
+  @Override
+  public String toString() {
+    return "(" + Joiner.on(",").join(this.getPartitionType(), this.getPartitionPattern()) + ")";
+  }
+
+  @Override
+  public int hashCode() {
+    // equals() ignores case on both components, so both are lower-cased (locale-independently) before hashing;
+    // otherwise "DATETIME" and "datetime" would be equal but hash differently.
+    int result = 17;
+    result = 31 * result + this.getPartitionType().toLowerCase(java.util.Locale.ROOT).hashCode();
+    result = 31 * result + this.getPartitionPattern().toLowerCase(java.util.Locale.ROOT).hashCode();
+    return result;
+  }
+}
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/DatasetDescriptorConfigKeys.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/DatasetDescriptorConfigKeys.java
index 6470a1d..a144598 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/DatasetDescriptorConfigKeys.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/DatasetDescriptorConfigKeys.java
@@ -35,12 +35,21 @@ public class DatasetDescriptorConfigKeys {
public static final String CODEC_KEY = "codec";
public static final String DESCRIPTION_KEY = "description";
public static final String IS_RETENTION_APPLIED_KEY = "isRetentionApplied";
+  public static final String IS_COMPACTED_KEY = "isCompacted";
+  public static final String IS_COMPACTED_AND_DEDUPED_KEY = "isCompactedAndDeduped"; // spelled like IS_COMPACTED_KEY
//Dataset encryption related keys
public static final String ENCYPTION_PREFIX = "encrypt";
public static final String ENCRYPTION_ALGORITHM_KEY = "algorithm";
public static final String ENCRYPTION_KEYSTORE_TYPE_KEY = "keystore_type";
public static final String ENCRYPTION_KEYSTORE_ENCODING_KEY =
"keystore_encoding";
+  // Unless the algorithm is "none": must name an EncryptionConfig.EncryptionLevel constant (case-insensitive).
+  public static final String ENCRYPTION_LEVEL_KEY = "level";
+  // Must be an explicit field list (not "any"/"none") for level "field", and "none" for level "none".
+  public static final String ENCRYPTED_FIELDS = "encryptedFields";
+
+  //Dataset partition related keys
+  public static final String PARTITION_PREFIX = "partition";
+  // Must name an FsDatasetPartitionConfig.PartitionType constant (case-insensitive).
+  public static final String PARTITION_TYPE_KEY = "type";
+  // SimpleDateFormat pattern for "datetime" partitions, regular expression for "regex" partitions.
+  public static final String PARTITION_PATTERN_KEY = "pattern";
public static final String DATASET_DESCRIPTOR_CONFIG_ANY = "any";
public static final String DATASET_DESCRIPTOR_CONFIG_NONE = "none";
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/dataset/FSDatasetDescriptorTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/dataset/FSDatasetDescriptorTest.java
new file mode 100644
index 0000000..d065cba
--- /dev/null
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/dataset/FSDatasetDescriptorTest.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.service.modules.dataset;
+
+import java.io.IOException;
+
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigValueFactory;
+
+import
org.apache.gobblin.service.modules.flowgraph.DatasetDescriptorConfigKeys;
+
+
+public class FSDatasetDescriptorTest {
+  private static final String PLATFORM = "hdfs";
+
+  @Test
+  public void testContains() throws IOException {
+    // descriptor1's glob path "/a/b/c/*" has to match descriptor2's concrete path "/a/b/c/d".
+    FSDatasetDescriptor descriptor1 = new FSDatasetDescriptor(hdfsConfig("/a/b/c/*"));
+
+    Config config2 = hdfsConfig("/a/b/c/d")
+        .withValue(DatasetDescriptorConfigKeys.FORMAT_KEY, ConfigValueFactory.fromAnyRef("avro"))
+        .withValue(DatasetDescriptorConfigKeys.CODEC_KEY, ConfigValueFactory.fromAnyRef("gzip"));
+    FSDatasetDescriptor descriptor2 = new FSDatasetDescriptor(config2);
+    Assert.assertTrue(descriptor1.contains(descriptor2));
+
+    // Layer a file-level encryption block on top of config2.
+    Config config3 = config2.withFallback(nested(DatasetDescriptorConfigKeys.ENCYPTION_PREFIX,
+        DatasetDescriptorConfigKeys.ENCRYPTION_LEVEL_KEY, "file",
+        DatasetDescriptorConfigKeys.ENCRYPTION_ALGORITHM_KEY, "aes_rotating"));
+    FSDatasetDescriptor descriptor3 = new FSDatasetDescriptor(config3);
+    assertAllContain(descriptor3, descriptor2, descriptor1);
+
+    // Layer a datetime partition block on top of config3.
+    Config config4 = config3.withFallback(nested(DatasetDescriptorConfigKeys.PARTITION_PREFIX,
+        DatasetDescriptorConfigKeys.PARTITION_TYPE_KEY, "datetime",
+        DatasetDescriptorConfigKeys.PARTITION_PATTERN_KEY, "yyyy/MM/dd"));
+    FSDatasetDescriptor descriptor4 = new FSDatasetDescriptor(config4);
+    assertAllContain(descriptor4, descriptor3, descriptor2, descriptor1);
+
+    // Retention and compaction flags must match exactly, so no earlier descriptor contains descriptor5.
+    Config config5 = config4.withFallback(ConfigFactory.empty()
+        .withValue(DatasetDescriptorConfigKeys.IS_COMPACTED_AND_DEDUPED_KEY, ConfigValueFactory.fromAnyRef("true"))
+        .withValue(DatasetDescriptorConfigKeys.IS_RETENTION_APPLIED_KEY, ConfigValueFactory.fromAnyRef("true")));
+    FSDatasetDescriptor descriptor5 = new FSDatasetDescriptor(config5);
+    for (FSDatasetDescriptor container : new FSDatasetDescriptor[] {descriptor4, descriptor3, descriptor2, descriptor1}) {
+      Assert.assertFalse(container.contains(descriptor5));
+    }
+  }
+
+  @Test
+  public void testEquals() throws IOException {
+    FSDatasetDescriptor descriptor1 = new FSDatasetDescriptor(hdfsConfig("/a/b/c/*"));
+    FSDatasetDescriptor descriptor2 = new FSDatasetDescriptor(hdfsConfig("/a/b/c/*"));
+    Assert.assertTrue(descriptor1.equals(descriptor2));
+    Assert.assertTrue(descriptor2.equals(descriptor1));
+    Assert.assertEquals(descriptor1.hashCode(), descriptor2.hashCode());
+
+    // Spelling out the defaults (format "any", retention not applied) must not affect equality or hashing.
+    Config anyFormat = hdfsConfig("/a/b/c/*")
+        .withValue(DatasetDescriptorConfigKeys.FORMAT_KEY, ConfigValueFactory.fromAnyRef("any"));
+    FSDatasetDescriptor descriptor3 = new FSDatasetDescriptor(anyFormat
+        .withValue(DatasetDescriptorConfigKeys.IS_RETENTION_APPLIED_KEY, ConfigValueFactory.fromAnyRef("false")));
+    Assert.assertTrue(descriptor1.equals(descriptor3));
+    Assert.assertEquals(descriptor1.hashCode(), descriptor3.hashCode());
+
+    // Swapping values between two boolean flags must change both equality and the hash code.
+    FSDatasetDescriptor descriptor4 = new FSDatasetDescriptor(withFlags(anyFormat, "false", "true"));
+    FSDatasetDescriptor descriptor5 = new FSDatasetDescriptor(withFlags(anyFormat, "true", "false"));
+    Assert.assertFalse(descriptor4.equals(descriptor5));
+    Assert.assertNotEquals(descriptor4.hashCode(), descriptor5.hashCode());
+  }
+
+  @Test
+  public void testInitFails() {
+    Config base = hdfsConfig("/a/b/c/*");
+
+    // Datetime partition type with an invalid date pattern.
+    assertInitFails(base.withFallback(nested(DatasetDescriptorConfigKeys.PARTITION_PREFIX,
+        DatasetDescriptorConfigKeys.PARTITION_TYPE_KEY, "datetime",
+        DatasetDescriptorConfigKeys.PARTITION_PATTERN_KEY, "BBBB/MM/dd")));
+
+    // Regex partition type with a malformed regular expression.
+    assertInitFails(base.withFallback(nested(DatasetDescriptorConfigKeys.PARTITION_PREFIX,
+        DatasetDescriptorConfigKeys.PARTITION_TYPE_KEY, "regex",
+        DatasetDescriptorConfigKeys.PARTITION_PATTERN_KEY, "[")));
+
+    // Unknown partition type.
+    assertInitFails(base.withFallback(nested(DatasetDescriptorConfigKeys.PARTITION_PREFIX,
+        DatasetDescriptorConfigKeys.PARTITION_TYPE_KEY, "invalidType",
+        DatasetDescriptorConfigKeys.PARTITION_PATTERN_KEY, "aaaa")));
+
+    // Unknown encryption level.
+    assertInitFails(base.withFallback(nested(DatasetDescriptorConfigKeys.ENCYPTION_PREFIX,
+        DatasetDescriptorConfigKeys.ENCRYPTION_LEVEL_KEY, "aaaa")));
+
+    // Field-level encryption without an explicit list of encrypted fields.
+    assertInitFails(base.withFallback(nested(DatasetDescriptorConfigKeys.ENCYPTION_PREFIX,
+        DatasetDescriptorConfigKeys.ENCRYPTION_LEVEL_KEY, "field")));
+
+    // No encryption level, yet encrypted fields are listed.
+    assertInitFails(base.withFallback(nested(DatasetDescriptorConfigKeys.ENCYPTION_PREFIX,
+        DatasetDescriptorConfigKeys.ENCRYPTION_LEVEL_KEY, "none",
+        DatasetDescriptorConfigKeys.ENCRYPTED_FIELDS, "field1")));
+  }
+
+  /** Returns a config with the given path on the "hdfs" platform. */
+  private static Config hdfsConfig(String path) {
+    return ConfigFactory.empty()
+        .withValue(DatasetDescriptorConfigKeys.PATH_KEY, ConfigValueFactory.fromAnyRef(path))
+        .withValue(DatasetDescriptorConfigKeys.PLATFORM_KEY, ConfigValueFactory.fromAnyRef(PLATFORM));
+  }
+
+  /** Builds a config from alternating key/value pairs and nests it under {@code prefix}. */
+  private static Config nested(String prefix, String... keysAndValues) {
+    Config result = ConfigFactory.empty();
+    for (int i = 0; i < keysAndValues.length; i += 2) {
+      result = result.withValue(keysAndValues[i], ConfigValueFactory.fromAnyRef(keysAndValues[i + 1]));
+    }
+    return result.atPath(prefix);
+  }
+
+  /** Adds the retention-applied and compacted flags (as strings) to {@code base}. */
+  private static Config withFlags(Config base, String isRetentionApplied, String isCompacted) {
+    return base
+        .withValue(DatasetDescriptorConfigKeys.IS_RETENTION_APPLIED_KEY, ConfigValueFactory.fromAnyRef(isRetentionApplied))
+        .withValue(DatasetDescriptorConfigKeys.IS_COMPACTED_KEY, ConfigValueFactory.fromAnyRef(isCompacted));
+  }
+
+  /** Asserts that each of {@code containers}, in order, contains {@code contained}. */
+  private static void assertAllContain(FSDatasetDescriptor contained, FSDatasetDescriptor... containers) {
+    for (FSDatasetDescriptor container : containers) {
+      Assert.assertTrue(container.contains(contained));
+    }
+  }
+
+  /** Asserts that building a descriptor from {@code config} fails with an IOException. */
+  private static void assertInitFails(Config config) {
+    Assert.assertThrows(IOException.class, () -> new FSDatasetDescriptor(config));
+  }
+}
\ No newline at end of file
diff --git a/gobblin-service/src/test/resources/flow/flow1.conf
b/gobblin-service/src/test/resources/flow/flow1.conf
index f818df6..cad2aa8 100644
--- a/gobblin-service/src/test/resources/flow/flow1.conf
+++ b/gobblin-service/src/test/resources/flow/flow1.conf
@@ -21,4 +21,5 @@ gobblin.flow.output.dataset.descriptor.format=json
gobblin.flow.output.dataset.descriptor.codec=gzip
gobblin.flow.output.dataset.descriptor.encrypt.algorithm=aes_rotating
gobblin.flow.output.dataset.descriptor.encrypt.keystore_type=json
-gobblin.flow.output.dataset.descriptor.encrypt.keystore_encoding=base64
\ No newline at end of file
+gobblin.flow.output.dataset.descriptor.encrypt.keystore_encoding=base64
+gobblin.flow.output.dataset.descriptor.encrypt.level=file
\ No newline at end of file
diff --git
a/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/hdfsConvertToJsonAndEncrypt/flow.conf
b/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/hdfsConvertToJsonAndEncrypt/flow.conf
index 2ae6fb6..a9be4ff 100644
---
a/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/hdfsConvertToJsonAndEncrypt/flow.conf
+++
b/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/hdfsConvertToJsonAndEncrypt/flow.conf
@@ -16,4 +16,5 @@ gobblin.flow.edge.output.dataset.descriptor.0.format=json
gobblin.flow.edge.output.dataset.descriptor.0.codec=gzip
gobblin.flow.edge.output.dataset.descriptor.0.encrypt.algorithm=aes_rotating
gobblin.flow.edge.output.dataset.descriptor.0.encrypt.keystore_type=json
-gobblin.flow.edge.output.dataset.descriptor.0.encrypt.keystore_encoding=base64
\ No newline at end of file
+gobblin.flow.edge.output.dataset.descriptor.0.encrypt.keystore_encoding=base64
+gobblin.flow.edge.output.dataset.descriptor.0.encrypt.level=file
\ No newline at end of file