This is an automated email from the ASF dual-hosted git repository. twalthr pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit fef974d7df1a9dabd62f3288c086185d69298bc1 Author: Timo Walther <[email protected]> AuthorDate: Thu Apr 30 11:03:47 2020 +0200 [FLINK-16997][table-common] Add new factory interfaces and discovery utilities Implements the new factory interfaces mentioned in FLIP-95. Adds new factory utilities that can be used for FLIP-122 and future factories. It adds TestDynamicTableFactory and TestFormatFactory for reference implementations of new factories. This closes #11959. --- .../table/tests/test_catalog_completeness.py | 1 + .../org/apache/flink/table/catalog/Catalog.java | 20 + .../flink/table/connector/format/Format.java | 55 ++ .../flink/table/connector/format/ScanFormat.java | 37 ++ .../flink/table/connector/format/SinkFormat.java | 38 ++ .../factories/DeserializationFormatFactory.java | 34 ++ .../flink/table/factories/DynamicTableFactory.java | 73 +++ .../table/factories/DynamicTableSinkFactory.java | 42 ++ .../table/factories/DynamicTableSourceFactory.java | 42 ++ .../org/apache/flink/table/factories/Factory.java | 79 +++ .../apache/flink/table/factories/FactoryUtil.java | 570 +++++++++++++++++++++ .../flink/table/factories/ScanFormatFactory.java | 50 ++ .../factories/SerializationFormatFactory.java | 34 ++ .../flink/table/factories/SinkFormatFactory.java | 50 ++ .../flink/table/factories/FactoryUtilTest.java | 285 +++++++++++ .../table/factories/TestDynamicTableFactory.java | 257 ++++++++++ .../flink/table/factories/TestFormatFactory.java | 180 +++++++ .../org.apache.flink.table.factories.Factory | 17 + 18 files changed, 1864 insertions(+) diff --git a/flink-python/pyflink/table/tests/test_catalog_completeness.py b/flink-python/pyflink/table/tests/test_catalog_completeness.py index 3e83357..245a2be 100644 --- a/flink-python/pyflink/table/tests/test_catalog_completeness.py +++ b/flink-python/pyflink/table/tests/test_catalog_completeness.py @@ -43,6 +43,7 @@ class CatalogAPICompletenessTests(PythonAPICompletenessTestCase, unittest.TestCa return { 'open', 'close', 
+ 'getFactory', 'getTableFactory', 'getFunctionDefinitionFactory', 'listPartitionsByFilter'} diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Catalog.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Catalog.java index b4ff5f3..1e4c482 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Catalog.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Catalog.java @@ -35,6 +35,8 @@ import org.apache.flink.table.catalog.exceptions.TablePartitionedException; import org.apache.flink.table.catalog.stats.CatalogColumnStatistics; import org.apache.flink.table.catalog.stats.CatalogTableStatistics; import org.apache.flink.table.expressions.Expression; +import org.apache.flink.table.factories.DynamicTableFactory; +import org.apache.flink.table.factories.Factory; import org.apache.flink.table.factories.FunctionDefinitionFactory; import org.apache.flink.table.factories.TableFactory; @@ -49,11 +51,29 @@ import java.util.Optional; public interface Catalog { /** + * Returns a factory for creating instances from catalog objects. + * + * <p>This method enables bypassing the discovery process. Implementers can directly pass internal + * catalog-specific objects to their own factory. For example, a custom {@link CatalogTable} can + * be processed by a custom {@link DynamicTableFactory}. + * + * <p>Because all factories are interfaces, the returned {@link Factory} instance can implement multiple + * supported extension points. An {@code instanceof} check is performed by the caller that checks + * whether a required factory is implemented; otherwise the discovery process is used. + */ + default Optional<Factory> getFactory() { + return Optional.empty(); + } + + /** * Get an optional {@link TableFactory} instance that's responsible for generating table-related * instances stored in this catalog, instances such as source/sink. 
* * @return an optional TableFactory instance + * @deprecated Use {@link #getFactory()} for the new factory stack. The new factory stack uses the + * new table sources and sinks defined in FLIP-95 and a slightly different discovery mechanism. */ + @Deprecated default Optional<TableFactory> getTableFactory() { return Optional.empty(); } diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/format/Format.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/format/Format.java new file mode 100644 index 0000000..40c0115 --- /dev/null +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/format/Format.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.connector.format; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.table.connector.ChangelogMode; +import org.apache.flink.table.connector.sink.DynamicTableSink; +import org.apache.flink.table.connector.source.ScanTableSource; +import org.apache.flink.table.factories.DynamicTableFactory; + +/** + * Base interface for connector formats. + * + * <p>Depending on the kind of external system, a connector might support different encodings for + * reading and writing rows. This interface is an intermediate representation before constructing the actual + * runtime implementation. + * + * <p>Formats can be distinguished along two dimensions: + * <ul> + * <li>Context in which the format is applied (e.g. {@link ScanTableSource} or {@link DynamicTableSink}).</li> + * <li>Runtime implementation interface that is required (e.g. {@link DeserializationSchema} or + * some bulk interface).</li> + * </ul> + * + * <p>A {@link DynamicTableFactory} can search for a format that is accepted by the connector. + * + * @see ScanFormat + * @see SinkFormat + */ +@PublicEvolving +public interface Format { + + /** + * Returns the set of changes that a connector (and transitively the planner) can expect during + * runtime. + */ + ChangelogMode getChangelogMode(); +} diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/format/ScanFormat.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/format/ScanFormat.java new file mode 100644 index 0000000..529ea37 --- /dev/null +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/format/ScanFormat.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.connector.format; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.table.connector.source.ScanTableSource; +import org.apache.flink.table.types.DataType; + +/** + * A {@link Format} for a {@link ScanTableSource}. + * + * @param <I> runtime interface needed by the table source + */ +@PublicEvolving +public interface ScanFormat<I> extends Format { + + /** + * Creates runtime implementation that is configured to produce data of the given data type. + */ + I createScanFormat(ScanTableSource.Context context, DataType producedDataType); +} diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/format/SinkFormat.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/format/SinkFormat.java new file mode 100644 index 0000000..0b67336 --- /dev/null +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/format/SinkFormat.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.connector.format; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.table.connector.sink.DynamicTableSink; +import org.apache.flink.table.connector.source.ScanTableSource; +import org.apache.flink.table.types.DataType; + +/** + * A {@link Format} for a {@link DynamicTableSink}. + * + * @param <I> runtime interface needed by the table sink + */ +@PublicEvolving +public interface SinkFormat<I> extends Format { + + /** + * Creates runtime implementation that is configured to consume data of the given data type. + */ + I createSinkFormat(ScanTableSource.Context context, DataType consumedDataType); +} diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/DeserializationFormatFactory.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/DeserializationFormatFactory.java new file mode 100644 index 0000000..1901fc5 --- /dev/null +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/DeserializationFormatFactory.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.factories; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.table.connector.format.ScanFormat; +import org.apache.flink.table.data.RowData; + +/** + * Factory for creating a {@link ScanFormat} for {@link DeserializationSchema}. + * + * @see FactoryUtil#createTableFactoryHelper(DynamicTableFactory, DynamicTableFactory.Context) + */ +@PublicEvolving +public interface DeserializationFormatFactory extends ScanFormatFactory<DeserializationSchema<RowData>> { + // interface is used for discovery but is already fully specified by the generics +} diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/DynamicTableFactory.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/DynamicTableFactory.java new file mode 100644 index 0000000..0d4f8ba --- /dev/null +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/DynamicTableFactory.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.factories; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.table.catalog.Catalog; +import org.apache.flink.table.catalog.CatalogTable; +import org.apache.flink.table.catalog.ObjectIdentifier; +import org.apache.flink.table.connector.sink.DynamicTableSink; +import org.apache.flink.table.connector.source.DynamicTableSource; + +/** + * Base interface for configuring a dynamic table connector for an external storage system from catalog + * and session information. + * + * <p>Dynamic tables are the core concept of Flink's Table & SQL API for processing both bounded and + * unbounded data in a unified fashion. + * + * <p>Implement {@link DynamicTableSourceFactory} for constructing a {@link DynamicTableSource}. + * + * <p>Implement {@link DynamicTableSinkFactory} for constructing a {@link DynamicTableSink}. + * + * <p>The options {@link FactoryUtil#PROPERTY_VERSION} and {@link FactoryUtil#CONNECTOR} are implicitly + * added and must not be declared. + */ +@PublicEvolving +public interface DynamicTableFactory extends Factory { + + /** + * Provides catalog and session information describing the dynamic table to be accessed. + */ + interface Context { + + /** + * Returns the identifier of the table in the {@link Catalog}. 
+ */ + ObjectIdentifier getObjectIdentifier(); + + /** + * Returns table information received from the {@link Catalog}. + */ + CatalogTable getCatalogTable(); + + /** + * Gives read-only access to the configuration of the current session. + */ + ReadableConfig getConfiguration(); + + /** + * Returns the class loader of the current session. + * + * <p>The class loader is in particular useful for discovering further (nested) factories. + */ + ClassLoader getClassLoader(); + } +} diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/DynamicTableSinkFactory.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/DynamicTableSinkFactory.java new file mode 100644 index 0000000..7f428fe --- /dev/null +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/DynamicTableSinkFactory.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.factories; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.table.catalog.CatalogTable; +import org.apache.flink.table.connector.sink.DynamicTableSink; + +/** + * Creates a {@link DynamicTableSink} instance from a {@link CatalogTable} and additional context + * information. + * + * <p>See {@link Factory} for more information about the general design of a factory. + */ +@PublicEvolving +public interface DynamicTableSinkFactory extends DynamicTableFactory { + + /** + * Creates a {@link DynamicTableSink} instance from a {@link CatalogTable} and additional context + * information. + * + * <p>An implementation should perform validation and the discovery of further (nested) factories + * in this method. + */ + DynamicTableSink createDynamicTableSink(Context context); +} diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/DynamicTableSourceFactory.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/DynamicTableSourceFactory.java new file mode 100644 index 0000000..3c0ecd1 --- /dev/null +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/DynamicTableSourceFactory.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.factories; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.table.catalog.CatalogTable; +import org.apache.flink.table.connector.source.DynamicTableSource; + +/** + * Creates a {@link DynamicTableSource} instance from a {@link CatalogTable} and additional context + * information. + * + * <p>See {@link Factory} for more information about the general design of a factory. + */ +@PublicEvolving +public interface DynamicTableSourceFactory extends DynamicTableFactory { + + /** + * Creates a {@link DynamicTableSource} instance from a {@link CatalogTable} and additional context + * information. + * + * <p>An implementation should perform validation and the discovery of further (nested) factories + * in this method. + */ + DynamicTableSource createDynamicTableSource(Context context); +} diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/Factory.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/Factory.java new file mode 100644 index 0000000..b669e76 --- /dev/null +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/Factory.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.factories; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.configuration.ConfigOption; + +import java.util.Set; + +/** + * Base interface for all kinds of factories that create object instances from a list of key-value pairs + * in Flink's Table & SQL API. + * + * <p>A factory is uniquely identified by {@link Class} and {@link #factoryIdentifier()}. + * + * <p>The list of available factories is discovered using Java's Service Provider Interfaces (SPI). Classes + * that implement this interface can be added to {@code META-INF/services/org.apache.flink.table.factories.Factory} + * in JAR files. + * + * <p>Every factory declares a set of required and optional options. This information will not be used + * during discovery but is helpful when generating documentation and performing validation. A factory may + * discover further (nested) factories; the options of the nested factories must not be declared in the + * sets of this factory. + * + * <p>It is the responsibility of each factory to perform validation before returning an instance. + * + * <p>For consistency, the following style for key names of {@link ConfigOption} is recommended: + * <ul> + * <li>Try to <b>reuse</b> key names as much as possible. Use other factory implementations as an example. + * <li>Key names should be declared in <b>lower case</b>. Use "-" instead of dots or camel case to split words. + * <li>Key names should be <b>hierarchical</b> where appropriate. 
Think about how one would define such + * a hierarchy in JSON or YAML file (e.g. {@code sink.bulk-flush.max-actions}). + * <li>In case of a hierarchy, try not to use the higher level again in the key name (e.g. do {@code sink.partitioner} + * instead of {@code sink.sink-partitioner}) to <b>keep the keys short</b>. + * </ul> + */ +@PublicEvolving +public interface Factory { + + /** + * Returns a unique identifier among same factory interfaces. + * + * <p>For consistency, an identifier should be declared as one lower case word (e.g. {@code kafka}). If + * multiple factories exist for different versions, a version should be appended using "-" (e.g. {@code kafka-0.10}). + */ + String factoryIdentifier(); + + /** + * Returns a set of {@link ConfigOption} that an implementation of this factory requires in addition to + * {@link #optionalOptions()}. + * + * <p>See the documentation of {@link Factory} for more information. + */ + Set<ConfigOption<?>> requiredOptions(); + + /** + * Returns a set of {@link ConfigOption} that an implementation of this factory consumes in addition to + * {@link #requiredOptions()}. + * + * <p>See the documentation of {@link Factory} for more information. + */ + Set<ConfigOption<?>> optionalOptions(); +} diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FactoryUtil.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FactoryUtil.java new file mode 100644 index 0000000..fa42b8f --- /dev/null +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FactoryUtil.java @@ -0,0 +1,570 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.factories; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ConfigOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.DelegatingConfiguration; +import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.table.api.TableException; +import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.catalog.Catalog; +import org.apache.flink.table.catalog.CatalogTable; +import org.apache.flink.table.catalog.ObjectIdentifier; +import org.apache.flink.table.connector.format.ScanFormat; +import org.apache.flink.table.connector.format.SinkFormat; +import org.apache.flink.table.connector.sink.DynamicTableSink; +import org.apache.flink.table.connector.source.DynamicTableSource; +import org.apache.flink.table.utils.EncodingUtils; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.ServiceConfigurationError; +import java.util.ServiceLoader; +import java.util.Set; +import java.util.stream.Collectors; + +/** + * Utility for working with {@link Factory}s. 
+ */ +@PublicEvolving +public final class FactoryUtil { + + private static final Logger LOG = LoggerFactory.getLogger(FactoryUtil.class); + + public static final ConfigOption<Integer> PROPERTY_VERSION = ConfigOptions.key("property-version") + .intType() + .defaultValue(1) + .withDescription( + "Version of the overall property design. This option is meant for future backwards compatibility."); + + public static final ConfigOption<String> CONNECTOR = ConfigOptions.key("connector") + .stringType() + .noDefaultValue() + .withDescription( + "Uniquely identifies the connector of a dynamic table that is used for accessing data in " + + "an external system. Its value is used during table source and table sink discovery."); + + public static final String FORMAT_PREFIX = "format."; + + public static final String KEY_FORMAT_PREFIX = "key.format."; + + public static final String VALUE_FORMAT_PREFIX = "value.format."; + + /** + * Creates a {@link DynamicTableSource} from a {@link CatalogTable}. + * + * <p>It considers {@link Catalog#getFactory()} if provided. 
+ */ + public static DynamicTableSource createTableSource( + @Nullable Catalog catalog, + ObjectIdentifier objectIdentifier, + CatalogTable catalogTable, + ReadableConfig configuration, + ClassLoader classLoader) { + final DefaultDynamicTableContext context = new DefaultDynamicTableContext( + objectIdentifier, + catalogTable, + configuration, + classLoader); + try { + final DynamicTableSourceFactory factory = getDynamicTableFactory( + DynamicTableSourceFactory.class, + catalog, + context); + return factory.createDynamicTableSource(context); + } catch (Throwable t) { + throw new ValidationException( + String.format( + "Unable to create a source for reading table '%s'.\n\n" + + "Table options are:\n\n" + + "%s", + objectIdentifier.asSummaryString(), + catalogTable.getOptions() + .entrySet() + .stream() + .map(e -> stringifyOption(e.getKey(), e.getValue())) + .sorted() + .collect(Collectors.joining("\n"))), + t); + } + } + + /** + * Creates a {@link DynamicTableSink} from a {@link CatalogTable}. + * + * <p>It considers {@link Catalog#getFactory()} if provided. 
+ */ + public static DynamicTableSink createTableSink( + @Nullable Catalog catalog, + ObjectIdentifier objectIdentifier, + CatalogTable catalogTable, + ReadableConfig configuration, + ClassLoader classLoader) { + final DefaultDynamicTableContext context = new DefaultDynamicTableContext( + objectIdentifier, + catalogTable, + configuration, + classLoader); + try { + final DynamicTableSinkFactory factory = getDynamicTableFactory( + DynamicTableSinkFactory.class, + catalog, + context); + return factory.createDynamicTableSink(context); + } catch (Throwable t) { + throw new ValidationException( + String.format( + "Unable to create a sink for writing table '%s'.\n\n" + + "Table options are:\n\n" + + "%s", + objectIdentifier.asSummaryString(), + catalogTable.getOptions() + .entrySet() + .stream() + .map(e -> stringifyOption(e.getKey(), e.getValue())) + .sorted() + .collect(Collectors.joining("\n"))), + t); + } + } + + /** + * Creates a utility that helps in discovering formats and validating all options for a {@link DynamicTableFactory}. + * + * <p>The following example sketches the usage: + * <pre>{@code + * // in createDynamicTableSource() + * helper = FactoryUtil.createTableFactoryHelper(this, context); + * keyFormat = helper.discoverScanFormat(classloader, MyFormatFactory.class, KEY_OPTION, "prefix"); + * valueFormat = helper.discoverScanFormat(classloader, MyFormatFactory.class, VALUE_OPTION, "prefix"); + * helper.validate(); + * ... // construct connector with discovered formats + * }</pre> + * + * <p>Note: This utility checks for left-over options in the final step. + */ + public static TableFactoryHelper createTableFactoryHelper( + DynamicTableFactory factory, + DynamicTableFactory.Context context) { + return new TableFactoryHelper(factory, context); + } + + /** + * Discovers a factory using the given factory base class and identifier. 
+ * + * <p>This method is meant for cases where {@link #createTableFactoryHelper(DynamicTableFactory, DynamicTableFactory.Context)}, + * {@link #createTableSource(Catalog, ObjectIdentifier, CatalogTable, ReadableConfig, ClassLoader)}, + * and {@link #createTableSink(Catalog, ObjectIdentifier, CatalogTable, ReadableConfig, ClassLoader)} + * are not applicable. + */ + @SuppressWarnings("unchecked") + public static <T extends Factory> T discoverFactory( + ClassLoader classLoader, + Class<T> factoryClass, + String factoryIdentifier) { + final List<Factory> factories = discoverFactories(classLoader); + + final List<Factory> foundFactories = factories.stream() + .filter(f -> factoryClass.isAssignableFrom(f.getClass())) + .collect(Collectors.toList()); + + if (foundFactories.isEmpty()) { + throw new ValidationException( + String.format( + "Could not find any factories that implement '%s' in the classpath.", + factoryClass.getName())); + } + + final List<Factory> matchingFactories = foundFactories.stream() + .filter(f -> f.factoryIdentifier().equals(factoryIdentifier)) + .collect(Collectors.toList()); + + if (matchingFactories.isEmpty()) { + throw new ValidationException( + String.format( + "Could not find any factory for identifier '%s' that implements '%s' in the classpath.\n\n" + + "Available factory identifiers are:\n\n" + + "%s", + factoryIdentifier, + factoryClass.getName(), + foundFactories.stream() + .map(Factory::factoryIdentifier) + .sorted() + .collect(Collectors.joining("\n")))); + } + if (matchingFactories.size() > 1) { + throw new ValidationException( + String.format( + "Multiple factories for identifier '%s' that implement '%s' found in the classpath.\n\n" + + "Ambiguous factory classes are:\n\n" + + "%s", + factoryIdentifier, + factoryClass.getName(), + matchingFactories.stream() + .map(f -> f.getClass().getName()) + .sorted() + .collect(Collectors.joining("\n")))); + } + + return (T) matchingFactories.get(0); + } + + /** + * Validates the required and 
optional {@link ConfigOption}s of a factory. + * + * <p>Note: It does not check for left-over options. + */ + public static void validateFactoryOptions(Factory factory, ReadableConfig options) { + // currently Flink's options have no validation feature which is why we access them eagerly + // to provoke a parsing error + + final List<String> missingRequiredOptions = factory.requiredOptions().stream() + .filter(option -> readOption(options, option) == null) + .map(ConfigOption::key) + .sorted() + .collect(Collectors.toList()); + + if (!missingRequiredOptions.isEmpty()) { + throw new ValidationException( + String.format( + "One or more required options are missing.\n\n" + + "Missing required options are:\n\n" + + "%s", + String.join("\n", missingRequiredOptions))); + } + + factory.optionalOptions() + .forEach(option -> readOption(options, option)); + } + + // -------------------------------------------------------------------------------------------- + // Helper methods + // -------------------------------------------------------------------------------------------- + + @SuppressWarnings("unchecked") + private static <T extends DynamicTableFactory> T getDynamicTableFactory( + Class<T> factoryClass, + @Nullable Catalog catalog, + DefaultDynamicTableContext context) { + // catalog factory has highest precedence + if (catalog != null) { + final Factory factory = catalog.getFactory() + .filter(f -> factoryClass.isAssignableFrom(f.getClass())) + .orElse(null); + if (factory != null) { + return (T) factory; + } + } + + // fallback to factory discovery + final String connectorOption = context.getCatalogTable() + .getOptions() + .get(CONNECTOR.key()); + if (connectorOption == null) { + throw new ValidationException( + String.format( + "Table options do not contain an option key '%s' for discovering a connector.", + CONNECTOR.key())); + } + try { + return discoverFactory(context.getClassLoader(), factoryClass, connectorOption); + } catch (ValidationException e) { + throw new 
ValidationException( + String.format( + "Cannot discover a connector using option '%s'.", + stringifyOption(CONNECTOR.key(), connectorOption)), + e); + } + } + + private static List<Factory> discoverFactories(ClassLoader classLoader) { + try { + final List<Factory> result = new LinkedList<>(); + ServiceLoader + .load(Factory.class, classLoader) + .iterator() + .forEachRemaining(result::add); + return result; + } catch (ServiceConfigurationError e) { + LOG.error("Could not load service provider for factories.", e); + throw new TableException("Could not load service provider for factories.", e); + } + } + + private static String stringifyOption(String key, String value) { + return String.format( + "'%s'='%s'", + EncodingUtils.escapeSingleQuotes(key), + EncodingUtils.escapeSingleQuotes(value)); + } + + private static Configuration asConfiguration(Map<String, String> options) { + final Configuration configuration = new Configuration(); + options.forEach(configuration::setString); + return configuration; + } + + private static <T> T readOption(ReadableConfig options, ConfigOption<T> option) { + try { + return options.get(option); + } catch (Throwable t) { + throw new ValidationException(String.format("Invalid value for option '%s'.", option.key()), t); + } + } + + // -------------------------------------------------------------------------------------------- + // Helper classes + // -------------------------------------------------------------------------------------------- + + /** + * Helper utility for discovering formats and validating all options for a {@link DynamicTableFactory}. 
+ * + * @see #createTableFactoryHelper(DynamicTableFactory, DynamicTableFactory.Context) + */ + public static class TableFactoryHelper { + + private final DynamicTableFactory tableFactory; + + private final DynamicTableFactory.Context context; + + private final Configuration allOptions; + + private final Set<String> consumedOptionKeys; + + private TableFactoryHelper(DynamicTableFactory tableFactory, DynamicTableFactory.Context context) { + this.tableFactory = tableFactory; + this.context = context; + this.allOptions = asConfiguration(context.getCatalogTable().getOptions()); + this.consumedOptionKeys = new HashSet<>(); + this.consumedOptionKeys.add(PROPERTY_VERSION.key()); + this.consumedOptionKeys.add(CONNECTOR.key()); + this.consumedOptionKeys.addAll( + tableFactory.requiredOptions().stream() + .map(ConfigOption::key) + .collect(Collectors.toSet())); + this.consumedOptionKeys.addAll( + tableFactory.optionalOptions().stream() + .map(ConfigOption::key) + .collect(Collectors.toSet())); + } + + /** + * Discovers a {@link ScanFormat} of the given type using the given option as factory identifier. + * + * <p>A prefix, e.g. {@link #KEY_FORMAT_PREFIX}, projects the options for the format factory. + */ + public <I, F extends ScanFormatFactory<I>> ScanFormat<I> discoverScanFormat( + Class<F> formatFactoryClass, + ConfigOption<String> formatOption, + String formatPrefix) { + return discoverOptionalScanFormat(formatFactoryClass, formatOption, formatPrefix) + .orElseThrow(() -> + new ValidationException( + String.format("Could not find required scan format '%s'.", formatOption.key()))); + } + + /** + * Discovers a {@link ScanFormat} of the given type using the given option (if present) as factory + * identifier. + * + * <p>A prefix, e.g. {@link #KEY_FORMAT_PREFIX}, projects the options for the format factory. 
+ */ + public <I, F extends ScanFormatFactory<I>> Optional<ScanFormat<I>> discoverOptionalScanFormat( + Class<F> formatFactoryClass, + ConfigOption<String> formatOption, + String formatPrefix) { + return discoverOptionalFormatFactory(formatFactoryClass, formatOption, formatPrefix) + .map(formatFactory -> { + try { + return formatFactory.createScanFormat(context, projectOptions(formatPrefix)); + } catch (Throwable t) { + throw new ValidationException( + String.format( + "Error creating scan format '%s' in option space '%s'.", + formatFactory.factoryIdentifier(), + formatPrefix), + t); + } + }); + } + + /** + * Discovers a {@link SinkFormat} of the given type using the given option as factory identifier. + * + * <p>A prefix, e.g. {@link #KEY_FORMAT_PREFIX}, projects the options for the format factory. + */ + public <I, F extends SinkFormatFactory<I>> SinkFormat<I> discoverSinkFormat( + Class<F> formatFactoryClass, + ConfigOption<String> formatOption, + String formatPrefix) { + return discoverOptionalSinkFormat(formatFactoryClass, formatOption, formatPrefix) + .orElseThrow(() -> + new ValidationException( + String.format("Could not find required sink format '%s'.", formatOption.key()))); + } + + /** + * Discovers a {@link SinkFormat} of the given type using the given option (if present) as factory + * identifier. + * + * <p>A prefix, e.g. {@link #KEY_FORMAT_PREFIX}, projects the options for the format factory. 
+ */ + public <I, F extends SinkFormatFactory<I>> Optional<SinkFormat<I>> discoverOptionalSinkFormat( + Class<F> formatFactoryClass, + ConfigOption<String> formatOption, + String formatPrefix) { + return discoverOptionalFormatFactory(formatFactoryClass, formatOption, formatPrefix) + .map(formatFactory -> { + try { + return formatFactory.createSinkFormat(context, projectOptions(formatPrefix)); + } catch (Throwable t) { + throw new ValidationException( + String.format( + "Error creating sink format '%s' in option space '%s'.", + formatFactory.factoryIdentifier(), + formatPrefix), + t); + } + }); + } + + /** + * Validates the options of the {@link DynamicTableFactory}. It checks for unconsumed option + * keys. + */ + public void validate() { + validateFactoryOptions(tableFactory, allOptions); + final Set<String> remainingOptionKeys = new HashSet<>(allOptions.keySet()); + remainingOptionKeys.removeAll(consumedOptionKeys); + if (remainingOptionKeys.size() > 0) { + throw new ValidationException( + String.format( + "Unsupported options found for connector '%s'.\n\n" + + "Unsupported options:\n\n" + + "%s\n\n" + + "Supported options:\n\n" + + "%s", + tableFactory.factoryIdentifier(), + remainingOptionKeys.stream() + .sorted() + .collect(Collectors.joining("\n")), + consumedOptionKeys.stream() + .sorted() + .collect(Collectors.joining("\n")))); + } + } + + /** + * Returns all options of the table. 
+ */ + public ReadableConfig getOptions() { + return allOptions; + } + + // ---------------------------------------------------------------------------------------- + + private <F extends Factory> Optional<F> discoverOptionalFormatFactory( + Class<F> formatFactoryClass, + ConfigOption<String> formatOption, + String formatPrefix) { + final String identifier = allOptions.get(formatOption); + if (identifier == null) { + return Optional.empty(); + } + final F factory = discoverFactory( + context.getClassLoader(), + formatFactoryClass, + identifier); + // log all used options of other factories + consumedOptionKeys.addAll( + factory.requiredOptions().stream() + .map(ConfigOption::key) + .map(k -> formatPrefix + k) + .collect(Collectors.toSet())); + consumedOptionKeys.addAll( + factory.optionalOptions().stream() + .map(ConfigOption::key) + .map(k -> formatPrefix + k) + .collect(Collectors.toSet())); + return Optional.of(factory); + } + + private ReadableConfig projectOptions(String formatPrefix) { + return new DelegatingConfiguration( + allOptions, + formatPrefix); + } + } + + private static class DefaultDynamicTableContext implements DynamicTableFactory.Context { + + private final ObjectIdentifier objectIdentifier; + private final CatalogTable catalogTable; + private final ReadableConfig configuration; + private final ClassLoader classLoader; + + DefaultDynamicTableContext( + ObjectIdentifier objectIdentifier, + CatalogTable catalogTable, + ReadableConfig configuration, + ClassLoader classLoader) { + this.objectIdentifier = objectIdentifier; + this.catalogTable = catalogTable; + this.configuration = configuration; + this.classLoader = classLoader; + } + + @Override + public ObjectIdentifier getObjectIdentifier() { + return objectIdentifier; + } + + @Override + public CatalogTable getCatalogTable() { + return catalogTable; + } + + @Override + public ReadableConfig getConfiguration() { + return configuration; + } + + @Override + public ClassLoader getClassLoader() { + 
return classLoader; + } + } + + // -------------------------------------------------------------------------------------------- + + private FactoryUtil() { + // no instantiation + } +} diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/ScanFormatFactory.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/ScanFormatFactory.java new file mode 100644 index 0000000..184c432 --- /dev/null +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/ScanFormatFactory.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.factories; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.table.connector.format.Format; +import org.apache.flink.table.connector.format.ScanFormat; +import org.apache.flink.table.connector.source.ScanTableSource; + +/** + * Base interface for configuring a {@link ScanFormat} for a {@link ScanTableSource}. + * + * <p>Depending on the kind of external system, a connector might support different encodings for + * reading and writing rows. 
This interface helps in making such formats pluggable. + * + * <p>The created {@link Format} instance is an intermediate representation that can be used to construct + * runtime implementation in a later step. + * + * @see FactoryUtil#createTableFactoryHelper(DynamicTableFactory, DynamicTableFactory.Context) + * + * @param <I> runtime interface needed by the table source + */ +@PublicEvolving +public interface ScanFormatFactory<I> extends Factory { + + /** + * Creates a format from the given context and format options. + * + * <p>The format options have been projected to top-level options (e.g. from {@code key.format.ignore-errors} + * to {@code format.ignore-errors}). + */ + ScanFormat<I> createScanFormat(DynamicTableFactory.Context context, ReadableConfig formatOptions); +} diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/SerializationFormatFactory.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/SerializationFormatFactory.java new file mode 100644 index 0000000..9b669df --- /dev/null +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/SerializationFormatFactory.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.factories; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.table.connector.format.SinkFormat; +import org.apache.flink.table.data.RowData; + +/** + * Factory for creating a {@link SinkFormat} for {@link SerializationSchema}. + * + * @see FactoryUtil#createTableFactoryHelper(DynamicTableFactory, DynamicTableFactory.Context) + */ +@PublicEvolving +public interface SerializationFormatFactory extends SinkFormatFactory<SerializationSchema<RowData>> { + // interface is used for discovery but is already fully specified by the generics +} diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/SinkFormatFactory.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/SinkFormatFactory.java new file mode 100644 index 0000000..4212ea6 --- /dev/null +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/SinkFormatFactory.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.factories; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.table.connector.format.Format; +import org.apache.flink.table.connector.format.SinkFormat; +import org.apache.flink.table.connector.sink.DynamicTableSink; + +/** + * Base interface for configuring a {@link SinkFormat} for a {@link DynamicTableSink}. + * + * <p>Depending on the kind of external system, a connector might support different encodings for + * reading and writing rows. This interface helps in making such formats pluggable. + * + * <p>The created {@link Format} instance is an intermediate representation that can be used to construct + * runtime implementation in a later step. + * + * @see FactoryUtil#createTableFactoryHelper(DynamicTableFactory, DynamicTableFactory.Context) + * + * @param <I> runtime interface needed by the table sink + */ +@PublicEvolving +public interface SinkFormatFactory<I> extends Factory { + + /** + * Creates a format from the given context and format options. + * + * <p>The format options have been projected to top-level options (e.g. from {@code key.format.ignore-errors} + * to {@code format.ignore-errors}). + */ + SinkFormat<I> createSinkFormat(DynamicTableFactory.Context context, ReadableConfig formatOptions); +} diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/FactoryUtilTest.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/FactoryUtilTest.java new file mode 100644 index 0000000..15f6412 --- /dev/null +++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/FactoryUtilTest.java @@ -0,0 +1,285 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.factories; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.catalog.CatalogBaseTable; +import org.apache.flink.table.catalog.CatalogTable; +import org.apache.flink.table.catalog.ObjectIdentifier; +import org.apache.flink.table.connector.sink.DynamicTableSink; +import org.apache.flink.table.connector.source.DynamicTableSource; +import org.apache.flink.table.factories.TestDynamicTableFactory.DynamicTableSinkMock; +import org.apache.flink.table.factories.TestDynamicTableFactory.DynamicTableSourceMock; +import org.apache.flink.table.factories.TestFormatFactory.ScanFormatMock; +import org.apache.flink.table.factories.TestFormatFactory.SinkFormatMock; + +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.function.Consumer; + +import static org.apache.flink.util.CoreMatchers.containsCause; +import static org.junit.Assert.assertEquals; + +/** + * Tests for {@link FactoryUtil}. 
+ */ +public class FactoryUtilTest { + + @Rule + public ExpectedException thrown = ExpectedException.none(); + + @Test + public void testMissingConnector() { + expectError("Table options do not contain an option key 'connector' for discovering a connector."); + testError(options -> options.remove("connector")); + } + + @Test + public void testInvalidConnector() { + expectError( + "Could not find any factory for identifier 'FAIL' that implements '" + + DynamicTableSourceFactory.class.getName() + "' in the classpath.\n\n" + + "Available factory identifiers are:\n\n" + + "test-connector"); + testError(options -> options.put("connector", "FAIL")); + } + + @Test + public void testMissingConnectorOption() { + expectError( + "One or more required options are missing.\n\n" + + "Missing required options are:\n\n" + + "target"); + testError(options -> options.remove("target")); + } + + @Test + public void testInvalidConnectorOption() { + expectError("Invalid value for option 'buffer-size'."); + testError(options -> options.put("buffer-size", "FAIL")); + } + + @Test + public void testMissingFormat() { + expectError("Could not find required scan format 'value.format.kind'."); + testError(options -> options.remove("value.format.kind")); + } + + @Test + public void testInvalidFormat() { + expectError( + "Could not find any factory for identifier 'FAIL' that implements '" + + DeserializationFormatFactory.class.getName() + "' in the classpath.\n\n" + + "Available factory identifiers are:\n\n" + + "test-format"); + testError(options -> options.put("value.format.kind", "FAIL")); + } + + @Test + public void testMissingFormatOption() { + expectError( + "Error creating scan format 'test-format' in option space 'key.format.'."); + expectError( + "One or more required options are missing.\n\n" + + "Missing required options are:\n\n" + + "delimiter"); + testError(options -> options.remove("key.format.delimiter")); + } + + @Test + public void testInvalidFormatOption() { + 
expectError("Invalid value for option 'fail-on-missing'."); + testError(options -> options.put("key.format.fail-on-missing", "FAIL")); + } + + @Test + public void testUnconsumedOption() { + expectError( + "Unsupported options found for connector 'test-connector'.\n\n" + + "Unsupported options:\n\n" + + "this-is-also-not-consumed\n" + + "this-is-not-consumed\n\n" + + "Supported options:\n\n" + + "buffer-size\n" + + "connector\n" + + "key.format.delimiter\n" + + "key.format.fail-on-missing\n" + + "key.format.kind\n" + + "property-version\n" + + "target\n" + + "value.format.delimiter\n" + + "value.format.fail-on-missing\n" + + "value.format.kind"); + testError(options -> { + options.put("this-is-not-consumed", "42"); + options.put("this-is-also-not-consumed", "true"); + }); + } + + @Test + public void testAllOptions() { + final Map<String, String> options = createAllOptions(); + final DynamicTableSource actualSource = createTableSource(options); + final DynamicTableSource expectedSource = new DynamicTableSourceMock( + "MyTarget", + new ScanFormatMock(",", false), + new ScanFormatMock("|", true)); + assertEquals(expectedSource, actualSource); + final DynamicTableSink actualSink = createTableSink(options); + final DynamicTableSink expectedSink = new DynamicTableSinkMock( + "MyTarget", + 1000L, + new SinkFormatMock(","), + new SinkFormatMock("|")); + assertEquals(expectedSink, actualSink); + } + + @Test + public void testOptionalFormat() { + final Map<String, String> options = createAllOptions(); + options.remove("key.format.kind"); + options.remove("key.format.delimiter"); + final DynamicTableSource actualSource = createTableSource(options); + final DynamicTableSource expectedSource = new DynamicTableSourceMock( + "MyTarget", + null, + new ScanFormatMock("|", true)); + assertEquals(expectedSource, actualSource); + final DynamicTableSink actualSink = createTableSink(options); + final DynamicTableSink expectedSink = new DynamicTableSinkMock( + "MyTarget", + 1000L, + null, 
+ new SinkFormatMock("|")); + assertEquals(expectedSink, actualSink); + } + + // -------------------------------------------------------------------------------------------- + + private void expectError(String message) { + thrown.expect(ValidationException.class); + thrown.expect(containsCause(new ValidationException(message))); + } + + private static void testError(Consumer<Map<String, String>> optionModifier) { + final Map<String, String> options = createAllOptions(); + optionModifier.accept(options); + createTableSource(options); + } + + private static Map<String, String> createAllOptions() { + final Map<String, String> options = new HashMap<>(); + // we use strings here to test realistic parsing + options.put("property-version", "1"); + options.put("connector", TestDynamicTableFactory.IDENTIFIER); + options.put("target", "MyTarget"); + options.put("buffer-size", "1000"); + options.put("key.format.kind", TestFormatFactory.IDENTIFIER); + options.put("key.format.delimiter", ","); + options.put("value.format.kind", TestFormatFactory.IDENTIFIER); + options.put("value.format.delimiter", "|"); + options.put("value.format.fail-on-missing", "true"); + return options; + } + + private static DynamicTableSource createTableSource(Map<String, String> options) { + return FactoryUtil.createTableSource( + null, + ObjectIdentifier.of("cat", "db", "table"), + new CatalogTableMock(options), + new Configuration(), + FactoryUtilTest.class.getClassLoader()); + } + + private static DynamicTableSink createTableSink(Map<String, String> options) { + return FactoryUtil.createTableSink( + null, + ObjectIdentifier.of("cat", "db", "table"), + new CatalogTableMock(options), + new Configuration(), + FactoryUtilTest.class.getClassLoader()); + } + + private static class CatalogTableMock implements CatalogTable { + + final Map<String, String> options; + + CatalogTableMock(Map<String, String> options) { + this.options = options; + } + + @Override + public boolean isPartitioned() { + return false; 
+ } + + @Override + public List<String> getPartitionKeys() { + return null; + } + + @Override + public CatalogTable copy(Map<String, String> options) { + return null; + } + + @Override + public Map<String, String> toProperties() { + return null; + } + + @Override + public Map<String, String> getProperties() { + return options; + } + + @Override + public TableSchema getSchema() { + return null; + } + + @Override + public String getComment() { + return null; + } + + @Override + public CatalogBaseTable copy() { + return null; + } + + @Override + public Optional<String> getDescription() { + return Optional.empty(); + } + + @Override + public Optional<String> getDetailedDescription() { + return Optional.empty(); + } + } +} diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/TestDynamicTableFactory.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/TestDynamicTableFactory.java new file mode 100644 index 0000000..ff5a21f --- /dev/null +++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/TestDynamicTableFactory.java @@ -0,0 +1,257 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.factories; + +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ConfigOptions; +import org.apache.flink.table.connector.ChangelogMode; +import org.apache.flink.table.connector.format.ScanFormat; +import org.apache.flink.table.connector.format.SinkFormat; +import org.apache.flink.table.connector.sink.DynamicTableSink; +import org.apache.flink.table.connector.source.DynamicTableSource; +import org.apache.flink.table.connector.source.ScanTableSource; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.factories.FactoryUtil.TableFactoryHelper; + +import javax.annotation.Nullable; + +import java.util.HashSet; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; + +/** + * Test implementations for {@link DynamicTableSourceFactory} and {@link DynamicTableSinkFactory}. 
+ */
+public final class TestDynamicTableFactory implements DynamicTableSourceFactory, DynamicTableSinkFactory {
+
+	/** Identifier returned by {@link #factoryIdentifier()}. */
+	public static final String IDENTIFIER = "test-connector";
+
+	/** Required string option {@code target}; declared without a default value. */
+	public static final ConfigOption<String> TARGET = ConfigOptions
+		.key("target")
+		.stringType()
+		.noDefaultValue();
+
+	/** Optional long option {@code buffer-size}; defaults to {@code 100}. Only read by the sink. */
+	public static final ConfigOption<Long> BUFFER_SIZE = ConfigOptions
+		.key("buffer-size")
+		.longType()
+		.defaultValue(100L);
+
+	/** Optional string option {@code key.format.kind} used to discover the key format. */
+	public static final ConfigOption<String> KEY_FORMAT = ConfigOptions
+		.key("key.format.kind")
+		.stringType()
+		.noDefaultValue();
+
+	/** Required string option {@code value.format.kind} used to discover the value format. */
+	public static final ConfigOption<String> VALUE_FORMAT = ConfigOptions
+		.key("value.format.kind")
+		.stringType()
+		.noDefaultValue();
+
+	/**
+	 * Creates a {@link DynamicTableSourceMock} from the given context.
+	 *
+	 * <p>The key format is discovered optionally (a missing one is passed on as {@code null}),
+	 * the value format is discovered unconditionally. {@code helper.validate()} is invoked after
+	 * format discovery and before the options are read.
+	 */
+	@Override
+	public DynamicTableSource createDynamicTableSource(Context context) {
+		final TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
+
+		final Optional<ScanFormat<DeserializationSchema<RowData>>> keyFormat = helper.discoverOptionalScanFormat(
+			DeserializationFormatFactory.class,
+			KEY_FORMAT,
+			FactoryUtil.KEY_FORMAT_PREFIX);
+		final ScanFormat<DeserializationSchema<RowData>> valueFormat = helper.discoverScanFormat(
+			DeserializationFormatFactory.class,
+			VALUE_FORMAT,
+			FactoryUtil.VALUE_FORMAT_PREFIX);
+		helper.validate();
+
+		return new DynamicTableSourceMock(
+			helper.getOptions().get(TARGET),
+			keyFormat.orElse(null),
+			valueFormat);
+	}
+
+	/**
+	 * Creates a {@link DynamicTableSinkMock} from the given context.
+	 *
+	 * <p>Mirrors {@link #createDynamicTableSource(Context)} but discovers sink formats and
+	 * additionally reads {@link #BUFFER_SIZE}.
+	 */
+	@Override
+	public DynamicTableSink createDynamicTableSink(Context context) {
+		final TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
+
+		final Optional<SinkFormat<SerializationSchema<RowData>>> keyFormat = helper.discoverOptionalSinkFormat(
+			SerializationFormatFactory.class,
+			KEY_FORMAT,
+			FactoryUtil.KEY_FORMAT_PREFIX);
+		final SinkFormat<SerializationSchema<RowData>> valueFormat = helper.discoverSinkFormat(
+			SerializationFormatFactory.class,
+			VALUE_FORMAT,
+			FactoryUtil.VALUE_FORMAT_PREFIX);
+		helper.validate();
+
+		return new DynamicTableSinkMock(
+			helper.getOptions().get(TARGET),
+			helper.getOptions().get(BUFFER_SIZE),
+			keyFormat.orElse(null),
+			valueFormat);
+	}
+
+	@Override
+	public String factoryIdentifier() {
+		return IDENTIFIER;
+	}
+
+	/** Returns a fresh mutable set containing {@link #TARGET} and {@link #VALUE_FORMAT}. */
+	@Override
+	public Set<ConfigOption<?>> requiredOptions() {
+		final Set<ConfigOption<?>> options = new HashSet<>();
+		options.add(TARGET);
+		options.add(VALUE_FORMAT);
+		return options;
+	}
+
+	/** Returns a fresh mutable set containing {@link #BUFFER_SIZE} and {@link #KEY_FORMAT}. */
+	@Override
+	public Set<ConfigOption<?>> optionalOptions() {
+		final Set<ConfigOption<?>> options = new HashSet<>();
+		options.add(BUFFER_SIZE);
+		options.add(KEY_FORMAT);
+		return options;
+	}
+
+	// --------------------------------------------------------------------------------------------
+	// Table source
+	// --------------------------------------------------------------------------------------------
+
+	/**
+	 * {@link DynamicTableSource} for testing.
+	 *
+	 * <p>Only carries the values it was constructed with and compares them in
+	 * {@link #equals(Object)}; every interface method is a stub that returns {@code null}.
+	 */
+	public static class DynamicTableSourceMock implements ScanTableSource {
+
+		public final String target;
+		public final @Nullable ScanFormat<DeserializationSchema<RowData>> sourceKeyFormat;
+		public final ScanFormat<DeserializationSchema<RowData>> sourceValueFormat;
+
+		DynamicTableSourceMock(
+				String target,
+				@Nullable ScanFormat<DeserializationSchema<RowData>> sourceKeyFormat,
+				ScanFormat<DeserializationSchema<RowData>> sourceValueFormat) {
+			this.target = target;
+			this.sourceKeyFormat = sourceKeyFormat;
+			this.sourceValueFormat = sourceValueFormat;
+		}
+
+		/** Stub: always returns {@code null}. */
+		@Override
+		public ChangelogMode getChangelogMode() {
+			return null;
+		}
+
+		/** Stub: always returns {@code null}. */
+		@Override
+		public ScanRuntimeProvider getScanRuntimeProvider(Context runtimeProviderContext) {
+			return null;
+		}
+
+		/** Stub: always returns {@code null}. */
+		@Override
+		public DynamicTableSource copy() {
+			return null;
+		}
+
+		/** Stub: always returns {@code null}. */
+		@Override
+		public String asSummaryString() {
+			return null;
+		}
+
+		/**
+		 * Compares all fields of two mocks of exactly the same class.
+		 *
+		 * <p>Every field is compared with {@link Objects#equals(Object, Object)} so that a mock
+		 * holding a {@code null} field compares instead of throwing, matching the null tolerance
+		 * of {@link #hashCode()}.
+		 */
+		@Override
+		public boolean equals(Object o) {
+			if (this == o) {
+				return true;
+			}
+			if (o == null || getClass() != o.getClass()) {
+				return false;
+			}
+			DynamicTableSourceMock that = (DynamicTableSourceMock) o;
+			return Objects.equals(target, that.target) &&
+				Objects.equals(sourceKeyFormat, that.sourceKeyFormat) &&
+				Objects.equals(sourceValueFormat, that.sourceValueFormat);
+		}
+
+		@Override
+		public int hashCode() {
+			return Objects.hash(target, sourceKeyFormat, sourceValueFormat);
+		}
+	}
+
+	// --------------------------------------------------------------------------------------------
+	// Table sink
+	// --------------------------------------------------------------------------------------------
+
+	/**
+	 * {@link DynamicTableSink} for testing.
+	 *
+	 * <p>Only carries the values it was constructed with and compares them in
+	 * {@link #equals(Object)}; every interface method is a stub that returns {@code null}.
+	 */
+	public static class DynamicTableSinkMock implements DynamicTableSink {
+
+		public final String target;
+		public final Long bufferSize;
+		public final @Nullable SinkFormat<SerializationSchema<RowData>> sinkKeyFormat;
+		public final SinkFormat<SerializationSchema<RowData>> sinkValueFormat;
+
+		DynamicTableSinkMock(
+				String target,
+				Long bufferSize,
+				@Nullable SinkFormat<SerializationSchema<RowData>> sinkKeyFormat,
+				SinkFormat<SerializationSchema<RowData>> sinkValueFormat) {
+			this.target = target;
+			this.bufferSize = bufferSize;
+			this.sinkKeyFormat = sinkKeyFormat;
+			this.sinkValueFormat = sinkValueFormat;
+		}
+
+		/** Stub: ignores {@code requestedMode} and always returns {@code null}. */
+		@Override
+		public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
+			return null;
+		}
+
+		/** Stub: always returns {@code null}. */
+		@Override
+		public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
+			return null;
+		}
+
+		/** Stub: always returns {@code null}. */
+		@Override
+		public DynamicTableSink copy() {
+			return null;
+		}
+
+		/** Stub: always returns {@code null}. */
+		@Override
+		public String asSummaryString() {
+			return null;
+		}
+
+		/**
+		 * Compares all fields of two mocks of exactly the same class.
+		 *
+		 * <p>Every field is compared with {@link Objects#equals(Object, Object)} so that a mock
+		 * holding a {@code null} field compares instead of throwing, matching the null tolerance
+		 * of {@link #hashCode()}.
+		 */
+		@Override
+		public boolean equals(Object o) {
+			if (this == o) {
+				return true;
+			}
+			if (o == null || getClass() != o.getClass()) {
+				return false;
+			}
+			DynamicTableSinkMock that = (DynamicTableSinkMock) o;
+			return Objects.equals(target, that.target) &&
+				Objects.equals(bufferSize, that.bufferSize) &&
+				Objects.equals(sinkKeyFormat, that.sinkKeyFormat) &&
+				Objects.equals(sinkValueFormat, that.sinkValueFormat);
+		}
+
+		@Override
+		public int hashCode() {
+			return Objects.hash(target, bufferSize, sinkKeyFormat, sinkValueFormat);
+		}
+	}
+}
diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/TestFormatFactory.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/TestFormatFactory.java
new file mode 100644
index 0000000..c04bdb6
--- /dev/null
+++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/TestFormatFactory.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.factories;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.format.ScanFormat;
+import org.apache.flink.table.connector.format.SinkFormat;
+import org.apache.flink.table.connector.source.ScanTableSource;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
+
+import java.util.HashSet;
+import java.util.Objects;
+import java.util.Set;
+
+/**
+ * Test implementations for {@link DeserializationFormatFactory} and {@link SerializationFormatFactory}.
+ *
+ * <p>Both factory methods validate the format options and then return a mock that only stores the
+ * option values it was given.
+ */
+public class TestFormatFactory implements DeserializationFormatFactory, SerializationFormatFactory {
+
+	/** Identifier returned by {@link #factoryIdentifier()}. */
+	public static final String IDENTIFIER = "test-format";
+
+	/** Required string option {@code delimiter}; declared without a default value. */
+	public static final ConfigOption<String> DELIMITER = ConfigOptions
+		.key("delimiter")
+		.stringType()
+		.noDefaultValue();
+
+	/** Optional boolean option {@code fail-on-missing}; defaults to {@code false}. */
+	public static final ConfigOption<Boolean> FAIL_ON_MISSING = ConfigOptions
+		.key("fail-on-missing")
+		.booleanType()
+		.defaultValue(false);
+
+	/**
+	 * Validates {@code formatConfig} against this factory and returns a {@link ScanFormatMock}
+	 * holding the {@link #DELIMITER} and {@link #FAIL_ON_MISSING} values. {@code context} is unused.
+	 */
+	@Override
+	public ScanFormat<DeserializationSchema<RowData>> createScanFormat(
+			DynamicTableFactory.Context context,
+			ReadableConfig formatConfig) {
+		FactoryUtil.validateFactoryOptions(this, formatConfig);
+		return new ScanFormatMock(formatConfig.get(DELIMITER), formatConfig.get(FAIL_ON_MISSING));
+	}
+
+	/**
+	 * Validates {@code formatConfig} against this factory and returns a {@link SinkFormatMock}
+	 * holding the {@link #DELIMITER} value. {@code context} is unused.
+	 */
+	@Override
+	public SinkFormat<SerializationSchema<RowData>> createSinkFormat(
+			DynamicTableFactory.Context context,
+			ReadableConfig formatConfig) {
+		FactoryUtil.validateFactoryOptions(this, formatConfig);
+		return new SinkFormatMock(formatConfig.get(DELIMITER));
+	}
+
+	@Override
+	public String factoryIdentifier() {
+		return IDENTIFIER;
+	}
+
+	/** Returns a fresh mutable set containing only {@link #DELIMITER}. */
+	@Override
+	public Set<ConfigOption<?>> requiredOptions() {
+		final Set<ConfigOption<?>> options = new HashSet<>();
+		options.add(DELIMITER);
+		return options;
+	}
+
+	/** Returns a fresh mutable set containing only {@link #FAIL_ON_MISSING}. */
+	@Override
+	public Set<ConfigOption<?>> optionalOptions() {
+		final Set<ConfigOption<?>> options = new HashSet<>();
+		options.add(FAIL_ON_MISSING);
+		return options;
+	}
+
+	// --------------------------------------------------------------------------------------------
+	// Table source format
+	// --------------------------------------------------------------------------------------------
+
+	/**
+	 * {@link ScanFormat} for testing.
+	 *
+	 * <p>Only carries the option values it was constructed with; the interface methods are stubs
+	 * that return {@code null}.
+	 */
+	public static class ScanFormatMock implements ScanFormat<DeserializationSchema<RowData>> {
+
+		public final String delimiter;
+		public final Boolean failOnMissing;
+
+		ScanFormatMock(String delimiter, Boolean failOnMissing) {
+			this.delimiter = delimiter;
+			this.failOnMissing = failOnMissing;
+		}
+
+		/** Stub: always returns {@code null}. */
+		@Override
+		public DeserializationSchema<RowData> createScanFormat(
+				ScanTableSource.Context context,
+				DataType producedDataType) {
+			return null;
+		}
+
+		/** Stub: always returns {@code null}. */
+		@Override
+		public ChangelogMode getChangelogMode() {
+			return null;
+		}
+
+		/**
+		 * Compares both fields of two mocks of exactly the same class.
+		 *
+		 * <p>Uses {@link Objects#equals(Object, Object)} so that a {@code null} delimiter or a
+		 * {@code null} boxed flag compares instead of throwing, matching the null tolerance of
+		 * {@link #hashCode()}.
+		 */
+		@Override
+		public boolean equals(Object o) {
+			if (this == o) {
+				return true;
+			}
+			if (o == null || getClass() != o.getClass()) {
+				return false;
+			}
+			ScanFormatMock that = (ScanFormatMock) o;
+			return Objects.equals(delimiter, that.delimiter) &&
+				Objects.equals(failOnMissing, that.failOnMissing);
+		}
+
+		@Override
+		public int hashCode() {
+			return Objects.hash(delimiter, failOnMissing);
+		}
+	}
+
+	// --------------------------------------------------------------------------------------------
+	// Table sink format
+	// --------------------------------------------------------------------------------------------
+
+	/**
+	 * {@link SinkFormat} for testing.
+	 *
+	 * <p>Only carries the delimiter it was constructed with; the interface methods are stubs that
+	 * return {@code null}.
+	 */
+	public static class SinkFormatMock implements SinkFormat<SerializationSchema<RowData>> {
+
+		public final String delimiter;
+
+		SinkFormatMock(String delimiter) {
+			this.delimiter = delimiter;
+		}
+
+		/** Stub: always returns {@code null}. */
+		// NOTE(review): the context type is ScanTableSource.Context although this is a sink format;
+		// presumably this mirrors the SinkFormat interface signature -- confirm against SinkFormat.
+		@Override
+		public SerializationSchema<RowData> createSinkFormat(
+				ScanTableSource.Context context,
+				DataType consumeDataType) {
+			return null;
+		}
+
+		/** Stub: always returns {@code null}. */
+		@Override
+		public ChangelogMode getChangelogMode() {
+			return null;
+		}
+
+		/**
+		 * Compares the delimiter of two mocks of exactly the same class.
+		 *
+		 * <p>Uses {@link Objects#equals(Object, Object)} so that a {@code null} delimiter compares
+		 * instead of throwing, matching the null tolerance of {@link #hashCode()}.
+		 */
+		@Override
+		public boolean equals(Object o) {
+			if (this == o) {
+				return true;
+			}
+			if (o == null || getClass() != o.getClass()) {
+				return false;
+			}
+			SinkFormatMock that = (SinkFormatMock) o;
+			return Objects.equals(delimiter, that.delimiter);
+		}
+
+		@Override
+		public int hashCode() {
+			return Objects.hash(delimiter);
+		}
+	}
+}
diff --git a/flink-table/flink-table-common/src/test/resources/META-INF/services/org.apache.flink.table.factories.Factory b/flink-table/flink-table-common/src/test/resources/META-INF/services/org.apache.flink.table.factories.Factory
new file mode 100644
index 0000000..d31007a
--- /dev/null
+++ b/flink-table/flink-table-common/src/test/resources/META-INF/services/org.apache.flink.table.factories.Factory
@@ -0,0 +1,17 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+ +org.apache.flink.table.factories.TestDynamicTableFactory +org.apache.flink.table.factories.TestFormatFactory
