[ 
https://issues.apache.org/jira/browse/GOBBLIN-1802?focusedWorklogId=854227&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-854227
 ]

ASF GitHub Bot logged work on GOBBLIN-1802:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 31/Mar/23 17:38
            Start Date: 31/Mar/23 17:38
    Worklog Time Spent: 10m 
      Work Description: meethngala commented on code in PR #3663:
URL: https://github.com/apache/gobblin/pull/3663#discussion_r1154736042


##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetFinder.java:
##########
@@ -98,20 +104,43 @@ public Iterator<IcebergDataset> getDatasetsIterator() throws IOException {
     return findDatasets().iterator();
   }
 
-  protected IcebergDataset createIcebergDataset(String dbName, String tblName, IcebergCatalog icebergCatalog, Properties properties, FileSystem fs) {
-    IcebergTable icebergTable = icebergCatalog.openTable(dbName, tblName);
-    return new IcebergDataset(dbName, tblName, icebergTable, properties, fs);
+  /**
+   * Requires both source and destination catalogs to connect to their respective {@link IcebergTable}
+   * Note: the destination side {@link IcebergTable} should be present before initiating replication
+   * @return {@link IcebergDataset} with its corresponding source and destination {@link IcebergTable}
+   */
+  protected IcebergDataset createIcebergDataset(String dbName, String tblName, IcebergCatalog sourceIcebergCatalog, IcebergCatalog destinationIcebergCatalog, Properties properties, FileSystem fs) throws IOException {
+    IcebergTable srcIcebergTable = sourceIcebergCatalog.openTable(dbName, tblName);
+    Preconditions.checkArgument(sourceIcebergCatalog.tableAlreadyExists(srcIcebergTable), String.format("Missing Source Iceberg Table: {%s}.{%s}", dbName, tblName));
+    IcebergTable destIcebergTable = destinationIcebergCatalog.openTable(dbName, tblName);
+    Preconditions.checkArgument(destinationIcebergCatalog.tableAlreadyExists(destIcebergTable), String.format("Missing Destination Iceberg Table: {%s}.{%s}", dbName, tblName));
+    return new IcebergDataset(dbName, tblName, srcIcebergTable, destIcebergTable, properties, fs);
   }
 
-  protected IcebergCatalog createIcebergCatalog(Properties properties) throws IOException, ClassNotFoundException {
+  protected IcebergCatalog createIcebergCatalog(Properties properties, CatalogLocation location) throws IOException {
     Map<String, String> catalogProperties = new HashMap<>();
-    String catalogUri = properties.getProperty(ICEBERG_SRC_CATALOG_URI_KEY);
-    Preconditions.checkNotNull(catalogUri, "Catalog Table Service URI is required");
-    catalogProperties.put(CatalogProperties.URI, catalogUri);
-    // introducing an optional property for catalogs requiring cluster specific properties
-    Optional.ofNullable(properties.getProperty(ICEBERG_SRC_CLUSTER_NAME)).ifPresent(value -> catalogProperties.put(ICEBERG_CLUSTER_KEY, value));
     Configuration configuration = HadoopUtils.getConfFromProperties(properties);
-    String icebergCatalogClassName = properties.getProperty(ICEBERG_SRC_CATALOG_CLASS_KEY, DEFAULT_ICEBERG_CATALOG_CLASS);
+    String catalogUri;
+    String icebergCatalogClassName;
+    switch (location) {
+      case SOURCE:
+        catalogUri = properties.getProperty(ICEBERG_SRC_CATALOG_URI_KEY);
+        Preconditions.checkNotNull(catalogUri, "Source Catalog Table Service URI is required");
+        // introducing an optional property for catalogs requiring cluster specific properties
+        Optional.ofNullable(properties.getProperty(ICEBERG_SRC_CLUSTER_NAME)).ifPresent(value -> catalogProperties.put(ICEBERG_CLUSTER_KEY, value));
+        icebergCatalogClassName = properties.getProperty(ICEBERG_SRC_CATALOG_CLASS_KEY, DEFAULT_ICEBERG_CATALOG_CLASS);
+        break;
+      case DESTINATION:
+        catalogUri = properties.getProperty(ICEBERG_DEST_CATALOG_URI_KEY);
+        Preconditions.checkNotNull(catalogUri, "Destination Catalog Table Service URI is required");

Review Comment:
   In my latest commit I added the input values that the user would provide.
   The value itself would be null at this point, since null is what we are
   checking for.
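
   As a rough illustration of the kind of null check discussed here, the
   following is a minimal sketch in which the failure message names the
   property key the user is expected to set. It uses
   `java.util.Objects.requireNonNull` instead of Guava's
   `Preconditions.checkNotNull` only so that it compiles on its own. The class
   name, the `requireProperty` helper, and the key string are hypothetical and
   are not taken from the PR.

```java
import java.util.Objects;
import java.util.Properties;

class CatalogUriCheck {
  // Hypothetical key, for illustration only; the real constant is defined in the PR's finder class.
  static final String DEST_CATALOG_URI_KEY = "example.iceberg.dest.catalog.uri";

  /**
   * Returns the value of {@code key}, or throws a NullPointerException whose
   * message names the key so the user knows which property to provide.
   */
  static String requireProperty(Properties properties, String key, String description) {
    String value = properties.getProperty(key);
    // The value is necessarily null on the failure path, so the message reports the key instead.
    return Objects.requireNonNull(value,
        String.format("%s is required: set property '%s'", description, key));
  }

  public static void main(String[] args) {
    Properties props = new Properties(); // deliberately empty to trigger the check
    try {
      requireProperty(props, DEST_CATALOG_URI_KEY, "Destination Catalog Table Service URI");
    } catch (NullPointerException e) {
      System.out.println(e.getMessage());
    }
  }
}
```

   Reporting the key rather than the value matches the point above: when the
   check fails, the value can only be null, so the key is the useful part.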





Issue Time Tracking
-------------------

    Worklog Id:     (was: 854227)
    Time Spent: 3h 40m  (was: 3.5h)

> Register iceberg table metadata update with destination side catalog
> --------------------------------------------------------------------
>
>                 Key: GOBBLIN-1802
>                 URL: https://issues.apache.org/jira/browse/GOBBLIN-1802
>             Project: Apache Gobblin
>          Issue Type: New Feature
>            Reporter: Meeth Gala
>            Priority: Major
>          Time Spent: 3h 40m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)
