This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 7ce0f39  [FLINK-19859][docs] Add document for the upsert-kafka 
connector
7ce0f39 is described below

commit 7ce0f39058a03bb95bb00e444c75494dcb8d3b2c
Author: Shengkai <[email protected]>
AuthorDate: Tue Nov 17 11:52:43 2020 +0800

    [FLINK-19859][docs] Add document for the upsert-kafka connector
    
    This closes #14017
---
 docs/_data/sql-connectors.yml                 |   8 +
 docs/dev/table/connectors/blackhole.md        |   2 +-
 docs/dev/table/connectors/blackhole.zh.md     |   2 +-
 docs/dev/table/connectors/datagen.md          |  18 +--
 docs/dev/table/connectors/datagen.zh.md       |   2 +-
 docs/dev/table/connectors/elasticsearch.md    |   2 +-
 docs/dev/table/connectors/elasticsearch.zh.md |   2 +-
 docs/dev/table/connectors/filesystem.md       |  14 +-
 docs/dev/table/connectors/filesystem.zh.md    |  14 +-
 docs/dev/table/connectors/hbase.md            |   2 +-
 docs/dev/table/connectors/hbase.zh.md         |   2 +-
 docs/dev/table/connectors/jdbc.md             |  16 +-
 docs/dev/table/connectors/jdbc.zh.md          |   2 +-
 docs/dev/table/connectors/kafka.md            |   2 +-
 docs/dev/table/connectors/kafka.zh.md         |   2 +-
 docs/dev/table/connectors/kinesis.md          |  32 ++--
 docs/dev/table/connectors/kinesis.zh.md       |  32 ++--
 docs/dev/table/connectors/print.md            |   2 +-
 docs/dev/table/connectors/print.zh.md         |   2 +-
 docs/dev/table/connectors/upsert-kafka.md     | 212 ++++++++++++++++++++++++++
 docs/dev/table/connectors/upsert-kafka.zh.md  | 212 ++++++++++++++++++++++++++
 21 files changed, 507 insertions(+), 75 deletions(-)

diff --git a/docs/_data/sql-connectors.yml b/docs/_data/sql-connectors.yml
index 2c14a00..181eb0c 100644
--- a/docs/_data/sql-connectors.yml
+++ b/docs/_data/sql-connectors.yml
@@ -117,6 +117,14 @@ kafka:
           maven: flink-connector-kafka{{site.scala_version_suffix}}
           sql-url: 
https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-kafka{{site.scala_version_suffix}}/{{site.version}}/flink-sql-connector-kafka{{site.scala_version_suffix}}-{{site.version}}.jar
 
+upsert-kafka:
+    name: Upsert Kafka
+    category: connector
+    versions:
+        - version: universal
+          maven: flink-connector-kafka{{site.scala_version_suffix}}
+          sql-url: 
https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-kafka{{site.scala_version_suffix}}/{{site.version}}/flink-sql-connector-kafka{{site.scala_version_suffix}}-{{site.version}}.jar
+
 kinesis:
     name: Kinesis
     category: connector
diff --git a/docs/dev/table/connectors/blackhole.md 
b/docs/dev/table/connectors/blackhole.md
index b54f906..2692bdf 100644
--- a/docs/dev/table/connectors/blackhole.md
+++ b/docs/dev/table/connectors/blackhole.md
@@ -2,7 +2,7 @@
 title: "BlackHole SQL Connector"
 nav-title: BlackHole
 nav-parent_id: sql-connectors
-nav-pos: 12
+nav-pos: 14
 ---
 <!--
 Licensed to the Apache Software Foundation (ASF) under one
diff --git a/docs/dev/table/connectors/blackhole.zh.md 
b/docs/dev/table/connectors/blackhole.zh.md
index 0727695..53e3d9d 100644
--- a/docs/dev/table/connectors/blackhole.zh.md
+++ b/docs/dev/table/connectors/blackhole.zh.md
@@ -2,7 +2,7 @@
 title: "BlackHole SQL 连接器"
 nav-title: BlackHole
 nav-parent_id: sql-connectors
-nav-pos: 12
+nav-pos: 14
 ---
 <!--
 Licensed to the Apache Software Foundation (ASF) under one
diff --git a/docs/dev/table/connectors/datagen.md 
b/docs/dev/table/connectors/datagen.md
index 6de7169..dcf4465 100644
--- a/docs/dev/table/connectors/datagen.md
+++ b/docs/dev/table/connectors/datagen.md
@@ -2,7 +2,7 @@
 title: "DataGen SQL Connector"
 nav-title: DataGen
 nav-parent_id: sql-connectors
-nav-pos: 10
+nav-pos: 12
 ---
 <!--
 Licensed to the Apache Software Foundation (ASF) under one
@@ -58,7 +58,7 @@ CREATE TABLE Orders (
 )
 {% endhighlight %}
 
-Often, the data generator connector is used in conjuction with the ``LIKE`` 
clause to mock out physical tables. 
+Often, the data generator connector is used in conjunction with the ``LIKE`` clause to mock out physical tables.
 
 {% highlight sql %}
 CREATE TABLE Orders (
@@ -69,8 +69,8 @@ CREATE TABLE Orders (
 ) WITH (...)
 
 -- create a bounded mock table
-CREATE TEMPORARY TABLE GenOrders 
-WITH ( 
+CREATE TEMPORARY TABLE GenOrders
+WITH (
     'connector' = 'datagen',
     'number-of-rows' = '10'
 )
@@ -118,17 +118,17 @@ Types
             <td>TINYINT</td>
             <td>random / sequence</td>
             <td></td>
-        </tr>  
+        </tr>
         <tr>
             <td>SMALLINT</td>
             <td>random / sequence</td>
             <td></td>
-        </tr> 
+        </tr>
         <tr>
             <td>INT</td>
             <td>random / sequence</td>
             <td></td>
-        </tr> 
+        </tr>
         <tr>
             <td>BIGINT</td>
             <td>random / sequence</td>
@@ -158,12 +158,12 @@ Types
             <td>TIMESTAMP</td>
             <td>random</td>
             <td>Always resolves to the current timestamp of the local 
machine.</td>
-        </tr>    
+        </tr>
         <tr>
             <td>TIMESTAMP WITH LOCAL TIMEZONE</td>
             <td>random</td>
             <td>Always resolves to the current timestamp of the local 
machine.</td>
-        </tr>   
+        </tr>
         <tr>
             <td>INTERVAL YEAR TO MONTH</td>
             <td>random</td>
diff --git a/docs/dev/table/connectors/datagen.zh.md 
b/docs/dev/table/connectors/datagen.zh.md
index 92b5972..330079b 100644
--- a/docs/dev/table/connectors/datagen.zh.md
+++ b/docs/dev/table/connectors/datagen.zh.md
@@ -2,7 +2,7 @@
 title: "DataGen SQL 连接器"
 nav-title: DataGen
 nav-parent_id: sql-connectors
-nav-pos: 10
+nav-pos: 12
 ---
 <!--
 Licensed to the Apache Software Foundation (ASF) under one
diff --git a/docs/dev/table/connectors/elasticsearch.md 
b/docs/dev/table/connectors/elasticsearch.md
index fc90bb7..c7cabaf 100644
--- a/docs/dev/table/connectors/elasticsearch.md
+++ b/docs/dev/table/connectors/elasticsearch.md
@@ -2,7 +2,7 @@
 title: "Elasticsearch SQL Connector"
 nav-title: Elasticsearch
 nav-parent_id: sql-connectors
-nav-pos: 4
+nav-pos: 6
 ---
 <!--
 Licensed to the Apache Software Foundation (ASF) under one
diff --git a/docs/dev/table/connectors/elasticsearch.zh.md 
b/docs/dev/table/connectors/elasticsearch.zh.md
index d2e67b8..5ba437c 100644
--- a/docs/dev/table/connectors/elasticsearch.zh.md
+++ b/docs/dev/table/connectors/elasticsearch.zh.md
@@ -2,7 +2,7 @@
 title: "Elasticsearch SQL Connector"
 nav-title: Elasticsearch
 nav-parent_id: sql-connectors
-nav-pos: 4
+nav-pos: 6
 ---
 <!--
 Licensed to the Apache Software Foundation (ASF) under one
diff --git a/docs/dev/table/connectors/filesystem.md 
b/docs/dev/table/connectors/filesystem.md
index 63f1388..015113c 100644
--- a/docs/dev/table/connectors/filesystem.md
+++ b/docs/dev/table/connectors/filesystem.md
@@ -2,7 +2,7 @@
 title: "FileSystem SQL Connector"
 nav-title: FileSystem
 nav-parent_id: sql-connectors
-nav-pos: 5
+nav-pos: 7
 ---
 <!--
 Licensed to the Apache Software Foundation (ASF) under one
@@ -51,7 +51,7 @@ CREATE TABLE MyUserTable (
                                         -- section for more details
   'partition.default-name' = '...',     -- optional: default partition name in 
case the dynamic partition
                                         -- column value is null/empty string
-  
+
   -- optional: the option to enable shuffle data by dynamic partition fields 
in sink phase, this can greatly
   -- reduce the number of file for filesystem sink but may lead data skew, the 
default value is false.
   'sink.shuffle-by-partition.enable' = '...',
@@ -65,7 +65,7 @@ CREATE TABLE MyUserTable (
 
 <span class="label label-danger">Attention</span> File system sources for 
streaming is still under development. In the future, the community will add 
support for common streaming use cases, i.e., partition and directory 
monitoring.
 
-<span class="label label-danger">Attention</span> The behaviour of file system 
connector is much different from `previous legacy filesystem connector`: 
+<span class="label label-danger">Attention</span> The behaviour of file system 
connector is much different from `previous legacy filesystem connector`:
 the path parameter is specified for a directory not for a file and you can't 
get a human-readable file in the path that you declare.
 
 ## Partition Files
@@ -186,7 +186,7 @@ When running file compaction in production, please be aware 
that:
 
 ### Partition Commit
 
-After writing a partition, it is often necessary to notify downstream 
applications. For example, add the partition to a Hive metastore or writing a 
`_SUCCESS` file in the directory. The file system sink contains a partition 
commit feature that allows configuring custom policies. Commit actions are 
based on a combination of `triggers` and `policies`. 
+After writing a partition, it is often necessary to notify downstream 
applications. For example, add the partition to a Hive metastore or writing a 
`_SUCCESS` file in the directory. The file system sink contains a partition 
commit feature that allows configuring custom policies. Commit actions are 
based on a combination of `triggers` and `policies`.
 
 - Trigger: The timing of the commit of the partition can be determined by the 
watermark with the time extracted from the partition, or by processing time.
 - Policy: How to commit a partition, built-in policies support for the commit 
of success files and metastore, you can also implement your own policies, such 
as triggering hive's analysis to generate statistics, or merging small files, 
etc.
@@ -284,7 +284,7 @@ Time extractors define extracting time from partition 
values.
   </tbody>
 </table>
 
-The default extractor is based on a timestamp pattern composed of your 
partition fields. You can also specify an implementation for fully custom 
partition extraction based on the `PartitionTimeExtractor` interface. 
+The default extractor is based on a timestamp pattern composed of your 
partition fields. You can also specify an implementation for fully custom 
partition extraction based on the `PartitionTimeExtractor` interface.
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
@@ -305,7 +305,7 @@ public class HourPartTimeExtractor implements 
PartitionTimeExtractor {
 
 #### Partition Commit Policy
 
-The partition commit policy defines what action is taken when partitions are 
committed. 
+The partition commit policy defines what action is taken when partitions are 
committed.
 
 - The first is metastore, only hive table supports metastore policy, file 
system manages partitions through directory structure.
 - The second is the success file, which will write an empty file in the 
directory corresponding to the partition.
@@ -373,7 +373,7 @@ public class AnalysisCommitPolicy implements 
PartitionCommitPolicy {
 
 ## Full Example
 
-The below shows how the file system connector can be used to write a streaming 
query to write data from Kafka into a file system and runs a batch query to 
read that data back out. 
+The below shows how the file system connector can be used to write a streaming 
query to write data from Kafka into a file system and runs a batch query to 
read that data back out.
 
 {% highlight sql %}
 
diff --git a/docs/dev/table/connectors/filesystem.zh.md 
b/docs/dev/table/connectors/filesystem.zh.md
index 63f1388..015113c 100644
--- a/docs/dev/table/connectors/filesystem.zh.md
+++ b/docs/dev/table/connectors/filesystem.zh.md
@@ -2,7 +2,7 @@
 title: "FileSystem SQL Connector"
 nav-title: FileSystem
 nav-parent_id: sql-connectors
-nav-pos: 5
+nav-pos: 7
 ---
 <!--
 Licensed to the Apache Software Foundation (ASF) under one
@@ -51,7 +51,7 @@ CREATE TABLE MyUserTable (
                                         -- section for more details
   'partition.default-name' = '...',     -- optional: default partition name in 
case the dynamic partition
                                         -- column value is null/empty string
-  
+
   -- optional: the option to enable shuffle data by dynamic partition fields 
in sink phase, this can greatly
   -- reduce the number of file for filesystem sink but may lead data skew, the 
default value is false.
   'sink.shuffle-by-partition.enable' = '...',
@@ -65,7 +65,7 @@ CREATE TABLE MyUserTable (
 
 <span class="label label-danger">Attention</span> File system sources for 
streaming is still under development. In the future, the community will add 
support for common streaming use cases, i.e., partition and directory 
monitoring.
 
-<span class="label label-danger">Attention</span> The behaviour of file system 
connector is much different from `previous legacy filesystem connector`: 
+<span class="label label-danger">Attention</span> The behaviour of file system 
connector is much different from `previous legacy filesystem connector`:
 the path parameter is specified for a directory not for a file and you can't 
get a human-readable file in the path that you declare.
 
 ## Partition Files
@@ -186,7 +186,7 @@ When running file compaction in production, please be aware 
that:
 
 ### Partition Commit
 
-After writing a partition, it is often necessary to notify downstream 
applications. For example, add the partition to a Hive metastore or writing a 
`_SUCCESS` file in the directory. The file system sink contains a partition 
commit feature that allows configuring custom policies. Commit actions are 
based on a combination of `triggers` and `policies`. 
+After writing a partition, it is often necessary to notify downstream 
applications. For example, add the partition to a Hive metastore or writing a 
`_SUCCESS` file in the directory. The file system sink contains a partition 
commit feature that allows configuring custom policies. Commit actions are 
based on a combination of `triggers` and `policies`.
 
 - Trigger: The timing of the commit of the partition can be determined by the 
watermark with the time extracted from the partition, or by processing time.
 - Policy: How to commit a partition, built-in policies support for the commit 
of success files and metastore, you can also implement your own policies, such 
as triggering hive's analysis to generate statistics, or merging small files, 
etc.
@@ -284,7 +284,7 @@ Time extractors define extracting time from partition 
values.
   </tbody>
 </table>
 
-The default extractor is based on a timestamp pattern composed of your 
partition fields. You can also specify an implementation for fully custom 
partition extraction based on the `PartitionTimeExtractor` interface. 
+The default extractor is based on a timestamp pattern composed of your 
partition fields. You can also specify an implementation for fully custom 
partition extraction based on the `PartitionTimeExtractor` interface.
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
@@ -305,7 +305,7 @@ public class HourPartTimeExtractor implements 
PartitionTimeExtractor {
 
 #### Partition Commit Policy
 
-The partition commit policy defines what action is taken when partitions are 
committed. 
+The partition commit policy defines what action is taken when partitions are 
committed.
 
 - The first is metastore, only hive table supports metastore policy, file 
system manages partitions through directory structure.
 - The second is the success file, which will write an empty file in the 
directory corresponding to the partition.
@@ -373,7 +373,7 @@ public class AnalysisCommitPolicy implements 
PartitionCommitPolicy {
 
 ## Full Example
 
-The below shows how the file system connector can be used to write a streaming 
query to write data from Kafka into a file system and runs a batch query to 
read that data back out. 
+The below shows how the file system connector can be used to write a streaming 
query to write data from Kafka into a file system and runs a batch query to 
read that data back out.
 
 {% highlight sql %}
 
diff --git a/docs/dev/table/connectors/hbase.md 
b/docs/dev/table/connectors/hbase.md
index b1d44b8..ccb5d8d4 100644
--- a/docs/dev/table/connectors/hbase.md
+++ b/docs/dev/table/connectors/hbase.md
@@ -2,7 +2,7 @@
 title: "HBase SQL Connector"
 nav-title: HBase
 nav-parent_id: sql-connectors
-nav-pos: 6
+nav-pos: 8
 ---
 <!--
 Licensed to the Apache Software Foundation (ASF) under one
diff --git a/docs/dev/table/connectors/hbase.zh.md 
b/docs/dev/table/connectors/hbase.zh.md
index 318c682..02de4f9 100644
--- a/docs/dev/table/connectors/hbase.zh.md
+++ b/docs/dev/table/connectors/hbase.zh.md
@@ -2,7 +2,7 @@
 title: "HBase SQL 连接器"
 nav-title: HBase
 nav-parent_id: sql-connectors
-nav-pos: 6
+nav-pos: 8
 ---
 <!--
 Licensed to the Apache Software Foundation (ASF) under one
diff --git a/docs/dev/table/connectors/jdbc.md 
b/docs/dev/table/connectors/jdbc.md
index 089aec4..52a65c2 100644
--- a/docs/dev/table/connectors/jdbc.md
+++ b/docs/dev/table/connectors/jdbc.md
@@ -2,7 +2,7 @@
 title: "JDBC SQL Connector"
 nav-title: JDBC
 nav-parent_id: sql-connectors
-nav-pos: 3
+nav-pos: 5
 ---
 <!--
 Licensed to the Apache Software Foundation (ASF) under one
@@ -147,7 +147,7 @@ Connector Options
       <td style="word-wrap: break-word;">(none)</td>
       <td>String</td>
       <td>The JDBC password.</td>
-    </tr> 
+    </tr>
     <tr>
       <td><h5>scan.partition.column</h5></td>
       <td>optional</td>
@@ -161,7 +161,7 @@ Connector Options
       <td style="word-wrap: break-word;">(none)</td>
       <td>Integer</td>
       <td>The number of partitions.</td>
-    </tr> 
+    </tr>
     <tr>
       <td><h5>scan.partition.lower-bound</h5></td>
       <td>optional</td>
@@ -221,7 +221,7 @@ Connector Options
       <td style="word-wrap: break-word;">100</td>
       <td>Integer</td>
       <td>The max size of buffered records before flush. Can be set to zero to 
disable it.</td>
-    </tr> 
+    </tr>
     <tr>
       <td><h5>sink.buffer-flush.interval</h5></td>
       <td>optional</td>
@@ -235,7 +235,7 @@ Connector Options
       <td style="word-wrap: break-word;">3</td>
       <td>Integer</td>
       <td>The max retry times if writing records to database failed.</td>
-    </tr>          
+    </tr>
     </tbody>
 </table>
 
@@ -289,7 +289,7 @@ As there is no standard syntax for upsert, the following 
table describes the dat
         <th class="text-left">Database</th>
         <th class="text-left">Upsert Grammar</th>
        </tr>
-    </thead>    
+    </thead>
     <tbody>
         <tr>
             <td>MySQL</td>
@@ -309,7 +309,7 @@ The `JdbcCatalog` enables users to connect Flink to 
relational databases over JD
 Currently, `PostgresCatalog` is the only implementation of JDBC Catalog at the 
moment, `PostgresCatalog` only supports limited `Catalog` methods include:
 
 {% highlight java %}
-// The supported methods by Postgres Catalog. 
+// The supported methods by Postgres Catalog.
 PostgresCatalog.databaseExists(String databaseName)
 PostgresCatalog.listDatabases()
 PostgresCatalog.getDatabase(String databaseName)
@@ -411,7 +411,7 @@ execution:
     ...
     current-catalog: mypg  # set the JdbcCatalog as the current catalog of the 
session
     current-database: mydb
-    
+
 catalogs:
    - name: mypg
      type: jdbc
diff --git a/docs/dev/table/connectors/jdbc.zh.md 
b/docs/dev/table/connectors/jdbc.zh.md
index 3c9a24d..588163b 100644
--- a/docs/dev/table/connectors/jdbc.zh.md
+++ b/docs/dev/table/connectors/jdbc.zh.md
@@ -2,7 +2,7 @@
 title: "JDBC SQL Connector"
 nav-title: JDBC
 nav-parent_id: sql-connectors
-nav-pos: 3
+nav-pos: 5
 ---
 <!--
 Licensed to the Apache Software Foundation (ASF) under one
diff --git a/docs/dev/table/connectors/kafka.md 
b/docs/dev/table/connectors/kafka.md
index 013f904..13992dc 100644
--- a/docs/dev/table/connectors/kafka.md
+++ b/docs/dev/table/connectors/kafka.md
@@ -175,7 +175,7 @@ Connector Options
       <td>optional</td>
       <td style="word-wrap: break-word;">at-least-once</td>
       <td>String</td>
-      <td>Defines the delivery semantic for the Kafka sink. Valid 
enumerationns are <code>'at-lease-once'</code>, <code>'exactly-once'</code> and 
<code>'none'</code>. See <a href='#consistency-guarantees'>Consistency 
guarantees</a> for more details. </td>
+      <td>Defines the delivery semantic for the Kafka sink. Valid enumerations are <code>'at-least-once'</code>, <code>'exactly-once'</code> and <code>'none'</code>. See <a href='#consistency-guarantees'>Consistency guarantees</a> for more details. </td>
     </tr>
     <tr>
       <td><h5>sink.parallelism</h5></td>
diff --git a/docs/dev/table/connectors/kafka.zh.md 
b/docs/dev/table/connectors/kafka.zh.md
index 8ae4580..87c431b 100644
--- a/docs/dev/table/connectors/kafka.zh.md
+++ b/docs/dev/table/connectors/kafka.zh.md
@@ -182,7 +182,7 @@ Connector Options
       <td>optional</td>
       <td style="word-wrap: break-word;">at-least-once</td>
       <td>String</td>
-      <td>Defines the delivery semantic for the Kafka sink. Valid 
enumerationns are <code>'at-lease-once'</code>, <code>'exactly-once'</code> and 
<code>'none'</code>.
+      <td>Defines the delivery semantic for the Kafka sink. Valid enumerations are <code>'at-least-once'</code>, <code>'exactly-once'</code> and <code>'none'</code>.
       See <a href='#consistency-guarantees'>Consistency guarantees</a> for 
more details. </td>
     </tr>
     <tr>
diff --git a/docs/dev/table/connectors/kinesis.md 
b/docs/dev/table/connectors/kinesis.md
index 739b84b..85dc0a3 100644
--- a/docs/dev/table/connectors/kinesis.md
+++ b/docs/dev/table/connectors/kinesis.md
@@ -2,7 +2,7 @@
 title: "Amazon Kinesis Data Streams SQL Connector"
 nav-title: Kinesis
 nav-parent_id: sql-connectors
-nav-pos: 2
+nav-pos: 4
 ---
 <!--
 Licensed to the Apache Software Foundation (ASF) under one
@@ -640,7 +640,7 @@ Connector Options
       <td style="word-wrap: break-word;">(none)</td>
       <td></td>
       <td>
-        Sink options for the <code>KinesisProducer</code>. 
+        Sink options for the <code>KinesisProducer</code>.
         Suffix names must match the <a 
href="https://javadoc.io/static/com.amazonaws/amazon-kinesis-producer/0.14.0/com/amazonaws/services/kinesis/producer/KinesisProducerConfiguration.html";>KinesisProducerConfiguration</a>
 setters in lower-case hyphenated style (for example, 
<code>sink.producer.collection-max-count</code> or 
<code>sink.producer.aggregation-max-count</code>).
         The transformed action keys are passed to the 
<code>sink.producer.*</code> to <a 
href="https://javadoc.io/static/com.amazonaws/amazon-kinesis-producer/0.14.0/com/amazonaws/services/kinesis/producer/KinesisProducerConfiguration.html#fromProperties-java.util.Properties-";>KinesisProducerConfigurations#fromProperties</a>.
         Note that some of the defaults are overwritten by 
<code>KinesisConfigUtil</code>.
@@ -660,18 +660,18 @@ Make sure to [create an appropriate IAM 
policy](https://docs.aws.amazon.com/stre
 
 Depending on your deployment you would choose a different Credentials Provider 
to allow access to Kinesis.
 By default, the `AUTO` Credentials Provider is used.
-If the access key ID and secret key are set in the deployment configuration, 
this results in using the `BASIC` provider.  
+If the access key ID and secret key are set in the deployment configuration, 
this results in using the `BASIC` provider.
 
 A specific 
[AWSCredentialsProvider](https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/index.html?com/amazonaws/auth/AWSCredentialsProvider.html)
 can be **optionally** set using the `aws.credentials.provider` setting.
 Supported values are:
 
 * `AUTO` - Use the default AWS Credentials Provider chain that searches for 
credentials in the following order: `ENV_VARS`, `SYS_PROPS`, 
`WEB_IDENTITY_TOKEN`, `PROFILE`, and EC2/ECS credentials provider.
-* `BASIC` - Use access key ID and secret key supplied as configuration. 
+* `BASIC` - Use access key ID and secret key supplied as configuration.
 * `ENV_VAR` - Use `AWS_ACCESS_KEY_ID` & `AWS_SECRET_ACCESS_KEY` environment 
variables.
 * `SYS_PROP` - Use Java system properties `aws.accessKeyId` and 
`aws.secretKey`.
 * `PROFILE` - Use an AWS credentials profile to create the AWS credentials.
 * `ASSUME_ROLE` - Create AWS credentials by assuming a role. The credentials 
for assuming the role must be supplied.
-* `WEB_IDENTITY_TOKEN` - Create AWS credentials by assuming a role using Web 
Identity Token. 
+* `WEB_IDENTITY_TOKEN` - Create AWS credentials by assuming a role using Web 
Identity Token.
 
 ### Start Reading Position
 
@@ -702,11 +702,11 @@ You can, however, use the 
`sink.partitioner-field-delimiter` option to set the d
 ### Enhanced Fan-Out
 
 [Enhanced Fan-Out 
(EFO)](https://aws.amazon.com/blogs/aws/kds-enhanced-fanout/) increases the 
maximum number of concurrent consumers per Kinesis data stream.
-Without EFO, all concurrent Kinesis consumers share a single read quota per 
shard. 
-Using EFO, each consumer gets a distinct dedicated read quota per shard, 
allowing read throughput to scale with the number of consumers. 
+Without EFO, all concurrent Kinesis consumers share a single read quota per 
shard.
+Using EFO, each consumer gets a distinct dedicated read quota per shard, 
allowing read throughput to scale with the number of consumers.
 
 <span class="label label-info">Note</span> Using EFO will [incur additional 
cost](https://aws.amazon.com/kinesis/data-streams/pricing/).
- 
+
 You can enable and configure EFO with the following properties:
 
 * `scan.stream.recordpublisher`: Determines whether to use `EFO` or `POLLING`.
@@ -720,10 +720,10 @@ You can enable and configure EFO with the following 
properties:
     The describe operation has a limit of 20 [transactions per 
second](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_DescribeStreamConsumer.html),
     this means application startup time will increase by roughly 
`parallelism/20 seconds`.
   * `EAGER`: Stream consumers are registered in the `FlinkKinesisConsumer` 
constructor.
-    If the stream consumer already exists, it will be reused. 
-    This will result in registration occurring when the job is constructed, 
+    If the stream consumer already exists, it will be reused.
+    This will result in registration occurring when the job is constructed,
     either on the Flink Job Manager or client environment submitting the job.
-    Using this strategy results in a single thread registering and retrieving 
the stream consumer ARN, 
+    Using this strategy results in a single thread registering and retrieving 
the stream consumer ARN,
     reducing startup time over `LAZY` (with large parallelism).
     However, consider that the client environment will require access to the 
AWS services.
   * `NONE`: Stream consumer registration is not performed by 
`FlinkKinesisConsumer`.
@@ -733,21 +733,21 @@ You can enable and configure EFO with the following 
properties:
 * `scan.stream.efo.consumerarn.<stream-name>`: ARNs identifying externally 
registered ARN-consumers (substitute `<stream-name>` with the name of your 
stream in the parameter name).
    Use this if you choose to use `NONE` as a `scan.stream.efo.registration` 
strategy.
 
-<span class="label label-info">Note</span> For a given Kinesis data stream, 
each EFO consumer must have a unique name. 
-However, consumer names do not have to be unique across data streams. 
+<span class="label label-info">Note</span> For a given Kinesis data stream, 
each EFO consumer must have a unique name.
+However, consumer names do not have to be unique across data streams.
 Reusing a consumer name will result in existing subscriptions being terminated.
 
 <span class="label label-info">Note</span> With the `LAZY` and `EAGER` 
strategies, stream consumers are de-registered when the job is shutdown 
gracefully.
 In the event that a job terminates within executing the shutdown hooks, stream 
consumers will remain active.
-In this situation the stream consumers will be gracefully reused when the 
application restarts. 
+In this situation the stream consumers will be gracefully reused when the 
application restarts.
 With the `NONE` strategy, stream consumer de-registration is not performed by 
`FlinkKinesisConsumer`.
 
 Data Type Mapping
 ----------------
 
-Kinesis stores records as Base64-encoded binary data objects, so it doesn't 
have a notion of internal record structure. 
+Kinesis stores records as Base64-encoded binary data objects, so it doesn't 
have a notion of internal record structure.
 Instead, Kinesis records are deserialized and serialized by formats, e.g. 
'avro', 'csv', or 'json'.
-To determine the data type of the messages in your Kinesis-backed tables, pick 
a suitable Flink format with the `format` keyword. 
+To determine the data type of the messages in your Kinesis-backed tables, pick 
a suitable Flink format with the `format` keyword.
 Please refer to the [Formats]({% link dev/table/connectors/formats/index.md 
%}) pages for more details.
 
 {% top %}
diff --git a/docs/dev/table/connectors/kinesis.zh.md 
b/docs/dev/table/connectors/kinesis.zh.md
index 2bdad3d..ca24aab 100644
--- a/docs/dev/table/connectors/kinesis.zh.md
+++ b/docs/dev/table/connectors/kinesis.zh.md
@@ -2,7 +2,7 @@
 title: "Amazon Kinesis Data Streams SQL Connector"
 nav-title: Kinesis
 nav-parent_id: sql-connectors
-nav-pos: 2
+nav-pos: 4
 ---
 <!--
 Licensed to the Apache Software Foundation (ASF) under one
@@ -645,7 +645,7 @@ Connector Options
       <td style="word-wrap: break-word;">(none)</td>
       <td></td>
       <td>
-        Sink options for the <code>KinesisProducer</code>. 
+        Sink options for the <code>KinesisProducer</code>.
         Suffix names must match the <a 
href="https://javadoc.io/static/com.amazonaws/amazon-kinesis-producer/0.14.0/com/amazonaws/services/kinesis/producer/KinesisProducerConfiguration.html";>KinesisProducerConfiguration</a>
 setters in lower-case hyphenated style (for example, 
<code>sink.producer.collection-max-count</code> or 
<code>sink.producer.aggregation-max-count</code>).
         The transformed action keys are passed to the 
<code>sink.producer.*</code> to <a 
href="https://javadoc.io/static/com.amazonaws/amazon-kinesis-producer/0.14.0/com/amazonaws/services/kinesis/producer/KinesisProducerConfiguration.html#fromProperties-java.util.Properties-";>KinesisProducerConfigurations#fromProperties</a>.
         Note that some of the defaults are overwritten by 
<code>KinesisConfigUtil</code>.
@@ -665,18 +665,18 @@ Make sure to [create an appropriate IAM 
policy](https://docs.aws.amazon.com/stre
 
 Depending on your deployment you would choose a different Credentials Provider 
to allow access to Kinesis.
 By default, the `AUTO` Credentials Provider is used.
-If the access key ID and secret key are set in the deployment configuration, 
this results in using the `BASIC` provider.  
+If the access key ID and secret key are set in the deployment configuration, 
this results in using the `BASIC` provider.
 
 A specific 
[AWSCredentialsProvider](https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/index.html?com/amazonaws/auth/AWSCredentialsProvider.html)
 can be **optionally** set using the `aws.credentials.provider` setting.
 Supported values are:
 
 * `AUTO` - Use the default AWS Credentials Provider chain that searches for 
credentials in the following order: `ENV_VARS`, `SYS_PROPS`, 
`WEB_IDENTITY_TOKEN`, `PROFILE`, and EC2/ECS credentials provider.
-* `BASIC` - Use access key ID and secret key supplied as configuration. 
+* `BASIC` - Use access key ID and secret key supplied as configuration.
 * `ENV_VAR` - Use `AWS_ACCESS_KEY_ID` & `AWS_SECRET_ACCESS_KEY` environment 
variables.
 * `SYS_PROP` - Use Java system properties `aws.accessKeyId` and 
`aws.secretKey`.
 * `PROFILE` - Use an AWS credentials profile to create the AWS credentials.
 * `ASSUME_ROLE` - Create AWS credentials by assuming a role. The credentials 
for assuming the role must be supplied.
-* `WEB_IDENTITY_TOKEN` - Create AWS credentials by assuming a role using Web 
Identity Token. 
+* `WEB_IDENTITY_TOKEN` - Create AWS credentials by assuming a role using Web 
Identity Token.
 
 ### Start Reading Position
 
@@ -707,11 +707,11 @@ You can, however, use the 
`sink.partitioner-field-delimiter` option to set the d
 ### Enhanced Fan-Out
 
 [Enhanced Fan-Out 
(EFO)](https://aws.amazon.com/blogs/aws/kds-enhanced-fanout/) increases the 
maximum number of concurrent consumers per Kinesis data stream.
-Without EFO, all concurrent Kinesis consumers share a single read quota per 
shard. 
-Using EFO, each consumer gets a distinct dedicated read quota per shard, 
allowing read throughput to scale with the number of consumers. 
+Without EFO, all concurrent Kinesis consumers share a single read quota per 
shard.
+Using EFO, each consumer gets a distinct dedicated read quota per shard, 
allowing read throughput to scale with the number of consumers.
 
 <span class="label label-info">Note</span> Using EFO will [incur additional 
cost](https://aws.amazon.com/kinesis/data-streams/pricing/).
- 
+
 You can enable and configure EFO with the following properties:
 
 * `scan.stream.recordpublisher`: Determines whether to use `EFO` or `POLLING`.
@@ -725,10 +725,10 @@ You can enable and configure EFO with the following 
properties:
     The describe operation has a limit of 20 [transactions per 
second](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_DescribeStreamConsumer.html),
     this means application startup time will increase by roughly 
`parallelism/20 seconds`.
   * `EAGER`: Stream consumers are registered in the `FlinkKinesisConsumer` 
constructor.
-    If the stream consumer already exists, it will be reused. 
-    This will result in registration occurring when the job is constructed, 
+    If the stream consumer already exists, it will be reused.
+    This will result in registration occurring when the job is constructed,
     either on the Flink Job Manager or client environment submitting the job.
-    Using this strategy results in a single thread registering and retrieving 
the stream consumer ARN, 
+    Using this strategy results in a single thread registering and retrieving 
the stream consumer ARN,
     reducing startup time over `LAZY` (with large parallelism).
     However, consider that the client environment will require access to the 
AWS services.
   * `NONE`: Stream consumer registration is not performed by 
`FlinkKinesisConsumer`.
@@ -738,21 +738,21 @@ You can enable and configure EFO with the following 
properties:
 * `scan.stream.efo.consumerarn.<stream-name>`: ARNs identifying externally 
registered ARN-consumers (substitute `<stream-name>` with the name of your 
stream in the parameter name).
    Use this if you choose to use `NONE` as a `scan.stream.efo.registration` 
strategy.
 
-<span class="label label-info">Note</span> For a given Kinesis data stream, 
each EFO consumer must have a unique name. 
-However, consumer names do not have to be unique across data streams. 
+<span class="label label-info">Note</span> For a given Kinesis data stream, 
each EFO consumer must have a unique name.
+However, consumer names do not have to be unique across data streams.
 Reusing a consumer name will result in existing subscriptions being terminated.
 
 <span class="label label-info">Note</span> With the `LAZY` and `EAGER` 
strategies, stream consumers are de-registered when the job is shutdown 
gracefully.
 In the event that a job terminates within executing the shutdown hooks, stream 
consumers will remain active.
-In this situation the stream consumers will be gracefully reused when the 
application restarts. 
+In this situation the stream consumers will be gracefully reused when the 
application restarts.
 With the `NONE` strategy, stream consumer de-registration is not performed by 
`FlinkKinesisConsumer`.
 
 Data Type Mapping
 ----------------
 
-Kinesis stores records as Base64-encoded binary data objects, so it doesn't 
have a notion of internal record structure. 
+Kinesis stores records as Base64-encoded binary data objects, so it doesn't 
have a notion of internal record structure.
 Instead, Kinesis records are deserialized and serialized by formats, e.g. 
'avro', 'csv', or 'json'.
-To determine the data type of the messages in your Kinesis-backed tables, pick 
a suitable Flink format with the `format` keyword. 
+To determine the data type of the messages in your Kinesis-backed tables, pick 
a suitable Flink format with the `format` keyword.
 Please refer to the [Formats]({% link dev/table/connectors/formats/index.zh.md 
%}) pages for more details.
 
 {% top %}
diff --git a/docs/dev/table/connectors/print.md 
b/docs/dev/table/connectors/print.md
index ed77139..b001821 100644
--- a/docs/dev/table/connectors/print.md
+++ b/docs/dev/table/connectors/print.md
@@ -2,7 +2,7 @@
 title: "Print SQL Connector"
 nav-title: Print
 nav-parent_id: sql-connectors
-nav-pos: 11
+nav-pos: 13
 ---
 <!--
 Licensed to the Apache Software Foundation (ASF) under one
diff --git a/docs/dev/table/connectors/print.zh.md 
b/docs/dev/table/connectors/print.zh.md
index 51f8b5b..ee8ef1b 100644
--- a/docs/dev/table/connectors/print.zh.md
+++ b/docs/dev/table/connectors/print.zh.md
@@ -2,7 +2,7 @@
 title: "Print SQL 连接器"
 nav-title: Print
 nav-parent_id: sql-connectors
-nav-pos: 11
+nav-pos: 13
 ---
 <!--
 Licensed to the Apache Software Foundation (ASF) under one
diff --git a/docs/dev/table/connectors/upsert-kafka.md 
b/docs/dev/table/connectors/upsert-kafka.md
new file mode 100644
index 0000000..c92cf4b
--- /dev/null
+++ b/docs/dev/table/connectors/upsert-kafka.md
@@ -0,0 +1,212 @@
+---
+title: "Upsert Kafka SQL Connector"
+nav-title: Upsert Kafka
+nav-parent_id: sql-connectors
+nav-pos: 3
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+<span class="label label-primary">Scan Source: Unbounded</span>
+<span class="label label-primary">Sink: Streaming Upsert Mode</span>
+
+* This will be replaced by the TOC
+{:toc}
+
+The Upsert Kafka connector allows for reading data from and writing data into 
Kafka topics in the upsert fashion.
+
+As a source, the upsert-kafka connector produces a changelog stream, where 
each data record represents
+an update or delete event. More precisely, the value in a data record is 
interpreted as an UPDATE of
+the last value for the same key, if any (if a corresponding key doesn’t exist 
yet, the update will
+be considered an INSERT). Using the table analogy, a data record in a 
changelog stream is interpreted
+as an UPSERT aka INSERT/UPDATE because any existing row with the same key is 
overwritten. Also, null
+values are interpreted in a special way: a record with a null value represents 
a “DELETE”.
+
+As a sink, the upsert-kafka connector can consume a changelog stream. It will write INSERT/UPDATE_AFTER
+data as normal Kafka message values, and write DELETE data as Kafka messages with null values
+(indicating a tombstone for the key). Flink guarantees message ordering on the primary key by
+partitioning data on the values of the primary key columns, so update/delete messages with the same
+key will fall into the same partition.
+
+Dependencies
+------------
+
+In order to set up the upsert-kafka connector, the following table provides dependency information
+both for projects using a build automation tool (such as Maven or SBT) and for the SQL Client with SQL JAR bundles.
+
+{% assign connector = site.data.sql-connectors['upsert-kafka'] %}
+{% include sql-connector-download-table.html
+    connector=connector
+%}
+
+Full Example
+----------------
+
+The example below shows how to create and use an Upsert Kafka table:
+
+<div class="codetabs" markdown="1">
+<div data-lang="SQL" markdown="1">
+{% highlight sql %}
+CREATE TABLE pageviews_per_region (
+  region STRING,
+  pv BIGINT,
+  uv BIGINT,
+  PRIMARY KEY (region) NOT ENFORCED
+) WITH (
+  'connector' = 'upsert-kafka',
+  'topic' = 'pageviews_per_region',
+  'properties.bootstrap.servers' = '...',
+  'key.format' = 'avro',
+  'value.format' = 'avro'
+);
+
+CREATE TABLE pageviews (
+  user_id BIGINT,
+  page_id BIGINT,
+  viewtime TIMESTAMP,
+  user_region STRING,
+  WATERMARK FOR viewtime AS viewtime - INTERVAL '2' SECOND
+) WITH (
+  'connector' = 'kafka',
+  'topic' = 'pageviews',
+  'properties.bootstrap.servers' = '...',
+  'format' = 'json'
+);
+
+-- calculate the pv, uv and insert into the upsert-kafka sink
+INSERT INTO pageviews_per_region
+SELECT
+  region,
+  COUNT(*),
+  COUNT(DISTINCT user_id)
+FROM pageviews
+GROUP BY region;
+
+{% endhighlight %}
+</div>
+</div>
+<span class="label label-danger">Attention</span> Make sure to define the 
primary key in the DDL.
+
+Connector Options
+----------------
+
+<table class="table table-bordered">
+    <thead>
+      <tr>
+      <th class="text-left" style="width: 25%">Option</th>
+      <th class="text-center" style="width: 8%">Required</th>
+      <th class="text-center" style="width: 7%">Default</th>
+      <th class="text-center" style="width: 10%">Type</th>
+      <th class="text-center" style="width: 50%">Description</th>
+    </tr>
+    </thead>
+    <tbody>
+    <tr>
+      <td><h5>connector</h5></td>
+      <td>required</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>Specify which connector to use. For Upsert Kafka, use <code>'upsert-kafka'</code>.</td>
+    </tr>
+    <tr>
+      <td><h5>topic</h5></td>
+      <td>required</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>The Kafka topic name to read from and write to.</td>
+    </tr>
+    <tr>
+      <td><h5>properties.bootstrap.servers</h5></td>
+      <td>required</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>Comma-separated list of Kafka brokers.</td>
+    </tr>
+    <tr>
+      <td><h5>key.format</h5></td>
+      <td>required</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>The format used to deserialize and serialize the key part of the 
Kafka messages. The key part
+      fields are specified by the PRIMARY KEY syntax. The supported formats 
include <code>'csv'</code>,
+      <code>'json'</code>, <code>'avro'</code>. Please refer to <a href="{% 
link dev/table/connectors/formats/index.md %}">Formats</a>
+      page for more details and more format options.
+      </td>
+    </tr>
+    <tr>
+      <td><h5>value.format</h5></td>
+      <td>required</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>The format used to deserialize and serialize the value part of the 
Kafka messages.
+      The supported formats include <code>'csv'</code>, <code>'json'</code>, 
<code>'avro'</code>.
+      Please refer to <a href="{% link dev/table/connectors/formats/index.md 
%}">Formats</a> page for more details and more format options.
+      </td>
+    </tr>
+    <tr>
+       <td><h5>value.fields-include</h5></td>
+       <td>required</td>
+       <td style="word-wrap: break-word;"><code>'ALL'</code></td>
+       <td>String</td>
+       <td>Controls which fields should end up in the value as well. Available 
values:
+       <ul>
+         <li><code>ALL</code>: the value part of the record contains all 
fields of the schema, even if they are part of the key.</li>
+         <li><code>EXCEPT_KEY</code>: the value part of the record contains 
all fields of the schema except the key fields.</li>
+       </ul>
+       </td>
+    </tr>
+    <tr>
+      <td><h5>sink.parallelism</h5></td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>Integer</td>
+      <td>Defines the parallelism of the upsert-kafka sink operator. By default, the parallelism is
+      determined by the framework, using the same parallelism as the upstream chained operator.</td>
+    </tr>
+    </tbody>
+</table>
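+
+As a quick illustration of how these options fit together, here is a minimal sketch of a variant of
+the `pageviews_per_region` table from the full example above (the broker address `localhost:9092` is
+a placeholder): it keeps the primary key fields out of the message value via
+`'value.fields-include' = 'EXCEPT_KEY'` and pins the sink parallelism.
+
+{% highlight sql %}
+CREATE TABLE pageviews_per_region (
+  region STRING,
+  pv BIGINT,
+  uv BIGINT,
+  PRIMARY KEY (region) NOT ENFORCED
+) WITH (
+  'connector' = 'upsert-kafka',
+  'topic' = 'pageviews_per_region',
+  'properties.bootstrap.servers' = 'localhost:9092',
+  'key.format' = 'json',
+  'value.format' = 'json',
+  -- write only the non-key columns into the Kafka message value
+  'value.fields-include' = 'EXCEPT_KEY',
+  -- run the sink with a fixed parallelism instead of inheriting it from upstream
+  'sink.parallelism' = '2'
+);
+{% endhighlight %}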
+
+Features
+----------------
+
+### Primary Key Constraints
+
+The Upsert Kafka connector always works in the upsert fashion and requires a primary key to be
+defined in the DDL. Assuming that records with the same key are ordered within the same partition,
+the primary key semantics on the changelog source mean that the materialized changelog is unique
+on the primary key. The primary key definition also controls which fields end up in Kafka's key.
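+
+A primary key may also span several columns; all listed columns are then serialized with the
+configured `'key.format'` and used to partition the data. The following is a hypothetical sketch
+(the table and column names are made up):
+
+{% highlight sql %}
+CREATE TABLE product_inventory (
+  warehouse_id STRING,
+  product_id   STRING,
+  stock        BIGINT,
+  -- both columns become part of the Kafka message key
+  PRIMARY KEY (warehouse_id, product_id) NOT ENFORCED
+) WITH (
+  'connector' = 'upsert-kafka',
+  'topic' = 'product_inventory',
+  'properties.bootstrap.servers' = '...',
+  'key.format' = 'json',
+  'value.format' = 'json'
+);
+{% endhighlight %}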
+
+### Consistency Guarantees
+
+By default, an Upsert Kafka sink ingests data with at-least-once guarantees 
into a Kafka topic if
+the query is executed with [checkpointing enabled]({% link 
dev/stream/state/checkpointing.md %}#enabling-and-configuring-checkpointing).
+
+This means that Flink may write duplicate records with the same key into the Kafka topic. But since
+the connector works in upsert mode, the last record for a given key takes effect when the data is
+read back as a source. Therefore, the upsert-kafka connector achieves idempotent writes just like
+the [HBase sink]({{ site.baseurl }}/dev/table/connectors/hbase.html).
+
+Data Type Mapping
+----------------
+
+Upsert Kafka stores message keys and values as bytes, so it has no notion of schema or data types.
+The messages are deserialized and serialized by formats, e.g. csv, json, avro. Thus, the data type
+mapping is determined by the specific format. Please refer to the [Formats]({% link dev/table/connectors/formats/index.md %})
+pages for more details.
+
+{% top %}
diff --git a/docs/dev/table/connectors/upsert-kafka.zh.md 
b/docs/dev/table/connectors/upsert-kafka.zh.md
new file mode 100644
index 0000000..c92cf4b
--- /dev/null
+++ b/docs/dev/table/connectors/upsert-kafka.zh.md
@@ -0,0 +1,212 @@
+---
+title: "Upsert Kafka SQL Connector"
+nav-title: Upsert Kafka
+nav-parent_id: sql-connectors
+nav-pos: 3
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+<span class="label label-primary">Scan Source: Unbounded</span>
+<span class="label label-primary">Sink: Streaming Upsert Mode</span>
+
+* This will be replaced by the TOC
+{:toc}
+
+The Upsert Kafka connector allows for reading data from and writing data into 
Kafka topics in the upsert fashion.
+
+As a source, the upsert-kafka connector produces a changelog stream, where 
each data record represents
+an update or delete event. More precisely, the value in a data record is 
interpreted as an UPDATE of
+the last value for the same key, if any (if a corresponding key doesn’t exist 
yet, the update will
+be considered an INSERT). Using the table analogy, a data record in a 
changelog stream is interpreted
+as an UPSERT aka INSERT/UPDATE because any existing row with the same key is 
overwritten. Also, null
+values are interpreted in a special way: a record with a null value represents 
a “DELETE”.
+
+As a sink, the upsert-kafka connector can consume a changelog stream. It will write INSERT/UPDATE_AFTER
+data as normal Kafka message values, and write DELETE data as Kafka messages with null values
+(indicating a tombstone for the key). Flink guarantees message ordering on the primary key by
+partitioning data on the values of the primary key columns, so update/delete messages with the same
+key will fall into the same partition.
+
+Dependencies
+------------
+
+In order to set up the upsert-kafka connector, the following table provides dependency information
+both for projects using a build automation tool (such as Maven or SBT) and for the SQL Client with SQL JAR bundles.
+
+{% assign connector = site.data.sql-connectors['upsert-kafka'] %}
+{% include sql-connector-download-table.html
+    connector=connector
+%}
+
+Full Example
+----------------
+
+The example below shows how to create and use an Upsert Kafka table:
+
+<div class="codetabs" markdown="1">
+<div data-lang="SQL" markdown="1">
+{% highlight sql %}
+CREATE TABLE pageviews_per_region (
+  region STRING,
+  pv BIGINT,
+  uv BIGINT,
+  PRIMARY KEY (region) NOT ENFORCED
+) WITH (
+  'connector' = 'upsert-kafka',
+  'topic' = 'pageviews_per_region',
+  'properties.bootstrap.servers' = '...',
+  'key.format' = 'avro',
+  'value.format' = 'avro'
+);
+
+CREATE TABLE pageviews (
+  user_id BIGINT,
+  page_id BIGINT,
+  viewtime TIMESTAMP,
+  user_region STRING,
+  WATERMARK FOR viewtime AS viewtime - INTERVAL '2' SECOND
+) WITH (
+  'connector' = 'kafka',
+  'topic' = 'pageviews',
+  'properties.bootstrap.servers' = '...',
+  'format' = 'json'
+);
+
+-- calculate the pv, uv and insert into the upsert-kafka sink
+INSERT INTO pageviews_per_region
+SELECT
+  region,
+  COUNT(*),
+  COUNT(DISTINCT user_id)
+FROM pageviews
+GROUP BY region;
+
+{% endhighlight %}
+</div>
+</div>
+<span class="label label-danger">Attention</span> Make sure to define the 
primary key in the DDL.
+
+Connector Options
+----------------
+
+<table class="table table-bordered">
+    <thead>
+      <tr>
+      <th class="text-left" style="width: 25%">Option</th>
+      <th class="text-center" style="width: 8%">Required</th>
+      <th class="text-center" style="width: 7%">Default</th>
+      <th class="text-center" style="width: 10%">Type</th>
+      <th class="text-center" style="width: 50%">Description</th>
+    </tr>
+    </thead>
+    <tbody>
+    <tr>
+      <td><h5>connector</h5></td>
+      <td>required</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>Specify which connector to use. For Upsert Kafka, use <code>'upsert-kafka'</code>.</td>
+    </tr>
+    <tr>
+      <td><h5>topic</h5></td>
+      <td>required</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>The Kafka topic name to read from and write to.</td>
+    </tr>
+    <tr>
+      <td><h5>properties.bootstrap.servers</h5></td>
+      <td>required</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>Comma-separated list of Kafka brokers.</td>
+    </tr>
+    <tr>
+      <td><h5>key.format</h5></td>
+      <td>required</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>The format used to deserialize and serialize the key part of the 
Kafka messages. The key part
+      fields are specified by the PRIMARY KEY syntax. The supported formats 
include <code>'csv'</code>,
+      <code>'json'</code>, <code>'avro'</code>. Please refer to <a href="{% 
link dev/table/connectors/formats/index.md %}">Formats</a>
+      page for more details and more format options.
+      </td>
+    </tr>
+    <tr>
+      <td><h5>value.format</h5></td>
+      <td>required</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>The format used to deserialize and serialize the value part of the 
Kafka messages.
+      The supported formats include <code>'csv'</code>, <code>'json'</code>, 
<code>'avro'</code>.
+      Please refer to <a href="{% link dev/table/connectors/formats/index.md 
%}">Formats</a> page for more details and more format options.
+      </td>
+    </tr>
+    <tr>
+       <td><h5>value.fields-include</h5></td>
+       <td>required</td>
+       <td style="word-wrap: break-word;"><code>'ALL'</code></td>
+       <td>String</td>
+       <td>Controls which fields should end up in the value as well. Available 
values:
+       <ul>
+         <li><code>ALL</code>: the value part of the record contains all 
fields of the schema, even if they are part of the key.</li>
+         <li><code>EXCEPT_KEY</code>: the value part of the record contains 
all fields of the schema except the key fields.</li>
+       </ul>
+       </td>
+    </tr>
+    <tr>
+      <td><h5>sink.parallelism</h5></td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>Integer</td>
+      <td>Defines the parallelism of the upsert-kafka sink operator. By default, the parallelism is
+      determined by the framework, using the same parallelism as the upstream chained operator.</td>
+    </tr>
+    </tbody>
+</table>
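+
+As a quick illustration of how these options fit together, here is a minimal sketch of a variant of
+the `pageviews_per_region` table from the full example above (the broker address `localhost:9092` is
+a placeholder): it keeps the primary key fields out of the message value via
+`'value.fields-include' = 'EXCEPT_KEY'` and pins the sink parallelism.
+
+{% highlight sql %}
+CREATE TABLE pageviews_per_region (
+  region STRING,
+  pv BIGINT,
+  uv BIGINT,
+  PRIMARY KEY (region) NOT ENFORCED
+) WITH (
+  'connector' = 'upsert-kafka',
+  'topic' = 'pageviews_per_region',
+  'properties.bootstrap.servers' = 'localhost:9092',
+  'key.format' = 'json',
+  'value.format' = 'json',
+  -- write only the non-key columns into the Kafka message value
+  'value.fields-include' = 'EXCEPT_KEY',
+  -- run the sink with a fixed parallelism instead of inheriting it from upstream
+  'sink.parallelism' = '2'
+);
+{% endhighlight %}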
+
+Features
+----------------
+
+### Primary Key Constraints
+
+The Upsert Kafka connector always works in the upsert fashion and requires a primary key to be
+defined in the DDL. Assuming that records with the same key are ordered within the same partition,
+the primary key semantics on the changelog source mean that the materialized changelog is unique
+on the primary key. The primary key definition also controls which fields end up in Kafka's key.
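+
+A primary key may also span several columns; all listed columns are then serialized with the
+configured `'key.format'` and used to partition the data. The following is a hypothetical sketch
+(the table and column names are made up):
+
+{% highlight sql %}
+CREATE TABLE product_inventory (
+  warehouse_id STRING,
+  product_id   STRING,
+  stock        BIGINT,
+  -- both columns become part of the Kafka message key
+  PRIMARY KEY (warehouse_id, product_id) NOT ENFORCED
+) WITH (
+  'connector' = 'upsert-kafka',
+  'topic' = 'product_inventory',
+  'properties.bootstrap.servers' = '...',
+  'key.format' = 'json',
+  'value.format' = 'json'
+);
+{% endhighlight %}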
+
+### Consistency Guarantees
+
+By default, an Upsert Kafka sink ingests data with at-least-once guarantees 
into a Kafka topic if
+the query is executed with [checkpointing enabled]({% link 
dev/stream/state/checkpointing.md %}#enabling-and-configuring-checkpointing).
+
+This means that Flink may write duplicate records with the same key into the Kafka topic. But since
+the connector works in upsert mode, the last record for a given key takes effect when the data is
+read back as a source. Therefore, the upsert-kafka connector achieves idempotent writes just like
+the [HBase sink]({{ site.baseurl }}/dev/table/connectors/hbase.html).
+
+Data Type Mapping
+----------------
+
+Upsert Kafka stores message keys and values as bytes, so it has no notion of schema or data types.
+The messages are deserialized and serialized by formats, e.g. csv, json, avro. Thus, the data type
+mapping is determined by the specific format. Please refer to the [Formats]({% link dev/table/connectors/formats/index.md %})
+pages for more details.
+
+{% top %}
