kbendick commented on a change in pull request #2629:
URL: https://github.com/apache/iceberg/pull/2629#discussion_r698745695
##########
File path: flink/src/main/java/org/apache/iceberg/flink/FlinkCatalogFactory.java
##########
@@ -100,21 +100,37 @@ protected CatalogLoader createCatalogLoader(String name, Map<String, String> pro
}
@Override
- public Map<String, String> requiredContext() {
- Map<String, String> context = Maps.newHashMap();
- context.put(CatalogDescriptorValidator.CATALOG_TYPE, "iceberg");
- context.put(CatalogDescriptorValidator.CATALOG_PROPERTY_VERSION, "1");
- return context;
+ public String factoryIdentifier() {
+ return FlinkCatalogFactoryOptions.IDENTIFIER;
}
@Override
- public List<String> supportedProperties() {
- return ImmutableList.of("*");
+ public Set<ConfigOption<?>> requiredOptions() {
+ final Set<ConfigOption<?>> options = new HashSet<>();
+ options.add(FlinkCatalogFactoryOptions.CATALOG_TYPE);
+ return options;
}
@Override
- public Catalog createCatalog(String name, Map<String, String> properties) {
- return createCatalog(name, properties, clusterHadoopConf());
+ public Set<ConfigOption<?>> optionalOptions() {
+ final Set<ConfigOption<?>> options = new HashSet<>();
+ options.add(FlinkCatalogFactoryOptions.PROPERTY_VERSION);
+ options.add(FlinkCatalogFactoryOptions.URI);
+ options.add(FlinkCatalogFactoryOptions.WAREHOUSE);
+ options.add(FlinkCatalogFactoryOptions.CLIENTS);
+ options.add(FlinkCatalogFactoryOptions.BASE_NAMESPACE);
+ options.add(FlinkCatalogFactoryOptions.HIVE_CONF_DIF);
+ options.add(FlinkCatalogFactoryOptions.CACHE_ENABLED);
+ return options;
+ }
Review comment:
Do we need to add any of the known additional catalog properties that
are used with some of the AWS catalogs?
Also, for catalogs that offer additional options (including custom
catalogs), how will users set those? Presently, `supportedProperties` returns
`"*"`, which accepts everything. Is an equivalent wildcard still possible with
`ConfigOption`s? That would be the easiest path forward, but it may not be
supported.
If not, the options I can think of presently come from the AWS catalogs
(though we should also look into the JDBC catalog).
**Additional Catalog Properties To Consider**:
_General Catalog Properties_
- `catalog-impl`
- `io-impl`
[_GlueCatalog Specific Options_](https://iceberg.apache.org/aws/#glue-catalog)
- `lock-impl`
- `lock.table`
[_DynamoDbCatalog Specific Options_](https://iceberg.apache.org/aws/#dynamodb-catalog)
- `dynamodb.table-name`
[_AWS Client Configuration_](https://iceberg.apache.org/aws/#aws-client-customization)
- `client.factory`
There are further ones, which I'll try to link to in a bit (such as the
S3FileIO options for AWS authentication and encryption parameters), but these
are the first ones I encountered in the docs.
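To illustrate the pass-through question: if the new `ConfigOption`-based API has no `*` wildcard, one workaround is to treat every key the factory did not declare as a free-form property and forward it to the catalog verbatim. This is a hypothetical sketch (the class and method names are mine, not the Flink or Iceberg API); Flink's factory helpers also have a `validateExcept` escape hatch for prefixed keys that might serve the same purpose.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical helper: split incoming catalog options into declared options
// and free-form pass-through properties (e.g. AWS settings like "lock-impl"
// or "client.factory"). Illustrative only, not the actual factory API.
public class OptionSplitter {
  public static Map<String, String> passThrough(Map<String, String> allOptions,
                                                Set<String> declaredKeys) {
    Map<String, String> extras = new HashMap<>();
    for (Map.Entry<String, String> entry : allOptions.entrySet()) {
      if (!declaredKeys.contains(entry.getKey())) {
        // anything not declared as a ConfigOption is forwarded verbatim
        extras.put(entry.getKey(), entry.getValue());
      }
    }
    return extras;
  }
}
```

The factory would then validate only the declared keys and hand `extras` to the catalog untouched, preserving today's `*` behavior.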
##########
File path: flink/src/main/java/org/apache/iceberg/flink/FlinkCatalogFactory.java
##########
@@ -100,21 +100,37 @@ protected CatalogLoader createCatalogLoader(String name, Map<String, String> pro
}
@Override
- public Map<String, String> requiredContext() {
- Map<String, String> context = Maps.newHashMap();
- context.put(CatalogDescriptorValidator.CATALOG_TYPE, "iceberg");
- context.put(CatalogDescriptorValidator.CATALOG_PROPERTY_VERSION, "1");
- return context;
+ public String factoryIdentifier() {
+ return FlinkCatalogFactoryOptions.IDENTIFIER;
}
@Override
- public List<String> supportedProperties() {
- return ImmutableList.of("*");
+ public Set<ConfigOption<?>> requiredOptions() {
+ final Set<ConfigOption<?>> options = new HashSet<>();
+ options.add(FlinkCatalogFactoryOptions.CATALOG_TYPE);
+ return options;
}
@Override
- public Catalog createCatalog(String name, Map<String, String> properties) {
- return createCatalog(name, properties, clusterHadoopConf());
+ public Set<ConfigOption<?>> optionalOptions() {
+ final Set<ConfigOption<?>> options = new HashSet<>();
+ options.add(FlinkCatalogFactoryOptions.PROPERTY_VERSION);
+ options.add(FlinkCatalogFactoryOptions.URI);
+ options.add(FlinkCatalogFactoryOptions.WAREHOUSE);
+ options.add(FlinkCatalogFactoryOptions.CLIENTS);
+ options.add(FlinkCatalogFactoryOptions.BASE_NAMESPACE);
+ options.add(FlinkCatalogFactoryOptions.HIVE_CONF_DIF);
+ options.add(FlinkCatalogFactoryOptions.CACHE_ENABLED);
+ return options;
+ }
+
+ @Override
+ public Catalog createCatalog(Context context) {
+ final FactoryUtil.CatalogFactoryHelper helper =
+ FactoryUtil.createCatalogFactoryHelper(this, context);
+ helper.validate();
+
+ return createCatalog(context.getName(), context.getOptions(), clusterHadoopConf());
Review comment:
There is an effort to decouple the need for Hadoop from the
`FlinkCatalogFactory` for environments where Hadoop is not easily configurable
and for catalog implementations that don't actually need it (basically anything
that uses `S3FileIO` - though we'll need to update `GlueCatalog` and eventually
`DynamoDbCatalog` as well).
Even after updating `GlueCatalog` to remove the Hadoop `Configurable`
interface, this call to `clusterHadoopConf()` still requires Hadoop on the
class path.
I'm not proposing that we change that in this PR (as this PR has been open
for a while, and it's a separate concern), but I wanted to draw attention to
the issue, since I'm less familiar with the Flink side than many of you:
https://github.com/apache/iceberg/issues/3044
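One common pattern for this kind of decoupling is to touch the optional dependency only reflectively, so the class is resolved lazily and its absence degrades gracefully instead of failing at class-load time. A minimal sketch, with an illustrative helper name (this is not proposing a concrete API for the PR):

```java
// Hypothetical sketch: load a class reflectively and return null when it is
// not on the class path. A factory could guard clusterHadoopConf() with this
// pattern so catalogs that don't need Hadoop (e.g. S3FileIO-based ones) can
// run without hadoop-common present.
public class ReflectiveLoader {
  public static Object newInstanceOrNull(String className) {
    try {
      return Class.forName(className).getDeclaredConstructor().newInstance();
    } catch (ReflectiveOperationException e) {
      // e.g. org.apache.hadoop.conf.Configuration is absent: fall back to null
      return null;
    }
  }
}
```

The catalog loader would then pass the Hadoop `Configuration` along only when it was actually loadable.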
##########
File path: flink/src/main/java/org/apache/iceberg/flink/FlinkCatalogFactory.java
##########
@@ -100,21 +100,37 @@ protected CatalogLoader createCatalogLoader(String name, Map<String, String> pro
}
@Override
- public Map<String, String> requiredContext() {
- Map<String, String> context = Maps.newHashMap();
- context.put(CatalogDescriptorValidator.CATALOG_TYPE, "iceberg");
- context.put(CatalogDescriptorValidator.CATALOG_PROPERTY_VERSION, "1");
- return context;
+ public String factoryIdentifier() {
+ return FlinkCatalogFactoryOptions.IDENTIFIER;
}
@Override
- public List<String> supportedProperties() {
- return ImmutableList.of("*");
+ public Set<ConfigOption<?>> requiredOptions() {
+ final Set<ConfigOption<?>> options = new HashSet<>();
+ options.add(FlinkCatalogFactoryOptions.CATALOG_TYPE);
+ return options;
}
@Override
- public Catalog createCatalog(String name, Map<String, String> properties) {
- return createCatalog(name, properties, clusterHadoopConf());
+ public Set<ConfigOption<?>> optionalOptions() {
+ final Set<ConfigOption<?>> options = new HashSet<>();
+ options.add(FlinkCatalogFactoryOptions.PROPERTY_VERSION);
+ options.add(FlinkCatalogFactoryOptions.URI);
+ options.add(FlinkCatalogFactoryOptions.WAREHOUSE);
+ options.add(FlinkCatalogFactoryOptions.CLIENTS);
+ options.add(FlinkCatalogFactoryOptions.BASE_NAMESPACE);
+ options.add(FlinkCatalogFactoryOptions.HIVE_CONF_DIF);
+ options.add(FlinkCatalogFactoryOptions.CACHE_ENABLED);
+ return options;
+ }
Review comment:
Also, will users still be able to specify their own catalog implementation
(with its own options)?
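For context, the custom-catalog path today amounts to instantiating a user-supplied class by name and handing it the raw option map. A hypothetical sketch of that shape (the `Catalog` interface here is a stand-in, not Iceberg's actual `org.apache.iceberg.catalog.Catalog`, and `"catalog-impl"` is the property name from the Iceberg docs):

```java
import java.util.Map;

// Hypothetical "catalog-impl" pass-through: instantiate a custom catalog
// class by name and forward the raw options, so user-defined catalogs keep
// receiving options the factory never declared.
public class CustomCatalogLoader {
  public interface Catalog {
    void initialize(String name, Map<String, String> properties);
  }

  // Toy implementation used to demonstrate the loading path.
  public static class InMemoryCatalog implements Catalog {
    public String name;

    @Override
    public void initialize(String name, Map<String, String> properties) {
      this.name = name;
    }
  }

  public static Catalog load(String implClassName, String name, Map<String, String> props) {
    try {
      Catalog catalog =
          (Catalog) Class.forName(implClassName).getDeclaredConstructor().newInstance();
      catalog.initialize(name, props); // custom options are forwarded untouched
      return catalog;
    } catch (ReflectiveOperationException e) {
      throw new IllegalArgumentException("Cannot load catalog-impl: " + implClassName, e);
    }
  }
}
```

Whatever the new `ConfigOption` surface looks like, something equivalent to this pass-through seems necessary for custom catalogs to keep working.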
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]