pvary commented on a change in pull request #2129:
URL: https://github.com/apache/iceberg/pull/2129#discussion_r561891139



##########
File path: mr/src/main/java/org/apache/iceberg/mr/Catalogs.java
##########
@@ -180,47 +186,80 @@ public static boolean dropTable(Configuration conf, 
Properties props) {
   /**
    * Returns true if HiveCatalog is used
    * @param conf a Hadoop conf
+   * @param props the controlling properties
    * @return true if the Catalog is HiveCatalog
    */
-  public static boolean hiveCatalog(Configuration conf) {
-    return HIVE.equalsIgnoreCase(conf.get(InputFormatConfig.CATALOG));
+  public static boolean hiveCatalog(Configuration conf, Properties props) {
+    String catalogName = props.getProperty(InputFormatConfig.TABLE_CATALOG);
+    if (catalogName != null) {
+      return 
HIVE.equals(conf.get(String.format(InputFormatConfig.CATALOG_TYPE_TEMPLATE, 
catalogName)));
+    } else {
+      if 
(HIVE.equals(conf.get(String.format(InputFormatConfig.CATALOG_TYPE_TEMPLATE, 
"default")))) {
+        return true;
+      } else {
+        return HIVE.equalsIgnoreCase(conf.get(InputFormatConfig.CATALOG));
+      }
+    }
   }
 
   @VisibleForTesting
-  static Optional<Catalog> loadCatalog(Configuration conf) {
-    String catalogLoaderClass = 
conf.get(InputFormatConfig.CATALOG_LOADER_CLASS);
-
-    if (catalogLoaderClass != null) {
-      CatalogLoader loader = (CatalogLoader) 
DynConstructors.builder(CatalogLoader.class)
-              .impl(catalogLoaderClass)
-              .build()
-              .newInstance();
-      Catalog catalog = loader.load(conf);
-      LOG.info("Loaded catalog {} using {}", catalog, catalogLoaderClass);
-      return Optional.of(catalog);
+  static Optional<Catalog> loadCatalog(Configuration conf, String catalogName) 
{
+    String catalogType;
+    String name = catalogName;
+    if (name == null) {
+      name = "default";

Review comment:
       The `"default"` literal should be a constant.

##########
File path: mr/src/main/java/org/apache/iceberg/mr/Catalogs.java
##########
@@ -180,47 +186,80 @@ public static boolean dropTable(Configuration conf, 
Properties props) {
   /**
    * Returns true if HiveCatalog is used
    * @param conf a Hadoop conf
+   * @param props the controlling properties
    * @return true if the Catalog is HiveCatalog
    */
-  public static boolean hiveCatalog(Configuration conf) {
-    return HIVE.equalsIgnoreCase(conf.get(InputFormatConfig.CATALOG));
+  public static boolean hiveCatalog(Configuration conf, Properties props) {
+    String catalogName = props.getProperty(InputFormatConfig.TABLE_CATALOG);
+    if (catalogName != null) {
+      return 
HIVE.equals(conf.get(String.format(InputFormatConfig.CATALOG_TYPE_TEMPLATE, 
catalogName)));
+    } else {
+      if 
(HIVE.equals(conf.get(String.format(InputFormatConfig.CATALOG_TYPE_TEMPLATE, 
"default")))) {
+        return true;
+      } else {
+        return HIVE.equalsIgnoreCase(conf.get(InputFormatConfig.CATALOG));
+      }
+    }
   }
 
   @VisibleForTesting
-  static Optional<Catalog> loadCatalog(Configuration conf) {
-    String catalogLoaderClass = 
conf.get(InputFormatConfig.CATALOG_LOADER_CLASS);
-
-    if (catalogLoaderClass != null) {
-      CatalogLoader loader = (CatalogLoader) 
DynConstructors.builder(CatalogLoader.class)
-              .impl(catalogLoaderClass)
-              .build()
-              .newInstance();
-      Catalog catalog = loader.load(conf);
-      LOG.info("Loaded catalog {} using {}", catalog, catalogLoaderClass);
-      return Optional.of(catalog);
+  static Optional<Catalog> loadCatalog(Configuration conf, String catalogName) 
{
+    String catalogType;
+    String name = catalogName;

Review comment:
       maybe?
   ```
   String name = catalogName != null ? catalogName : DEFAULT_CATALOG;
   ```

##########
File path: mr/src/main/java/org/apache/iceberg/mr/Catalogs.java
##########
@@ -180,47 +186,80 @@ public static boolean dropTable(Configuration conf, 
Properties props) {
   /**
    * Returns true if HiveCatalog is used
    * @param conf a Hadoop conf
+   * @param props the controlling properties
    * @return true if the Catalog is HiveCatalog
    */
-  public static boolean hiveCatalog(Configuration conf) {
-    return HIVE.equalsIgnoreCase(conf.get(InputFormatConfig.CATALOG));
+  public static boolean hiveCatalog(Configuration conf, Properties props) {
+    String catalogName = props.getProperty(InputFormatConfig.TABLE_CATALOG);
+    if (catalogName != null) {
+      return 
HIVE.equals(conf.get(String.format(InputFormatConfig.CATALOG_TYPE_TEMPLATE, 
catalogName)));
+    } else {
+      if 
(HIVE.equals(conf.get(String.format(InputFormatConfig.CATALOG_TYPE_TEMPLATE, 
"default")))) {
+        return true;
+      } else {
+        return HIVE.equalsIgnoreCase(conf.get(InputFormatConfig.CATALOG));
+      }
+    }
   }
 
   @VisibleForTesting
-  static Optional<Catalog> loadCatalog(Configuration conf) {
-    String catalogLoaderClass = 
conf.get(InputFormatConfig.CATALOG_LOADER_CLASS);
-
-    if (catalogLoaderClass != null) {
-      CatalogLoader loader = (CatalogLoader) 
DynConstructors.builder(CatalogLoader.class)
-              .impl(catalogLoaderClass)
-              .build()
-              .newInstance();
-      Catalog catalog = loader.load(conf);
-      LOG.info("Loaded catalog {} using {}", catalog, catalogLoaderClass);
-      return Optional.of(catalog);
+  static Optional<Catalog> loadCatalog(Configuration conf, String catalogName) 
{
+    String catalogType;
+    String name = catalogName;
+    if (name == null) {
+      name = "default";
     }
+    catalogType = 
conf.get(String.format(InputFormatConfig.CATALOG_TYPE_TEMPLATE, name));
 
-    String catalogName = conf.get(InputFormatConfig.CATALOG);
+    // keep both catalog configuration methods for seamless transition
+    if (catalogType != null) {
+    // new logic

Review comment:
       nit: formatting - the `// new logic` comment should be indented to match 
the body of the `if` block.

##########
File path: mr/src/main/java/org/apache/iceberg/mr/Catalogs.java
##########
@@ -180,47 +186,80 @@ public static boolean dropTable(Configuration conf, 
Properties props) {
   /**
    * Returns true if HiveCatalog is used
    * @param conf a Hadoop conf
+   * @param props the controlling properties
    * @return true if the Catalog is HiveCatalog
    */
-  public static boolean hiveCatalog(Configuration conf) {
-    return HIVE.equalsIgnoreCase(conf.get(InputFormatConfig.CATALOG));
+  public static boolean hiveCatalog(Configuration conf, Properties props) {
+    String catalogName = props.getProperty(InputFormatConfig.TABLE_CATALOG);
+    if (catalogName != null) {
+      return 
HIVE.equals(conf.get(String.format(InputFormatConfig.CATALOG_TYPE_TEMPLATE, 
catalogName)));
+    } else {
+      if 
(HIVE.equals(conf.get(String.format(InputFormatConfig.CATALOG_TYPE_TEMPLATE, 
"default")))) {
+        return true;
+      } else {
+        return HIVE.equalsIgnoreCase(conf.get(InputFormatConfig.CATALOG));
+      }
+    }
   }
 
   @VisibleForTesting
-  static Optional<Catalog> loadCatalog(Configuration conf) {
-    String catalogLoaderClass = 
conf.get(InputFormatConfig.CATALOG_LOADER_CLASS);
-
-    if (catalogLoaderClass != null) {
-      CatalogLoader loader = (CatalogLoader) 
DynConstructors.builder(CatalogLoader.class)
-              .impl(catalogLoaderClass)
-              .build()
-              .newInstance();
-      Catalog catalog = loader.load(conf);
-      LOG.info("Loaded catalog {} using {}", catalog, catalogLoaderClass);
-      return Optional.of(catalog);
+  static Optional<Catalog> loadCatalog(Configuration conf, String catalogName) 
{
+    String catalogType;
+    String name = catalogName;
+    if (name == null) {
+      name = "default";
     }
+    catalogType = 
conf.get(String.format(InputFormatConfig.CATALOG_TYPE_TEMPLATE, name));
 
-    String catalogName = conf.get(InputFormatConfig.CATALOG);
+    // keep both catalog configuration methods for seamless transition
+    if (catalogType != null) {
+    // new logic
+      return loadCatalog(conf, catalogType, 
String.format(InputFormatConfig.CATALOG_WAREHOUSE_TEMPLATE, name),
+              String.format(InputFormatConfig.CATALOG_LOADER_CLASS_TEMPLATE, 
name));
+    } else {
+      // old logic
+      // use catalog {@link InputFormatConfig.CATALOG} stored in global hive 
config if table specific catalog
+      // configuration or default catalog definition is missing
+      catalogType = conf.get(InputFormatConfig.CATALOG);
+      return loadCatalog(conf, catalogType, 
InputFormatConfig.HADOOP_CATALOG_WAREHOUSE_LOCATION,
+              InputFormatConfig.CATALOG_LOADER_CLASS);
+    }
+  }
 
-    if (catalogName != null) {
-      Catalog catalog;
-      switch (catalogName.toLowerCase()) {
-        case HADOOP:
-          String warehouseLocation = 
conf.get(InputFormatConfig.HADOOP_CATALOG_WAREHOUSE_LOCATION);
+  private static Optional<Catalog> loadCatalog(Configuration conf, String 
catalogType,
+                                               String 
warehouseLocationConfigName, String loaderClassConfigName) {

Review comment:
       Why do we get these config names as parameters?
   My feeling is that the actual parameters should be the responsibility of the 
specific catalog implementations.

##########
File path: mr/src/main/java/org/apache/iceberg/mr/InputFormatConfig.java
##########
@@ -73,6 +74,10 @@ private InputFormatConfig() {
   public static final String SNAPSHOT_TABLE = "iceberg.snapshots.table";
   public static final String SNAPSHOT_TABLE_SUFFIX = "__snapshots";
 
+  public static final String CATALOG_TYPE_TEMPLATE = CATALOG_NAME + ".%s.type";

Review comment:
       I think we reused the `CATALOG_NAME` constant here, which was supposed to 
be something else.
   
   Maybe `CATALOG_CONFIG_PREFIX_`?
   
   The value could be `iceberg.catalog.`, or it could simply be omitted?

##########
File path: mr/src/main/java/org/apache/iceberg/mr/Catalogs.java
##########
@@ -180,47 +186,80 @@ public static boolean dropTable(Configuration conf, 
Properties props) {
   /**
    * Returns true if HiveCatalog is used
    * @param conf a Hadoop conf
+   * @param props the controlling properties
    * @return true if the Catalog is HiveCatalog
    */
-  public static boolean hiveCatalog(Configuration conf) {
-    return HIVE.equalsIgnoreCase(conf.get(InputFormatConfig.CATALOG));
+  public static boolean hiveCatalog(Configuration conf, Properties props) {
+    String catalogName = props.getProperty(InputFormatConfig.TABLE_CATALOG);
+    if (catalogName != null) {
+      return 
HIVE.equals(conf.get(String.format(InputFormatConfig.CATALOG_TYPE_TEMPLATE, 
catalogName)));
+    } else {
+      if 
(HIVE.equals(conf.get(String.format(InputFormatConfig.CATALOG_TYPE_TEMPLATE, 
"default")))) {
+        return true;
+      } else {
+        return HIVE.equalsIgnoreCase(conf.get(InputFormatConfig.CATALOG));
+      }
+    }
   }
 
   @VisibleForTesting
-  static Optional<Catalog> loadCatalog(Configuration conf) {
-    String catalogLoaderClass = 
conf.get(InputFormatConfig.CATALOG_LOADER_CLASS);
-
-    if (catalogLoaderClass != null) {
-      CatalogLoader loader = (CatalogLoader) 
DynConstructors.builder(CatalogLoader.class)
-              .impl(catalogLoaderClass)
-              .build()
-              .newInstance();
-      Catalog catalog = loader.load(conf);
-      LOG.info("Loaded catalog {} using {}", catalog, catalogLoaderClass);
-      return Optional.of(catalog);
+  static Optional<Catalog> loadCatalog(Configuration conf, String catalogName) 
{
+    String catalogType;
+    String name = catalogName;
+    if (name == null) {
+      name = "default";
     }
+    catalogType = 
conf.get(String.format(InputFormatConfig.CATALOG_TYPE_TEMPLATE, name));
 
-    String catalogName = conf.get(InputFormatConfig.CATALOG);
+    // keep both catalog configuration methods for seamless transition
+    if (catalogType != null) {
+    // new logic
+      return loadCatalog(conf, catalogType, 
String.format(InputFormatConfig.CATALOG_WAREHOUSE_TEMPLATE, name),
+              String.format(InputFormatConfig.CATALOG_LOADER_CLASS_TEMPLATE, 
name));
+    } else {
+      // old logic
+      // use catalog {@link InputFormatConfig.CATALOG} stored in global hive 
config if table specific catalog
+      // configuration or default catalog definition is missing
+      catalogType = conf.get(InputFormatConfig.CATALOG);
+      return loadCatalog(conf, catalogType, 
InputFormatConfig.HADOOP_CATALOG_WAREHOUSE_LOCATION,
+              InputFormatConfig.CATALOG_LOADER_CLASS);
+    }
+  }
 
-    if (catalogName != null) {
-      Catalog catalog;
-      switch (catalogName.toLowerCase()) {
-        case HADOOP:
-          String warehouseLocation = 
conf.get(InputFormatConfig.HADOOP_CATALOG_WAREHOUSE_LOCATION);
+  private static Optional<Catalog> loadCatalog(Configuration conf, String 
catalogType,
+                                               String 
warehouseLocationConfigName, String loaderClassConfigName) {

Review comment:
       It would be good if we could filter out the properties relevant to this 
catalog here, based on the name and the prefix.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to