This is an automated email from the ASF dual-hosted git repository.
wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new deec15e46 [GOBBLIN-1739]Define Datanodes and Dataset Descriptor for
Iceberg (#3596)
deec15e46 is described below
commit deec15e46e93a73646f685ac1185626503130829
Author: meethngala <[email protected]>
AuthorDate: Wed Nov 16 10:41:00 2022 -0800
[GOBBLIN-1739]Define Datanodes and Dataset Descriptor for Iceberg (#3596)
* adding iceberg datanode and dataset descriptor
* adding apache license info
* addressing comments on PR
* fixed checkstyle
* addressed PR comments
Co-authored-by: Meeth Gala <[email protected]>
---
.../modules/dataset/IcebergDatasetDescriptor.java | 91 ++++++++++++++++++++++
.../flowgraph/datanodes/hive/HiveDataNode.java | 43 +---------
...DataNode.java => HiveMetastoreUriDataNode.java} | 28 ++-----
.../datanodes/iceberg/IcebergOnHiveDataNode.java | 55 +++++++++++++
.../dataset/IcebergDatasetDescriptorTest.java | 53 +++++++++++++
.../datanodes/iceberg/IcebergDataNodeTest.java | 71 +++++++++++++++++
6 files changed, 279 insertions(+), 62 deletions(-)
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/IcebergDatasetDescriptor.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/IcebergDatasetDescriptor.java
new file mode 100644
index 000000000..4ac46a0f9
--- /dev/null
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/dataset/IcebergDatasetDescriptor.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.dataset;
+
+import java.io.IOException;
+import java.util.List;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Splitter;
+import com.typesafe.config.Config;
+
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+
+import
org.apache.gobblin.service.modules.flowgraph.DatasetDescriptorConfigKeys;
+import org.apache.gobblin.util.ConfigUtils;
+
+/**
+ * {@link IcebergDatasetDescriptor} is a dataset descriptor for an Iceberg-based table, independent of the type of
+ * Iceberg catalog. Fields {@link IcebergDatasetDescriptor#databaseName} and {@link IcebergDatasetDescriptor#tableName}
+ * are used to identify an Iceberg table; the descriptor's path is {@code <databaseName>;<tableName>}.
+ */
+@EqualsAndHashCode(callSuper = true)
+public class IcebergDatasetDescriptor extends BaseDatasetDescriptor {
+  protected static final String SEPARATION_CHAR = ";";
+  protected final String databaseName;
+  protected final String tableName;
+  @Getter
+  private final String path;
+
+  /**
+   * Constructor for {@link IcebergDatasetDescriptor}
+   * @param config dataset descriptor config; must use the "iceberg" platform and specify non-empty database and
+   *               table names
+   * @throws IOException if the platform is not "iceberg", if the database or table name is missing or empty, or if
+   *                     either name contains {@link #SEPARATION_CHAR} (which would make the path ambiguous)
+   */
+  public IcebergDatasetDescriptor(Config config) throws IOException {
+    super(config);
+    if (!isPlatformValid()) {
+      throw new IOException("Invalid platform specified for IcebergDatasetDescriptor: " + getPlatform());
+    }
+    // Default to empty so that a missing key is reported through the same IOException as an empty value
+    this.databaseName = ConfigUtils.getString(config, DatasetDescriptorConfigKeys.DATABASE_KEY, "");
+    this.tableName = ConfigUtils.getString(config, DatasetDescriptorConfigKeys.TABLE_KEY, "");
+    if (this.databaseName.isEmpty() || this.tableName.isEmpty()) {
+      throw new IOException("Invalid iceberg database or table name: " + this.databaseName + ":" + this.tableName);
+    }
+    // The path joins both names with SEPARATION_CHAR and isPathContaining splits on it again, so a separator inside
+    // either name would make the path ambiguous (e.g. "a;b"/"c" vs "a"/"b;c") and break path matching.
+    if (this.databaseName.contains(SEPARATION_CHAR) || this.tableName.contains(SEPARATION_CHAR)) {
+      throw new IOException("Iceberg database or table name must not contain '" + SEPARATION_CHAR + "': "
+          + this.databaseName + ":" + this.tableName);
+    }
+    this.path = fullyQualifiedTableName(this.databaseName, this.tableName);
+  }
+
+  /**
+   * @return true if the descriptor's platform is "iceberg" (case-insensitive). This is invoked from the constructor,
+   *         so overrides must not rely on fields initialized by a subclass constructor.
+   */
+  protected boolean isPlatformValid() {
+    return "iceberg".equalsIgnoreCase(getPlatform());
+  }
+
+  /**
+   * @return {@code databaseName} and {@code tableName} joined by {@link #SEPARATION_CHAR}
+   */
+  private String fullyQualifiedTableName(String databaseName, String tableName) {
+    return Joiner.on(SEPARATION_CHAR).join(databaseName, tableName);
+  }
+
+  /**
+   * Checks whether {@code other} refers to the same Iceberg table as this descriptor.
+   * @param other descriptor whose path is expected in {@code <databaseName>;<tableName>} form
+   * @return true only if {@code other}'s path splits into exactly a database and a table name that both equal this
+   *         descriptor's; false for a null path or a path of any other shape
+   */
+  @Override
+  protected boolean isPathContaining(DatasetDescriptor other) {
+    String otherPath = other.getPath();
+    if (otherPath == null) {
+      return false;
+    }
+
+    // Extract the database and table names from otherPath
+    List<String> parts = Splitter.on(SEPARATION_CHAR).splitToList(otherPath);
+    if (parts.size() != 2) {
+      return false;
+    }
+
+    String otherDbName = parts.get(0);
+    String otherTableName = parts.get(1);
+
+    return this.databaseName.equals(otherDbName) && this.tableName.equals(otherTableName);
+  }
+}
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/datanodes/hive/HiveDataNode.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/datanodes/hive/HiveDataNode.java
index f324ef682..4902dc146 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/datanodes/hive/HiveDataNode.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/datanodes/hive/HiveDataNode.java
@@ -17,68 +17,29 @@
package org.apache.gobblin.service.modules.flowgraph.datanodes.hive;
-import java.io.IOException;
-import java.net.URI;
-
-import com.google.common.base.Preconditions;
import com.typesafe.config.Config;
-import joptsimple.internal.Strings;
import lombok.EqualsAndHashCode;
-import lombok.Getter;
import org.apache.gobblin.annotation.Alpha;
import org.apache.gobblin.service.modules.dataset.HiveDatasetDescriptor;
import org.apache.gobblin.service.modules.flowgraph.BaseDataNode;
-import org.apache.gobblin.service.modules.flowgraph.FlowGraphConfigurationKeys;
-import org.apache.gobblin.util.ConfigUtils;
/**
* An {@link HiveDataNode} implementation. In addition to the required
properties of a {@link BaseDataNode}, an {@link HiveDataNode}
- * must have a metastore URI specified.
+ * specifies platform as hive.
*/
@Alpha
@EqualsAndHashCode (callSuper = true)
-public class HiveDataNode extends BaseDataNode {
- public static final String METASTORE_URI_KEY =
FlowGraphConfigurationKeys.DATA_NODE_PREFIX + "hive.metastore.uri";
+public class HiveDataNode extends HiveMetastoreUriDataNode {
public static final String PLATFORM = "hive";
- @Getter
- private String metastoreUri;
-
/**
* Constructor. A HiveDataNode must have hive.metastore.uri property
specified in addition to a node Id and fs.uri.
*/
public HiveDataNode(Config nodeProps) throws DataNodeCreationException {
super(nodeProps);
- try {
- this.metastoreUri = ConfigUtils.getString(nodeProps, METASTORE_URI_KEY,
"");
- Preconditions.checkArgument(!Strings.isNullOrEmpty(this.metastoreUri),
"hive.metastore.uri cannot be null or empty.");
-
- //Validate the srcFsUri and destFsUri of the DataNode.
- if (!isMetastoreUriValid(new URI(this.metastoreUri))) {
- throw new IOException("Invalid hive metastore URI " +
this.metastoreUri);
- }
- } catch (Exception e) {
- throw new DataNodeCreationException(e);
- }
- }
-
- /**
- * @param metastoreUri hive metastore URI
- * @return true if the scheme is "thrift" and authority is not empty.
- */
- public boolean isMetastoreUriValid(URI metastoreUri) {
- String scheme = metastoreUri.getScheme();
- if (!scheme.equals("thrift")) {
- return false;
- }
- //Ensure that the authority is not empty
- if
(com.google.common.base.Strings.isNullOrEmpty(metastoreUri.getAuthority())) {
- return false;
- }
- return true;
}
@Override
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/datanodes/hive/HiveDataNode.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/datanodes/hive/HiveMetastoreUriDataNode.java
similarity index 77%
copy from
gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/datanodes/hive/HiveDataNode.java
copy to
gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/datanodes/hive/HiveMetastoreUriDataNode.java
index f324ef682..dc077a558 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/datanodes/hive/HiveDataNode.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/datanodes/hive/HiveMetastoreUriDataNode.java
@@ -28,29 +28,25 @@ import lombok.EqualsAndHashCode;
import lombok.Getter;
import org.apache.gobblin.annotation.Alpha;
-import org.apache.gobblin.service.modules.dataset.HiveDatasetDescriptor;
+
import org.apache.gobblin.service.modules.flowgraph.BaseDataNode;
import org.apache.gobblin.service.modules.flowgraph.FlowGraphConfigurationKeys;
import org.apache.gobblin.util.ConfigUtils;
-
/**
- * An {@link HiveDataNode} implementation. In addition to the required
properties of a {@link BaseDataNode}, an {@link HiveDataNode}
+ * An {@link HiveMetastoreUriDataNode} implementation. In addition to the
required properties of a {@link BaseDataNode}, an {@link
HiveMetastoreUriDataNode}
* must have a metastore URI specified.
*/
@Alpha
-@EqualsAndHashCode (callSuper = true)
-public class HiveDataNode extends BaseDataNode {
+@EqualsAndHashCode(callSuper = true)
+public class HiveMetastoreUriDataNode extends BaseDataNode {
public static final String METASTORE_URI_KEY =
FlowGraphConfigurationKeys.DATA_NODE_PREFIX + "hive.metastore.uri";
- public static final String PLATFORM = "hive";
-
@Getter
private String metastoreUri;
-
/**
- * Constructor. A HiveDataNode must have hive.metastore.uri property
specified in addition to a node Id and fs.uri.
+ * Constructor. A HiveMetastoreUriDataNode must have hive.metastore.uri
property specified in addition to a node Id and fs.uri.
*/
- public HiveDataNode(Config nodeProps) throws DataNodeCreationException {
+ public HiveMetastoreUriDataNode(Config nodeProps) throws
DataNodeCreationException {
super(nodeProps);
try {
this.metastoreUri = ConfigUtils.getString(nodeProps, METASTORE_URI_KEY,
"");
@@ -80,14 +76,4 @@ public class HiveDataNode extends BaseDataNode {
}
return true;
}
-
- @Override
- public String getDefaultDatasetDescriptorClass() {
- return HiveDatasetDescriptor.class.getCanonicalName();
- }
-
- @Override
- public String getDefaultDatasetDescriptorPlatform() {
- return PLATFORM;
- }
-}
\ No newline at end of file
+}
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/datanodes/iceberg/IcebergOnHiveDataNode.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/datanodes/iceberg/IcebergOnHiveDataNode.java
new file mode 100644
index 000000000..52a62dc9a
--- /dev/null
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/datanodes/iceberg/IcebergOnHiveDataNode.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.flowgraph.datanodes.iceberg;
+
+import com.typesafe.config.Config;
+
+import lombok.EqualsAndHashCode;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.data.management.copy.iceberg.IcebergHiveCatalog;
+import org.apache.gobblin.service.modules.dataset.IcebergDatasetDescriptor;
+import
org.apache.gobblin.service.modules.flowgraph.datanodes.hive.HiveMetastoreUriDataNode;
+
+/**
+ * A {@link HiveMetastoreUriDataNode} for Iceberg tables tracked in a Hive catalog. Like its parent, an
+ * {@link IcebergOnHiveDataNode} must have a metastore URI specified; that URI uniquely identifies the Hive catalog.
+ * The node specifies the "iceberg" platform and uses {@link IcebergDatasetDescriptor} as its default dataset
+ * descriptor. See {@link IcebergHiveCatalog} for more information.
+ */
+@Alpha
+@EqualsAndHashCode(callSuper = true)
+public class IcebergOnHiveDataNode extends HiveMetastoreUriDataNode {
+  public static final String PLATFORM = "iceberg";
+
+  /**
+   * Constructor. An IcebergOnHiveDataNode must have hive.metastore.uri property specified to get
+   * {@link IcebergHiveCatalog} information.
+   * @param nodeProps data node config; validated by {@link HiveMetastoreUriDataNode}
+   * @throws DataNodeCreationException if the node config fails validation (e.g. a missing or invalid metastore URI)
+   */
+  public IcebergOnHiveDataNode(Config nodeProps) throws DataNodeCreationException {
+    super(nodeProps);
+  }
+
+  /**
+   * @return canonical class name of {@link IcebergDatasetDescriptor}
+   */
+  @Override
+  public String getDefaultDatasetDescriptorClass() {
+    return IcebergDatasetDescriptor.class.getCanonicalName();
+  }
+
+  /**
+   * @return {@link #PLATFORM}, i.e. "iceberg"
+   */
+  @Override
+  public String getDefaultDatasetDescriptorPlatform() {
+    return PLATFORM;
+  }
+
+}
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/dataset/IcebergDatasetDescriptorTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/dataset/IcebergDatasetDescriptorTest.java
new file mode 100644
index 000000000..98986d224
--- /dev/null
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/dataset/IcebergDatasetDescriptorTest.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.dataset;
+
+import java.io.IOException;
+
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigValueFactory;
+
+import
org.apache.gobblin.service.modules.flowgraph.DatasetDescriptorConfigKeys;
+
+
+public class IcebergDatasetDescriptorTest {
+  private final Config baseConfig = createDatasetDescriptorConfig("iceberg", "testDb_Db1", "testTable_Table1");
+
+  @Test
+  public void testIsPathContaining() throws IOException {
+    Config config1 = createDatasetDescriptorConfig("iceberg", "testDb_Db1", "testTable_Table1");
+    Config config2 = createDatasetDescriptorConfig("iceberg", "testDb_Db1", "testTable_Table2");
+    Config config3 = createDatasetDescriptorConfig("iceberg", "testDb_Db2", "testTable_Table1");
+
+    IcebergDatasetDescriptor current = new IcebergDatasetDescriptor(baseConfig);
+    IcebergDatasetDescriptor other = new IcebergDatasetDescriptor(config1);
+    IcebergDatasetDescriptor yetAnother = new IcebergDatasetDescriptor(config2);
+    IcebergDatasetDescriptor otherDb = new IcebergDatasetDescriptor(config3);
+
+    // The path is the database and table names joined by ';'
+    Assert.assertEquals(current.getPath(), "testDb_Db1;testTable_Table1");
+
+    Assert.assertTrue(current.isPathContaining(other));
+    // Same database, different table
+    Assert.assertFalse(current.isPathContaining(yetAnother));
+    // Same table, different database
+    Assert.assertFalse(current.isPathContaining(otherDb));
+  }
+
+  @Test(expectedExceptions = IOException.class)
+  public void testInvalidPlatform() throws IOException {
+    new IcebergDatasetDescriptor(createDatasetDescriptorConfig("hive", "testDb_Db1", "testTable_Table1"));
+  }
+
+  @Test(expectedExceptions = IOException.class)
+  public void testEmptyTableName() throws IOException {
+    new IcebergDatasetDescriptor(createDatasetDescriptorConfig("iceberg", "testDb_Db1", ""));
+  }
+
+  /**
+   * Builds a minimal dataset descriptor config with only platform, database and table set.
+   */
+  private Config createDatasetDescriptorConfig(String platform, String dbName, String tableName) {
+    return ConfigFactory.empty()
+        .withValue(DatasetDescriptorConfigKeys.PLATFORM_KEY, ConfigValueFactory.fromAnyRef(platform))
+        .withValue(DatasetDescriptorConfigKeys.DATABASE_KEY, ConfigValueFactory.fromAnyRef(dbName))
+        .withValue(DatasetDescriptorConfigKeys.TABLE_KEY, ConfigValueFactory.fromAnyRef(tableName));
+  }
+}
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flowgraph/datanodes/iceberg/IcebergDataNodeTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flowgraph/datanodes/iceberg/IcebergDataNodeTest.java
new file mode 100644
index 000000000..a3955a7fd
--- /dev/null
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flowgraph/datanodes/iceberg/IcebergDataNodeTest.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.flowgraph.datanodes.iceberg;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigValueFactory;
+
+import org.apache.gobblin.service.modules.flowgraph.DataNode;
+import org.apache.gobblin.service.modules.flowgraph.FlowGraphConfigurationKeys;
+
+
+public class IcebergDataNodeTest {
+
+  private static final String METASTORE_URI_KEY =
+      FlowGraphConfigurationKeys.DATA_NODE_PREFIX + "hive.metastore.uri";
+  private static final String SAMPLE_HIVE_METASTORE_URI = "thrift://hcat.company.com:7552";
+
+  private Config config = null;
+
+  @BeforeMethod
+  public void setUp() {
+    String sampleNodeId = "some-iceberg-node-id";
+    String sampleFsUri = "hdfs://data.hdfs.core.windows.net";
+
+    config = ConfigFactory.empty()
+        .withValue(FlowGraphConfigurationKeys.DATA_NODE_ID_KEY, ConfigValueFactory.fromAnyRef(sampleNodeId))
+        .withValue(FlowGraphConfigurationKeys.DATA_NODE_PREFIX + "fs.uri",
+            ConfigValueFactory.fromAnyRef(sampleFsUri))
+        .withValue(METASTORE_URI_KEY, ConfigValueFactory.fromAnyRef(SAMPLE_HIVE_METASTORE_URI));
+  }
+
+  @AfterMethod
+  public void tearDown() {
+    // Nothing to clean up: setUp rebuilds the config before every test method.
+  }
+
+  @Test
+  public void testIcebergDataNodeWithValidMetastoreUri()
+      throws DataNode.DataNodeCreationException, URISyntaxException {
+    IcebergOnHiveDataNode icebergDataNode = new IcebergOnHiveDataNode(config);
+    URI uri = new URI(config.getString(METASTORE_URI_KEY));
+    Assert.assertTrue(icebergDataNode.isMetastoreUriValid(uri));
+    Assert.assertEquals(icebergDataNode.getMetastoreUri(), SAMPLE_HIVE_METASTORE_URI);
+  }
+
+  @Test
+  public void testIcebergDataNodeDefaults() throws DataNode.DataNodeCreationException {
+    IcebergOnHiveDataNode icebergDataNode = new IcebergOnHiveDataNode(config);
+    Assert.assertEquals(icebergDataNode.getDefaultDatasetDescriptorPlatform(), IcebergOnHiveDataNode.PLATFORM);
+    Assert.assertEquals(icebergDataNode.getDefaultDatasetDescriptorClass(),
+        "org.apache.gobblin.service.modules.dataset.IcebergDatasetDescriptor");
+  }
+
+  @Test(expectedExceptions = DataNode.DataNodeCreationException.class)
+  public void testIcebergDataNodeWithInvalidMetastoreUri() throws DataNode.DataNodeCreationException {
+    String bogusHiveMetastoreUri = "not-thrift://hcat.company.com:7552";
+    config = config.withValue(METASTORE_URI_KEY, ConfigValueFactory.fromAnyRef(bogusHiveMetastoreUri));
+    // The constructor validates the metastore URI, so construction itself is expected to throw
+    new IcebergOnHiveDataNode(config);
+  }
+
+  @Test(expectedExceptions = DataNode.DataNodeCreationException.class)
+  public void testIcebergDataNodeWithMissingMetastoreUri() throws DataNode.DataNodeCreationException {
+    new IcebergOnHiveDataNode(config.withoutPath(METASTORE_URI_KEY));
+  }
+}