This is an automated email from the ASF dual-hosted git repository.
suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 3c1efb9 [GOBBLIN-948] add hive data node and descriptor
3c1efb9 is described below
commit 3c1efb9c4608d8ff0c93395117d2136a82565851
Author: Arjun <[email protected]>
AuthorDate: Mon Nov 11 13:36:55 2019 -0800
[GOBBLIN-948] add hive data node and descriptor
Closes #2797 from arjun4084346/ei-dm
---
.../service/modules/dataset/DatasetDescriptor.java | 4 +-
.../modules/dataset/HiveDatasetDescriptor.java | 74 +++++++++++++++++++++
.../modules/dataset/SqlDatasetDescriptor.java | 9 ++-
.../flowgraph/datanodes/hive/HiveDataNode.java | 77 ++++++++++++++++++++++
.../modules/dataset/HiveDatasetDescriptorTest.java | 45 +++++++++++++
5 files changed, 204 insertions(+), 5 deletions(-)
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/DatasetDescriptor.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/DatasetDescriptor.java
index 33c563c..9c6abfe 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/DatasetDescriptor.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/DatasetDescriptor.java
@@ -63,8 +63,8 @@ public interface DatasetDescriptor {
/**
* @return true if this {@link DatasetDescriptor} contains the other {@link
DatasetDescriptor} i.e. the
- * datasets described by this {@link DatasetDescriptor} is a subset of the
datasets described by the other
- * {@link DatasetDescriptor}. This operation is non-commutative.
+ * datasets described by the other {@link DatasetDescriptor} are a subset of the datasets described by this {@link DatasetDescriptor}.
+ * This operation is non-commutative.
*/
public boolean contains(DatasetDescriptor other);
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/HiveDatasetDescriptor.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/HiveDatasetDescriptor.java
new file mode 100644
index 0000000..3cc2665
--- /dev/null
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/HiveDatasetDescriptor.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.dataset;
+
+import java.io.IOException;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigValueFactory;
+
+import lombok.EqualsAndHashCode;
+
+import org.apache.gobblin.data.management.copy.hive.HiveCopyEntityHelper;
+import
org.apache.gobblin.data.management.version.finder.DatePartitionHiveVersionFinder;
+import org.apache.gobblin.util.ConfigUtils;
+
+/**
+ * A {@link SqlDatasetDescriptor} specialization for datasets whose platform is "hive".
+ *
+ * <p>In addition to the state inherited from {@link SqlDatasetDescriptor}, this descriptor records whether the
+ * dataset is partitioned and, if so, its partition column and partition format. Fields
+ * {@link HiveDatasetDescriptor#isPartitioned}, {@link HiveDatasetDescriptor#partitionColumn} and
+ * {@link HiveDatasetDescriptor#partitionFormat} take part in the generated 'equals' and 'hashCode',
+ * together with the result of the superclass implementations ({@code callSuper = true}).
+ *
+ * <p>Construction also writes a conflict policy into the raw config: {@code REPLACE_PARTITIONS} for a
+ * partitioned dataset (along with the resolved partition column and format), {@code REPLACE_TABLE} otherwise.
+ */
+@EqualsAndHashCode (callSuper = true)
+public class HiveDatasetDescriptor extends SqlDatasetDescriptor {
+  static final String IS_PARTITIONED_KEY = "isPartitioned";
+  static final String PARTITION_COLUMN = "partition.column";
+  static final String PARTITION_FORMAT = "partition.format";
+  static final String CONFLICT_POLICY = "conflict.policy";
+  private final boolean isPartitioned;
+  private final String partitionColumn;
+  private final String partitionFormat;
+
+  /**
+   * Builds the descriptor from {@code config}.
+   *
+   * @param config dataset descriptor config; {@link #IS_PARTITIONED_KEY} defaults to {@code true}, and for a
+   *               partitioned dataset {@link #PARTITION_COLUMN} and {@link #PARTITION_FORMAT} default to the
+   *               values defined by {@link DatePartitionHiveVersionFinder}
+   * @throws IOException declared by the {@link SqlDatasetDescriptor} constructor
+   */
+  public HiveDatasetDescriptor(Config config) throws IOException {
+    super(config);
+    this.isPartitioned = ConfigUtils.getBoolean(config, IS_PARTITIONED_KEY, true);
+
+    if (this.isPartitioned) {
+      this.partitionColumn = ConfigUtils.getString(config, PARTITION_COLUMN,
+          DatePartitionHiveVersionFinder.DEFAULT_PARTITION_KEY_NAME);
+      this.partitionFormat = ConfigUtils.getString(config, PARTITION_FORMAT,
+          DatePartitionHiveVersionFinder.DEFAULT_PARTITION_VALUE_DATE_TIME_PATTERN);
+      // Publish the resolved (possibly defaulted) partition settings and the matching conflict policy
+      // through the raw config in a single update.
+      this.setRawConfig(this.getRawConfig()
+          .withValue(CONFLICT_POLICY,
+              ConfigValueFactory.fromAnyRef(HiveCopyEntityHelper.ExistingEntityPolicy.REPLACE_PARTITIONS.name()))
+          .withValue(PARTITION_COLUMN, ConfigValueFactory.fromAnyRef(this.partitionColumn))
+          .withValue(PARTITION_FORMAT, ConfigValueFactory.fromAnyRef(this.partitionFormat)));
+    } else {
+      // Unpartitioned datasets carry empty partition settings so equals/hashCode stay well defined.
+      this.partitionColumn = "";
+      this.partitionFormat = "";
+      this.setRawConfig(this.getRawConfig().withValue(CONFLICT_POLICY,
+          ConfigValueFactory.fromAnyRef(HiveCopyEntityHelper.ExistingEntityPolicy.REPLACE_TABLE.name())));
+    }
+  }
+
+  /**
+   * @return true only when the configured platform is "hive" (case-insensitive).
+   */
+  @Override
+  protected boolean isPlatformValid() {
+    return "hive".equalsIgnoreCase(getPlatform());
+  }
+
+  /**
+   * A Hive descriptor contains {@code other} only if the superclass path check passes, {@code other} is
+   * itself a {@link HiveDatasetDescriptor}, and both agree on whether the dataset is partitioned.
+   *
+   * @param other the descriptor to test for containment
+   * @return true if {@code other} is contained by this descriptor
+   */
+  @Override
+  protected boolean isPathContaining(DatasetDescriptor other) {
+    if (!super.isPathContaining(other)) {
+      return false;
+    }
+    // Check the type before casting: a non-Hive descriptor is simply not contained, rather than
+    // a reason to fail with a ClassCastException.
+    return other instanceof HiveDatasetDescriptor
+        && this.isPartitioned == ((HiveDatasetDescriptor) other).isPartitioned;
+  }
+}
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/SqlDatasetDescriptor.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/SqlDatasetDescriptor.java
index b77addd..37e3e5a 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/SqlDatasetDescriptor.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/SqlDatasetDescriptor.java
@@ -29,6 +29,7 @@ import com.typesafe.config.ConfigValueFactory;
import lombok.EqualsAndHashCode;
import lombok.Getter;
+import lombok.Setter;
import lombok.ToString;
import lombok.extern.slf4j.Slf4j;
@@ -41,7 +42,7 @@ import org.apache.gobblin.util.PathUtils;
@ToString (exclude = {"rawConfig"})
@EqualsAndHashCode (exclude = {"rawConfig"}, callSuper = true)
public class SqlDatasetDescriptor extends BaseDatasetDescriptor implements
DatasetDescriptor {
- private static final String SEPARATION_CHAR = ";";
+ protected static final String SEPARATION_CHAR = ";";
private final String databaseName;
private final String tableName;
@@ -49,7 +50,8 @@ public class SqlDatasetDescriptor extends
BaseDatasetDescriptor implements Datas
@Getter
private final String path;
@Getter
- private final Config rawConfig;
+ @Setter
+ private Config rawConfig;
public enum Platform {
SQLSERVER("sqlserver"),
@@ -85,9 +87,10 @@ public class SqlDatasetDescriptor extends
BaseDatasetDescriptor implements Datas
return Joiner.on(SEPARATION_CHAR).join(databaseName, tableName);
}
- private boolean isPlatformValid() {
+ protected boolean isPlatformValid() {
return Enums.getIfPresent(Platform.class,
getPlatform().toUpperCase()).isPresent();
}
+
/**
* Check if the dbName and tableName specified in {@param other}'s path are
accepted by the set of dbName.tableName
* combinations defined by the current {@link SqlDatasetDescriptor}. For
example, let:
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/datanodes/hive/HiveDataNode.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/datanodes/hive/HiveDataNode.java
new file mode 100644
index 0000000..047e278
--- /dev/null
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/datanodes/hive/HiveDataNode.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.flowgraph.datanodes.hive;
+
+import java.io.IOException;
+import java.net.URI;
+
+import com.google.common.base.Preconditions;
+import com.typesafe.config.Config;
+
+import joptsimple.internal.Strings;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.service.modules.flowgraph.FlowGraphConfigurationKeys;
+import org.apache.gobblin.service.modules.flowgraph.datanodes.fs.HdfsDataNode;
+import org.apache.gobblin.util.ConfigUtils;
+
+
+/**
+ * A {@link HdfsDataNode} that additionally exposes a Hive metastore. In addition to the required
+ * properties of a {@link HdfsDataNode}, a {@link HiveDataNode} must have a metastore URI specified.
+ */
+@Alpha
+@EqualsAndHashCode (callSuper = true)
+public class HiveDataNode extends HdfsDataNode {
+  public static final String METASTORE_URI_KEY = FlowGraphConfigurationKeys.DATA_NODE_PREFIX + "hive.metastore.uri";
+
+  @Getter
+  private String metastoreUri;
+
+  /**
+   * Constructor. A HiveDataNode must have hive.metastore.uri property specified in addition to the
+   * properties required by {@link HdfsDataNode}.
+   *
+   * @param nodeProps node configuration; must contain a non-empty {@link #METASTORE_URI_KEY}
+   * @throws DataNodeCreationException if the metastore URI is missing, empty, unparseable, or rejected by
+   *         {@link #isMetastoreUriValid(URI)}; the underlying failure is preserved as the cause
+   */
+  public HiveDataNode(Config nodeProps) throws DataNodeCreationException {
+    super(nodeProps);
+    try {
+      this.metastoreUri = ConfigUtils.getString(nodeProps, METASTORE_URI_KEY, "");
+      // Guava's Strings (fully qualified, as elsewhere in this class) replaces the internal
+      // joptsimple helper for the null/empty check.
+      Preconditions.checkArgument(!com.google.common.base.Strings.isNullOrEmpty(this.metastoreUri),
+          "hive.metastore.uri cannot be null or empty.");
+
+      // Validate the metastore URI of the DataNode.
+      if (!isMetastoreUriValid(new URI(this.metastoreUri))) {
+        throw new IOException("Invalid hive metastore URI " + this.metastoreUri);
+      }
+    } catch (Exception e) {
+      throw new DataNodeCreationException(e);
+    }
+  }
+
+  /**
+   * Checks that {@code metastoreUri} uses the "thrift" scheme and has a non-empty authority.
+   *
+   * @param metastoreUri the URI to validate
+   * @return true if the scheme is exactly "thrift" and the authority is neither null nor empty;
+   *         false otherwise, including when the URI has no scheme at all
+   */
+  public boolean isMetastoreUriValid(URI metastoreUri) {
+    // Constant-first comparison: URI#getScheme() returns null for a scheme-less URI (e.g. "//host:9083"),
+    // which must be reported as invalid instead of raising a NullPointerException.
+    if (!"thrift".equals(metastoreUri.getScheme())) {
+      return false;
+    }
+    // Ensure that the authority is not empty
+    return !com.google.common.base.Strings.isNullOrEmpty(metastoreUri.getAuthority());
+  }
+}
\ No newline at end of file
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/dataset/HiveDatasetDescriptorTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/dataset/HiveDatasetDescriptorTest.java
new file mode 100644
index 0000000..5c953b6
--- /dev/null
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/dataset/HiveDatasetDescriptorTest.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.dataset;
+
+import java.io.IOException;
+
+import org.junit.Assert;
+import org.testng.annotations.Test;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigValueFactory;
+
+import
org.apache.gobblin.service.modules.flowgraph.DatasetDescriptorConfigKeys;
+
+
+public class HiveDatasetDescriptorTest {
+
+  /**
+   * Two Hive descriptors built from the same platform, database and table must not be equal when they
+   * differ only in the value of {@link HiveDatasetDescriptor#IS_PARTITIONED_KEY}.
+   */
+  @Test
+  public void objectCreation() throws IOException {
+    Config baseConfig = ConfigFactory.empty()
+        .withValue(DatasetDescriptorConfigKeys.PLATFORM_KEY, ConfigValueFactory.fromAnyRef("hive"))
+        .withValue(DatasetDescriptorConfigKeys.DATABASE_KEY, ConfigValueFactory.fromAnyRef("testDb_Db1"))
+        .withValue(DatasetDescriptorConfigKeys.TABLE_KEY, ConfigValueFactory.fromAnyRef("testTable_Table1"));
+
+    Config partitionedConfig =
+        baseConfig.withValue(HiveDatasetDescriptor.IS_PARTITIONED_KEY, ConfigValueFactory.fromAnyRef(true));
+    Config unpartitionedConfig =
+        baseConfig.withValue(HiveDatasetDescriptor.IS_PARTITIONED_KEY, ConfigValueFactory.fromAnyRef(false));
+
+    SqlDatasetDescriptor partitioned = new HiveDatasetDescriptor(partitionedConfig);
+    SqlDatasetDescriptor unpartitioned = new HiveDatasetDescriptor(unpartitionedConfig);
+
+    Assert.assertNotEquals(partitioned, unpartitioned);
+  }
+}
\ No newline at end of file