pvary commented on a change in pull request #2129:
URL: https://github.com/apache/iceberg/pull/2129#discussion_r564343558
##########
File path: mr/src/main/java/org/apache/iceberg/mr/Catalogs.java
##########
@@ -180,47 +189,87 @@ public static boolean dropTable(Configuration conf,
Properties props) {
/**
* Returns true if HiveCatalog is used
* @param conf a Hadoop conf
+ * @param props the controlling properties
* @return true if the Catalog is HiveCatalog
*/
- public static boolean hiveCatalog(Configuration conf) {
- return HIVE.equalsIgnoreCase(conf.get(InputFormatConfig.CATALOG));
+ public static boolean hiveCatalog(Configuration conf, Properties props) {
+ String catalogName = props.getProperty(InputFormatConfig.TABLE_CATALOG);
+ return HIVE.equalsIgnoreCase(getCatalogType(conf, catalogName));
}
@VisibleForTesting
- static Optional<Catalog> loadCatalog(Configuration conf) {
- String catalogLoaderClass =
conf.get(InputFormatConfig.CATALOG_LOADER_CLASS);
-
- if (catalogLoaderClass != null) {
- CatalogLoader loader = (CatalogLoader)
DynConstructors.builder(CatalogLoader.class)
- .impl(catalogLoaderClass)
- .build()
- .newInstance();
- Catalog catalog = loader.load(conf);
- LOG.info("Loaded catalog {} using {}", catalog, catalogLoaderClass);
- return Optional.of(catalog);
- }
+ static Optional<Catalog> loadCatalog(Configuration conf, String catalogName)
{
+ String name = catalogName == null ? DEFAULT_CATALOG : catalogName;
+ String catalogType = getCatalogType(conf, name);
+ return loadCatalog(conf, name, catalogType);
+ }
- String catalogName = conf.get(InputFormatConfig.CATALOG);
+ private static Optional<Catalog> loadCatalog(Configuration conf, String
catalogName, String catalogType) {
+ if (catalogType == null) {
+ LOG.info("Catalog is not configured");
+ return Optional.empty();
+ }
- if (catalogName != null) {
- Catalog catalog;
- switch (catalogName.toLowerCase()) {
- case HADOOP:
+ Map<String, String> properties = getCatalogProperties(conf, catalogName);
+ Catalog catalog;
+ switch (catalogType.toLowerCase()) {
+ case HADOOP:
+ if (properties.containsKey(CatalogProperties.WAREHOUSE_LOCATION)) {
+ catalog = CatalogUtil.loadCatalog(HadoopCatalog.class.getName(),
catalogName, properties, conf);
+ } else {
String warehouseLocation =
conf.get(InputFormatConfig.HADOOP_CATALOG_WAREHOUSE_LOCATION);
-
catalog = (warehouseLocation != null) ? new HadoopCatalog(conf,
warehouseLocation) : new HadoopCatalog(conf);
- LOG.info("Loaded Hadoop catalog {}", catalog);
+ }
+ LOG.info("Loaded Hadoop catalog {}", catalog);
+ return Optional.of(catalog);
+ case HIVE:
+ catalog = HiveCatalogs.loadCatalog(catalogName, properties, conf);
+ LOG.info("Loaded Hive Metastore catalog {}", catalog);
+ return Optional.of(catalog);
+ case CUSTOM:
+ if (properties.containsKey(CatalogProperties.CLASS)) {
+ String catalogLoaderClass = properties.get(CatalogProperties.CLASS);
+ catalog = CatalogUtil.loadCatalog(catalogLoaderClass, catalogName,
properties, conf);
+ LOG.info("Loaded catalog {} using {}", catalog, catalogLoaderClass);
return Optional.of(catalog);
- case HIVE:
- catalog = HiveCatalogs.loadCatalog(conf);
- LOG.info("Loaded Hive Metastore catalog {}", catalog);
+ } else if (conf.get(InputFormatConfig.CATALOG_LOADER_CLASS) != null) {
+ String catalogLoaderClass =
conf.get(InputFormatConfig.CATALOG_LOADER_CLASS);
+ CatalogLoader loader = (CatalogLoader)
DynConstructors.builder(CatalogLoader.class)
+ .impl(catalogLoaderClass)
+ .build()
+ .newInstance();
+ catalog = loader.load(conf);
+ LOG.info("Loaded catalog {} using {}", catalog, catalogLoaderClass);
return Optional.of(catalog);
- default:
- throw new NoSuchNamespaceException("Catalog %s is not supported.",
catalogName);
- }
+ } else {
+ return Optional.empty();
+ }
+ default:
+ throw new NoSuchNamespaceException("Catalog %s is not supported.",
catalogType);
}
+ }
+
+ private static Map<String, String> getCatalogProperties(Configuration conf,
String catalogName) {
+ Map<String, String> properties = new HashMap<>();
+ conf.iterator().forEachRemaining(e -> {
Review comment:
I am not sure about this due to performance concerns, but what do you
think about the following:
```
Map<String, String> result = asStream(conf.iterator())
.filter(e -> e.getKey().startsWith(keyPrefix))
.collect(Collectors.toMap(
e -> e.getKey().substring(keyPrefix.length() + 1),
e -> e.getValue()));
```
Using the following Iterator -> Stream helper method (found on StackOverflow), or by
calling StreamSupport.stream directly:
```
public class StreamUtils {
public static <T> Stream<T> asStream(Iterator<T> sourceIterator) {
return asStream(sourceIterator, false);
}
public static <T> Stream<T> asStream(Iterator<T> sourceIterator, boolean
parallel) {
Iterable<T> iterable = () -> sourceIterator;
return StreamSupport.stream(iterable.spliterator(), parallel);
}
}
```
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]