Will-Lo commented on code in PR #3663:
URL: https://github.com/apache/gobblin/pull/3663#discussion_r1154026198


##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetFinder.java:
##########
@@ -98,20 +104,43 @@ public Iterator<IcebergDataset> getDatasetsIterator() 
throws IOException {
     return findDatasets().iterator();
   }
 
-  protected IcebergDataset createIcebergDataset(String dbName, String tblName, 
IcebergCatalog icebergCatalog, Properties properties, FileSystem fs) {
-    IcebergTable icebergTable = icebergCatalog.openTable(dbName, tblName);
-    return new IcebergDataset(dbName, tblName, icebergTable, properties, fs);
+  /**
+   * Requires both source and destination catalogs to connect to their 
respective {@link IcebergTable}
+   * Note: the destination side {@link IcebergTable} should be present before 
initiating replication
+   * @return {@link IcebergDataset} with its corresponding source and 
destination {@link IcebergTable}
+   */
+  protected IcebergDataset createIcebergDataset(String dbName, String tblName, 
IcebergCatalog sourceIcebergCatalog, IcebergCatalog destinationIcebergCatalog, 
Properties properties, FileSystem fs) throws IOException {
+    IcebergTable srcIcebergTable = sourceIcebergCatalog.openTable(dbName, 
tblName);
+    
Preconditions.checkArgument(sourceIcebergCatalog.tableAlreadyExists(srcIcebergTable),
 String.format("Missing Source Iceberg Table: {%s}.{%s}", dbName, tblName));
+    IcebergTable destIcebergTable = 
destinationIcebergCatalog.openTable(dbName, tblName);
+    
Preconditions.checkArgument(destinationIcebergCatalog.tableAlreadyExists(destIcebergTable),
 String.format("Missing Destination Iceberg Table: {%s}.{%s}", dbName, 
tblName));
+    return new IcebergDataset(dbName, tblName, srcIcebergTable, 
destIcebergTable, properties, fs);
   }
 
-  protected IcebergCatalog createIcebergCatalog(Properties properties) throws 
IOException, ClassNotFoundException {
+  protected IcebergCatalog createIcebergCatalog(Properties properties, 
CatalogLocation location) throws IOException {
     Map<String, String> catalogProperties = new HashMap<>();
-    String catalogUri = properties.getProperty(ICEBERG_SRC_CATALOG_URI_KEY);
-    Preconditions.checkNotNull(catalogUri, "Catalog Table Service URI is 
required");
-    catalogProperties.put(CatalogProperties.URI, catalogUri);
-    // introducing an optional property for catalogs requiring cluster 
specific properties
-    
Optional.ofNullable(properties.getProperty(ICEBERG_SRC_CLUSTER_NAME)).ifPresent(value
 -> catalogProperties.put(ICEBERG_CLUSTER_KEY, value));
     Configuration configuration = 
HadoopUtils.getConfFromProperties(properties);
-    String icebergCatalogClassName = 
properties.getProperty(ICEBERG_SRC_CATALOG_CLASS_KEY, 
DEFAULT_ICEBERG_CATALOG_CLASS);
+    String catalogUri;
+    String icebergCatalogClassName;
+    switch (location) {
+      case SOURCE:
+        catalogUri = properties.getProperty(ICEBERG_SRC_CATALOG_URI_KEY);
+        Preconditions.checkNotNull(catalogUri, "Source Catalog Table Service 
URI is required");
+        // introducing an optional property for catalogs requiring cluster 
specific properties
+        
Optional.ofNullable(properties.getProperty(ICEBERG_SRC_CLUSTER_NAME)).ifPresent(value
 -> catalogProperties.put(ICEBERG_CLUSTER_KEY, value));
+        icebergCatalogClassName = 
properties.getProperty(ICEBERG_SRC_CATALOG_CLASS_KEY, 
DEFAULT_ICEBERG_CATALOG_CLASS);
+        break;
+      case DESTINATION:
+        catalogUri = properties.getProperty(ICEBERG_DEST_CATALOG_URI_KEY);
+        Preconditions.checkNotNull(catalogUri, "Destination Catalog Table 
Service URI is required");

Review Comment:
   In error messages like these, can you include the explicit config key in the 
precondition message, so that when this error occurs it is clear what the 
user/developer needs to add?



##########
gradle/scripts/dependencyDefinitions.gradle:
##########
@@ -123,7 +123,7 @@ ext.externalDependency = [
     "guiceMultibindings": 
"com.google.inject.extensions:guice-multibindings:4.0",
     "guiceServlet": "com.google.inject.extensions:guice-servlet:4.0",
     "derby": "org.apache.derby:derby:10.12.1.1",
-    "mockito": "org.mockito:mockito-core:4.11.0",
+    "mockito": "org.mockito:mockito-inline:4.11.0",

Review Comment:
   Can you provide justification in a comment for why we're using mockito-inline 
instead of mockito-core? From what I read, mockito-inline is for experimental 
features.



##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetFinder.java:
##########
@@ -98,20 +104,43 @@ public Iterator<IcebergDataset> getDatasetsIterator() 
throws IOException {
     return findDatasets().iterator();
   }
 
-  protected IcebergDataset createIcebergDataset(String dbName, String tblName, 
IcebergCatalog icebergCatalog, Properties properties, FileSystem fs) {
-    IcebergTable icebergTable = icebergCatalog.openTable(dbName, tblName);
-    return new IcebergDataset(dbName, tblName, icebergTable, properties, fs);
+  /**
+   * Requires both source and destination catalogs to connect to their 
respective {@link IcebergTable}
+   * Note: the destination side {@link IcebergTable} should be present before 
initiating replication
+   * @return {@link IcebergDataset} with its corresponding source and 
destination {@link IcebergTable}
+   */
+  protected IcebergDataset createIcebergDataset(String dbName, String tblName, 
IcebergCatalog sourceIcebergCatalog, IcebergCatalog destinationIcebergCatalog, 
Properties properties, FileSystem fs) throws IOException {
+    IcebergTable srcIcebergTable = sourceIcebergCatalog.openTable(dbName, 
tblName);
+    
Preconditions.checkArgument(sourceIcebergCatalog.tableAlreadyExists(srcIcebergTable),
 String.format("Missing Source Iceberg Table: {%s}.{%s}", dbName, tblName));
+    IcebergTable destIcebergTable = 
destinationIcebergCatalog.openTable(dbName, tblName);
+    
Preconditions.checkArgument(destinationIcebergCatalog.tableAlreadyExists(destIcebergTable),
 String.format("Missing Destination Iceberg Table: {%s}.{%s}", dbName, 
tblName));
+    return new IcebergDataset(dbName, tblName, srcIcebergTable, 
destIcebergTable, properties, fs);
   }
 
-  protected IcebergCatalog createIcebergCatalog(Properties properties) throws 
IOException, ClassNotFoundException {
+  protected IcebergCatalog createIcebergCatalog(Properties properties, 
CatalogLocation location) throws IOException {
     Map<String, String> catalogProperties = new HashMap<>();
-    String catalogUri = properties.getProperty(ICEBERG_SRC_CATALOG_URI_KEY);
-    Preconditions.checkNotNull(catalogUri, "Catalog Table Service URI is 
required");
-    catalogProperties.put(CatalogProperties.URI, catalogUri);
-    // introducing an optional property for catalogs requiring cluster 
specific properties
-    
Optional.ofNullable(properties.getProperty(ICEBERG_SRC_CLUSTER_NAME)).ifPresent(value
 -> catalogProperties.put(ICEBERG_CLUSTER_KEY, value));
     Configuration configuration = 
HadoopUtils.getConfFromProperties(properties);
-    String icebergCatalogClassName = 
properties.getProperty(ICEBERG_SRC_CATALOG_CLASS_KEY, 
DEFAULT_ICEBERG_CATALOG_CLASS);
+    String catalogUri;
+    String icebergCatalogClassName;
+    switch (location) {
+      case SOURCE:
+        catalogUri = properties.getProperty(ICEBERG_SRC_CATALOG_URI_KEY);
+        Preconditions.checkNotNull(catalogUri, "Source Catalog Table Service 
URI is required");
+        // introducing an optional property for catalogs requiring cluster 
specific properties
+        
Optional.ofNullable(properties.getProperty(ICEBERG_SRC_CLUSTER_NAME)).ifPresent(value
 -> catalogProperties.put(ICEBERG_CLUSTER_KEY, value));
+        icebergCatalogClassName = 
properties.getProperty(ICEBERG_SRC_CATALOG_CLASS_KEY, 
DEFAULT_ICEBERG_CATALOG_CLASS);
+        break;
+      case DESTINATION:
+        catalogUri = properties.getProperty(ICEBERG_DEST_CATALOG_URI_KEY);
+        Preconditions.checkNotNull(catalogUri, "Destination Catalog Table 
Service URI is required");

Review Comment:
   And the same for the missing source table key as well.



##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetFinder.java:
##########
@@ -98,20 +104,43 @@ public Iterator<IcebergDataset> getDatasetsIterator() 
throws IOException {
     return findDatasets().iterator();
   }
 
-  protected IcebergDataset createIcebergDataset(String dbName, String tblName, 
IcebergCatalog icebergCatalog, Properties properties, FileSystem fs) {
-    IcebergTable icebergTable = icebergCatalog.openTable(dbName, tblName);
-    return new IcebergDataset(dbName, tblName, icebergTable, properties, fs);
+  /**
+   * Requires both source and destination catalogs to connect to their 
respective {@link IcebergTable}
+   * Note: the destination side {@link IcebergTable} should be present before 
initiating replication

Review Comment:
   Should we actually enforce this semantic? It differs from Hive distcp, which 
creates the table if it is missing on the destination.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to