This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit fef974d7df1a9dabd62f3288c086185d69298bc1
Author: Timo Walther <[email protected]>
AuthorDate: Thu Apr 30 11:03:47 2020 +0200

    [FLINK-16997][table-common] Add new factory interfaces and discovery 
utilities
    
    Implements the new factory interfaces mentioned in FLIP-95. Adds new 
factory utilities
    that can be used for FLIP-122 and future factories.
    
    It adds TestDynamicTableFactory and TestFormatFactory for reference 
implementations
    of new factories.
    
    This closes #11959.
---
 .../table/tests/test_catalog_completeness.py       |   1 +
 .../org/apache/flink/table/catalog/Catalog.java    |  20 +
 .../flink/table/connector/format/Format.java       |  55 ++
 .../flink/table/connector/format/ScanFormat.java   |  37 ++
 .../flink/table/connector/format/SinkFormat.java   |  38 ++
 .../factories/DeserializationFormatFactory.java    |  34 ++
 .../flink/table/factories/DynamicTableFactory.java |  73 +++
 .../table/factories/DynamicTableSinkFactory.java   |  42 ++
 .../table/factories/DynamicTableSourceFactory.java |  42 ++
 .../org/apache/flink/table/factories/Factory.java  |  79 +++
 .../apache/flink/table/factories/FactoryUtil.java  | 570 +++++++++++++++++++++
 .../flink/table/factories/ScanFormatFactory.java   |  50 ++
 .../factories/SerializationFormatFactory.java      |  34 ++
 .../flink/table/factories/SinkFormatFactory.java   |  50 ++
 .../flink/table/factories/FactoryUtilTest.java     | 285 +++++++++++
 .../table/factories/TestDynamicTableFactory.java   | 257 ++++++++++
 .../flink/table/factories/TestFormatFactory.java   | 180 +++++++
 .../org.apache.flink.table.factories.Factory       |  17 +
 18 files changed, 1864 insertions(+)

diff --git a/flink-python/pyflink/table/tests/test_catalog_completeness.py 
b/flink-python/pyflink/table/tests/test_catalog_completeness.py
index 3e83357..245a2be 100644
--- a/flink-python/pyflink/table/tests/test_catalog_completeness.py
+++ b/flink-python/pyflink/table/tests/test_catalog_completeness.py
@@ -43,6 +43,7 @@ class 
CatalogAPICompletenessTests(PythonAPICompletenessTestCase, unittest.TestCa
         return {
             'open',
             'close',
+            'getFactory',
             'getTableFactory',
             'getFunctionDefinitionFactory',
             'listPartitionsByFilter'}
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Catalog.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Catalog.java
index b4ff5f3..1e4c482 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Catalog.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Catalog.java
@@ -35,6 +35,8 @@ import 
org.apache.flink.table.catalog.exceptions.TablePartitionedException;
 import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
 import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
 import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.factories.DynamicTableFactory;
+import org.apache.flink.table.factories.Factory;
 import org.apache.flink.table.factories.FunctionDefinitionFactory;
 import org.apache.flink.table.factories.TableFactory;
 
@@ -49,11 +51,29 @@ import java.util.Optional;
 public interface Catalog {
 
        /**
+        * Returns a factory for creating instances from catalog objects.
+        *
+        * <p>This method enables bypassing the discovery process. Implementers 
can directly pass internal
+        * catalog-specific objects to their own factory. For example, a custom 
{@link CatalogTable} can
+        * be processed by a custom {@link DynamicTableFactory}.
+        *
+        * <p>Because all factories are interfaces, the returned {@link 
Factory} instance can implement multiple
+        * supported extension points. An {@code instanceof} check is performed 
by the caller that checks
+        * whether a required factory is implemented; otherwise the discovery 
process is used.
+        */
+       default Optional<Factory> getFactory() {
+               return Optional.empty();
+       }
+
+       /**
         * Get an optional {@link TableFactory} instance that's responsible for 
generating table-related
         * instances stored in this catalog, instances such as source/sink.
         *
         * @return an optional TableFactory instance
+        * @deprecated Use {@link #getFactory()} for the new factory stack. The 
new factory stack uses the
+        *             new table sources and sinks defined in FLIP-95 and a 
slightly different discovery mechanism.
         */
+       @Deprecated
        default Optional<TableFactory> getTableFactory() {
                return Optional.empty();
        }
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/format/Format.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/format/Format.java
new file mode 100644
index 0000000..40c0115
--- /dev/null
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/format/Format.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.connector.format;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.source.ScanTableSource;
+import org.apache.flink.table.factories.DynamicTableFactory;
+
+/**
+ * Base interface for connector formats.
+ *
+ * <p>Depending on the kind of external system, a connector might support 
different encodings for
+ * reading and writing rows. This interface is an intermediate representation 
before constructing the actual
+ * runtime implementation.
+ *
+ * <p>Formats can be distinguished along two dimensions:
+ * <ul>
+ *     <li>Context in which the format is applied (e.g. {@link 
ScanTableSource} or {@link DynamicTableSink}).</li>
+ *     <li>Runtime implementation interface that is required (e.g. {@link 
DeserializationSchema} or
+ *     some bulk interface).</li>
+ * </ul>
+ *
+ * <p>A {@link DynamicTableFactory} can search for a format that is 
accepted by the connector.
+ *
+ * @see ScanFormat
+ * @see SinkFormat
+ */
+@PublicEvolving
+public interface Format {
+
+       /**
+        * Returns the set of changes that a connector (and transitively the 
planner) can expect during
+        * runtime.
+        */
+       ChangelogMode getChangelogMode();
+}
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/format/ScanFormat.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/format/ScanFormat.java
new file mode 100644
index 0000000..529ea37
--- /dev/null
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/format/ScanFormat.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.connector.format;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.connector.source.ScanTableSource;
+import org.apache.flink.table.types.DataType;
+
+/**
+ * A {@link Format} for a {@link ScanTableSource}.
+ *
+ * @param <I> runtime interface needed by the table source
+ */
+@PublicEvolving
+public interface ScanFormat<I> extends Format {
+
+       /**
+        * Creates a runtime implementation that is configured to produce data of 
the given data type.
+        */
+       I createScanFormat(ScanTableSource.Context context, DataType 
producedDataType);
+}
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/format/SinkFormat.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/format/SinkFormat.java
new file mode 100644
index 0000000..0b67336
--- /dev/null
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/format/SinkFormat.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.connector.format;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.source.ScanTableSource;
+import org.apache.flink.table.types.DataType;
+
+/**
+ * A {@link Format} for a {@link DynamicTableSink}.
+ *
+ * @param <I> runtime interface needed by the table sink
+ */
+@PublicEvolving
+public interface SinkFormat<I> extends Format {
+
+       /**
+        * Creates a runtime implementation that is configured to consume data of 
the given data type.
+        */
+       I createSinkFormat(ScanTableSource.Context context, DataType 
consumedDataType);
+}
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/DeserializationFormatFactory.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/DeserializationFormatFactory.java
new file mode 100644
index 0000000..1901fc5
--- /dev/null
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/DeserializationFormatFactory.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.factories;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.table.connector.format.ScanFormat;
+import org.apache.flink.table.data.RowData;
+
+/**
+ * Factory for creating a {@link ScanFormat} for {@link DeserializationSchema}.
+ *
+ * @see FactoryUtil#createTableFactoryHelper(DynamicTableFactory, 
DynamicTableFactory.Context)
+ */
+@PublicEvolving
+public interface DeserializationFormatFactory extends 
ScanFormatFactory<DeserializationSchema<RowData>> {
+  // interface is used for discovery but is already fully specified by the 
generics
+}
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/DynamicTableFactory.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/DynamicTableFactory.java
new file mode 100644
index 0000000..0d4f8ba
--- /dev/null
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/DynamicTableFactory.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.factories;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.catalog.Catalog;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+
+/**
+ * Base interface for configuring a dynamic table connector for an external 
storage system from catalog
+ * and session information.
+ *
+ * <p>Dynamic tables are the core concept of Flink's Table & SQL API for 
processing both bounded and
+ * unbounded data in a unified fashion.
+ *
+ * <p>Implement {@link DynamicTableSourceFactory} for constructing a {@link 
DynamicTableSource}.
+ *
+ * <p>Implement {@link DynamicTableSinkFactory} for constructing a {@link 
DynamicTableSink}.
+ *
+ * <p>The options {@link FactoryUtil#PROPERTY_VERSION} and {@link 
FactoryUtil#CONNECTOR} are implicitly
+ * added and must not be declared.
+ */
+@PublicEvolving
+public interface DynamicTableFactory extends Factory {
+
+       /**
+        * Provides catalog and session information describing the dynamic 
table to be accessed.
+        */
+       interface Context {
+
+               /**
+                * Returns the identifier of the table in the {@link Catalog}.
+                */
+               ObjectIdentifier getObjectIdentifier();
+
+               /**
+                * Returns table information received from the {@link Catalog}.
+                */
+               CatalogTable getCatalogTable();
+
+               /**
+                * Gives read-only access to the configuration of the current 
session.
+                */
+               ReadableConfig getConfiguration();
+
+               /**
+                * Returns the class loader of the current session.
+                *
+                * <p>The class loader is in particular useful for discovering 
further (nested) factories.
+                */
+               ClassLoader getClassLoader();
+       }
+}
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/DynamicTableSinkFactory.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/DynamicTableSinkFactory.java
new file mode 100644
index 0000000..7f428fe
--- /dev/null
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/DynamicTableSinkFactory.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.factories;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+
+/**
+ * Creates a {@link DynamicTableSink} instance from a {@link CatalogTable} and 
additional context
+ * information.
+ *
+ * <p>See {@link Factory} for more information about the general design of a 
factory.
+ */
+@PublicEvolving
+public interface DynamicTableSinkFactory extends DynamicTableFactory {
+
+       /**
+        * Creates a {@link DynamicTableSink} instance from a {@link 
CatalogTable} and additional context
+        * information.
+        *
+        * <p>An implementation should perform validation and the discovery of 
further (nested) factories
+        * in this method.
+        */
+       DynamicTableSink createDynamicTableSink(Context context);
+}
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/DynamicTableSourceFactory.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/DynamicTableSourceFactory.java
new file mode 100644
index 0000000..3c0ecd1
--- /dev/null
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/DynamicTableSourceFactory.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.factories;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+
+/**
+ * Creates a {@link DynamicTableSource} instance from a {@link CatalogTable} 
and additional context
+ * information.
+ *
+ * <p>See {@link Factory} for more information about the general design of a 
factory.
+ */
+@PublicEvolving
+public interface DynamicTableSourceFactory extends DynamicTableFactory {
+
+       /**
+        * Creates a {@link DynamicTableSource} instance from a {@link 
CatalogTable} and additional context
+        * information.
+        *
+        * <p>An implementation should perform validation and the discovery of 
further (nested) factories
+        * in this method.
+        */
+       DynamicTableSource createDynamicTableSource(Context context);
+}
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/Factory.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/Factory.java
new file mode 100644
index 0000000..b669e76
--- /dev/null
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/Factory.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.factories;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.ConfigOption;
+
+import java.util.Set;
+
+/**
+ * Base interface for all kinds of factories that create object instances from 
a list of key-value pairs
+ * in Flink's Table & SQL API.
+ *
+ * <p>A factory is uniquely identified by {@link Class} and {@link 
#factoryIdentifier()}.
+ *
+ * <p>The list of available factories is discovered using Java's Service 
Provider Interfaces (SPI). Classes
+ * that implement this interface can be added to {@code 
META-INF/services/org.apache.flink.table.factories.Factory}
+ * in JAR files.
+ *
+ * <p>Every factory declares a set of required and optional options. This 
information will not be used
+ * during discovery but is helpful when generating documentation and 
performing validation. A factory may
+ * discover further (nested) factories; the options of the nested factories 
must not be declared in the
+ * sets of this factory.
+ *
+ * <p>It is the responsibility of each factory to perform validation before 
returning an instance.
+ *
+ * <p>For consistency, the following style for key names of {@link 
ConfigOption} is recommended:
+ * <ul>
+ *     <li>Try to <b>reuse</b> key names as much as possible. Use other 
factory implementations as an example.
+ *     <li>Key names should be declared in <b>lower case</b>. Use "-" instead 
of dots or camel case to split words.
+ *     <li>Key names should be <b>hierarchical</b> where appropriate. Think 
about how one would define such
+ *     a hierarchy in JSON or YAML file (e.g. {@code 
sink.bulk-flush.max-actions}).
+ *     <li>In case of a hierarchy, try not to use the higher level again in 
the key name (e.g. do {@code sink.partitioner}
+ *     instead of {@code sink.sink-partitioner}) to <b>keep the keys short</b>.
+ * </ul>
+ */
+@PublicEvolving
+public interface Factory {
+
+       /**
+        * Returns a unique identifier among same factory interfaces.
+        *
+        * <p>For consistency, an identifier should be declared as one lower 
case word (e.g. {@code kafka}). If
+        * multiple factories exist for different versions, a version should be 
appended using "-" (e.g. {@code kafka-0.10}).
+        */
+       String factoryIdentifier();
+
+       /**
+        * Returns a set of {@link ConfigOption} that an implementation of this 
factory requires in addition to
+        * {@link #optionalOptions()}.
+        *
+        * <p>See the documentation of {@link Factory} for more information.
+        */
+       Set<ConfigOption<?>> requiredOptions();
+
+       /**
+        * Returns a set of {@link ConfigOption} that an implementation of this 
factory consumes in addition to
+        * {@link #requiredOptions()}.
+        *
+        * <p>See the documentation of {@link Factory} for more information.
+        */
+       Set<ConfigOption<?>> optionalOptions();
+}
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FactoryUtil.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FactoryUtil.java
new file mode 100644
index 0000000..fa42b8f
--- /dev/null
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FactoryUtil.java
@@ -0,0 +1,570 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.factories;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.DelegatingConfiguration;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.Catalog;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.connector.format.ScanFormat;
+import org.apache.flink.table.connector.format.SinkFormat;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.utils.EncodingUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.ServiceConfigurationError;
+import java.util.ServiceLoader;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Utility for working with {@link Factory}s.
+ */
+@PublicEvolving
+public final class FactoryUtil {
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(FactoryUtil.class);
+
+       public static final ConfigOption<Integer> PROPERTY_VERSION = 
ConfigOptions.key("property-version")
+               .intType()
+               .defaultValue(1)
+               .withDescription(
+                       "Version of the overall property design. This option is 
meant for future backwards compatibility.");
+
+       public static final ConfigOption<String> CONNECTOR = 
ConfigOptions.key("connector")
+               .stringType()
+               .noDefaultValue()
+               .withDescription(
+                       "Uniquely identifies the connector of a dynamic table 
that is used for accessing data in " +
+                       "an external system. Its value is used during table 
source and table sink discovery.");
+
+       public static final String FORMAT_PREFIX = "format.";
+
+       public static final String KEY_FORMAT_PREFIX = "key.format.";
+
+       public static final String VALUE_FORMAT_PREFIX = "value.format.";
+
+       /**
+        * Creates a {@link DynamicTableSource} from a {@link CatalogTable}.
+        *
+        * <p>It considers {@link Catalog#getFactory()} if provided.
+        */
+       public static DynamicTableSource createTableSource(
+                       @Nullable Catalog catalog,
+                       ObjectIdentifier objectIdentifier,
+                       CatalogTable catalogTable,
+                       ReadableConfig configuration,
+                       ClassLoader classLoader) {
+               final DefaultDynamicTableContext context = new 
DefaultDynamicTableContext(
+                       objectIdentifier,
+                       catalogTable,
+                       configuration,
+                       classLoader);
+               try {
+                       final DynamicTableSourceFactory factory = 
getDynamicTableFactory(
+                               DynamicTableSourceFactory.class,
+                               catalog,
+                               context);
+                       return factory.createDynamicTableSource(context);
+               } catch (Throwable t) {
+                       throw new ValidationException(
+                               String.format(
+                                       "Unable to create a source for reading 
table '%s'.\n\n" +
+                                       "Table options are:\n\n" +
+                                       "%s",
+                                       objectIdentifier.asSummaryString(),
+                                       catalogTable.getOptions()
+                                               .entrySet()
+                                               .stream()
+                                               .map(e -> 
stringifyOption(e.getKey(), e.getValue()))
+                                               .sorted()
+                                               
.collect(Collectors.joining("\n"))),
+                               t);
+               }
+       }
+
+       /**
+        * Creates a {@link DynamicTableSink} from a {@link CatalogTable}.
+        *
+        * <p>It considers {@link Catalog#getFactory()} if provided.
+        */
+       public static DynamicTableSink createTableSink(
+                       @Nullable Catalog catalog,
+                       ObjectIdentifier objectIdentifier,
+                       CatalogTable catalogTable,
+                       ReadableConfig configuration,
+                       ClassLoader classLoader) {
+               final DefaultDynamicTableContext context = new 
DefaultDynamicTableContext(
+                       objectIdentifier,
+                       catalogTable,
+                       configuration,
+                       classLoader);
+               try {
+                       final DynamicTableSinkFactory factory = 
getDynamicTableFactory(
+                               DynamicTableSinkFactory.class,
+                               catalog,
+                               context);
+                       return factory.createDynamicTableSink(context);
+               } catch (Throwable t) {
+                       throw new ValidationException(
+                               String.format(
+                                       "Unable to create a sink for writing 
table '%s'.\n\n" +
+                                       "Table options are:\n\n" +
+                                       "%s",
+                                       objectIdentifier.asSummaryString(),
+                                       catalogTable.getOptions()
+                                               .entrySet()
+                                               .stream()
+                                               .map(e -> 
stringifyOption(e.getKey(), e.getValue()))
+                                               .sorted()
+                                               
.collect(Collectors.joining("\n"))),
+                               t);
+               }
+       }
+
+       /**
+        * Creates a utility that helps in discovering formats and validating 
all options for a {@link DynamicTableFactory}.
+        *
+        * <p>The following example sketches the usage:
+        * <pre>{@code
+        * // in createDynamicTableSource()
+        * helper = FactoryUtil.createTableFactoryHelper(this, context);
+        * keyFormat = helper.discoverScanFormat(classloader, 
MyFormatFactory.class, KEY_OPTION, "prefix");
+        * valueFormat = helper.discoverScanFormat(classloader, 
MyFormatFactory.class, VALUE_OPTION, "prefix");
+        * helper.validate();
+        * ... // construct connector with discovered formats
+        * }</pre>
+        *
+        * <p>Note: This utility checks for left-over options in the final step.
+        */
+       public static TableFactoryHelper createTableFactoryHelper(
+                       DynamicTableFactory factory,
+                       DynamicTableFactory.Context context) {
+               return new TableFactoryHelper(factory, context);
+       }
+
+       /**
+        * Discovers a factory using the given factory base class and identifier.
+        *
+        * <p>This method is meant for cases where
+        * {@link #createTableFactoryHelper(DynamicTableFactory, DynamicTableFactory.Context)},
+        * {@link #createTableSource(Catalog, ObjectIdentifier, CatalogTable, ReadableConfig, ClassLoader)},
+        * and {@link #createTableSink(Catalog, ObjectIdentifier, CatalogTable, ReadableConfig, ClassLoader)}
+        * are not applicable.
+        *
+        * @param classLoader class loader used to discover the available factories
+        * @param factoryClass base class or interface that the factory must implement
+        * @param factoryIdentifier identifier that the factory must report
+        * @return the only factory that matches both the base class and the identifier
+        * @throws ValidationException if no factory implements the base class, if none matches
+        *         the identifier, or if the identifier is ambiguous
+        */
+       public static <T extends Factory> T discoverFactory(
+                       ClassLoader classLoader,
+                       Class<T> factoryClass,
+                       String factoryIdentifier) {
+               final List<Factory> factories = discoverFactories(classLoader);
+
+               // step 1: narrow down to factories of the requested kind
+               final List<Factory> foundFactories = factories.stream()
+                       .filter(f -> factoryClass.isAssignableFrom(f.getClass()))
+                       .collect(Collectors.toList());
+
+               if (foundFactories.isEmpty()) {
+                       throw new ValidationException(
+                               String.format(
+                                       "Could not find any factories that implement '%s' in the classpath.",
+                                       factoryClass.getName()));
+               }
+
+               // step 2: narrow down to factories with the requested identifier
+               final List<Factory> matchingFactories = foundFactories.stream()
+                       .filter(f -> f.factoryIdentifier().equals(factoryIdentifier))
+                       .collect(Collectors.toList());
+
+               if (matchingFactories.isEmpty()) {
+                       throw new ValidationException(
+                               String.format(
+                                       "Could not find any factory for identifier '%s' that implements '%s' in the classpath.\n\n" +
+                                       "Available factory identifiers are:\n\n" +
+                                       "%s",
+                                       factoryIdentifier,
+                                       factoryClass.getName(),
+                                       foundFactories.stream()
+                                               .map(Factory::factoryIdentifier)
+                                               .sorted()
+                                               .collect(Collectors.joining("\n"))));
+               }
+               if (matchingFactories.size() > 1) {
+                       throw new ValidationException(
+                               String.format(
+                                       "Multiple factories for identifier '%s' that implement '%s' found in the classpath.\n\n" +
+                                       "Ambiguous factory classes are:\n\n" +
+                                       "%s",
+                                       factoryIdentifier,
+                                       factoryClass.getName(),
+                                       // list the classes of the clashing factories themselves,
+                                       // not the class of the list that holds them
+                                       matchingFactories.stream()
+                                               .map(f -> f.getClass().getName())
+                                               .sorted()
+                                               .collect(Collectors.joining("\n"))));
+               }
+
+               // every candidate was filtered by factoryClass above, so this checked cast cannot fail
+               return factoryClass.cast(matchingFactories.get(0));
+       }
+
+       /**
+        * Checks that all required {@link ConfigOption}s of a factory are present and that the
+        * values of required and optional options can be read.
+        *
+        * <p>Note: It does not check for left-over options.
+        *
+        * @param factory factory that declares the required and optional options
+        * @param options options to validate against the factory's declaration
+        * @throws ValidationException if a required option is missing or a value cannot be read
+        */
+       public static void validateFactoryOptions(Factory factory, ReadableConfig options) {
+               // Flink's options offer no validation feature yet, so each declared option is
+               // accessed eagerly in order to provoke a parsing error
+
+               final List<String> absentKeys = new LinkedList<>();
+               for (ConfigOption<?> required : factory.requiredOptions()) {
+                       if (readOption(options, required) == null) {
+                               absentKeys.add(required.key());
+                       }
+               }
+
+               if (!absentKeys.isEmpty()) {
+                       final String listing = absentKeys.stream()
+                               .sorted()
+                               .collect(Collectors.joining("\n"));
+                       throw new ValidationException(
+                               String.format(
+                                       "One or more required options are missing.\n\n" +
+                                       "Missing required options are:\n\n" +
+                                       "%s",
+                                       listing));
+               }
+
+               // optional options may be absent, but a present value must still be readable
+               for (ConfigOption<?> optional : factory.optionalOptions()) {
+                       readOption(options, optional);
+               }
+       }
+
+       // 
--------------------------------------------------------------------------------------------
+       // Helper methods
+       // 
--------------------------------------------------------------------------------------------
+
+       /**
+        * Returns the table factory of the requested kind, either offered by the catalog or
+        * discovered through the connector option of the catalog table.
+        */
+       @SuppressWarnings("unchecked")
+       private static <T extends DynamicTableFactory> T getDynamicTableFactory(
+                       Class<T> factoryClass,
+                       @Nullable Catalog catalog,
+                       DefaultDynamicTableContext context) {
+               // a suitable factory offered by the catalog wins over any discovered one
+               if (catalog != null) {
+                       final Factory catalogFactory = catalog.getFactory().orElse(null);
+                       if (catalogFactory != null && factoryClass.isInstance(catalogFactory)) {
+                               return (T) catalogFactory;
+                       }
+               }
+
+               // otherwise the connector option decides which factory gets discovered
+               final Map<String, String> tableOptions = context.getCatalogTable().getOptions();
+               final String connectorOption = tableOptions.get(CONNECTOR.key());
+               if (connectorOption == null) {
+                       final String message = String.format(
+                               "Table options do not contain an option key '%s' for discovering a connector.",
+                               CONNECTOR.key());
+                       throw new ValidationException(message);
+               }
+
+               try {
+                       return discoverFactory(context.getClassLoader(), factoryClass, connectorOption);
+               } catch (ValidationException e) {
+                       final String message = String.format(
+                               "Cannot discover a connector using option '%s'.",
+                               stringifyOption(CONNECTOR.key(), connectorOption));
+                       throw new ValidationException(message, e);
+               }
+       }
+
+       private static List<Factory> discoverFactories(ClassLoader classLoader) 
{
+               try {
+                       final List<Factory> result = new LinkedList<>();
+                       ServiceLoader
+                               .load(Factory.class, classLoader)
+                               .iterator()
+                               .forEachRemaining(result::add);
+                       return result;
+               } catch (ServiceConfigurationError e) {
+                       LOG.error("Could not load service provider for 
factories.", e);
+                       throw new TableException("Could not load service 
provider for factories.", e);
+               }
+       }
+
+       private static String stringifyOption(String key, String value) {
+               return String.format(
+                       "'%s'='%s'",
+                       EncodingUtils.escapeSingleQuotes(key),
+                       EncodingUtils.escapeSingleQuotes(value));
+       }
+
+       /**
+        * Copies the given string options into a new {@link Configuration}.
+        */
+       private static Configuration asConfiguration(Map<String, String> options) {
+               final Configuration copy = new Configuration();
+               for (Map.Entry<String, String> entry : options.entrySet()) {
+                       copy.setString(entry.getKey(), entry.getValue());
+               }
+               return copy;
+       }
+
+       private static <T> T readOption(ReadableConfig options, ConfigOption<T> 
option) {
+               try {
+                       return options.get(option);
+               } catch (Throwable t) {
+                       throw new ValidationException(String.format("Invalid 
value for option '%s'.", option.key()), t);
+               }
+       }
+
+       // 
--------------------------------------------------------------------------------------------
+       // Helper classes
+       // 
--------------------------------------------------------------------------------------------
+
+       /**
+        * Utility that discovers formats and validates all options on behalf of a
+        * {@link DynamicTableFactory}.
+        *
+        * <p>Every option key declared by the table factory or by a discovered format factory is
+        * recorded as consumed; {@link #validate()} rejects all remaining keys.
+        *
+        * @see #createTableFactoryHelper(DynamicTableFactory, DynamicTableFactory.Context)
+        */
+       public static class TableFactoryHelper {
+
+               /** Factory whose options are validated. */
+               private final DynamicTableFactory tableFactory;
+
+               /** Context handed to the discovered format factories. */
+               private final DynamicTableFactory.Context context;
+
+               /** All options of the catalog table. */
+               private final Configuration allOptions;
+
+               /** Keys that some factory has declared; anything else counts as unsupported. */
+               private final Set<String> consumedOptionKeys;
+
+               private TableFactoryHelper(DynamicTableFactory tableFactory, DynamicTableFactory.Context context) {
+                       this.tableFactory = tableFactory;
+                       this.context = context;
+                       this.allOptions = asConfiguration(context.getCatalogTable().getOptions());
+
+                       final Set<String> declaredKeys = new HashSet<>();
+                       declaredKeys.add(PROPERTY_VERSION.key());
+                       declaredKeys.add(CONNECTOR.key());
+                       for (ConfigOption<?> required : tableFactory.requiredOptions()) {
+                               declaredKeys.add(required.key());
+                       }
+                       for (ConfigOption<?> optional : tableFactory.optionalOptions()) {
+                               declaredKeys.add(optional.key());
+                       }
+                       this.consumedOptionKeys = declaredKeys;
+               }
+
+               /**
+                * Discovers a mandatory {@link ScanFormat} of the given type; the value of the given
+                * option serves as factory identifier.
+                *
+                * <p>A prefix, e.g. {@link #KEY_FORMAT_PREFIX}, projects the options for the format factory.
+                *
+                * @throws ValidationException if the format option is not set
+                */
+               public <I, F extends ScanFormatFactory<I>> ScanFormat<I> discoverScanFormat(
+                               Class<F> formatFactoryClass,
+                               ConfigOption<String> formatOption,
+                               String formatPrefix) {
+                       final Optional<ScanFormat<I>> scanFormat =
+                               discoverOptionalScanFormat(formatFactoryClass, formatOption, formatPrefix);
+                       if (!scanFormat.isPresent()) {
+                               throw new ValidationException(
+                                       String.format("Could not find required scan format '%s'.", formatOption.key()));
+                       }
+                       return scanFormat.get();
+               }
+
+               /**
+                * Discovers a {@link ScanFormat} of the given type if the given option is present; the
+                * option value serves as factory identifier.
+                *
+                * <p>A prefix, e.g. {@link #KEY_FORMAT_PREFIX}, projects the options for the format factory.
+                */
+               public <I, F extends ScanFormatFactory<I>> Optional<ScanFormat<I>> discoverOptionalScanFormat(
+                               Class<F> formatFactoryClass,
+                               ConfigOption<String> formatOption,
+                               String formatPrefix) {
+                       final Optional<F> discovered =
+                               discoverOptionalFormatFactory(formatFactoryClass, formatOption, formatPrefix);
+                       if (!discovered.isPresent()) {
+                               return Optional.empty();
+                       }
+                       final F formatFactory = discovered.get();
+                       try {
+                               return Optional.ofNullable(
+                                       formatFactory.createScanFormat(context, projectOptions(formatPrefix)));
+                       } catch (Throwable t) {
+                               throw new ValidationException(
+                                       String.format(
+                                               "Error creating scan format '%s' in option space '%s'.",
+                                               formatFactory.factoryIdentifier(),
+                                               formatPrefix),
+                                       t);
+                       }
+               }
+
+               /**
+                * Discovers a mandatory {@link SinkFormat} of the given type; the value of the given
+                * option serves as factory identifier.
+                *
+                * <p>A prefix, e.g. {@link #KEY_FORMAT_PREFIX}, projects the options for the format factory.
+                *
+                * @throws ValidationException if the format option is not set
+                */
+               public <I, F extends SinkFormatFactory<I>> SinkFormat<I> discoverSinkFormat(
+                               Class<F> formatFactoryClass,
+                               ConfigOption<String> formatOption,
+                               String formatPrefix) {
+                       final Optional<SinkFormat<I>> sinkFormat =
+                               discoverOptionalSinkFormat(formatFactoryClass, formatOption, formatPrefix);
+                       if (!sinkFormat.isPresent()) {
+                               throw new ValidationException(
+                                       String.format("Could not find required sink format '%s'.", formatOption.key()));
+                       }
+                       return sinkFormat.get();
+               }
+
+               /**
+                * Discovers a {@link SinkFormat} of the given type if the given option is present; the
+                * option value serves as factory identifier.
+                *
+                * <p>A prefix, e.g. {@link #KEY_FORMAT_PREFIX}, projects the options for the format factory.
+                */
+               public <I, F extends SinkFormatFactory<I>> Optional<SinkFormat<I>> discoverOptionalSinkFormat(
+                               Class<F> formatFactoryClass,
+                               ConfigOption<String> formatOption,
+                               String formatPrefix) {
+                       final Optional<F> discovered =
+                               discoverOptionalFormatFactory(formatFactoryClass, formatOption, formatPrefix);
+                       if (!discovered.isPresent()) {
+                               return Optional.empty();
+                       }
+                       final F formatFactory = discovered.get();
+                       try {
+                               return Optional.ofNullable(
+                                       formatFactory.createSinkFormat(context, projectOptions(formatPrefix)));
+                       } catch (Throwable t) {
+                               throw new ValidationException(
+                                       String.format(
+                                               "Error creating sink format '%s' in option space '%s'.",
+                                               formatFactory.factoryIdentifier(),
+                                               formatPrefix),
+                                       t);
+                       }
+               }
+
+               /**
+                * Validates the options of the {@link DynamicTableFactory} and rejects option keys
+                * that no factory has declared.
+                *
+                * @throws ValidationException if an option is invalid or unsupported
+                */
+               public void validate() {
+                       validateFactoryOptions(tableFactory, allOptions);
+
+                       final Set<String> unconsumedKeys = new HashSet<>(allOptions.keySet());
+                       unconsumedKeys.removeAll(consumedOptionKeys);
+                       if (unconsumedKeys.isEmpty()) {
+                               return;
+                       }
+
+                       final String connector = tableFactory.factoryIdentifier();
+                       final String unsupported = unconsumedKeys.stream()
+                               .sorted()
+                               .collect(Collectors.joining("\n"));
+                       final String supported = consumedOptionKeys.stream()
+                               .sorted()
+                               .collect(Collectors.joining("\n"));
+                       throw new ValidationException(
+                               String.format(
+                                       "Unsupported options found for connector '%s'.\n\n" +
+                                       "Unsupported options:\n\n" +
+                                       "%s\n\n" +
+                                       "Supported options:\n\n" +
+                                       "%s",
+                                       connector,
+                                       unsupported,
+                                       supported));
+               }
+
+               /**
+                * Returns all options of the table.
+                */
+               public ReadableConfig getOptions() {
+                       return allOptions;
+               }
+
+               // ----------------------------------------------------------------------------------------
+
+               /**
+                * Looks up the format factory named by the given option, if the option is set, and
+                * records the prefixed keys of all options that the factory declares.
+                */
+               private <F extends Factory> Optional<F> discoverOptionalFormatFactory(
+                               Class<F> formatFactoryClass,
+                               ConfigOption<String> formatOption,
+                               String formatPrefix) {
+                       final String formatIdentifier = allOptions.get(formatOption);
+                       if (formatIdentifier == null) {
+                               return Optional.empty();
+                       }
+
+                       final F formatFactory = discoverFactory(
+                               context.getClassLoader(),
+                               formatFactoryClass,
+                               formatIdentifier);
+
+                       // remember the keys of the format factory so that validate() accepts them
+                       for (ConfigOption<?> required : formatFactory.requiredOptions()) {
+                               consumedOptionKeys.add(formatPrefix + required.key());
+                       }
+                       for (ConfigOption<?> optional : formatFactory.optionalOptions()) {
+                               consumedOptionKeys.add(formatPrefix + optional.key());
+                       }
+                       return Optional.of(formatFactory);
+               }
+
+               /**
+                * Wraps all table options in a {@link DelegatingConfiguration} for the given prefix.
+                */
+               private ReadableConfig projectOptions(String formatPrefix) {
+                       return new DelegatingConfiguration(allOptions, formatPrefix);
+               }
+       }
+
+       /**
+        * Immutable {@link DynamicTableFactory.Context} that returns the values given at
+        * construction time.
+        */
+       private static class DefaultDynamicTableContext implements DynamicTableFactory.Context {
+
+               private final ObjectIdentifier objectIdentifier;
+               private final CatalogTable catalogTable;
+               private final ReadableConfig configuration;
+               private final ClassLoader classLoader;
+
+               DefaultDynamicTableContext(
+                               ObjectIdentifier identifier,
+                               CatalogTable table,
+                               ReadableConfig config,
+                               ClassLoader loader) {
+                       objectIdentifier = identifier;
+                       catalogTable = table;
+                       configuration = config;
+                       classLoader = loader;
+               }
+
+               /** Returns the identifier passed to the constructor. */
+               @Override
+               public ObjectIdentifier getObjectIdentifier() {
+                       return this.objectIdentifier;
+               }
+
+               /** Returns the catalog table passed to the constructor. */
+               @Override
+               public CatalogTable getCatalogTable() {
+                       return this.catalogTable;
+               }
+
+               /** Returns the configuration passed to the constructor. */
+               @Override
+               public ReadableConfig getConfiguration() {
+                       return this.configuration;
+               }
+
+               /** Returns the class loader passed to the constructor. */
+               @Override
+               public ClassLoader getClassLoader() {
+                       return this.classLoader;
+               }
+       }
+
+       // 
--------------------------------------------------------------------------------------------
+
+       /**
+        * Private constructor that keeps this utility class from being instantiated.
+        */
+       private FactoryUtil() {
+               // no instantiation
+       }
+}
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/ScanFormatFactory.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/ScanFormatFactory.java
new file mode 100644
index 0000000..184c432
--- /dev/null
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/ScanFormatFactory.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.factories;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.connector.format.Format;
+import org.apache.flink.table.connector.format.ScanFormat;
+import org.apache.flink.table.connector.source.ScanTableSource;
+
+/**
+ * Base interface for configuring a {@link ScanFormat} for a {@link ScanTableSource}.
+ *
+ * <p>Depending on the kind of external system, a connector might support different
+ * encodings for reading and writing rows. This interface helps in making such formats
+ * pluggable.
+ *
+ * <p>The created {@link Format} instance is an intermediate representation that can be
+ * used to construct a runtime implementation in a later step.
+ *
+ * @see FactoryUtil#createTableFactoryHelper(DynamicTableFactory, DynamicTableFactory.Context)
+ *
+ * @param <I> runtime interface needed by the table source
+ */
+@PublicEvolving
+public interface ScanFormatFactory<I> extends Factory {
+
+       /**
+        * Creates a format from the given context and format options.
+        *
+        * <p>The format options have been projected to top-level options (e.g. from
+        * {@code key.format.ignore-errors} to {@code format.ignore-errors}).
+        *
+        * @param context context of the table that the format is created for
+        * @param formatOptions options of the format after the projection described above
+        * @return the scan format configured by the given options
+        */
+       ScanFormat<I> createScanFormat(DynamicTableFactory.Context context, 
ReadableConfig formatOptions);
+}
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/SerializationFormatFactory.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/SerializationFormatFactory.java
new file mode 100644
index 0000000..9b669df
--- /dev/null
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/SerializationFormatFactory.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.factories;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.table.connector.format.SinkFormat;
+import org.apache.flink.table.data.RowData;
+
+/**
+ * Factory for creating a {@link SinkFormat} for {@link SerializationSchema}.
+ *
+ * <p>It fixes the runtime interface of {@link SinkFormatFactory} to a
+ * {@link SerializationSchema} of {@link RowData} and declares no methods of its own.
+ *
+ * @see FactoryUtil#createTableFactoryHelper(DynamicTableFactory, DynamicTableFactory.Context)
+ */
+@PublicEvolving
+public interface SerializationFormatFactory extends 
SinkFormatFactory<SerializationSchema<RowData>> {
+  // the interface only serves discovery; its contract is fully specified by the generics
+}
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/SinkFormatFactory.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/SinkFormatFactory.java
new file mode 100644
index 0000000..4212ea6
--- /dev/null
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/SinkFormatFactory.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.factories;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.connector.format.Format;
+import org.apache.flink.table.connector.format.SinkFormat;
+import org.apache.flink.table.connector.source.ScanTableSource;
+
+/**
+ * Base interface for configuring a {@link SinkFormat} for a
+ * {@link org.apache.flink.table.connector.sink.DynamicTableSink}.
+ *
+ * <p>It is the writing counterpart of {@link ScanFormatFactory}, which configures formats
+ * for a {@link ScanTableSource}.
+ *
+ * <p>Depending on the kind of external system, a connector might support different
+ * encodings for reading and writing rows. This interface helps in making such formats
+ * pluggable.
+ *
+ * <p>The created {@link Format} instance is an intermediate representation that can be
+ * used to construct a runtime implementation in a later step.
+ *
+ * @see FactoryUtil#createTableFactoryHelper(DynamicTableFactory, DynamicTableFactory.Context)
+ *
+ * @param <I> runtime interface needed by the table sink
+ */
+@PublicEvolving
+public interface SinkFormatFactory<I> extends Factory {
+
+       /**
+        * Creates a format from the given context and format options.
+        *
+        * <p>The format options have been projected to top-level options (e.g. from
+        * {@code key.format.ignore-errors} to {@code format.ignore-errors}).
+        *
+        * @param context context of the table that the format is created for
+        * @param formatOptions options of the format after the projection described above
+        * @return the sink format configured by the given options
+        */
+       SinkFormat<I> createSinkFormat(DynamicTableFactory.Context context, 
ReadableConfig formatOptions);
+}
diff --git 
a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/FactoryUtilTest.java
 
b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/FactoryUtilTest.java
new file mode 100644
index 0000000..15f6412
--- /dev/null
+++ 
b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/FactoryUtilTest.java
@@ -0,0 +1,285 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.factories;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import 
org.apache.flink.table.factories.TestDynamicTableFactory.DynamicTableSinkMock;
+import 
org.apache.flink.table.factories.TestDynamicTableFactory.DynamicTableSourceMock;
+import org.apache.flink.table.factories.TestFormatFactory.ScanFormatMock;
+import org.apache.flink.table.factories.TestFormatFactory.SinkFormatMock;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.Consumer;
+
+import static org.apache.flink.util.CoreMatchers.containsCause;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link FactoryUtil}.
+ *
+ * <p>Each error test starts from the complete option map of {@link #createAllOptions()}, applies
+ * a single modification, and expects the creation of a table source to fail with a
+ * {@link ValidationException} that carries the given message.
+ */
+public class FactoryUtilTest {
+
+       // collects the expectations registered by expectError(...)
+       @Rule
+       public ExpectedException thrown = ExpectedException.none();
+
+       /**
+        * Removing the {@code connector} key must be reported as a missing discovery option.
+        */
+       @Test
+       public void testMissingConnector() {
+               expectError("Table options do not contain an option key 
'connector' for discovering a connector.");
+               testError(options -> options.remove("connector"));
+       }
+
+       /**
+        * An unknown connector identifier must fail and list the available identifiers.
+        */
+       @Test
+       public void testInvalidConnector() {
+               expectError(
+                       "Could not find any factory for identifier 'FAIL' that 
implements '" +
+                               DynamicTableSourceFactory.class.getName() + "' 
in the classpath.\n\n" +
+                       "Available factory identifiers are:\n\n" +
+                       "test-connector");
+               testError(options -> options.put("connector", "FAIL"));
+       }
+
+       /**
+        * Removing the required connector option {@code target} must be reported as missing.
+        */
+       @Test
+       public void testMissingConnectorOption() {
+               expectError(
+                       "One or more required options are missing.\n\n" +
+                       "Missing required options are:\n\n" +
+                       "target");
+               testError(options -> options.remove("target"));
+       }
+
+       /**
+        * A value that cannot be parsed for {@code buffer-size} must be reported as invalid.
+        */
+       @Test
+       public void testInvalidConnectorOption() {
+               expectError("Invalid value for option 'buffer-size'.");
+               testError(options -> options.put("buffer-size", "FAIL"));
+       }
+
+       /**
+        * Removing the format key of the required value format must fail the discovery.
+        */
+       @Test
+       public void testMissingFormat() {
+               expectError("Could not find required scan format 
'value.format.kind'.");
+               testError(options -> options.remove("value.format.kind"));
+       }
+
+       /**
+        * An unknown format identifier must fail and list the available format identifiers.
+        */
+       @Test
+       public void testInvalidFormat() {
+               expectError(
+                       "Could not find any factory for identifier 'FAIL' that 
implements '" +
+                               DeserializationFormatFactory.class.getName() + 
"' in the classpath.\n\n" +
+                       "Available factory identifiers are:\n\n" +
+                       "test-format");
+               testError(options -> options.put("value.format.kind", "FAIL"));
+       }
+
+       /**
+        * Removing the required {@code delimiter} of the key format must fail. Two expectations are
+        * registered: one for the wrapping format error and one for the missing option itself, which
+        * is reported under its projected name {@code delimiter}.
+        */
+       @Test
+       public void testMissingFormatOption() {
+               expectError(
+                       "Error creating scan format 'test-format' in option 
space 'key.format.'.");
+               expectError(
+                       "One or more required options are missing.\n\n" +
+                       "Missing required options are:\n\n" +
+                       "delimiter");
+               testError(options -> options.remove("key.format.delimiter"));
+       }
+
+       /**
+        * An unparsable format option must be reported under its projected name
+        * {@code fail-on-missing}.
+        */
+       @Test
+       public void testInvalidFormatOption() {
+               expectError("Invalid value for option 'fail-on-missing'.");
+               testError(options -> options.put("key.format.fail-on-missing", 
"FAIL"));
+       }
+
+       /**
+        * Options that are consumed by neither the connector nor its formats must be rejected. The
+        * expected message lists the unsupported and the supported keys in alphabetical order.
+        */
+       @Test
+       public void testUnconsumedOption() {
+               expectError(
+                       "Unsupported options found for connector 
'test-connector'.\n\n" +
+                       "Unsupported options:\n\n" +
+                       "this-is-also-not-consumed\n" +
+                       "this-is-not-consumed\n\n" +
+                       "Supported options:\n\n" +
+                       "buffer-size\n" +
+                       "connector\n" +
+                       "key.format.delimiter\n" +
+                       "key.format.fail-on-missing\n" +
+                       "key.format.kind\n" +
+                       "property-version\n" +
+                       "target\n" +
+                       "value.format.delimiter\n" +
+                       "value.format.fail-on-missing\n" +
+                       "value.format.kind");
+               testError(options -> {
+                       options.put("this-is-not-consumed", "42");
+                       options.put("this-is-also-not-consumed", "true");
+               });
+       }
+
+       /**
+        * With the complete option map, both the source and the sink must be created with the
+        * configured target, buffer size, and key/value formats.
+        */
+       @Test
+       public void testAllOptions() {
+               final Map<String, String> options = createAllOptions();
+               final DynamicTableSource actualSource = 
createTableSource(options);
+               // the key format does not set 'fail-on-missing', hence 'false' is expected for it
+               final DynamicTableSource expectedSource = new 
DynamicTableSourceMock(
+                       "MyTarget",
+                       new ScanFormatMock(",", false),
+                       new ScanFormatMock("|", true));
+               assertEquals(expectedSource, actualSource);
+               final DynamicTableSink actualSink = createTableSink(options);
+               final DynamicTableSink expectedSink = new DynamicTableSinkMock(
+                       "MyTarget",
+                       1000L,
+                       new SinkFormatMock(","),
+                       new SinkFormatMock("|"));
+               assertEquals(expectedSink, actualSink);
+       }
+
+       /**
+        * Without any key format options, the source and the sink must still be created and carry
+        * {@code null} as their key format.
+        */
+       @Test
+       public void testOptionalFormat() {
+               final Map<String, String> options = createAllOptions();
+               options.remove("key.format.kind");
+               options.remove("key.format.delimiter");
+               final DynamicTableSource actualSource = 
createTableSource(options);
+               final DynamicTableSource expectedSource = new 
DynamicTableSourceMock(
+                       "MyTarget",
+                       null,
+                       new ScanFormatMock("|", true));
+               assertEquals(expectedSource, actualSource);
+               final DynamicTableSink actualSink = createTableSink(options);
+               final DynamicTableSink expectedSink = new DynamicTableSinkMock(
+                       "MyTarget",
+                       1000L,
+                       null,
+                       new SinkFormatMock("|"));
+               assertEquals(expectedSink, actualSink);
+       }
+
+       // 
--------------------------------------------------------------------------------------------
+
+       /**
+        * Registers the expectation that a {@link ValidationException} is thrown which matches
+        * {@code containsCause} for a {@link ValidationException} with the given message.
+        * Presumably the matcher inspects the whole cause chain; verify against
+        * {@code CoreMatchers#containsCause}.
+        */
+       private void expectError(String message) {
+               thrown.expect(ValidationException.class);
+               thrown.expect(containsCause(new ValidationException(message)));
+       }
+
+       /**
+        * Builds the complete option map, lets the given modifier change it, and creates a table
+        * source from the result. The created source is discarded because callers only care about
+        * the exception.
+        */
+       private static void testError(Consumer<Map<String, String>> 
optionModifier) {
+               final Map<String, String> options = createAllOptions();
+               optionModifier.accept(options);
+               createTableSource(options);
+       }
+
+       /**
+        * Returns a fresh, mutable map with the options for the test connector including a key and
+        * a value format.
+        */
+       private static Map<String, String> createAllOptions() {
+               final Map<String, String> options = new HashMap<>();
+               // we use strings here to test realistic parsing
+               options.put("property-version", "1");
+               options.put("connector", TestDynamicTableFactory.IDENTIFIER);
+               options.put("target", "MyTarget");
+               options.put("buffer-size", "1000");
+               options.put("key.format.kind", TestFormatFactory.IDENTIFIER);
+               options.put("key.format.delimiter", ",");
+               options.put("value.format.kind", TestFormatFactory.IDENTIFIER);
+               options.put("value.format.delimiter", "|");
+               options.put("value.format.fail-on-missing", "true");
+               return options;
+       }
+
+       /**
+        * Creates a table source through {@link FactoryUtil} for the table {@code cat.db.table}
+        * using the given options, an empty configuration, and the class loader of this test. The
+        * first argument is passed as {@code null}.
+        */
+       private static DynamicTableSource createTableSource(Map<String, String> 
options) {
+               return FactoryUtil.createTableSource(
+                       null,
+                       ObjectIdentifier.of("cat", "db", "table"),
+                       new CatalogTableMock(options),
+                       new Configuration(),
+                       FactoryUtilTest.class.getClassLoader());
+       }
+
+       /**
+        * Creates a table sink through {@link FactoryUtil} with the same arguments as
+        * {@link #createTableSource(Map)}.
+        */
+       private static DynamicTableSink createTableSink(Map<String, String> 
options) {
+               return FactoryUtil.createTableSink(
+                       null,
+                       ObjectIdentifier.of("cat", "db", "table"),
+                       new CatalogTableMock(options),
+                       new Configuration(),
+                       FactoryUtilTest.class.getClassLoader());
+       }
+
+       /**
+        * Minimal {@link CatalogTable} that only exposes the given options via
+        * {@link #getProperties()}. All other methods return {@code null}, {@code false}, or an empty
+        * {@link Optional}.
+        */
+       private static class CatalogTableMock implements CatalogTable {
+
+               final Map<String, String> options;
+
+               CatalogTableMock(Map<String, String> options) {
+                       this.options = options;
+               }
+
+               @Override
+               public boolean isPartitioned() {
+                       return false;
+               }
+
+               @Override
+               public List<String> getPartitionKeys() {
+                       return null;
+               }
+
+               @Override
+               public CatalogTable copy(Map<String, String> options) {
+                       return null;
+               }
+
+               @Override
+               public Map<String, String> toProperties() {
+                       return null;
+               }
+
+               // the only method with meaningful content: hands out the map given to the constructor
+               @Override
+               public Map<String, String> getProperties() {
+                       return options;
+               }
+
+               @Override
+               public TableSchema getSchema() {
+                       return null;
+               }
+
+               @Override
+               public String getComment() {
+                       return null;
+               }
+
+               @Override
+               public CatalogBaseTable copy() {
+                       return null;
+               }
+
+               @Override
+               public Optional<String> getDescription() {
+                       return Optional.empty();
+               }
+
+               @Override
+               public Optional<String> getDetailedDescription() {
+                       return Optional.empty();
+               }
+       }
+}
diff --git 
a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/TestDynamicTableFactory.java
 
b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/TestDynamicTableFactory.java
new file mode 100644
index 0000000..ff5a21f
--- /dev/null
+++ 
b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/TestDynamicTableFactory.java
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.factories;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.format.ScanFormat;
+import org.apache.flink.table.connector.format.SinkFormat;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.connector.source.ScanTableSource;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.factories.FactoryUtil.TableFactoryHelper;
+
+import javax.annotation.Nullable;
+
+import java.util.HashSet;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+
+/**
+ * Test implementations for {@link DynamicTableSourceFactory} and {@link 
DynamicTableSinkFactory}.
+ */
+public final class TestDynamicTableFactory implements 
DynamicTableSourceFactory, DynamicTableSinkFactory {
+
+       public static final String IDENTIFIER = "test-connector";
+
+       public static final ConfigOption<String> TARGET = ConfigOptions
+               .key("target")
+               .stringType()
+               .noDefaultValue();
+
+       public static final ConfigOption<Long> BUFFER_SIZE = ConfigOptions
+               .key("buffer-size")
+               .longType()
+               .defaultValue(100L);
+
+       public static final ConfigOption<String> KEY_FORMAT = ConfigOptions
+               .key("key.format.kind")
+               .stringType()
+               .noDefaultValue();
+
+       public static final ConfigOption<String> VALUE_FORMAT = ConfigOptions
+               .key("value.format.kind")
+               .stringType()
+               .noDefaultValue();
+
+       @Override
+       public DynamicTableSource createDynamicTableSource(Context context) {
+               final TableFactoryHelper helper = 
FactoryUtil.createTableFactoryHelper(this, context);
+
+               final Optional<ScanFormat<DeserializationSchema<RowData>>> 
keyFormat = helper.discoverOptionalScanFormat(
+                       DeserializationFormatFactory.class,
+                       KEY_FORMAT,
+                       FactoryUtil.KEY_FORMAT_PREFIX);
+               final ScanFormat<DeserializationSchema<RowData>> valueFormat = 
helper.discoverScanFormat(
+                       DeserializationFormatFactory.class,
+                       VALUE_FORMAT,
+                       FactoryUtil.VALUE_FORMAT_PREFIX);
+               helper.validate();
+
+               return new DynamicTableSourceMock(
+                       helper.getOptions().get(TARGET),
+                       keyFormat.orElse(null),
+                       valueFormat);
+       }
+
+       @Override
+       public DynamicTableSink createDynamicTableSink(Context context) {
+               final TableFactoryHelper helper = 
FactoryUtil.createTableFactoryHelper(this, context);
+
+               final Optional<SinkFormat<SerializationSchema<RowData>>> 
keyFormat = helper.discoverOptionalSinkFormat(
+                       SerializationFormatFactory.class,
+                       KEY_FORMAT,
+                       FactoryUtil.KEY_FORMAT_PREFIX);
+               final SinkFormat<SerializationSchema<RowData>> valueFormat = 
helper.discoverSinkFormat(
+                       SerializationFormatFactory.class,
+                       VALUE_FORMAT,
+                       FactoryUtil.VALUE_FORMAT_PREFIX);
+               helper.validate();
+
+               return new DynamicTableSinkMock(
+                       helper.getOptions().get(TARGET),
+                       helper.getOptions().get(BUFFER_SIZE),
+                       keyFormat.orElse(null),
+                       valueFormat);
+       }
+
+       @Override
+       public String factoryIdentifier() {
+               return IDENTIFIER;
+       }
+
+       @Override
+       public Set<ConfigOption<?>> requiredOptions() {
+               final Set<ConfigOption<?>> options = new HashSet<>();
+               options.add(TARGET);
+               options.add(VALUE_FORMAT);
+               return options;
+       }
+
+       @Override
+       public Set<ConfigOption<?>> optionalOptions() {
+               final Set<ConfigOption<?>> options = new HashSet<>();
+               options.add(BUFFER_SIZE);
+               options.add(KEY_FORMAT);
+               return options;
+       }
+
+       // 
--------------------------------------------------------------------------------------------
+       // Table source
+       // 
--------------------------------------------------------------------------------------------
+
+       /**
+        * {@link DynamicTableSource} for testing.
+        */
+       public static class DynamicTableSourceMock implements ScanTableSource {
+
+               public final String target;
+               public final @Nullable 
ScanFormat<DeserializationSchema<RowData>> sourceKeyFormat;
+               public final ScanFormat<DeserializationSchema<RowData>> 
sourceValueFormat;
+
+               DynamicTableSourceMock(
+                               String target,
+                               @Nullable 
ScanFormat<DeserializationSchema<RowData>> sourceKeyFormat,
+                               ScanFormat<DeserializationSchema<RowData>> 
sourceValueFormat) {
+                       this.target = target;
+                       this.sourceKeyFormat = sourceKeyFormat;
+                       this.sourceValueFormat = sourceValueFormat;
+               }
+
+               @Override
+               public ChangelogMode getChangelogMode() {
+                       return null;
+               }
+
+               @Override
+               public ScanRuntimeProvider getScanRuntimeProvider(Context 
runtimeProviderContext) {
+                       return null;
+               }
+
+               @Override
+               public DynamicTableSource copy() {
+                       return null;
+               }
+
+               @Override
+               public String asSummaryString() {
+                       return null;
+               }
+
+               @Override
+               public boolean equals(Object o) {
+                       if (this == o) {
+                               return true;
+                       }
+                       if (o == null || getClass() != o.getClass()) {
+                               return false;
+                       }
+                       DynamicTableSourceMock that = (DynamicTableSourceMock) 
o;
+                       return target.equals(that.target) &&
+                               Objects.equals(sourceKeyFormat, 
that.sourceKeyFormat) &&
+                               
sourceValueFormat.equals(that.sourceValueFormat);
+               }
+
+               @Override
+               public int hashCode() {
+                       return Objects.hash(target, sourceKeyFormat, 
sourceValueFormat);
+               }
+       }
+
+       // 
--------------------------------------------------------------------------------------------
+       // Table sink
+       // 
--------------------------------------------------------------------------------------------
+
+       /**
+        * {@link DynamicTableSink} for testing.
+        */
+       public static class DynamicTableSinkMock implements DynamicTableSink {
+
+               public final String target;
+               public final Long bufferSize;
+               public final @Nullable SinkFormat<SerializationSchema<RowData>> 
sinkKeyFormat;
+               public final SinkFormat<SerializationSchema<RowData>> 
sinkValueFormat;
+
+               DynamicTableSinkMock(
+                               String target,
+                               Long bufferSize,
+                               @Nullable 
SinkFormat<SerializationSchema<RowData>> sinkKeyFormat,
+                               SinkFormat<SerializationSchema<RowData>> 
sinkValueFormat) {
+                       this.target = target;
+                       this.bufferSize = bufferSize;
+                       this.sinkKeyFormat = sinkKeyFormat;
+                       this.sinkValueFormat = sinkValueFormat;
+               }
+
+               @Override
+               public ChangelogMode getChangelogMode(ChangelogMode 
requestedMode) {
+                       return null;
+               }
+
+               @Override
+               public SinkRuntimeProvider getSinkRuntimeProvider(Context 
context) {
+                       return null;
+               }
+
+               @Override
+               public DynamicTableSink copy() {
+                       return null;
+               }
+
+               @Override
+               public String asSummaryString() {
+                       return null;
+               }
+
+               @Override
+               public boolean equals(Object o) {
+                       if (this == o) {
+                               return true;
+                       }
+                       if (o == null || getClass() != o.getClass()) {
+                               return false;
+                       }
+                       DynamicTableSinkMock that = (DynamicTableSinkMock) o;
+                       return target.equals(that.target) &&
+                               bufferSize.equals(that.bufferSize) &&
+                               Objects.equals(sinkKeyFormat, 
that.sinkKeyFormat) &&
+                               sinkValueFormat.equals(that.sinkValueFormat);
+               }
+
+               @Override
+               public int hashCode() {
+                       return Objects.hash(target, bufferSize, sinkKeyFormat, 
sinkValueFormat);
+               }
+       }
+}
diff --git 
a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/TestFormatFactory.java
 
b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/TestFormatFactory.java
new file mode 100644
index 0000000..c04bdb6
--- /dev/null
+++ 
b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/TestFormatFactory.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.factories;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.format.ScanFormat;
+import org.apache.flink.table.connector.format.SinkFormat;
+import org.apache.flink.table.connector.source.ScanTableSource;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
+
+import java.util.HashSet;
+import java.util.Objects;
+import java.util.Set;
+
+/**
+ * Test implementations for {@link DeserializationFormatFactory} and {@link SerializationFormatFactory}.
+ *
+ * <p>The factory is registered under the identifier {@link #IDENTIFIER}. It requires the option
+ * {@link #DELIMITER} and optionally accepts {@link #FAIL_ON_MISSING}.
+ */
+public class TestFormatFactory implements DeserializationFormatFactory, 
SerializationFormatFactory {
+
+       public static final String IDENTIFIER = "test-format";
+
+       /** Required option without a default value. */
+       public static final ConfigOption<String> DELIMITER = ConfigOptions
+               .key("delimiter")
+               .stringType()
+               .noDefaultValue();
+
+       /** Optional option that defaults to {@code false}. */
+       public static final ConfigOption<Boolean> FAIL_ON_MISSING = 
ConfigOptions
+               .key("fail-on-missing")
+               .booleanType()
+               .defaultValue(false);
+
+       /**
+        * Validates the given format options against this factory and creates a
+        * {@link ScanFormatMock} from {@link #DELIMITER} and {@link #FAIL_ON_MISSING}.
+        */
+       @Override
+       public ScanFormat<DeserializationSchema<RowData>> createScanFormat(
+                       DynamicTableFactory.Context context,
+                       ReadableConfig formatConfig) {
+               FactoryUtil.validateFactoryOptions(this, formatConfig);
+               return new ScanFormatMock(formatConfig.get(DELIMITER), 
formatConfig.get(FAIL_ON_MISSING));
+       }
+
+       /**
+        * Validates the given format options against this factory and creates a
+        * {@link SinkFormatMock} from {@link #DELIMITER}.
+        */
+       @Override
+       public SinkFormat<SerializationSchema<RowData>> createSinkFormat(
+                       DynamicTableFactory.Context context,
+                       ReadableConfig formatConfig) {
+               FactoryUtil.validateFactoryOptions(this, formatConfig);
+               return new SinkFormatMock(formatConfig.get(DELIMITER));
+       }
+
+       @Override
+       public String factoryIdentifier() {
+               return IDENTIFIER;
+       }
+
+       /**
+        * Returns a fresh set containing {@link #DELIMITER}.
+        */
+       @Override
+       public Set<ConfigOption<?>> requiredOptions() {
+               final Set<ConfigOption<?>> options = new HashSet<>();
+               options.add(DELIMITER);
+               return options;
+       }
+
+       /**
+        * Returns a fresh set containing {@link #FAIL_ON_MISSING}.
+        */
+       @Override
+       public Set<ConfigOption<?>> optionalOptions() {
+               final Set<ConfigOption<?>> options = new HashSet<>();
+               options.add(FAIL_ON_MISSING);
+               return options;
+       }
+
+       // 
--------------------------------------------------------------------------------------------
+       // Table source format
+       // 
--------------------------------------------------------------------------------------------
+
+       /**
+        * {@link ScanFormat} for testing.
+        *
+        * <p>It only stores the values it was created with. The runtime methods return {@code null};
+        * instances are meant to be compared via {@link #equals(Object)}.
+        */
+       public static class ScanFormatMock implements 
ScanFormat<DeserializationSchema<RowData>> {
+
+               public final String delimiter;
+               public final Boolean failOnMissing;
+
+               ScanFormatMock(String delimiter, Boolean failOnMissing) {
+                       this.delimiter = delimiter;
+                       this.failOnMissing = failOnMissing;
+               }
+
+               @Override
+               public DeserializationSchema<RowData> createScanFormat(
+                               ScanTableSource.Context context,
+                               DataType producedDataType) {
+                       return null;
+               }
+
+               @Override
+               public ChangelogMode getChangelogMode() {
+                       return null;
+               }
+
+               @Override
+               public boolean equals(Object o) {
+                       if (this == o) {
+                               return true;
+                       }
+                       if (o == null || getClass() != o.getClass()) {
+                               return false;
+                       }
+                       ScanFormatMock that = (ScanFormatMock) o;
+                       // null-safe: both fields are reference types that the constructor does not
+                       // check, and hashCode() tolerates nulls as well
+                       return Objects.equals(delimiter, that.delimiter) &&
+                               Objects.equals(failOnMissing, that.failOnMissing);
+               }
+
+               @Override
+               public int hashCode() {
+                       return Objects.hash(delimiter, failOnMissing);
+               }
+       }
+
+       // 
--------------------------------------------------------------------------------------------
+       // Table sink format
+       // 
--------------------------------------------------------------------------------------------
+
+       /**
+        * {@link SinkFormat} for testing.
+        *
+        * <p>It only stores the delimiter it was created with. The runtime methods return
+        * {@code null}; instances are meant to be compared via {@link #equals(Object)}.
+        */
+       public static class SinkFormatMock implements 
SinkFormat<SerializationSchema<RowData>> {
+
+               public final String delimiter;
+
+               SinkFormatMock(String delimiter) {
+                       this.delimiter = delimiter;
+               }
+
+               // NOTE(review): the context type is ScanTableSource.Context although this is a sink
+               // format. The signature is dictated by the overridden SinkFormat method; confirm
+               // whether that interface should take a sink-side context instead.
+               @Override
+               public SerializationSchema<RowData> createSinkFormat(
+                               ScanTableSource.Context context,
+                               DataType consumeDataType) {
+                       return null;
+               }
+
+               @Override
+               public ChangelogMode getChangelogMode() {
+                       return null;
+               }
+
+               @Override
+               public boolean equals(Object o) {
+                       if (this == o) {
+                               return true;
+                       }
+                       if (o == null || getClass() != o.getClass()) {
+                               return false;
+                       }
+                       SinkFormatMock that = (SinkFormatMock) o;
+                       // null-safe: the constructor does not check the delimiter and hashCode()
+                       // tolerates null as well
+                       return Objects.equals(delimiter, that.delimiter);
+               }
+
+               @Override
+               public int hashCode() {
+                       return Objects.hash(delimiter);
+               }
+       }
+}
diff --git 
a/flink-table/flink-table-common/src/test/resources/META-INF/services/org.apache.flink.table.factories.Factory
 
b/flink-table/flink-table-common/src/test/resources/META-INF/services/org.apache.flink.table.factories.Factory
new file mode 100644
index 0000000..d31007a
--- /dev/null
+++ 
b/flink-table/flink-table-common/src/test/resources/META-INF/services/org.apache.flink.table.factories.Factory
@@ -0,0 +1,17 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.flink.table.factories.TestDynamicTableFactory
+org.apache.flink.table.factories.TestFormatFactory

Reply via email to