[ https://issues.apache.org/jira/browse/GOBBLIN-1786?focusedWorklogId=846739&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-846739 ]
ASF GitHub Bot logged work on GOBBLIN-1786:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 21/Feb/23 21:56
Start Date: 21/Feb/23 21:56
Worklog Time Spent: 10m
Work Description: meethngala commented on code in PR #3643:
URL: https://github.com/apache/gobblin/pull/3643#discussion_r1113608848
##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetFinder.java:
##########
@@ -61,23 +70,35 @@ public class IcebergDatasetFinder implements IterableDatasetFinder<IcebergDatase
   @Override
   public List<IcebergDataset> findDatasets() throws IOException {
     List<IcebergDataset> matchingDatasets = new ArrayList<>();
+    Map<String, String> catalogProperties = Maps.newHashMap();
     if (StringUtils.isBlank(properties.getProperty(ICEBERG_DB_NAME)) || StringUtils.isBlank(properties.getProperty(ICEBERG_TABLE_NAME))) {
       throw new IllegalArgumentException(String.format("Iceberg database name: {%s} or Iceberg table name: {%s} is missing",
           ICEBERG_DB_NAME, ICEBERG_TABLE_NAME));
     }
     String dbName = properties.getProperty(ICEBERG_DB_NAME);
     String tblName = properties.getProperty(ICEBERG_TABLE_NAME);
+    String specifierName = properties.getProperty(ICEBERG_SRC_CATALOG_SPECIFIER_CLASS_KEY, DEFAULT_ICEBERG_CATALOG_SPECIFIER_CLASS);
+    // introducing an optional property for catalogs requiring cluster specific properties
+    Optional<String> srcClusterName = Optional.ofNullable(properties.getProperty(ICEBERG_SRC_CLUSTER_NAME));
+    srcClusterName.ifPresent(s -> catalogProperties.put(DatasetConstants.PLATFORM_CLUSTER, s));
     Configuration configuration = HadoopUtils.getConfFromProperties(properties);
-    IcebergCatalog icebergCatalog = IcebergCatalogFactory.create(configuration);
-    /* Each Iceberg dataset maps to an Iceberg table
-     * TODO: The user provided database and table names needs to be pre-checked and verified against the existence of a valid Iceberg table
-     */
-    matchingDatasets.add(createIcebergDataset(dbName, tblName, icebergCatalog, properties, sourceFs));
-    log.info("Found {} matching datasets: {} for the database name: {} and table name: {}", matchingDatasets.size(), matchingDatasets, dbName, tblName);
-
-    return matchingDatasets;
+    try {
+      Class<?> catalogSpecifier = Class.forName(specifierName);
+      IcebergCatalog.CatalogSpecifier specifier =
+          (IcebergCatalog.CatalogSpecifier) GobblinConstructorUtils.invokeConstructor(catalogSpecifier, specifierName);
+      IcebergCatalog icebergCatalog = IcebergCatalogFactory.create(specifier, catalogProperties, configuration);
Review Comment:
gotcha... makes sense! I have added the `createIcebergCatalog` method in my
latest commit!
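
For anyone following the thread, here is a rough, self-contained sketch of the pattern such a helper could wrap: read an optional cluster name into the catalog properties, then instantiate the configured catalog class reflectively. This is not the code from the PR. `Catalog`, `InMemoryCatalog`, `createCatalog`, and the property keys below are hypothetical stand-ins, and it uses only the JDK rather than Gobblin's `GobblinConstructorUtils` or `IcebergCatalogFactory`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;

public class CatalogCreationSketch {
  // Hypothetical keys for illustration; the real constants live in the Gobblin sources.
  static final String CATALOG_CLASS_KEY = "example.iceberg.catalog.class";
  static final String CLUSTER_NAME_KEY = "example.iceberg.src.cluster.name";
  static final String PLATFORM_CLUSTER = "cluster";

  /** Hypothetical stand-in for a catalog that is configured after construction. */
  public interface Catalog {
    void initialize(Map<String, String> catalogProperties);
    Map<String, String> properties();
  }

  /** Default implementation used when no class name is configured. */
  public static class InMemoryCatalog implements Catalog {
    private Map<String, String> props = new HashMap<>();

    public InMemoryCatalog() {}

    @Override
    public void initialize(Map<String, String> catalogProperties) {
      this.props = new HashMap<>(catalogProperties);
    }

    @Override
    public Map<String, String> properties() {
      return props;
    }
  }

  /** Builds the catalog in one place so callers do not repeat the reflection logic. */
  static Catalog createCatalog(Properties properties) throws ReflectiveOperationException {
    Map<String, String> catalogProperties = new HashMap<>();
    // The cluster name is optional: only catalogs that need it will see the entry.
    Optional.ofNullable(properties.getProperty(CLUSTER_NAME_KEY))
        .ifPresent(name -> catalogProperties.put(PLATFORM_CLUSTER, name));

    // Fall back to the default implementation when no class is configured.
    String className = properties.getProperty(CATALOG_CLASS_KEY, InMemoryCatalog.class.getName());
    Catalog catalog = (Catalog) Class.forName(className).getDeclaredConstructor().newInstance();
    catalog.initialize(catalogProperties);
    return catalog;
  }

  public static void main(String[] args) throws Exception {
    Properties props = new Properties();
    props.setProperty(CLUSTER_NAME_KEY, "clusterA");
    Catalog catalog = createCatalog(props);
    System.out.println(catalog.properties());
  }
}
```

Keeping the property handling and the reflective construction inside one method means `findDatasets()` only has to deal with a single failure path if the configured class cannot be loaded.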
Issue Time Tracking
-------------------
Worklog Id: (was: 846739)
Time Spent: 2h 10m (was: 2h)
> Support Other Catalog Types for Iceberg Distcp
> ----------------------------------------------
>
> Key: GOBBLIN-1786
> URL: https://issues.apache.org/jira/browse/GOBBLIN-1786
> Project: Apache Gobblin
> Issue Type: Improvement
> Reporter: Meeth Gala
> Priority: Major
> Time Spent: 2h 10m
> Remaining Estimate: 0h
>