[
https://issues.apache.org/jira/browse/GOBBLIN-1786?focusedWorklogId=846225&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-846225
]
ASF GitHub Bot logged work on GOBBLIN-1786:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 17/Feb/23 17:20
Start Date: 17/Feb/23 17:20
Worklog Time Spent: 10m
Work Description: phet commented on code in PR #3643:
URL: https://github.com/apache/gobblin/pull/3643#discussion_r1110079897
##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetFinder.java:
##########
@@ -61,23 +70,35 @@ public class IcebergDatasetFinder implements IterableDatasetFinder<IcebergDatase
@Override
public List<IcebergDataset> findDatasets() throws IOException {
List<IcebergDataset> matchingDatasets = new ArrayList<>();
+ Map<String, String> catalogProperties = Maps.newHashMap();
if (StringUtils.isBlank(properties.getProperty(ICEBERG_DB_NAME)) ||
StringUtils.isBlank(properties.getProperty(ICEBERG_TABLE_NAME))) {
throw new IllegalArgumentException(String.format("Iceberg database name: {%s} or Iceberg table name: {%s} is missing",
ICEBERG_DB_NAME, ICEBERG_TABLE_NAME));
}
String dbName = properties.getProperty(ICEBERG_DB_NAME);
String tblName = properties.getProperty(ICEBERG_TABLE_NAME);
+ String specifierName = properties.getProperty(ICEBERG_SRC_CATALOG_SPECIFIER_CLASS_KEY, DEFAULT_ICEBERG_CATALOG_SPECIFIER_CLASS);
+ // introducing an optional property for catalogs requiring cluster specific properties
+ Optional<String> srcClusterName = Optional.ofNullable(properties.getProperty(ICEBERG_SRC_CLUSTER_NAME));
+ srcClusterName.ifPresent(s -> catalogProperties.put(DatasetConstants.PLATFORM_CLUSTER, s));
Configuration configuration = HadoopUtils.getConfFromProperties(properties);
- IcebergCatalog icebergCatalog = IcebergCatalogFactory.create(configuration);
- /* Each Iceberg dataset maps to an Iceberg table
- * TODO: The user provided database and table names needs to be pre-checked and verified against the existence of a valid Iceberg table
- */
- matchingDatasets.add(createIcebergDataset(dbName, tblName, icebergCatalog, properties, sourceFs));
- log.info("Found {} matching datasets: {} for the database name: {} and table name: {}", matchingDatasets.size(), matchingDatasets, dbName, tblName);
-
- return matchingDatasets;
+ try {
+ Class<?> catalogSpecifier = Class.forName(specifierName);
+ IcebergCatalog.CatalogSpecifier specifier =
+ (IcebergCatalog.CatalogSpecifier) GobblinConstructorUtils.invokeConstructor(catalogSpecifier, specifierName);
+ IcebergCatalog icebergCatalog = IcebergCatalogFactory.create(specifier, catalogProperties, configuration);
Review Comment:
this and the properties access just above feel worth abstracting into a
helper method, like `createIcebergCatalog`. let's leave it `protected`, so it
could be overridden by someone wishing to quickly hard-code something specific
to their stack
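A minimal sketch of the `protected` helper the reviewer suggests, assuming simplified stand-ins for Gobblin's `IcebergCatalog` and `CatalogSpecifier`. The property-key values, `HiveLikeSpecifier`, and the specifier's `create` method are hypothetical, and plain reflection stands in for `GobblinConstructorUtils`; only the overall shape of `createIcebergCatalog` (gather properties, instantiate the specifier, build the catalog) follows the diff.

```java
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;

class CatalogHelperSketch {

  // Hypothetical stand-ins for Gobblin's IcebergCatalog and IcebergCatalog.CatalogSpecifier.
  interface IcebergCatalog {
    String describe();
  }

  interface CatalogSpecifier {
    IcebergCatalog create(Map<String, String> catalogProperties);
  }

  // Hypothetical default, standing in for IcebergHiveCatalog.HiveSpecifier.
  public static class HiveLikeSpecifier implements CatalogSpecifier {
    public HiveLikeSpecifier() {}

    @Override
    public IcebergCatalog create(Map<String, String> catalogProperties) {
      return () -> "hive catalog with " + catalogProperties;
    }
  }

  // Hypothetical key values; the PR's actual values are not shown in the diff.
  static final String SPECIFIER_CLASS_KEY = "iceberg.source.catalog.specifier.class";
  static final String SRC_CLUSTER_NAME_KEY = "iceberg.source.cluster.name";

  static class DatasetFinder {
    protected final Properties properties;

    DatasetFinder(Properties properties) {
      this.properties = properties;
    }

    /** Overridable hook: a subclass may hard-code a catalog specific to its own stack. */
    protected IcebergCatalog createIcebergCatalog() throws IOException {
      String specifierName = properties.getProperty(SPECIFIER_CLASS_KEY, HiveLikeSpecifier.class.getName());
      Map<String, String> catalogProperties = new HashMap<>();
      Optional.ofNullable(properties.getProperty(SRC_CLUSTER_NAME_KEY))
          .ifPresent(cluster -> catalogProperties.put("cluster", cluster));
      try {
        // Plain reflection here; the PR itself uses GobblinConstructorUtils.
        CatalogSpecifier specifier = (CatalogSpecifier) Class.forName(specifierName)
            .getDeclaredConstructor().newInstance();
        return specifier.create(catalogProperties);
      } catch (ReflectiveOperationException | ClassCastException e) {
        throw new IOException("Cannot instantiate catalog specifier: " + specifierName, e);
      }
    }
  }
}
```

A subclass targeting one specific stack could override `createIcebergCatalog()` to return a fixed catalog, leaving `findDatasets()` untouched.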
##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergHiveCatalog.java:
##########
@@ -17,22 +17,54 @@
package org.apache.gobblin.data.management.copy.iceberg;
-import lombok.AllArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
-
+import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.hive.HiveCatalog;
+import lombok.extern.slf4j.Slf4j;
+
/**
* Hive-Metastore-based {@link IcebergCatalog}.
*/
@Slf4j
-@AllArgsConstructor
public class IcebergHiveCatalog implements IcebergCatalog {
+
+ /**
+ * Ensures pairing between {@link IcebergCatalog} and its implementation type i.e. {@link HiveCatalog} in this case.
+ * Unfortunately, {@link org.apache.iceberg.BaseMetastoreCatalog}.newTableOps is protected.
+ * Hence, it necessitates this abstraction to define and access {@link IcebergTable}
+ */
+ public static class HiveSpecifier implements CatalogSpecifier {
+
+ public static final String HIVE_CATALOG_NAME = "hive";
+ @Override
+ public Class<? extends Catalog> getCatalogClass() {
+ return HiveCatalog.class;
+ }
+
+ @Override
+ public Class<? extends IcebergCatalog> getIcebergCatalogClass() {
+ return IcebergHiveCatalog.class;
+ }
+
+ @Override
+ public String getCatalogName() {
+ return HIVE_CATALOG_NAME;
+ }
+ }
+
// NOTE: specifically necessitates `HiveCatalog`, as `BaseMetastoreCatalog.newTableOps` is `protected`!
private final HiveCatalog hc;
+ public IcebergHiveCatalog(Catalog catalog) {
+ if (catalog instanceof HiveCatalog) {
+ hc = (HiveCatalog) catalog;
+ } else {
+ throw new IllegalArgumentException(String.format("Incorrect Catalog Provided: Got %s instead of HiveCatalog " ,catalog.getClass().getName()));
Review Comment:
space on wrong side of `,`?
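The `HiveSpecifier` Javadoc above describes pairing a `Catalog` implementation with its `IcebergCatalog` wrapper. A factory consuming such a specifier might look like the sketch below. The `Fake*` and `OtherCatalog` types are hypothetical stand-ins for Iceberg's `HiveCatalog` and Gobblin's `IcebergHiveCatalog`, and the PR's actual `IcebergCatalogFactory.create` also takes catalog properties and a Hadoop `Configuration`. The wrapper's error message shows the comma placed as the review nit asks.

```java
import java.lang.reflect.InvocationTargetException;

class SpecifierFactorySketch {

  // Hypothetical stand-ins for org.apache.iceberg.catalog.Catalog and Gobblin's IcebergCatalog.
  interface Catalog {
    String name();
  }

  interface IcebergCatalog {
    Catalog underlying();
  }

  // Mirrors the three accessors of the PR's CatalogSpecifier.
  interface CatalogSpecifier {
    Class<? extends Catalog> getCatalogClass();
    Class<? extends IcebergCatalog> getIcebergCatalogClass();
    String getCatalogName();
  }

  public static class FakeHiveCatalog implements Catalog {
    public FakeHiveCatalog() {}
    @Override public String name() { return "hive"; }
  }

  public static class OtherCatalog implements Catalog {
    public OtherCatalog() {}
    @Override public String name() { return "other"; }
  }

  // Like IcebergHiveCatalog, this wrapper accepts only its paired Catalog type.
  public static class FakeIcebergHiveCatalog implements IcebergCatalog {
    private final FakeHiveCatalog hc;

    public FakeIcebergHiveCatalog(Catalog catalog) {
      if (!(catalog instanceof FakeHiveCatalog)) {
        throw new IllegalArgumentException(String.format(
            "Incorrect Catalog Provided: Got %s instead of HiveCatalog", catalog.getClass().getName()));
      }
      this.hc = (FakeHiveCatalog) catalog;
    }

    @Override public Catalog underlying() { return hc; }
  }

  public static class FakeHiveSpecifier implements CatalogSpecifier {
    @Override public Class<? extends Catalog> getCatalogClass() { return FakeHiveCatalog.class; }
    @Override public Class<? extends IcebergCatalog> getIcebergCatalogClass() { return FakeIcebergHiveCatalog.class; }
    @Override public String getCatalogName() { return "hive"; }
  }

  /** Instantiates the specifier's Catalog, then hands it to the paired wrapper. */
  static IcebergCatalog create(CatalogSpecifier specifier) {
    try {
      Catalog catalog = specifier.getCatalogClass().getDeclaredConstructor().newInstance();
      return wrap(specifier.getIcebergCatalogClass(), catalog);
    } catch (ReflectiveOperationException e) {
      throw new IllegalStateException("Cannot instantiate catalog for " + specifier.getCatalogName(), e);
    }
  }

  /** Calls the wrapper's public (Catalog) constructor reflectively. */
  static IcebergCatalog wrap(Class<? extends IcebergCatalog> wrapperClass, Catalog catalog) {
    try {
      return wrapperClass.getConstructor(Catalog.class).newInstance(catalog);
    } catch (InvocationTargetException e) {
      // Surface the wrapper's own validation error (e.g. a mismatched Catalog type) unchanged.
      if (e.getCause() instanceof RuntimeException) {
        throw (RuntimeException) e.getCause();
      }
      throw new IllegalStateException(e.getCause());
    } catch (ReflectiveOperationException e) {
      throw new IllegalStateException("No (Catalog) constructor on " + wrapperClass.getName(), e);
    }
  }
}
```

Because the wrapper validates the concrete type it receives, a specifier that pairs the wrong `Catalog` class fails fast at construction rather than later at table-access time.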
##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDataset.java:
##########
@@ -87,8 +87,8 @@ public IcebergDataset(String db, String table, IcebergTable icebergTbl, Properti
this.icebergTable = icebergTbl;
this.properties = properties;
this.sourceFs = sourceFs;
- this.sourceCatalogMetastoreURI = getAsOptionalURI(this.properties, IcebergDatasetFinder.ICEBERG_HIVE_CATALOG_METASTORE_URI_KEY);
- this.targetCatalogMetastoreURI = getAsOptionalURI(this.properties, TARGET_METASTORE_URI_KEY);
+ this.sourceCatalogURI = getAsOptionalURI(this.properties, IcebergDatasetFinder.ICEBERG_SRC_CATALOG_URI_KEY);
Review Comment:
rather than searching for info in `properties`, let's have the
`IcebergCatalog` tell us its identifying URI. it should know which props it
was configured with, and moreover the specific set may vary from catalog to
catalog. definitely let's encapsulate all of that from the code here.
since we don't yet have a dest-side `IcebergCatalog`, ok to leave that one
as-is w/ a TODO just above, explaining the plan
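One way to act on this comment, sketched with hypothetical stand-ins: `IcebergCatalog` gains a `getCatalogUri()` method (the name is an assumption), each implementation derives the URI from whatever configuration it received, and the dataset simply asks for it. Reading Hive's `hive.metastore.uris` setting is an assumption about what a Hive-backed catalog would report; the destination side keeps a TODO, as the reviewer proposes.

```java
import java.net.URI;
import java.util.Map;
import java.util.Optional;

class CatalogUriSketch {

  // Hypothetical addition to IcebergCatalog: the catalog reports the URI it was configured with.
  interface IcebergCatalog {
    Optional<URI> getCatalogUri();
  }

  // Stand-in for a Hive-backed catalog. Reads Hive's "hive.metastore.uris" setting and
  // assumes it holds a single URI (Hive also allows a comma-separated list).
  static class HiveBackedCatalog implements IcebergCatalog {
    private final Map<String, String> conf;

    HiveBackedCatalog(Map<String, String> conf) {
      this.conf = conf;
    }

    @Override
    public Optional<URI> getCatalogUri() {
      return Optional.ofNullable(conf.get("hive.metastore.uris")).map(URI::create);
    }
  }

  // Stand-in for IcebergDataset: asks the source catalog instead of searching raw properties.
  static class Dataset {
    final Optional<URI> sourceCatalogURI;

    Dataset(IcebergCatalog sourceCatalog) {
      this.sourceCatalogURI = sourceCatalog.getCatalogUri();
      // TODO: derive the target URI the same way once a destination-side IcebergCatalog exists.
    }
  }
}
```

The dataset no longer needs to know which property key a given catalog type uses; a non-Hive catalog would implement `getCatalogUri()` from its own configuration.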
##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetFinder.java:
##########
@@ -61,23 +70,35 @@ public class IcebergDatasetFinder implements IterableDatasetFinder<IcebergDatase
@Override
public List<IcebergDataset> findDatasets() throws IOException {
List<IcebergDataset> matchingDatasets = new ArrayList<>();
+ Map<String, String> catalogProperties = Maps.newHashMap();
if (StringUtils.isBlank(properties.getProperty(ICEBERG_DB_NAME)) ||
StringUtils.isBlank(properties.getProperty(ICEBERG_TABLE_NAME))) {
throw new IllegalArgumentException(String.format("Iceberg database name: {%s} or Iceberg table name: {%s} is missing",
ICEBERG_DB_NAME, ICEBERG_TABLE_NAME));
}
String dbName = properties.getProperty(ICEBERG_DB_NAME);
String tblName = properties.getProperty(ICEBERG_TABLE_NAME);
+ String specifierName = properties.getProperty(ICEBERG_SRC_CATALOG_SPECIFIER_CLASS_KEY, DEFAULT_ICEBERG_CATALOG_SPECIFIER_CLASS);
+ // introducing an optional property for catalogs requiring cluster specific properties
+ Optional<String> srcClusterName = Optional.ofNullable(properties.getProperty(ICEBERG_SRC_CLUSTER_NAME));
+ srcClusterName.ifPresent(s -> catalogProperties.put(DatasetConstants.PLATFORM_CLUSTER, s));
Review Comment:
nice `Optional`... but not truly necessary to have a temporary
(`srcClusterName`)
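Without the temporary, the lookup and the insertion collapse into a single expression. A minimal sketch; the key values below are hypothetical placeholders for `ICEBERG_SRC_CLUSTER_NAME` and for whatever constant ends up replacing `DatasetConstants.PLATFORM_CLUSTER`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;

class OptionalChainSketch {
  // Hypothetical placeholder key values.
  static final String ICEBERG_SRC_CLUSTER_NAME = "iceberg.source.cluster.name";
  static final String CLUSTER_KEY = "cluster";

  static Map<String, String> catalogPropertiesFrom(Properties properties) {
    Map<String, String> catalogProperties = new HashMap<>();
    // No local variable: wrap the (possibly null) lookup and act on it in one chain.
    Optional.ofNullable(properties.getProperty(ICEBERG_SRC_CLUSTER_NAME))
        .ifPresent(cluster -> catalogProperties.put(CLUSTER_KEY, cluster));
    return catalogProperties;
  }
}
```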
##########
gobblin-api/src/main/java/org/apache/gobblin/dataset/DatasetConstants.java:
##########
@@ -26,6 +26,8 @@ public class DatasetConstants {
public static final String PLATFORM_SALESFORCE = "salesforce";
public static final String PLATFORM_MYSQL = "mysql";
public static final String PLATFORM_ICEBERG = "iceberg";
+ public static final String PLATFORM_CLUSTER = "cluster";
Review Comment:
this doesn't seem like the other ones here...
in what way are you defining a new 'platform' vs. just needing a string
constant?
Issue Time Tracking
-------------------
Worklog Id: (was: 846225)
Time Spent: 2h (was: 1h 50m)
> Support Other Catalog Types for Iceberg Distcp
> ----------------------------------------------
>
> Key: GOBBLIN-1786
> URL: https://issues.apache.org/jira/browse/GOBBLIN-1786
> Project: Apache Gobblin
> Issue Type: Improvement
> Reporter: Meeth Gala
> Priority: Major
> Time Spent: 2h
> Remaining Estimate: 0h
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)