This is an automated email from the ASF dual-hosted git repository.

wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new deec15e46 [GOBBLIN-1739]Define Datanodes and Dataset Descriptor for 
Iceberg (#3596)
deec15e46 is described below

commit deec15e46e93a73646f685ac1185626503130829
Author: meethngala <[email protected]>
AuthorDate: Wed Nov 16 10:41:00 2022 -0800

    [GOBBLIN-1739]Define Datanodes and Dataset Descriptor for Iceberg (#3596)
    
    * adding iceberg datanode and dataset descriptor
    
    * adding apache license info
    
    * addressing comments on PR
    
    * fixed checkstyle
    
    * addressed PR comments
    
    Co-authored-by: Meeth Gala <[email protected]>
---
 .../modules/dataset/IcebergDatasetDescriptor.java  | 91 ++++++++++++++++++++++
 .../flowgraph/datanodes/hive/HiveDataNode.java     | 43 +---------
 ...DataNode.java => HiveMetastoreUriDataNode.java} | 28 ++-----
 .../datanodes/iceberg/IcebergOnHiveDataNode.java   | 55 +++++++++++++
 .../dataset/IcebergDatasetDescriptorTest.java      | 53 +++++++++++++
 .../datanodes/iceberg/IcebergDataNodeTest.java     | 71 +++++++++++++++++
 6 files changed, 279 insertions(+), 62 deletions(-)

diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/IcebergDatasetDescriptor.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/IcebergDatasetDescriptor.java
new file mode 100644
index 000000000..4ac46a0f9
--- /dev/null
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/IcebergDatasetDescriptor.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.dataset;
+
+import java.io.IOException;
+import java.util.List;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Splitter;
+import com.typesafe.config.Config;
+
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+
+import 
org.apache.gobblin.service.modules.flowgraph.DatasetDescriptorConfigKeys;
+import org.apache.gobblin.util.ConfigUtils;
+
+/**
+ * {@link IcebergDatasetDescriptor} is a dataset descriptor for an Iceberg-based table, independent of the type of
+ * Iceberg catalog. Fields {@link IcebergDatasetDescriptor#databaseName} and
+ * {@link IcebergDatasetDescriptor#tableName} are used to identify an Iceberg table; they are joined with
+ * {@link #SEPARATION_CHAR} to form the descriptor's {@code path}.
+ */
+@EqualsAndHashCode(callSuper = true)
+public class IcebergDatasetDescriptor extends BaseDatasetDescriptor {
+  protected static final String SEPARATION_CHAR = ";";
+  protected final String databaseName;
+  protected final String tableName;
+  @Getter
+  private final String path;
+
+  /**
+   * Constructor for {@link IcebergDatasetDescriptor}.
+   * @param config dataset descriptor config; must carry an {@code iceberg} platform plus non-empty database and
+   *               table names
+   * @throws IOException if the platform is not {@code iceberg}, if the database or table name is missing or empty,
+   *                     or if either name contains {@link #SEPARATION_CHAR}
+   */
+  public IcebergDatasetDescriptor(Config config) throws IOException {
+    super(config);
+    if (!isPlatformValid()) {
+      throw new IOException("Invalid platform specified for IcebergDatasetDescriptor: " + getPlatform());
+    }
+    // Missing keys default to "" so that absent and empty names are rejected by the same check below.
+    this.databaseName = ConfigUtils.getString(config, DatasetDescriptorConfigKeys.DATABASE_KEY, "");
+    this.tableName = ConfigUtils.getString(config, DatasetDescriptorConfigKeys.TABLE_KEY, "");
+    if (this.databaseName.isEmpty() || this.tableName.isEmpty()) {
+      throw new IOException("Invalid iceberg database or table name: " + this.databaseName + ":" + this.tableName);
+    }
+    // isPathContaining() splits a path back into exactly [database, table]; a name containing the separator would
+    // yield a path that never matches anything, not even an identical descriptor, so reject it up front.
+    if (this.databaseName.contains(SEPARATION_CHAR) || this.tableName.contains(SEPARATION_CHAR)) {
+      throw new IOException("Iceberg database or table name must not contain '" + SEPARATION_CHAR + "': "
+          + this.databaseName + ":" + this.tableName);
+    }
+    this.path = fullyQualifiedTableName(this.databaseName, this.tableName);
+  }
+
+  /**
+   * @return true if the configured platform is {@code iceberg} (case-insensitive). Invoked from the constructor,
+   *         so overrides must not rely on subclass fields being initialized.
+   */
+  protected boolean isPlatformValid() {
+    return "iceberg".equalsIgnoreCase(getPlatform());
+  }
+
+  /** Joins database and table name with {@link #SEPARATION_CHAR}, e.g. {@code db;table}. */
+  private String fullyQualifiedTableName(String databaseName, String tableName) {
+    return Joiner.on(SEPARATION_CHAR).join(databaseName, tableName);
+  }
+
+  /**
+   * Exact-match containment: true only when {@code other}'s path splits on {@link #SEPARATION_CHAR} into exactly
+   * two parts whose database and table names equal this descriptor's (case-sensitive; no glob or regex support).
+   */
+  @Override
+  protected boolean isPathContaining(DatasetDescriptor other) {
+    String otherPath = other.getPath();
+    if (otherPath == null) {
+      return false;
+    }
+
+    //Extract the dbName and tableName from otherPath
+    List<String> parts = Splitter.on(SEPARATION_CHAR).splitToList(otherPath);
+    if (parts.size() != 2) {
+      return false;
+    }
+
+    String otherDbName = parts.get(0);
+    String otherTableName = parts.get(1);
+
+    return this.databaseName.equals(otherDbName) && this.tableName.equals(otherTableName);
+  }
+}
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/datanodes/hive/HiveDataNode.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/datanodes/hive/HiveDataNode.java
index f324ef682..4902dc146 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/datanodes/hive/HiveDataNode.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/datanodes/hive/HiveDataNode.java
@@ -17,68 +17,29 @@
 
 package org.apache.gobblin.service.modules.flowgraph.datanodes.hive;
 
-import java.io.IOException;
-import java.net.URI;
-
-import com.google.common.base.Preconditions;
 import com.typesafe.config.Config;
 
-import joptsimple.internal.Strings;
 import lombok.EqualsAndHashCode;
-import lombok.Getter;
 
 import org.apache.gobblin.annotation.Alpha;
 import org.apache.gobblin.service.modules.dataset.HiveDatasetDescriptor;
 import org.apache.gobblin.service.modules.flowgraph.BaseDataNode;
-import org.apache.gobblin.service.modules.flowgraph.FlowGraphConfigurationKeys;
-import org.apache.gobblin.util.ConfigUtils;
 
 
 /**
  * An {@link HiveDataNode} implementation. In addition to the required 
properties of a {@link BaseDataNode}, an {@link HiveDataNode}
- * must have a metastore URI specified.
+ * specifies platform as hive.
  */
 @Alpha
 @EqualsAndHashCode (callSuper = true)
-public class HiveDataNode extends BaseDataNode {
-  public static final String METASTORE_URI_KEY = 
FlowGraphConfigurationKeys.DATA_NODE_PREFIX + "hive.metastore.uri";
+public class HiveDataNode extends HiveMetastoreUriDataNode {
   public static final String PLATFORM = "hive";
 
-  @Getter
-  private String metastoreUri;
-
   /**
    * Constructor. A HiveDataNode must have hive.metastore.uri property 
specified in addition to a node Id and fs.uri.
    */
   public HiveDataNode(Config nodeProps) throws DataNodeCreationException {
     super(nodeProps);
-    try {
-      this.metastoreUri = ConfigUtils.getString(nodeProps, METASTORE_URI_KEY, 
"");
-      Preconditions.checkArgument(!Strings.isNullOrEmpty(this.metastoreUri), 
"hive.metastore.uri cannot be null or empty.");
-
-      //Validate the srcFsUri and destFsUri of the DataNode.
-      if (!isMetastoreUriValid(new URI(this.metastoreUri))) {
-        throw new IOException("Invalid hive metastore URI " + 
this.metastoreUri);
-      }
-    } catch (Exception e) {
-      throw new DataNodeCreationException(e);
-    }
-  }
-
-  /**
-   * @param metastoreUri hive metastore URI
-   * @return true if the scheme is "thrift" and authority is not empty.
-   */
-  public boolean isMetastoreUriValid(URI metastoreUri) {
-    String scheme = metastoreUri.getScheme();
-    if (!scheme.equals("thrift")) {
-      return false;
-    }
-    //Ensure that the authority is not empty
-    if 
(com.google.common.base.Strings.isNullOrEmpty(metastoreUri.getAuthority())) {
-      return false;
-    }
-    return true;
   }
 
   @Override
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/datanodes/hive/HiveDataNode.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/datanodes/hive/HiveMetastoreUriDataNode.java
similarity index 77%
copy from 
gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/datanodes/hive/HiveDataNode.java
copy to 
gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/datanodes/hive/HiveMetastoreUriDataNode.java
index f324ef682..dc077a558 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/datanodes/hive/HiveDataNode.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/datanodes/hive/HiveMetastoreUriDataNode.java
@@ -28,29 +28,25 @@ import lombok.EqualsAndHashCode;
 import lombok.Getter;
 
 import org.apache.gobblin.annotation.Alpha;
-import org.apache.gobblin.service.modules.dataset.HiveDatasetDescriptor;
+
 import org.apache.gobblin.service.modules.flowgraph.BaseDataNode;
 import org.apache.gobblin.service.modules.flowgraph.FlowGraphConfigurationKeys;
 import org.apache.gobblin.util.ConfigUtils;
 
-
 /**
- * An {@link HiveDataNode} implementation. In addition to the required 
properties of a {@link BaseDataNode}, an {@link HiveDataNode}
+ * An {@link HiveMetastoreUriDataNode} implementation. In addition to the 
required properties of a {@link BaseDataNode}, an {@link 
HiveMetastoreUriDataNode}
  * must have a metastore URI specified.
  */
 @Alpha
-@EqualsAndHashCode (callSuper = true)
-public class HiveDataNode extends BaseDataNode {
+@EqualsAndHashCode(callSuper = true)
+public class HiveMetastoreUriDataNode extends BaseDataNode {
   public static final String METASTORE_URI_KEY = 
FlowGraphConfigurationKeys.DATA_NODE_PREFIX + "hive.metastore.uri";
-  public static final String PLATFORM = "hive";
-
   @Getter
   private String metastoreUri;
-
   /**
-   * Constructor. A HiveDataNode must have hive.metastore.uri property 
specified in addition to a node Id and fs.uri.
+   * Constructor. A HiveMetastoreUriDataNode must have hive.metastore.uri 
property specified in addition to a node Id and fs.uri.
    */
-  public HiveDataNode(Config nodeProps) throws DataNodeCreationException {
+  public HiveMetastoreUriDataNode(Config nodeProps) throws 
DataNodeCreationException {
     super(nodeProps);
     try {
       this.metastoreUri = ConfigUtils.getString(nodeProps, METASTORE_URI_KEY, 
"");
@@ -80,14 +76,4 @@ public class HiveDataNode extends BaseDataNode {
     }
     return true;
   }
-
-  @Override
-  public String getDefaultDatasetDescriptorClass() {
-    return HiveDatasetDescriptor.class.getCanonicalName();
-  }
-
-  @Override
-  public String getDefaultDatasetDescriptorPlatform() {
-    return PLATFORM;
-  }
-}
\ No newline at end of file
+}
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/datanodes/iceberg/IcebergOnHiveDataNode.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/datanodes/iceberg/IcebergOnHiveDataNode.java
new file mode 100644
index 000000000..52a62dc9a
--- /dev/null
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/datanodes/iceberg/IcebergOnHiveDataNode.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.flowgraph.datanodes.iceberg;
+
+import com.typesafe.config.Config;
+
+import lombok.EqualsAndHashCode;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.data.management.copy.iceberg.IcebergHiveCatalog;
+import org.apache.gobblin.service.modules.dataset.IcebergDatasetDescriptor;
+import 
org.apache.gobblin.service.modules.flowgraph.datanodes.hive.HiveMetastoreUriDataNode;
+
+/**
+ * A {@link HiveMetastoreUriDataNode} whose datasets are Iceberg tables registered in a Hive catalog. It adds no
+ * configuration of its own: like its parent it requires a node id, {@code fs.uri} and a valid
+ * {@code hive.metastore.uri}, and that metastore URI identifies the Hive catalog. It reports {@code iceberg} as its
+ * default dataset descriptor platform and {@link IcebergDatasetDescriptor} as its default dataset descriptor class.
+ * See {@link IcebergHiveCatalog} for more information.
+ */
+@Alpha
+@EqualsAndHashCode(callSuper = true)
+public class IcebergOnHiveDataNode extends HiveMetastoreUriDataNode {
+  public static final String PLATFORM = "iceberg";
+
+  /**
+   * Constructor. An IcebergOnHiveDataNode must have the hive.metastore.uri property specified to get
+   * {@link IcebergHiveCatalog} information.
+   * @param nodeProps data node configuration
+   * @throws DataNodeCreationException if the parent {@link HiveMetastoreUriDataNode} rejects the configuration,
+   *                                   e.g. a missing or non-thrift metastore URI
+   */
+  public IcebergOnHiveDataNode(Config nodeProps) throws DataNodeCreationException {
+    super(nodeProps);
+  }
+
+  /** @return the canonical class name of {@link IcebergDatasetDescriptor}, this node's default descriptor class */
+  @Override
+  public String getDefaultDatasetDescriptorClass() {
+    return IcebergDatasetDescriptor.class.getCanonicalName();
+  }
+
+  /** @return {@link #PLATFORM}, i.e. {@code iceberg} */
+  @Override
+  public String getDefaultDatasetDescriptorPlatform() {
+    return PLATFORM;
+  }
+
+}
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/dataset/IcebergDatasetDescriptorTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/dataset/IcebergDatasetDescriptorTest.java
new file mode 100644
index 000000000..98986d224
--- /dev/null
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/dataset/IcebergDatasetDescriptorTest.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.dataset;
+
+import java.io.IOException;
+
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigValueFactory;
+
+import 
org.apache.gobblin.service.modules.flowgraph.DatasetDescriptorConfigKeys;
+
+
+/**
+ * Tests for {@link IcebergDatasetDescriptor}: path construction, exact-match containment and constructor validation.
+ */
+public class IcebergDatasetDescriptorTest {
+  private final Config baseConfig = createDatasetDescriptorConfig("iceberg", "testDb_Db1", "testTable_Table1");
+
+  @Test
+  public void testIsPathContaining() throws IOException {
+    Config sameTableConfig = createDatasetDescriptorConfig("iceberg", "testDb_Db1", "testTable_Table1");
+    Config otherTableConfig = createDatasetDescriptorConfig("iceberg", "testDb_Db1", "testTable_Table2");
+    Config otherDbConfig = createDatasetDescriptorConfig("iceberg", "testDb_Db2", "testTable_Table1");
+
+    IcebergDatasetDescriptor current = new IcebergDatasetDescriptor(baseConfig);
+    IcebergDatasetDescriptor sameTable = new IcebergDatasetDescriptor(sameTableConfig);
+    IcebergDatasetDescriptor otherTable = new IcebergDatasetDescriptor(otherTableConfig);
+    IcebergDatasetDescriptor otherDb = new IcebergDatasetDescriptor(otherDbConfig);
+
+    Assert.assertEquals(current.getPath(), "testDb_Db1;testTable_Table1");
+    Assert.assertTrue(current.isPathContaining(sameTable));
+    // Both the table name and the database name must match exactly.
+    Assert.assertFalse(current.isPathContaining(otherTable));
+    Assert.assertFalse(current.isPathContaining(otherDb));
+  }
+
+  @Test(expectedExceptions = IOException.class)
+  public void testInvalidPlatform() throws IOException {
+    new IcebergDatasetDescriptor(createDatasetDescriptorConfig("hive", "testDb_Db1", "testTable_Table1"));
+  }
+
+  @Test(expectedExceptions = IOException.class)
+  public void testMissingTableName() throws IOException {
+    Config noTableConfig = baseConfig.withoutPath(DatasetDescriptorConfigKeys.TABLE_KEY);
+    new IcebergDatasetDescriptor(noTableConfig);
+  }
+
+  /** Builds a minimal descriptor config with the given platform, database name and table name. */
+  private Config createDatasetDescriptorConfig(String platform, String dbName, String tableName) {
+    return ConfigFactory.empty()
+        .withValue(DatasetDescriptorConfigKeys.PLATFORM_KEY, ConfigValueFactory.fromAnyRef(platform))
+        .withValue(DatasetDescriptorConfigKeys.DATABASE_KEY, ConfigValueFactory.fromAnyRef(dbName))
+        .withValue(DatasetDescriptorConfigKeys.TABLE_KEY, ConfigValueFactory.fromAnyRef(tableName));
+  }
+}
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flowgraph/datanodes/iceberg/IcebergDataNodeTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flowgraph/datanodes/iceberg/IcebergDataNodeTest.java
new file mode 100644
index 000000000..a3955a7fd
--- /dev/null
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flowgraph/datanodes/iceberg/IcebergDataNodeTest.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.flowgraph.datanodes.iceberg;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigValueFactory;
+
+import org.apache.gobblin.service.modules.flowgraph.DataNode;
+import org.apache.gobblin.service.modules.flowgraph.FlowGraphConfigurationKeys;
+
+
+/**
+ * Tests for {@link IcebergOnHiveDataNode}: metastore URI validation (inherited from
+ * {@code HiveMetastoreUriDataNode}) and the iceberg-specific dataset descriptor defaults.
+ */
+public class IcebergDataNodeTest {
+  private static final String METASTORE_URI_KEY = FlowGraphConfigurationKeys.DATA_NODE_PREFIX + "hive.metastore.uri";
+
+  private Config config = null;
+
+  @BeforeMethod
+  public void setUp() {
+    String sampleNodeId = "some-iceberg-node-id";
+    String sampleFsUri = "hdfs://data.hdfs.core.windows.net";
+    String sampleHiveMetastoreUri = "thrift://hcat.company.com:7552";
+
+    config = ConfigFactory.empty()
+        .withValue(FlowGraphConfigurationKeys.DATA_NODE_ID_KEY, ConfigValueFactory.fromAnyRef(sampleNodeId))
+        .withValue(FlowGraphConfigurationKeys.DATA_NODE_PREFIX + "fs.uri", ConfigValueFactory.fromAnyRef(sampleFsUri))
+        .withValue(METASTORE_URI_KEY, ConfigValueFactory.fromAnyRef(sampleHiveMetastoreUri));
+  }
+
+  @AfterMethod
+  public void tearDown() {
+    // Nothing to release: config is rebuilt from scratch in setUp() before every test.
+  }
+
+  @Test
+  public void testIcebergDataNodeWithValidMetastoreUri()
+      throws DataNode.DataNodeCreationException, URISyntaxException {
+    IcebergOnHiveDataNode icebergDataNode = new IcebergOnHiveDataNode(config);
+    URI uri = new URI(config.getString(METASTORE_URI_KEY));
+    Assert.assertTrue(icebergDataNode.isMetastoreUriValid(uri));
+    Assert.assertEquals(icebergDataNode.getMetastoreUri(), config.getString(METASTORE_URI_KEY));
+    Assert.assertEquals(icebergDataNode.getDefaultDatasetDescriptorPlatform(), IcebergOnHiveDataNode.PLATFORM);
+    Assert.assertEquals(icebergDataNode.getDefaultDatasetDescriptorClass(),
+        "org.apache.gobblin.service.modules.dataset.IcebergDatasetDescriptor");
+  }
+
+  @Test(expectedExceptions = DataNode.DataNodeCreationException.class)
+  public void testIcebergDataNodeWithInvalidMetastoreUri() throws DataNode.DataNodeCreationException {
+    String bogusHiveMetastoreUri = "not-thrift://hcat.company.com:7552";
+    config = config.withValue(METASTORE_URI_KEY, ConfigValueFactory.fromAnyRef(bogusHiveMetastoreUri));
+    // The constructor validates the URI scheme and throws, so nothing after this call would ever run.
+    new IcebergOnHiveDataNode(config);
+  }
+
+  @Test(expectedExceptions = DataNode.DataNodeCreationException.class)
+  public void testIcebergDataNodeWithoutMetastoreUri() throws DataNode.DataNodeCreationException {
+    config = config.withoutPath(METASTORE_URI_KEY);
+    new IcebergOnHiveDataNode(config);
+  }
+}

Reply via email to