twalthr commented on a change in pull request #15245:
URL: https://github.com/apache/flink/pull/15245#discussion_r598480004
##########
File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FactoryUtil.java
##########
@@ -200,6 +209,59 @@ public static TableFactoryHelper createTableFactoryHelper(
         return new TableFactoryHelper(factory, context);
     }
 
+    /**
+     * Attempts to discover an appropriate catalog factory and creates an instance of the catalog.
+     *
+     * <p>This first uses the legacy {@link TableFactory} stack to discover a matching {@link
+     * CatalogFactory}. If none is found, it falls back to the new stack using {@link Factory}
+     * instead.
+     */
+    public static Catalog createCatalog(
+            String catalogName,
+            Map<String, String> options,
+            ReadableConfig configuration,
+            ClassLoader classLoader) {
+        // Use the legacy mechanism first for compatibility
+        try {
+            final CatalogFactory legacyFactory =
+                    TableFactoryService.find(CatalogFactory.class, options, classLoader);
+            return legacyFactory.createCatalog(catalogName, options);
+        } catch (NoMatchingTableFactoryException e) {
+            // No matching legacy factory found, try using the new stack
+
+            final DefaultCatalogContext discoveryContext =
+                    new DefaultCatalogContext(catalogName, options, configuration, classLoader);
+            try {
+                final CatalogFactory factory = getCatalogFactory(discoveryContext);
+
+                // The type option is only used for discovery, we don't actually want to forward it
+                // to the catalog factory itself.
+                final Map<String, String> factoryOptions =
+                        options.entrySet().stream()
+                                .filter(entry -> !CATALOG_TYPE.key().equals(entry.getKey()))
+                                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+                final DefaultCatalogContext context =
+                        new DefaultCatalogContext(
+                                catalogName, factoryOptions, configuration, classLoader);
+
+                return factory.createCatalog(context);
+            } catch (Throwable t) {
+                throw new ValidationException(
Review comment:
Propagate the cause `t` when rethrowing, so the original failure is not swallowed.
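For illustration, something along these lines (the exact message text is just a sketch, not the PR's actual wording):

```java
} catch (Throwable t) {
    // Pass t as the cause so the underlying factory error stays in the stack trace.
    throw new ValidationException(
            String.format("Unable to create catalog '%s'.", catalogName), t);
}
```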
##########
File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/factories/HiveCatalogFactoryOptions.java
##########
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog.hive.factories;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.table.catalog.CommonCatalogOptions;
+import org.apache.flink.table.catalog.hive.HiveCatalog;
+import org.apache.flink.table.catalog.hive.client.HiveShimLoader;
+
+/** {@link ConfigOption}s for {@link HiveCatalog}. */
+public class HiveCatalogFactoryOptions {
Review comment:
Annotate with `@Internal`, make the class final, and add a private constructor.
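A sketch of the requested shape (keeping the `ConfigOption` constants exactly as they are in the PR):

```java
import org.apache.flink.annotation.Internal;

/** {@link ConfigOption}s for {@link HiveCatalog}. */
@Internal
public final class HiveCatalogFactoryOptions {

    // ... existing ConfigOption constants ...

    // Private constructor prevents instantiation of this pure constants holder.
    private HiveCatalogFactoryOptions() {}
}
```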
##########
File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericInMemoryCatalogFactoryOptions.java
##########
@@ -16,15 +16,18 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.descriptors;
+package org.apache.flink.table.catalog;
 
-/** Validator for {@link GenericInMemoryCatalogDescriptor}. */
-public class GenericInMemoryCatalogValidator extends CatalogDescriptorValidator {
-    public static final String CATALOG_TYPE_VALUE_GENERIC_IN_MEMORY = "generic_in_memory";
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
 
-    @Override
-    public void validate(DescriptorProperties properties) {
-        super.validate(properties);
-        properties.validateValue(CATALOG_TYPE, CATALOG_TYPE_VALUE_GENERIC_IN_MEMORY, false);
-    }
+/** {@link ConfigOption}s for {@link GenericInMemoryCatalog}. */
+public class GenericInMemoryCatalogFactoryOptions {
Review comment:
Annotate this with either `@Internal` (preferred) or `@PublicEvolving`.
##########
File path: flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/factory/JdbcCatalogFactory.java
##########
@@ -18,73 +18,88 @@
 package org.apache.flink.connector.jdbc.catalog.factory;
 
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.connector.jdbc.catalog.JdbcCatalog;
+import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.catalog.Catalog;
-import org.apache.flink.table.descriptors.DescriptorProperties;
-import org.apache.flink.table.descriptors.JdbcCatalogValidator;
 import org.apache.flink.table.factories.CatalogFactory;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.HashSet;
+import java.util.Set;
 
-import static org.apache.flink.table.descriptors.CatalogDescriptorValidator.CATALOG_DEFAULT_DATABASE;
-import static org.apache.flink.table.descriptors.CatalogDescriptorValidator.CATALOG_PROPERTY_VERSION;
-import static org.apache.flink.table.descriptors.CatalogDescriptorValidator.CATALOG_TYPE;
-import static org.apache.flink.table.descriptors.JdbcCatalogValidator.CATALOG_JDBC_BASE_URL;
-import static org.apache.flink.table.descriptors.JdbcCatalogValidator.CATALOG_JDBC_PASSWORD;
-import static org.apache.flink.table.descriptors.JdbcCatalogValidator.CATALOG_JDBC_USERNAME;
-import static org.apache.flink.table.descriptors.JdbcCatalogValidator.CATALOG_TYPE_VALUE_JDBC;
+import static org.apache.flink.connector.jdbc.catalog.factory.JdbcCatalogFactoryOptions.BASE_URL;
+import static org.apache.flink.connector.jdbc.catalog.factory.JdbcCatalogFactoryOptions.DEFAULT_DATABASE;
+import static org.apache.flink.connector.jdbc.catalog.factory.JdbcCatalogFactoryOptions.PASSWORD;
+import static org.apache.flink.connector.jdbc.catalog.factory.JdbcCatalogFactoryOptions.USERNAME;
+import static org.apache.flink.table.factories.FactoryUtil.PROPERTY_VERSION;
 
 /** Factory for {@link JdbcCatalog}. */
 public class JdbcCatalogFactory implements CatalogFactory {
 
     private static final Logger LOG = LoggerFactory.getLogger(JdbcCatalogFactory.class);
 
     @Override
-    public Map<String, String> requiredContext() {
-        Map<String, String> context = new HashMap<>();
-        context.put(CATALOG_TYPE, CATALOG_TYPE_VALUE_JDBC); // jdbc
-        context.put(CATALOG_PROPERTY_VERSION, "1"); // backwards compatibility
-        return context;
+    public String factoryIdentifier() {
+        return JdbcCatalogFactoryOptions.IDENTIFIER;
     }
 
     @Override
-    public List<String> supportedProperties() {
-        List<String> properties = new ArrayList<>();
-
-        // default database
-        properties.add(CATALOG_DEFAULT_DATABASE);
-
-        properties.add(CATALOG_JDBC_BASE_URL);
-        properties.add(CATALOG_JDBC_USERNAME);
-        properties.add(CATALOG_JDBC_PASSWORD);
+    public Set<ConfigOption<?>> requiredOptions() {
+        final Set<ConfigOption<?>> options = new HashSet<>();
+        options.add(DEFAULT_DATABASE);
+        options.add(USERNAME);
+        options.add(PASSWORD);
+        options.add(BASE_URL);
+        return options;
+    }
 
-        return properties;
+    @Override
+    public Set<ConfigOption<?>> optionalOptions() {
+        final Set<ConfigOption<?>> options = new HashSet<>();
+        options.add(PROPERTY_VERSION);
Review comment:
We should ignore `PROPERTY_VERSION` for all catalogs by default in `FactoryUtil`; we do the same for connectors.
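A rough sketch of how `FactoryUtil` could strip such bookkeeping options before they reach the individual factory (the helper name is hypothetical; `PROPERTY_VERSION` and `CATALOG_TYPE` are the existing constants used elsewhere in this PR):

```java
// Hypothetical helper in FactoryUtil: drop options that are only used for
// discovery/versioning so catalog factories never need to declare them.
private static Map<String, String> removeGlobalCatalogOptions(Map<String, String> options) {
    return options.entrySet().stream()
            .filter(entry -> !PROPERTY_VERSION.key().equals(entry.getKey()))
            .filter(entry -> !CATALOG_TYPE.key().equals(entry.getKey()))
            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}
```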
##########
File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/factories/HiveCatalogFactory.java
##########
@@ -18,92 +18,88 @@
 package org.apache.flink.table.catalog.hive.factories;
 
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.catalog.Catalog;
 import org.apache.flink.table.catalog.hive.HiveCatalog;
-import org.apache.flink.table.catalog.hive.client.HiveShimLoader;
-import org.apache.flink.table.catalog.hive.descriptors.HiveCatalogValidator;
-import org.apache.flink.table.descriptors.DescriptorProperties;
 import org.apache.flink.table.factories.CatalogFactory;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
 
-import static org.apache.flink.table.catalog.hive.descriptors.HiveCatalogValidator.CATALOG_HADOOP_CONF_DIR;
-import static org.apache.flink.table.catalog.hive.descriptors.HiveCatalogValidator.CATALOG_HIVE_CONF_DIR;
-import static org.apache.flink.table.catalog.hive.descriptors.HiveCatalogValidator.CATALOG_HIVE_VERSION;
-import static org.apache.flink.table.catalog.hive.descriptors.HiveCatalogValidator.CATALOG_TYPE_VALUE_HIVE;
-import static org.apache.flink.table.descriptors.CatalogDescriptorValidator.CATALOG_DEFAULT_DATABASE;
-import static org.apache.flink.table.descriptors.CatalogDescriptorValidator.CATALOG_PROPERTY_VERSION;
-import static org.apache.flink.table.descriptors.CatalogDescriptorValidator.CATALOG_TYPE;
+import static org.apache.flink.table.catalog.hive.factories.HiveCatalogFactoryOptions.DEFAULT_DATABASE;
+import static org.apache.flink.table.catalog.hive.factories.HiveCatalogFactoryOptions.HADOOP_CONF_DIR;
+import static org.apache.flink.table.catalog.hive.factories.HiveCatalogFactoryOptions.HIVE_CONF_DIR;
+import static org.apache.flink.table.catalog.hive.factories.HiveCatalogFactoryOptions.HIVE_VERSION;
+import static org.apache.flink.table.factories.FactoryUtil.PROPERTY_VERSION;
 
 /** Catalog factory for {@link HiveCatalog}. */
 public class HiveCatalogFactory implements CatalogFactory {
     private static final Logger LOG = LoggerFactory.getLogger(HiveCatalogFactory.class);
 
     @Override
-    public Map<String, String> requiredContext() {
-        Map<String, String> context = new HashMap<>();
-        context.put(CATALOG_TYPE, CATALOG_TYPE_VALUE_HIVE); // hive
-        context.put(CATALOG_PROPERTY_VERSION, "1"); // backwards compatibility
-        return context;
+    public String factoryIdentifier() {
+        return HiveCatalogFactoryOptions.IDENTIFIER;
     }
 
     @Override
-    public List<String> supportedProperties() {
-        List<String> properties = new ArrayList<>();
-
-        // default database
-        properties.add(CATALOG_DEFAULT_DATABASE);
-
-        properties.add(CATALOG_HIVE_CONF_DIR);
-
-        properties.add(CATALOG_HIVE_VERSION);
-
-        properties.add(CATALOG_HADOOP_CONF_DIR);
-
-        return properties;
+    public Set<ConfigOption<?>> requiredOptions() {
+        return Collections.emptySet();
     }
 
     @Override
-    public Catalog createCatalog(String name, Map<String, String> properties) {
-        final DescriptorProperties descriptorProperties = getValidatedProperties(properties);
-
-        final String defaultDatabase =
-                descriptorProperties
-                        .getOptionalString(CATALOG_DEFAULT_DATABASE)
-                        .orElse(HiveCatalog.DEFAULT_DB);
-
-        final Optional<String> hiveConfDir =
-                descriptorProperties.getOptionalString(CATALOG_HIVE_CONF_DIR);
-
-        final Optional<String> hadoopConfDir =
-                descriptorProperties.getOptionalString(CATALOG_HADOOP_CONF_DIR);
+    public Set<ConfigOption<?>> optionalOptions() {
+        final Set<ConfigOption<?>> options = new HashSet<>();
+        options.add(DEFAULT_DATABASE);
+        options.add(PROPERTY_VERSION);
+        options.add(HIVE_CONF_DIR);
+        options.add(HIVE_VERSION);
+        options.add(HADOOP_CONF_DIR);
+        return options;
+    }
 
-        final String version =
-                descriptorProperties
-                        .getOptionalString(CATALOG_HIVE_VERSION)
-                        .orElse(HiveShimLoader.getHiveVersion());
+    @Override
+    public Catalog createCatalog(Context context) {
+        final Configuration configuration = Configuration.fromMap(context.getOptions());
+        validateConfiguration(configuration);
 
         return new HiveCatalog(
-                name,
-                defaultDatabase,
-                hiveConfDir.orElse(null),
-                hadoopConfDir.orElse(null),
-                version);
+                context.getName(),
+                configuration.getString(DEFAULT_DATABASE),
+                configuration.getString(HIVE_CONF_DIR),
+                configuration.getString(HADOOP_CONF_DIR),
+                configuration.getString(HIVE_VERSION));
     }
 
-    private static DescriptorProperties getValidatedProperties(Map<String, String> properties) {
-        final DescriptorProperties descriptorProperties = new DescriptorProperties(true);
-        descriptorProperties.putProperties(properties);
-
-        new HiveCatalogValidator().validate(descriptorProperties);
-
-        return descriptorProperties;
+    private void validateConfiguration(Configuration configuration) {
+        final String defaultDatabase = configuration.getString(DEFAULT_DATABASE);
+        if (defaultDatabase != null && defaultDatabase.isEmpty()) {
Review comment:
We could think about a little utility, similar to `TableFactoryHelper`, to quickly validate required/optional options; we could provide it for all catalogs.
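A rough sketch of such a utility's entry point, mirroring `createTableFactoryHelper` (the `CatalogFactoryHelper` name and API are hypothetical):

```java
// Hypothetical, analogous to FactoryUtil.createTableFactoryHelper(factory, context):
public static CatalogFactoryHelper createCatalogFactoryHelper(
        CatalogFactory factory, CatalogFactory.Context context) {
    return new CatalogFactoryHelper(factory, context);
}
```

A catalog factory could then validate its options in one call, e.g. `FactoryUtil.createCatalogFactoryHelper(this, context).validate();`, which would check `requiredOptions()`/`optionalOptions()` and reject unknown keys.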
##########
File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/factories/HiveCatalogFactory.java
##########
Review comment:
Reduce the code duplication; by the way, there is also `StringUtils.isNullOrWhitespaceOnly`.
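For illustration, the check rewritten with that utility (assuming `DEFAULT_DATABASE` carries a non-null default value, so a `null` result cannot occur; the message text is a sketch):

```java
// uses org.apache.flink.util.StringUtils
private void validateConfiguration(Configuration configuration) {
    final String defaultDatabase = configuration.getString(DEFAULT_DATABASE);
    // Rejects empty and whitespace-only values in one call.
    if (StringUtils.isNullOrWhitespaceOnly(defaultDatabase)) {
        throw new ValidationException(
                String.format("Option '%s' must not be empty.", DEFAULT_DATABASE.key()));
    }
}
```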
##########
File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericInMemoryCatalogFactoryOptions.java
##########
@@ -16,15 +16,18 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.descriptors;
+package org.apache.flink.table.catalog;
 
-/** Validator for {@link GenericInMemoryCatalogDescriptor}. */
-public class GenericInMemoryCatalogValidator extends CatalogDescriptorValidator {
-    public static final String CATALOG_TYPE_VALUE_GENERIC_IN_MEMORY = "generic_in_memory";
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
 
-    @Override
-    public void validate(DescriptorProperties properties) {
-        super.validate(properties);
-        properties.validateValue(CATALOG_TYPE, CATALOG_TYPE_VALUE_GENERIC_IN_MEMORY, false);
-    }
+/** {@link ConfigOption}s for {@link GenericInMemoryCatalog}. */
+public class GenericInMemoryCatalogFactoryOptions {
+
+    public static final String IDENTIFIER = "generic_in_memory";
Review comment:
Actually, this identifier violates the identifier guidelines :(
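Assuming this refers to the convention that identifiers use lowercase words separated by hyphens rather than underscores, a conforming value might look like the following (hypothetical; changing it affects existing `'type'` values and would need a compatibility story):

```java
// Hypothetical guideline-conformant identifier; hyphens instead of underscores.
public static final String IDENTIFIER = "generic-in-memory";
```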