aljoscha commented on a change in pull request #12724: URL: https://github.com/apache/flink/pull/12724#discussion_r443514976
########## File path: docs/dev/table/sourceSinks.md ########## @@ -0,0 +1,740 @@ +--- +title: "User-defined Sources & Sinks" +nav-parent_id: tableapi +nav-pos: 130 +--- +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> + +_Dynamic tables_ are the core concept of Flink's Table & SQL API for processing both bounded and unbounded +data in a unified fashion. + +Because dynamic tables are only a logical concept, Flink does not own the data itself. Instead, the content +of a dynamic table is stored in external systems (such as databases, key-value stores, message queues) or files. + +_Dynamic sources_ and _dynamic sinks_ can be used to read and write data from and to an external system. In +the documentation, sources and sinks are often summarized under the term _connector_. + +Flink provides pre-defined connectors for Kafka, Hive, and different file systems. See the [connector section]({% link dev/table/connectors/index.md %}) +for more information about built-in table sources and sinks. + +This page focuses on how to develop a custom, user-defined connector. + +<span class="label label-danger">Attention</span> New table source and table sink interfaces have been +introduced in Flink 1.11 as part of [FLIP-95](https://cwiki.apache.org/confluence/display/FLINK/FLIP-95%3A+New+TableSource+and+TableSink+interfaces). +Also the factory interfaces have been reworked. FLIP-95 is not fully implemented yet. Many ability interfaces +are not supported yet (e.g. for filter or partition push down). If necessary, please also have a look +at the [old table sources and sinks page]({% link dev/table/legacySourceSinks.md %}). Those interfaces +are still supported for backwards compatibility. + +* This will be replaced by the TOC +{:toc} + +Overview +-------- + +In many cases, implementers don't need to create a new connector from scratch but would like to slightly +modify existing connectors or hook into the existing stack. In other cases, implementers would like to +create specialed connectors. + +This section helps for both kinds of use cases. It explains the general architecture of table connectors +from pure declaration in the API to runtime code that will be executed on the cluster. + +The filled arrows show how objects are transformed to other objects from one stage to the next stage during +the translation process. + +<div style="text-align: center"> + <img width="90%" src="{% link fig/table_connectors.svg %}" alt="Translation of table connectors" /> +</div> + +### Metadata + +Both Table API and SQL are declarative APIs. This includes the declaration of tables. Thus, executing +a `CREATE TABLE` statement results in updated metadata in the target catalog. + +For most catalog implementations, physical data in the external system is not modified for such an +operation. 
Connector-specific dependencies must not be present in the classpath yet. The options declared Review comment: ```suggestion operation. Connector-specific dependencies don't have to be present in the classpath yet. The options declared ``` _must_ says that it would be an error if they are present. ########## File path: docs/dev/table/sourceSinks.md ########## @@ -0,0 +1,740 @@ +--- +title: "User-defined Sources & Sinks" +nav-parent_id: tableapi +nav-pos: 130 +--- +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> + +_Dynamic tables_ are the core concept of Flink's Table & SQL API for processing both bounded and unbounded +data in a unified fashion. + +Because dynamic tables are only a logical concept, Flink does not own the data itself. Instead, the content +of a dynamic table is stored in external systems (such as databases, key-value stores, message queues) or files. + +_Dynamic sources_ and _dynamic sinks_ can be used to read and write data from and to an external system. In +the documentation, sources and sinks are often summarized under the term _connector_. + +Flink provides pre-defined connectors for Kafka, Hive, and different file systems. See the [connector section]({% link dev/table/connectors/index.md %}) +for more information about built-in table sources and sinks. + +This page focuses on how to develop a custom, user-defined connector. + +<span class="label label-danger">Attention</span> New table source and table sink interfaces have been +introduced in Flink 1.11 as part of [FLIP-95](https://cwiki.apache.org/confluence/display/FLINK/FLIP-95%3A+New+TableSource+and+TableSink+interfaces). +Also the factory interfaces have been reworked. FLIP-95 is not fully implemented yet. Many ability interfaces +are not supported yet (e.g. for filter or partition push down). If necessary, please also have a look +at the [old table sources and sinks page]({% link dev/table/legacySourceSinks.md %}). Those interfaces +are still supported for backwards compatibility. + +* This will be replaced by the TOC +{:toc} + +Overview +-------- + +In many cases, implementers don't need to create a new connector from scratch but would like to slightly +modify existing connectors or hook into the existing stack. In other cases, implementers would like to +create specialed connectors. + +This section helps for both kinds of use cases. It explains the general architecture of table connectors +from pure declaration in the API to runtime code that will be executed on the cluster. + +The filled arrows show how objects are transformed to other objects from one stage to the next stage during +the translation process. 
+ +<div style="text-align: center"> + <img width="90%" src="{% link fig/table_connectors.svg %}" alt="Translation of table connectors" /> +</div> + +### Metadata + +Both Table API and SQL are declarative APIs. This includes the declaration of tables. Thus, executing +a `CREATE TABLE` statement results in updated metadata in the target catalog. + +For most catalog implementations, physical data in the external system is not modified for such an +operation. Connector-specific dependencies must not be present in the classpath yet. The options declared +in the `WITH` clause are neither validated nor otherwise interpreted. + +The metadata for dynamic tables (created via DDL or provided by the catalog) is represented as instances +of `CatalogTable`. A table name will be resolved into a `CatalogTable` internally when necessary. + +### Planning + +When it comes to planning and optimization of the table program, a `CatalogTable` needs to be resolved +into a `DynamicTableSource` (for reading in a `SELECT` queries) and `DynamicTableSink` (for writing in +a `INSERT INTO` statements). + +`DynamicTableSourceFactory` and `DynamicTableSinkFactory` provide connector-specific logic for translating +the metadata of a `CatalogTable` into instances of `DynamicTableSource` and `DynamicTableSink`. In most +of the cases, a factory's purpose is to validate options (such as `'port' = '5022'` in the example), +configure encoding/decoding formats (if required), and create a parameterized instance of the table +connector. + +By default, instances of `DynamicTableSourceFactory` and `DynamicTableSinkFactory` are discovered using +Java's [Service Provider Interfaces (SPI)](https://docs.oracle.com/javase/tutorial/sound/SPI-intro.html). The +`connector` option (such as `'connector' = 'custom'` in the example) must correspond to a valid factory +identifier. + +Although it might not be apparent in the class naming, `DynamicTableSource` and `DynamicTableSink` +can also be seen as stateful factories that eventually produce concrete runtime implementation for reading/writing +the actual data. + +The planner uses the source and sink instances to perform connector-specific bidirectional comunication +until an optimal logical plan could be found. Depending on the optionally declared ability interfaces (e.g. +`SupportsProjectionPushDown` or `SupportsOverwrite`), the planner might apply changes to an instance and +thus mutate the produced runtime implementation. + +### Runtime + +Once the logical planning is complete, the planner will obtain the _runtime implementation_ from the table +connector. Runtime logic is implemented in Flink's core connector interfaces such as `InputFormat` or `SourceFunction`. + +Flink offers different core interfaces that are currently being unified. Therefore, the different runtime Review comment: I would leave out that they are unified. This text will become outdated and we will forget to change it. ########## File path: docs/dev/table/sourceSinks.md ########## @@ -0,0 +1,740 @@ +--- +title: "User-defined Sources & Sinks" +nav-parent_id: tableapi +nav-pos: 130 +--- +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. 
You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> + +_Dynamic tables_ are the core concept of Flink's Table & SQL API for processing both bounded and unbounded +data in a unified fashion. + +Because dynamic tables are only a logical concept, Flink does not own the data itself. Instead, the content +of a dynamic table is stored in external systems (such as databases, key-value stores, message queues) or files. + +_Dynamic sources_ and _dynamic sinks_ can be used to read and write data from and to an external system. In +the documentation, sources and sinks are often summarized under the term _connector_. + +Flink provides pre-defined connectors for Kafka, Hive, and different file systems. See the [connector section]({% link dev/table/connectors/index.md %}) +for more information about built-in table sources and sinks. + +This page focuses on how to develop a custom, user-defined connector. + +<span class="label label-danger">Attention</span> New table source and table sink interfaces have been +introduced in Flink 1.11 as part of [FLIP-95](https://cwiki.apache.org/confluence/display/FLINK/FLIP-95%3A+New+TableSource+and+TableSink+interfaces). +Also the factory interfaces have been reworked. FLIP-95 is not fully implemented yet. Many ability interfaces +are not supported yet (e.g. for filter or partition push down). If necessary, please also have a look +at the [old table sources and sinks page]({% link dev/table/legacySourceSinks.md %}). Those interfaces +are still supported for backwards compatibility. + +* This will be replaced by the TOC +{:toc} + +Overview +-------- + +In many cases, implementers don't need to create a new connector from scratch but would like to slightly +modify existing connectors or hook into the existing stack. In other cases, implementers would like to +create specialed connectors. Review comment: ```suggestion create specialized connectors. ``` ########## File path: docs/dev/table/sourceSinks.md ########## @@ -0,0 +1,740 @@ +--- +title: "User-defined Sources & Sinks" +nav-parent_id: tableapi +nav-pos: 130 +--- +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> + +_Dynamic tables_ are the core concept of Flink's Table & SQL API for processing both bounded and unbounded +data in a unified fashion. + +Because dynamic tables are only a logical concept, Flink does not own the data itself. 
Instead, the content +of a dynamic table is stored in external systems (such as databases, key-value stores, message queues) or files. + +_Dynamic sources_ and _dynamic sinks_ can be used to read and write data from and to an external system. In +the documentation, sources and sinks are often summarized under the term _connector_. + +Flink provides pre-defined connectors for Kafka, Hive, and different file systems. See the [connector section]({% link dev/table/connectors/index.md %}) +for more information about built-in table sources and sinks. + +This page focuses on how to develop a custom, user-defined connector. + +<span class="label label-danger">Attention</span> New table source and table sink interfaces have been +introduced in Flink 1.11 as part of [FLIP-95](https://cwiki.apache.org/confluence/display/FLINK/FLIP-95%3A+New+TableSource+and+TableSink+interfaces). +Also the factory interfaces have been reworked. FLIP-95 is not fully implemented yet. Many ability interfaces +are not supported yet (e.g. for filter or partition push down). If necessary, please also have a look +at the [old table sources and sinks page]({% link dev/table/legacySourceSinks.md %}). Those interfaces +are still supported for backwards compatibility. + +* This will be replaced by the TOC +{:toc} + +Overview +-------- + +In many cases, implementers don't need to create a new connector from scratch but would like to slightly +modify existing connectors or hook into the existing stack. In other cases, implementers would like to +create specialed connectors. + +This section helps for both kinds of use cases. It explains the general architecture of table connectors +from pure declaration in the API to runtime code that will be executed on the cluster. + +The filled arrows show how objects are transformed to other objects from one stage to the next stage during +the translation process. + +<div style="text-align: center"> + <img width="90%" src="{% link fig/table_connectors.svg %}" alt="Translation of table connectors" /> +</div> + +### Metadata + +Both Table API and SQL are declarative APIs. This includes the declaration of tables. Thus, executing +a `CREATE TABLE` statement results in updated metadata in the target catalog. + +For most catalog implementations, physical data in the external system is not modified for such an +operation. Connector-specific dependencies must not be present in the classpath yet. The options declared +in the `WITH` clause are neither validated nor otherwise interpreted. + +The metadata for dynamic tables (created via DDL or provided by the catalog) is represented as instances +of `CatalogTable`. A table name will be resolved into a `CatalogTable` internally when necessary. + +### Planning + +When it comes to planning and optimization of the table program, a `CatalogTable` needs to be resolved +into a `DynamicTableSource` (for reading in a `SELECT` queries) and `DynamicTableSink` (for writing in +a `INSERT INTO` statements). + +`DynamicTableSourceFactory` and `DynamicTableSinkFactory` provide connector-specific logic for translating +the metadata of a `CatalogTable` into instances of `DynamicTableSource` and `DynamicTableSink`. In most +of the cases, a factory's purpose is to validate options (such as `'port' = '5022'` in the example), +configure encoding/decoding formats (if required), and create a parameterized instance of the table +connector. 
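
To make the factory contract described in the paragraph above concrete, here is a minimal, illustrative sketch of a source factory for the `'connector' = 'custom'` example. The class `CustomDynamicTableSource` and the `hostname`/`port` options are hypothetical and not part of this PR; only the interfaces and `FactoryUtil` helper are actual Flink API:

```java
import java.util.HashSet;
import java.util.Set;

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.factories.DynamicTableSourceFactory;
import org.apache.flink.table.factories.FactoryUtil;

// Hypothetical factory that validates options and creates a parameterized source instance.
public class CustomDynamicTableSourceFactory implements DynamicTableSourceFactory {

    // Options correspond to keys in the WITH clause, e.g. 'port' = '5022'.
    public static final ConfigOption<String> HOSTNAME =
            ConfigOptions.key("hostname").stringType().noDefaultValue();
    public static final ConfigOption<Integer> PORT =
            ConfigOptions.key("port").intType().noDefaultValue();

    @Override
    public String factoryIdentifier() {
        return "custom"; // matches 'connector' = 'custom'
    }

    @Override
    public Set<ConfigOption<?>> requiredOptions() {
        final Set<ConfigOption<?>> options = new HashSet<>();
        options.add(HOSTNAME);
        options.add(PORT);
        return options;
    }

    @Override
    public Set<ConfigOption<?>> optionalOptions() {
        return new HashSet<>();
    }

    @Override
    public DynamicTableSource createDynamicTableSource(Context context) {
        // Helper that validates the declared options against the catalog table.
        final FactoryUtil.TableFactoryHelper helper =
                FactoryUtil.createTableFactoryHelper(this, context);
        helper.validate();

        // Create a parameterized instance of the table connector.
        final String hostname = helper.getOptions().get(HOSTNAME);
        final int port = helper.getOptions().get(PORT);
        return new CustomDynamicTableSource(hostname, port); // hypothetical source class
    }
}
```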
+ +By default, instances of `DynamicTableSourceFactory` and `DynamicTableSinkFactory` are discovered using +Java's [Service Provider Interfaces (SPI)](https://docs.oracle.com/javase/tutorial/sound/SPI-intro.html). The +`connector` option (such as `'connector' = 'custom'` in the example) must correspond to a valid factory +identifier. + +Although it might not be apparent in the class naming, `DynamicTableSource` and `DynamicTableSink` +can also be seen as stateful factories that eventually produce concrete runtime implementation for reading/writing +the actual data. + +The planner uses the source and sink instances to perform connector-specific bidirectional comunication +until an optimal logical plan could be found. Depending on the optionally declared ability interfaces (e.g. +`SupportsProjectionPushDown` or `SupportsOverwrite`), the planner might apply changes to an instance and +thus mutate the produced runtime implementation. + +### Runtime + +Once the logical planning is complete, the planner will obtain the _runtime implementation_ from the table +connector. Runtime logic is implemented in Flink's core connector interfaces such as `InputFormat` or `SourceFunction`. + +Flink offers different core interfaces that are currently being unified. Therefore, the different runtime +interfaces are grouped by another level of abstraction as subclasses of `ScanRuntimeProvider`, `LookupRuntimeProvider`, +and `SinkRuntimeProvider`. + +For example, both `OutputFormatProvider` (providing `org.apache.flink.api.common.io.OutputFormat`) and `SinkFunctionProvider` (providing `org.apache.flink.streaming.api.functions.sink.SinkFunction`) are concrete instances of `SinkRuntimeProvider`. + +{% top %} + +Extension Points +---------------- + +This section explains the available interfaces for extending Flink's table connectors. + +### Dynamic Table Factories + +Dynamic table factories configuring a dynamic table connector for an external storage system from catalog Review comment: ```suggestion Dynamic table factories are used to configure a dynamic table connector for an external storage system from catalog ``` ########## File path: docs/dev/table/sourceSinks.md ########## @@ -0,0 +1,740 @@ +--- +title: "User-defined Sources & Sinks" +nav-parent_id: tableapi +nav-pos: 130 +--- +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> + +_Dynamic tables_ are the core concept of Flink's Table & SQL API for processing both bounded and unbounded +data in a unified fashion. + +Because dynamic tables are only a logical concept, Flink does not own the data itself. Instead, the content +of a dynamic table is stored in external systems (such as databases, key-value stores, message queues) or files. + +_Dynamic sources_ and _dynamic sinks_ can be used to read and write data from and to an external system. 
In +the documentation, sources and sinks are often summarized under the term _connector_. + +Flink provides pre-defined connectors for Kafka, Hive, and different file systems. See the [connector section]({% link dev/table/connectors/index.md %}) +for more information about built-in table sources and sinks. + +This page focuses on how to develop a custom, user-defined connector. + +<span class="label label-danger">Attention</span> New table source and table sink interfaces have been +introduced in Flink 1.11 as part of [FLIP-95](https://cwiki.apache.org/confluence/display/FLINK/FLIP-95%3A+New+TableSource+and+TableSink+interfaces). +Also the factory interfaces have been reworked. FLIP-95 is not fully implemented yet. Many ability interfaces +are not supported yet (e.g. for filter or partition push down). If necessary, please also have a look +at the [old table sources and sinks page]({% link dev/table/legacySourceSinks.md %}). Those interfaces +are still supported for backwards compatibility. + +* This will be replaced by the TOC +{:toc} + +Overview +-------- + +In many cases, implementers don't need to create a new connector from scratch but would like to slightly +modify existing connectors or hook into the existing stack. In other cases, implementers would like to +create specialed connectors. + +This section helps for both kinds of use cases. It explains the general architecture of table connectors +from pure declaration in the API to runtime code that will be executed on the cluster. + +The filled arrows show how objects are transformed to other objects from one stage to the next stage during +the translation process. + +<div style="text-align: center"> + <img width="90%" src="{% link fig/table_connectors.svg %}" alt="Translation of table connectors" /> +</div> + +### Metadata + +Both Table API and SQL are declarative APIs. This includes the declaration of tables. Thus, executing +a `CREATE TABLE` statement results in updated metadata in the target catalog. + +For most catalog implementations, physical data in the external system is not modified for such an +operation. Connector-specific dependencies must not be present in the classpath yet. The options declared +in the `WITH` clause are neither validated nor otherwise interpreted. + +The metadata for dynamic tables (created via DDL or provided by the catalog) is represented as instances +of `CatalogTable`. A table name will be resolved into a `CatalogTable` internally when necessary. + +### Planning + +When it comes to planning and optimization of the table program, a `CatalogTable` needs to be resolved +into a `DynamicTableSource` (for reading in a `SELECT` queries) and `DynamicTableSink` (for writing in +a `INSERT INTO` statements). + +`DynamicTableSourceFactory` and `DynamicTableSinkFactory` provide connector-specific logic for translating +the metadata of a `CatalogTable` into instances of `DynamicTableSource` and `DynamicTableSink`. In most +of the cases, a factory's purpose is to validate options (such as `'port' = '5022'` in the example), +configure encoding/decoding formats (if required), and create a parameterized instance of the table +connector. + +By default, instances of `DynamicTableSourceFactory` and `DynamicTableSinkFactory` are discovered using +Java's [Service Provider Interfaces (SPI)](https://docs.oracle.com/javase/tutorial/sound/SPI-intro.html). The +`connector` option (such as `'connector' = 'custom'` in the example) must correspond to a valid factory +identifier. 
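
As a small illustration of the SPI wiring just described (the `com.example` package is an assumption, not from this PR), such a factory would be registered by listing its fully qualified class name in the service file `META-INF/services/org.apache.flink.table.factories.Factory` shipped with the connector JAR; the framework then matches the `connector` option against the value returned by `factoryIdentifier()`:

```
# src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
com.example.CustomDynamicTableSourceFactory
```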
+ +Although it might not be apparent in the class naming, `DynamicTableSource` and `DynamicTableSink` +can also be seen as stateful factories that eventually produce concrete runtime implementation for reading/writing +the actual data. + +The planner uses the source and sink instances to perform connector-specific bidirectional comunication +until an optimal logical plan could be found. Depending on the optionally declared ability interfaces (e.g. +`SupportsProjectionPushDown` or `SupportsOverwrite`), the planner might apply changes to an instance and +thus mutate the produced runtime implementation. + +### Runtime + +Once the logical planning is complete, the planner will obtain the _runtime implementation_ from the table +connector. Runtime logic is implemented in Flink's core connector interfaces such as `InputFormat` or `SourceFunction`. + +Flink offers different core interfaces that are currently being unified. Therefore, the different runtime +interfaces are grouped by another level of abstraction as subclasses of `ScanRuntimeProvider`, `LookupRuntimeProvider`, +and `SinkRuntimeProvider`. + +For example, both `OutputFormatProvider` (providing `org.apache.flink.api.common.io.OutputFormat`) and `SinkFunctionProvider` (providing `org.apache.flink.streaming.api.functions.sink.SinkFunction`) are concrete instances of `SinkRuntimeProvider`. + +{% top %} + +Extension Points +---------------- + +This section explains the available interfaces for extending Flink's table connectors. + +### Dynamic Table Factories + +Dynamic table factories configuring a dynamic table connector for an external storage system from catalog +and session information. + +`org.apache.flink.table.factories.DynamicTableSourceFactory` can be implemented to construct a `DynamicTableSource`. + +`org.apache.flink.table.factories.DynamicTableSinkFactory` can be implemented to construct a `DynamicTableSink`. + +By default, the factory is discovered using the value of the `connector` option as the factory identifier +and Java's Service Provider Interface. + +In JAR files, references to new implementations can be added to the service file: + +`META-INF/services/org.apache.flink.table.factories.Factory` + +The framework will check for a single matching factory that is uniquely identified by factory identifier +and requested base class (e.g. `DynamicTableSourceFactory`). + +The factory discovery process can be bypassed by the catalog implementation if necessary. For this, a +catalog needs to return an instance that implements the requested base class in `org.apache.flink.table.catalog.Catalog#getFactory`. + +### Dynamic Table Source + +By definition, a dynamic table can change over time. + +When reading a dynamic table, the content can either be considered as: +- A changelog (finite or infinite) for which all changes are consumed continuously until the changelog + is exhausted. Represented in the `ScanTableSource` interface. +- A continuously changing or very large external table whose content is usually never read entirely + but queried for individual values when necessary. Represented in the `LookupTableSource` interface. + +Both interfaces can be implemented at the same time. The planner decides about their usage depending +on the specified query. + +#### Scan Table Source + +A `ScanTableSource` scans all rows from an external storage system during runtime. + +The scanned rows don't have to contain only insertions but can also contain updates and deletions. 
Thus, +the table source can be used to read a (finite or infinite) changelog. The returned _changelog mode_ indicates +the set of changes that the planner can expect during runtime. + +For regular batch scenarios, the source can emit a bounded stream of insert-only rows. + +For regular streaming scenarios, the source can emit an unbounded stream of insert-only rows. + +For change data capture (CDC) scenarios, the source can emit bounded or unbounded streams with insert, +update, and delete rows. + +A table source can implement further abilitiy interfaces such as `SupportsProjectionPushDown` that might +mutate an instance during planning. All abilities are listed in the `org.apache.flink.table.connector.source.abilities` +package and in the documentation of `org.apache.flink.table.connector.source.ScanTableSource`. + +The runtime implementation of a `ScanTableSource` must produce internal data structures. Thus, records +must be emitted as `org.apache.flink.table.data.RowData`. The framework provides runtime converters such +that a source can still work on common data structures and perform a conversion at the end. + +#### Lookup Table Source + +A `LookupTableSource` looks up rows of an external storage system by one or more keys during runtime. + +Compared to `ScanTableSource`, the source must not read the entire table and can lazily fetch individual +values from a (possibly continuously changing) external table when necessary. + +Compared to `ScanTableSource`, a `LookupTableSource` does only support emitting insert-only changes currently. + +Further abilities are not supported. See the documentation of `org.apache.flink.table.connector.source.LookupTableSource` +for more information. + +The runtime implementation of a `LookupTableSource` is a `TableFunction` or `AsyncTableFunction`. The function +will be called with values for the given lookup keys during runtime. + +### Dynamic Table Sink + +By definition, a dynamic table can change over time. + +When writing a dynamic table, the content can either be considered as a changelog (finite or infinite) Review comment: the other half of the _either_ seems to be missing: "the content can either be ... *or* ..." ########## File path: docs/dev/table/sourceSinks.md ########## @@ -0,0 +1,740 @@ +--- +title: "User-defined Sources & Sinks" +nav-parent_id: tableapi +nav-pos: 130 +--- +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> + +_Dynamic tables_ are the core concept of Flink's Table & SQL API for processing both bounded and unbounded +data in a unified fashion. + +Because dynamic tables are only a logical concept, Flink does not own the data itself. Instead, the content +of a dynamic table is stored in external systems (such as databases, key-value stores, message queues) or files. 
+ +_Dynamic sources_ and _dynamic sinks_ can be used to read and write data from and to an external system. In +the documentation, sources and sinks are often summarized under the term _connector_. + +Flink provides pre-defined connectors for Kafka, Hive, and different file systems. See the [connector section]({% link dev/table/connectors/index.md %}) +for more information about built-in table sources and sinks. + +This page focuses on how to develop a custom, user-defined connector. + +<span class="label label-danger">Attention</span> New table source and table sink interfaces have been +introduced in Flink 1.11 as part of [FLIP-95](https://cwiki.apache.org/confluence/display/FLINK/FLIP-95%3A+New+TableSource+and+TableSink+interfaces). +Also the factory interfaces have been reworked. FLIP-95 is not fully implemented yet. Many ability interfaces +are not supported yet (e.g. for filter or partition push down). If necessary, please also have a look +at the [old table sources and sinks page]({% link dev/table/legacySourceSinks.md %}). Those interfaces +are still supported for backwards compatibility. + +* This will be replaced by the TOC +{:toc} + +Overview +-------- + +In many cases, implementers don't need to create a new connector from scratch but would like to slightly +modify existing connectors or hook into the existing stack. In other cases, implementers would like to +create specialed connectors. + +This section helps for both kinds of use cases. It explains the general architecture of table connectors +from pure declaration in the API to runtime code that will be executed on the cluster. + +The filled arrows show how objects are transformed to other objects from one stage to the next stage during +the translation process. + +<div style="text-align: center"> + <img width="90%" src="{% link fig/table_connectors.svg %}" alt="Translation of table connectors" /> +</div> + +### Metadata + +Both Table API and SQL are declarative APIs. This includes the declaration of tables. Thus, executing +a `CREATE TABLE` statement results in updated metadata in the target catalog. + +For most catalog implementations, physical data in the external system is not modified for such an +operation. Connector-specific dependencies must not be present in the classpath yet. The options declared +in the `WITH` clause are neither validated nor otherwise interpreted. + +The metadata for dynamic tables (created via DDL or provided by the catalog) is represented as instances +of `CatalogTable`. A table name will be resolved into a `CatalogTable` internally when necessary. + +### Planning + +When it comes to planning and optimization of the table program, a `CatalogTable` needs to be resolved +into a `DynamicTableSource` (for reading in a `SELECT` queries) and `DynamicTableSink` (for writing in Review comment: in `SELECT` queries *OR* in a `SELECT` query ########## File path: docs/dev/table/sourceSinks.md ########## @@ -0,0 +1,740 @@ +--- +title: "User-defined Sources & Sinks" +nav-parent_id: tableapi +nav-pos: 130 +--- +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. 
You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> + +_Dynamic tables_ are the core concept of Flink's Table & SQL API for processing both bounded and unbounded +data in a unified fashion. + +Because dynamic tables are only a logical concept, Flink does not own the data itself. Instead, the content +of a dynamic table is stored in external systems (such as databases, key-value stores, message queues) or files. + +_Dynamic sources_ and _dynamic sinks_ can be used to read and write data from and to an external system. In +the documentation, sources and sinks are often summarized under the term _connector_. + +Flink provides pre-defined connectors for Kafka, Hive, and different file systems. See the [connector section]({% link dev/table/connectors/index.md %}) +for more information about built-in table sources and sinks. + +This page focuses on how to develop a custom, user-defined connector. + +<span class="label label-danger">Attention</span> New table source and table sink interfaces have been +introduced in Flink 1.11 as part of [FLIP-95](https://cwiki.apache.org/confluence/display/FLINK/FLIP-95%3A+New+TableSource+and+TableSink+interfaces). +Also the factory interfaces have been reworked. FLIP-95 is not fully implemented yet. Many ability interfaces +are not supported yet (e.g. for filter or partition push down). If necessary, please also have a look +at the [old table sources and sinks page]({% link dev/table/legacySourceSinks.md %}). Those interfaces +are still supported for backwards compatibility. + +* This will be replaced by the TOC +{:toc} + +Overview +-------- + +In many cases, implementers don't need to create a new connector from scratch but would like to slightly +modify existing connectors or hook into the existing stack. In other cases, implementers would like to +create specialed connectors. + +This section helps for both kinds of use cases. It explains the general architecture of table connectors +from pure declaration in the API to runtime code that will be executed on the cluster. + +The filled arrows show how objects are transformed to other objects from one stage to the next stage during +the translation process. + +<div style="text-align: center"> + <img width="90%" src="{% link fig/table_connectors.svg %}" alt="Translation of table connectors" /> +</div> + +### Metadata + +Both Table API and SQL are declarative APIs. This includes the declaration of tables. Thus, executing +a `CREATE TABLE` statement results in updated metadata in the target catalog. + +For most catalog implementations, physical data in the external system is not modified for such an +operation. Connector-specific dependencies must not be present in the classpath yet. The options declared +in the `WITH` clause are neither validated nor otherwise interpreted. + +The metadata for dynamic tables (created via DDL or provided by the catalog) is represented as instances +of `CatalogTable`. A table name will be resolved into a `CatalogTable` internally when necessary. 
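
For readers who want to see the metadata stage in isolation, a minimal sketch could look like the following (table name and option values are made up for illustration). Executing the DDL only registers a `CatalogTable` in the target catalog; the `WITH` options are not yet validated or interpreted, and no connector dependencies need to be on the classpath at this point:

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class MetadataExample {
    public static void main(String[] args) {
        TableEnvironment tableEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());

        // Updates metadata in the target catalog only; the options in the
        // WITH clause are neither validated nor otherwise interpreted here.
        tableEnv.executeSql(
                "CREATE TABLE UserScores (name STRING, score INT) " +
                "WITH ('connector' = 'custom', 'hostname' = 'localhost', 'port' = '5022')");
    }
}
```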
+ +### Planning + +When it comes to planning and optimization of the table program, a `CatalogTable` needs to be resolved +into a `DynamicTableSource` (for reading in a `SELECT` queries) and `DynamicTableSink` (for writing in +a `INSERT INTO` statements). + +`DynamicTableSourceFactory` and `DynamicTableSinkFactory` provide connector-specific logic for translating +the metadata of a `CatalogTable` into instances of `DynamicTableSource` and `DynamicTableSink`. In most +of the cases, a factory's purpose is to validate options (such as `'port' = '5022'` in the example), +configure encoding/decoding formats (if required), and create a parameterized instance of the table +connector. + +By default, instances of `DynamicTableSourceFactory` and `DynamicTableSinkFactory` are discovered using +Java's [Service Provider Interfaces (SPI)](https://docs.oracle.com/javase/tutorial/sound/SPI-intro.html). The +`connector` option (such as `'connector' = 'custom'` in the example) must correspond to a valid factory +identifier. + +Although it might not be apparent in the class naming, `DynamicTableSource` and `DynamicTableSink` +can also be seen as stateful factories that eventually produce concrete runtime implementation for reading/writing +the actual data. + +The planner uses the source and sink instances to perform connector-specific bidirectional comunication +until an optimal logical plan could be found. Depending on the optionally declared ability interfaces (e.g. +`SupportsProjectionPushDown` or `SupportsOverwrite`), the planner might apply changes to an instance and +thus mutate the produced runtime implementation. + +### Runtime + +Once the logical planning is complete, the planner will obtain the _runtime implementation_ from the table +connector. Runtime logic is implemented in Flink's core connector interfaces such as `InputFormat` or `SourceFunction`. + +Flink offers different core interfaces that are currently being unified. Therefore, the different runtime +interfaces are grouped by another level of abstraction as subclasses of `ScanRuntimeProvider`, `LookupRuntimeProvider`, +and `SinkRuntimeProvider`. + +For example, both `OutputFormatProvider` (providing `org.apache.flink.api.common.io.OutputFormat`) and `SinkFunctionProvider` (providing `org.apache.flink.streaming.api.functions.sink.SinkFunction`) are concrete instances of `SinkRuntimeProvider`. + +{% top %} + +Extension Points +---------------- + +This section explains the available interfaces for extending Flink's table connectors. + +### Dynamic Table Factories + +Dynamic table factories configuring a dynamic table connector for an external storage system from catalog +and session information. + +`org.apache.flink.table.factories.DynamicTableSourceFactory` can be implemented to construct a `DynamicTableSource`. + +`org.apache.flink.table.factories.DynamicTableSinkFactory` can be implemented to construct a `DynamicTableSink`. + +By default, the factory is discovered using the value of the `connector` option as the factory identifier +and Java's Service Provider Interface. + +In JAR files, references to new implementations can be added to the service file: + +`META-INF/services/org.apache.flink.table.factories.Factory` + +The framework will check for a single matching factory that is uniquely identified by factory identifier +and requested base class (e.g. `DynamicTableSourceFactory`). + +The factory discovery process can be bypassed by the catalog implementation if necessary. 
For this, a +catalog needs to return an instance that implements the requested base class in `org.apache.flink.table.catalog.Catalog#getFactory`. + +### Dynamic Table Source + +By definition, a dynamic table can change over time. + +When reading a dynamic table, the content can either be considered as: +- A changelog (finite or infinite) for which all changes are consumed continuously until the changelog + is exhausted. Represented in the `ScanTableSource` interface. +- A continuously changing or very large external table whose content is usually never read entirely + but queried for individual values when necessary. Represented in the `LookupTableSource` interface. Review comment: ```suggestion but queried for individual values when necessary. This is represented by the `LookupTableSource` interface. ``` ########## File path: docs/dev/table/sourceSinks.md ########## @@ -0,0 +1,740 @@ +--- +title: "User-defined Sources & Sinks" +nav-parent_id: tableapi +nav-pos: 130 +--- +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> + +_Dynamic tables_ are the core concept of Flink's Table & SQL API for processing both bounded and unbounded +data in a unified fashion. + +Because dynamic tables are only a logical concept, Flink does not own the data itself. Instead, the content +of a dynamic table is stored in external systems (such as databases, key-value stores, message queues) or files. + +_Dynamic sources_ and _dynamic sinks_ can be used to read and write data from and to an external system. In +the documentation, sources and sinks are often summarized under the term _connector_. + +Flink provides pre-defined connectors for Kafka, Hive, and different file systems. See the [connector section]({% link dev/table/connectors/index.md %}) +for more information about built-in table sources and sinks. + +This page focuses on how to develop a custom, user-defined connector. + +<span class="label label-danger">Attention</span> New table source and table sink interfaces have been +introduced in Flink 1.11 as part of [FLIP-95](https://cwiki.apache.org/confluence/display/FLINK/FLIP-95%3A+New+TableSource+and+TableSink+interfaces). +Also the factory interfaces have been reworked. FLIP-95 is not fully implemented yet. Many ability interfaces +are not supported yet (e.g. for filter or partition push down). If necessary, please also have a look +at the [old table sources and sinks page]({% link dev/table/legacySourceSinks.md %}). Those interfaces +are still supported for backwards compatibility. + +* This will be replaced by the TOC +{:toc} + +Overview +-------- + +In many cases, implementers don't need to create a new connector from scratch but would like to slightly +modify existing connectors or hook into the existing stack. In other cases, implementers would like to +create specialed connectors. 
+ +This section helps for both kinds of use cases. It explains the general architecture of table connectors +from pure declaration in the API to runtime code that will be executed on the cluster. + +The filled arrows show how objects are transformed to other objects from one stage to the next stage during +the translation process. + +<div style="text-align: center"> + <img width="90%" src="{% link fig/table_connectors.svg %}" alt="Translation of table connectors" /> +</div> + +### Metadata + +Both Table API and SQL are declarative APIs. This includes the declaration of tables. Thus, executing +a `CREATE TABLE` statement results in updated metadata in the target catalog. + +For most catalog implementations, physical data in the external system is not modified for such an +operation. Connector-specific dependencies must not be present in the classpath yet. The options declared +in the `WITH` clause are neither validated nor otherwise interpreted. + +The metadata for dynamic tables (created via DDL or provided by the catalog) is represented as instances +of `CatalogTable`. A table name will be resolved into a `CatalogTable` internally when necessary. + +### Planning + +When it comes to planning and optimization of the table program, a `CatalogTable` needs to be resolved +into a `DynamicTableSource` (for reading in a `SELECT` queries) and `DynamicTableSink` (for writing in +a `INSERT INTO` statements). + +`DynamicTableSourceFactory` and `DynamicTableSinkFactory` provide connector-specific logic for translating +the metadata of a `CatalogTable` into instances of `DynamicTableSource` and `DynamicTableSink`. In most +of the cases, a factory's purpose is to validate options (such as `'port' = '5022'` in the example), +configure encoding/decoding formats (if required), and create a parameterized instance of the table +connector. + +By default, instances of `DynamicTableSourceFactory` and `DynamicTableSinkFactory` are discovered using +Java's [Service Provider Interfaces (SPI)](https://docs.oracle.com/javase/tutorial/sound/SPI-intro.html). The +`connector` option (such as `'connector' = 'custom'` in the example) must correspond to a valid factory +identifier. + +Although it might not be apparent in the class naming, `DynamicTableSource` and `DynamicTableSink` +can also be seen as stateful factories that eventually produce concrete runtime implementation for reading/writing +the actual data. + +The planner uses the source and sink instances to perform connector-specific bidirectional comunication +until an optimal logical plan could be found. Depending on the optionally declared ability interfaces (e.g. +`SupportsProjectionPushDown` or `SupportsOverwrite`), the planner might apply changes to an instance and +thus mutate the produced runtime implementation. + +### Runtime + +Once the logical planning is complete, the planner will obtain the _runtime implementation_ from the table +connector. Runtime logic is implemented in Flink's core connector interfaces such as `InputFormat` or `SourceFunction`. + +Flink offers different core interfaces that are currently being unified. Therefore, the different runtime +interfaces are grouped by another level of abstraction as subclasses of `ScanRuntimeProvider`, `LookupRuntimeProvider`, +and `SinkRuntimeProvider`. + +For example, both `OutputFormatProvider` (providing `org.apache.flink.api.common.io.OutputFormat`) and `SinkFunctionProvider` (providing `org.apache.flink.streaming.api.functions.sink.SinkFunction`) are concrete instances of `SinkRuntimeProvider`. 
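
A hedged sketch of how a sink might hand one of these providers to the planner is shown below. The class name is hypothetical and Flink's existing `PrintSinkFunction` stands in for real runtime logic; `OutputFormatProvider.of(...)` would be used analogously for an `OutputFormat`-based implementation:

```java
import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.sink.SinkFunctionProvider;
import org.apache.flink.table.data.RowData;

// Hypothetical sink that exposes a SinkFunction as its runtime implementation.
public class CustomDynamicTableSink implements DynamicTableSink {

    @Override
    public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
        // This sketch only accepts insert-only changes.
        return ChangelogMode.insertOnly();
    }

    @Override
    public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
        // Wrap the actual runtime logic; here a simple printing sink for illustration.
        SinkFunction<RowData> sinkFunction = new PrintSinkFunction<>();
        return SinkFunctionProvider.of(sinkFunction);
    }

    @Override
    public DynamicTableSink copy() {
        return new CustomDynamicTableSink();
    }

    @Override
    public String asSummaryString() {
        return "Custom Table Sink";
    }
}
```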
+ +{% top %} + +Extension Points +---------------- + +This section explains the available interfaces for extending Flink's table connectors. + +### Dynamic Table Factories + +Dynamic table factories configuring a dynamic table connector for an external storage system from catalog +and session information. + +`org.apache.flink.table.factories.DynamicTableSourceFactory` can be implemented to construct a `DynamicTableSource`. + +`org.apache.flink.table.factories.DynamicTableSinkFactory` can be implemented to construct a `DynamicTableSink`. + +By default, the factory is discovered using the value of the `connector` option as the factory identifier +and Java's Service Provider Interface. + +In JAR files, references to new implementations can be added to the service file: + +`META-INF/services/org.apache.flink.table.factories.Factory` + +The framework will check for a single matching factory that is uniquely identified by factory identifier +and requested base class (e.g. `DynamicTableSourceFactory`). + +The factory discovery process can be bypassed by the catalog implementation if necessary. For this, a +catalog needs to return an instance that implements the requested base class in `org.apache.flink.table.catalog.Catalog#getFactory`. + +### Dynamic Table Source + +By definition, a dynamic table can change over time. + +When reading a dynamic table, the content can either be considered as: +- A changelog (finite or infinite) for which all changes are consumed continuously until the changelog + is exhausted. Represented in the `ScanTableSource` interface. Review comment: ```suggestion is exhausted. This is represented by the `ScanTableSource` interface. ``` ########## File path: docs/dev/table/sourceSinks.md ########## @@ -0,0 +1,740 @@ +--- +title: "User-defined Sources & Sinks" +nav-parent_id: tableapi +nav-pos: 130 +--- +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> + +_Dynamic tables_ are the core concept of Flink's Table & SQL API for processing both bounded and unbounded +data in a unified fashion. + +Because dynamic tables are only a logical concept, Flink does not own the data itself. Instead, the content +of a dynamic table is stored in external systems (such as databases, key-value stores, message queues) or files. + +_Dynamic sources_ and _dynamic sinks_ can be used to read and write data from and to an external system. In +the documentation, sources and sinks are often summarized under the term _connector_. + +Flink provides pre-defined connectors for Kafka, Hive, and different file systems. See the [connector section]({% link dev/table/connectors/index.md %}) +for more information about built-in table sources and sinks. + +This page focuses on how to develop a custom, user-defined connector. 
+ +<span class="label label-danger">Attention</span> New table source and table sink interfaces have been +introduced in Flink 1.11 as part of [FLIP-95](https://cwiki.apache.org/confluence/display/FLINK/FLIP-95%3A+New+TableSource+and+TableSink+interfaces). +Also the factory interfaces have been reworked. FLIP-95 is not fully implemented yet. Many ability interfaces +are not supported yet (e.g. for filter or partition push down). If necessary, please also have a look +at the [old table sources and sinks page]({% link dev/table/legacySourceSinks.md %}). Those interfaces +are still supported for backwards compatibility. + +* This will be replaced by the TOC +{:toc} + +Overview +-------- + +In many cases, implementers don't need to create a new connector from scratch but would like to slightly +modify existing connectors or hook into the existing stack. In other cases, implementers would like to +create specialed connectors. + +This section helps for both kinds of use cases. It explains the general architecture of table connectors +from pure declaration in the API to runtime code that will be executed on the cluster. + +The filled arrows show how objects are transformed to other objects from one stage to the next stage during +the translation process. + +<div style="text-align: center"> + <img width="90%" src="{% link fig/table_connectors.svg %}" alt="Translation of table connectors" /> +</div> + +### Metadata + +Both Table API and SQL are declarative APIs. This includes the declaration of tables. Thus, executing +a `CREATE TABLE` statement results in updated metadata in the target catalog. + +For most catalog implementations, physical data in the external system is not modified for such an +operation. Connector-specific dependencies must not be present in the classpath yet. The options declared +in the `WITH` clause are neither validated nor otherwise interpreted. + +The metadata for dynamic tables (created via DDL or provided by the catalog) is represented as instances +of `CatalogTable`. A table name will be resolved into a `CatalogTable` internally when necessary. + +### Planning + +When it comes to planning and optimization of the table program, a `CatalogTable` needs to be resolved +into a `DynamicTableSource` (for reading in a `SELECT` queries) and `DynamicTableSink` (for writing in +a `INSERT INTO` statements). + +`DynamicTableSourceFactory` and `DynamicTableSinkFactory` provide connector-specific logic for translating +the metadata of a `CatalogTable` into instances of `DynamicTableSource` and `DynamicTableSink`. In most +of the cases, a factory's purpose is to validate options (such as `'port' = '5022'` in the example), +configure encoding/decoding formats (if required), and create a parameterized instance of the table +connector. + +By default, instances of `DynamicTableSourceFactory` and `DynamicTableSinkFactory` are discovered using +Java's [Service Provider Interfaces (SPI)](https://docs.oracle.com/javase/tutorial/sound/SPI-intro.html). The +`connector` option (such as `'connector' = 'custom'` in the example) must correspond to a valid factory +identifier. + +Although it might not be apparent in the class naming, `DynamicTableSource` and `DynamicTableSink` +can also be seen as stateful factories that eventually produce concrete runtime implementation for reading/writing +the actual data. + +The planner uses the source and sink instances to perform connector-specific bidirectional comunication +until an optimal logical plan could be found. 
Depending on the optionally declared ability interfaces (e.g. +`SupportsProjectionPushDown` or `SupportsOverwrite`), the planner might apply changes to an instance and +thus mutate the produced runtime implementation. + +### Runtime + +Once the logical planning is complete, the planner will obtain the _runtime implementation_ from the table +connector. Runtime logic is implemented in Flink's core connector interfaces such as `InputFormat` or `SourceFunction`. + +Flink offers different core interfaces that are currently being unified. Therefore, the different runtime +interfaces are grouped by another level of abstraction as subclasses of `ScanRuntimeProvider`, `LookupRuntimeProvider`, +and `SinkRuntimeProvider`. + +For example, both `OutputFormatProvider` (providing `org.apache.flink.api.common.io.OutputFormat`) and `SinkFunctionProvider` (providing `org.apache.flink.streaming.api.functions.sink.SinkFunction`) are concrete instances of `SinkRuntimeProvider`. + +{% top %} + +Extension Points +---------------- + +This section explains the available interfaces for extending Flink's table connectors. + +### Dynamic Table Factories + +Dynamic table factories configuring a dynamic table connector for an external storage system from catalog +and session information. + +`org.apache.flink.table.factories.DynamicTableSourceFactory` can be implemented to construct a `DynamicTableSource`. + +`org.apache.flink.table.factories.DynamicTableSinkFactory` can be implemented to construct a `DynamicTableSink`. + +By default, the factory is discovered using the value of the `connector` option as the factory identifier +and Java's Service Provider Interface. + +In JAR files, references to new implementations can be added to the service file: + +`META-INF/services/org.apache.flink.table.factories.Factory` + +The framework will check for a single matching factory that is uniquely identified by factory identifier +and requested base class (e.g. `DynamicTableSourceFactory`). + +The factory discovery process can be bypassed by the catalog implementation if necessary. For this, a +catalog needs to return an instance that implements the requested base class in `org.apache.flink.table.catalog.Catalog#getFactory`. + +### Dynamic Table Source + +By definition, a dynamic table can change over time. + +When reading a dynamic table, the content can either be considered as: +- A changelog (finite or infinite) for which all changes are consumed continuously until the changelog + is exhausted. Represented in the `ScanTableSource` interface. +- A continuously changing or very large external table whose content is usually never read entirely + but queried for individual values when necessary. Represented in the `LookupTableSource` interface. + +Both interfaces can be implemented at the same time. The planner decides about their usage depending Review comment: ```suggestion A class can implement both of these interfaces at the same time. The planner decides about their usage depending ``` ########## File path: docs/dev/table/sourceSinks.md ########## @@ -0,0 +1,740 @@ +--- +title: "User-defined Sources & Sinks" +nav-parent_id: tableapi +nav-pos: 130 +--- +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. 
########## File path: docs/dev/table/sourceSinks.md ########## @@ -0,0 +1,740 @@ +--- +title: "User-defined Sources & Sinks" +nav-parent_id: tableapi +nav-pos: 130 +--- +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> + +_Dynamic tables_ are the core concept of Flink's Table & SQL API for processing both bounded and unbounded +data in a unified fashion. + +Because dynamic tables are only a logical concept, Flink does not own the data itself. Instead, the content +of a dynamic table is stored in external systems (such as databases, key-value stores, message queues) or files. + +_Dynamic sources_ and _dynamic sinks_ can be used to read and write data from and to an external system. In +the documentation, sources and sinks are often summarized under the term _connector_. + +Flink provides pre-defined connectors for Kafka, Hive, and different file systems. See the [connector section]({% link dev/table/connectors/index.md %}) +for more information about built-in table sources and sinks. + +This page focuses on how to develop a custom, user-defined connector. + +<span class="label label-danger">Attention</span> New table source and table sink interfaces have been +introduced in Flink 1.11 as part of [FLIP-95](https://cwiki.apache.org/confluence/display/FLINK/FLIP-95%3A+New+TableSource+and+TableSink+interfaces). +Also the factory interfaces have been reworked. FLIP-95 is not fully implemented yet. Many ability interfaces +are not supported yet (e.g. for filter or partition push down). If necessary, please also have a look +at the [old table sources and sinks page]({% link dev/table/legacySourceSinks.md %}). Those interfaces +are still supported for backwards compatibility. + +* This will be replaced by the TOC +{:toc} + +Overview +-------- + +In many cases, implementers don't need to create a new connector from scratch but would like to slightly +modify existing connectors or hook into the existing stack. In other cases, implementers would like to +create specialized connectors. + +This section helps with both kinds of use cases. It explains the general architecture of table connectors +from pure declaration in the API to runtime code that will be executed on the cluster. + +The filled arrows show how objects are transformed to other objects from one stage to the next stage during +the translation process. + +<div style="text-align: center"> + <img width="90%" src="{% link fig/table_connectors.svg %}" alt="Translation of table connectors" /> +</div> + +### Metadata + +Both Table API and SQL are declarative APIs. This includes the declaration of tables. Thus, executing +a `CREATE TABLE` statement results in updated metadata in the target catalog. + +For most catalog implementations, physical data in the external system is not modified for such an +operation. Connector-specific dependencies do not have to be present in the classpath yet. The options declared +in the `WITH` clause are neither validated nor otherwise interpreted. + +The metadata for dynamic tables (created via DDL or provided by the catalog) is represented as instances +of `CatalogTable`. A table name will be resolved into a `CatalogTable` internally when necessary.
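For example, a table declaration for the hypothetical `custom` connector could look as follows; the table name, the schema, and the `hostname` option are made up for illustration. For most catalog implementations, executing the statement only stores metadata in the target catalog and the options are neither validated nor interpreted at this point.

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class CreateCustomTableExample {

  public static void main(String[] args) {
    final TableEnvironment tableEnv =
        TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());

    // only updates metadata in the target catalog; the 'custom' connector
    // and its options are neither validated nor otherwise interpreted here
    tableEnv.executeSql(
        "CREATE TABLE UserScores (\n"
            + "  user_name STRING,\n"
            + "  score INT\n"
            + ") WITH (\n"
            + "  'connector' = 'custom',\n"
            + "  'hostname' = 'localhost',\n"
            + "  'port' = '5022'\n"
            + ")");
  }
}
```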
+ +### Planning + +When it comes to planning and optimization of the table program, a `CatalogTable` needs to be resolved +into a `DynamicTableSource` (for reading in `SELECT` queries) and a `DynamicTableSink` (for writing in +`INSERT INTO` statements). + +`DynamicTableSourceFactory` and `DynamicTableSinkFactory` provide connector-specific logic for translating +the metadata of a `CatalogTable` into instances of `DynamicTableSource` and `DynamicTableSink`. In most +cases, a factory's purpose is to validate options (such as `'port' = '5022'` in the example), +configure encoding/decoding formats (if required), and create a parameterized instance of the table +connector. + +By default, instances of `DynamicTableSourceFactory` and `DynamicTableSinkFactory` are discovered using +Java's [Service Provider Interfaces (SPI)](https://docs.oracle.com/javase/tutorial/sound/SPI-intro.html). The +`connector` option (such as `'connector' = 'custom'` in the example) must correspond to a valid factory +identifier. + +Although it might not be apparent in the class naming, `DynamicTableSource` and `DynamicTableSink` +can also be seen as stateful factories that eventually produce a concrete runtime implementation for reading/writing +the actual data. + +The planner uses the source and sink instances to perform connector-specific bidirectional communication +until an optimal logical plan can be found. Depending on the optionally declared ability interfaces (e.g. +`SupportsProjectionPushDown` or `SupportsOverwrite`), the planner might apply changes to an instance and +thus mutate the produced runtime implementation. + +### Runtime + +Once the logical planning is complete, the planner will obtain the _runtime implementation_ from the table +connector. Runtime logic is implemented in Flink's core connector interfaces such as `InputFormat` or `SourceFunction`. + +Flink offers different core interfaces that are currently being unified. Therefore, the different runtime +interfaces are grouped by another level of abstraction as subclasses of `ScanRuntimeProvider`, `LookupRuntimeProvider`, +and `SinkRuntimeProvider`. + +For example, both `OutputFormatProvider` (providing `org.apache.flink.api.common.io.OutputFormat`) and `SinkFunctionProvider` (providing `org.apache.flink.streaming.api.functions.sink.SinkFunction`) are concrete instances of `SinkRuntimeProvider`. + +{% top %} + +Extension Points +---------------- + +This section explains the available interfaces for extending Flink's table connectors. + +### Dynamic Table Factories + +Dynamic table factories are used to configure a dynamic table connector for an external storage system from catalog +and session information. + +`org.apache.flink.table.factories.DynamicTableSourceFactory` can be implemented to construct a `DynamicTableSource`. + +`org.apache.flink.table.factories.DynamicTableSinkFactory` can be implemented to construct a `DynamicTableSink`. + +By default, the factory is discovered using the value of the `connector` option as the factory identifier +and Java's Service Provider Interface. + +In JAR files, references to new implementations can be added to the service file: + +`META-INF/services/org.apache.flink.table.factories.Factory` + +The framework will check for a single matching factory that is uniquely identified by factory identifier +and requested base class (e.g. `DynamicTableSourceFactory`). + +The factory discovery process can be bypassed by the catalog implementation if necessary. For this, a +catalog needs to return an instance that implements the requested base class in `org.apache.flink.table.catalog.Catalog#getFactory`.
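For example, assuming the hypothetical `CustomDynamicTableFactory` sketched earlier lives in the made-up package `com.example.connector`, the service file `META-INF/services/org.apache.flink.table.factories.Factory` in the connector JAR would contain a single line:

```
com.example.connector.CustomDynamicTableFactory
```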
+ +### Dynamic Table Source + +By definition, a dynamic table can change over time. + +When reading a dynamic table, the content can either be considered as: +- A changelog (finite or infinite) for which all changes are consumed continuously until the changelog + is exhausted. Represented in the `ScanTableSource` interface. +- A continuously changing or very large external table whose content is usually never read entirely + but queried for individual values when necessary. Represented in the `LookupTableSource` interface. + +Both interfaces can be implemented at the same time. The planner decides about their usage depending +on the specified query. + +#### Scan Table Source + +A `ScanTableSource` scans all rows from an external storage system during runtime. + +The scanned rows don't have to contain only insertions but can also contain updates and deletions. Thus, +the table source can be used to read a (finite or infinite) changelog. The returned _changelog mode_ indicates +the set of changes that the planner can expect during runtime. + +For regular batch scenarios, the source can emit a bounded stream of insert-only rows. + +For regular streaming scenarios, the source can emit an unbounded stream of insert-only rows. + +For change data capture (CDC) scenarios, the source can emit bounded or unbounded streams with insert, +update, and delete rows. + +A table source can implement further ability interfaces such as `SupportsProjectionPushDown` that might +mutate an instance during planning. All abilities are listed in the `org.apache.flink.table.connector.source.abilities` +package and in the documentation of `org.apache.flink.table.connector.source.ScanTableSource`. + +The runtime implementation of a `ScanTableSource` must produce internal data structures. Thus, records +must be emitted as `org.apache.flink.table.data.RowData`. The framework provides runtime converters such +that a source can still work on common data structures and perform a conversion at the end. + +#### Lookup Table Source + +A `LookupTableSource` looks up rows of an external storage system by one or more keys during runtime. + +Compared to `ScanTableSource`, the source must not read the entire table and can lazily fetch individual Review comment: ```suggestion Compared to `ScanTableSource`, the source does not have to read the entire table and can lazily fetch individual ``` ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected]
