lcspinter commented on a change in pull request #2129:
URL: https://github.com/apache/iceberg/pull/2129#discussion_r563616810



##########
File path: mr/src/main/java/org/apache/iceberg/mr/Catalogs.java
##########
@@ -180,47 +186,80 @@ public static boolean dropTable(Configuration conf, 
Properties props) {
   /**
    * Returns true if HiveCatalog is used
    * @param conf a Hadoop conf
+   * @param props the controlling properties
    * @return true if the Catalog is HiveCatalog
    */
-  public static boolean hiveCatalog(Configuration conf) {
-    return HIVE.equalsIgnoreCase(conf.get(InputFormatConfig.CATALOG));
+  public static boolean hiveCatalog(Configuration conf, Properties props) {
+    String catalogName = props.getProperty(InputFormatConfig.TABLE_CATALOG);
+    if (catalogName != null) {
+      return 
HIVE.equals(conf.get(String.format(InputFormatConfig.CATALOG_TYPE_TEMPLATE, 
catalogName)));
+    } else {
+      if 
(HIVE.equals(conf.get(String.format(InputFormatConfig.CATALOG_TYPE_TEMPLATE, 
"default")))) {
+        return true;
+      } else {
+        return HIVE.equalsIgnoreCase(conf.get(InputFormatConfig.CATALOG));
+      }
+    }
   }
 
   @VisibleForTesting
-  static Optional<Catalog> loadCatalog(Configuration conf) {
-    String catalogLoaderClass = 
conf.get(InputFormatConfig.CATALOG_LOADER_CLASS);
-
-    if (catalogLoaderClass != null) {
-      CatalogLoader loader = (CatalogLoader) 
DynConstructors.builder(CatalogLoader.class)
-              .impl(catalogLoaderClass)
-              .build()
-              .newInstance();
-      Catalog catalog = loader.load(conf);
-      LOG.info("Loaded catalog {} using {}", catalog, catalogLoaderClass);
-      return Optional.of(catalog);
+  static Optional<Catalog> loadCatalog(Configuration conf, String catalogName) 
{
+    String catalogType;
+    String name = catalogName;
+    if (name == null) {
+      name = "default";

Review comment:
       Fixed.

##########
File path: mr/src/main/java/org/apache/iceberg/mr/Catalogs.java
##########
@@ -180,47 +186,80 @@ public static boolean dropTable(Configuration conf, 
Properties props) {
   /**
    * Returns true if HiveCatalog is used
    * @param conf a Hadoop conf
+   * @param props the controlling properties
    * @return true if the Catalog is HiveCatalog
    */
-  public static boolean hiveCatalog(Configuration conf) {
-    return HIVE.equalsIgnoreCase(conf.get(InputFormatConfig.CATALOG));
+  public static boolean hiveCatalog(Configuration conf, Properties props) {
+    String catalogName = props.getProperty(InputFormatConfig.TABLE_CATALOG);
+    if (catalogName != null) {
+      return 
HIVE.equals(conf.get(String.format(InputFormatConfig.CATALOG_TYPE_TEMPLATE, 
catalogName)));
+    } else {
+      if 
(HIVE.equals(conf.get(String.format(InputFormatConfig.CATALOG_TYPE_TEMPLATE, 
"default")))) {
+        return true;
+      } else {
+        return HIVE.equalsIgnoreCase(conf.get(InputFormatConfig.CATALOG));
+      }
+    }
   }
 
   @VisibleForTesting
-  static Optional<Catalog> loadCatalog(Configuration conf) {
-    String catalogLoaderClass = 
conf.get(InputFormatConfig.CATALOG_LOADER_CLASS);
-
-    if (catalogLoaderClass != null) {
-      CatalogLoader loader = (CatalogLoader) 
DynConstructors.builder(CatalogLoader.class)
-              .impl(catalogLoaderClass)
-              .build()
-              .newInstance();
-      Catalog catalog = loader.load(conf);
-      LOG.info("Loaded catalog {} using {}", catalog, catalogLoaderClass);
-      return Optional.of(catalog);
+  static Optional<Catalog> loadCatalog(Configuration conf, String catalogName) 
{
+    String catalogType;
+    String name = catalogName;

Review comment:
       Done.

##########
File path: mr/src/main/java/org/apache/iceberg/mr/Catalogs.java
##########
@@ -180,47 +186,80 @@ public static boolean dropTable(Configuration conf, 
Properties props) {
   /**
    * Returns true if HiveCatalog is used
    * @param conf a Hadoop conf
+   * @param props the controlling properties
    * @return true if the Catalog is HiveCatalog
    */
-  public static boolean hiveCatalog(Configuration conf) {
-    return HIVE.equalsIgnoreCase(conf.get(InputFormatConfig.CATALOG));
+  public static boolean hiveCatalog(Configuration conf, Properties props) {
+    String catalogName = props.getProperty(InputFormatConfig.TABLE_CATALOG);
+    if (catalogName != null) {
+      return 
HIVE.equals(conf.get(String.format(InputFormatConfig.CATALOG_TYPE_TEMPLATE, 
catalogName)));
+    } else {
+      if 
(HIVE.equals(conf.get(String.format(InputFormatConfig.CATALOG_TYPE_TEMPLATE, 
"default")))) {
+        return true;
+      } else {
+        return HIVE.equalsIgnoreCase(conf.get(InputFormatConfig.CATALOG));
+      }
+    }
   }
 
   @VisibleForTesting
-  static Optional<Catalog> loadCatalog(Configuration conf) {
-    String catalogLoaderClass = 
conf.get(InputFormatConfig.CATALOG_LOADER_CLASS);
-
-    if (catalogLoaderClass != null) {
-      CatalogLoader loader = (CatalogLoader) 
DynConstructors.builder(CatalogLoader.class)
-              .impl(catalogLoaderClass)
-              .build()
-              .newInstance();
-      Catalog catalog = loader.load(conf);
-      LOG.info("Loaded catalog {} using {}", catalog, catalogLoaderClass);
-      return Optional.of(catalog);
+  static Optional<Catalog> loadCatalog(Configuration conf, String catalogName) 
{
+    String catalogType;
+    String name = catalogName;
+    if (name == null) {
+      name = "default";
     }
+    catalogType = 
conf.get(String.format(InputFormatConfig.CATALOG_TYPE_TEMPLATE, name));
 
-    String catalogName = conf.get(InputFormatConfig.CATALOG);
+    // keep both catalog configuration methods for seamless transition
+    if (catalogType != null) {
+    // new logic

Review comment:
       Done.

##########
File path: mr/src/main/java/org/apache/iceberg/mr/Catalogs.java
##########
@@ -180,47 +186,80 @@ public static boolean dropTable(Configuration conf, 
Properties props) {
   /**
    * Returns true if HiveCatalog is used
    * @param conf a Hadoop conf
+   * @param props the controlling properties
    * @return true if the Catalog is HiveCatalog
    */
-  public static boolean hiveCatalog(Configuration conf) {
-    return HIVE.equalsIgnoreCase(conf.get(InputFormatConfig.CATALOG));
+  public static boolean hiveCatalog(Configuration conf, Properties props) {
+    String catalogName = props.getProperty(InputFormatConfig.TABLE_CATALOG);
+    if (catalogName != null) {
+      return 
HIVE.equals(conf.get(String.format(InputFormatConfig.CATALOG_TYPE_TEMPLATE, 
catalogName)));
+    } else {
+      if 
(HIVE.equals(conf.get(String.format(InputFormatConfig.CATALOG_TYPE_TEMPLATE, 
"default")))) {
+        return true;
+      } else {
+        return HIVE.equalsIgnoreCase(conf.get(InputFormatConfig.CATALOG));
+      }
+    }
   }
 
   @VisibleForTesting
-  static Optional<Catalog> loadCatalog(Configuration conf) {
-    String catalogLoaderClass = 
conf.get(InputFormatConfig.CATALOG_LOADER_CLASS);
-
-    if (catalogLoaderClass != null) {
-      CatalogLoader loader = (CatalogLoader) 
DynConstructors.builder(CatalogLoader.class)
-              .impl(catalogLoaderClass)
-              .build()
-              .newInstance();
-      Catalog catalog = loader.load(conf);
-      LOG.info("Loaded catalog {} using {}", catalog, catalogLoaderClass);
-      return Optional.of(catalog);
+  static Optional<Catalog> loadCatalog(Configuration conf, String catalogName) 
{
+    String catalogType;
+    String name = catalogName;
+    if (name == null) {
+      name = "default";
     }
+    catalogType = 
conf.get(String.format(InputFormatConfig.CATALOG_TYPE_TEMPLATE, name));
 
-    String catalogName = conf.get(InputFormatConfig.CATALOG);
+    // keep both catalog configuration methods for seamless transition
+    if (catalogType != null) {
+    // new logic
+      return loadCatalog(conf, catalogType, 
String.format(InputFormatConfig.CATALOG_WAREHOUSE_TEMPLATE, name),
+              String.format(InputFormatConfig.CATALOG_LOADER_CLASS_TEMPLATE, 
name));
+    } else {
+      // old logic
+      // use catalog {@link InputFormatConfig.CATALOG} stored in global hive 
config if table specific catalog
+      // configuration or default catalog definition is missing
+      catalogType = conf.get(InputFormatConfig.CATALOG);
+      return loadCatalog(conf, catalogType, 
InputFormatConfig.HADOOP_CATALOG_WAREHOUSE_LOCATION,
+              InputFormatConfig.CATALOG_LOADER_CLASS);
+    }
+  }
 
-    if (catalogName != null) {
-      Catalog catalog;
-      switch (catalogName.toLowerCase()) {
-        case HADOOP:
-          String warehouseLocation = 
conf.get(InputFormatConfig.HADOOP_CATALOG_WAREHOUSE_LOCATION);
+  private static Optional<Catalog> loadCatalog(Configuration conf, String 
catalogType,
+                                               String 
warehouseLocationConfigName, String loaderClassConfigName) {

Review comment:
       Good idea. I created a separate method to filter out the relevant 
properties and strip down the prefix.

##########
File path: mr/src/main/java/org/apache/iceberg/mr/InputFormatConfig.java
##########
@@ -73,6 +74,10 @@ private InputFormatConfig() {
   public static final String SNAPSHOT_TABLE = "iceberg.snapshots.table";
   public static final String SNAPSHOT_TABLE_SUFFIX = "__snapshots";
 
+  public static final String CATALOG_TYPE_TEMPLATE = CATALOG_NAME + ".%s.type";

Review comment:
       Done.

##########
File path: mr/src/main/java/org/apache/iceberg/mr/Catalogs.java
##########
@@ -180,47 +186,80 @@ public static boolean dropTable(Configuration conf, 
Properties props) {
   /**
    * Returns true if HiveCatalog is used
    * @param conf a Hadoop conf
+   * @param props the controlling properties
    * @return true if the Catalog is HiveCatalog
    */
-  public static boolean hiveCatalog(Configuration conf) {
-    return HIVE.equalsIgnoreCase(conf.get(InputFormatConfig.CATALOG));
+  public static boolean hiveCatalog(Configuration conf, Properties props) {
+    String catalogName = props.getProperty(InputFormatConfig.TABLE_CATALOG);
+    if (catalogName != null) {
+      return 
HIVE.equals(conf.get(String.format(InputFormatConfig.CATALOG_TYPE_TEMPLATE, 
catalogName)));
+    } else {
+      if 
(HIVE.equals(conf.get(String.format(InputFormatConfig.CATALOG_TYPE_TEMPLATE, 
"default")))) {
+        return true;
+      } else {
+        return HIVE.equalsIgnoreCase(conf.get(InputFormatConfig.CATALOG));
+      }
+    }
   }
 
   @VisibleForTesting
-  static Optional<Catalog> loadCatalog(Configuration conf) {
-    String catalogLoaderClass = 
conf.get(InputFormatConfig.CATALOG_LOADER_CLASS);
-
-    if (catalogLoaderClass != null) {
-      CatalogLoader loader = (CatalogLoader) 
DynConstructors.builder(CatalogLoader.class)
-              .impl(catalogLoaderClass)
-              .build()
-              .newInstance();
-      Catalog catalog = loader.load(conf);
-      LOG.info("Loaded catalog {} using {}", catalog, catalogLoaderClass);
-      return Optional.of(catalog);
+  static Optional<Catalog> loadCatalog(Configuration conf, String catalogName) 
{
+    String catalogType;
+    String name = catalogName;
+    if (name == null) {
+      name = "default";
     }
+    catalogType = 
conf.get(String.format(InputFormatConfig.CATALOG_TYPE_TEMPLATE, name));
 
-    String catalogName = conf.get(InputFormatConfig.CATALOG);
+    // keep both catalog configuration methods for seamless transition
+    if (catalogType != null) {
+    // new logic
+      return loadCatalog(conf, catalogType, 
String.format(InputFormatConfig.CATALOG_WAREHOUSE_TEMPLATE, name),
+              String.format(InputFormatConfig.CATALOG_LOADER_CLASS_TEMPLATE, 
name));
+    } else {
+      // old logic
+      // use catalog {@link InputFormatConfig.CATALOG} stored in global hive 
config if table specific catalog
+      // configuration or default catalog definition is missing
+      catalogType = conf.get(InputFormatConfig.CATALOG);
+      return loadCatalog(conf, catalogType, 
InputFormatConfig.HADOOP_CATALOG_WAREHOUSE_LOCATION,
+              InputFormatConfig.CATALOG_LOADER_CLASS);
+    }
+  }
 
-    if (catalogName != null) {
-      Catalog catalog;
-      switch (catalogName.toLowerCase()) {
-        case HADOOP:
-          String warehouseLocation = 
conf.get(InputFormatConfig.HADOOP_CATALOG_WAREHOUSE_LOCATION);
+  private static Optional<Catalog> loadCatalog(Configuration conf, String 
catalogType,
+                                               String 
warehouseLocationConfigName, String loaderClassConfigName) {
+    if (catalogType == null) {
+      LOG.info("Catalog is not configured");
+      return Optional.empty();
+    }
 
-          catalog = (warehouseLocation != null) ? new HadoopCatalog(conf, 
warehouseLocation) : new HadoopCatalog(conf);
-          LOG.info("Loaded Hadoop catalog {}", catalog);
+    Catalog catalog;
+    switch (catalogType.toLowerCase()) {
+      case HADOOP:
+        String warehouseLocation = conf.get(warehouseLocationConfigName);
+        catalog = (warehouseLocation != null) ? new HadoopCatalog(conf, 
warehouseLocation) :
+                new HadoopCatalog(conf);
+        LOG.info("Loaded Hadoop catalog {}", catalog);
+        return Optional.of(catalog);
+      case HIVE:
+        catalog = HiveCatalogs.loadCatalog(conf);
+        LOG.info("Loaded Hive Metastore catalog {}", catalog);
+        return Optional.of(catalog);
+      case CUSTOM:
+        String catalogLoaderClass = conf.get(loaderClassConfigName);
+        if (catalogLoaderClass != null) {
+          CatalogLoader loader = (CatalogLoader) 
DynConstructors.builder(CatalogLoader.class)

Review comment:
       Thanks for the review. This is a great idea; I was not aware of 
CatalogUtil until now. I refactored the code to use this utility, but I kept the 
CatalogLoader as well, since we still allow custom catalog configuration through 
the global hive config. 
   I will send a mail to the community to discuss when the best time would be 
to remove the old catalog configuration logic. 

##########
File path: mr/src/main/java/org/apache/iceberg/mr/Catalogs.java
##########
@@ -180,47 +186,80 @@ public static boolean dropTable(Configuration conf, 
Properties props) {
   /**
    * Returns true if HiveCatalog is used
    * @param conf a Hadoop conf
+   * @param props the controlling properties
    * @return true if the Catalog is HiveCatalog
    */
-  public static boolean hiveCatalog(Configuration conf) {
-    return HIVE.equalsIgnoreCase(conf.get(InputFormatConfig.CATALOG));
+  public static boolean hiveCatalog(Configuration conf, Properties props) {
+    String catalogName = props.getProperty(InputFormatConfig.TABLE_CATALOG);
+    if (catalogName != null) {
+      return 
HIVE.equals(conf.get(String.format(InputFormatConfig.CATALOG_TYPE_TEMPLATE, 
catalogName)));

Review comment:
       Good catch, fixed.

##########
File path: 
mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithMultipleCatalogs.java
##########
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.mr.hive;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.mr.InputFormatConfig;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class TestHiveIcebergStorageHandlerWithMultipleCatalogs {
+
+  private static final String[] EXECUTION_ENGINES = new String[] { "tez", "mr" 
};
+  private static final String HIVECATALOGNAME = "table1_catalog";
+  private static final String OTHERCATALOGNAME = "table2_catalog";
+  private static TestHiveShell shell;
+
+  @Parameterized.Parameter(0)
+  public FileFormat fileFormat1;
+  @Parameterized.Parameter(1)
+  public FileFormat fileFormat2;
+  @Parameterized.Parameter(2)
+  public String executionEngine;
+  @Parameterized.Parameter(3)
+  public TestTables.TestTableType testTableType1;
+  @Parameterized.Parameter(4)
+  public String table1CatalogName;
+  @Parameterized.Parameter(5)
+  public TestTables.TestTableType testTableType2;
+  @Parameterized.Parameter(6)
+  public String table2CatalogName;
+
+  @Rule
+  public TemporaryFolder temp = new TemporaryFolder();
+  private TestTables testTables1;
+  private TestTables testTables2;
+
+  @Parameterized.Parameters(name = "fileFormat={0}, fileFormat={1}, 
engine={2}, tableType1={3}, catalogName1={4}, " +

Review comment:
       Yes, typo. 

##########
File path: mr/src/main/java/org/apache/iceberg/mr/Catalogs.java
##########
@@ -180,47 +186,80 @@ public static boolean dropTable(Configuration conf, 
Properties props) {
   /**
    * Returns true if HiveCatalog is used
    * @param conf a Hadoop conf
+   * @param props the controlling properties
    * @return true if the Catalog is HiveCatalog
    */
-  public static boolean hiveCatalog(Configuration conf) {
-    return HIVE.equalsIgnoreCase(conf.get(InputFormatConfig.CATALOG));
+  public static boolean hiveCatalog(Configuration conf, Properties props) {
+    String catalogName = props.getProperty(InputFormatConfig.TABLE_CATALOG);
+    if (catalogName != null) {
+      return 
HIVE.equals(conf.get(String.format(InputFormatConfig.CATALOG_TYPE_TEMPLATE, 
catalogName)));
+    } else {
+      if 
(HIVE.equals(conf.get(String.format(InputFormatConfig.CATALOG_TYPE_TEMPLATE, 
"default")))) {
+        return true;
+      } else {
+        return HIVE.equalsIgnoreCase(conf.get(InputFormatConfig.CATALOG));
+      }
+    }
   }
 
   @VisibleForTesting
-  static Optional<Catalog> loadCatalog(Configuration conf) {
-    String catalogLoaderClass = 
conf.get(InputFormatConfig.CATALOG_LOADER_CLASS);
-
-    if (catalogLoaderClass != null) {
-      CatalogLoader loader = (CatalogLoader) 
DynConstructors.builder(CatalogLoader.class)
-              .impl(catalogLoaderClass)
-              .build()
-              .newInstance();
-      Catalog catalog = loader.load(conf);
-      LOG.info("Loaded catalog {} using {}", catalog, catalogLoaderClass);
-      return Optional.of(catalog);
+  static Optional<Catalog> loadCatalog(Configuration conf, String catalogName) 
{
+    String catalogType;
+    String name = catalogName;
+    if (name == null) {
+      name = "default";
     }
+    catalogType = 
conf.get(String.format(InputFormatConfig.CATALOG_TYPE_TEMPLATE, name));
 
-    String catalogName = conf.get(InputFormatConfig.CATALOG);
+    // keep both catalog configuration methods for seamless transition
+    if (catalogType != null) {
+    // new logic
+      return loadCatalog(conf, catalogType, 
String.format(InputFormatConfig.CATALOG_WAREHOUSE_TEMPLATE, name),
+              String.format(InputFormatConfig.CATALOG_LOADER_CLASS_TEMPLATE, 
name));
+    } else {
+      // old logic
+      // use catalog {@link InputFormatConfig.CATALOG} stored in global hive 
config if table specific catalog
+      // configuration or default catalog definition is missing

Review comment:
       Fixed.

##########
File path: mr/src/main/java/org/apache/iceberg/mr/Catalogs.java
##########
@@ -180,47 +189,97 @@ public static boolean dropTable(Configuration conf, 
Properties props) {
   /**
    * Returns true if HiveCatalog is used
    * @param conf a Hadoop conf
+   * @param props the controlling properties
    * @return true if the Catalog is HiveCatalog
    */
-  public static boolean hiveCatalog(Configuration conf) {
-    return HIVE.equalsIgnoreCase(conf.get(InputFormatConfig.CATALOG));
+  public static boolean hiveCatalog(Configuration conf, Properties props) {
+    String catalogName = props.getProperty(InputFormatConfig.TABLE_CATALOG);
+    if (catalogName != null) {
+      return 
HIVE.equalsIgnoreCase(conf.get(String.format(InputFormatConfig.CATALOG_TYPE_TEMPLATE,
 catalogName)));
+    } else {
+      if 
(HIVE.equalsIgnoreCase(conf.get(String.format(InputFormatConfig.CATALOG_TYPE_TEMPLATE,
 DEFAULT_CATALOG)))) {
+        return true;
+      } else {
+        return HIVE.equalsIgnoreCase(conf.get(InputFormatConfig.CATALOG));
+      }
+    }
   }
 
   @VisibleForTesting
-  static Optional<Catalog> loadCatalog(Configuration conf) {
-    String catalogLoaderClass = 
conf.get(InputFormatConfig.CATALOG_LOADER_CLASS);
-
-    if (catalogLoaderClass != null) {
-      CatalogLoader loader = (CatalogLoader) 
DynConstructors.builder(CatalogLoader.class)
-              .impl(catalogLoaderClass)
-              .build()
-              .newInstance();
-      Catalog catalog = loader.load(conf);
-      LOG.info("Loaded catalog {} using {}", catalog, catalogLoaderClass);
-      return Optional.of(catalog);
+  static Optional<Catalog> loadCatalog(Configuration conf, String catalogName) 
{
+    String catalogType;
+    String name = catalogName == null ? DEFAULT_CATALOG : catalogName;
+    catalogType = 
conf.get(String.format(InputFormatConfig.CATALOG_TYPE_TEMPLATE, name));
+
+    // keep both catalog configuration methods for seamless transition
+    if (catalogType != null) {
+      // new logic
+      LOG.debug("Using catalog configuration from table properties.");
+      return loadCatalog(conf, name, catalogType);
+    } else {
+      // old logic
+      // use catalog {@link InputFormatConfig.CATALOG} stored in global hive 
config if table specific catalog
+      // configuration or default catalog definition is missing
+      LOG.debug("Using catalog configuration from global configuration.");
+      catalogType = conf.get(InputFormatConfig.CATALOG);
+      return loadCatalog(conf, name, catalogType);
     }
+  }
 
-    String catalogName = conf.get(InputFormatConfig.CATALOG);
+  private static Optional<Catalog> loadCatalog(Configuration conf, String 
catalogName, String catalogType) {
+    if (catalogType == null) {
+      LOG.info("Catalog is not configured");
+      return Optional.empty();
+    }
 
-    if (catalogName != null) {
-      Catalog catalog;
-      switch (catalogName.toLowerCase()) {
-        case HADOOP:
+    Map<String, String> properties = getCatalogProperties(conf, catalogName);
+    Catalog catalog;
+    switch (catalogType.toLowerCase()) {
+      case HADOOP:
+        if (properties.containsKey(CatalogProperties.WAREHOUSE_LOCATION)) {
+          catalog = CatalogUtil.loadCatalog(HadoopCatalog.class.getName(), 
catalogName,
+                  getCatalogProperties(conf, catalogName), conf);

Review comment:
       Right, I missed that.

##########
File path: 
hive-metastore/src/main/java/org/apache/iceberg/hive/HiveCatalogs.java
##########
@@ -36,4 +38,11 @@ public static HiveCatalog loadCatalog(Configuration conf) {
     String metastoreUri = conf.get(HiveConf.ConfVars.METASTOREURIS.varname, 
"");
     return CATALOG_CACHE.get(metastoreUri, uri -> new HiveCatalog(conf));
   }
+
+  public static HiveCatalog loadCatalog(String catalogName, Map<String, 
String> properties, Configuration conf) {
+    // metastore URI can be null in local mode
+    String metastoreUri = conf.get(HiveConf.ConfVars.METASTOREURIS.varname, 
"");
+    return CATALOG_CACHE.get(metastoreUri, uri -> (HiveCatalog) 
CatalogUtil.loadCatalog(HiveCatalog.class.getName(),
+            catalogName, properties, conf));
+  }

Review comment:
       I like the idea of having the catalogName as the key. It's much cleaner than 
having the metastore URI, and it's easier to trace back. 
   The loadCatalog(conf) method was only used from the TestTables, so I 
extended it with an additional catalogName param. 

##########
File path: mr/src/main/java/org/apache/iceberg/mr/Catalogs.java
##########
@@ -180,47 +189,97 @@ public static boolean dropTable(Configuration conf, 
Properties props) {
   /**
    * Returns true if HiveCatalog is used
    * @param conf a Hadoop conf
+   * @param props the controlling properties
    * @return true if the Catalog is HiveCatalog
    */
-  public static boolean hiveCatalog(Configuration conf) {
-    return HIVE.equalsIgnoreCase(conf.get(InputFormatConfig.CATALOG));
+  public static boolean hiveCatalog(Configuration conf, Properties props) {
+    String catalogName = props.getProperty(InputFormatConfig.TABLE_CATALOG);
+    if (catalogName != null) {
+      return 
HIVE.equalsIgnoreCase(conf.get(String.format(InputFormatConfig.CATALOG_TYPE_TEMPLATE,
 catalogName)));
+    } else {
+      if 
(HIVE.equalsIgnoreCase(conf.get(String.format(InputFormatConfig.CATALOG_TYPE_TEMPLATE,
 DEFAULT_CATALOG)))) {
+        return true;
+      } else {
+        return HIVE.equalsIgnoreCase(conf.get(InputFormatConfig.CATALOG));
+      }
+    }

Review comment:
       Correct observation, as always :). Refactored it.

##########
File path: mr/src/test/java/org/apache/iceberg/mr/TestCatalogs.java
##########
@@ -138,9 +142,13 @@ public void testCreateDropTableToLocation() throws 
IOException {
   @Test
   public void testCreateDropTableToCatalog() throws IOException {
     TableIdentifier identifier = TableIdentifier.of("test", "table");
+    String defaultCatalogName = "default";
+    String warehouseLocation = temp.newFolder("hadoop", 
"warehouse").toString();
 
-    conf.set("warehouse.location", temp.newFolder("hadoop", 
"warehouse").toString());
-    conf.setClass(InputFormatConfig.CATALOG_LOADER_CLASS, 
CustomHadoopCatalogLoader.class, CatalogLoader.class);
+    conf.set(String.format(InputFormatConfig.CATALOG_WAREHOUSE_TEMPLATE, 
defaultCatalogName), warehouseLocation);
+    conf.set(String.format(InputFormatConfig.CATALOG_CLASS_TEMPLATE, 
defaultCatalogName),
+            CustomHadoopCatalog.class.getName());
+    conf.set(String.format(InputFormatConfig.CATALOG_TYPE_TEMPLATE, 
defaultCatalogName), Catalogs.CUSTOM);

Review comment:
       Fixed.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to