twalthr commented on a change in pull request #11959: URL: https://github.com/apache/flink/pull/11959#discussion_r419917599
########## File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FactoryUtil.java ########## @@ -0,0 +1,570 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.factories; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ConfigOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.DelegatingConfiguration; +import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.table.api.TableException; +import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.catalog.Catalog; +import org.apache.flink.table.catalog.CatalogTable; +import org.apache.flink.table.catalog.ObjectIdentifier; +import org.apache.flink.table.connector.format.ScanFormat; +import org.apache.flink.table.connector.format.SinkFormat; +import org.apache.flink.table.connector.sink.DynamicTableSink; +import org.apache.flink.table.connector.source.DynamicTableSource; +import org.apache.flink.table.utils.EncodingUtils; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.ServiceConfigurationError; +import java.util.ServiceLoader; +import java.util.Set; +import java.util.stream.Collectors; + +/** + * Utility for working with {@link Factory}s. + */ +@Internal +public final class FactoryUtil { + + private static final Logger LOG = LoggerFactory.getLogger(FactoryUtil.class); + + public static final ConfigOption<Integer> PROPERTY_VERSION = ConfigOptions.key("property-version") + .intType() + .defaultValue(1) + .withDescription( + "Version of the overall property design. This option is meant for future backwards compatibility."); + + public static final ConfigOption<String> CONNECTOR = ConfigOptions.key("connector") + .stringType() + .noDefaultValue() + .withDescription( + "Uniquely identifies the connector of a dynamic table that is used for accessing data in " + + "an external system. Its value is used during table source and table sink discovery."); + + public static final String FORMAT_PREFIX = "format."; + + public static final String KEY_FORMAT_PREFIX = "key.format."; + + public static final String VALUE_FORMAT_PREFIX = "value.format."; + + /** + * Creates a {@link DynamicTableSource} from a {@link CatalogTable}. + * + * <p>It considers {@link Catalog#getFactory()} if provided. + */ + public static DynamicTableSource createTableSource( + @Nullable Catalog catalog, + ObjectIdentifier objectIdentifier, + CatalogTable catalogTable, + ReadableConfig configuration, + ClassLoader classLoader) { + final DefaultDynamicTableContext context = new DefaultDynamicTableContext( + objectIdentifier, + catalogTable, + configuration, + classLoader); + try { + final DynamicTableSourceFactory factory = getDynamicTableFactory( + DynamicTableSourceFactory.class, + catalog, + context); + return factory.createDynamicTableSource(context); + } catch (Throwable t) { + throw new ValidationException( + String.format( + "Unable to create a source for reading table '%s'.\n\n" + + "Table options are:\n\n" + + "%s", + objectIdentifier.asSummaryString(), + catalogTable.getOptions() + .entrySet() + .stream() + .map(e -> stringifyOption(e.getKey(), e.getValue())) + .sorted() + .collect(Collectors.joining("\n"))), + t); + } + } + + /** + * Creates a {@link DynamicTableSink} from a {@link CatalogTable}. + * + * <p>It considers {@link Catalog#getFactory()} if provided. + */ + public static DynamicTableSink createTableSink( + @Nullable Catalog catalog, + ObjectIdentifier objectIdentifier, + CatalogTable catalogTable, + ReadableConfig configuration, + ClassLoader classLoader) { + final DefaultDynamicTableContext context = new DefaultDynamicTableContext( + objectIdentifier, + catalogTable, + configuration, + classLoader); + try { + final DynamicTableSinkFactory factory = getDynamicTableFactory( + DynamicTableSinkFactory.class, + catalog, + context); + return factory.createDynamicTableSink(context); + } catch (Throwable t) { + throw new ValidationException( + String.format( + "Unable to create a sink for writing table '%s'.\n\n" + + "Table options are:\n\n" + + "%s", + objectIdentifier.asSummaryString(), + catalogTable.getOptions() + .entrySet() + .stream() + .map(e -> stringifyOption(e.getKey(), e.getValue())) + .sorted() + .collect(Collectors.joining("\n"))), + t); + } + } + + /** + * Creates a utility that helps in discovering formats and validating all options for a {@link DynamicTableFactory}. + * + * <p>The following example sketches the usage: + * <pre>{@code + * // in createDynamicTableSource() + * helper = FactoryUtil.createTableFactoryHelper(this, context); + * keyFormat = helper.discoverScanFormat(classloader, MyFormatFactory.class, KEY_OPTION, "prefix"); + * valueFormat = helper.discoverScanFormat(classloader, MyFormatFactory.class, VALUE_OPTION, "prefix"); + * helper.validate(); + * ... // construct connector with discovered formats + * }</pre> + * + * <p>Note: This utility checks for left-over options in the final step. + */ + public static TableFactoryHelper createTableFactoryHelper( + DynamicTableFactory factory, + DynamicTableFactory.Context context) { + return new TableFactoryHelper(factory, context); + } + + /** + * Discovers a factory using the given factory base class and identifier. + * + * <p>This method is meant for cases where {@link #createTableFactoryHelper(DynamicTableFactory, DynamicTableFactory.Context)} + * {@link #createTableSource(Catalog, ObjectIdentifier, CatalogTable, ReadableConfig, ClassLoader)}, + * and {@link #createTableSink(Catalog, ObjectIdentifier, CatalogTable, ReadableConfig, ClassLoader)} + * are not applicable. + */ + @SuppressWarnings("unchecked") + public static <T extends Factory> T discoverFactory( + ClassLoader classLoader, + Class<T> factoryClass, + String factoryIdentifier) { + final List<Factory> factories = discoverFactories(classLoader); + + final List<Factory> foundFactories = factories.stream() + .filter(f -> factoryClass.isAssignableFrom(f.getClass())) + .collect(Collectors.toList()); + + if (foundFactories.isEmpty()) { + throw new ValidationException( + String.format( + "Could not find any factories that implement '%s' in the classpath.", + factoryClass.getName())); + } + + final List<Factory> matchingFactories = foundFactories.stream() + .filter(f -> f.factoryIdentifier().equals(factoryIdentifier)) + .collect(Collectors.toList()); + + if (matchingFactories.isEmpty()) { + throw new ValidationException( + String.format( + "Could not find any factory for identifier '%s' that implements '%s' in the classpath.\n\n" + + "Available factory identifiers are:\n\n" + + "%s", + factoryIdentifier, + factoryClass.getName(), + foundFactories.stream() + .map(Factory::factoryIdentifier) + .sorted() + .collect(Collectors.joining("\n")))); + } + if (matchingFactories.size() > 1) { + throw new ValidationException( + String.format( + "Multiple factories for identifier '%s' that implement '%s' found in the classpath.\n\n" + + "Ambiguous factory classes are:\n\n" + + "%s", + factoryIdentifier, + factoryClass.getName(), + foundFactories.stream() + .map(f -> factories.getClass().getName()) + .sorted() + .collect(Collectors.joining("\n")))); + } + + return (T) matchingFactories.get(0); + } + + /** + * Validates the required and optional {@link ConfigOption}s of a factory. + * + * <p>Note: It does not check for left-over options. + */ + public static void validateFactoryOptions(Factory factory, ReadableConfig options) { + // currently Flink's options have no validation feature which is why we access them eagerly + // to provoke a parsing error + factory.requiredOptions() + .forEach(option -> { Review comment: good idea, better than a couple of trial and errors ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected]
