This is an automated email from the ASF dual-hosted git repository.
twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
     new 9919dc8  [FLINK-20191][docs] Add documentation for FLIP-95 ability interfaces
9919dc8 is described below
commit 9919dc8baa225de76f30a3d9e15250c06eee684f
Author: Shengkai <[email protected]>
AuthorDate: Wed Nov 18 14:50:42 2020 +0800
[FLINK-20191][docs] Add documentation for FLIP-95 ability interfaces
This closes #14111.
---
docs/dev/table/connectors/kafka.md | 16 +++-
docs/dev/table/connectors/kafka.zh.md | 16 +++-
docs/dev/table/connectors/upsert-kafka.md | 12 +++
docs/dev/table/connectors/upsert-kafka.zh.md | 9 +++
docs/dev/table/sourceSinks.md | 92 ++++++++++++++++++++--
docs/dev/table/sourceSinks.zh.md | 92 ++++++++++++++++++++--
.../abilities/SupportsWatermarkPushDown.java | 2 +-
7 files changed, 218 insertions(+), 21 deletions(-)
diff --git a/docs/dev/table/connectors/kafka.md
b/docs/dev/table/connectors/kafka.md
index c5e4e6a..85e1f5b 100644
--- a/docs/dev/table/connectors/kafka.md
+++ b/docs/dev/table/connectors/kafka.md
@@ -34,8 +34,8 @@ The Kafka connector allows for reading data from and writing
data into Kafka top
Dependencies
------------
-{% assign connector = site.data.sql-connectors['kafka'] %}
-{% include sql-connector-download-table.html
+{% assign connector = site.data.sql-connectors['kafka'] %}
+{% include sql-connector-download-table.html
connector=connector
%}
@@ -546,6 +546,18 @@ Besides enabling Flink's checkpointing, you can also
choose three different mode
Please refer to [Kafka documentation]({% link dev/connectors/kafka.md
%}#kafka-producers-and-fault-tolerance) for more caveats about delivery
guarantees.
+### Source Per-Partition Watermarks
+
+Flink supports emitting per-partition watermarks for Kafka. Watermarks are generated inside the Kafka
+consumer. The per-partition watermarks are merged in the same way as watermarks are merged during streaming
+shuffles. The output watermark of the source is determined by the minimum watermark among the partitions
+it reads. If some partitions in the topics are idle, the watermark generator will not advance. You can
+alleviate this problem by setting the [`'table.exec.source.idle-timeout'`]({% link dev/table/config.md %}#table-exec-source-idle-timeout)
+option in the table configuration.
+
+Please refer to [Kafka watermark strategies]({% link dev/event_timestamps_watermarks.md %}#watermark-strategies-and-the-kafka-connector)
+for more details.
+
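The merge rule described above (the source watermark is the minimum over all non-idle partitions, and a partition stops holding the watermark back once it has been silent for the idle timeout) can be sketched in plain Java. This is an illustrative model with hypothetical names, not Flink's actual implementation:

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative model (hypothetical names, not Flink's implementation) of how
// per-partition watermarks combine into a single source watermark.
public class PerPartitionWatermarks {

    private final Map<Integer, Long> partitionWatermarks = new HashMap<>();
    private final Map<Integer, Long> lastActivityMs = new HashMap<>();
    private final long idleTimeoutMs; // plays the role of 'table.exec.source.idle-timeout'

    public PerPartitionWatermarks(long idleTimeoutMs) {
        this.idleTimeoutMs = idleTimeoutMs;
    }

    // A partition reported a watermark at processing time nowMs.
    public void onWatermark(int partition, long watermark, long nowMs) {
        partitionWatermarks.merge(partition, watermark, Math::max);
        lastActivityMs.put(partition, nowMs);
    }

    // Source-level watermark: the minimum over all partitions that are not idle.
    // A partition counts as idle once it has been silent for the timeout, so it
    // no longer holds the output watermark back.
    public long currentWatermark(long nowMs) {
        long min = Long.MAX_VALUE;
        for (Map.Entry<Integer, Long> e : partitionWatermarks.entrySet()) {
            long silentFor = nowMs - lastActivityMs.get(e.getKey());
            if (idleTimeoutMs > 0 && silentFor >= idleTimeoutMs) {
                continue; // idle partition is excluded from the minimum
            }
            min = Math.min(min, e.getValue());
        }
        return min == Long.MAX_VALUE ? Long.MIN_VALUE : min;
    }

    public static void main(String[] args) {
        PerPartitionWatermarks w = new PerPartitionWatermarks(1000);
        w.onWatermark(0, 100, 0);
        w.onWatermark(1, 50, 0);
        System.out.println(w.currentWatermark(10)); // the slowest partition wins
    }
}
```

Without the idle timeout (`idleTimeoutMs <= 0`), one silent partition pins the watermark forever, which is exactly the problem the option alleviates.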
Data Type Mapping
----------------
diff --git a/docs/dev/table/connectors/kafka.zh.md
b/docs/dev/table/connectors/kafka.zh.md
index d6a3ed7..df6192c 100644
--- a/docs/dev/table/connectors/kafka.zh.md
+++ b/docs/dev/table/connectors/kafka.zh.md
@@ -34,8 +34,8 @@ The Kafka connector allows for reading data from and writing
data into Kafka top
Dependencies
------------
-{% assign connector = site.data.sql-connectors['kafka'] %}
-{% include sql-connector-download-table.html
+{% assign connector = site.data.sql-connectors['kafka'] %}
+{% include sql-connector-download-table.html
connector=connector
%}
@@ -547,6 +547,18 @@ Besides enabling Flink's checkpointing, you can also
choose three different mode
Please refer to [Kafka documentation]({% link dev/connectors/kafka.zh.md
%}#kafka-producers-and-fault-tolerance) for more caveats about delivery
guarantees.
+### Source Per-Partition Watermarks
+
+Flink supports emitting per-partition watermarks for Kafka. Watermarks are generated inside the Kafka
+consumer. The per-partition watermarks are merged in the same way as watermarks are merged during streaming
+shuffles. The output watermark of the source is determined by the minimum watermark among the partitions
+it reads. If some partitions in the topics are idle, the watermark generator will not advance. You can
+alleviate this problem by setting the [`'table.exec.source.idle-timeout'`]({% link dev/table/config.zh.md %}#table-exec-source-idle-timeout)
+option in the table configuration.
+
+Please refer to [Kafka watermark strategies]({% link dev/event_timestamps_watermarks.zh.md %}#watermark-strategies-and-the-kafka-connector)
+for more details.
+
Data Type Mapping
----------------
diff --git a/docs/dev/table/connectors/upsert-kafka.md
b/docs/dev/table/connectors/upsert-kafka.md
index e30302d..2bc39bc 100644
--- a/docs/dev/table/connectors/upsert-kafka.md
+++ b/docs/dev/table/connectors/upsert-kafka.md
@@ -249,6 +249,18 @@ connector is working in the upsert mode, the last record
on the same key will ta
reading back as a source. Therefore, the upsert-kafka connector achieves
idempotent writes just like
the [HBase sink]({% link dev/table/connectors/hbase.md %}).
+### Source Per-Partition Watermarks
+
+Flink supports emitting per-partition watermarks for Upsert Kafka. Watermarks are generated inside the Kafka
+consumer. The per-partition watermarks are merged in the same way as watermarks are merged during streaming
+shuffles. The output watermark of the source is determined by the minimum watermark among the partitions
+it reads. If some partitions in the topics are idle, the watermark generator will not advance. You can
+alleviate this problem by setting the [`'table.exec.source.idle-timeout'`]({% link dev/table/config.md %}#table-exec-source-idle-timeout)
+option in the table configuration.
+
+Please refer to [Kafka watermark strategies]({% link dev/event_timestamps_watermarks.md %}#watermark-strategies-and-the-kafka-connector)
+for more details.
+
Data Type Mapping
----------------
diff --git a/docs/dev/table/connectors/upsert-kafka.zh.md
b/docs/dev/table/connectors/upsert-kafka.zh.md
index bb8cbe4..f2f31e3 100644
--- a/docs/dev/table/connectors/upsert-kafka.zh.md
+++ b/docs/dev/table/connectors/upsert-kafka.zh.md
@@ -229,6 +229,15 @@ Upsert Kafka 始终以 upsert 方式工作,并且需要在 DDL 中定义主键
这意味着,Flink 可以将具有相同 key 的重复记录写入 Kafka topic。但由于该连接器以 upsert 的模式工作,该连接器作为 source
读入时,可以确保具有相同主键值下仅最后一条消息会生效。因此,upsert-kafka 连接器可以像 [HBase sink]({% link
dev/table/connectors/hbase.zh.md %}) 一样实现幂等写入。
+### 为每个分区生成相应的 watermark
+
+Flink 支持根据 Upsert Kafka 的每个分区的数据特性发送相应的 watermark。当使用这个特性的时候,watermark 是在 Kafka consumer 内部生成的。
+合并每个分区生成的 watermark 的方式和 stream shuffle 的方式是一致的。数据源产生的 watermark 取决于该 consumer 负责的所有分区中当前最小的 watermark。
+如果该 consumer 负责的部分分区是 idle 的,那么整体的 watermark 并不会前进。在这种情况下,可以通过设置合适的 [table.exec.source.idle-timeout]({% link dev/table/config.zh.md %}#table-exec-source-idle-timeout) 来缓解这个问题。
+
+如想获得更多细节,请查阅 [Kafka watermark strategies]({% link dev/event_timestamps_watermarks.zh.md %}#watermark-strategies-and-the-kafka-connector)。
+
数据类型映射
----------------
diff --git a/docs/dev/table/sourceSinks.md b/docs/dev/table/sourceSinks.md
index 2e44094..38ad0a9 100644
--- a/docs/dev/table/sourceSinks.md
+++ b/docs/dev/table/sourceSinks.md
@@ -38,10 +38,8 @@ This page focuses on how to develop a custom, user-defined
connector.
<span class="label label-danger">Attention</span> New table source and table
sink interfaces have been
introduced in Flink 1.11 as part of
[FLIP-95](https://cwiki.apache.org/confluence/display/FLINK/FLIP-95%3A+New+TableSource+and+TableSink+interfaces).
-Also the factory interfaces have been reworked. FLIP-95 is not fully implemented yet. Many ability interfaces
-are not supported yet (e.g. for filter or partition push down). If necessary, please also have a look
-at the [old table sources and sinks page]({% link dev/table/legacySourceSinks.md %}). Those interfaces
-are still supported for backwards compatibility.
+Also the factory interfaces have been reworked. If necessary, take a look at the [old table sources and sinks page]({% link dev/table/legacySourceSinks.md %}).
+Those interfaces are still supported for backwards compatibility.
* This will be replaced by the TOC
{:toc}
@@ -171,8 +169,8 @@ For change data capture (CDC) scenarios, the source can
emit bounded or unbounde
update, and delete rows.
A table source can implement further ability interfaces such as `SupportsProjectionPushDown` that might
-mutate an instance during planning. All abilities are listed in the `org.apache.flink.table.connector.source.abilities`
-package and in the documentation of `org.apache.flink.table.connector.source.ScanTableSource`.
+mutate an instance during planning. All abilities can be found in the `org.apache.flink.table.connector.source.abilities`
+package and are listed in the [source abilities table](#source-abilities).
The runtime implementation of a `ScanTableSource` must produce internal data
structures. Thus, records
must be emitted as `org.apache.flink.table.data.RowData`. The framework
provides runtime converters such
@@ -193,6 +191,55 @@ for more information.
The runtime implementation of a `LookupTableSource` is a `TableFunction` or
`AsyncTableFunction`. The function
will be called with values for the given lookup keys during runtime.
+#### Source Abilities
+
+<table class="table table-bordered">
+ <thead>
+ <tr>
+ <th class="text-left" style="width: 25%">Interface</th>
+ <th class="text-center" style="width: 75%">Description</th>
+ </tr>
+ </thead>
+ <tbody>
+ <tr>
+ <td><a
href='https://github.com/apache/flink/blob/master/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/abilities/SupportsFilterPushDown.java'>SupportsFilterPushDown</a></td>
+ <td>Enables to push down the filter into the
<code>DynamicTableSource</code>. For efficiency, a source can
+ push filters further down in order to be close to the actual data
generation.</td>
+ </tr>
+ <tr>
+ <td><a
href='https://github.com/apache/flink/blob/master/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/abilities/SupportsLimitPushDown.java'>SupportsLimitPushDown</a></td>
+ <td>Enables to push down a limit (the expected maximum number of
produced records) into a <code>DynamicTableSource</code>.</td>
+ </tr>
+ <tr>
+ <td><a
href='https://github.com/apache/flink/blob/master/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/abilities/SupportsPartitionPushDown.java'>SupportsPartitionPushDown</a></td>
+ <td>Enables to pass available partitions to the planner and push down
partitions into a <code>DynamicTableSource</code>.
+ During the runtime, the source will only read data from the passed
partition list for efficiency.</td>
+ </tr>
+ <tr>
+ <td><a
href='https://github.com/apache/flink/blob/master/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/abilities/SupportsProjectionPushDown.java'>SupportsProjectionPushDown</a>
</td>
+ <td>Enables to push down a (possibly nested) projection into a
<code>DynamicTableSource</code>. For efficiency,
+ a source can push a projection further down in order to be close to
the actual data generation. If the source
+ also implements <code>SupportsReadingMetadata</code>, the source will
also read the required metadata only.
+ </td>
+ </tr>
+ <tr>
+ <td><a
href='https://github.com/apache/flink/blob/master/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/abilities/SupportsReadingMetadata.java'>SupportsReadingMetadata</a></td>
+    <td>Enables to read metadata columns from a <code>DynamicTableSource</code>. The source
+      is responsible for adding the required metadata at the end of the produced rows. This includes
+      potentially forwarding metadata columns from contained formats.</td>
+ </tr>
+ <tr>
+ <td><a
href='https://github.com/apache/flink/blob/master/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/abilities/SupportsWatermarkPushDown.java'>SupportsWatermarkPushDown</a></td>
+ <td>Enables to push down a watermark strategy into a
<code>DynamicTableSource</code>. The watermark
+ strategy is a builder/factory for timestamp extraction and watermark
generation. During the runtime, the
+ watermark generator is located inside the source and is able to
generate per-partition watermarks.</td>
+ </tr>
+ </tbody>
+</table>
+
+<span class="label label-danger">Attention</span> The interfaces above are
currently only available for
+`ScanTableSource`, not for `LookupTableSource`.
+
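To illustrate how a push-down ability splits work between planner and source, here is a self-contained sketch of the accepted/remaining filter split negotiated by `SupportsFilterPushDown`. The real interface works on `ResolvedExpression`s; the `String` predicates and class names below are hypothetical simplifications, not the Flink API:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Toy model of the accepted/remaining split behind filter push-down.
// Hypothetical simplification; the real interface uses ResolvedExpression.
public class FilterPushDownSketch {

    public static final class Result {
        public final List<String> acceptedFilters = new ArrayList<>();
        public final List<String> remainingFilters = new ArrayList<>();
    }

    // The source claims the filters it can evaluate close to the data; the
    // planner keeps applying the remaining ones after the scan.
    public static Result applyFilters(List<String> filters, Predicate<String> sourceCanHandle) {
        Result result = new Result();
        for (String filter : filters) {
            if (sourceCanHandle.test(filter)) {
                result.acceptedFilters.add(filter);
            } else {
                result.remainingFilters.add(filter);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> filters = List.of("id > 10", "name LIKE '%a%'");
        Result r = applyFilters(filters, f -> f.startsWith("id"));
        System.out.println(r.acceptedFilters + " pushed down, "
                + r.remainingFilters + " kept by the planner");
    }
}
```

The same negotiation pattern applies to the other push-down abilities: the planner offers work, the source accepts what it can do efficiently, and the planner handles the rest.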
### Dynamic Table Sink
By definition, a dynamic table can change over time.
@@ -209,13 +256,42 @@ For change data capture (CDC) scenarios, the sink can
write out bounded or unbou
update, and delete rows.
A table sink can implement further ability interfaces such as `SupportsOverwrite` that might mutate an
-instance during planning. All abilities are listed in the `org.apache.flink.table.connector.sink.abilities`
-package and in the documentation of `org.apache.flink.table.connector.sink.DynamicTableSink`.
+instance during planning. All abilities can be found in the `org.apache.flink.table.connector.sink.abilities`
+package and are listed in the [sink abilities table](#sink-abilities).
The runtime implementation of a `DynamicTableSink` must consume internal data
structures. Thus, records
must be accepted as `org.apache.flink.table.data.RowData`. The framework
provides runtime converters such
that a sink can still work on common data structures and perform a conversion
at the beginning.
+#### Sink Abilities
+
+<table class="table table-bordered">
+ <thead>
+ <tr>
+ <th class="text-left" style="width: 25%">Interface</th>
+ <th class="text-center" style="width: 75%">Description</th>
+ </tr>
+ </thead>
+ <tbody>
+ <tr>
+ <td><a
href='https://github.com/apache/flink/blob/master/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/sink/abilities/SupportsOverwrite.java'>SupportsOverwrite</a></td>
+ <td>Enables to overwrite existing data in a
<code>DynamicTableSink</code>. By default, if
+ this interface is not implemented, existing tables or partitions
cannot be overwritten using
+ e.g. the SQL <code>INSERT OVERWRITE</code> clause.</td>
+ </tr>
+ <tr>
+ <td><a
href='https://github.com/apache/flink/blob/master/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/sink/abilities/SupportsPartitioning.java'>SupportsPartitioning</a></td>
+ <td>Enables to write partitioned data in a
<code>DynamicTableSink</code>.</td>
+ </tr>
+ <tr>
+ <td><a
href='https://github.com/apache/flink/blob/master/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/sink/abilities/SupportsWritingMetadata.java'>SupportsWritingMetadata</a></td>
+    <td>Enables to write metadata columns into a <code>DynamicTableSink</code>. A table sink is
+      responsible for accepting requested metadata columns at the end of consumed rows and persisting
+      them. This includes potentially forwarding metadata columns to contained formats.</td>
+ </tr>
+ </tbody>
+</table>
+
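The behavioral difference `SupportsOverwrite` enables can be sketched with a toy partitioned sink: a plain `INSERT` appends to a partition, while `INSERT OVERWRITE` replaces its contents. All names are hypothetical; this models the semantics only, not the Flink API:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model (hypothetical names) of the semantics SupportsOverwrite enables:
// INSERT OVERWRITE replaces a partition's rows, plain INSERT appends.
public class OverwriteSinkSketch {

    private final Map<String, List<String>> partitions = new HashMap<>();

    public void write(String partition, List<String> rows, boolean overwrite) {
        if (overwrite) {
            // INSERT OVERWRITE: existing rows of this partition are replaced
            partitions.put(partition, new ArrayList<>(rows));
        } else {
            // plain INSERT: rows are appended to the partition
            partitions.computeIfAbsent(partition, k -> new ArrayList<>()).addAll(rows);
        }
    }

    public List<String> read(String partition) {
        return partitions.getOrDefault(partition, List.of());
    }

    public static void main(String[] args) {
        OverwriteSinkSketch sink = new OverwriteSinkSketch();
        sink.write("2020-11-18", List.of("a", "b"), false);
        sink.write("2020-11-18", List.of("c"), true);
        System.out.println(sink.read("2020-11-18")); // only the overwritten rows remain
    }
}
```

A sink that does not implement the ability simply never sees `overwrite = true`, which is why `INSERT OVERWRITE` is rejected for it at planning time.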
### Encoding / Decoding Formats
Some table connectors accept different formats that encode and decode keys
and/or values.
diff --git a/docs/dev/table/sourceSinks.zh.md b/docs/dev/table/sourceSinks.zh.md
index 368a41c..a2dc2b4 100644
--- a/docs/dev/table/sourceSinks.zh.md
+++ b/docs/dev/table/sourceSinks.zh.md
@@ -38,10 +38,8 @@ This page focuses on how to develop a custom, user-defined
connector.
<span class="label label-danger">Attention</span> New table source and table
sink interfaces have been
introduced in Flink 1.11 as part of
[FLIP-95](https://cwiki.apache.org/confluence/display/FLINK/FLIP-95%3A+New+TableSource+and+TableSink+interfaces).
-Also the factory interfaces have been reworked. FLIP-95 is not fully implemented yet. Many ability interfaces
-are not supported yet (e.g. for filter or partition push down). If necessary, please also have a look
-at the [old table sources and sinks page]({% link dev/table/legacySourceSinks.zh.md %}). Those interfaces
-are still supported for backwards compatibility.
+Also the factory interfaces have been reworked. If necessary, take a look at the [old table sources and sinks page]({% link dev/table/legacySourceSinks.zh.md %}).
+Those interfaces are still supported for backwards compatibility.
* This will be replaced by the TOC
{:toc}
@@ -171,8 +169,8 @@ For change data capture (CDC) scenarios, the source can
emit bounded or unbounde
update, and delete rows.
A table source can implement further ability interfaces such as `SupportsProjectionPushDown` that might
-mutate an instance during planning. All abilities are listed in the `org.apache.flink.table.connector.source.abilities`
-package and in the documentation of `org.apache.flink.table.connector.source.ScanTableSource`.
+mutate an instance during planning. All abilities can be found in the `org.apache.flink.table.connector.source.abilities`
+package and are listed in the [source abilities table](#source-abilities).
The runtime implementation of a `ScanTableSource` must produce internal data
structures. Thus, records
must be emitted as `org.apache.flink.table.data.RowData`. The framework
provides runtime converters such
@@ -193,6 +191,55 @@ for more information.
The runtime implementation of a `LookupTableSource` is a `TableFunction` or
`AsyncTableFunction`. The function
will be called with values for the given lookup keys during runtime.
+#### Source Abilities
+
+<table class="table table-bordered">
+ <thead>
+ <tr>
+ <th class="text-left" style="width: 25%">Interface</th>
+ <th class="text-center" style="width: 75%">Description</th>
+ </tr>
+ </thead>
+ <tbody>
+ <tr>
+ <td><a
href='https://github.com/apache/flink/blob/master/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/abilities/SupportsFilterPushDown.java'>SupportsFilterPushDown</a></td>
+ <td>Enables to push down the filter into the
<code>DynamicTableSource</code>. For efficiency, a source can
+ push filters further down in order to be close to the actual data
generation.</td>
+ </tr>
+ <tr>
+ <td><a
href='https://github.com/apache/flink/blob/master/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/abilities/SupportsLimitPushDown.java'>SupportsLimitPushDown</a></td>
+ <td>Enables to push down a limit (the expected maximum number of
produced records) into a <code>DynamicTableSource</code>.</td>
+ </tr>
+ <tr>
+ <td><a
href='https://github.com/apache/flink/blob/master/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/abilities/SupportsPartitionPushDown.java'>SupportsPartitionPushDown</a></td>
+ <td>Enables to pass available partitions to the planner and push down
partitions into a <code>DynamicTableSource</code>.
+ During the runtime, the source will only read data from the passed
partition list for efficiency.</td>
+ </tr>
+ <tr>
+ <td><a
href='https://github.com/apache/flink/blob/master/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/abilities/SupportsProjectionPushDown.java'>SupportsProjectionPushDown</a>
</td>
+ <td>Enables to push down a (possibly nested) projection into a
<code>DynamicTableSource</code>. For efficiency,
+ a source can push a projection further down in order to be close to
the actual data generation. If the source
+ also implements <code>SupportsReadingMetadata</code>, the source will
also read the required metadata only.
+ </td>
+ </tr>
+ <tr>
+ <td><a
href='https://github.com/apache/flink/blob/master/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/abilities/SupportsReadingMetadata.java'>SupportsReadingMetadata</a></td>
+    <td>Enables to read metadata columns from a <code>DynamicTableSource</code>. The source
+      is responsible for adding the required metadata at the end of the produced rows. This includes
+      potentially forwarding metadata columns from contained formats.</td>
+ </tr>
+ <tr>
+ <td><a
href='https://github.com/apache/flink/blob/master/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/abilities/SupportsWatermarkPushDown.java'>SupportsWatermarkPushDown</a></td>
+ <td>Enables to push down a watermark strategy into a
<code>DynamicTableSource</code>. The watermark
+ strategy is a builder/factory for timestamp extraction and watermark
generation. During the runtime, the
+ watermark generator is located inside the source and is able to
generate per-partition watermarks.</td>
+ </tr>
+ </tbody>
+</table>
+
+<span class="label label-danger">Attention</span> The interfaces above are
currently only available for
+`ScanTableSource`, not for `LookupTableSource`.
+
### Dynamic Table Sink
By definition, a dynamic table can change over time.
@@ -209,13 +256,42 @@ For change data capture (CDC) scenarios, the sink can
write out bounded or unbou
update, and delete rows.
A table sink can implement further ability interfaces such as `SupportsOverwrite` that might mutate an
-instance during planning. All abilities are listed in the `org.apache.flink.table.connector.sink.abilities`
-package and in the documentation of `org.apache.flink.table.connector.sink.DynamicTableSink`.
+instance during planning. All abilities can be found in the `org.apache.flink.table.connector.sink.abilities`
+package and are listed in the [sink abilities table](#sink-abilities).
The runtime implementation of a `DynamicTableSink` must consume internal data
structures. Thus, records
must be accepted as `org.apache.flink.table.data.RowData`. The framework
provides runtime converters such
that a sink can still work on common data structures and perform a conversion
at the beginning.
+#### Sink Abilities
+
+<table class="table table-bordered">
+ <thead>
+ <tr>
+ <th class="text-left" style="width: 25%">Interface</th>
+ <th class="text-center" style="width: 75%">Description</th>
+ </tr>
+ </thead>
+ <tbody>
+ <tr>
+ <td><a
href='https://github.com/apache/flink/blob/master/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/sink/abilities/SupportsOverwrite.java'>SupportsOverwrite</a></td>
+ <td>Enables to overwrite existing data in a
<code>DynamicTableSink</code>. By default, if
+ this interface is not implemented, existing tables or partitions
cannot be overwritten using
+ e.g. the SQL <code>INSERT OVERWRITE</code> clause.</td>
+ </tr>
+ <tr>
+ <td><a
href='https://github.com/apache/flink/blob/master/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/sink/abilities/SupportsPartitioning.java'>SupportsPartitioning</a></td>
+ <td>Enables to write partitioned data in a
<code>DynamicTableSink</code>.</td>
+ </tr>
+ <tr>
+ <td><a
href='https://github.com/apache/flink/blob/master/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/sink/abilities/SupportsWritingMetadata.java'>SupportsWritingMetadata</a></td>
+    <td>Enables to write metadata columns into a <code>DynamicTableSink</code>. A table sink is
+      responsible for accepting requested metadata columns at the end of consumed rows and persisting
+      them. This includes potentially forwarding metadata columns to contained formats.</td>
+ </tr>
+ </tbody>
+</table>
+
### Encoding / Decoding Formats
Some table connectors accept different formats that encode and decode keys
and/or values.
diff --git
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/abilities/SupportsWatermarkPushDown.java
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/abilities/SupportsWatermarkPushDown.java
index c54c740..4192db9 100644
---
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/abilities/SupportsWatermarkPushDown.java
+++
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/abilities/SupportsWatermarkPushDown.java
@@ -28,7 +28,7 @@ import org.apache.flink.table.connector.source.SourceProvider;
import org.apache.flink.table.data.RowData;
/**
- * Enables to push down watermarks into a {@link ScanTableSource}.
+ * Enables to push down a watermark strategy into a {@link ScanTableSource}.
*
* <p>The concept of watermarks defines when time operations based on an event
time attribute will be
* triggered. A watermark tells operators that no elements with a timestamp
older or equal to the watermark