phet commented on code in PR #3673:
URL: https://github.com/apache/gobblin/pull/3673#discussion_r1163489856


##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetFinder.java:
##########
@@ -118,30 +116,27 @@ protected IcebergDataset createIcebergDataset(String dbName, String tblName, Ice
     return new IcebergDataset(dbName, tblName, srcIcebergTable, destIcebergTable, properties, fs);
   }
 
-  protected IcebergCatalog createIcebergCatalog(Properties properties, CatalogLocation location) throws IOException {
-    Map<String, String> catalogProperties = new HashMap<>();
+  protected static IcebergCatalog createIcebergCatalog(Properties properties, CatalogLocation location) throws IOException {
+    String prefix = getConfigPrefix(location);
+    Map<String, String> catalogProperties = loadCatalogProperties(properties, prefix);
     Configuration configuration = HadoopUtils.getConfFromProperties(properties);
-    String catalogUri;
-    String icebergCatalogClassName;
-    switch (location) {
-      case SOURCE:
-        catalogUri = properties.getProperty(ICEBERG_SRC_CATALOG_URI_KEY);
-        Preconditions.checkNotNull(catalogUri, "Provide: {%s} as Source Catalog Table Service URI is required", ICEBERG_SRC_CATALOG_URI_KEY);
-        // introducing an optional property for catalogs requiring cluster specific properties
-        Optional.ofNullable(properties.getProperty(ICEBERG_SRC_CLUSTER_NAME)).ifPresent(value -> catalogProperties.put(ICEBERG_CLUSTER_KEY, value));
-        icebergCatalogClassName = properties.getProperty(ICEBERG_SRC_CATALOG_CLASS_KEY, DEFAULT_ICEBERG_CATALOG_CLASS);
-        break;
-      case DESTINATION:
-        catalogUri = properties.getProperty(ICEBERG_DEST_CATALOG_URI_KEY);
-        Preconditions.checkNotNull(catalogUri, "Provide: {%s} as Destination Catalog Table Service URI is required", ICEBERG_DEST_CATALOG_URI_KEY);
-        // introducing an optional property for catalogs requiring cluster specific properties
-        Optional.ofNullable(properties.getProperty(ICEBERG_DEST_CLUSTER_NAME)).ifPresent(value -> catalogProperties.put(ICEBERG_CLUSTER_KEY, value));
-        icebergCatalogClassName = properties.getProperty(ICEBERG_DEST_CATALOG_CLASS_KEY, DEFAULT_ICEBERG_CATALOG_CLASS);
-        break;
-      default:
-        throw new UnsupportedOperationException("Incorrect desired location: %s provided for creating Iceberg Catalog" + location);
+    String icebergCatalogClassName = catalogProperties.getOrDefault(ICEBERG_CATALOG_CLASS, DEFAULT_ICEBERG_CATALOG_CLASS);
+    return IcebergCatalogFactory.create(icebergCatalogClassName, catalogProperties, configuration);
+  }
+
+  protected static Map<String, String> loadCatalogProperties(Properties properties, String configPrefix) {
+    Map<String, String> catalogProperties = new HashMap<>();
+    Config config = ConfigBuilder.create().loadProps(properties, configPrefix).build();
+    for (Map.Entry<String, ConfigValue> entry : config.entrySet()) {
+      catalogProperties.put(entry.getKey(), entry.getValue().unwrapped().toString());
     }
+    String catalogUri = config.getString(ICEBERG_CATALOG_URI_KEY);
+    Preconditions.checkNotNull(catalogUri, "Provide: {%s} as Catalog Table Service URI is required", configPrefix + ICEBERG_CATALOG_URI_KEY);
     catalogProperties.put(CatalogProperties.URI, catalogUri);
-    return IcebergCatalogFactory.create(icebergCatalogClassName, catalogProperties, configuration);
+    return catalogProperties;
+  }
+
+  private static String getConfigPrefix(CatalogLocation location) {
+    return ICEBERG_DATASET_PREFIX + "." + location.toString().toLowerCase() + ".";

Review Comment:
   should be a method on the `CatalogLocation` enum
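   e.g., a rough sketch of what I mean (just illustrative; assumes the enum can see `ICEBERG_DATASET_PREFIX`):
   ```
   public enum CatalogLocation {
     SOURCE, DESTINATION;

     /** @return e.g. "iceberg.dataset.source." or "iceberg.dataset.destination." */
     public String getConfigPrefix() {
       return ICEBERG_DATASET_PREFIX + "." + this.toString().toLowerCase() + ".";
     }
   }
   ```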



##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTable.java:
##########
@@ -198,7 +198,8 @@ protected DatasetDescriptor getDatasetDescriptor(FileSystem fs) {
    * @param dstMetadata is null if destination {@link IcebergTable} is absent, in which case registration is skipped */
   protected void registerIcebergTable(TableMetadata srcMetadata, TableMetadata dstMetadata) {
     if (dstMetadata != null) {
-      this.tableOps.commit(srcMetadata, dstMetadata);
+      // commit (baseMetadata -> destination metadata, updatedMetadata -> source metadata)

Review Comment:
   overly cryptic.  maybe say "use current destination metadata as 'base version' and source as update"?
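   i.e., something like the following (the actual `commit` call isn't visible in this hunk, so the argument order shown is my assumption based on the comment):
   ```
   // use the current destination metadata as the 'base version' and the source metadata as the update
   this.tableOps.commit(dstMetadata, srcMetadata);
   ```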



##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetFinder.java:
##########
@@ -118,30 +116,27 @@ protected IcebergDataset createIcebergDataset(String dbName, String tblName, Ice
     return new IcebergDataset(dbName, tblName, srcIcebergTable, destIcebergTable, properties, fs);
   }
 
-  protected IcebergCatalog createIcebergCatalog(Properties properties, CatalogLocation location) throws IOException {
-    Map<String, String> catalogProperties = new HashMap<>();
+  protected static IcebergCatalog createIcebergCatalog(Properties properties, CatalogLocation location) throws IOException {
+    String prefix = getConfigPrefix(location);
+    Map<String, String> catalogProperties = loadCatalogProperties(properties, prefix);
     Configuration configuration = HadoopUtils.getConfFromProperties(properties);
-    String catalogUri;
-    String icebergCatalogClassName;
-    switch (location) {
-      case SOURCE:
-        catalogUri = properties.getProperty(ICEBERG_SRC_CATALOG_URI_KEY);
-        Preconditions.checkNotNull(catalogUri, "Provide: {%s} as Source Catalog Table Service URI is required", ICEBERG_SRC_CATALOG_URI_KEY);
-        // introducing an optional property for catalogs requiring cluster specific properties
-        Optional.ofNullable(properties.getProperty(ICEBERG_SRC_CLUSTER_NAME)).ifPresent(value -> catalogProperties.put(ICEBERG_CLUSTER_KEY, value));
-        icebergCatalogClassName = properties.getProperty(ICEBERG_SRC_CATALOG_CLASS_KEY, DEFAULT_ICEBERG_CATALOG_CLASS);
-        break;
-      case DESTINATION:
-        catalogUri = properties.getProperty(ICEBERG_DEST_CATALOG_URI_KEY);
-        Preconditions.checkNotNull(catalogUri, "Provide: {%s} as Destination Catalog Table Service URI is required", ICEBERG_DEST_CATALOG_URI_KEY);
-        // introducing an optional property for catalogs requiring cluster specific properties
-        Optional.ofNullable(properties.getProperty(ICEBERG_DEST_CLUSTER_NAME)).ifPresent(value -> catalogProperties.put(ICEBERG_CLUSTER_KEY, value));
-        icebergCatalogClassName = properties.getProperty(ICEBERG_DEST_CATALOG_CLASS_KEY, DEFAULT_ICEBERG_CATALOG_CLASS);
-        break;
-      default:
-        throw new UnsupportedOperationException("Incorrect desired location: %s provided for creating Iceberg Catalog" + location);
+    String icebergCatalogClassName = catalogProperties.getOrDefault(ICEBERG_CATALOG_CLASS, DEFAULT_ICEBERG_CATALOG_CLASS);
+    return IcebergCatalogFactory.create(icebergCatalogClassName, catalogProperties, configuration);
+  }
+
+  protected static Map<String, String> loadCatalogProperties(Properties properties, String configPrefix) {
+    Map<String, String> catalogProperties = new HashMap<>();
+    Config config = ConfigBuilder.create().loadProps(properties, configPrefix).build();
+    for (Map.Entry<String, ConfigValue> entry : config.entrySet()) {
+      catalogProperties.put(entry.getKey(), entry.getValue().unwrapped().toString());
     }
+    String catalogUri = config.getString(ICEBERG_CATALOG_URI_KEY);
+    Preconditions.checkNotNull(catalogUri, "Provide: {%s} as Catalog Table Service URI is required", configPrefix + ICEBERG_CATALOG_URI_KEY);

Review Comment:
   shouldn't there be a `"."` between `configPrefix` and the URI KEY?
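   i.e., unless `configPrefix` already ends with a separator, I'd expect something like:
   ```
   Preconditions.checkNotNull(catalogUri, "Provide: {%s} as Catalog Table Service URI is required", configPrefix + "." + ICEBERG_CATALOG_URI_KEY);
   ```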



##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetFinder.java:
##########
@@ -49,14 +52,9 @@
 @RequiredArgsConstructor
 public class IcebergDatasetFinder implements IterableDatasetFinder<IcebergDataset> {
   public static final String ICEBERG_DATASET_PREFIX = DatasetConstants.PLATFORM_ICEBERG + ".dataset";
-  public static final String ICEBERG_CLUSTER_KEY = "cluster";
-  public static final String ICEBERG_SRC_CATALOG_CLASS_KEY = ICEBERG_DATASET_PREFIX + ".source.catalog.class";
   public static final String DEFAULT_ICEBERG_CATALOG_CLASS = "org.apache.gobblin.data.management.copy.iceberg.IcebergHiveCatalog";
-  public static final String ICEBERG_SRC_CATALOG_URI_KEY = ICEBERG_DATASET_PREFIX + ".source.catalog.uri";
-  public static final String ICEBERG_SRC_CLUSTER_NAME = ICEBERG_DATASET_PREFIX + ".source.cluster.name";
-  public static final String ICEBERG_DEST_CATALOG_CLASS_KEY = ICEBERG_DATASET_PREFIX + ".destination.catalog.class";
-  public static final String ICEBERG_DEST_CATALOG_URI_KEY = ICEBERG_DATASET_PREFIX + ".copy.destination.catalog.uri";
-  public static final String ICEBERG_DEST_CLUSTER_NAME = ICEBERG_DATASET_PREFIX + ".destination.cluster.name";
+  public static final String ICEBERG_CATALOG_CLASS = "catalog.class";
+  public static final String ICEBERG_CATALOG_URI_KEY = "catalog.uri";

Review Comment:
   please document how these are used... i.e.:
   ```ICEBERG_CATALOG_PREFIX + "." + ("source" or "destination") + "..."```
   be sure to explain how this is an open-ended pattern that allows for passing arbitrary catalog-specific props through to the catalog.
   
   also, given that configs are essentially a global namespace, I'm not convinced that the prefix in use is discriminating enough.
   
   whereas at present arbitrary properties go after:
   ```
   iceberg.dataset.source
   ```
   which means
   ```
   iceberg.dataset.destination.database
   ```
   would get passed through--and that seems wrong.
   
   instead, it might be more appropriate to put them after
   ```
   iceberg.dataset.source.catalog
   ```
   (or `iceberg.dataset.catalog.source`)
   
   seems most appropriate, given these are *catalog-specific* props
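   
   roughly the kind of javadoc + layout I have in mind (constant names and values here are only illustrative, not the ones in the PR):
   ```
   /**
    * Catalog config is read from an open-ended, per-location namespace, e.g.
    *   iceberg.dataset.source.catalog.uri
    *   iceberg.dataset.destination.catalog.class
    * Any other key under the ".catalog." prefix is passed through, untouched, to the
    * corresponding {@link IcebergCatalog} implementation.
    */
   public static final String ICEBERG_CATALOG_KEY = "catalog";
   public static final String ICEBERG_CATALOG_CLASS = "class"; // relative to the per-location catalog prefix
   public static final String ICEBERG_CATALOG_URI_KEY = "uri";  // relative to the per-location catalog prefix
   ```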



##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDataset.java:
##########
@@ -316,8 +316,8 @@ protected DatasetDescriptor getDestinationDataset(FileSystem targetFs) {
     return this.destIcebergTable.getDatasetDescriptor(targetFs);
   }
 
-  private PostPublishStep createPostPublishStep(IcebergTable srcIcebergTable, IcebergTable dstIcebergTable) {
-    IcebergRegisterStep icebergRegisterStep = new IcebergRegisterStep(srcIcebergTable, dstIcebergTable);
+  private PostPublishStep createPostPublishStep(String dbName, String inputTableName, Properties properties) {

Review Comment:
   seems inappropriate for the general case to serialize every single global property here.  can't we constrain only to those actually related to creating the iceberg catalogs, source and dest?
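   e.g. a sketch of the kind of filtering I mean (the prefix choice and the register-step handoff are just illustrative):
   ```
   // copy through only catalog-related keys instead of every global property
   Properties catalogOnlyProps = new Properties();
   for (String name : properties.stringPropertyNames()) {
     if (name.startsWith(ICEBERG_DATASET_PREFIX)) { // or a narrower, catalog-specific prefix
       catalogOnlyProps.setProperty(name, properties.getProperty(name));
     }
   }
   // then hand catalogOnlyProps (not `properties`) to the IcebergRegisterStep
   ```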



##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetFinder.java:
##########
@@ -118,30 +116,27 @@ protected IcebergDataset createIcebergDataset(String dbName, String tblName, Ice
     return new IcebergDataset(dbName, tblName, srcIcebergTable, destIcebergTable, properties, fs);
   }
 
-  protected IcebergCatalog createIcebergCatalog(Properties properties, CatalogLocation location) throws IOException {
-    Map<String, String> catalogProperties = new HashMap<>();
+  protected static IcebergCatalog createIcebergCatalog(Properties properties, CatalogLocation location) throws IOException {
+    String prefix = getConfigPrefix(location);
+    Map<String, String> catalogProperties = loadCatalogProperties(properties, prefix);
     Configuration configuration = HadoopUtils.getConfFromProperties(properties);
-    String catalogUri;
-    String icebergCatalogClassName;
-    switch (location) {
-      case SOURCE:
-        catalogUri = properties.getProperty(ICEBERG_SRC_CATALOG_URI_KEY);
-        Preconditions.checkNotNull(catalogUri, "Provide: {%s} as Source Catalog Table Service URI is required", ICEBERG_SRC_CATALOG_URI_KEY);
-        // introducing an optional property for catalogs requiring cluster specific properties
-        Optional.ofNullable(properties.getProperty(ICEBERG_SRC_CLUSTER_NAME)).ifPresent(value -> catalogProperties.put(ICEBERG_CLUSTER_KEY, value));
-        icebergCatalogClassName = properties.getProperty(ICEBERG_SRC_CATALOG_CLASS_KEY, DEFAULT_ICEBERG_CATALOG_CLASS);
-        break;
-      case DESTINATION:
-        catalogUri = properties.getProperty(ICEBERG_DEST_CATALOG_URI_KEY);
-        Preconditions.checkNotNull(catalogUri, "Provide: {%s} as Destination Catalog Table Service URI is required", ICEBERG_DEST_CATALOG_URI_KEY);
-        // introducing an optional property for catalogs requiring cluster specific properties
-        Optional.ofNullable(properties.getProperty(ICEBERG_DEST_CLUSTER_NAME)).ifPresent(value -> catalogProperties.put(ICEBERG_CLUSTER_KEY, value));
-        icebergCatalogClassName = properties.getProperty(ICEBERG_DEST_CATALOG_CLASS_KEY, DEFAULT_ICEBERG_CATALOG_CLASS);
-        break;
-      default:
-        throw new UnsupportedOperationException("Incorrect desired location: %s provided for creating Iceberg Catalog" + location);
+    String icebergCatalogClassName = catalogProperties.getOrDefault(ICEBERG_CATALOG_CLASS, DEFAULT_ICEBERG_CATALOG_CLASS);
+    return IcebergCatalogFactory.create(icebergCatalogClassName, catalogProperties, configuration);
+  }
+
+  protected static Map<String, String> loadCatalogProperties(Properties properties, String configPrefix) {

Review Comment:
   the semantics here seem non-obvious, hence deserving of javadoc.  the name seems a bit inexact as well.  maybe `buildMapFromPrefixChildren`?
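   e.g. (javadoc wording is just a sketch; the body would stay as it is in the PR):
   ```
   /**
    * Collects the config entries found under {@code configPrefix} within {@code properties}
    * into a String-to-String map, for handing off as Iceberg catalog properties.
    */
   protected static Map<String, String> buildMapFromPrefixChildren(Properties properties, String configPrefix) {
     // ... same body as loadCatalogProperties above ...
   }
   ```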



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
