pvary commented on a change in pull request #2129:
URL: https://github.com/apache/iceberg/pull/2129#discussion_r563654993



##########
File path: mr/src/main/java/org/apache/iceberg/mr/Catalogs.java
##########
@@ -180,47 +189,97 @@ public static boolean dropTable(Configuration conf, 
Properties props) {
   /**
    * Returns true if HiveCatalog is used
    * @param conf a Hadoop conf
+   * @param props the controlling properties
    * @return true if the Catalog is HiveCatalog
    */
-  public static boolean hiveCatalog(Configuration conf) {
-    return HIVE.equalsIgnoreCase(conf.get(InputFormatConfig.CATALOG));
+  public static boolean hiveCatalog(Configuration conf, Properties props) {
+    String catalogName = props.getProperty(InputFormatConfig.TABLE_CATALOG);
+    if (catalogName != null) {
+      return 
HIVE.equalsIgnoreCase(conf.get(String.format(InputFormatConfig.CATALOG_TYPE_TEMPLATE,
 catalogName)));
+    } else {
+      if 
(HIVE.equalsIgnoreCase(conf.get(String.format(InputFormatConfig.CATALOG_TYPE_TEMPLATE,
 DEFAULT_CATALOG)))) {
+        return true;
+      } else {
+        return HIVE.equalsIgnoreCase(conf.get(InputFormatConfig.CATALOG));
+      }
+    }
   }
 
   @VisibleForTesting
-  static Optional<Catalog> loadCatalog(Configuration conf) {
-    String catalogLoaderClass = 
conf.get(InputFormatConfig.CATALOG_LOADER_CLASS);
-
-    if (catalogLoaderClass != null) {
-      CatalogLoader loader = (CatalogLoader) 
DynConstructors.builder(CatalogLoader.class)
-              .impl(catalogLoaderClass)
-              .build()
-              .newInstance();
-      Catalog catalog = loader.load(conf);
-      LOG.info("Loaded catalog {} using {}", catalog, catalogLoaderClass);
-      return Optional.of(catalog);
+  static Optional<Catalog> loadCatalog(Configuration conf, String catalogName) 
{
+    String catalogType;
+    String name = catalogName == null ? DEFAULT_CATALOG : catalogName;
+    catalogType = 
conf.get(String.format(InputFormatConfig.CATALOG_TYPE_TEMPLATE, name));
+
+    // keep both catalog configuration methods for seamless transition
+    if (catalogType != null) {
+      // new logic
+      LOG.debug("Using catalog configuration from table properties.");
+      return loadCatalog(conf, name, catalogType);
+    } else {
+      // old logic
+      // use catalog {@link InputFormatConfig.CATALOG} stored in global hive 
config if table specific catalog
+      // configuration or default catalog definition is missing
+      LOG.debug("Using catalog configuration from global configuration.");
+      catalogType = conf.get(InputFormatConfig.CATALOG);
+      return loadCatalog(conf, name, catalogType);
     }
+  }
 
-    String catalogName = conf.get(InputFormatConfig.CATALOG);
+  private static Optional<Catalog> loadCatalog(Configuration conf, String 
catalogName, String catalogType) {
+    if (catalogType == null) {
+      LOG.info("Catalog is not configured");
+      return Optional.empty();
+    }
 
-    if (catalogName != null) {
-      Catalog catalog;
-      switch (catalogName.toLowerCase()) {
-        case HADOOP:
+    Map<String, String> properties = getCatalogProperties(conf, catalogName);
+    Catalog catalog;
+    switch (catalogType.toLowerCase()) {
+      case HADOOP:
+        if (properties.containsKey(CatalogProperties.WAREHOUSE_LOCATION)) {
+          catalog = CatalogUtil.loadCatalog(HadoopCatalog.class.getName(), 
catalogName,
+                  getCatalogProperties(conf, catalogName), conf);

Review comment:
       We do have this properties at hand already? Don't we?

##########
File path: 
hive-metastore/src/main/java/org/apache/iceberg/hive/HiveCatalogs.java
##########
@@ -36,4 +38,11 @@ public static HiveCatalog loadCatalog(Configuration conf) {
     String metastoreUri = conf.get(HiveConf.ConfVars.METASTOREURIS.varname, 
"");
     return CATALOG_CACHE.get(metastoreUri, uri -> new HiveCatalog(conf));
   }
+
+  public static HiveCatalog loadCatalog(String catalogName, Map<String, 
String> properties, Configuration conf) {
+    // metastore URI can be null in local mode
+    String metastoreUri = conf.get(HiveConf.ConfVars.METASTOREURIS.varname, 
"");
+    return CATALOG_CACHE.get(metastoreUri, uri -> (HiveCatalog) 
CatalogUtil.loadCatalog(HiveCatalog.class.getName(),
+            catalogName, properties, conf));
+  }

Review comment:
       With the new Catalog system I think the key in the cache could be the 
catalogName since we expect it to be unique. I am not sure what to do with the 
original default behavior. Maybe the `default` name could be ok there as well.

##########
File path: mr/src/main/java/org/apache/iceberg/mr/Catalogs.java
##########
@@ -180,47 +189,97 @@ public static boolean dropTable(Configuration conf, 
Properties props) {
   /**
    * Returns true if HiveCatalog is used
    * @param conf a Hadoop conf
+   * @param props the controlling properties
    * @return true if the Catalog is HiveCatalog
    */
-  public static boolean hiveCatalog(Configuration conf) {
-    return HIVE.equalsIgnoreCase(conf.get(InputFormatConfig.CATALOG));
+  public static boolean hiveCatalog(Configuration conf, Properties props) {
+    String catalogName = props.getProperty(InputFormatConfig.TABLE_CATALOG);
+    if (catalogName != null) {
+      return 
HIVE.equalsIgnoreCase(conf.get(String.format(InputFormatConfig.CATALOG_TYPE_TEMPLATE,
 catalogName)));
+    } else {
+      if 
(HIVE.equalsIgnoreCase(conf.get(String.format(InputFormatConfig.CATALOG_TYPE_TEMPLATE,
 DEFAULT_CATALOG)))) {
+        return true;
+      } else {
+        return HIVE.equalsIgnoreCase(conf.get(InputFormatConfig.CATALOG));
+      }
+    }

Review comment:
       This starts to get 'complicated' and maybe it starts to worth to reuse 
in the loadCatalog.
   Your thoughts?

##########
File path: mr/src/test/java/org/apache/iceberg/mr/TestCatalogs.java
##########
@@ -138,9 +142,13 @@ public void testCreateDropTableToLocation() throws 
IOException {
   @Test
   public void testCreateDropTableToCatalog() throws IOException {
     TableIdentifier identifier = TableIdentifier.of("test", "table");
+    String defaultCatalogName = "default";
+    String warehouseLocation = temp.newFolder("hadoop", 
"warehouse").toString();
 
-    conf.set("warehouse.location", temp.newFolder("hadoop", 
"warehouse").toString());
-    conf.setClass(InputFormatConfig.CATALOG_LOADER_CLASS, 
CustomHadoopCatalogLoader.class, CatalogLoader.class);
+    conf.set(String.format(InputFormatConfig.CATALOG_WAREHOUSE_TEMPLATE, 
defaultCatalogName), warehouseLocation);
+    conf.set(String.format(InputFormatConfig.CATALOG_CLASS_TEMPLATE, 
defaultCatalogName),
+            CustomHadoopCatalog.class.getName());
+    conf.set(String.format(InputFormatConfig.CATALOG_TYPE_TEMPLATE, 
defaultCatalogName), Catalogs.CUSTOM);

Review comment:
       I see this code several times in the tests. Is it possible to refactor 
it to an util class?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to