meethngala commented on code in PR #3673:
URL: https://github.com/apache/gobblin/pull/3673#discussion_r1164613567


##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetFinder.java:
##########
@@ -118,30 +116,27 @@ protected IcebergDataset createIcebergDataset(String dbName, String tblName, Ice
     return new IcebergDataset(dbName, tblName, srcIcebergTable, destIcebergTable, properties, fs);
   }
 
-  protected IcebergCatalog createIcebergCatalog(Properties properties, CatalogLocation location) throws IOException {
-    Map<String, String> catalogProperties = new HashMap<>();
+  protected static IcebergCatalog createIcebergCatalog(Properties properties, CatalogLocation location) throws IOException {
+    String prefix = getConfigPrefix(location);
+    Map<String, String> catalogProperties = loadCatalogProperties(properties, prefix);
     Configuration configuration = HadoopUtils.getConfFromProperties(properties);
-    String catalogUri;
-    String icebergCatalogClassName;
-    switch (location) {
-      case SOURCE:
-        catalogUri = properties.getProperty(ICEBERG_SRC_CATALOG_URI_KEY);
-        Preconditions.checkNotNull(catalogUri, "Provide: {%s} as Source Catalog Table Service URI is required", ICEBERG_SRC_CATALOG_URI_KEY);
-        // introducing an optional property for catalogs requiring cluster specific properties
-        Optional.ofNullable(properties.getProperty(ICEBERG_SRC_CLUSTER_NAME)).ifPresent(value -> catalogProperties.put(ICEBERG_CLUSTER_KEY, value));
-        icebergCatalogClassName = properties.getProperty(ICEBERG_SRC_CATALOG_CLASS_KEY, DEFAULT_ICEBERG_CATALOG_CLASS);
-        break;
-      case DESTINATION:
-        catalogUri = properties.getProperty(ICEBERG_DEST_CATALOG_URI_KEY);
-        Preconditions.checkNotNull(catalogUri, "Provide: {%s} as Destination Catalog Table Service URI is required", ICEBERG_DEST_CATALOG_URI_KEY);
-        // introducing an optional property for catalogs requiring cluster specific properties
-        Optional.ofNullable(properties.getProperty(ICEBERG_DEST_CLUSTER_NAME)).ifPresent(value -> catalogProperties.put(ICEBERG_CLUSTER_KEY, value));
-        icebergCatalogClassName = properties.getProperty(ICEBERG_DEST_CATALOG_CLASS_KEY, DEFAULT_ICEBERG_CATALOG_CLASS);
-        break;
-      default:
-        throw new UnsupportedOperationException("Incorrect desired location: %s provided for creating Iceberg Catalog" + location);
+    String icebergCatalogClassName = catalogProperties.getOrDefault(ICEBERG_CATALOG_CLASS, DEFAULT_ICEBERG_CATALOG_CLASS);
+    return IcebergCatalogFactory.create(icebergCatalogClassName, catalogProperties, configuration);
+  }
+
+  protected static Map<String, String> loadCatalogProperties(Properties properties, String configPrefix) {

Review Comment:
   Gotcha. I renamed the method and added Javadoc for it as well :)
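
For readers following the hunk above: the diff shows only the signature of `loadCatalogProperties`, not its body. The following is a minimal sketch of what a prefix-based loader of this shape might do, assuming it keeps the keys that start with the prefix and strips the prefix from them. The class name and the `example.source.catalog.` keys are hypothetical and are not taken from Gobblin.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

// Hypothetical stand-in class, not the actual IcebergDatasetFinder.
class CatalogPropertiesSketch {

  /** Collects every property whose key starts with configPrefix and strips the prefix from the key. */
  static Map<String, String> loadCatalogProperties(Properties properties, String configPrefix) {
    Map<String, String> catalogProperties = new HashMap<>();
    for (String key : properties.stringPropertyNames()) {
      if (key.startsWith(configPrefix)) {
        catalogProperties.put(key.substring(configPrefix.length()), properties.getProperty(key));
      }
    }
    return catalogProperties;
  }

  public static void main(String[] args) {
    Properties props = new Properties();
    // Assumed key names, for illustration only.
    props.setProperty("example.source.catalog.uri", "thrift://src-host:9083");
    props.setProperty("example.dest.catalog.uri", "thrift://dest-host:9083");
    // Only the source-prefixed entry survives, keyed as "uri".
    System.out.println(loadCatalogProperties(props, "example.source.catalog."));
  }
}
```

With one loader parameterized by prefix, the source and destination cases no longer need separate `switch` branches, which is the simplification the hunk appears to make.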



##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDataset.java:
##########
@@ -316,8 +316,8 @@ protected DatasetDescriptor getDestinationDataset(FileSystem targetFs) {
     return this.destIcebergTable.getDatasetDescriptor(targetFs);
   }
 
-  private PostPublishStep createPostPublishStep(IcebergTable srcIcebergTable, IcebergTable dstIcebergTable) {
-    IcebergRegisterStep icebergRegisterStep = new IcebergRegisterStep(srcIcebergTable, dstIcebergTable);
+  private PostPublishStep createPostPublishStep(String dbName, String inputTableName, Properties properties) {

Review Comment:
   I understand the concern: we shouldn't need to serialize all of the 
   properties. I explored narrowing them down, but I am skeptical because the 
   same `properties` also supply the Hadoop configuration used by 
   `CatalogUtil.loadCatalog()`. We pass the properties as-is into that 
   configuration, which is then set on any catalog that is an instance of 
   `Configurable`: `((Configurable) catalog).setConf(hadoopConf)`. It would 
   take an exhaustive search to find out how each catalog implementation uses 
   these Hadoop configs. Does that make sense?
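
To make the concern above concrete, here is a small sketch of how trimming the serialized properties to a catalog prefix could silently drop a Hadoop setting that a catalog reads through `setConf`. Everything in it is a stand-in: `ConfigurableLike` only mimics Hadoop's `Configurable` (the real interface takes a `Configuration`, not a `Map`), and `ExampleCatalog` and the `example.catalog.` prefix are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

class SerializedPropsSketch {

  /** Stand-in for Hadoop's Configurable; simplified to a Map for this sketch. */
  interface ConfigurableLike {
    void setConf(Map<String, String> conf);
  }

  /** Hypothetical catalog that depends on a Hadoop key the copy job cannot predict. */
  static class ExampleCatalog implements ConfigurableLike {
    String defaultFs;

    @Override
    public void setConf(Map<String, String> conf) {
      defaultFs = conf.get("fs.defaultFS");
    }
  }

  /** Builds the configuration from props; a null keepPrefix passes every property through. */
  static Map<String, String> toConf(Properties props, String keepPrefix) {
    Map<String, String> conf = new HashMap<>();
    for (String key : props.stringPropertyNames()) {
      if (keepPrefix == null || key.startsWith(keepPrefix)) {
        conf.put(key, props.getProperty(key));
      }
    }
    return conf;
  }

  public static void main(String[] args) {
    Properties props = new Properties();
    props.setProperty("example.catalog.uri", "thrift://host:9083"); // assumed key name
    props.setProperty("fs.defaultFS", "hdfs://namenode:8020");

    ExampleCatalog withAll = new ExampleCatalog();
    withAll.setConf(toConf(props, null));
    ExampleCatalog withSubset = new ExampleCatalog();
    withSubset.setConf(toConf(props, "example.catalog."));

    System.out.println("all properties: " + withAll.defaultFs);
    System.out.println("prefix only:    " + withSubset.defaultFs);
  }
}
```

In this sketch the filtered variant leaves `defaultFs` unset, which is the kind of failure that is hard to rule out without auditing each catalog implementation.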



##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTable.java:
##########
@@ -198,7 +198,8 @@ protected DatasetDescriptor getDatasetDescriptor(FileSystem fs) {
    * @param dstMetadata is null if destination {@link IcebergTable} is absent, in which case registration is skipped */
   protected void registerIcebergTable(TableMetadata srcMetadata, TableMetadata dstMetadata) {
     if (dstMetadata != null) {
-      this.tableOps.commit(srcMetadata, dstMetadata);
+      // commit (baseMetadata -> destination metadata, updatedMetadata -> source metadata)

Review Comment:
   gotcha. I have simplified it in my latest commit!
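
For context on the argument order in the hunk above: Iceberg's `TableOperations.commit(base, metadata)` takes the metadata the table is expected to hold currently first, and the metadata to install second. The sketch below uses a simplified stand-in (`TableOpsLike`, with strings in place of `TableMetadata`) to show why passing the destination metadata as the base and the source metadata as the update matters; it is not the Iceberg implementation.

```java
class CommitOrderSketch {

  /** Simplified stand-in for a table-operations object holding the current metadata. */
  static class TableOpsLike {
    private String current;

    TableOpsLike(String initial) {
      this.current = initial;
    }

    /** Installs updated only if base still matches the current metadata; otherwise rejects the commit. */
    synchronized void commit(String base, String updated) {
      if (!current.equals(base)) {
        throw new IllegalStateException("stale base: expected " + current + " but got " + base);
      }
      current = updated;
    }

    synchronized String current() {
      return current;
    }
  }

  public static void main(String[] args) {
    String dstMetadata = "dest-v1"; // what the destination table holds now
    String srcMetadata = "src-v7";  // what should be registered at the destination

    TableOpsLike destOps = new TableOpsLike(dstMetadata);
    destOps.commit(dstMetadata, srcMetadata); // base = destination, updated = source
    System.out.println(destOps.current());
  }
}
```

Swapping the two arguments in this sketch makes the base check fail, because the destination does not currently hold the source metadata.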


