[ 
https://issues.apache.org/jira/browse/GOBBLIN-1786?focusedWorklogId=846739&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-846739
 ]

ASF GitHub Bot logged work on GOBBLIN-1786:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 21/Feb/23 21:56
            Start Date: 21/Feb/23 21:56
    Worklog Time Spent: 10m 
      Work Description: meethngala commented on code in PR #3643:
URL: https://github.com/apache/gobblin/pull/3643#discussion_r1113608848


##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetFinder.java:
##########
@@ -61,23 +70,35 @@ public class IcebergDatasetFinder implements IterableDatasetFinder<IcebergDatase
   @Override
   public List<IcebergDataset> findDatasets() throws IOException {
     List<IcebergDataset> matchingDatasets = new ArrayList<>();
+    Map<String, String> catalogProperties = Maps.newHashMap();
     if (StringUtils.isBlank(properties.getProperty(ICEBERG_DB_NAME)) || StringUtils.isBlank(properties.getProperty(ICEBERG_TABLE_NAME))) {
       throw new IllegalArgumentException(String.format("Iceberg database name: {%s} or Iceberg table name: {%s} is missing",
           ICEBERG_DB_NAME, ICEBERG_TABLE_NAME));
     }
     String dbName = properties.getProperty(ICEBERG_DB_NAME);
     String tblName = properties.getProperty(ICEBERG_TABLE_NAME);
+    String specifierName = properties.getProperty(ICEBERG_SRC_CATALOG_SPECIFIER_CLASS_KEY, DEFAULT_ICEBERG_CATALOG_SPECIFIER_CLASS);
+    // introducing an optional property for catalogs requiring cluster specific properties
+    Optional<String> srcClusterName = Optional.ofNullable(properties.getProperty(ICEBERG_SRC_CLUSTER_NAME));
+    srcClusterName.ifPresent(s -> catalogProperties.put(DatasetConstants.PLATFORM_CLUSTER, s));
 
     Configuration configuration = HadoopUtils.getConfFromProperties(properties);
 
-    IcebergCatalog icebergCatalog = IcebergCatalogFactory.create(configuration);
-    /* Each Iceberg dataset maps to an Iceberg table
-     * TODO: The user provided database and table names needs to be pre-checked and verified against the existence of a valid Iceberg table
-     */
-    matchingDatasets.add(createIcebergDataset(dbName, tblName, icebergCatalog, properties, sourceFs));
-    log.info("Found {} matching datasets: {} for the database name: {} and table name: {}", matchingDatasets.size(), matchingDatasets, dbName, tblName);
-
-    return matchingDatasets;
+    try {
+      Class<?> catalogSpecifier = Class.forName(specifierName);
+      IcebergCatalog.CatalogSpecifier specifier =
+          (IcebergCatalog.CatalogSpecifier) GobblinConstructorUtils.invokeConstructor(catalogSpecifier, specifierName);
+      IcebergCatalog icebergCatalog = IcebergCatalogFactory.create(specifier, catalogProperties, configuration);

Review Comment:
   Gotcha, makes sense! I have added the `createIcebergCatalog` method in my latest commit.





Issue Time Tracking
-------------------

    Worklog Id:     (was: 846739)
    Time Spent: 2h 10m  (was: 2h)

> Support Other Catalog Types for Iceberg Distcp
> ----------------------------------------------
>
>                 Key: GOBBLIN-1786
>                 URL: https://issues.apache.org/jira/browse/GOBBLIN-1786
>             Project: Apache Gobblin
>          Issue Type: Improvement
>            Reporter: Meeth Gala
>            Priority: Major
>          Time Spent: 2h 10m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)
