This is an automated email from the ASF dual-hosted git repository.

suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 3c1efb9  [GOBBLIN-948] add hive data node and descriptor
3c1efb9 is described below

commit 3c1efb9c4608d8ff0c93395117d2136a82565851
Author: Arjun <[email protected]>
AuthorDate: Mon Nov 11 13:36:55 2019 -0800

    [GOBBLIN-948] add hive data node and descriptor
    
    Closes #2797 from arjun4084346/ei-dm
---
 .../service/modules/dataset/DatasetDescriptor.java |  4 +-
 .../modules/dataset/HiveDatasetDescriptor.java     | 74 +++++++++++++++++++++
 .../modules/dataset/SqlDatasetDescriptor.java      |  9 ++-
 .../flowgraph/datanodes/hive/HiveDataNode.java     | 77 ++++++++++++++++++++++
 .../modules/dataset/HiveDatasetDescriptorTest.java | 45 +++++++++++++
 5 files changed, 204 insertions(+), 5 deletions(-)

diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/DatasetDescriptor.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/DatasetDescriptor.java
index 33c563c..9c6abfe 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/DatasetDescriptor.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/DatasetDescriptor.java
@@ -63,8 +63,8 @@ public interface DatasetDescriptor {
 
   /**
    * @return true if this {@link DatasetDescriptor} contains the other {@link 
DatasetDescriptor} i.e. the
-   * datasets described by this {@link DatasetDescriptor} is a subset of the 
datasets described by the other
-   * {@link DatasetDescriptor}. This operation is non-commutative.
+   * datasets described by the other {@link DatasetDescriptor} are a subset of 
the datasets described by this
+   * {@link DatasetDescriptor}. This operation is non-commutative.
    */
   public boolean contains(DatasetDescriptor other);
 
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/HiveDatasetDescriptor.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/HiveDatasetDescriptor.java
new file mode 100644
index 0000000..3cc2665
--- /dev/null
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/HiveDatasetDescriptor.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.dataset;
+
+import java.io.IOException;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigValueFactory;
+
+import lombok.EqualsAndHashCode;
+
+import org.apache.gobblin.data.management.copy.hive.HiveCopyEntityHelper;
+import 
org.apache.gobblin.data.management.version.finder.DatePartitionHiveVersionFinder;
+import org.apache.gobblin.util.ConfigUtils;
+
+/**
+ * As of now, {@link HiveDatasetDescriptor} has the same implementation as 
{@link SqlDatasetDescriptor}.
+ * The fields {@link HiveDatasetDescriptor#isPartitioned}, {@link 
HiveDatasetDescriptor#partitionColumn} and
+ * {@link HiveDatasetDescriptor#partitionFormat} are used by the methods 
'equals' and 'hashCode'.
+ */
+@EqualsAndHashCode (callSuper = true)
+public class HiveDatasetDescriptor extends SqlDatasetDescriptor {
+  static final String IS_PARTITIONED_KEY = "isPartitioned";
+  static final String PARTITION_COLUMN = "partition.column";
+  static final String PARTITION_FORMAT = "partition.format";
+  static final String CONFLICT_POLICY = "conflict.policy";
+  private final boolean isPartitioned;
+  private final String partitionColumn;
+  private final String partitionFormat;
+
+  public HiveDatasetDescriptor(Config config) throws IOException {
+    super(config);
+    this.isPartitioned = ConfigUtils.getBoolean(config, IS_PARTITIONED_KEY, 
true);
+
+    if (isPartitioned) {
+      partitionColumn = ConfigUtils.getString(config, PARTITION_COLUMN, 
DatePartitionHiveVersionFinder.DEFAULT_PARTITION_KEY_NAME);
+      partitionFormat = ConfigUtils.getString(config, PARTITION_FORMAT, 
DatePartitionHiveVersionFinder.DEFAULT_PARTITION_VALUE_DATE_TIME_PATTERN);
+      this.setRawConfig(this.getRawConfig().withValue(CONFLICT_POLICY,
+          
ConfigValueFactory.fromAnyRef(HiveCopyEntityHelper.ExistingEntityPolicy.REPLACE_PARTITIONS.name())));
+      this.setRawConfig(this.getRawConfig().withValue(PARTITION_COLUMN, 
ConfigValueFactory.fromAnyRef(partitionColumn)));
+      this.setRawConfig(this.getRawConfig().withValue(PARTITION_FORMAT, 
ConfigValueFactory.fromAnyRef(partitionFormat)));
+    } else {
+      partitionColumn = "";
+      partitionFormat = "";
+      this.setRawConfig(this.getRawConfig().withValue(CONFLICT_POLICY,
+          
ConfigValueFactory.fromAnyRef(HiveCopyEntityHelper.ExistingEntityPolicy.REPLACE_TABLE.name())));
+    }
+  }
+
+  @Override
+  protected boolean isPlatformValid() {
+    return "hive".equalsIgnoreCase(getPlatform());
+  }
+
+  @Override
+  protected boolean isPathContaining(DatasetDescriptor other) {
+    return super.isPathContaining(other) && this.isPartitioned == 
((HiveDatasetDescriptor) other).isPartitioned;
+  }
+}
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/SqlDatasetDescriptor.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/SqlDatasetDescriptor.java
index b77addd..37e3e5a 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/SqlDatasetDescriptor.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/SqlDatasetDescriptor.java
@@ -29,6 +29,7 @@ import com.typesafe.config.ConfigValueFactory;
 
 import lombok.EqualsAndHashCode;
 import lombok.Getter;
+import lombok.Setter;
 import lombok.ToString;
 import lombok.extern.slf4j.Slf4j;
 
@@ -41,7 +42,7 @@ import org.apache.gobblin.util.PathUtils;
 @ToString (exclude = {"rawConfig"})
 @EqualsAndHashCode (exclude = {"rawConfig"}, callSuper = true)
 public class SqlDatasetDescriptor extends BaseDatasetDescriptor implements 
DatasetDescriptor {
-  private static final String SEPARATION_CHAR = ";";
+  protected static final String SEPARATION_CHAR = ";";
 
   private final String databaseName;
   private final String tableName;
@@ -49,7 +50,8 @@ public class SqlDatasetDescriptor extends 
BaseDatasetDescriptor implements Datas
   @Getter
   private final String path;
   @Getter
-  private final Config rawConfig;
+  @Setter
+  private Config rawConfig;
 
   public enum Platform {
     SQLSERVER("sqlserver"),
@@ -85,9 +87,10 @@ public class SqlDatasetDescriptor extends 
BaseDatasetDescriptor implements Datas
     return Joiner.on(SEPARATION_CHAR).join(databaseName, tableName);
   }
 
-  private boolean isPlatformValid() {
+  protected boolean isPlatformValid() {
     return Enums.getIfPresent(Platform.class, 
getPlatform().toUpperCase()).isPresent();
   }
+
   /**
    * Check if the dbName and tableName specified in {@param other}'s path are 
accepted by the set of dbName.tableName
    * combinations defined by the current {@link SqlDatasetDescriptor}. For 
example, let:
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/datanodes/hive/HiveDataNode.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/datanodes/hive/HiveDataNode.java
new file mode 100644
index 0000000..047e278
--- /dev/null
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/datanodes/hive/HiveDataNode.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.flowgraph.datanodes.hive;
+
+import java.io.IOException;
+import java.net.URI;
+
+import com.google.common.base.Preconditions;
+import com.typesafe.config.Config;
+
+import joptsimple.internal.Strings;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.service.modules.flowgraph.FlowGraphConfigurationKeys;
+import org.apache.gobblin.service.modules.flowgraph.datanodes.fs.HdfsDataNode;
+import org.apache.gobblin.util.ConfigUtils;
+
+
+/**
+ * A Hive data node implementation. In addition to the 
required properties of a {@link HdfsDataNode}, a {@link HiveDataNode}
+ * must have a metastore URI specified.
+ */
+@Alpha
+@EqualsAndHashCode (callSuper = true)
+public class HiveDataNode extends HdfsDataNode {
+  public static final String METASTORE_URI_KEY = 
FlowGraphConfigurationKeys.DATA_NODE_PREFIX + "hive.metastore.uri";
+
+  @Getter
+  private String metastoreUri;
+
+  /**
+   * Constructor. A HiveDataNode must have hive.metastore.uri property 
specified in addition to a node Id and fs.uri.
+   */
+  public HiveDataNode(Config nodeProps) throws DataNodeCreationException {
+    super(nodeProps);
+    try {
+      this.metastoreUri = ConfigUtils.getString(nodeProps, METASTORE_URI_KEY, 
"");
+      Preconditions.checkArgument(!Strings.isNullOrEmpty(this.metastoreUri), 
"hive.metastore.uri cannot be null or empty.");
+
+      //Validate the metastore URI of the DataNode.
+      if (!isMetastoreUriValid(new URI(this.metastoreUri))) {
+        throw new IOException("Invalid hive metastore URI " + 
this.metastoreUri);
+      }
+    } catch (Exception e) {
+      throw new DataNodeCreationException(e);
+    }
+  }
+
+  public boolean isMetastoreUriValid(URI metastoreUri) {
+    String scheme = metastoreUri.getScheme();
+    if (!scheme.equals("thrift")) {
+      return false;
+    }
+    //Ensure that the authority is not empty
+    if 
(com.google.common.base.Strings.isNullOrEmpty(metastoreUri.getAuthority())) {
+      return false;
+    }
+    return true;
+  }
+}
\ No newline at end of file
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/dataset/HiveDatasetDescriptorTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/dataset/HiveDatasetDescriptorTest.java
new file mode 100644
index 0000000..5c953b6
--- /dev/null
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/dataset/HiveDatasetDescriptorTest.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.dataset;
+
+import java.io.IOException;
+
+import org.junit.Assert;
+import org.testng.annotations.Test;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigValueFactory;
+
+import 
org.apache.gobblin.service.modules.flowgraph.DatasetDescriptorConfigKeys;
+
+
+public class HiveDatasetDescriptorTest {
+
+  @Test
+  public void objectCreation() throws IOException {
+    Config baseConfig = 
ConfigFactory.empty().withValue(DatasetDescriptorConfigKeys.PLATFORM_KEY, 
ConfigValueFactory.fromAnyRef("hive"))
+        .withValue(DatasetDescriptorConfigKeys.DATABASE_KEY, 
ConfigValueFactory.fromAnyRef("testDb_Db1"))
+        .withValue(DatasetDescriptorConfigKeys.TABLE_KEY, 
ConfigValueFactory.fromAnyRef("testTable_Table1"));;
+
+    SqlDatasetDescriptor descriptor1 = new 
HiveDatasetDescriptor(baseConfig.withValue(HiveDatasetDescriptor.IS_PARTITIONED_KEY,
 ConfigValueFactory.fromAnyRef(true)));
+    SqlDatasetDescriptor descriptor2 = new 
HiveDatasetDescriptor(baseConfig.withValue(HiveDatasetDescriptor.IS_PARTITIONED_KEY,
 ConfigValueFactory.fromAnyRef(false)));
+
+    Assert.assertNotEquals(descriptor1, descriptor2);
+  }
+}
\ No newline at end of file

Reply via email to