[ 
https://issues.apache.org/jira/browse/GOBBLIN-1811?focusedWorklogId=856304&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-856304
 ]

ASF GitHub Bot logged work on GOBBLIN-1811:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 12/Apr/23 02:31
            Start Date: 12/Apr/23 02:31
    Worklog Time Spent: 10m 
      Work Description: phet commented on code in PR #3673:
URL: https://github.com/apache/gobblin/pull/3673#discussion_r1163489856


##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetFinder.java:
##########
@@ -118,30 +116,27 @@ protected IcebergDataset createIcebergDataset(String 
dbName, String tblName, Ice
     return new IcebergDataset(dbName, tblName, srcIcebergTable, 
destIcebergTable, properties, fs);
   }
 
-  protected IcebergCatalog createIcebergCatalog(Properties properties, 
CatalogLocation location) throws IOException {
-    Map<String, String> catalogProperties = new HashMap<>();
+  protected static IcebergCatalog createIcebergCatalog(Properties properties, 
CatalogLocation location) throws IOException {
+    String prefix = getConfigPrefix(location);
+    Map<String, String> catalogProperties = loadCatalogProperties(properties, 
prefix);
     Configuration configuration = 
HadoopUtils.getConfFromProperties(properties);
-    String catalogUri;
-    String icebergCatalogClassName;
-    switch (location) {
-      case SOURCE:
-        catalogUri = properties.getProperty(ICEBERG_SRC_CATALOG_URI_KEY);
-        Preconditions.checkNotNull(catalogUri, "Provide: {%s} as Source 
Catalog Table Service URI is required", ICEBERG_SRC_CATALOG_URI_KEY);
-        // introducing an optional property for catalogs requiring cluster 
specific properties
-        
Optional.ofNullable(properties.getProperty(ICEBERG_SRC_CLUSTER_NAME)).ifPresent(value
 -> catalogProperties.put(ICEBERG_CLUSTER_KEY, value));
-        icebergCatalogClassName = 
properties.getProperty(ICEBERG_SRC_CATALOG_CLASS_KEY, 
DEFAULT_ICEBERG_CATALOG_CLASS);
-        break;
-      case DESTINATION:
-        catalogUri = properties.getProperty(ICEBERG_DEST_CATALOG_URI_KEY);
-        Preconditions.checkNotNull(catalogUri, "Provide: {%s} as Destination 
Catalog Table Service URI is required", ICEBERG_DEST_CATALOG_URI_KEY);
-        // introducing an optional property for catalogs requiring cluster 
specific properties
-        
Optional.ofNullable(properties.getProperty(ICEBERG_DEST_CLUSTER_NAME)).ifPresent(value
 -> catalogProperties.put(ICEBERG_CLUSTER_KEY, value));
-        icebergCatalogClassName = 
properties.getProperty(ICEBERG_DEST_CATALOG_CLASS_KEY, 
DEFAULT_ICEBERG_CATALOG_CLASS);
-        break;
-      default:
-        throw new UnsupportedOperationException("Incorrect desired location: 
%s provided for creating Iceberg Catalog" + location);
+    String icebergCatalogClassName = 
catalogProperties.getOrDefault(ICEBERG_CATALOG_CLASS, 
DEFAULT_ICEBERG_CATALOG_CLASS);
+    return IcebergCatalogFactory.create(icebergCatalogClassName, 
catalogProperties, configuration);
+  }
+
+  protected static Map<String, String> loadCatalogProperties(Properties 
properties, String configPrefix) {
+    Map<String, String> catalogProperties = new HashMap<>();
+    Config config = ConfigBuilder.create().loadProps(properties, 
configPrefix).build();
+    for (Map.Entry<String, ConfigValue> entry : config.entrySet()) {
+      catalogProperties.put(entry.getKey(), 
entry.getValue().unwrapped().toString());
     }
+    String catalogUri = config.getString(ICEBERG_CATALOG_URI_KEY);
+    Preconditions.checkNotNull(catalogUri, "Provide: {%s} as Catalog Table 
Service URI is required", configPrefix + ICEBERG_CATALOG_URI_KEY);
     catalogProperties.put(CatalogProperties.URI, catalogUri);
-    return IcebergCatalogFactory.create(icebergCatalogClassName, 
catalogProperties, configuration);
+    return catalogProperties;
+  }
+
+  private static String getConfigPrefix(CatalogLocation location) {
+    return ICEBERG_DATASET_PREFIX + "." + location.toString().toLowerCase() + 
".";

Review Comment:
   should be a method on the `CatalogLocation` enum



##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTable.java:
##########
@@ -198,7 +198,8 @@ protected DatasetDescriptor getDatasetDescriptor(FileSystem 
fs) {
    * @param dstMetadata is null if destination {@link IcebergTable} is absent, 
in which case registration is skipped */
   protected void registerIcebergTable(TableMetadata srcMetadata, TableMetadata 
dstMetadata) {
     if (dstMetadata != null) {
-      this.tableOps.commit(srcMetadata, dstMetadata);
+      // commit (baseMetadata -> destination metadata, updatedMetadata -> 
source metadata)

Review Comment:
   overly cryptic.  maybe say "use current destination metadata as 'base 
version' and source as update"?



##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetFinder.java:
##########
@@ -118,30 +116,27 @@ protected IcebergDataset createIcebergDataset(String 
dbName, String tblName, Ice
     return new IcebergDataset(dbName, tblName, srcIcebergTable, 
destIcebergTable, properties, fs);
   }
 
-  protected IcebergCatalog createIcebergCatalog(Properties properties, 
CatalogLocation location) throws IOException {
-    Map<String, String> catalogProperties = new HashMap<>();
+  protected static IcebergCatalog createIcebergCatalog(Properties properties, 
CatalogLocation location) throws IOException {
+    String prefix = getConfigPrefix(location);
+    Map<String, String> catalogProperties = loadCatalogProperties(properties, 
prefix);
     Configuration configuration = 
HadoopUtils.getConfFromProperties(properties);
-    String catalogUri;
-    String icebergCatalogClassName;
-    switch (location) {
-      case SOURCE:
-        catalogUri = properties.getProperty(ICEBERG_SRC_CATALOG_URI_KEY);
-        Preconditions.checkNotNull(catalogUri, "Provide: {%s} as Source 
Catalog Table Service URI is required", ICEBERG_SRC_CATALOG_URI_KEY);
-        // introducing an optional property for catalogs requiring cluster 
specific properties
-        
Optional.ofNullable(properties.getProperty(ICEBERG_SRC_CLUSTER_NAME)).ifPresent(value
 -> catalogProperties.put(ICEBERG_CLUSTER_KEY, value));
-        icebergCatalogClassName = 
properties.getProperty(ICEBERG_SRC_CATALOG_CLASS_KEY, 
DEFAULT_ICEBERG_CATALOG_CLASS);
-        break;
-      case DESTINATION:
-        catalogUri = properties.getProperty(ICEBERG_DEST_CATALOG_URI_KEY);
-        Preconditions.checkNotNull(catalogUri, "Provide: {%s} as Destination 
Catalog Table Service URI is required", ICEBERG_DEST_CATALOG_URI_KEY);
-        // introducing an optional property for catalogs requiring cluster 
specific properties
-        
Optional.ofNullable(properties.getProperty(ICEBERG_DEST_CLUSTER_NAME)).ifPresent(value
 -> catalogProperties.put(ICEBERG_CLUSTER_KEY, value));
-        icebergCatalogClassName = 
properties.getProperty(ICEBERG_DEST_CATALOG_CLASS_KEY, 
DEFAULT_ICEBERG_CATALOG_CLASS);
-        break;
-      default:
-        throw new UnsupportedOperationException("Incorrect desired location: 
%s provided for creating Iceberg Catalog" + location);
+    String icebergCatalogClassName = 
catalogProperties.getOrDefault(ICEBERG_CATALOG_CLASS, 
DEFAULT_ICEBERG_CATALOG_CLASS);
+    return IcebergCatalogFactory.create(icebergCatalogClassName, 
catalogProperties, configuration);
+  }
+
+  protected static Map<String, String> loadCatalogProperties(Properties 
properties, String configPrefix) {
+    Map<String, String> catalogProperties = new HashMap<>();
+    Config config = ConfigBuilder.create().loadProps(properties, 
configPrefix).build();
+    for (Map.Entry<String, ConfigValue> entry : config.entrySet()) {
+      catalogProperties.put(entry.getKey(), 
entry.getValue().unwrapped().toString());
     }
+    String catalogUri = config.getString(ICEBERG_CATALOG_URI_KEY);
+    Preconditions.checkNotNull(catalogUri, "Provide: {%s} as Catalog Table 
Service URI is required", configPrefix + ICEBERG_CATALOG_URI_KEY);

Review Comment:
   shouldn't there be a `"."` between `configPrefix` and the URI KEY?



##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetFinder.java:
##########
@@ -49,14 +52,9 @@
 @RequiredArgsConstructor
 public class IcebergDatasetFinder implements 
IterableDatasetFinder<IcebergDataset> {
   public static final String ICEBERG_DATASET_PREFIX = 
DatasetConstants.PLATFORM_ICEBERG + ".dataset";
-  public static final String ICEBERG_CLUSTER_KEY = "cluster";
-  public static final String ICEBERG_SRC_CATALOG_CLASS_KEY = 
ICEBERG_DATASET_PREFIX + ".source.catalog.class";
   public static final String DEFAULT_ICEBERG_CATALOG_CLASS = 
"org.apache.gobblin.data.management.copy.iceberg.IcebergHiveCatalog";
-  public static final String ICEBERG_SRC_CATALOG_URI_KEY = 
ICEBERG_DATASET_PREFIX + ".source.catalog.uri";
-  public static final String ICEBERG_SRC_CLUSTER_NAME = ICEBERG_DATASET_PREFIX 
+ ".source.cluster.name";
-  public static final String ICEBERG_DEST_CATALOG_CLASS_KEY = 
ICEBERG_DATASET_PREFIX + ".destination.catalog.class";
-  public static final String ICEBERG_DEST_CATALOG_URI_KEY = 
ICEBERG_DATASET_PREFIX + ".copy.destination.catalog.uri";
-  public static final String ICEBERG_DEST_CLUSTER_NAME = 
ICEBERG_DATASET_PREFIX + ".destination.cluster.name";
+  public static final String ICEBERG_CATALOG_CLASS = "catalog.class";
+  public static final String ICEBERG_CATALOG_URI_KEY = "catalog.uri";

Review Comment:
   please document how these are used... i.e.:
   ```ICEBERG_CATALOG_PREFIX + "." + ("source" or "destination") + "..."```
   be sure to explain how this is an open-ended pattern that allows for passing 
arbitrary catalog-specific props through to the catalog.
   
   also, given that configs are essentially a global namespace, I'm not 
convinced that the prefix in use is discriminating enough.  
   
   whereas at present arbitrary properties go after:
   ```
   iceberg.dataset.source
   ```
   which means
   ```
   iceberg.dataset.destination.database
   ```
   would get passed through--and that seems wrong.
   
   instead, it might be more appropriate to put them after
   ```
   iceberg.dataset.source.catalog
   ```
   (or `iceberg.dataset.catalog.source`)
   
   seems most appropriate, given these are *catalog-specific* props



##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDataset.java:
##########
@@ -316,8 +316,8 @@ protected DatasetDescriptor 
getDestinationDataset(FileSystem targetFs) {
     return this.destIcebergTable.getDatasetDescriptor(targetFs);
   }
 
-  private PostPublishStep createPostPublishStep(IcebergTable srcIcebergTable, 
IcebergTable dstIcebergTable) {
-    IcebergRegisterStep icebergRegisterStep = new 
IcebergRegisterStep(srcIcebergTable, dstIcebergTable);
+  private PostPublishStep createPostPublishStep(String dbName, String 
inputTableName, Properties properties) {

Review Comment:
   seems inappropriate for the general case to serialize every single global 
property here.  can't we constrain only to those actually related to creating 
the iceberg catalogs, source and dest?



##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetFinder.java:
##########
@@ -118,30 +116,27 @@ protected IcebergDataset createIcebergDataset(String 
dbName, String tblName, Ice
     return new IcebergDataset(dbName, tblName, srcIcebergTable, 
destIcebergTable, properties, fs);
   }
 
-  protected IcebergCatalog createIcebergCatalog(Properties properties, 
CatalogLocation location) throws IOException {
-    Map<String, String> catalogProperties = new HashMap<>();
+  protected static IcebergCatalog createIcebergCatalog(Properties properties, 
CatalogLocation location) throws IOException {
+    String prefix = getConfigPrefix(location);
+    Map<String, String> catalogProperties = loadCatalogProperties(properties, 
prefix);
     Configuration configuration = 
HadoopUtils.getConfFromProperties(properties);
-    String catalogUri;
-    String icebergCatalogClassName;
-    switch (location) {
-      case SOURCE:
-        catalogUri = properties.getProperty(ICEBERG_SRC_CATALOG_URI_KEY);
-        Preconditions.checkNotNull(catalogUri, "Provide: {%s} as Source 
Catalog Table Service URI is required", ICEBERG_SRC_CATALOG_URI_KEY);
-        // introducing an optional property for catalogs requiring cluster 
specific properties
-        
Optional.ofNullable(properties.getProperty(ICEBERG_SRC_CLUSTER_NAME)).ifPresent(value
 -> catalogProperties.put(ICEBERG_CLUSTER_KEY, value));
-        icebergCatalogClassName = 
properties.getProperty(ICEBERG_SRC_CATALOG_CLASS_KEY, 
DEFAULT_ICEBERG_CATALOG_CLASS);
-        break;
-      case DESTINATION:
-        catalogUri = properties.getProperty(ICEBERG_DEST_CATALOG_URI_KEY);
-        Preconditions.checkNotNull(catalogUri, "Provide: {%s} as Destination 
Catalog Table Service URI is required", ICEBERG_DEST_CATALOG_URI_KEY);
-        // introducing an optional property for catalogs requiring cluster 
specific properties
-        
Optional.ofNullable(properties.getProperty(ICEBERG_DEST_CLUSTER_NAME)).ifPresent(value
 -> catalogProperties.put(ICEBERG_CLUSTER_KEY, value));
-        icebergCatalogClassName = 
properties.getProperty(ICEBERG_DEST_CATALOG_CLASS_KEY, 
DEFAULT_ICEBERG_CATALOG_CLASS);
-        break;
-      default:
-        throw new UnsupportedOperationException("Incorrect desired location: 
%s provided for creating Iceberg Catalog" + location);
+    String icebergCatalogClassName = 
catalogProperties.getOrDefault(ICEBERG_CATALOG_CLASS, 
DEFAULT_ICEBERG_CATALOG_CLASS);
+    return IcebergCatalogFactory.create(icebergCatalogClassName, 
catalogProperties, configuration);
+  }
+
+  protected static Map<String, String> loadCatalogProperties(Properties 
properties, String configPrefix) {

Review Comment:
   the semantics here seem non-obvious, hence deserving of javadoc.  the name 
seems a bit inexact as well.  maybe `buildMapFromPrefixChildren`?





Issue Time Tracking
-------------------

    Worklog Id:     (was: 856304)
    Time Spent: 1h  (was: 50m)

> Fix Iceberg Registration Serialization
> --------------------------------------
>
>                 Key: GOBBLIN-1811
>                 URL: https://issues.apache.org/jira/browse/GOBBLIN-1811
>             Project: Apache Gobblin
>          Issue Type: New Feature
>            Reporter: Meeth Gala
>            Priority: Major
>          Time Spent: 1h
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to