This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 7ce0f39 [FLINK-19859][docs] Add document for the upsert-kafka
connector
7ce0f39 is described below
commit 7ce0f39058a03bb95bb00e444c75494dcb8d3b2c
Author: Shengkai <[email protected]>
AuthorDate: Tue Nov 17 11:52:43 2020 +0800
[FLINK-19859][docs] Add document for the upsert-kafka connector
This closes #14017
---
docs/_data/sql-connectors.yml | 8 +
docs/dev/table/connectors/blackhole.md | 2 +-
docs/dev/table/connectors/blackhole.zh.md | 2 +-
docs/dev/table/connectors/datagen.md | 18 +--
docs/dev/table/connectors/datagen.zh.md | 2 +-
docs/dev/table/connectors/elasticsearch.md | 2 +-
docs/dev/table/connectors/elasticsearch.zh.md | 2 +-
docs/dev/table/connectors/filesystem.md | 14 +-
docs/dev/table/connectors/filesystem.zh.md | 14 +-
docs/dev/table/connectors/hbase.md | 2 +-
docs/dev/table/connectors/hbase.zh.md | 2 +-
docs/dev/table/connectors/jdbc.md | 16 +-
docs/dev/table/connectors/jdbc.zh.md | 2 +-
docs/dev/table/connectors/kafka.md | 2 +-
docs/dev/table/connectors/kafka.zh.md | 2 +-
docs/dev/table/connectors/kinesis.md | 32 ++--
docs/dev/table/connectors/kinesis.zh.md | 32 ++--
docs/dev/table/connectors/print.md | 2 +-
docs/dev/table/connectors/print.zh.md | 2 +-
docs/dev/table/connectors/upsert-kafka.md | 212 ++++++++++++++++++++++++++
docs/dev/table/connectors/upsert-kafka.zh.md | 212 ++++++++++++++++++++++++++
21 files changed, 507 insertions(+), 75 deletions(-)
diff --git a/docs/_data/sql-connectors.yml b/docs/_data/sql-connectors.yml
index 2c14a00..181eb0c 100644
--- a/docs/_data/sql-connectors.yml
+++ b/docs/_data/sql-connectors.yml
@@ -117,6 +117,14 @@ kafka:
maven: flink-connector-kafka{{site.scala_version_suffix}}
sql-url:
https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-kafka{{site.scala_version_suffix}}/{{site.version}}/flink-sql-connector-kafka{{site.scala_version_suffix}}-{{site.version}}.jar
+upsert-kafka:
+ name: Upsert Kafka
+ category: connector
+ versions:
+ - version: universal
+ maven: flink-connector-kafka{{site.scala_version_suffix}}
+ sql-url:
https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-kafka{{site.scala_version_suffix}}/{{site.version}}/flink-sql-connector-kafka{{site.scala_version_suffix}}-{{site.version}}.jar
+
kinesis:
name: Kinesis
category: connector
diff --git a/docs/dev/table/connectors/blackhole.md
b/docs/dev/table/connectors/blackhole.md
index b54f906..2692bdf 100644
--- a/docs/dev/table/connectors/blackhole.md
+++ b/docs/dev/table/connectors/blackhole.md
@@ -2,7 +2,7 @@
title: "BlackHole SQL Connector"
nav-title: BlackHole
nav-parent_id: sql-connectors
-nav-pos: 12
+nav-pos: 14
---
<!--
Licensed to the Apache Software Foundation (ASF) under one
diff --git a/docs/dev/table/connectors/blackhole.zh.md
b/docs/dev/table/connectors/blackhole.zh.md
index 0727695..53e3d9d 100644
--- a/docs/dev/table/connectors/blackhole.zh.md
+++ b/docs/dev/table/connectors/blackhole.zh.md
@@ -2,7 +2,7 @@
title: "BlackHole SQL 连接器"
nav-title: BlackHole
nav-parent_id: sql-connectors
-nav-pos: 12
+nav-pos: 14
---
<!--
Licensed to the Apache Software Foundation (ASF) under one
diff --git a/docs/dev/table/connectors/datagen.md
b/docs/dev/table/connectors/datagen.md
index 6de7169..dcf4465 100644
--- a/docs/dev/table/connectors/datagen.md
+++ b/docs/dev/table/connectors/datagen.md
@@ -2,7 +2,7 @@
title: "DataGen SQL Connector"
nav-title: DataGen
nav-parent_id: sql-connectors
-nav-pos: 10
+nav-pos: 12
---
<!--
Licensed to the Apache Software Foundation (ASF) under one
@@ -58,7 +58,7 @@ CREATE TABLE Orders (
)
{% endhighlight %}
-Often, the data generator connector is used in conjuction with the ``LIKE``
clause to mock out physical tables.
+Often, the data generator connector is used in conjuction with the ``LIKE``
clause to mock out physical tables.
{% highlight sql %}
CREATE TABLE Orders (
@@ -69,8 +69,8 @@ CREATE TABLE Orders (
) WITH (...)
-- create a bounded mock table
-CREATE TEMPORARY TABLE GenOrders
-WITH (
+CREATE TEMPORARY TABLE GenOrders
+WITH (
'connector' = 'datagen',
'number-of-rows' = '10'
)
@@ -118,17 +118,17 @@ Types
<td>TINYINT</td>
<td>random / sequence</td>
<td></td>
- </tr>
+ </tr>
<tr>
<td>SMALLINT</td>
<td>random / sequence</td>
<td></td>
- </tr>
+ </tr>
<tr>
<td>INT</td>
<td>random / sequence</td>
<td></td>
- </tr>
+ </tr>
<tr>
<td>BIGINT</td>
<td>random / sequence</td>
@@ -158,12 +158,12 @@ Types
<td>TIMESTAMP</td>
<td>random</td>
<td>Always resolves to the current timestamp of the local
machine.</td>
- </tr>
+ </tr>
<tr>
<td>TIMESTAMP WITH LOCAL TIMEZONE</td>
<td>random</td>
<td>Always resolves to the current timestamp of the local
machine.</td>
- </tr>
+ </tr>
<tr>
<td>INTERVAL YEAR TO MONTH</td>
<td>random</td>
diff --git a/docs/dev/table/connectors/datagen.zh.md
b/docs/dev/table/connectors/datagen.zh.md
index 92b5972..330079b 100644
--- a/docs/dev/table/connectors/datagen.zh.md
+++ b/docs/dev/table/connectors/datagen.zh.md
@@ -2,7 +2,7 @@
title: "DataGen SQL 连接器"
nav-title: DataGen
nav-parent_id: sql-connectors
-nav-pos: 10
+nav-pos: 12
---
<!--
Licensed to the Apache Software Foundation (ASF) under one
diff --git a/docs/dev/table/connectors/elasticsearch.md
b/docs/dev/table/connectors/elasticsearch.md
index fc90bb7..c7cabaf 100644
--- a/docs/dev/table/connectors/elasticsearch.md
+++ b/docs/dev/table/connectors/elasticsearch.md
@@ -2,7 +2,7 @@
title: "Elasticsearch SQL Connector"
nav-title: Elasticsearch
nav-parent_id: sql-connectors
-nav-pos: 4
+nav-pos: 6
---
<!--
Licensed to the Apache Software Foundation (ASF) under one
diff --git a/docs/dev/table/connectors/elasticsearch.zh.md
b/docs/dev/table/connectors/elasticsearch.zh.md
index d2e67b8..5ba437c 100644
--- a/docs/dev/table/connectors/elasticsearch.zh.md
+++ b/docs/dev/table/connectors/elasticsearch.zh.md
@@ -2,7 +2,7 @@
title: "Elasticsearch SQL Connector"
nav-title: Elasticsearch
nav-parent_id: sql-connectors
-nav-pos: 4
+nav-pos: 6
---
<!--
Licensed to the Apache Software Foundation (ASF) under one
diff --git a/docs/dev/table/connectors/filesystem.md
b/docs/dev/table/connectors/filesystem.md
index 63f1388..015113c 100644
--- a/docs/dev/table/connectors/filesystem.md
+++ b/docs/dev/table/connectors/filesystem.md
@@ -2,7 +2,7 @@
title: "FileSystem SQL Connector"
nav-title: FileSystem
nav-parent_id: sql-connectors
-nav-pos: 5
+nav-pos: 7
---
<!--
Licensed to the Apache Software Foundation (ASF) under one
@@ -51,7 +51,7 @@ CREATE TABLE MyUserTable (
-- section for more details
'partition.default-name' = '...', -- optional: default partition name in
case the dynamic partition
-- column value is null/empty string
-
+
-- optional: the option to enable shuffle data by dynamic partition fields
in sink phase, this can greatly
-- reduce the number of file for filesystem sink but may lead data skew, the
default value is false.
'sink.shuffle-by-partition.enable' = '...',
@@ -65,7 +65,7 @@ CREATE TABLE MyUserTable (
<span class="label label-danger">Attention</span> File system sources for
streaming is still under development. In the future, the community will add
support for common streaming use cases, i.e., partition and directory
monitoring.
-<span class="label label-danger">Attention</span> The behaviour of file system
connector is much different from `previous legacy filesystem connector`:
+<span class="label label-danger">Attention</span> The behaviour of file system
connector is much different from `previous legacy filesystem connector`:
the path parameter is specified for a directory not for a file and you can't
get a human-readable file in the path that you declare.
## Partition Files
@@ -186,7 +186,7 @@ When running file compaction in production, please be aware
that:
### Partition Commit
-After writing a partition, it is often necessary to notify downstream
applications. For example, add the partition to a Hive metastore or writing a
`_SUCCESS` file in the directory. The file system sink contains a partition
commit feature that allows configuring custom policies. Commit actions are
based on a combination of `triggers` and `policies`.
+After writing a partition, it is often necessary to notify downstream
applications. For example, add the partition to a Hive metastore or writing a
`_SUCCESS` file in the directory. The file system sink contains a partition
commit feature that allows configuring custom policies. Commit actions are
based on a combination of `triggers` and `policies`.
- Trigger: The timing of the commit of the partition can be determined by the
watermark with the time extracted from the partition, or by processing time.
- Policy: How to commit a partition, built-in policies support for the commit
of success files and metastore, you can also implement your own policies, such
as triggering hive's analysis to generate statistics, or merging small files,
etc.
@@ -284,7 +284,7 @@ Time extractors define extracting time from partition
values.
</tbody>
</table>
-The default extractor is based on a timestamp pattern composed of your
partition fields. You can also specify an implementation for fully custom
partition extraction based on the `PartitionTimeExtractor` interface.
+The default extractor is based on a timestamp pattern composed of your
partition fields. You can also specify an implementation for fully custom
partition extraction based on the `PartitionTimeExtractor` interface.
<div class="codetabs" markdown="1">
<div data-lang="java" markdown="1">
@@ -305,7 +305,7 @@ public class HourPartTimeExtractor implements
PartitionTimeExtractor {
#### Partition Commit Policy
-The partition commit policy defines what action is taken when partitions are
committed.
+The partition commit policy defines what action is taken when partitions are
committed.
- The first is metastore, only hive table supports metastore policy, file
system manages partitions through directory structure.
- The second is the success file, which will write an empty file in the
directory corresponding to the partition.
@@ -373,7 +373,7 @@ public class AnalysisCommitPolicy implements
PartitionCommitPolicy {
## Full Example
-The below shows how the file system connector can be used to write a streaming
query to write data from Kafka into a file system and runs a batch query to
read that data back out.
+The below shows how the file system connector can be used to write a streaming
query to write data from Kafka into a file system and runs a batch query to
read that data back out.
{% highlight sql %}
diff --git a/docs/dev/table/connectors/filesystem.zh.md
b/docs/dev/table/connectors/filesystem.zh.md
index 63f1388..015113c 100644
--- a/docs/dev/table/connectors/filesystem.zh.md
+++ b/docs/dev/table/connectors/filesystem.zh.md
@@ -2,7 +2,7 @@
title: "FileSystem SQL Connector"
nav-title: FileSystem
nav-parent_id: sql-connectors
-nav-pos: 5
+nav-pos: 7
---
<!--
Licensed to the Apache Software Foundation (ASF) under one
@@ -51,7 +51,7 @@ CREATE TABLE MyUserTable (
-- section for more details
'partition.default-name' = '...', -- optional: default partition name in
case the dynamic partition
-- column value is null/empty string
-
+
-- optional: the option to enable shuffle data by dynamic partition fields
in sink phase, this can greatly
-- reduce the number of file for filesystem sink but may lead data skew, the
default value is false.
'sink.shuffle-by-partition.enable' = '...',
@@ -65,7 +65,7 @@ CREATE TABLE MyUserTable (
<span class="label label-danger">Attention</span> File system sources for
streaming is still under development. In the future, the community will add
support for common streaming use cases, i.e., partition and directory
monitoring.
-<span class="label label-danger">Attention</span> The behaviour of file system
connector is much different from `previous legacy filesystem connector`:
+<span class="label label-danger">Attention</span> The behaviour of file system
connector is much different from `previous legacy filesystem connector`:
the path parameter is specified for a directory not for a file and you can't
get a human-readable file in the path that you declare.
## Partition Files
@@ -186,7 +186,7 @@ When running file compaction in production, please be aware
that:
### Partition Commit
-After writing a partition, it is often necessary to notify downstream
applications. For example, add the partition to a Hive metastore or writing a
`_SUCCESS` file in the directory. The file system sink contains a partition
commit feature that allows configuring custom policies. Commit actions are
based on a combination of `triggers` and `policies`.
+After writing a partition, it is often necessary to notify downstream
applications. For example, add the partition to a Hive metastore or writing a
`_SUCCESS` file in the directory. The file system sink contains a partition
commit feature that allows configuring custom policies. Commit actions are
based on a combination of `triggers` and `policies`.
- Trigger: The timing of the commit of the partition can be determined by the
watermark with the time extracted from the partition, or by processing time.
- Policy: How to commit a partition, built-in policies support for the commit
of success files and metastore, you can also implement your own policies, such
as triggering hive's analysis to generate statistics, or merging small files,
etc.
@@ -284,7 +284,7 @@ Time extractors define extracting time from partition
values.
</tbody>
</table>
-The default extractor is based on a timestamp pattern composed of your
partition fields. You can also specify an implementation for fully custom
partition extraction based on the `PartitionTimeExtractor` interface.
+The default extractor is based on a timestamp pattern composed of your
partition fields. You can also specify an implementation for fully custom
partition extraction based on the `PartitionTimeExtractor` interface.
<div class="codetabs" markdown="1">
<div data-lang="java" markdown="1">
@@ -305,7 +305,7 @@ public class HourPartTimeExtractor implements
PartitionTimeExtractor {
#### Partition Commit Policy
-The partition commit policy defines what action is taken when partitions are
committed.
+The partition commit policy defines what action is taken when partitions are
committed.
- The first is metastore, only hive table supports metastore policy, file
system manages partitions through directory structure.
- The second is the success file, which will write an empty file in the
directory corresponding to the partition.
@@ -373,7 +373,7 @@ public class AnalysisCommitPolicy implements
PartitionCommitPolicy {
## Full Example
-The below shows how the file system connector can be used to write a streaming
query to write data from Kafka into a file system and runs a batch query to
read that data back out.
+The below shows how the file system connector can be used to write a streaming
query to write data from Kafka into a file system and runs a batch query to
read that data back out.
{% highlight sql %}
diff --git a/docs/dev/table/connectors/hbase.md
b/docs/dev/table/connectors/hbase.md
index b1d44b8..ccb5d8d4 100644
--- a/docs/dev/table/connectors/hbase.md
+++ b/docs/dev/table/connectors/hbase.md
@@ -2,7 +2,7 @@
title: "HBase SQL Connector"
nav-title: HBase
nav-parent_id: sql-connectors
-nav-pos: 6
+nav-pos: 8
---
<!--
Licensed to the Apache Software Foundation (ASF) under one
diff --git a/docs/dev/table/connectors/hbase.zh.md
b/docs/dev/table/connectors/hbase.zh.md
index 318c682..02de4f9 100644
--- a/docs/dev/table/connectors/hbase.zh.md
+++ b/docs/dev/table/connectors/hbase.zh.md
@@ -2,7 +2,7 @@
title: "HBase SQL 连接器"
nav-title: HBase
nav-parent_id: sql-connectors
-nav-pos: 6
+nav-pos: 8
---
<!--
Licensed to the Apache Software Foundation (ASF) under one
diff --git a/docs/dev/table/connectors/jdbc.md
b/docs/dev/table/connectors/jdbc.md
index 089aec4..52a65c2 100644
--- a/docs/dev/table/connectors/jdbc.md
+++ b/docs/dev/table/connectors/jdbc.md
@@ -2,7 +2,7 @@
title: "JDBC SQL Connector"
nav-title: JDBC
nav-parent_id: sql-connectors
-nav-pos: 3
+nav-pos: 5
---
<!--
Licensed to the Apache Software Foundation (ASF) under one
@@ -147,7 +147,7 @@ Connector Options
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>The JDBC password.</td>
- </tr>
+ </tr>
<tr>
<td><h5>scan.partition.column</h5></td>
<td>optional</td>
@@ -161,7 +161,7 @@ Connector Options
<td style="word-wrap: break-word;">(none)</td>
<td>Integer</td>
<td>The number of partitions.</td>
- </tr>
+ </tr>
<tr>
<td><h5>scan.partition.lower-bound</h5></td>
<td>optional</td>
@@ -221,7 +221,7 @@ Connector Options
<td style="word-wrap: break-word;">100</td>
<td>Integer</td>
<td>The max size of buffered records before flush. Can be set to zero to
disable it.</td>
- </tr>
+ </tr>
<tr>
<td><h5>sink.buffer-flush.interval</h5></td>
<td>optional</td>
@@ -235,7 +235,7 @@ Connector Options
<td style="word-wrap: break-word;">3</td>
<td>Integer</td>
<td>The max retry times if writing records to database failed.</td>
- </tr>
+ </tr>
</tbody>
</table>
@@ -289,7 +289,7 @@ As there is no standard syntax for upsert, the following
table describes the dat
<th class="text-left">Database</th>
<th class="text-left">Upsert Grammar</th>
</tr>
- </thead>
+ </thead>
<tbody>
<tr>
<td>MySQL</td>
@@ -309,7 +309,7 @@ The `JdbcCatalog` enables users to connect Flink to
relational databases over JD
Currently, `PostgresCatalog` is the only implementation of JDBC Catalog at the
moment, `PostgresCatalog` only supports limited `Catalog` methods include:
{% highlight java %}
-// The supported methods by Postgres Catalog.
+// The supported methods by Postgres Catalog.
PostgresCatalog.databaseExists(String databaseName)
PostgresCatalog.listDatabases()
PostgresCatalog.getDatabase(String databaseName)
@@ -411,7 +411,7 @@ execution:
...
current-catalog: mypg # set the JdbcCatalog as the current catalog of the
session
current-database: mydb
-
+
catalogs:
- name: mypg
type: jdbc
diff --git a/docs/dev/table/connectors/jdbc.zh.md
b/docs/dev/table/connectors/jdbc.zh.md
index 3c9a24d..588163b 100644
--- a/docs/dev/table/connectors/jdbc.zh.md
+++ b/docs/dev/table/connectors/jdbc.zh.md
@@ -2,7 +2,7 @@
title: "JDBC SQL Connector"
nav-title: JDBC
nav-parent_id: sql-connectors
-nav-pos: 3
+nav-pos: 5
---
<!--
Licensed to the Apache Software Foundation (ASF) under one
diff --git a/docs/dev/table/connectors/kafka.md
b/docs/dev/table/connectors/kafka.md
index 013f904..13992dc 100644
--- a/docs/dev/table/connectors/kafka.md
+++ b/docs/dev/table/connectors/kafka.md
@@ -175,7 +175,7 @@ Connector Options
<td>optional</td>
<td style="word-wrap: break-word;">at-least-once</td>
<td>String</td>
- <td>Defines the delivery semantic for the Kafka sink. Valid
enumerationns are <code>'at-lease-once'</code>, <code>'exactly-once'</code> and
<code>'none'</code>. See <a href='#consistency-guarantees'>Consistency
guarantees</a> for more details. </td>
+ <td>Defines the delivery semantic for the Kafka sink. Valid
enumerationns are <code>'at-least-once'</code>, <code>'exactly-once'</code> and
<code>'none'</code>. See <a href='#consistency-guarantees'>Consistency
guarantees</a> for more details. </td>
</tr>
<tr>
<td><h5>sink.parallelism</h5></td>
diff --git a/docs/dev/table/connectors/kafka.zh.md
b/docs/dev/table/connectors/kafka.zh.md
index 8ae4580..87c431b 100644
--- a/docs/dev/table/connectors/kafka.zh.md
+++ b/docs/dev/table/connectors/kafka.zh.md
@@ -182,7 +182,7 @@ Connector Options
<td>optional</td>
<td style="word-wrap: break-word;">at-least-once</td>
<td>String</td>
- <td>Defines the delivery semantic for the Kafka sink. Valid
enumerationns are <code>'at-lease-once'</code>, <code>'exactly-once'</code> and
<code>'none'</code>.
+ <td>Defines the delivery semantic for the Kafka sink. Valid
enumerationns are <code>'at-least-once'</code>, <code>'exactly-once'</code> and
<code>'none'</code>.
See <a href='#consistency-guarantees'>Consistency guarantees</a> for
more details. </td>
</tr>
<tr>
diff --git a/docs/dev/table/connectors/kinesis.md
b/docs/dev/table/connectors/kinesis.md
index 739b84b..85dc0a3 100644
--- a/docs/dev/table/connectors/kinesis.md
+++ b/docs/dev/table/connectors/kinesis.md
@@ -2,7 +2,7 @@
title: "Amazon Kinesis Data Streams SQL Connector"
nav-title: Kinesis
nav-parent_id: sql-connectors
-nav-pos: 2
+nav-pos: 4
---
<!--
Licensed to the Apache Software Foundation (ASF) under one
@@ -640,7 +640,7 @@ Connector Options
<td style="word-wrap: break-word;">(none)</td>
<td></td>
<td>
- Sink options for the <code>KinesisProducer</code>.
+ Sink options for the <code>KinesisProducer</code>.
Suffix names must match the <a
href="https://javadoc.io/static/com.amazonaws/amazon-kinesis-producer/0.14.0/com/amazonaws/services/kinesis/producer/KinesisProducerConfiguration.html">KinesisProducerConfiguration</a>
setters in lower-case hyphenated style (for example,
<code>sink.producer.collection-max-count</code> or
<code>sink.producer.aggregation-max-count</code>).
The transformed action keys are passed to the
<code>sink.producer.*</code> to <a
href="https://javadoc.io/static/com.amazonaws/amazon-kinesis-producer/0.14.0/com/amazonaws/services/kinesis/producer/KinesisProducerConfiguration.html#fromProperties-java.util.Properties-">KinesisProducerConfigurations#fromProperties</a>.
Note that some of the defaults are overwritten by
<code>KinesisConfigUtil</code>.
@@ -660,18 +660,18 @@ Make sure to [create an appropriate IAM
policy](https://docs.aws.amazon.com/stre
Depending on your deployment you would choose a different Credentials Provider
to allow access to Kinesis.
By default, the `AUTO` Credentials Provider is used.
-If the access key ID and secret key are set in the deployment configuration,
this results in using the `BASIC` provider.
+If the access key ID and secret key are set in the deployment configuration,
this results in using the `BASIC` provider.
A specific
[AWSCredentialsProvider](https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/index.html?com/amazonaws/auth/AWSCredentialsProvider.html)
can be **optionally** set using the `aws.credentials.provider` setting.
Supported values are:
* `AUTO` - Use the default AWS Credentials Provider chain that searches for
credentials in the following order: `ENV_VARS`, `SYS_PROPS`,
`WEB_IDENTITY_TOKEN`, `PROFILE`, and EC2/ECS credentials provider.
-* `BASIC` - Use access key ID and secret key supplied as configuration.
+* `BASIC` - Use access key ID and secret key supplied as configuration.
* `ENV_VAR` - Use `AWS_ACCESS_KEY_ID` & `AWS_SECRET_ACCESS_KEY` environment
variables.
* `SYS_PROP` - Use Java system properties `aws.accessKeyId` and
`aws.secretKey`.
* `PROFILE` - Use an AWS credentials profile to create the AWS credentials.
* `ASSUME_ROLE` - Create AWS credentials by assuming a role. The credentials
for assuming the role must be supplied.
-* `WEB_IDENTITY_TOKEN` - Create AWS credentials by assuming a role using Web
Identity Token.
+* `WEB_IDENTITY_TOKEN` - Create AWS credentials by assuming a role using Web
Identity Token.
### Start Reading Position
@@ -702,11 +702,11 @@ You can, however, use the
`sink.partitioner-field-delimiter` option to set the d
### Enhanced Fan-Out
[Enhanced Fan-Out
(EFO)](https://aws.amazon.com/blogs/aws/kds-enhanced-fanout/) increases the
maximum number of concurrent consumers per Kinesis data stream.
-Without EFO, all concurrent Kinesis consumers share a single read quota per
shard.
-Using EFO, each consumer gets a distinct dedicated read quota per shard,
allowing read throughput to scale with the number of consumers.
+Without EFO, all concurrent Kinesis consumers share a single read quota per
shard.
+Using EFO, each consumer gets a distinct dedicated read quota per shard,
allowing read throughput to scale with the number of consumers.
<span class="label label-info">Note</span> Using EFO will [incur additional
cost](https://aws.amazon.com/kinesis/data-streams/pricing/).
-
+
You can enable and configure EFO with the following properties:
* `scan.stream.recordpublisher`: Determines whether to use `EFO` or `POLLING`.
@@ -720,10 +720,10 @@ You can enable and configure EFO with the following
properties:
The describe operation has a limit of 20 [transactions per
second](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_DescribeStreamConsumer.html),
this means application startup time will increase by roughly
`parallelism/20 seconds`.
* `EAGER`: Stream consumers are registered in the `FlinkKinesisConsumer`
constructor.
- If the stream consumer already exists, it will be reused.
- This will result in registration occurring when the job is constructed,
+ If the stream consumer already exists, it will be reused.
+ This will result in registration occurring when the job is constructed,
either on the Flink Job Manager or client environment submitting the job.
- Using this strategy results in a single thread registering and retrieving
the stream consumer ARN,
+ Using this strategy results in a single thread registering and retrieving
the stream consumer ARN,
reducing startup time over `LAZY` (with large parallelism).
However, consider that the client environment will require access to the
AWS services.
* `NONE`: Stream consumer registration is not performed by
`FlinkKinesisConsumer`.
@@ -733,21 +733,21 @@ You can enable and configure EFO with the following
properties:
* `scan.stream.efo.consumerarn.<stream-name>`: ARNs identifying externally
registered ARN-consumers (substitute `<stream-name>` with the name of your
stream in the parameter name).
Use this if you choose to use `NONE` as a `scan.stream.efo.registration`
strategy.
-<span class="label label-info">Note</span> For a given Kinesis data stream,
each EFO consumer must have a unique name.
-However, consumer names do not have to be unique across data streams.
+<span class="label label-info">Note</span> For a given Kinesis data stream,
each EFO consumer must have a unique name.
+However, consumer names do not have to be unique across data streams.
Reusing a consumer name will result in existing subscriptions being terminated.
<span class="label label-info">Note</span> With the `LAZY` and `EAGER`
strategies, stream consumers are de-registered when the job is shutdown
gracefully.
In the event that a job terminates within executing the shutdown hooks, stream
consumers will remain active.
-In this situation the stream consumers will be gracefully reused when the
application restarts.
+In this situation the stream consumers will be gracefully reused when the
application restarts.
With the `NONE` strategy, stream consumer de-registration is not performed by
`FlinkKinesisConsumer`.
Data Type Mapping
----------------
-Kinesis stores records as Base64-encoded binary data objects, so it doesn't
have a notion of internal record structure.
+Kinesis stores records as Base64-encoded binary data objects, so it doesn't
have a notion of internal record structure.
Instead, Kinesis records are deserialized and serialized by formats, e.g.
'avro', 'csv', or 'json'.
-To determine the data type of the messages in your Kinesis-backed tables, pick
a suitable Flink format with the `format` keyword.
+To determine the data type of the messages in your Kinesis-backed tables, pick
a suitable Flink format with the `format` keyword.
Please refer to the [Formats]({% link dev/table/connectors/formats/index.md
%}) pages for more details.
{% top %}
diff --git a/docs/dev/table/connectors/kinesis.zh.md
b/docs/dev/table/connectors/kinesis.zh.md
index 2bdad3d..ca24aab 100644
--- a/docs/dev/table/connectors/kinesis.zh.md
+++ b/docs/dev/table/connectors/kinesis.zh.md
@@ -2,7 +2,7 @@
title: "Amazon Kinesis Data Streams SQL Connector"
nav-title: Kinesis
nav-parent_id: sql-connectors
-nav-pos: 2
+nav-pos: 4
---
<!--
Licensed to the Apache Software Foundation (ASF) under one
@@ -645,7 +645,7 @@ Connector Options
<td style="word-wrap: break-word;">(none)</td>
<td></td>
<td>
- Sink options for the <code>KinesisProducer</code>.
+ Sink options for the <code>KinesisProducer</code>.
Suffix names must match the <a
href="https://javadoc.io/static/com.amazonaws/amazon-kinesis-producer/0.14.0/com/amazonaws/services/kinesis/producer/KinesisProducerConfiguration.html">KinesisProducerConfiguration</a>
setters in lower-case hyphenated style (for example,
<code>sink.producer.collection-max-count</code> or
<code>sink.producer.aggregation-max-count</code>).
The transformed action keys are passed to the
<code>sink.producer.*</code> to <a
href="https://javadoc.io/static/com.amazonaws/amazon-kinesis-producer/0.14.0/com/amazonaws/services/kinesis/producer/KinesisProducerConfiguration.html#fromProperties-java.util.Properties-">KinesisProducerConfigurations#fromProperties</a>.
Note that some of the defaults are overwritten by
<code>KinesisConfigUtil</code>.
@@ -665,18 +665,18 @@ Make sure to [create an appropriate IAM
policy](https://docs.aws.amazon.com/stre
Depending on your deployment you would choose a different Credentials Provider
to allow access to Kinesis.
By default, the `AUTO` Credentials Provider is used.
-If the access key ID and secret key are set in the deployment configuration,
this results in using the `BASIC` provider.
+If the access key ID and secret key are set in the deployment configuration,
this results in using the `BASIC` provider.
A specific
[AWSCredentialsProvider](https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/index.html?com/amazonaws/auth/AWSCredentialsProvider.html)
can be **optionally** set using the `aws.credentials.provider` setting.
Supported values are:
* `AUTO` - Use the default AWS Credentials Provider chain that searches for
credentials in the following order: `ENV_VARS`, `SYS_PROPS`,
`WEB_IDENTITY_TOKEN`, `PROFILE`, and EC2/ECS credentials provider.
-* `BASIC` - Use access key ID and secret key supplied as configuration.
+* `BASIC` - Use access key ID and secret key supplied as configuration.
* `ENV_VAR` - Use `AWS_ACCESS_KEY_ID` & `AWS_SECRET_ACCESS_KEY` environment
variables.
* `SYS_PROP` - Use Java system properties `aws.accessKeyId` and
`aws.secretKey`.
* `PROFILE` - Use an AWS credentials profile to create the AWS credentials.
* `ASSUME_ROLE` - Create AWS credentials by assuming a role. The credentials
for assuming the role must be supplied.
-* `WEB_IDENTITY_TOKEN` - Create AWS credentials by assuming a role using Web
Identity Token.
+* `WEB_IDENTITY_TOKEN` - Create AWS credentials by assuming a role using Web
Identity Token.
### Start Reading Position
@@ -707,11 +707,11 @@ You can, however, use the
`sink.partitioner-field-delimiter` option to set the d
### Enhanced Fan-Out
[Enhanced Fan-Out
(EFO)](https://aws.amazon.com/blogs/aws/kds-enhanced-fanout/) increases the
maximum number of concurrent consumers per Kinesis data stream.
-Without EFO, all concurrent Kinesis consumers share a single read quota per
shard.
-Using EFO, each consumer gets a distinct dedicated read quota per shard,
allowing read throughput to scale with the number of consumers.
+Without EFO, all concurrent Kinesis consumers share a single read quota per
shard.
+Using EFO, each consumer gets a distinct dedicated read quota per shard,
allowing read throughput to scale with the number of consumers.
<span class="label label-info">Note</span> Using EFO will [incur additional
cost](https://aws.amazon.com/kinesis/data-streams/pricing/).
-
+
You can enable and configure EFO with the following properties:
* `scan.stream.recordpublisher`: Determines whether to use `EFO` or `POLLING`.
@@ -725,10 +725,10 @@ You can enable and configure EFO with the following
properties:
The describe operation has a limit of 20 [transactions per
second](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_DescribeStreamConsumer.html),
this means application startup time will increase by roughly
`parallelism/20 seconds`.
* `EAGER`: Stream consumers are registered in the `FlinkKinesisConsumer`
constructor.
- If the stream consumer already exists, it will be reused.
- This will result in registration occurring when the job is constructed,
+ If the stream consumer already exists, it will be reused.
+ This will result in registration occurring when the job is constructed,
either on the Flink Job Manager or client environment submitting the job.
- Using this strategy results in a single thread registering and retrieving
the stream consumer ARN,
+ Using this strategy results in a single thread registering and retrieving
the stream consumer ARN,
reducing startup time over `LAZY` (with large parallelism).
However, consider that the client environment will require access to the
AWS services.
* `NONE`: Stream consumer registration is not performed by
`FlinkKinesisConsumer`.
@@ -738,21 +738,21 @@ You can enable and configure EFO with the following
properties:
* `scan.stream.efo.consumerarn.<stream-name>`: ARNs identifying externally
registered ARN-consumers (substitute `<stream-name>` with the name of your
stream in the parameter name).
Use this if you choose to use `NONE` as a `scan.stream.efo.registration`
strategy.
-<span class="label label-info">Note</span> For a given Kinesis data stream,
each EFO consumer must have a unique name.
-However, consumer names do not have to be unique across data streams.
+<span class="label label-info">Note</span> For a given Kinesis data stream,
each EFO consumer must have a unique name.
+However, consumer names do not have to be unique across data streams.
Reusing a consumer name will result in existing subscriptions being terminated.
<span class="label label-info">Note</span> With the `LAZY` and `EAGER`
strategies, stream consumers are de-registered when the job is shutdown
gracefully.
In the event that a job terminates within executing the shutdown hooks, stream
consumers will remain active.
-In this situation the stream consumers will be gracefully reused when the
application restarts.
+In this situation the stream consumers will be gracefully reused when the
application restarts.
With the `NONE` strategy, stream consumer de-registration is not performed by
`FlinkKinesisConsumer`.
Data Type Mapping
----------------
-Kinesis stores records as Base64-encoded binary data objects, so it doesn't
have a notion of internal record structure.
+Kinesis stores records as Base64-encoded binary data objects, so it doesn't
have a notion of internal record structure.
Instead, Kinesis records are deserialized and serialized by formats, e.g.
'avro', 'csv', or 'json'.
-To determine the data type of the messages in your Kinesis-backed tables, pick
a suitable Flink format with the `format` keyword.
+To determine the data type of the messages in your Kinesis-backed tables, pick
a suitable Flink format with the `format` keyword.
Please refer to the [Formats]({% link dev/table/connectors/formats/index.zh.md
%}) pages for more details.
{% top %}
diff --git a/docs/dev/table/connectors/print.md
b/docs/dev/table/connectors/print.md
index ed77139..b001821 100644
--- a/docs/dev/table/connectors/print.md
+++ b/docs/dev/table/connectors/print.md
@@ -2,7 +2,7 @@
title: "Print SQL Connector"
nav-title: Print
nav-parent_id: sql-connectors
-nav-pos: 11
+nav-pos: 13
---
<!--
Licensed to the Apache Software Foundation (ASF) under one
diff --git a/docs/dev/table/connectors/print.zh.md
b/docs/dev/table/connectors/print.zh.md
index 51f8b5b..ee8ef1b 100644
--- a/docs/dev/table/connectors/print.zh.md
+++ b/docs/dev/table/connectors/print.zh.md
@@ -2,7 +2,7 @@
title: "Print SQL 连接器"
nav-title: Print
nav-parent_id: sql-connectors
-nav-pos: 11
+nav-pos: 13
---
<!--
Licensed to the Apache Software Foundation (ASF) under one
diff --git a/docs/dev/table/connectors/upsert-kafka.md
b/docs/dev/table/connectors/upsert-kafka.md
new file mode 100644
index 0000000..c92cf4b
--- /dev/null
+++ b/docs/dev/table/connectors/upsert-kafka.md
@@ -0,0 +1,212 @@
+---
+title: "Upsert Kafka SQL Connector"
+nav-title: Upsert Kafka
+nav-parent_id: sql-connectors
+nav-pos: 3
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+<span class="label label-primary">Scan Source: Unbounded</span>
+<span class="label label-primary">Sink: Streaming Upsert Mode</span>
+
+* This will be replaced by the TOC
+{:toc}
+
+The Upsert Kafka connector allows for reading data from and writing data into
Kafka topics in the upsert fashion.
+
+As a source, the upsert-kafka connector produces a changelog stream, where
each data record represents
+an update or delete event. More precisely, the value in a data record is
interpreted as an UPDATE of
+the last value for the same key, if any (if a corresponding key doesn’t exist
yet, the update will
+be considered an INSERT). Using the table analogy, a data record in a
changelog stream is interpreted
+as an UPSERT aka INSERT/UPDATE because any existing row with the same key is
overwritten. Also, null
+values are interpreted in a special way: a record with a null value represents
a “DELETE”.
+
+As a sink, the upsert-kafka connector can consume a changelog stream. It will write INSERT/UPDATE_AFTER
+data as normal Kafka message values, and write DELETE data as Kafka messages with null values
+(which indicate tombstones for the corresponding keys). Flink guarantees message ordering on the primary key
+by partitioning data on the values of the primary key columns, so update and delete messages for the same
+key will fall into the same partition.
+
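As a minimal sketch of these semantics (the topic, key, and column names below are hypothetical), the comments trace how a sequence of Kafka records is interpreted when such a table is read as a changelog source:

{% highlight sql %}
-- Hypothetical table backed by an upsert-kafka topic that is keyed by user_id.
CREATE TABLE user_region (
  user_id BIGINT,
  region  STRING,
  PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'user_region',
  'properties.bootstrap.servers' = '...',
  'key.format' = 'json',
  'value.format' = 'json'
);

-- Reading the topic as a changelog source:
--   key = {"user_id": 1}, value = {"user_id": 1, "region": "EU"}  ->  INSERT (1, 'EU')
--   key = {"user_id": 1}, value = {"user_id": 1, "region": "US"}  ->  UPDATE of key 1 to (1, 'US')
--   key = {"user_id": 1}, value = null (tombstone)                ->  DELETE of key 1
-- Consuming the table materializes the latest row per key.
SELECT * FROM user_region;
{% endhighlight %}
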
+Dependencies
+------------
+
+In order to set up the upsert-kafka connector, the following table provides dependency information
+for both projects using a build automation tool (such as Maven or SBT) and for SQL Client with SQL JAR bundles.
+
+{% assign connector = site.data.sql-connectors['upsert-kafka'] %}
+{% include sql-connector-download-table.html
+ connector=connector
+%}
+
+Full Example
+----------------
+
+The example below shows how to create and use an Upsert Kafka table:
+
+<div class="codetabs" markdown="1">
+<div data-lang="SQL" markdown="1">
+{% highlight sql %}
+CREATE TABLE pageviews_per_region (
+ region STRING,
+ pv BIGINT,
+ uv BIGINT,
+ PRIMARY KEY (region) NOT ENFORCED
+) WITH (
+ 'connector' = 'upsert-kafka',
+ 'topic' = 'pageviews_per_region',
+ 'properties.bootstrap.servers' = '...',
+ 'key.format' = 'avro',
+ 'value.format' = 'avro'
+);
+
+CREATE TABLE pageviews (
+ user_id BIGINT,
+ page_id BIGINT,
+ viewtime TIMESTAMP,
+ user_region STRING,
+ WATERMARK FOR viewtime AS viewtime - INTERVAL '2' SECOND
+) WITH (
+ 'connector' = 'kafka',
+ 'topic' = 'pageviews',
+ 'properties.bootstrap.servers' = '...',
+ 'format' = 'json'
+);
+
+-- calculate the pv, uv and insert into the upsert-kafka sink
+INSERT INTO pageviews_per_region
+SELECT
+ region,
+ COUNT(*),
+ COUNT(DISTINCT user_id)
+FROM pageviews
+GROUP BY region;
+
+{% endhighlight %}
+</div>
+</div>
+<span class="label label-danger">Attention</span> Make sure to define the
primary key in the DDL.
+
+Connector Options
+----------------
+
+<table class="table table-bordered">
+ <thead>
+ <tr>
+ <th class="text-left" style="width: 25%">Option</th>
+ <th class="text-center" style="width: 8%">Required</th>
+ <th class="text-center" style="width: 7%">Default</th>
+ <th class="text-center" style="width: 10%">Type</th>
+ <th class="text-center" style="width: 50%">Description</th>
+ </tr>
+ </thead>
+ <tbody>
+ <tr>
+ <td><h5>connector</h5></td>
+ <td>required</td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+      <td>Specify which connector to use. For Upsert Kafka, use <code>'upsert-kafka'</code>.</td>
+ </tr>
+ <tr>
+ <td><h5>topic</h5></td>
+ <td>required</td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>The Kafka topic name to read from and write to.</td>
+ </tr>
+ <tr>
+ <td><h5>properties.bootstrap.servers</h5></td>
+ <td>required</td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+      <td>Comma-separated list of Kafka brokers.</td>
+ </tr>
+ <tr>
+ <td><h5>key.format</h5></td>
+ <td>required</td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>The format used to deserialize and serialize the key part of the
Kafka messages. The key part
+ fields are specified by the PRIMARY KEY syntax. The supported formats
include <code>'csv'</code>,
+ <code>'json'</code>, <code>'avro'</code>. Please refer to <a href="{%
link dev/table/connectors/formats/index.md %}">Formats</a>
+ page for more details and more format options.
+ </td>
+ </tr>
+ <tr>
+ <td><h5>value.format</h5></td>
+ <td>required</td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>The format used to deserialize and serialize the value part of the
Kafka messages.
+ The supported formats include <code>'csv'</code>, <code>'json'</code>,
<code>'avro'</code>.
+ Please refer to <a href="{% link dev/table/connectors/formats/index.md
%}">Formats</a> page for more details and more format options.
+ </td>
+ </tr>
+ <tr>
+ <td><h5>value.fields-include</h5></td>
+ <td>required</td>
+ <td style="word-wrap: break-word;"><code>'ALL'</code></td>
+ <td>String</td>
+ <td>Controls which fields should end up in the value as well. Available
values:
+ <ul>
+ <li><code>ALL</code>: the value part of the record contains all
fields of the schema, even if they are part of the key.</li>
+ <li><code>EXCEPT_KEY</code>: the value part of the record contains
all fields of the schema except the key fields.</li>
+ </ul>
+ </td>
+ </tr>
+ <tr>
+ <td><h5>sink.parallelism</h5></td>
+ <td>optional</td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>Integer</td>
+      <td>Defines the parallelism of the upsert-kafka sink operator. By default, the parallelism is determined by the framework, using the same parallelism as the upstream chained operator.</td>
+ </tr>
+ </tbody>
+</table>
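
As an illustrative sketch of how these options combine (reusing the `pageviews_per_region` schema from the full example above), `value.fields-include` can be set to `EXCEPT_KEY` so that the key column is not duplicated in the Kafka message value:

{% highlight sql %}
CREATE TABLE pageviews_per_region (
  region STRING,
  pv BIGINT,
  uv BIGINT,
  PRIMARY KEY (region) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'pageviews_per_region',
  'properties.bootstrap.servers' = '...',
  'key.format' = 'csv',
  'value.format' = 'csv',
  -- the message value carries only pv and uv; region is carried in the message key
  'value.fields-include' = 'EXCEPT_KEY'
);
{% endhighlight %}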
+
+Features
+----------------
+
+### Primary Key Constraints
+
+The Upsert Kafka connector always works in upsert fashion and requires a primary key to be defined in the DDL.
+Under the assumption that records with the same key are ordered within the same partition, the
+primary key semantics on the changelog source mean that the materialized changelog is unique on the
+primary key. The primary key definition also controls which fields end up in Kafka's key.
+
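As a hedged sketch (the table below is hypothetical), a composite primary key simply puts all of its columns into Kafka's key:

{% highlight sql %}
CREATE TABLE pageviews_per_region_per_day (
  region STRING,
  dt     STRING,
  pv     BIGINT,
  uv     BIGINT,
  -- both region and dt end up in Kafka's message key
  PRIMARY KEY (region, dt) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'pageviews_per_region_per_day',
  'properties.bootstrap.servers' = '...',
  'key.format' = 'json',
  'value.format' = 'json'
);
{% endhighlight %}
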
+### Consistency Guarantees
+
+By default, an Upsert Kafka sink ingests data with at-least-once guarantees
into a Kafka topic if
+the query is executed with [checkpointing enabled]({% link
dev/stream/state/checkpointing.md %}#enabling-and-configuring-checkpointing).
+
+This means Flink may write duplicate records with the same key into the Kafka topic. But because the
+connector works in upsert mode, only the last record for a given key takes effect when the data is
+read back as a source. Therefore, the upsert-kafka connector achieves idempotent writes, just like
+the [HBase sink]({{ site.baseurl }}/dev/table/connectors/hbase.html).
+
+Data Type Mapping
+----------------
+
+Upsert Kafka stores message keys and values as bytes, so Upsert Kafka doesn't have a schema or data types.
+The messages are deserialized and serialized by formats, e.g. csv, json, avro. Thus, the data type mapping
+is determined by the specific format. Please refer to the [Formats]({% link dev/table/connectors/formats/index.md %})
+page for more details.
+
+{% top %}
diff --git a/docs/dev/table/connectors/upsert-kafka.zh.md
b/docs/dev/table/connectors/upsert-kafka.zh.md
new file mode 100644
index 0000000..c92cf4b
--- /dev/null
+++ b/docs/dev/table/connectors/upsert-kafka.zh.md
@@ -0,0 +1,212 @@
+---
+title: "Upsert Kafka SQL Connector"
+nav-title: Upsert Kafka
+nav-parent_id: sql-connectors
+nav-pos: 3
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+<span class="label label-primary">Scan Source: Unbounded</span>
+<span class="label label-primary">Sink: Streaming Upsert Mode</span>
+
+* This will be replaced by the TOC
+{:toc}
+
+The Upsert Kafka connector allows for reading data from and writing data into
Kafka topics in the upsert fashion.
+
+As a source, the upsert-kafka connector produces a changelog stream, where
each data record represents
+an update or delete event. More precisely, the value in a data record is
interpreted as an UPDATE of
+the last value for the same key, if any (if a corresponding key doesn’t exist
yet, the update will
+be considered an INSERT). Using the table analogy, a data record in a
changelog stream is interpreted
+as an UPSERT aka INSERT/UPDATE because any existing row with the same key is
overwritten. Also, null
+values are interpreted in a special way: a record with a null value represents
a “DELETE”.
+
+As a sink, the upsert-kafka connector can consume a changelog stream. It will write INSERT/UPDATE_AFTER
+data as normal Kafka message values, and write DELETE data as Kafka messages with null values
+(which indicate tombstones for the corresponding keys). Flink guarantees message ordering on the primary key
+by partitioning data on the values of the primary key columns, so update and delete messages for the same
+key will fall into the same partition.
+
+Dependencies
+------------
+
+In order to set up the upsert-kafka connector, the following table provides dependency information
+for both projects using a build automation tool (such as Maven or SBT) and for SQL Client with SQL JAR bundles.
+
+{% assign connector = site.data.sql-connectors['upsert-kafka'] %}
+{% include sql-connector-download-table.html
+ connector=connector
+%}
+
+Full Example
+----------------
+
+The example below shows how to create and use an Upsert Kafka table:
+
+<div class="codetabs" markdown="1">
+<div data-lang="SQL" markdown="1">
+{% highlight sql %}
+CREATE TABLE pageviews_per_region (
+ region STRING,
+ pv BIGINT,
+ uv BIGINT,
+ PRIMARY KEY (region) NOT ENFORCED
+) WITH (
+ 'connector' = 'upsert-kafka',
+ 'topic' = 'pageviews_per_region',
+ 'properties.bootstrap.servers' = '...',
+ 'key.format' = 'avro',
+ 'value.format' = 'avro'
+);
+
+CREATE TABLE pageviews (
+ user_id BIGINT,
+ page_id BIGINT,
+ viewtime TIMESTAMP,
+ user_region STRING,
+ WATERMARK FOR viewtime AS viewtime - INTERVAL '2' SECOND
+) WITH (
+ 'connector' = 'kafka',
+ 'topic' = 'pageviews',
+ 'properties.bootstrap.servers' = '...',
+ 'format' = 'json'
+);
+
+-- calculate the pv, uv and insert into the upsert-kafka sink
+INSERT INTO pageviews_per_region
+SELECT
+ region,
+ COUNT(*),
+ COUNT(DISTINCT user_id)
+FROM pageviews
+GROUP BY region;
+
+{% endhighlight %}
+</div>
+</div>
+<span class="label label-danger">Attention</span> Make sure to define the
primary key in the DDL.
+
+Connector Options
+----------------
+
+<table class="table table-bordered">
+ <thead>
+ <tr>
+ <th class="text-left" style="width: 25%">Option</th>
+ <th class="text-center" style="width: 8%">Required</th>
+ <th class="text-center" style="width: 7%">Default</th>
+ <th class="text-center" style="width: 10%">Type</th>
+ <th class="text-center" style="width: 50%">Description</th>
+ </tr>
+ </thead>
+ <tbody>
+ <tr>
+ <td><h5>connector</h5></td>
+ <td>required</td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+      <td>Specify which connector to use. For Upsert Kafka, use <code>'upsert-kafka'</code>.</td>
+ </tr>
+ <tr>
+ <td><h5>topic</h5></td>
+ <td>required</td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>The Kafka topic name to read from and write to.</td>
+ </tr>
+ <tr>
+ <td><h5>properties.bootstrap.servers</h5></td>
+ <td>required</td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+      <td>Comma-separated list of Kafka brokers.</td>
+ </tr>
+ <tr>
+ <td><h5>key.format</h5></td>
+ <td>required</td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>The format used to deserialize and serialize the key part of the
Kafka messages. The key part
+ fields are specified by the PRIMARY KEY syntax. The supported formats
include <code>'csv'</code>,
+ <code>'json'</code>, <code>'avro'</code>. Please refer to <a href="{%
link dev/table/connectors/formats/index.md %}">Formats</a>
+ page for more details and more format options.
+ </td>
+ </tr>
+ <tr>
+ <td><h5>value.format</h5></td>
+ <td>required</td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>The format used to deserialize and serialize the value part of the
Kafka messages.
+ The supported formats include <code>'csv'</code>, <code>'json'</code>,
<code>'avro'</code>.
+ Please refer to <a href="{% link dev/table/connectors/formats/index.md
%}">Formats</a> page for more details and more format options.
+ </td>
+ </tr>
+ <tr>
+ <td><h5>value.fields-include</h5></td>
+ <td>required</td>
+ <td style="word-wrap: break-word;"><code>'ALL'</code></td>
+ <td>String</td>
+ <td>Controls which fields should end up in the value as well. Available
values:
+ <ul>
+ <li><code>ALL</code>: the value part of the record contains all
fields of the schema, even if they are part of the key.</li>
+ <li><code>EXCEPT_KEY</code>: the value part of the record contains
all fields of the schema except the key fields.</li>
+ </ul>
+ </td>
+ </tr>
+ <tr>
+ <td><h5>sink.parallelism</h5></td>
+ <td>optional</td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>Integer</td>
+      <td>Defines the parallelism of the upsert-kafka sink operator. By default, the parallelism is determined by the framework, using the same parallelism as the upstream chained operator.</td>
+ </tr>
+ </tbody>
+</table>
+
+Features
+----------------
+
+### Primary Key Constraints
+
+The Upsert Kafka connector always works in upsert fashion and requires a primary key to be defined in the DDL.
+Under the assumption that records with the same key are ordered within the same partition, the
+primary key semantics on the changelog source mean that the materialized changelog is unique on the
+primary key. The primary key definition also controls which fields end up in Kafka's key.
+
+### Consistency Guarantees
+
+By default, an Upsert Kafka sink ingests data with at-least-once guarantees
into a Kafka topic if
+the query is executed with [checkpointing enabled]({% link
dev/stream/state/checkpointing.md %}#enabling-and-configuring-checkpointing).
+
+This means Flink may write duplicate records with the same key into the Kafka topic. But because the
+connector works in upsert mode, only the last record for a given key takes effect when the data is
+read back as a source. Therefore, the upsert-kafka connector achieves idempotent writes, just like
+the [HBase sink]({{ site.baseurl }}/dev/table/connectors/hbase.html).
+
+Data Type Mapping
+----------------
+
+Upsert Kafka stores message keys and values as bytes, so Upsert Kafka doesn't have a schema or data types.
+The messages are deserialized and serialized by formats, e.g. csv, json, avro. Thus, the data type mapping
+is determined by the specific format. Please refer to the [Formats]({% link dev/table/connectors/formats/index.md %})
+page for more details.
+
+{% top %}