rdblue commented on a change in pull request #1640:
URL: https://github.com/apache/iceberg/pull/1640#discussion_r516376977
##########
File path: core/src/main/java/org/apache/iceberg/CatalogUtil.java
##########
@@ -117,4 +122,45 @@ private static void deleteFiles(FileIO io,
Set<ManifestFile> allManifests) {
}
});
}
+
+ /**
+ * Load a custom catalog implementation.
+ * The catalog must have a no-arg constructor.
+ * If the catalog implements {@link org.apache.hadoop.conf.Configurable},
+ * {@code Configurable.setConf(org.apache.hadoop.conf.Configuration conf)}
is called to set Hadoop configuration.
+ * {@code Catalog.initialize(String name, Map<String, String> options)} is
called to complete the initialization.
+ * @param catalogName catalog name
+ * @param impl catalog implementation full class name
+ * @param engineOptions configuration options from a compute engine like
Spark or Flink to initialize the catalog
+ * @param hadoopConf hadoop configuration if needed
+ * @return initialized catalog object
+ * @throws IllegalArgumentException if no-arg constructor not found or error
during initialization
+ */
+ public static Catalog loadCustomCatalog(
+ String catalogName,
+ String impl,
Review comment:
Nit: I would expect `impl` to come first to keep the configuration
(name, options) together.
##########
File path: core/src/main/java/org/apache/iceberg/CatalogUtil.java
##########
@@ -117,4 +122,45 @@ private static void deleteFiles(FileIO io,
Set<ManifestFile> allManifests) {
}
});
}
+
+ /**
+ * Load a custom catalog implementation.
+ * The catalog must have a no-arg constructor.
+ * If the catalog implements {@link org.apache.hadoop.conf.Configurable},
+ * {@code Configurable.setConf(org.apache.hadoop.conf.Configuration conf)}
is called to set Hadoop configuration.
+ * {@code Catalog.initialize(String name, Map<String, String> options)} is
called to complete the initialization.
+ * @param catalogName catalog name
+ * @param impl catalog implementation full class name
+ * @param engineOptions configuration options from a compute engine like
Spark or Flink to initialize the catalog
+ * @param hadoopConf hadoop configuration if needed
+ * @return initialized catalog object
+ * @throws IllegalArgumentException if no-arg constructor not found or error
during initialization
+ */
+ public static Catalog loadCustomCatalog(
+ String catalogName,
+ String impl,
+ Map<String, String> engineOptions,
+ Configuration hadoopConf) {
+ Preconditions.checkNotNull(impl, "Cannot initialize custom Catalog because
impl property is not set");
+ DynConstructors.Ctor<Catalog> ctor;
+ try {
+ ctor = DynConstructors.builder(Catalog.class)
+ .impl(impl)
+ .buildChecked();
Review comment:
This may all fit on one line now.
##########
File path: core/src/main/java/org/apache/iceberg/CatalogUtil.java
##########
@@ -117,4 +122,45 @@ private static void deleteFiles(FileIO io,
Set<ManifestFile> allManifests) {
}
});
}
+
+ /**
+ * Load a custom catalog implementation.
+ * The catalog must have a no-arg constructor.
+ * If the catalog implements {@link org.apache.hadoop.conf.Configurable},
+ * {@code Configurable.setConf(org.apache.hadoop.conf.Configuration conf)}
is called to set Hadoop configuration.
+ * {@code Catalog.initialize(String name, Map<String, String> options)} is
called to complete the initialization.
+ * @param catalogName catalog name
+ * @param impl catalog implementation full class name
+ * @param engineOptions configuration options from a compute engine like
Spark or Flink to initialize the catalog
+ * @param hadoopConf hadoop configuration if needed
+ * @return initialized catalog object
+ * @throws IllegalArgumentException if no-arg constructor not found or error
during initialization
+ */
+ public static Catalog loadCustomCatalog(
+ String catalogName,
+ String impl,
+ Map<String, String> engineOptions,
+ Configuration hadoopConf) {
+ Preconditions.checkNotNull(impl, "Cannot initialize custom Catalog because
impl property is not set");
+ DynConstructors.Ctor<Catalog> ctor;
+ try {
+ ctor = DynConstructors.builder(Catalog.class)
+ .impl(impl)
+ .buildChecked();
+ } catch (NoSuchMethodException e) {
+ throw new IllegalArgumentException(String.format(
+ "Cannot initialize Catalog, please make sure %s has a no-arg
constructor", impl), e);
+ }
+ try {
+ Catalog catalog = ctor.newInstance();
+ if (catalog instanceof Configurable) {
+ ((Configurable) catalog).setConf(hadoopConf);
+ }
+ catalog.initialize(catalogName, engineOptions);
Review comment:
Can we move configuration out of the try/catch block? It doesn't need to
be there.
##########
File path: core/src/main/java/org/apache/iceberg/CatalogUtil.java
##########
@@ -117,4 +122,45 @@ private static void deleteFiles(FileIO io,
Set<ManifestFile> allManifests) {
}
});
}
+
+ /**
+ * Load a custom catalog implementation.
+ * The catalog must have a no-arg constructor.
+ * If the catalog implements {@link org.apache.hadoop.conf.Configurable},
+ * {@code Configurable.setConf(org.apache.hadoop.conf.Configuration conf)}
is called to set Hadoop configuration.
+ * {@code Catalog.initialize(String name, Map<String, String> options)} is
called to complete the initialization.
+ * @param catalogName catalog name
+ * @param impl catalog implementation full class name
+ * @param engineOptions configuration options from a compute engine like
Spark or Flink to initialize the catalog
+ * @param hadoopConf hadoop configuration if needed
+ * @return initialized catalog object
+ * @throws IllegalArgumentException if no-arg constructor not found or error
during initialization
+ */
+ public static Catalog loadCustomCatalog(
+ String catalogName,
+ String impl,
+ Map<String, String> engineOptions,
Review comment:
The name `engineOptions` seems too specific because it assumes that the
caller is an engine. But it could be a user of the API that isn't an engine. I
think `config` or `properties` would be a better name.
##########
File path: core/src/main/java/org/apache/iceberg/CatalogUtil.java
##########
@@ -117,4 +122,45 @@ private static void deleteFiles(FileIO io,
Set<ManifestFile> allManifests) {
}
});
}
+
+ /**
+ * Load a custom catalog implementation.
+ * The catalog must have a no-arg constructor.
+ * If the catalog implements {@link org.apache.hadoop.conf.Configurable},
+ * {@code Configurable.setConf(org.apache.hadoop.conf.Configuration conf)}
is called to set Hadoop configuration.
+ * {@code Catalog.initialize(String name, Map<String, String> options)} is
called to complete the initialization.
+ * @param catalogName catalog name
+ * @param impl catalog implementation full class name
+ * @param engineOptions configuration options from a compute engine like
Spark or Flink to initialize the catalog
+ * @param hadoopConf hadoop configuration if needed
+ * @return initialized catalog object
+ * @throws IllegalArgumentException if no-arg constructor not found or error
during initialization
+ */
+ public static Catalog loadCustomCatalog(
Review comment:
Minor: I tend to opt for removing words that aren't needed, so I would
remove "custom" from here. I think that's pretty much implied by loading an
implementation class.
##########
File path: flink/src/main/java/org/apache/iceberg/flink/FlinkCatalogFactory.java
##########
@@ -58,13 +59,19 @@
// Can not just use "type", it conflicts with CATALOG_TYPE.
public static final String ICEBERG_CATALOG_TYPE = "catalog-type";
+ public static final String ICEBERG_CATALOG_TYPE_HADOOP = "hadoop";
+ public static final String ICEBERG_CATALOG_TYPE_HIVE = "hive";
+ public static final String ICEBERG_CATALOG_TYPE_CUSTOM = "custom";
Review comment:
Instead of using `type=custom` and `impl=com.example.Catalog`, why not
just combine them into `type=com.example.Catalog`. We can try to load the type
as an implementation class if it isn't a well-known name like "hive".
##########
File path:
spark2/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java
##########
@@ -137,9 +138,11 @@ protected Table findTable(DataSourceOptions options,
Configuration conf) {
HadoopTables tables = new HadoopTables(conf);
return tables.load(path.get());
} else {
- HiveCatalog hiveCatalog = HiveCatalogs.loadCatalog(conf);
+ Catalog catalog = options.get("impl")
+ .map(impl -> CatalogUtil.loadCustomCatalog("custom", impl,
options.asMap(), conf))
+ .orElseGet(() -> HiveCatalogs.loadCatalog(conf));
TableIdentifier tableIdentifier = TableIdentifier.parse(path.get());
- return hiveCatalog.loadTable(tableIdentifier);
+ return catalog.loadTable(tableIdentifier);
Review comment:
I think we shouldn't change the behavior of `IcebergSource` in this PR.
We want to change how this source works and route queries through a catalog,
but I'm not sure that using `impl` is the right way to do it. Let's stick with
`HiveCatalogs` for now and revisit this in a follow up.
##########
File path: flink/src/main/java/org/apache/iceberg/flink/FlinkCatalogFactory.java
##########
@@ -58,13 +59,19 @@
// Can not just use "type", it conflicts with CATALOG_TYPE.
public static final String ICEBERG_CATALOG_TYPE = "catalog-type";
+ public static final String ICEBERG_CATALOG_TYPE_HADOOP = "hadoop";
+ public static final String ICEBERG_CATALOG_TYPE_HIVE = "hive";
+ public static final String ICEBERG_CATALOG_TYPE_CUSTOM = "custom";
Review comment:
@aokolnychyi and @RussellSpitzer, do you have an opinion here?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]