This is an automated email from the ASF dual-hosted git repository.
suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 5027659 [GOBBLIN-708] Create SqlDatasetDescriptor for JDBC-sourced
datasets.
5027659 is described below
commit 5027659ace1f576a715dffaac24b2c30a08f76f4
Author: suvasude <[email protected]>
AuthorDate: Tue Apr 30 17:31:37 2019 -0700
[GOBBLIN-708] Create SqlDatasetDescriptor for JDBC-sourced datasets.
Closes #2577 from sv2000/sqlDatasetDescriptor
---
.../modules/dataset/BaseDatasetDescriptor.java | 93 ++++++++++++++++
.../service/modules/dataset/EncryptionConfig.java | 44 ++------
.../modules/dataset/FSDatasetDescriptor.java | 99 +++-------------
...onConfig.java => FSDatasetPartitionConfig.java} | 43 ++-----
.../service/modules/dataset/FormatConfig.java | 37 +-----
.../modules/dataset/SqlDatasetDescriptor.java | 124 +++++++++++++++++++++
.../service/modules/flowgraph/BaseDataNode.java | 32 +-----
.../flowgraph/DatasetDescriptorConfigKeys.java | 4 +-
.../service/modules/flowgraph/SqlDataNode.java | 54 +++++++++
.../flowgraph/datanodes/fs/FileSystemDataNode.java | 30 +----
.../modules/dataset/SqlDatasetDescriptorTest.java | 61 ++++++++++
11 files changed, 385 insertions(+), 236 deletions(-)
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/BaseDatasetDescriptor.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/BaseDatasetDescriptor.java
new file mode 100644
index 0000000..c9ad8b5
--- /dev/null
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/BaseDatasetDescriptor.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.service.modules.dataset;
+
+import java.io.IOException;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.ToString;
+
+import
org.apache.gobblin.service.modules.flowgraph.DatasetDescriptorConfigKeys;
+import org.apache.gobblin.util.ConfigUtils;
+
+@EqualsAndHashCode (exclude = {"description", "rawConfig"})
+@ToString (exclude = {"description", "rawConfig"})
+public abstract class BaseDatasetDescriptor implements DatasetDescriptor {
+ @Getter
+ private final String platform;
+ @Getter
+ private final FormatConfig formatConfig;
+ @Getter
+ private final boolean isRetentionApplied;
+ @Getter
+ private final String description;
+ @Getter
+ private final Config rawConfig;
+
+ private static final Config DEFAULT_FALLBACK =
+ ConfigFactory.parseMap(ImmutableMap.<String, Object>builder()
+ .put(DatasetDescriptorConfigKeys.PATH_KEY,
DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY)
+ .put(DatasetDescriptorConfigKeys.IS_RETENTION_APPLIED_KEY, false)
+ .build());
+
+ public BaseDatasetDescriptor(Config config) throws IOException {
+
Preconditions.checkArgument(config.hasPath(DatasetDescriptorConfigKeys.PLATFORM_KEY),
"Dataset descriptor config must specify platform");
+ this.platform =
config.getString(DatasetDescriptorConfigKeys.PLATFORM_KEY).toLowerCase();
+ this.formatConfig = new FormatConfig(config);
+ this.isRetentionApplied = ConfigUtils.getBoolean(config,
DatasetDescriptorConfigKeys.IS_RETENTION_APPLIED_KEY, false);
+ this.description = ConfigUtils.getString(config,
DatasetDescriptorConfigKeys.DESCRIPTION_KEY, "");
+ this.rawConfig =
config.withFallback(this.formatConfig.getRawConfig()).withFallback(DEFAULT_FALLBACK);
+ }
+
+ /**
+ * @return true if the path described by otherPath is contained in the set of paths
+ * described by this {@link DatasetDescriptor}.
+ */
+ protected abstract boolean isPathContaining(String otherPath);
+
+ /**
+ * @return true if this {@link DatasetDescriptor} contains the other {@link
DatasetDescriptor} i.e. the
+ * datasets described by this {@link DatasetDescriptor} is a subset of the
datasets described by the other
+ * {@link DatasetDescriptor}. This operation is non-commutative.
+ * @param other
+ */
+ @Override
+ public boolean contains(DatasetDescriptor other) {
+ if (this == other) {
+ return true;
+ }
+
+ if (other == null || !getClass().equals(other.getClass())) {
+ return false;
+ }
+
+ if (this.getPlatform() == null ||
!this.getPlatform().equalsIgnoreCase(other.getPlatform())) {
+ return false;
+ }
+
+ if ((this.isRetentionApplied() != other.isRetentionApplied())) {
+ return false;
+ }
+
+ return isPathContaining(other.getPath()) &&
getFormatConfig().contains(other.getFormatConfig());
+ }
+}
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/EncryptionConfig.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/EncryptionConfig.java
index 08b96ac..85e5364 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/EncryptionConfig.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/EncryptionConfig.java
@@ -20,18 +20,21 @@ package org.apache.gobblin.service.modules.dataset;
import java.io.IOException;
import com.google.common.base.Enums;
-import com.google.common.base.Joiner;
import com.google.common.collect.ImmutableMap;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
+import lombok.EqualsAndHashCode;
import lombok.Getter;
+import lombok.ToString;
import lombok.extern.slf4j.Slf4j;
import
org.apache.gobblin.service.modules.flowgraph.DatasetDescriptorConfigKeys;
import org.apache.gobblin.util.ConfigUtils;
@Slf4j
+@ToString(exclude = {"rawConfig"})
+@EqualsAndHashCode (exclude = {"rawConfig"})
public class EncryptionConfig {
@Getter
private final String encryptionAlgorithm;
@@ -85,13 +88,13 @@ public class EncryptionConfig {
this.encryptedFields =
DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_NONE;
} else {
this.keystoreType = ConfigUtils.getString(encryptionConfig,
DatasetDescriptorConfigKeys.ENCRYPTION_KEYSTORE_TYPE_KEY,
- DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY);
+
DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY).toLowerCase();
this.keystoreEncoding = ConfigUtils.getString(encryptionConfig,
DatasetDescriptorConfigKeys.ENCRYPTION_KEYSTORE_ENCODING_KEY,
- DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY);
+
DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY).toLowerCase();
this.encryptionLevel = ConfigUtils.getString(encryptionConfig,
DatasetDescriptorConfigKeys.ENCRYPTION_LEVEL_KEY,
- DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY);
+
DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY).toLowerCase();
this.encryptedFields = ConfigUtils.getString(encryptionConfig,
DatasetDescriptorConfigKeys.ENCRYPTED_FIELDS,
- DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY);
+
DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY).toLowerCase();
validate(this.encryptionLevel, this.encryptedFields);
}
this.rawConfig = encryptionConfig.withFallback(DEFAULT_FALLBACK);
@@ -143,35 +146,4 @@ public class EncryptionConfig {
&&
(DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY.equalsIgnoreCase(this.getEncryptedFields())
|| this.encryptedFields.equalsIgnoreCase(otherEncryptedFields));
}
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
-
- if (!(o instanceof EncryptionConfig)) {
- return false;
- }
- EncryptionConfig other = (EncryptionConfig) o;
- return
this.getEncryptionAlgorithm().equalsIgnoreCase(other.getEncryptionAlgorithm())
&& this.keystoreEncoding.equalsIgnoreCase(other.getKeystoreEncoding())
- && this.getKeystoreType().equalsIgnoreCase(other.getKeystoreType()) &&
this.getEncryptionLevel().equals(other.getEncryptionLevel())
- &&
this.getEncryptedFields().equalsIgnoreCase(other.getEncryptedFields());
- }
-
- @Override
- public String toString() {
- return "(" + Joiner.on(",").join(this.encryptionAlgorithm,
this.encryptionLevel, this.encryptedFields, this.keystoreType,
this.keystoreEncoding) + ")";
- }
-
- @Override
- public int hashCode() {
- int result = 17;
- result = 31 * result + encryptionAlgorithm.toLowerCase().hashCode();
- result = 31 * result + keystoreType.toLowerCase().hashCode();
- result = 31 * result + keystoreEncoding.toLowerCase().hashCode();
- result = 31 * result + encryptionLevel.toLowerCase().hashCode();
- result = 31 * result + encryptedFields.toLowerCase().hashCode();
- return result;
- }
}
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/FSDatasetDescriptor.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/FSDatasetDescriptor.java
index 8b5d7a7..3b2611e 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/FSDatasetDescriptor.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/FSDatasetDescriptor.java
@@ -22,13 +22,13 @@ import java.io.IOException;
import org.apache.hadoop.fs.GlobPattern;
import org.apache.hadoop.fs.Path;
-import com.google.common.base.Joiner;
-import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
+import lombok.EqualsAndHashCode;
import lombok.Getter;
+import lombok.ToString;
import org.apache.gobblin.annotation.Alpha;
import
org.apache.gobblin.service.modules.flowgraph.DatasetDescriptorConfigKeys;
@@ -40,46 +40,35 @@ import org.apache.gobblin.util.PathUtils;
* An implementation of {@link DatasetDescriptor} with FS-based storage.
*/
@Alpha
-public class FSDatasetDescriptor implements DatasetDescriptor {
- @Getter
- private final String platform;
+@ToString (callSuper = true, exclude = {"rawConfig"})
+@EqualsAndHashCode (callSuper = true, exclude = {"rawConfig"})
+public class FSDatasetDescriptor extends BaseDatasetDescriptor implements
DatasetDescriptor {
@Getter
private final String path;
@Getter
- private final FormatConfig formatConfig;
- @Getter
- private final FsDatasetPartitionConfig partitionConfig;
- @Getter
- private final boolean isRetentionApplied;
- @Getter
private final boolean isCompacted;
@Getter
private final boolean isCompactedAndDeduped;
@Getter
- private final String description;
+ private final FSDatasetPartitionConfig partitionConfig;
@Getter
private final Config rawConfig;
private static final Config DEFAULT_FALLBACK =
ConfigFactory.parseMap(ImmutableMap.<String, Object>builder()
- .put(DatasetDescriptorConfigKeys.PATH_KEY,
DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY)
- .put(DatasetDescriptorConfigKeys.IS_RETENTION_APPLIED_KEY, false)
.put(DatasetDescriptorConfigKeys.IS_COMPACTED_KEY, false)
.put(DatasetDescriptorConfigKeys.IS_COMPACTED_AND_DEDUPED_KEY, false)
.build());
public FSDatasetDescriptor(Config config) throws IOException {
-
Preconditions.checkArgument(config.hasPath(DatasetDescriptorConfigKeys.PLATFORM_KEY),
"Dataset descriptor config must specify platform");
- this.platform = config.getString(DatasetDescriptorConfigKeys.PLATFORM_KEY);
- this.path = PathUtils.getPathWithoutSchemeAndAuthority(new
Path(ConfigUtils.getString(config, DatasetDescriptorConfigKeys.PATH_KEY,
-
DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY))).toString();
- this.formatConfig = new FormatConfig(config);
- this.partitionConfig = new
FsDatasetPartitionConfig(ConfigUtils.getConfigOrEmpty(config,
DatasetDescriptorConfigKeys.PARTITION_PREFIX));
- this.isRetentionApplied = ConfigUtils.getBoolean(config,
DatasetDescriptorConfigKeys.IS_RETENTION_APPLIED_KEY, false);
+ super(config);
+ this.path = PathUtils
+ .getPathWithoutSchemeAndAuthority(new
Path(ConfigUtils.getString(config, DatasetDescriptorConfigKeys.PATH_KEY,
+
DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY))).toString();
this.isCompacted = ConfigUtils.getBoolean(config,
DatasetDescriptorConfigKeys.IS_COMPACTED_KEY, false);
this.isCompactedAndDeduped = ConfigUtils.getBoolean(config,
DatasetDescriptorConfigKeys.IS_COMPACTED_AND_DEDUPED_KEY, false);
- this.description = ConfigUtils.getString(config,
DatasetDescriptorConfigKeys.DESCRIPTION_KEY, "");
- this.rawConfig =
config.withFallback(this.formatConfig.getRawConfig()).withFallback(this.partitionConfig.getRawConfig()).withFallback(DEFAULT_FALLBACK);
+ this.partitionConfig = new
FSDatasetPartitionConfig(ConfigUtils.getConfigOrEmpty(config,
DatasetDescriptorConfigKeys.PARTITION_PREFIX));
+ this.rawConfig =
config.withFallback(getPartitionConfig().getRawConfig()).withFallback(DEFAULT_FALLBACK).withFallback(super.getRawConfig());
}
/**
@@ -90,7 +79,7 @@ public class FSDatasetDescriptor implements DatasetDescriptor
{
* @param otherPath a glob pattern that describes a set of paths.
* @return true if the glob pattern described by the otherPath matches the
path in this {@link DatasetDescriptor}.
*/
- private boolean isPathContaining(String otherPath) {
+ protected boolean isPathContaining(String otherPath) {
if (otherPath == null) {
return false;
}
@@ -109,71 +98,17 @@ public class FSDatasetDescriptor implements
DatasetDescriptor {
*/
@Override
public boolean contains(DatasetDescriptor o) {
- if (this == o) {
- return true;
- }
- if (!(o instanceof FSDatasetDescriptor)) {
+ if (!super.contains(o)) {
return false;
}
- FSDatasetDescriptor other = (FSDatasetDescriptor) o;
- if (this.getPlatform() == null || other.getPlatform() == null ||
!this.getPlatform().equalsIgnoreCase(other.getPlatform())) {
- return false;
- }
-
- if ((this.isRetentionApplied() != other.isRetentionApplied()) ||
(this.isCompacted() != other.isCompacted()) ||
- (this.isCompactedAndDeduped() != other.isCompactedAndDeduped())) {
- return false;
- }
-
- return getFormatConfig().contains(other.getFormatConfig()) &&
getPartitionConfig().contains(other.getPartitionConfig())
- && isPathContaining(other.getPath());
- }
-
- /**
- *
- * @param o the other {@link FSDatasetDescriptor} to compare "this" {@link
FSDatasetDescriptor} with.
- * @return true iff "this" dataset descriptor is compatible with the
"other" and the "other" dataset descriptor is
- * compatible with this dataset descriptor.
- */
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (!(o instanceof FSDatasetDescriptor)) {
- return false;
- }
FSDatasetDescriptor other = (FSDatasetDescriptor) o;
- if (this.getPlatform() == null || other.getPlatform() == null ||
!this.getPlatform().equalsIgnoreCase(other.getPlatform())) {
- return false;
- }
- if ((this.isRetentionApplied() != other.isRetentionApplied()) ||
(this.isCompacted() != other.isCompacted()) ||
+
+ if ((this.isCompacted() != other.isCompacted()) ||
(this.isCompactedAndDeduped() != other.isCompactedAndDeduped())) {
return false;
}
- return this.getPath().equals(other.getPath()) &&
this.getPartitionConfig().equals(other.getPartitionConfig()) &&
- this.getFormatConfig().equals(other.getFormatConfig());
- }
-
- @Override
- public String toString() {
- return "(" + Joiner.on(",").join(this.getPlatform(), this.getPath(),
this.getFormatConfig().toString(), this.getPartitionConfig().toString(),
- String.valueOf(isRetentionApplied()), String.valueOf(isCompacted()),
String.valueOf(isCompactedAndDeduped()))
- + ")";
- }
- @Override
- public int hashCode() {
- int result = 17;
- result = 31 * result + platform.toLowerCase().hashCode();
- result = 31 * result + path.hashCode();
- result = 31 * result + Boolean.hashCode(isRetentionApplied);
- result = 31 * result + Boolean.hashCode(isCompacted);
- result = 31 * result + Boolean.hashCode(isCompactedAndDeduped);
- result = 31 * result + getFormatConfig().hashCode();
- result = 31 * result + getPartitionConfig().hashCode();
- return result;
+ return this.getPartitionConfig().contains(other.getPartitionConfig());
}
-
}
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/FsDatasetPartitionConfig.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/FSDatasetPartitionConfig.java
similarity index 83%
rename from
gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/FsDatasetPartitionConfig.java
rename to
gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/FSDatasetPartitionConfig.java
index 86eec99..eb022ae 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/FsDatasetPartitionConfig.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/FSDatasetPartitionConfig.java
@@ -23,12 +23,13 @@ import java.util.regex.Pattern;
import java.util.regex.PatternSyntaxException;
import com.google.common.base.Enums;
-import com.google.common.base.Joiner;
import com.google.common.collect.ImmutableMap;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
+import lombok.EqualsAndHashCode;
import lombok.Getter;
+import lombok.ToString;
import lombok.extern.slf4j.Slf4j;
import
org.apache.gobblin.service.modules.flowgraph.DatasetDescriptorConfigKeys;
@@ -41,7 +42,9 @@ import org.apache.gobblin.util.ConfigUtils;
* the regex pattern) is validated.
*/
@Slf4j
-public class FsDatasetPartitionConfig {
+@ToString (exclude = {"rawConfig"})
+@EqualsAndHashCode (exclude = {"rawConfig"})
+public class FSDatasetPartitionConfig {
@Getter
private final String partitionType;
@Getter
@@ -73,9 +76,9 @@ public class FsDatasetPartitionConfig {
.put(DatasetDescriptorConfigKeys.PARTITION_PATTERN_KEY,
DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY)
.build());
- public FsDatasetPartitionConfig(Config config) throws IOException {
- String partitionType = ConfigUtils.getString(config,
DatasetDescriptorConfigKeys.PARTITION_TYPE_KEY,
DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY);
- String partitionPattern = ConfigUtils.getString(config,
DatasetDescriptorConfigKeys.PARTITION_PATTERN_KEY,
DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY);
+ public FSDatasetPartitionConfig(Config config) throws IOException {
+ String partitionType = ConfigUtils.getString(config,
DatasetDescriptorConfigKeys.PARTITION_TYPE_KEY,
DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY).toLowerCase();
+ String partitionPattern = ConfigUtils.getString(config,
DatasetDescriptorConfigKeys.PARTITION_PATTERN_KEY,
DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY).toLowerCase();
if (partitionType.equalsIgnoreCase(PartitionType.NONE.name())) {
partitionPattern =
DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_NONE;
} else if(partitionType.equalsIgnoreCase(PartitionType.ANY.name())) {
@@ -125,41 +128,13 @@ public class FsDatasetPartitionConfig {
}
}
- public boolean contains(FsDatasetPartitionConfig other) {
+ public boolean contains(FSDatasetPartitionConfig other) {
if (other == null) {
return false;
}
-
return
((DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY.equalsIgnoreCase(getPartitionType())
|| this.getPartitionType().equalsIgnoreCase(other.getPartitionType())))
&&
((DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY.equalsIgnoreCase(getPartitionPattern())
||
this.getPartitionPattern().equalsIgnoreCase(other.getPartitionPattern())));
}
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
-
- if (!(o instanceof FsDatasetPartitionConfig)) {
- return false;
- }
- FsDatasetPartitionConfig other = (FsDatasetPartitionConfig) o;
- return this.getPartitionType().equalsIgnoreCase(other.getPartitionType())
&&
-
this.getPartitionPattern().equalsIgnoreCase(other.getPartitionPattern());
- }
-
- @Override
- public String toString() {
- return "(" + Joiner.on(",").join(this.getPartitionType(),
this.getPartitionPattern()) + ")";
- }
-
- @Override
- public int hashCode() {
- int result = 17;
- result = 31 * result + this.getPartitionType().hashCode();
- result = 31 * result + this.getPartitionPattern().toLowerCase().hashCode();
- return result;
- }
}
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/FormatConfig.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/FormatConfig.java
index 3485ec5..e02940b 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/FormatConfig.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/FormatConfig.java
@@ -19,12 +19,13 @@ package org.apache.gobblin.service.modules.dataset;
import java.io.IOException;
-import com.google.common.base.Joiner;
import com.google.common.collect.ImmutableMap;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
+import lombok.EqualsAndHashCode;
import lombok.Getter;
+import lombok.ToString;
import org.apache.gobblin.annotation.Alpha;
import
org.apache.gobblin.service.modules.flowgraph.DatasetDescriptorConfigKeys;
@@ -41,6 +42,8 @@ import org.apache.gobblin.util.ConfigUtils;
* </ul>
*/
@Alpha
+@ToString (exclude = {"rawConfig"})
+@EqualsAndHashCode (exclude = {"rawConfig"})
public class FormatConfig {
@Getter
private final String format;
@@ -58,8 +61,8 @@ public class FormatConfig {
.build());
public FormatConfig(Config config) throws IOException {
- this.format = ConfigUtils.getString(config,
DatasetDescriptorConfigKeys.FORMAT_KEY,
DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY);
- this.codecType = ConfigUtils.getString(config,
DatasetDescriptorConfigKeys.CODEC_KEY,
DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY);
+ this.format = ConfigUtils.getString(config,
DatasetDescriptorConfigKeys.FORMAT_KEY,
DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY).toLowerCase();
+ this.codecType = ConfigUtils.getString(config,
DatasetDescriptorConfigKeys.CODEC_KEY,
DatasetDescriptorConfigKeys.DATASET_DESCRIPTOR_CONFIG_ANY).toLowerCase();
this.encryptionConfig = new EncryptionConfig(ConfigUtils.getConfig(config,
DatasetDescriptorConfigKeys.ENCYPTION_PREFIX, ConfigFactory
.empty()));
this.rawConfig =
config.withFallback(this.encryptionConfig.getRawConfig().atPath(DatasetDescriptorConfigKeys.ENCYPTION_PREFIX)).
@@ -84,32 +87,4 @@ public class FormatConfig {
private boolean containsEncryptionConfig(EncryptionConfig
otherEncryptionConfig) {
return this.getEncryptionConfig().contains(otherEncryptionConfig);
}
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
-
- if (!(o instanceof FormatConfig)) {
- return false;
- }
- FormatConfig other = (FormatConfig) o;
- return this.getFormat().equalsIgnoreCase(other.getFormat()) &&
this.getCodecType().equalsIgnoreCase(other.getCodecType())
- && this.getEncryptionConfig().equals(other.getEncryptionConfig());
- }
-
- @Override
- public String toString() {
- return "(" + Joiner.on(",").join(this.getFormat(), this.getCodecType(),
this.getEncryptionConfig().toString()) + ")";
- }
-
- @Override
- public int hashCode() {
- int result = 17;
- result = 31 * result + codecType.toLowerCase().hashCode();
- result = 31 * result + format.toLowerCase().hashCode();
- result = 31 * result + encryptionConfig.hashCode();
- return result;
- }
}
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/SqlDatasetDescriptor.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/SqlDatasetDescriptor.java
new file mode 100644
index 0000000..2811992
--- /dev/null
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/SqlDatasetDescriptor.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.dataset;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.regex.Pattern;
+
+import com.google.common.base.Enums;
+import com.google.common.base.Joiner;
+import com.google.common.base.Splitter;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigValueFactory;
+
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.ToString;
+import lombok.extern.slf4j.Slf4j;
+
+import
org.apache.gobblin.service.modules.flowgraph.DatasetDescriptorConfigKeys;
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.PathUtils;
+
+
+@Slf4j
+@ToString (exclude = {"rawConfig"})
+@EqualsAndHashCode (exclude = {"rawConfig"}, callSuper = true)
+public class SqlDatasetDescriptor extends BaseDatasetDescriptor implements
DatasetDescriptor {
+ private static final String SEPARATION_CHAR = ";";
+
+ private final String databaseName;
+ private final String tableName;
+
+ @Getter
+ private final String path;
+ @Getter
+ private final Config rawConfig;
+
+ public enum Platform {
+ SQLSERVER("sqlserver"),
+ MYSQL("mysql"),
+ ORACLE("oracle"),
+ POSTGRES("postgres"),
+ TERADATA("teradata");
+
+ private final String platform;
+
+ Platform(final String platform) {
+ this.platform = platform;
+ }
+
+ @Override
+ public String toString() {
+ return this.platform;
+ }
+ }
+
+ public SqlDatasetDescriptor(Config config) throws IOException {
+ super(config);
+ if (!isPlatformValid()) {
+ throw new IOException("Invalid platform specified for
SqlDatasetDescriptor: " + getPlatform());
+ }
+ this.databaseName = ConfigUtils.getString(config,
DatasetDescriptorConfigKeys.DATABASE_KEY, ".*");
+ this.tableName = ConfigUtils.getString(config,
DatasetDescriptorConfigKeys.TABLE_KEY, ".*");
+ this.path = fullyQualifiedTableName(this.databaseName, this.tableName);
+ this.rawConfig = config.withValue(DatasetDescriptorConfigKeys.PATH_KEY,
ConfigValueFactory.fromAnyRef(this.path)).withFallback(super.getRawConfig());
+ }
+
+ private String fullyQualifiedTableName(String databaseName, String
tableName) {
+ return Joiner.on(SEPARATION_CHAR).join(databaseName, tableName);
+ }
+
+ private boolean isPlatformValid() {
+ return Enums.getIfPresent(Platform.class,
getPlatform().toUpperCase()).isPresent();
+ }
+ /**
+ * Check if the dbName and tableName specified in {@code otherPath} are
accepted by the set of dbName;tableName
+ * combinations defined by the current {@link SqlDatasetDescriptor}. For
example, let:
+ * this.path = "test_.*;test_table_.*". Then:
+ * isPathContaining("test_db1;test_table_1") = true
+ * isPathContaining("testdb1;test_table_2") = false
+ *
+ * NOTE: otherPath cannot be a globPattern. So:
+ * isPathContaining("test_db.*;test_table_*") = false
+ *
+ * @param otherPath which should be in the format of dbName;tableName
+ */
+ @Override
+ protected boolean isPathContaining(String otherPath) {
+ if (otherPath == null) {
+ return false;
+ }
+
+ if (PathUtils.GLOB_TOKENS.matcher(otherPath).find()) {
+ return false;
+ }
+
+ //Extract the dbName and tableName from otherPath
+ List<String> parts = Splitter.on(SEPARATION_CHAR).splitToList(otherPath);
+ if (parts.size() != 2) {
+ return false;
+ }
+
+ String otherDbName = parts.get(0);
+ String otherTableName = parts.get(1);
+
+ return Pattern.compile(this.databaseName).matcher(otherDbName).matches()
&& Pattern.compile(this.tableName).matcher(otherTableName).matches();
+ }
+}
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseDataNode.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseDataNode.java
index 4fb9711..23bf83b 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseDataNode.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseDataNode.java
@@ -19,21 +19,22 @@ package org.apache.gobblin.service.modules.flowgraph;
import com.google.common.base.Preconditions;
import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-
-import org.apache.gobblin.annotation.Alpha;
-import org.apache.gobblin.util.ConfigUtils;
import joptsimple.internal.Strings;
+import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.util.ConfigUtils;
+
/**
* An implementation of {@link DataNode}.
*/
@Alpha
@Slf4j
+@EqualsAndHashCode (exclude = {"rawConfig", "active"})
public class BaseDataNode implements DataNode {
@Getter
private String id;
@@ -55,27 +56,4 @@ public class BaseDataNode implements DataNode {
throw new DataNodeCreationException(e);
}
}
-
- /**
- * The comparison between two nodes should involve the configuration.
- * Node name is the identifier for the node.
- * */
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
-
- BaseDataNode that = (BaseDataNode) o;
-
- return id.equals(that.getId());
- }
-
- @Override
- public int hashCode() {
- return this.id.hashCode();
- }
}
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/DatasetDescriptorConfigKeys.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/DatasetDescriptorConfigKeys.java
index a144598..905ef81 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/DatasetDescriptorConfigKeys.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/DatasetDescriptorConfigKeys.java
@@ -31,12 +31,14 @@ public class DatasetDescriptorConfigKeys {
public static final String CLASS_KEY = "class";
public static final String PLATFORM_KEY = "platform";
public static final String PATH_KEY = "path";
+ public static final String DATABASE_KEY = "databaseName";
+ public static final String TABLE_KEY = "tableName";
public static final String FORMAT_KEY = "format";
public static final String CODEC_KEY = "codec";
public static final String DESCRIPTION_KEY = "description";
public static final String IS_RETENTION_APPLIED_KEY = "isRetentionApplied";
public static final String IS_COMPACTED_KEY = "isCompacted";
- public static final String IS_COMPACTED_AND_DEDUPED_KEY =
"isCompatedAndDeduped";
+ public static final String IS_COMPACTED_AND_DEDUPED_KEY =
"isCompactedAndDeduped";
//Dataset encryption related keys
public static final String ENCYPTION_PREFIX = "encrypt";
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/SqlDataNode.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/SqlDataNode.java
new file mode 100644
index 0000000..4d90172
--- /dev/null
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/SqlDataNode.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.service.modules.flowgraph;
+
+import com.google.common.base.Preconditions;
+import com.typesafe.config.Config;
+
+import joptsimple.internal.Strings;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+
+import org.apache.gobblin.util.ConfigUtils;
+
+@EqualsAndHashCode (callSuper = true)
+public class SqlDataNode extends BaseDataNode {
+ public static final String SQL_HOSTNAME =
FlowGraphConfigurationKeys.DATA_NODE_PREFIX + "sql.hostname";
+ public static final String SQL_PORT =
FlowGraphConfigurationKeys.DATA_NODE_PREFIX + "sql.port";
+ public static final String SQL_DRIVER =
FlowGraphConfigurationKeys.DATA_NODE_PREFIX + ".sql.driver";
+
+ @Getter
+ private String hostName;
+ @Getter
+ private Integer port;
+ @Getter
+ private String jdbcDriver;
+
+ public SqlDataNode(Config nodeProps) throws DataNodeCreationException {
+ super(nodeProps);
+ try {
+ this.hostName = ConfigUtils.getString(nodeProps, SQL_HOSTNAME, "");
+ this.port = ConfigUtils.getInt(nodeProps, SQL_PORT, 0);
+ this.jdbcDriver = ConfigUtils.getString(nodeProps, SQL_DRIVER, "");
+ Preconditions.checkArgument(!Strings.isNullOrEmpty(hostName),
SQL_HOSTNAME + " cannot be null or empty.");
+ Preconditions.checkArgument(port != 0, SQL_PORT + " cannot be empty.");
+ Preconditions.checkArgument(!Strings.isNullOrEmpty(jdbcDriver),
SQL_DRIVER + " cannot be null or empty.");
+ } catch (Exception e) {
+ throw new DataNodeCreationException(e);
+ }
+ }
+}
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/datanodes/fs/FileSystemDataNode.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/datanodes/fs/FileSystemDataNode.java
index 72f1a66..9830f47 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/datanodes/fs/FileSystemDataNode.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/datanodes/fs/FileSystemDataNode.java
@@ -20,24 +20,25 @@ package
org.apache.gobblin.service.modules.flowgraph.datanodes.fs;
import java.io.IOException;
import java.net.URI;
-import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.typesafe.config.Config;
+import joptsimple.internal.Strings;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+
import org.apache.gobblin.annotation.Alpha;
import org.apache.gobblin.service.modules.flowgraph.BaseDataNode;
import org.apache.gobblin.service.modules.flowgraph.FlowGraphConfigurationKeys;
import org.apache.gobblin.util.ConfigUtils;
-import joptsimple.internal.Strings;
-import lombok.Getter;
-
/**
* An abstract {@link FileSystemDataNode} implementation. In addition to the
required properties of a {@link BaseDataNode}, an {@link FileSystemDataNode}
* must have a FS URI specified. Example implementations of {@link
FileSystemDataNode} include {@link HdfsDataNode}, {@link LocalFSDataNode}.
*/
@Alpha
+@EqualsAndHashCode (callSuper = true)
public abstract class FileSystemDataNode extends BaseDataNode {
public static final String FS_URI_KEY =
FlowGraphConfigurationKeys.DATA_NODE_PREFIX + "fs.uri";
@@ -63,25 +64,4 @@ public abstract class FileSystemDataNode extends
BaseDataNode {
}
public abstract boolean isUriValid(URI fsUri);
- /**
- * Two HDFS DataNodes are the same if they have the same id and the same
fsUri.
- */
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
-
- FileSystemDataNode that = (FileSystemDataNode) o;
-
- return this.getId().equals(that.getId()) &&
this.fsUri.equals(that.getFsUri());
- }
-
- @Override
- public int hashCode() {
- return Joiner.on("-").join(this.getId(), this.fsUri).hashCode();
- }
}
\ No newline at end of file
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/dataset/SqlDatasetDescriptorTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/dataset/SqlDatasetDescriptorTest.java
new file mode 100644
index 0000000..75a142a
--- /dev/null
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/dataset/SqlDatasetDescriptorTest.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.dataset;
+
+import java.io.IOException;
+
+import org.junit.Assert;
+import org.testng.annotations.Test;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigValueFactory;
+
+import
org.apache.gobblin.service.modules.flowgraph.DatasetDescriptorConfigKeys;
+
+import static org.testng.Assert.*;
+
+
+public class SqlDatasetDescriptorTest {
+
+ @Test
+ public void testContains() throws IOException {
+ Config config1 =
ConfigFactory.empty().withValue(DatasetDescriptorConfigKeys.PLATFORM_KEY,
ConfigValueFactory.fromAnyRef("sqlserver"))
+ .withValue(DatasetDescriptorConfigKeys.DATABASE_KEY,
ConfigValueFactory.fromAnyRef("testDb_Db1"))
+ .withValue(DatasetDescriptorConfigKeys.TABLE_KEY,
ConfigValueFactory.fromAnyRef("testTable_Table1"));
+
+ SqlDatasetDescriptor descriptor1 = new SqlDatasetDescriptor(config1);
+
+ Config config2 =
ConfigFactory.empty().withValue(DatasetDescriptorConfigKeys.PLATFORM_KEY,
ConfigValueFactory.fromAnyRef("sqlserver"));
+ SqlDatasetDescriptor descriptor2 = new SqlDatasetDescriptor(config2);
+ Assert.assertTrue(descriptor2.contains(descriptor1));
+
+ Config config3 =
ConfigFactory.empty().withValue(DatasetDescriptorConfigKeys.PLATFORM_KEY,
ConfigValueFactory.fromAnyRef("sqlserver"))
+ .withValue(DatasetDescriptorConfigKeys.DATABASE_KEY,
ConfigValueFactory.fromAnyRef("testDb_.*"))
+ .withValue(DatasetDescriptorConfigKeys.TABLE_KEY,
ConfigValueFactory.fromAnyRef("testTable_.*"));
+
+ SqlDatasetDescriptor descriptor3 = new SqlDatasetDescriptor(config3);
+ Assert.assertTrue(descriptor3.contains(descriptor1));
+
+ Config config4 =
ConfigFactory.empty().withValue(DatasetDescriptorConfigKeys.PLATFORM_KEY,
ConfigValueFactory.fromAnyRef("sqlserver"))
+ .withValue(DatasetDescriptorConfigKeys.DATABASE_KEY,
ConfigValueFactory.fromAnyRef("Db_.*"))
+ .withValue(DatasetDescriptorConfigKeys.TABLE_KEY,
ConfigValueFactory.fromAnyRef("Table_.*"));
+ SqlDatasetDescriptor descriptor4 = new SqlDatasetDescriptor(config4);
+ Assert.assertFalse(descriptor4.contains(descriptor1));
+ }
+}
\ No newline at end of file