[ 
https://issues.apache.org/jira/browse/GOBBLIN-1802?focusedWorklogId=854052&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-854052
 ]

ASF GitHub Bot logged work on GOBBLIN-1802:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 31/Mar/23 05:34
            Start Date: 31/Mar/23 05:34
    Worklog Time Spent: 10m 
      Work Description: Will-Lo commented on code in PR #3663:
URL: https://github.com/apache/gobblin/pull/3663#discussion_r1154026198


##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetFinder.java:
##########
@@ -98,20 +104,43 @@ public Iterator<IcebergDataset> getDatasetsIterator() 
throws IOException {
     return findDatasets().iterator();
   }
 
-  protected IcebergDataset createIcebergDataset(String dbName, String tblName, 
IcebergCatalog icebergCatalog, Properties properties, FileSystem fs) {
-    IcebergTable icebergTable = icebergCatalog.openTable(dbName, tblName);
-    return new IcebergDataset(dbName, tblName, icebergTable, properties, fs);
+  /**
+   * Requires both source and destination catalogs to connect to their 
respective {@link IcebergTable}
+   * Note: the destination side {@link IcebergTable} should be present before 
initiating replication
+   * @return {@link IcebergDataset} with its corresponding source and 
destination {@link IcebergTable}
+   */
+  protected IcebergDataset createIcebergDataset(String dbName, String tblName, 
IcebergCatalog sourceIcebergCatalog, IcebergCatalog destinationIcebergCatalog, 
Properties properties, FileSystem fs) throws IOException {
+    IcebergTable srcIcebergTable = sourceIcebergCatalog.openTable(dbName, 
tblName);
+    
Preconditions.checkArgument(sourceIcebergCatalog.tableAlreadyExists(srcIcebergTable),
 String.format("Missing Source Iceberg Table: {%s}.{%s}", dbName, tblName));
+    IcebergTable destIcebergTable = 
destinationIcebergCatalog.openTable(dbName, tblName);
+    
Preconditions.checkArgument(destinationIcebergCatalog.tableAlreadyExists(destIcebergTable),
 String.format("Missing Destination Iceberg Table: {%s}.{%s}", dbName, 
tblName));
+    return new IcebergDataset(dbName, tblName, srcIcebergTable, 
destIcebergTable, properties, fs);
   }
 
-  protected IcebergCatalog createIcebergCatalog(Properties properties) throws 
IOException, ClassNotFoundException {
+  protected IcebergCatalog createIcebergCatalog(Properties properties, 
CatalogLocation location) throws IOException {
     Map<String, String> catalogProperties = new HashMap<>();
-    String catalogUri = properties.getProperty(ICEBERG_SRC_CATALOG_URI_KEY);
-    Preconditions.checkNotNull(catalogUri, "Catalog Table Service URI is 
required");
-    catalogProperties.put(CatalogProperties.URI, catalogUri);
-    // introducing an optional property for catalogs requiring cluster 
specific properties
-    
Optional.ofNullable(properties.getProperty(ICEBERG_SRC_CLUSTER_NAME)).ifPresent(value
 -> catalogProperties.put(ICEBERG_CLUSTER_KEY, value));
     Configuration configuration = 
HadoopUtils.getConfFromProperties(properties);
-    String icebergCatalogClassName = 
properties.getProperty(ICEBERG_SRC_CATALOG_CLASS_KEY, 
DEFAULT_ICEBERG_CATALOG_CLASS);
+    String catalogUri;
+    String icebergCatalogClassName;
+    switch (location) {
+      case SOURCE:
+        catalogUri = properties.getProperty(ICEBERG_SRC_CATALOG_URI_KEY);
+        Preconditions.checkNotNull(catalogUri, "Source Catalog Table Service 
URI is required");
+        // introducing an optional property for catalogs requiring cluster 
specific properties
+        
Optional.ofNullable(properties.getProperty(ICEBERG_SRC_CLUSTER_NAME)).ifPresent(value
 -> catalogProperties.put(ICEBERG_CLUSTER_KEY, value));
+        icebergCatalogClassName = 
properties.getProperty(ICEBERG_SRC_CATALOG_CLASS_KEY, 
DEFAULT_ICEBERG_CATALOG_CLASS);
+        break;
+      case DESTINATION:
+        catalogUri = properties.getProperty(ICEBERG_DEST_CATALOG_URI_KEY);
+        Preconditions.checkNotNull(catalogUri, "Destination Catalog Table 
Service URI is required");

Review Comment:
   In error messages like these, can you include the explicit key in the 
precondition message, so that when this error occurs it is clear what the 
user/developer needs to add?



##########
gradle/scripts/dependencyDefinitions.gradle:
##########
@@ -123,7 +123,7 @@ ext.externalDependency = [
     "guiceMultibindings": 
"com.google.inject.extensions:guice-multibindings:4.0",
     "guiceServlet": "com.google.inject.extensions:guice-servlet:4.0",
     "derby": "org.apache.derby:derby:10.12.1.1",
-    "mockito": "org.mockito:mockito-core:4.11.0",
+    "mockito": "org.mockito:mockito-inline:4.11.0",

Review Comment:
   Can you provide justification in a comment for why we're using 
mockito-inline instead of mockito-core? From what I read, mockito-inline is 
for experimental features.



##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetFinder.java:
##########
@@ -98,20 +104,43 @@ public Iterator<IcebergDataset> getDatasetsIterator() 
throws IOException {
     return findDatasets().iterator();
   }
 
-  protected IcebergDataset createIcebergDataset(String dbName, String tblName, 
IcebergCatalog icebergCatalog, Properties properties, FileSystem fs) {
-    IcebergTable icebergTable = icebergCatalog.openTable(dbName, tblName);
-    return new IcebergDataset(dbName, tblName, icebergTable, properties, fs);
+  /**
+   * Requires both source and destination catalogs to connect to their 
respective {@link IcebergTable}
+   * Note: the destination side {@link IcebergTable} should be present before 
initiating replication
+   * @return {@link IcebergDataset} with its corresponding source and 
destination {@link IcebergTable}
+   */
+  protected IcebergDataset createIcebergDataset(String dbName, String tblName, 
IcebergCatalog sourceIcebergCatalog, IcebergCatalog destinationIcebergCatalog, 
Properties properties, FileSystem fs) throws IOException {
+    IcebergTable srcIcebergTable = sourceIcebergCatalog.openTable(dbName, 
tblName);
+    
Preconditions.checkArgument(sourceIcebergCatalog.tableAlreadyExists(srcIcebergTable),
 String.format("Missing Source Iceberg Table: {%s}.{%s}", dbName, tblName));
+    IcebergTable destIcebergTable = 
destinationIcebergCatalog.openTable(dbName, tblName);
+    
Preconditions.checkArgument(destinationIcebergCatalog.tableAlreadyExists(destIcebergTable),
 String.format("Missing Destination Iceberg Table: {%s}.{%s}", dbName, 
tblName));
+    return new IcebergDataset(dbName, tblName, srcIcebergTable, 
destIcebergTable, properties, fs);
   }
 
-  protected IcebergCatalog createIcebergCatalog(Properties properties) throws 
IOException, ClassNotFoundException {
+  protected IcebergCatalog createIcebergCatalog(Properties properties, 
CatalogLocation location) throws IOException {
     Map<String, String> catalogProperties = new HashMap<>();
-    String catalogUri = properties.getProperty(ICEBERG_SRC_CATALOG_URI_KEY);
-    Preconditions.checkNotNull(catalogUri, "Catalog Table Service URI is 
required");
-    catalogProperties.put(CatalogProperties.URI, catalogUri);
-    // introducing an optional property for catalogs requiring cluster 
specific properties
-    
Optional.ofNullable(properties.getProperty(ICEBERG_SRC_CLUSTER_NAME)).ifPresent(value
 -> catalogProperties.put(ICEBERG_CLUSTER_KEY, value));
     Configuration configuration = 
HadoopUtils.getConfFromProperties(properties);
-    String icebergCatalogClassName = 
properties.getProperty(ICEBERG_SRC_CATALOG_CLASS_KEY, 
DEFAULT_ICEBERG_CATALOG_CLASS);
+    String catalogUri;
+    String icebergCatalogClassName;
+    switch (location) {
+      case SOURCE:
+        catalogUri = properties.getProperty(ICEBERG_SRC_CATALOG_URI_KEY);
+        Preconditions.checkNotNull(catalogUri, "Source Catalog Table Service 
URI is required");
+        // introducing an optional property for catalogs requiring cluster 
specific properties
+        
Optional.ofNullable(properties.getProperty(ICEBERG_SRC_CLUSTER_NAME)).ifPresent(value
 -> catalogProperties.put(ICEBERG_CLUSTER_KEY, value));
+        icebergCatalogClassName = 
properties.getProperty(ICEBERG_SRC_CATALOG_CLASS_KEY, 
DEFAULT_ICEBERG_CATALOG_CLASS);
+        break;
+      case DESTINATION:
+        catalogUri = properties.getProperty(ICEBERG_DEST_CATALOG_URI_KEY);
+        Preconditions.checkNotNull(catalogUri, "Destination Catalog Table 
Service URI is required");

Review Comment:
   And the same thing applies to the message for the missing source table key as well.



##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetFinder.java:
##########
@@ -98,20 +104,43 @@ public Iterator<IcebergDataset> getDatasetsIterator() 
throws IOException {
     return findDatasets().iterator();
   }
 
-  protected IcebergDataset createIcebergDataset(String dbName, String tblName, 
IcebergCatalog icebergCatalog, Properties properties, FileSystem fs) {
-    IcebergTable icebergTable = icebergCatalog.openTable(dbName, tblName);
-    return new IcebergDataset(dbName, tblName, icebergTable, properties, fs);
+  /**
+   * Requires both source and destination catalogs to connect to their 
respective {@link IcebergTable}
+   * Note: the destination side {@link IcebergTable} should be present before 
initiating replication

Review Comment:
   Should we actually enforce this semantic? It differs from Hive distcp, which 
creates the table if it is missing on the destination.





Issue Time Tracking
-------------------

    Worklog Id:     (was: 854052)
    Time Spent: 3.5h  (was: 3h 20m)

> Register iceberg table metadata update with destination side catalog
> --------------------------------------------------------------------
>
>                 Key: GOBBLIN-1802
>                 URL: https://issues.apache.org/jira/browse/GOBBLIN-1802
>             Project: Apache Gobblin
>          Issue Type: New Feature
>            Reporter: Meeth Gala
>            Priority: Major
>          Time Spent: 3.5h
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to