aljoscha commented on a change in pull request #12724:
URL: https://github.com/apache/flink/pull/12724#discussion_r443514976



##########
File path: docs/dev/table/sourceSinks.md
##########
@@ -0,0 +1,740 @@
+---
+title: "User-defined Sources & Sinks"
+nav-parent_id: tableapi
+nav-pos: 130
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+_Dynamic tables_ are the core concept of Flink's Table & SQL API for 
processing both bounded and unbounded
+data in a unified fashion.
+
+Because dynamic tables are only a logical concept, Flink does not own the data 
itself. Instead, the content
+of a dynamic table is stored in external systems (such as databases, key-value 
stores, message queues) or files.
+
+_Dynamic sources_ and _dynamic sinks_ can be used to read and write data from 
and to an external system. In
+the documentation, sources and sinks are often summarized under the term 
_connector_.
+
+Flink provides pre-defined connectors for Kafka, Hive, and different file 
systems. See the [connector section]({% link dev/table/connectors/index.md %})
+for more information about built-in table sources and sinks.
+
+This page focuses on how to develop a custom, user-defined connector.
+
+<span class="label label-danger">Attention</span> New table source and table 
sink interfaces have been
+introduced in Flink 1.11 as part of 
[FLIP-95](https://cwiki.apache.org/confluence/display/FLINK/FLIP-95%3A+New+TableSource+and+TableSink+interfaces).
+Also the factory interfaces have been reworked. FLIP-95 is not fully 
implemented yet. Many ability interfaces
+are not supported yet (e.g. for filter or partition push down). If necessary, 
please also have a look
+at the [old table sources and sinks page]({% link 
dev/table/legacySourceSinks.md %}). Those interfaces
+are still supported for backwards compatibility.
+
+* This will be replaced by the TOC
+{:toc}
+
+Overview
+--------
+
+In many cases, implementers don't need to create a new connector from scratch 
but would like to slightly
+modify existing connectors or hook into the existing stack. In other cases, 
implementers would like to
+create specialed connectors.
+
+This section helps for both kinds of use cases. It explains the general 
architecture of table connectors
+from pure declaration in the API to runtime code that will be executed on the 
cluster.
+
+The filled arrows show how objects are transformed to other objects from one 
stage to the next stage during
+the translation process.
+
+<div style="text-align: center">
+  <img width="90%" src="{% link fig/table_connectors.svg %}" alt="Translation 
of table connectors" />
+</div>
+
+### Metadata
+
+Both Table API and SQL are declarative APIs. This includes the declaration of 
tables. Thus, executing
+a `CREATE TABLE` statement results in updated metadata in the target catalog.
+
+For most catalog implementations, physical data in the external system is not 
modified for such an
+operation. Connector-specific dependencies must not be present in the 
classpath yet. The options declared

Review comment:
       ```suggestion
   operation. Connector-specific dependencies don't have to be present in the 
classpath yet. The options declared
   ```
   _must_ says that it would be an error if they are present.
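
For readers following along, a minimal sketch of what this metadata-only stage
looks like from the API side, reusing the `'connector' = 'custom'` and
`'port' = '5022'` options referenced elsewhere on the page (the table name and
the remaining options are made up):

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class MetadataOnlyExample {
    public static void main(String[] args) {
        TableEnvironment tableEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());

        // The CREATE TABLE statement only updates metadata in the target catalog.
        // At this point the 'custom' connector does not have to be on the classpath,
        // and the options in the WITH clause are neither validated nor interpreted.
        tableEnv.executeSql(
                "CREATE TABLE UserScores (name STRING, score INT) WITH ("
                        + " 'connector' = 'custom',"
                        + " 'hostname' = 'localhost',"
                        + " 'port' = '5022'"
                        + ")");
    }
}
```

Only once the table is actually used in a query does the planner ask a factory
to validate and interpret these options.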

##########
File path: docs/dev/table/sourceSinks.md
##########
@@ -0,0 +1,740 @@
+---
+title: "User-defined Sources & Sinks"
+nav-parent_id: tableapi
+nav-pos: 130
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+_Dynamic tables_ are the core concept of Flink's Table & SQL API for 
processing both bounded and unbounded
+data in a unified fashion.
+
+Because dynamic tables are only a logical concept, Flink does not own the data 
itself. Instead, the content
+of a dynamic table is stored in external systems (such as databases, key-value 
stores, message queues) or files.
+
+_Dynamic sources_ and _dynamic sinks_ can be used to read and write data from 
and to an external system. In
+the documentation, sources and sinks are often summarized under the term 
_connector_.
+
+Flink provides pre-defined connectors for Kafka, Hive, and different file 
systems. See the [connector section]({% link dev/table/connectors/index.md %})
+for more information about built-in table sources and sinks.
+
+This page focuses on how to develop a custom, user-defined connector.
+
+<span class="label label-danger">Attention</span> New table source and table 
sink interfaces have been
+introduced in Flink 1.11 as part of 
[FLIP-95](https://cwiki.apache.org/confluence/display/FLINK/FLIP-95%3A+New+TableSource+and+TableSink+interfaces).
+Also the factory interfaces have been reworked. FLIP-95 is not fully 
implemented yet. Many ability interfaces
+are not supported yet (e.g. for filter or partition push down). If necessary, 
please also have a look
+at the [old table sources and sinks page]({% link 
dev/table/legacySourceSinks.md %}). Those interfaces
+are still supported for backwards compatibility.
+
+* This will be replaced by the TOC
+{:toc}
+
+Overview
+--------
+
+In many cases, implementers don't need to create a new connector from scratch 
but would like to slightly
+modify existing connectors or hook into the existing stack. In other cases, 
implementers would like to
+create specialed connectors.
+
+This section helps for both kinds of use cases. It explains the general 
architecture of table connectors
+from pure declaration in the API to runtime code that will be executed on the 
cluster.
+
+The filled arrows show how objects are transformed to other objects from one 
stage to the next stage during
+the translation process.
+
+<div style="text-align: center">
+  <img width="90%" src="{% link fig/table_connectors.svg %}" alt="Translation 
of table connectors" />
+</div>
+
+### Metadata
+
+Both Table API and SQL are declarative APIs. This includes the declaration of 
tables. Thus, executing
+a `CREATE TABLE` statement results in updated metadata in the target catalog.
+
+For most catalog implementations, physical data in the external system is not 
modified for such an
+operation. Connector-specific dependencies must not be present in the 
classpath yet. The options declared
+in the `WITH` clause are neither validated nor otherwise interpreted.
+
+The metadata for dynamic tables (created via DDL or provided by the catalog) 
is represented as instances
+of `CatalogTable`. A table name will be resolved into a `CatalogTable` 
internally when necessary.
+
+### Planning
+
+When it comes to planning and optimization of the table program, a 
`CatalogTable` needs to be resolved
+into a `DynamicTableSource` (for reading in a `SELECT` queries) and 
`DynamicTableSink` (for writing in
+a `INSERT INTO` statements).
+
+`DynamicTableSourceFactory` and `DynamicTableSinkFactory` provide 
connector-specific logic for translating
+the metadata of a `CatalogTable` into instances of `DynamicTableSource` and 
`DynamicTableSink`. In most
+of the cases, a factory's purpose is to validate options (such as `'port' = 
'5022'` in the example),
+configure encoding/decoding formats (if required), and create a parameterized 
instance of the table
+connector.
+
+By default, instances of `DynamicTableSourceFactory` and 
`DynamicTableSinkFactory` are discovered using
+Java's [Service Provider Interfaces 
(SPI)](https://docs.oracle.com/javase/tutorial/sound/SPI-intro.html). The
+`connector` option (such as `'connector' = 'custom'` in the example) must 
correspond to a valid factory
+identifier.
+
+Although it might not be apparent in the class naming, `DynamicTableSource` 
and `DynamicTableSink`
+can also be seen as stateful factories that eventually produce concrete 
runtime implementation for reading/writing
+the actual data.
+
+The planner uses the source and sink instances to perform connector-specific 
bidirectional communication
+until an optimal logical plan could be found. Depending on the optionally 
declared ability interfaces (e.g.
+`SupportsProjectionPushDown` or `SupportsOverwrite`), the planner might apply 
changes to an instance and
+thus mutate the produced runtime implementation.
+
+### Runtime
+
+Once the logical planning is complete, the planner will obtain the _runtime 
implementation_ from the table
+connector. Runtime logic is implemented in Flink's core connector interfaces 
such as `InputFormat` or `SourceFunction`.
+
+Flink offers different core interfaces that are currently being unified. 
Therefore, the different runtime

Review comment:
       I would leave out that they are unified. This text will become outdated 
and we will forget to change it.
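
To make the provider layer concrete, here is a hypothetical, minimal
`DynamicTableSink` that hands the planner a plain `SinkFunction` wrapped in a
`SinkFunctionProvider`; an `OutputFormat`-based implementation would return an
`OutputFormatProvider` from the same method. This is a sketch assuming the
FLIP-95 interfaces as of Flink 1.11; the class and its behavior are
illustrative only:

```java
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.sink.SinkFunctionProvider;
import org.apache.flink.table.data.RowData;

/** Hypothetical insert-only sink whose runtime implementation is a plain SinkFunction. */
public class PrintDynamicTableSink implements DynamicTableSink {

    @Override
    public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
        // This sink only accepts insert-only changelogs.
        return ChangelogMode.insertOnly();
    }

    @Override
    public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
        // The provider is the bridge between the planner and Flink's core
        // runtime interfaces; OutputFormatProvider would be used the same way
        // for an OutputFormat-based implementation.
        return SinkFunctionProvider.of(new PrintSinkFunction());
    }

    @Override
    public DynamicTableSink copy() {
        return new PrintDynamicTableSink();
    }

    @Override
    public String asSummaryString() {
        return "Print Sink (example)";
    }

    /** Minimal stand-in for real runtime logic; prints each incoming row. */
    private static class PrintSinkFunction implements SinkFunction<RowData> {
        @Override
        public void invoke(RowData value, SinkFunction.Context context) {
            System.out.println(value);
        }
    }
}
```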

##########
File path: docs/dev/table/sourceSinks.md
##########
@@ -0,0 +1,740 @@
+---
+title: "User-defined Sources & Sinks"
+nav-parent_id: tableapi
+nav-pos: 130
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+_Dynamic tables_ are the core concept of Flink's Table & SQL API for 
processing both bounded and unbounded
+data in a unified fashion.
+
+Because dynamic tables are only a logical concept, Flink does not own the data 
itself. Instead, the content
+of a dynamic table is stored in external systems (such as databases, key-value 
stores, message queues) or files.
+
+_Dynamic sources_ and _dynamic sinks_ can be used to read and write data from 
and to an external system. In
+the documentation, sources and sinks are often summarized under the term 
_connector_.
+
+Flink provides pre-defined connectors for Kafka, Hive, and different file 
systems. See the [connector section]({% link dev/table/connectors/index.md %})
+for more information about built-in table sources and sinks.
+
+This page focuses on how to develop a custom, user-defined connector.
+
+<span class="label label-danger">Attention</span> New table source and table 
sink interfaces have been
+introduced in Flink 1.11 as part of 
[FLIP-95](https://cwiki.apache.org/confluence/display/FLINK/FLIP-95%3A+New+TableSource+and+TableSink+interfaces).
+Also the factory interfaces have been reworked. FLIP-95 is not fully 
implemented yet. Many ability interfaces
+are not supported yet (e.g. for filter or partition push down). If necessary, 
please also have a look
+at the [old table sources and sinks page]({% link 
dev/table/legacySourceSinks.md %}). Those interfaces
+are still supported for backwards compatibility.
+
+* This will be replaced by the TOC
+{:toc}
+
+Overview
+--------
+
+In many cases, implementers don't need to create a new connector from scratch 
but would like to slightly
+modify existing connectors or hook into the existing stack. In other cases, 
implementers would like to
+create specialed connectors.

Review comment:
       ```suggestion
   create specialized connectors.
   ```

##########
File path: docs/dev/table/sourceSinks.md
##########
@@ -0,0 +1,740 @@
+---
+title: "User-defined Sources & Sinks"
+nav-parent_id: tableapi
+nav-pos: 130
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+_Dynamic tables_ are the core concept of Flink's Table & SQL API for 
processing both bounded and unbounded
+data in a unified fashion.
+
+Because dynamic tables are only a logical concept, Flink does not own the data 
itself. Instead, the content
+of a dynamic table is stored in external systems (such as databases, key-value 
stores, message queues) or files.
+
+_Dynamic sources_ and _dynamic sinks_ can be used to read and write data from 
and to an external system. In
+the documentation, sources and sinks are often summarized under the term 
_connector_.
+
+Flink provides pre-defined connectors for Kafka, Hive, and different file 
systems. See the [connector section]({% link dev/table/connectors/index.md %})
+for more information about built-in table sources and sinks.
+
+This page focuses on how to develop a custom, user-defined connector.
+
+<span class="label label-danger">Attention</span> New table source and table 
sink interfaces have been
+introduced in Flink 1.11 as part of 
[FLIP-95](https://cwiki.apache.org/confluence/display/FLINK/FLIP-95%3A+New+TableSource+and+TableSink+interfaces).
+Also the factory interfaces have been reworked. FLIP-95 is not fully 
implemented yet. Many ability interfaces
+are not supported yet (e.g. for filter or partition push down). If necessary, 
please also have a look
+at the [old table sources and sinks page]({% link 
dev/table/legacySourceSinks.md %}). Those interfaces
+are still supported for backwards compatibility.
+
+* This will be replaced by the TOC
+{:toc}
+
+Overview
+--------
+
+In many cases, implementers don't need to create a new connector from scratch 
but would like to slightly
+modify existing connectors or hook into the existing stack. In other cases, 
implementers would like to
+create specialed connectors.
+
+This section helps for both kinds of use cases. It explains the general 
architecture of table connectors
+from pure declaration in the API to runtime code that will be executed on the 
cluster.
+
+The filled arrows show how objects are transformed to other objects from one 
stage to the next stage during
+the translation process.
+
+<div style="text-align: center">
+  <img width="90%" src="{% link fig/table_connectors.svg %}" alt="Translation 
of table connectors" />
+</div>
+
+### Metadata
+
+Both Table API and SQL are declarative APIs. This includes the declaration of 
tables. Thus, executing
+a `CREATE TABLE` statement results in updated metadata in the target catalog.
+
+For most catalog implementations, physical data in the external system is not 
modified for such an
+operation. Connector-specific dependencies must not be present in the 
classpath yet. The options declared
+in the `WITH` clause are neither validated nor otherwise interpreted.
+
+The metadata for dynamic tables (created via DDL or provided by the catalog) 
is represented as instances
+of `CatalogTable`. A table name will be resolved into a `CatalogTable` 
internally when necessary.
+
+### Planning
+
+When it comes to planning and optimization of the table program, a 
`CatalogTable` needs to be resolved
+into a `DynamicTableSource` (for reading in a `SELECT` queries) and 
`DynamicTableSink` (for writing in
+a `INSERT INTO` statements).
+
+`DynamicTableSourceFactory` and `DynamicTableSinkFactory` provide 
connector-specific logic for translating
+the metadata of a `CatalogTable` into instances of `DynamicTableSource` and 
`DynamicTableSink`. In most
+of the cases, a factory's purpose is to validate options (such as `'port' = 
'5022'` in the example),
+configure encoding/decoding formats (if required), and create a parameterized 
instance of the table
+connector.
+
+By default, instances of `DynamicTableSourceFactory` and 
`DynamicTableSinkFactory` are discovered using
+Java's [Service Provider Interfaces 
(SPI)](https://docs.oracle.com/javase/tutorial/sound/SPI-intro.html). The
+`connector` option (such as `'connector' = 'custom'` in the example) must 
correspond to a valid factory
+identifier.
+
+Although it might not be apparent in the class naming, `DynamicTableSource` 
and `DynamicTableSink`
+can also be seen as stateful factories that eventually produce concrete 
runtime implementation for reading/writing
+the actual data.
+
+The planner uses the source and sink instances to perform connector-specific 
bidirectional communication
+until an optimal logical plan could be found. Depending on the optionally 
declared ability interfaces (e.g.
+`SupportsProjectionPushDown` or `SupportsOverwrite`), the planner might apply 
changes to an instance and
+thus mutate the produced runtime implementation.
+
+### Runtime
+
+Once the logical planning is complete, the planner will obtain the _runtime 
implementation_ from the table
+connector. Runtime logic is implemented in Flink's core connector interfaces 
such as `InputFormat` or `SourceFunction`.
+
+Flink offers different core interfaces that are currently being unified. 
Therefore, the different runtime
+interfaces are grouped by another level of abstraction as subclasses of 
`ScanRuntimeProvider`, `LookupRuntimeProvider`,
+and `SinkRuntimeProvider`.
+
+For example, both `OutputFormatProvider` (providing 
`org.apache.flink.api.common.io.OutputFormat`) and `SinkFunctionProvider` 
(providing `org.apache.flink.streaming.api.functions.sink.SinkFunction`) are 
concrete instances of `SinkRuntimeProvider`.
+
+{% top %}
+
+Extension Points
+----------------
+
+This section explains the available interfaces for extending Flink's table 
connectors.
+
+### Dynamic Table Factories
+
+Dynamic table factories configuring a dynamic table connector for an external 
storage system from catalog

Review comment:
       ```suggestion
   Dynamic table factories are used to configure a dynamic table connector for 
an external storage system from catalog
   ```
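
A rough skeleton of such a factory, assuming the hypothetical `custom`
connector used as an example on this page; the referenced
`SocketScanTableSource` is a made-up `ScanTableSource`, sketched after a later
comment in this thread:

```java
import java.util.HashSet;
import java.util.Set;

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.factories.DynamicTableSourceFactory;
import org.apache.flink.table.factories.FactoryUtil;

/** Hypothetical factory behind 'connector' = 'custom'; validates options and builds the source. */
public class CustomDynamicTableFactory implements DynamicTableSourceFactory {

    public static final ConfigOption<String> HOSTNAME =
            ConfigOptions.key("hostname").stringType().noDefaultValue();
    public static final ConfigOption<Integer> PORT =
            ConfigOptions.key("port").intType().noDefaultValue();

    @Override
    public String factoryIdentifier() {
        // Matches the value of the 'connector' option.
        return "custom";
    }

    @Override
    public Set<ConfigOption<?>> requiredOptions() {
        Set<ConfigOption<?>> options = new HashSet<>();
        options.add(HOSTNAME);
        options.add(PORT);
        return options;
    }

    @Override
    public Set<ConfigOption<?>> optionalOptions() {
        return new HashSet<>();
    }

    @Override
    public DynamicTableSource createDynamicTableSource(Context context) {
        // Validate the user-provided options against the declared ones and read them.
        FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
        helper.validate();

        ReadableConfig options = helper.getOptions();
        String hostname = options.get(HOSTNAME);
        int port = options.get(PORT);

        // Hypothetical ScanTableSource; a sketch of such a class appears after
        // a later comment in this thread.
        return new SocketScanTableSource(hostname, port);
    }
}
```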

##########
File path: docs/dev/table/sourceSinks.md
##########
@@ -0,0 +1,740 @@
+---
+title: "User-defined Sources & Sinks"
+nav-parent_id: tableapi
+nav-pos: 130
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+_Dynamic tables_ are the core concept of Flink's Table & SQL API for 
processing both bounded and unbounded
+data in a unified fashion.
+
+Because dynamic tables are only a logical concept, Flink does not own the data 
itself. Instead, the content
+of a dynamic table is stored in external systems (such as databases, key-value 
stores, message queues) or files.
+
+_Dynamic sources_ and _dynamic sinks_ can be used to read and write data from 
and to an external system. In
+the documentation, sources and sinks are often summarized under the term 
_connector_.
+
+Flink provides pre-defined connectors for Kafka, Hive, and different file 
systems. See the [connector section]({% link dev/table/connectors/index.md %})
+for more information about built-in table sources and sinks.
+
+This page focuses on how to develop a custom, user-defined connector.
+
+<span class="label label-danger">Attention</span> New table source and table 
sink interfaces have been
+introduced in Flink 1.11 as part of 
[FLIP-95](https://cwiki.apache.org/confluence/display/FLINK/FLIP-95%3A+New+TableSource+and+TableSink+interfaces).
+Also the factory interfaces have been reworked. FLIP-95 is not fully 
implemented yet. Many ability interfaces
+are not supported yet (e.g. for filter or partition push down). If necessary, 
please also have a look
+at the [old table sources and sinks page]({% link 
dev/table/legacySourceSinks.md %}). Those interfaces
+are still supported for backwards compatibility.
+
+* This will be replaced by the TOC
+{:toc}
+
+Overview
+--------
+
+In many cases, implementers don't need to create a new connector from scratch 
but would like to slightly
+modify existing connectors or hook into the existing stack. In other cases, 
implementers would like to
+create specialed connectors.
+
+This section helps for both kinds of use cases. It explains the general 
architecture of table connectors
+from pure declaration in the API to runtime code that will be executed on the 
cluster.
+
+The filled arrows show how objects are transformed to other objects from one 
stage to the next stage during
+the translation process.
+
+<div style="text-align: center">
+  <img width="90%" src="{% link fig/table_connectors.svg %}" alt="Translation 
of table connectors" />
+</div>
+
+### Metadata
+
+Both Table API and SQL are declarative APIs. This includes the declaration of 
tables. Thus, executing
+a `CREATE TABLE` statement results in updated metadata in the target catalog.
+
+For most catalog implementations, physical data in the external system is not 
modified for such an
+operation. Connector-specific dependencies must not be present in the 
classpath yet. The options declared
+in the `WITH` clause are neither validated nor otherwise interpreted.
+
+The metadata for dynamic tables (created via DDL or provided by the catalog) 
is represented as instances
+of `CatalogTable`. A table name will be resolved into a `CatalogTable` 
internally when necessary.
+
+### Planning
+
+When it comes to planning and optimization of the table program, a 
`CatalogTable` needs to be resolved
+into a `DynamicTableSource` (for reading in a `SELECT` queries) and 
`DynamicTableSink` (for writing in
+a `INSERT INTO` statements).
+
+`DynamicTableSourceFactory` and `DynamicTableSinkFactory` provide 
connector-specific logic for translating
+the metadata of a `CatalogTable` into instances of `DynamicTableSource` and 
`DynamicTableSink`. In most
+of the cases, a factory's purpose is to validate options (such as `'port' = 
'5022'` in the example),
+configure encoding/decoding formats (if required), and create a parameterized 
instance of the table
+connector.
+
+By default, instances of `DynamicTableSourceFactory` and 
`DynamicTableSinkFactory` are discovered using
+Java's [Service Provider Interfaces 
(SPI)](https://docs.oracle.com/javase/tutorial/sound/SPI-intro.html). The
+`connector` option (such as `'connector' = 'custom'` in the example) must 
correspond to a valid factory
+identifier.
+
+Although it might not be apparent in the class naming, `DynamicTableSource` 
and `DynamicTableSink`
+can also be seen as stateful factories that eventually produce concrete 
runtime implementation for reading/writing
+the actual data.
+
+The planner uses the source and sink instances to perform connector-specific 
bidirectional communication
+until an optimal logical plan could be found. Depending on the optionally 
declared ability interfaces (e.g.
+`SupportsProjectionPushDown` or `SupportsOverwrite`), the planner might apply 
changes to an instance and
+thus mutate the produced runtime implementation.
+
+### Runtime
+
+Once the logical planning is complete, the planner will obtain the _runtime 
implementation_ from the table
+connector. Runtime logic is implemented in Flink's core connector interfaces 
such as `InputFormat` or `SourceFunction`.
+
+Flink offers different core interfaces that are currently being unified. 
Therefore, the different runtime
+interfaces are grouped by another level of abstraction as subclasses of 
`ScanRuntimeProvider`, `LookupRuntimeProvider`,
+and `SinkRuntimeProvider`.
+
+For example, both `OutputFormatProvider` (providing 
`org.apache.flink.api.common.io.OutputFormat`) and `SinkFunctionProvider` 
(providing `org.apache.flink.streaming.api.functions.sink.SinkFunction`) are 
concrete instances of `SinkRuntimeProvider`.
+
+{% top %}
+
+Extension Points
+----------------
+
+This section explains the available interfaces for extending Flink's table 
connectors.
+
+### Dynamic Table Factories
+
+Dynamic table factories configuring a dynamic table connector for an external 
storage system from catalog
+and session information.
+
+`org.apache.flink.table.factories.DynamicTableSourceFactory` can be 
implemented to construct a `DynamicTableSource`.
+
+`org.apache.flink.table.factories.DynamicTableSinkFactory` can be implemented 
to construct a `DynamicTableSink`.
+
+By default, the factory is discovered using the value of the `connector` 
option as the factory identifier
+and Java's Service Provider Interface.
+
+In JAR files, references to new implementations can be added to the service 
file:
+
+`META-INF/services/org.apache.flink.table.factories.Factory`
+
+The framework will check for a single matching factory that is uniquely 
identified by factory identifier
+and requested base class (e.g. `DynamicTableSourceFactory`).
+
+The factory discovery process can be bypassed by the catalog implementation if 
necessary. For this, a
+catalog needs to return an instance that implements the requested base class 
in `org.apache.flink.table.catalog.Catalog#getFactory`.
+
+### Dynamic Table Source
+
+By definition, a dynamic table can change over time.
+
+When reading a dynamic table, the content can either be considered as:
+- A changelog (finite or infinite) for which all changes are consumed 
continuously until the changelog
+  is exhausted. Represented in the `ScanTableSource` interface.
+- A continuously changing or very large external table whose content is 
usually never read entirely
+  but queried for individual values when necessary. Represented in the 
`LookupTableSource` interface.
+
+Both interfaces can be implemented at the same time. The planner decides about 
their usage depending
+on the specified query.
+
+#### Scan Table Source
+
+A `ScanTableSource` scans all rows from an external storage system during 
runtime.
+
+The scanned rows don't have to contain only insertions but can also contain 
updates and deletions. Thus,
+the table source can be used to read a (finite or infinite) changelog. The 
returned _changelog mode_ indicates
+the set of changes that the planner can expect during runtime.
+
+For regular batch scenarios, the source can emit a bounded stream of 
insert-only rows.
+
+For regular streaming scenarios, the source can emit an unbounded stream of 
insert-only rows.
+
+For change data capture (CDC) scenarios, the source can emit bounded or 
unbounded streams with insert,
+update, and delete rows.
+
+A table source can implement further ability interfaces such as 
`SupportsProjectionPushDown` that might
+mutate an instance during planning. All abilities are listed in the 
`org.apache.flink.table.connector.source.abilities`
+package and in the documentation of 
`org.apache.flink.table.connector.source.ScanTableSource`.
+
+The runtime implementation of a `ScanTableSource` must produce internal data 
structures. Thus, records
+must be emitted as `org.apache.flink.table.data.RowData`. The framework 
provides runtime converters such
+that a source can still work on common data structures and perform a 
conversion at the end.
+
+#### Lookup Table Source
+
+A `LookupTableSource` looks up rows of an external storage system by one or 
more keys during runtime.
+
+Compared to `ScanTableSource`, the source does not have to read the entire table and 
can lazily fetch individual
+values from a (possibly continuously changing) external table when necessary.
+
+Compared to `ScanTableSource`, a `LookupTableSource` currently only supports 
emitting insert-only changes.
+
+Further abilities are not supported. See the documentation of 
`org.apache.flink.table.connector.source.LookupTableSource`
+for more information.
+
+The runtime implementation of a `LookupTableSource` is a `TableFunction` or 
`AsyncTableFunction`. The function
+will be called with values for the given lookup keys during runtime.
+
+### Dynamic Table Sink
+
+By definition, a dynamic table can change over time.
+
+When writing a dynamic table, the content can either be considered as a 
changelog (finite or infinite)

Review comment:
       the other half of the _either_ seems to be missing: "the content can 
either be ... *or* ..."
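
To make the changelog notion concrete for the write path: a `DynamicTableSink`
declares in `getChangelogMode(requestedMode)` which row kinds it can digest. A
small illustrative sketch of two typical answers; the class and method names
are made up, while `ChangelogMode` and `RowKind` are the Flink 1.11 types:

```java
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.types.RowKind;

/** Illustrative only: changelog modes a sink might return from getChangelogMode(...). */
public final class SinkChangelogModes {

    /** For an append-only external system: only INSERT rows are accepted. */
    public static ChangelogMode appendOnly() {
        return ChangelogMode.insertOnly();
    }

    /** For a system that can apply a full changelog, e.g. keyed upserts plus deletes. */
    public static ChangelogMode fullChangelog() {
        return ChangelogMode.newBuilder()
                .addContainedKind(RowKind.INSERT)
                .addContainedKind(RowKind.UPDATE_AFTER)
                .addContainedKind(RowKind.DELETE)
                .build();
    }

    private SinkChangelogModes() {}
}
```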

##########
File path: docs/dev/table/sourceSinks.md
##########
@@ -0,0 +1,740 @@
+---
+title: "User-defined Sources & Sinks"
+nav-parent_id: tableapi
+nav-pos: 130
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+_Dynamic tables_ are the core concept of Flink's Table & SQL API for 
processing both bounded and unbounded
+data in a unified fashion.
+
+Because dynamic tables are only a logical concept, Flink does not own the data 
itself. Instead, the content
+of a dynamic table is stored in external systems (such as databases, key-value 
stores, message queues) or files.
+
+_Dynamic sources_ and _dynamic sinks_ can be used to read and write data from 
and to an external system. In
+the documentation, sources and sinks are often summarized under the term 
_connector_.
+
+Flink provides pre-defined connectors for Kafka, Hive, and different file 
systems. See the [connector section]({% link dev/table/connectors/index.md %})
+for more information about built-in table sources and sinks.
+
+This page focuses on how to develop a custom, user-defined connector.
+
+<span class="label label-danger">Attention</span> New table source and table 
sink interfaces have been
+introduced in Flink 1.11 as part of 
[FLIP-95](https://cwiki.apache.org/confluence/display/FLINK/FLIP-95%3A+New+TableSource+and+TableSink+interfaces).
+Also the factory interfaces have been reworked. FLIP-95 is not fully 
implemented yet. Many ability interfaces
+are not supported yet (e.g. for filter or partition push down). If necessary, 
please also have a look
+at the [old table sources and sinks page]({% link 
dev/table/legacySourceSinks.md %}). Those interfaces
+are still supported for backwards compatibility.
+
+* This will be replaced by the TOC
+{:toc}
+
+Overview
+--------
+
+In many cases, implementers don't need to create a new connector from scratch 
but would like to slightly
+modify existing connectors or hook into the existing stack. In other cases, 
implementers would like to
+create specialed connectors.
+
+This section helps for both kinds of use cases. It explains the general 
architecture of table connectors
+from pure declaration in the API to runtime code that will be executed on the 
cluster.
+
+The filled arrows show how objects are transformed to other objects from one 
stage to the next stage during
+the translation process.
+
+<div style="text-align: center">
+  <img width="90%" src="{% link fig/table_connectors.svg %}" alt="Translation 
of table connectors" />
+</div>
+
+### Metadata
+
+Both Table API and SQL are declarative APIs. This includes the declaration of 
tables. Thus, executing
+a `CREATE TABLE` statement results in updated metadata in the target catalog.
+
+For most catalog implementations, physical data in the external system is not 
modified for such an
+operation. Connector-specific dependencies must not be present in the 
classpath yet. The options declared
+in the `WITH` clause are neither validated nor otherwise interpreted.
+
+The metadata for dynamic tables (created via DDL or provided by the catalog) 
is represented as instances
+of `CatalogTable`. A table name will be resolved into a `CatalogTable` 
internally when necessary.
+
+### Planning
+
+When it comes to planning and optimization of the table program, a 
`CatalogTable` needs to be resolved
+into a `DynamicTableSource` (for reading in a `SELECT` queries) and 
`DynamicTableSink` (for writing in

Review comment:
       in `SELECT` queries *OR* in a `SELECT` query
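
For completeness, the SPI discovery mentioned in this hunk boils down to
listing the fully-qualified factory class in the service file
`META-INF/services/org.apache.flink.table.factories.Factory`, for example
(assuming the hypothetical factory sketched earlier in this thread lives in a
`com.example` package):

```
com.example.CustomDynamicTableFactory
```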

##########
File path: docs/dev/table/sourceSinks.md
##########
@@ -0,0 +1,740 @@
+---
+title: "User-defined Sources & Sinks"
+nav-parent_id: tableapi
+nav-pos: 130
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+_Dynamic tables_ are the core concept of Flink's Table & SQL API for 
processing both bounded and unbounded
+data in a unified fashion.
+
+Because dynamic tables are only a logical concept, Flink does not own the data 
itself. Instead, the content
+of a dynamic table is stored in external systems (such as databases, key-value 
stores, message queues) or files.
+
+_Dynamic sources_ and _dynamic sinks_ can be used to read and write data from 
and to an external system. In
+the documentation, sources and sinks are often summarized under the term 
_connector_.
+
+Flink provides pre-defined connectors for Kafka, Hive, and different file 
systems. See the [connector section]({% link dev/table/connectors/index.md %})
+for more information about built-in table sources and sinks.
+
+This page focuses on how to develop a custom, user-defined connector.
+
+<span class="label label-danger">Attention</span> New table source and table 
sink interfaces have been
+introduced in Flink 1.11 as part of 
[FLIP-95](https://cwiki.apache.org/confluence/display/FLINK/FLIP-95%3A+New+TableSource+and+TableSink+interfaces).
+Also the factory interfaces have been reworked. FLIP-95 is not fully 
implemented yet. Many ability interfaces
+are not supported yet (e.g. for filter or partition push down). If necessary, 
please also have a look
+at the [old table sources and sinks page]({% link 
dev/table/legacySourceSinks.md %}). Those interfaces
+are still supported for backwards compatibility.
+
+* This will be replaced by the TOC
+{:toc}
+
+Overview
+--------
+
+In many cases, implementers don't need to create a new connector from scratch 
but would like to slightly
+modify existing connectors or hook into the existing stack. In other cases, 
implementers would like to
+create specialed connectors.
+
+This section helps for both kinds of use cases. It explains the general 
architecture of table connectors
+from pure declaration in the API to runtime code that will be executed on the 
cluster.
+
+The filled arrows show how objects are transformed to other objects from one 
stage to the next stage during
+the translation process.
+
+<div style="text-align: center">
+  <img width="90%" src="{% link fig/table_connectors.svg %}" alt="Translation 
of table connectors" />
+</div>
+
+### Metadata
+
+Both Table API and SQL are declarative APIs. This includes the declaration of 
tables. Thus, executing
+a `CREATE TABLE` statement results in updated metadata in the target catalog.
+
+For most catalog implementations, physical data in the external system is not 
modified for such an
+operation. Connector-specific dependencies must not be present in the 
classpath yet. The options declared
+in the `WITH` clause are neither validated nor otherwise interpreted.
+
+The metadata for dynamic tables (created via DDL or provided by the catalog) 
is represented as instances
+of `CatalogTable`. A table name will be resolved into a `CatalogTable` 
internally when necessary.
+
+### Planning
+
+When it comes to planning and optimization of the table program, a 
`CatalogTable` needs to be resolved
+into a `DynamicTableSource` (for reading in a `SELECT` queries) and 
`DynamicTableSink` (for writing in
+a `INSERT INTO` statements).
+
+`DynamicTableSourceFactory` and `DynamicTableSinkFactory` provide 
connector-specific logic for translating
+the metadata of a `CatalogTable` into instances of `DynamicTableSource` and 
`DynamicTableSink`. In most
+of the cases, a factory's purpose is to validate options (such as `'port' = 
'5022'` in the example),
+configure encoding/decoding formats (if required), and create a parameterized 
instance of the table
+connector.
+
+By default, instances of `DynamicTableSourceFactory` and 
`DynamicTableSinkFactory` are discovered using
+Java's [Service Provider Interfaces 
(SPI)](https://docs.oracle.com/javase/tutorial/sound/SPI-intro.html). The
+`connector` option (such as `'connector' = 'custom'` in the example) must 
correspond to a valid factory
+identifier.
+
+Although it might not be apparent in the class naming, `DynamicTableSource` 
and `DynamicTableSink`
+can also be seen as stateful factories that eventually produce concrete 
runtime implementation for reading/writing
+the actual data.
+
+The planner uses the source and sink instances to perform connector-specific 
bidirectional communication
+until an optimal logical plan could be found. Depending on the optionally 
declared ability interfaces (e.g.
+`SupportsProjectionPushDown` or `SupportsOverwrite`), the planner might apply 
changes to an instance and
+thus mutate the produced runtime implementation.
+
+### Runtime
+
+Once the logical planning is complete, the planner will obtain the _runtime 
implementation_ from the table
+connector. Runtime logic is implemented in Flink's core connector interfaces 
such as `InputFormat` or `SourceFunction`.
+
+Flink offers different core interfaces that are currently being unified. 
Therefore, the different runtime
+interfaces are grouped by another level of abstraction as subclasses of 
`ScanRuntimeProvider`, `LookupRuntimeProvider`,
+and `SinkRuntimeProvider`.
+
+For example, both `OutputFormatProvider` (providing 
`org.apache.flink.api.common.io.OutputFormat`) and `SinkFunctionProvider` 
(providing `org.apache.flink.streaming.api.functions.sink.SinkFunction`) are 
concrete instances of `SinkRuntimeProvider`.
+
+{% top %}
+
+Extension Points
+----------------
+
+This section explains the available interfaces for extending Flink's table 
connectors.
+
+### Dynamic Table Factories
+
+Dynamic table factories configuring a dynamic table connector for an external 
storage system from catalog
+and session information.
+
+`org.apache.flink.table.factories.DynamicTableSourceFactory` can be 
implemented to construct a `DynamicTableSource`.
+
+`org.apache.flink.table.factories.DynamicTableSinkFactory` can be implemented 
to construct a `DynamicTableSink`.
+
+By default, the factory is discovered using the value of the `connector` 
option as the factory identifier
+and Java's Service Provider Interface.
+
+In JAR files, references to new implementations can be added to the service 
file:
+
+`META-INF/services/org.apache.flink.table.factories.Factory`
+
+The framework will check for a single matching factory that is uniquely 
identified by factory identifier
+and requested base class (e.g. `DynamicTableSourceFactory`).
+
+The factory discovery process can be bypassed by the catalog implementation if 
necessary. For this, a
+catalog needs to return an instance that implements the requested base class 
in `org.apache.flink.table.catalog.Catalog#getFactory`.
+
+### Dynamic Table Source
+
+By definition, a dynamic table can change over time.
+
+When reading a dynamic table, the content can either be considered as:
+- A changelog (finite or infinite) for which all changes are consumed 
continuously until the changelog
+  is exhausted. Represented in the `ScanTableSource` interface.
+- A continuously changing or very large external table whose content is 
usually never read entirely
+  but queried for individual values when necessary. Represented in the 
`LookupTableSource` interface.

Review comment:
       ```suggestion
     but queried for individual values when necessary. This is represented by 
the `LookupTableSource` interface.
   ```
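
A hypothetical, minimal `LookupTableSource` to illustrate the lookup path: the
runtime implementation is a `TableFunction` that is invoked with the lookup
key values. Real I/O is replaced by a stand-in and the class names are made
up; the interfaces are the Flink 1.11 FLIP-95 ones:

```java
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.LookupTableSource;
import org.apache.flink.table.connector.source.TableFunctionProvider;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.functions.TableFunction;

/** Hypothetical lookup source; a real implementation would query the external system per key. */
public class DictionaryLookupTableSource implements LookupTableSource {

    @Override
    public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context) {
        // context describes which fields are used as lookup keys. The runtime
        // implementation is a TableFunction (or AsyncTableFunction) that is
        // called with the key values and emits matching rows.
        return TableFunctionProvider.of(new DictionaryLookupFunction());
    }

    @Override
    public DynamicTableSource copy() {
        return new DictionaryLookupTableSource();
    }

    @Override
    public String asSummaryString() {
        return "Dictionary Lookup Source (example)";
    }

    /** Stand-in lookup function: emits one synthetic row per key instead of real I/O. */
    public static class DictionaryLookupFunction extends TableFunction<RowData> {
        public void eval(Object key) {
            collect(GenericRowData.of(StringData.fromString(String.valueOf(key)), 1));
        }
    }
}
```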

##########
File path: docs/dev/table/sourceSinks.md
##########
@@ -0,0 +1,740 @@
+---
+title: "User-defined Sources & Sinks"
+nav-parent_id: tableapi
+nav-pos: 130
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+_Dynamic tables_ are the core concept of Flink's Table & SQL API for 
processing both bounded and unbounded
+data in a unified fashion.
+
+Because dynamic tables are only a logical concept, Flink does not own the data 
itself. Instead, the content
+of a dynamic table is stored in external systems (such as databases, key-value 
stores, message queues) or files.
+
+_Dynamic sources_ and _dynamic sinks_ can be used to read and write data from 
and to an external system. In
+the documentation, sources and sinks are often summarized under the term 
_connector_.
+
+Flink provides pre-defined connectors for Kafka, Hive, and different file 
systems. See the [connector section]({% link dev/table/connectors/index.md %})
+for more information about built-in table sources and sinks.
+
+This page focuses on how to develop a custom, user-defined connector.
+
+<span class="label label-danger">Attention</span> New table source and table 
sink interfaces have been
+introduced in Flink 1.11 as part of 
[FLIP-95](https://cwiki.apache.org/confluence/display/FLINK/FLIP-95%3A+New+TableSource+and+TableSink+interfaces).
+Also the factory interfaces have been reworked. FLIP-95 is not fully 
implemented yet. Many ability interfaces
+are not supported yet (e.g. for filter or partition push down). If necessary, 
please also have a look
+at the [old table sources and sinks page]({% link 
dev/table/legacySourceSinks.md %}). Those interfaces
+are still supported for backwards compatibility.
+
+* This will be replaced by the TOC
+{:toc}
+
+Overview
+--------
+
+In many cases, implementers don't need to create a new connector from scratch 
but would like to slightly
+modify existing connectors or hook into the existing stack. In other cases, 
implementers would like to
+create specialed connectors.
+
+This section helps for both kinds of use cases. It explains the general 
architecture of table connectors
+from pure declaration in the API to runtime code that will be executed on the 
cluster.
+
+The filled arrows show how objects are transformed to other objects from one 
stage to the next stage during
+the translation process.
+
+<div style="text-align: center">
+  <img width="90%" src="{% link fig/table_connectors.svg %}" alt="Translation 
of table connectors" />
+</div>
+
+### Metadata
+
+Both Table API and SQL are declarative APIs. This includes the declaration of 
tables. Thus, executing
+a `CREATE TABLE` statement results in updated metadata in the target catalog.
+
+For most catalog implementations, physical data in the external system is not 
modified for such an
+operation. Connector-specific dependencies must not be present in the 
classpath yet. The options declared
+in the `WITH` clause are neither validated nor otherwise interpreted.
+
+The metadata for dynamic tables (created via DDL or provided by the catalog) 
is represented as instances
+of `CatalogTable`. A table name will be resolved into a `CatalogTable` 
internally when necessary.
+
+### Planning
+
+When it comes to planning and optimization of the table program, a 
`CatalogTable` needs to be resolved
+into a `DynamicTableSource` (for reading in a `SELECT` queries) and 
`DynamicTableSink` (for writing in
+a `INSERT INTO` statements).
+
+`DynamicTableSourceFactory` and `DynamicTableSinkFactory` provide 
connector-specific logic for translating
+the metadata of a `CatalogTable` into instances of `DynamicTableSource` and 
`DynamicTableSink`. In most
+of the cases, a factory's purpose is to validate options (such as `'port' = 
'5022'` in the example),
+configure encoding/decoding formats (if required), and create a parameterized 
instance of the table
+connector.
+
+By default, instances of `DynamicTableSourceFactory` and 
`DynamicTableSinkFactory` are discovered using
+Java's [Service Provider Interfaces 
(SPI)](https://docs.oracle.com/javase/tutorial/sound/SPI-intro.html). The
+`connector` option (such as `'connector' = 'custom'` in the example) must 
correspond to a valid factory
+identifier.
+
+Although it might not be apparent in the class naming, `DynamicTableSource` 
and `DynamicTableSink`
+can also be seen as stateful factories that eventually produce concrete 
runtime implementation for reading/writing
+the actual data.
+
+The planner uses the source and sink instances to perform connector-specific 
bidirectional communication
+until an optimal logical plan could be found. Depending on the optionally 
declared ability interfaces (e.g.
+`SupportsProjectionPushDown` or `SupportsOverwrite`), the planner might apply 
changes to an instance and
+thus mutate the produced runtime implementation.
+
+### Runtime
+
+Once the logical planning is complete, the planner will obtain the _runtime 
implementation_ from the table
+connector. Runtime logic is implemented in Flink's core connector interfaces 
such as `InputFormat` or `SourceFunction`.
+
+Flink offers different core interfaces that are currently being unified. 
Therefore, the different runtime
+interfaces are grouped by another level of abstraction as subclasses of 
`ScanRuntimeProvider`, `LookupRuntimeProvider`,
+and `SinkRuntimeProvider`.
+
+For example, both `OutputFormatProvider` (providing 
`org.apache.flink.api.common.io.OutputFormat`) and `SinkFunctionProvider` 
(providing `org.apache.flink.streaming.api.functions.sink.SinkFunction`) are 
concrete instances of `SinkRuntimeProvider`.
+
+{% top %}
+
+Extension Points
+----------------
+
+This section explains the available interfaces for extending Flink's table 
connectors.
+
+### Dynamic Table Factories
+
+Dynamic table factories configuring a dynamic table connector for an external 
storage system from catalog
+and session information.
+
+`org.apache.flink.table.factories.DynamicTableSourceFactory` can be 
implemented to construct a `DynamicTableSource`.
+
+`org.apache.flink.table.factories.DynamicTableSinkFactory` can be implemented 
to construct a `DynamicTableSink`.
+
+By default, the factory is discovered using the value of the `connector` 
option as the factory identifier
+and Java's Service Provider Interface.
+
+In JAR files, references to new implementations can be added to the service 
file:
+
+`META-INF/services/org.apache.flink.table.factories.Factory`
+
+The framework will check for a single matching factory that is uniquely 
identified by factory identifier
+and requested base class (e.g. `DynamicTableSourceFactory`).
+
+The factory discovery process can be bypassed by the catalog implementation if 
necessary. For this, a
+catalog needs to return an instance that implements the requested base class 
in `org.apache.flink.table.catalog.Catalog#getFactory`.
+
+### Dynamic Table Source
+
+By definition, a dynamic table can change over time.
+
+When reading a dynamic table, the content can either be considered as:
+- A changelog (finite or infinite) for which all changes are consumed 
continuously until the changelog
+  is exhausted. Represented in the `ScanTableSource` interface.

Review comment:
       ```suggestion
     is exhausted. This is represented by the `ScanTableSource` interface.
   ```
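
A hypothetical, minimal `ScanTableSource` to go with this bullet: it declares
an insert-only changelog mode and returns its runtime implementation as a
bounded `SourceFunction` via `SourceFunctionProvider`. The class is made up
and the real I/O against `hostname:port` is replaced by a single emitted row:

```java
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.connector.source.SourceFunctionProvider;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;

/** Hypothetical scan source; real I/O against hostname:port is replaced by a stand-in. */
public class SocketScanTableSource implements ScanTableSource {

    private final String hostname;
    private final int port;

    public SocketScanTableSource(String hostname, int port) {
        this.hostname = hostname;
        this.port = port;
    }

    @Override
    public ChangelogMode getChangelogMode() {
        // Insert-only scan; a CDC source would also declare update/delete kinds.
        return ChangelogMode.insertOnly();
    }

    @Override
    public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) {
        // The runtime implementation must emit Flink's internal RowData records.
        // 'true' marks the produced stream as bounded.
        return SourceFunctionProvider.of(new ExampleSourceFunction(), true);
    }

    @Override
    public DynamicTableSource copy() {
        return new SocketScanTableSource(hostname, port);
    }

    @Override
    public String asSummaryString() {
        return "Socket Scan Source (example)";
    }

    /** Stand-in for real runtime logic; emits a single row and finishes. */
    private static class ExampleSourceFunction implements SourceFunction<RowData> {
        @Override
        public void run(SourceContext<RowData> ctx) {
            ctx.collect(GenericRowData.of(StringData.fromString("Alice"), 12));
        }

        @Override
        public void cancel() {
            // nothing to do for this stand-in
        }
    }
}
```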

##########
File path: docs/dev/table/sourceSinks.md
##########
@@ -0,0 +1,740 @@
+---
+title: "User-defined Sources & Sinks"
+nav-parent_id: tableapi
+nav-pos: 130
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+_Dynamic tables_ are the core concept of Flink's Table & SQL API for 
processing both bounded and unbounded
+data in a unified fashion.
+
+Because dynamic tables are only a logical concept, Flink does not own the data 
itself. Instead, the content
+of a dynamic table is stored in external systems (such as databases, key-value 
stores, message queues) or files.
+
+_Dynamic sources_ and _dynamic sinks_ can be used to read and write data from 
and to an external system. In
+the documentation, sources and sinks are often summarized under the term 
_connector_.
+
+Flink provides pre-defined connectors for Kafka, Hive, and different file 
systems. See the [connector section]({% link dev/table/connectors/index.md %})
+for more information about built-in table sources and sinks.
+
+This page focuses on how to develop a custom, user-defined connector.
+
+<span class="label label-danger">Attention</span> New table source and table 
sink interfaces have been
+introduced in Flink 1.11 as part of 
[FLIP-95](https://cwiki.apache.org/confluence/display/FLINK/FLIP-95%3A+New+TableSource+and+TableSink+interfaces).
+Also the factory interfaces have been reworked. FLIP-95 is not fully 
implemented yet. Many ability interfaces
+are not supported yet (e.g. for filter or partition push down). If necessary, 
please also have a look
+at the [old table sources and sinks page]({% link 
dev/table/legacySourceSinks.md %}). Those interfaces
+are still supported for backwards compatibility.
+
+* This will be replaced by the TOC
+{:toc}
+
+Overview
+--------
+
+In many cases, implementers don't need to create a new connector from scratch 
but would like to slightly
+modify existing connectors or hook into the existing stack. In other cases, 
implementers would like to
+create specialed connectors.
+
+This section helps for both kinds of use cases. It explains the general 
architecture of table connectors
+from pure declaration in the API to runtime code that will be executed on the 
cluster.
+
+The filled arrows show how objects are transformed to other objects from one 
stage to the next stage during
+the translation process.
+
+<div style="text-align: center">
+  <img width="90%" src="{% link fig/table_connectors.svg %}" alt="Translation 
of table connectors" />
+</div>
+
+### Metadata
+
+Both Table API and SQL are declarative APIs. This includes the declaration of 
tables. Thus, executing
+a `CREATE TABLE` statement results in updated metadata in the target catalog.
+
+For most catalog implementations, physical data in the external system is not 
modified for such an
+operation. Connector-specific dependencies must not be present in the 
classpath yet. The options declared
+in the `WITH` clause are neither validated nor otherwise interpreted.
+
+The metadata for dynamic tables (created via DDL or provided by the catalog) 
is represented as instances
+of `CatalogTable`. A table name will be resolved into a `CatalogTable` 
internally when necessary.
+
+### Planning
+
+When it comes to planning and optimization of the table program, a 
`CatalogTable` needs to be resolved
+into a `DynamicTableSource` (for reading in a `SELECT` queries) and 
`DynamicTableSink` (for writing in
+a `INSERT INTO` statements).
+
+`DynamicTableSourceFactory` and `DynamicTableSinkFactory` provide 
connector-specific logic for translating
+the metadata of a `CatalogTable` into instances of `DynamicTableSource` and 
`DynamicTableSink`. In most
+of the cases, a factory's purpose is to validate options (such as `'port' = 
'5022'` in the example),
+configure encoding/decoding formats (if required), and create a parameterized 
instance of the table
+connector.
+
+By default, instances of `DynamicTableSourceFactory` and 
`DynamicTableSinkFactory` are discovered using
+Java's [Service Provider Interfaces 
(SPI)](https://docs.oracle.com/javase/tutorial/sound/SPI-intro.html). The
+`connector` option (such as `'connector' = 'custom'` in the example) must 
correspond to a valid factory
+identifier.
+
+Although it might not be apparent in the class naming, `DynamicTableSource` 
and `DynamicTableSink`
+can also be seen as stateful factories that eventually produce concrete 
runtime implementation for reading/writing
+the actual data.
+
+The planner uses the source and sink instances to perform connector-specific 
bidirectional communication
+until an optimal logical plan could be found. Depending on the optionally 
declared ability interfaces (e.g.
+`SupportsProjectionPushDown` or `SupportsOverwrite`), the planner might apply 
changes to an instance and
+thus mutate the produced runtime implementation.
+
+### Runtime
+
+Once the logical planning is complete, the planner will obtain the _runtime 
implementation_ from the table
+connector. Runtime logic is implemented in Flink's core connector interfaces 
such as `InputFormat` or `SourceFunction`.
+
+Flink offers different core interfaces that are currently being unified. 
Therefore, the different runtime
+interfaces are grouped by another level of abstraction as subclasses of 
`ScanRuntimeProvider`, `LookupRuntimeProvider`,
+and `SinkRuntimeProvider`.
+
+For example, both `OutputFormatProvider` (providing 
`org.apache.flink.api.common.io.OutputFormat`) and `SinkFunctionProvider` 
(providing `org.apache.flink.streaming.api.functions.sink.SinkFunction`) are 
concrete instances of `SinkRuntimeProvider`.
+
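As a rough illustration of how a provider is exposed, a sink based on a plain `SinkFunction` might return its runtime implementation as follows (a `PrintSinkFunction` is used here purely for demonstration; a real connector would plug in its own function):

{% highlight java %}
import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.sink.SinkFunctionProvider;
import org.apache.flink.table.data.RowData;

// illustrative sink that accepts insert-only data and prints every row
public class PrintExampleSink implements DynamicTableSink {

    @Override
    public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
        return ChangelogMode.insertOnly();
    }

    @Override
    public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
        // wrap a SinkFunction<RowData> into the provider abstraction
        return SinkFunctionProvider.of(new PrintSinkFunction<RowData>());
    }

    @Override
    public DynamicTableSink copy() {
        return new PrintExampleSink();
    }

    @Override
    public String asSummaryString() {
        return "Print Example Sink";
    }
}
{% endhighlight %}
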
+{% top %}
+
+Extension Points
+----------------
+
+This section explains the available interfaces for extending Flink's table 
connectors.
+
+### Dynamic Table Factories
+
+Dynamic table factories are used to configure a dynamic table connector for an external storage system from catalog and session information.
+
+`org.apache.flink.table.factories.DynamicTableSourceFactory` can be 
implemented to construct a `DynamicTableSource`.
+
+`org.apache.flink.table.factories.DynamicTableSinkFactory` can be implemented 
to construct a `DynamicTableSink`.
+
+By default, the factory is discovered using the value of the `connector` 
option as the factory identifier
+and Java's Service Provider Interface.
+
+In JAR files, references to new implementations can be added to the service 
file:
+
+`META-INF/services/org.apache.flink.table.factories.Factory`
+
+The framework will check for a single matching factory that is uniquely 
identified by factory identifier
+and requested base class (e.g. `DynamicTableSourceFactory`).
+
+The factory discovery process can be bypassed by the catalog implementation if 
necessary. For this, a
+catalog needs to return an instance that implements the requested base class 
in `org.apache.flink.table.catalog.Catalog#getFactory`.
+
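A skeleton of such a source factory might look roughly as follows (the identifier and options match the hypothetical `custom` connector of this page; `CustomDynamicTableSource` is a placeholder for the connector-specific source class):

{% highlight java %}
import java.util.HashSet;
import java.util.Set;

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.factories.DynamicTableSourceFactory;
import org.apache.flink.table.factories.FactoryUtil;

public class CustomDynamicTableFactory implements DynamicTableSourceFactory {

    // option of the hypothetical 'custom' connector
    public static final ConfigOption<Integer> PORT =
            ConfigOptions.key("port").intType().noDefaultValue();

    @Override
    public String factoryIdentifier() {
        // matches 'connector' = 'custom'
        return "custom";
    }

    @Override
    public Set<ConfigOption<?>> requiredOptions() {
        final Set<ConfigOption<?>> options = new HashSet<>();
        options.add(PORT);
        return options;
    }

    @Override
    public Set<ConfigOption<?>> optionalOptions() {
        return new HashSet<>();
    }

    @Override
    public DynamicTableSource createDynamicTableSource(Context context) {
        // validates that only declared options are present and typed correctly
        final FactoryUtil.TableFactoryHelper helper =
                FactoryUtil.createTableFactoryHelper(this, context);
        helper.validate();

        final int port = helper.getOptions().get(PORT);
        // CustomDynamicTableSource is a hypothetical connector-specific class
        return new CustomDynamicTableSource(port);
    }
}
{% endhighlight %}

The fully qualified class name of such a factory would then be listed in the `META-INF/services/org.apache.flink.table.factories.Factory` file mentioned above so that SPI discovery can find it.
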
+### Dynamic Table Source
+
+By definition, a dynamic table can change over time.
+
+When reading a dynamic table, the content can either be considered as:
+- A changelog (finite or infinite) for which all changes are consumed 
continuously until the changelog
+  is exhausted. Represented in the `ScanTableSource` interface.
+- A continuously changing or very large external table whose content is 
usually never read entirely
+  but queried for individual values when necessary. Represented in the 
`LookupTableSource` interface.
+
+Both interfaces can be implemented at the same time. The planner decides about 
their usage depending

Review comment:
       ```suggestion
   A class can implement both of these interfaces at the same time. The planner 
decides about their usage depending
   ```

##########
File path: docs/dev/table/sourceSinks.md
##########
@@ -0,0 +1,740 @@
+---
+title: "User-defined Sources & Sinks"
+nav-parent_id: tableapi
+nav-pos: 130
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+_Dynamic tables_ are the core concept of Flink's Table & SQL API for 
processing both bounded and unbounded
+data in a unified fashion.
+
+Because dynamic tables are only a logical concept, Flink does not own the data 
itself. Instead, the content
+of a dynamic table is stored in external systems (such as databases, key-value 
stores, message queues) or files.
+
+_Dynamic sources_ and _dynamic sinks_ can be used to read and write data from 
and to an external system. In
+the documentation, sources and sinks are often summarized under the term 
_connector_.
+
+Flink provides pre-defined connectors for Kafka, Hive, and different file 
systems. See the [connector section]({% link dev/table/connectors/index.md %})
+for more information about built-in table sources and sinks.
+
+This page focuses on how to develop a custom, user-defined connector.
+
+<span class="label label-danger">Attention</span> New table source and table 
sink interfaces have been
+introduced in Flink 1.11 as part of 
[FLIP-95](https://cwiki.apache.org/confluence/display/FLINK/FLIP-95%3A+New+TableSource+and+TableSink+interfaces).
+Also the factory interfaces have been reworked. FLIP-95 is not fully 
implemented yet. Many ability interfaces
+are not supported yet (e.g. for filter or partition push down). If necessary, 
please also have a look
+at the [old table sources and sinks page]({% link 
dev/table/legacySourceSinks.md %}). Those interfaces
+are still supported for backwards compatibility.
+
+* This will be replaced by the TOC
+{:toc}
+
+Overview
+--------
+
+In many cases, implementers don't need to create a new connector from scratch 
but would like to slightly
+modify existing connectors or hook into the existing stack. In other cases, 
implementers would like to
+create specialized connectors.
+
+This section covers both kinds of use cases. It explains the general architecture of table connectors from pure declaration in the API to runtime code that will be executed on the cluster.
+
+The filled arrows show how objects are transformed to other objects from one 
stage to the next stage during
+the translation process.
+
+<div style="text-align: center">
+  <img width="90%" src="{% link fig/table_connectors.svg %}" alt="Translation 
of table connectors" />
+</div>
+
+### Metadata
+
+Both Table API and SQL are declarative APIs. This includes the declaration of 
tables. Thus, executing
+a `CREATE TABLE` statement results in updated metadata in the target catalog.
+
+For most catalog implementations, physical data in the external system is not 
modified for such an
+operation. Connector-specific dependencies don't have to be present in the classpath yet. The options declared
+in the `WITH` clause are neither validated nor otherwise interpreted.
+
+The metadata for dynamic tables (created via DDL or provided by the catalog) 
is represented as instances
+of `CatalogTable`. A table name will be resolved into a `CatalogTable` 
internally when necessary.
+
+### Planning
+
+When it comes to planning and optimization of the table program, a 
`CatalogTable` needs to be resolved
+into a `DynamicTableSource` (for reading in `SELECT` queries) and a `DynamicTableSink` (for writing in `INSERT INTO` statements).
+
+`DynamicTableSourceFactory` and `DynamicTableSinkFactory` provide 
connector-specific logic for translating
+the metadata of a `CatalogTable` into instances of `DynamicTableSource` and 
`DynamicTableSink`. In most
+of the cases, a factory's purpose is to validate options (such as `'port' = 
'5022'` in the example),
+configure encoding/decoding formats (if required), and create a parameterized 
instance of the table
+connector.
+
+By default, instances of `DynamicTableSourceFactory` and 
`DynamicTableSinkFactory` are discovered using
+Java's [Service Provider Interfaces 
(SPI)](https://docs.oracle.com/javase/tutorial/sound/SPI-intro.html). The
+`connector` option (such as `'connector' = 'custom'` in the example) must 
correspond to a valid factory
+identifier.
+
+Although it might not be apparent in the class naming, `DynamicTableSource` 
and `DynamicTableSink`
+can also be seen as stateful factories that eventually produce concrete runtime implementations for reading/writing
+the actual data.
+
+The planner uses the source and sink instances to perform connector-specific 
bidirectional communication
+until an optimal logical plan is found. Depending on the optionally 
declared ability interfaces (e.g.
+`SupportsProjectionPushDown` or `SupportsOverwrite`), the planner might apply 
changes to an instance and
+thus mutate the produced runtime implementation.
+
+### Runtime
+
+Once the logical planning is complete, the planner will obtain the _runtime 
implementation_ from the table
+connector. Runtime logic is implemented in Flink's core connector interfaces 
such as `InputFormat` or `SourceFunction`.
+
+Flink offers different core interfaces that are currently being unified. 
Therefore, the different runtime
+interfaces are grouped by another level of abstraction as subclasses of 
`ScanRuntimeProvider`, `LookupRuntimeProvider`,
+and `SinkRuntimeProvider`.
+
+For example, both `OutputFormatProvider` (providing 
`org.apache.flink.api.common.io.OutputFormat`) and `SinkFunctionProvider` 
(providing `org.apache.flink.streaming.api.functions.sink.SinkFunction`) are 
concrete instances of `SinkRuntimeProvider`.
+
+{% top %}
+
+Extension Points
+----------------
+
+This section explains the available interfaces for extending Flink's table 
connectors.
+
+### Dynamic Table Factories
+
+Dynamic table factories are used to configure a dynamic table connector for an external storage system from catalog and session information.
+
+`org.apache.flink.table.factories.DynamicTableSourceFactory` can be 
implemented to construct a `DynamicTableSource`.
+
+`org.apache.flink.table.factories.DynamicTableSinkFactory` can be implemented 
to construct a `DynamicTableSink`.
+
+By default, the factory is discovered using the value of the `connector` 
option as the factory identifier
+and Java's Service Provider Interface.
+
+In JAR files, references to new implementations can be added to the service 
file:
+
+`META-INF/services/org.apache.flink.table.factories.Factory`
+
+The framework will check for a single matching factory that is uniquely 
identified by factory identifier
+and requested base class (e.g. `DynamicTableSourceFactory`).
+
+The factory discovery process can be bypassed by the catalog implementation if 
necessary. For this, a
+catalog needs to return an instance that implements the requested base class 
in `org.apache.flink.table.catalog.Catalog#getFactory`.
+
+### Dynamic Table Source
+
+By definition, a dynamic table can change over time.
+
+When reading a dynamic table, the content can either be considered as:
+- A changelog (finite or infinite) for which all changes are consumed 
continuously until the changelog
+  is exhausted. Represented in the `ScanTableSource` interface.
+- A continuously changing or very large external table whose content is 
usually never read entirely
+  but queried for individual values when necessary. Represented in the 
`LookupTableSource` interface.
+
+Both interfaces can be implemented at the same time. The planner decides about 
their usage depending
+on the specified query.
+
+#### Scan Table Source
+
+A `ScanTableSource` scans all rows from an external storage system during 
runtime.
+
+The scanned rows don't have to contain only insertions but can also contain 
updates and deletions. Thus,
+the table source can be used to read a (finite or infinite) changelog. The 
returned _changelog mode_ indicates
+the set of changes that the planner can expect during runtime.
+
+For regular batch scenarios, the source can emit a bounded stream of 
insert-only rows.
+
+For regular streaming scenarios, the source can emit an unbounded stream of 
insert-only rows.
+
+For change data capture (CDC) scenarios, the source can emit bounded or 
unbounded streams with insert,
+update, and delete rows.
+
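For instance, the `getChangelogMode()` implementation of a CDC-style source might be sketched as follows (assuming imports of `org.apache.flink.table.connector.ChangelogMode` and `org.apache.flink.types.RowKind`; which row kinds are actually declared depends on the external system):

{% highlight java %}
@Override
public ChangelogMode getChangelogMode() {
    // sketch: emit insertions, after-images of updates, and deletions
    return ChangelogMode.newBuilder()
            .addContainedKind(RowKind.INSERT)
            .addContainedKind(RowKind.UPDATE_AFTER)
            .addContainedKind(RowKind.DELETE)
            .build();
}
{% endhighlight %}
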
+A table source can implement further ability interfaces such as `SupportsProjectionPushDown` that might
+mutate an instance during planning. All abilities are listed in the 
`org.apache.flink.table.connector.source.abilities`
+package and in the documentation of 
`org.apache.flink.table.connector.source.ScanTableSource`.
+
+The runtime implementation of a `ScanTableSource` must produce internal data 
structures. Thus, records
+must be emitted as `org.apache.flink.table.data.RowData`. The framework 
provides runtime converters such
+that a source can still work on common data structures and perform a 
conversion at the end.
+
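Putting the pieces together, a simple unbounded, insert-only scan source could be sketched like this; `CustomSourceFunction` stands for a hypothetical connector-specific `SourceFunction<RowData>` that emits internal data structures such as `GenericRowData`:

{% highlight java %}
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.connector.source.SourceFunctionProvider;
import org.apache.flink.table.data.RowData;

public class CustomDynamicTableSource implements ScanTableSource {

    private final int port;

    public CustomDynamicTableSource(int port) {
        this.port = port;
    }

    @Override
    public ChangelogMode getChangelogMode() {
        // this sketch only emits insertions
        return ChangelogMode.insertOnly();
    }

    @Override
    public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) {
        // hypothetical SourceFunction<RowData> doing the actual reading
        final SourceFunction<RowData> sourceFunction = new CustomSourceFunction(port);

        // 'false' marks the produced stream as unbounded
        return SourceFunctionProvider.of(sourceFunction, false);
    }

    @Override
    public DynamicTableSource copy() {
        return new CustomDynamicTableSource(port);
    }

    @Override
    public String asSummaryString() {
        return "Custom Table Source";
    }
}
{% endhighlight %}
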
+#### Lookup Table Source
+
+A `LookupTableSource` looks up rows of an external storage system by one or 
more keys during runtime.
+
+Compared to `ScanTableSource`, the source must not read the entire table and 
can lazily fetch individual

Review comment:
       ```suggestion
   Compared to `ScanTableSource`, the source does not have to read the entire 
table and can lazily fetch individual
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

