meethngala commented on code in PR #3673:
URL: https://github.com/apache/gobblin/pull/3673#discussion_r1164613288


##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetFinder.java:
##########
@@ -118,30 +116,27 @@ protected IcebergDataset createIcebergDataset(String 
dbName, String tblName, Ice
     return new IcebergDataset(dbName, tblName, srcIcebergTable, 
destIcebergTable, properties, fs);
   }
 
-  protected IcebergCatalog createIcebergCatalog(Properties properties, 
CatalogLocation location) throws IOException {
-    Map<String, String> catalogProperties = new HashMap<>();
+  protected static IcebergCatalog createIcebergCatalog(Properties properties, 
CatalogLocation location) throws IOException {
+    String prefix = getConfigPrefix(location);
+    Map<String, String> catalogProperties = loadCatalogProperties(properties, 
prefix);
     Configuration configuration = 
HadoopUtils.getConfFromProperties(properties);
-    String catalogUri;
-    String icebergCatalogClassName;
-    switch (location) {
-      case SOURCE:
-        catalogUri = properties.getProperty(ICEBERG_SRC_CATALOG_URI_KEY);
-        Preconditions.checkNotNull(catalogUri, "Provide: {%s} as Source 
Catalog Table Service URI is required", ICEBERG_SRC_CATALOG_URI_KEY);
-        // introducing an optional property for catalogs requiring cluster 
specific properties
-        
Optional.ofNullable(properties.getProperty(ICEBERG_SRC_CLUSTER_NAME)).ifPresent(value
 -> catalogProperties.put(ICEBERG_CLUSTER_KEY, value));
-        icebergCatalogClassName = 
properties.getProperty(ICEBERG_SRC_CATALOG_CLASS_KEY, 
DEFAULT_ICEBERG_CATALOG_CLASS);
-        break;
-      case DESTINATION:
-        catalogUri = properties.getProperty(ICEBERG_DEST_CATALOG_URI_KEY);
-        Preconditions.checkNotNull(catalogUri, "Provide: {%s} as Destination 
Catalog Table Service URI is required", ICEBERG_DEST_CATALOG_URI_KEY);
-        // introducing an optional property for catalogs requiring cluster 
specific properties
-        
Optional.ofNullable(properties.getProperty(ICEBERG_DEST_CLUSTER_NAME)).ifPresent(value
 -> catalogProperties.put(ICEBERG_CLUSTER_KEY, value));
-        icebergCatalogClassName = 
properties.getProperty(ICEBERG_DEST_CATALOG_CLASS_KEY, 
DEFAULT_ICEBERG_CATALOG_CLASS);
-        break;
-      default:
-        throw new UnsupportedOperationException("Incorrect desired location: 
%s provided for creating Iceberg Catalog" + location);
+    String icebergCatalogClassName = 
catalogProperties.getOrDefault(ICEBERG_CATALOG_CLASS, 
DEFAULT_ICEBERG_CATALOG_CLASS);
+    return IcebergCatalogFactory.create(icebergCatalogClassName, 
catalogProperties, configuration);
+  }
+
+  protected static Map<String, String> loadCatalogProperties(Properties 
properties, String configPrefix) {
+    Map<String, String> catalogProperties = new HashMap<>();
+    Config config = ConfigBuilder.create().loadProps(properties, 
configPrefix).build();
+    for (Map.Entry<String, ConfigValue> entry : config.entrySet()) {
+      catalogProperties.put(entry.getKey(), 
entry.getValue().unwrapped().toString());
     }
+    String catalogUri = config.getString(ICEBERG_CATALOG_URI_KEY);
+    Preconditions.checkNotNull(catalogUri, "Provide: {%s} as Catalog Table 
Service URI is required", configPrefix + ICEBERG_CATALOG_URI_KEY);
     catalogProperties.put(CatalogProperties.URI, catalogUri);
-    return IcebergCatalogFactory.create(icebergCatalogClassName, 
catalogProperties, configuration);
+    return catalogProperties;
+  }
+
+  private static String getConfigPrefix(CatalogLocation location) {
+    return ICEBERG_DATASET_PREFIX + "." + location.toString().toLowerCase() + 
".";

Review Comment:
   Gotcha! I created a method inside the enum as suggested. The changes are
reflected in my latest commit.



##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetFinder.java:
##########
@@ -118,30 +116,27 @@ protected IcebergDataset createIcebergDataset(String 
dbName, String tblName, Ice
     return new IcebergDataset(dbName, tblName, srcIcebergTable, 
destIcebergTable, properties, fs);
   }
 
-  protected IcebergCatalog createIcebergCatalog(Properties properties, 
CatalogLocation location) throws IOException {
-    Map<String, String> catalogProperties = new HashMap<>();
+  protected static IcebergCatalog createIcebergCatalog(Properties properties, 
CatalogLocation location) throws IOException {
+    String prefix = getConfigPrefix(location);
+    Map<String, String> catalogProperties = loadCatalogProperties(properties, 
prefix);
     Configuration configuration = 
HadoopUtils.getConfFromProperties(properties);
-    String catalogUri;
-    String icebergCatalogClassName;
-    switch (location) {
-      case SOURCE:
-        catalogUri = properties.getProperty(ICEBERG_SRC_CATALOG_URI_KEY);
-        Preconditions.checkNotNull(catalogUri, "Provide: {%s} as Source 
Catalog Table Service URI is required", ICEBERG_SRC_CATALOG_URI_KEY);
-        // introducing an optional property for catalogs requiring cluster 
specific properties
-        
Optional.ofNullable(properties.getProperty(ICEBERG_SRC_CLUSTER_NAME)).ifPresent(value
 -> catalogProperties.put(ICEBERG_CLUSTER_KEY, value));
-        icebergCatalogClassName = 
properties.getProperty(ICEBERG_SRC_CATALOG_CLASS_KEY, 
DEFAULT_ICEBERG_CATALOG_CLASS);
-        break;
-      case DESTINATION:
-        catalogUri = properties.getProperty(ICEBERG_DEST_CATALOG_URI_KEY);
-        Preconditions.checkNotNull(catalogUri, "Provide: {%s} as Destination 
Catalog Table Service URI is required", ICEBERG_DEST_CATALOG_URI_KEY);
-        // introducing an optional property for catalogs requiring cluster 
specific properties
-        
Optional.ofNullable(properties.getProperty(ICEBERG_DEST_CLUSTER_NAME)).ifPresent(value
 -> catalogProperties.put(ICEBERG_CLUSTER_KEY, value));
-        icebergCatalogClassName = 
properties.getProperty(ICEBERG_DEST_CATALOG_CLASS_KEY, 
DEFAULT_ICEBERG_CATALOG_CLASS);
-        break;
-      default:
-        throw new UnsupportedOperationException("Incorrect desired location: 
%s provided for creating Iceberg Catalog" + location);
+    String icebergCatalogClassName = 
catalogProperties.getOrDefault(ICEBERG_CATALOG_CLASS, 
DEFAULT_ICEBERG_CATALOG_CLASS);
+    return IcebergCatalogFactory.create(icebergCatalogClassName, 
catalogProperties, configuration);
+  }
+
+  protected static Map<String, String> loadCatalogProperties(Properties 
properties, String configPrefix) {
+    Map<String, String> catalogProperties = new HashMap<>();
+    Config config = ConfigBuilder.create().loadProps(properties, 
configPrefix).build();
+    for (Map.Entry<String, ConfigValue> entry : config.entrySet()) {
+      catalogProperties.put(entry.getKey(), 
entry.getValue().unwrapped().toString());
     }
+    String catalogUri = config.getString(ICEBERG_CATALOG_URI_KEY);
+    Preconditions.checkNotNull(catalogUri, "Provide: {%s} as Catalog Table 
Service URI is required", configPrefix + ICEBERG_CATALOG_URI_KEY);

Review Comment:
   Yeah, I missed it. I updated it in my latest commit!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to