wuchong commented on a change in pull request #14017:
URL: https://github.com/apache/flink/pull/14017#discussion_r521826505



##########
File path: docs/dev/table/connectors/upsert-kafka.md
##########
@@ -0,0 +1,183 @@
+---
+title: "Apache Upsert Kafka SQL Connector"

Review comment:
       "Upsert Kafka SQL Connector".
   
   There is no "Upsert Kafka" in Apache projects...

##########
File path: docs/dev/table/connectors/upsert-kafka.md
##########
@@ -0,0 +1,183 @@
+---
+title: "Apache Upsert Kafka SQL Connector"
+nav-title: Upsert Kafka
+nav-parent_id: sql-connectors
+nav-pos: 2
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+<span class="label label-primary">Scan Source: Unbounded</span>
+<span class="label label-primary">Sink: Streaming Upsert Mode</span>
+
+* This will be replaced by the TOC
+{:toc}
+
+The Upsert Kafka connector allows for reading data from and writing data into Kafka topics in the upsert fashion.
+
+More precisely, the value part of the record is interpreted as an "UPDATE" of the last value for the same key.
+If the record doesn't exist yet, it will treat the record as an INSERT message. With the tombstone semantic in kafka,
+it will interpret the record whose value part is null as a DELETE message.
+
+
+Dependencies
+------------
+
+To use the connector, add the following Maven dependency to your project:
+{% highlight xml %}
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-connector-kafka_2.11</artifactId>

Review comment:
       ```suggestion
         <artifactId>flink-connector-kafka_{{site.scala_version_suffix}}</artifactId>
   ```

##########
File path: docs/dev/table/connectors/upsert-kafka.md
##########
@@ -0,0 +1,183 @@
+---
+title: "Apache Upsert Kafka SQL Connector"
+nav-title: Upsert Kafka
+nav-parent_id: sql-connectors
+nav-pos: 2
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+<span class="label label-primary">Scan Source: Unbounded</span>
+<span class="label label-primary">Sink: Streaming Upsert Mode</span>
+
+* This will be replaced by the TOC
+{:toc}
+
+The Upsert Kafka connector allows for reading data from and writing data into Kafka topics in the upsert fashion.
+
+More precisely, the value part of the record is interpreted as an "UPDATE" of the last value for the same key.
+If the record doesn't exist yet, it will treat the record as an INSERT message. With the tombstone semantic in kafka,
+it will interpret the record whose value part is null as a DELETE message.
+
+
+Dependencies
+------------
+
+To use the connector, add the following Maven dependency to your project:
+{% highlight xml %}
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-connector-kafka_2.11</artifactId>
+  <version>1.12-SNAPSHOT</version>
+</dependency>
+{% endhighlight %}
+
+How to create an Upsert Kafka table
+----------------
+
+The example below shows how to create an Upsert Kafka table:
+
+<div class="codetabs" markdown="1">
+<div data-lang="SQL" markdown="1">
+{% highlight sql %}
+CREATE TABLE upsertKafkaTable (
+  user_id BIGINT,
+  item_id BIGINT,
+  category_id BIGINT,
+  behaviour STRING,
+  ts TIMESTAMP(3),
+  PRIMARY KEY (user_id) NOT ENFORCED
+) WITH (
+  'connector' = 'upsert-kafka',
+  'topic' = 'user_behaviour',
+  'properties.bootstrap.servers' = 'localhost:9092',
+  'key.format' = 'json',
+  'value.format' = 'json',
+  'value.fields-include' = 'ALL'
+)
+{% endhighlight %}
+</div>
+</div>
+
+<span class="label label-danger">Attention</span> Make sure to define the 
primary key in the ddl.
+
+Connector Options
+----------------
+
+<table class="table table-bordered">
+    <thead>
+      <tr>
+      <th class="text-left" style="width: 25%">Option</th>
+      <th class="text-center" style="width: 8%">Required</th>
+      <th class="text-center" style="width: 7%">Default</th>
+      <th class="text-center" style="width: 10%">Type</th>
+      <th class="text-center" style="width: 50%">Description</th>
+    </tr>
+    </thead>
+    <tbody>
+    <tr>
+      <td><h5>connector</h5></td>
+      <td>required</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>Specify which connector to use, for the Upsert Kafka use: <code>'upsert-kafka'</code>.</td>
+    </tr>
+    <tr>
+      <td><h5>topic</h5></td>
+      <td>required</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>Topic name to read from or write to when the table is used as source or sink.</td>
+    </tr>
+    <tr>
+      <td><h5>properties.bootstrap.servers</h5></td>
+      <td>required</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>Comma separated list of Kafka brokers.</td>
+    </tr>
+    <tr>
+      <td><h5>key.format</h5></td>
+      <td>required</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>The format used to deserialize and serialize the key part of the Kafka messages that is derived from the primary key.
+      The supported formats are <code>'csv'</code>, <code>'json'</code>, <code>'avro'</code>.
+      Please refer to <a href="{% link dev/table/connectors/formats/index.md %}">Formats</a> page for more details and more format options.
+      </td>
+    </tr>
+    <tr>
+      <td><h5>value.format</h5></td>
+      <td>required</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>The format used to deserialize and serialize the value part of the Kafka messages.
+      The supported formats are <code>'csv'</code>, <code>'json'</code>, <code>'avro'</code>.

Review comment:
       ```suggestion
         The supported formats include <code>'csv'</code>, <code>'json'</code>, <code>'avro'</code>.
   ```

##########
File path: docs/dev/table/connectors/upsert-kafka.md
##########
@@ -0,0 +1,183 @@
+---
+title: "Apache Upsert Kafka SQL Connector"
+nav-title: Upsert Kafka
+nav-parent_id: sql-connectors
+nav-pos: 2
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+<span class="label label-primary">Scan Source: Unbounded</span>
+<span class="label label-primary">Sink: Streaming Upsert Mode</span>
+
+* This will be replaced by the TOC
+{:toc}
+
+The Upsert Kafka connector allows for reading data from and writing data into Kafka topics in the upsert fashion.
+
+More precisely, the value part of the record is interpreted as an "UPDATE" of the last value for the same key.
+If the record doesn't exist yet, it will treat the record as an INSERT message. With the tombstone semantic in kafka,
+it will interpret the record whose value part is null as a DELETE message.
+
+
+Dependencies
+------------
+
+To use the connector, add the following Maven dependency to your project:
+{% highlight xml %}
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-connector-kafka_2.11</artifactId>
+  <version>1.12-SNAPSHOT</version>
+</dependency>
+{% endhighlight %}
+
+How to create an Upsert Kafka table
+----------------
+
+The example below shows how to create an Upsert Kafka table:
+
+<div class="codetabs" markdown="1">
+<div data-lang="SQL" markdown="1">
+{% highlight sql %}
+CREATE TABLE upsertKafkaTable (
+  user_id BIGINT,
+  item_id BIGINT,
+  category_id BIGINT,
+  behaviour STRING,
+  ts TIMESTAMP(3),
+  PRIMARY KEY (user_id) NOT ENFORCED
+) WITH (
+  'connector' = 'upsert-kafka',
+  'topic' = 'user_behaviour',
+  'properties.bootstrap.servers' = 'localhost:9092',
+  'key.format' = 'json',
+  'value.format' = 'json',
+  'value.fields-include' = 'ALL'
+)
+{% endhighlight %}
+</div>
+</div>
+
+<span class="label label-danger">Attention</span> Make sure to define the 
primary key in the ddl.
+
+Connector Options
+----------------
+
+<table class="table table-bordered">
+    <thead>
+      <tr>
+      <th class="text-left" style="width: 25%">Option</th>
+      <th class="text-center" style="width: 8%">Required</th>
+      <th class="text-center" style="width: 7%">Default</th>
+      <th class="text-center" style="width: 10%">Type</th>
+      <th class="text-center" style="width: 50%">Description</th>
+    </tr>
+    </thead>
+    <tbody>
+    <tr>
+      <td><h5>connector</h5></td>
+      <td>required</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>Specify which connector to use, for the Upsert Kafka use: <code>'upsert-kafka'</code>.</td>
+    </tr>
+    <tr>
+      <td><h5>topic</h5></td>
+      <td>required</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>Topic name to read from or write to when the table is used as source or sink.</td>
+    </tr>
+    <tr>
+      <td><h5>properties.bootstrap.servers</h5></td>
+      <td>required</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>Comma separated list of Kafka brokers.</td>
+    </tr>
+    <tr>
+      <td><h5>key.format</h5></td>
+      <td>required</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>The format used to deserialize and serialize the key part of the Kafka messages that is derived from the primary key.
+      The supported formats are <code>'csv'</code>, <code>'json'</code>, <code>'avro'</code>.
+      Please refer to <a href="{% link dev/table/connectors/formats/index.md %}">Formats</a> page for more details and more format options.
+      </td>
+    </tr>
+    <tr>
+      <td><h5>value.format</h5></td>
+      <td>required</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>The format used to deserialize and serialize the value part of the Kafka messages.
+      The supported formats are <code>'csv'</code>, <code>'json'</code>, <code>'avro'</code>.
+      Please refer to <a href="{% link dev/table/connectors/formats/index.md %}">Formats</a> page for more details and more format options.
+      </td>
+    </tr>
+    <tr>
+       <td><h5>value.fields-include</h5></td>
+       <td>required</td>
+       <td style="word-wrap: break-word;"><code>'ALL'</code></td>
+       <td>String</td>
+       <td>Controls which field should end up in the value as well. Available value:

Review comment:
       ```suggestion
          <td>Controls which fields should end up in the value as well. Available values:
   ```

##########
File path: docs/dev/table/connectors/upsert-kafka.md
##########
@@ -0,0 +1,183 @@
+---
+title: "Apache Upsert Kafka SQL Connector"
+nav-title: Upsert Kafka
+nav-parent_id: sql-connectors
+nav-pos: 2
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+<span class="label label-primary">Scan Source: Unbounded</span>
+<span class="label label-primary">Sink: Streaming Upsert Mode</span>
+
+* This will be replaced by the TOC
+{:toc}
+
+The Upsert Kafka connector allows for reading data from and writing data into Kafka topics in the upsert fashion.
+
+More precisely, the value part of the record is interpreted as an "UPDATE" of the last value for the same key.
+If the record doesn't exist yet, it will treat the record as an INSERT message. With the tombstone semantic in kafka,
+it will interpret the record whose value part is null as a DELETE message.
+
+
+Dependencies
+------------
+
+To use the connector, add the following Maven dependency to your project:
+{% highlight xml %}
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-connector-kafka_2.11</artifactId>
+  <version>1.12-SNAPSHOT</version>
+</dependency>
+{% endhighlight %}
+
+How to create an Upsert Kafka table
+----------------
+
+The example below shows how to create an Upsert Kafka table:
+
+<div class="codetabs" markdown="1">
+<div data-lang="SQL" markdown="1">
+{% highlight sql %}
+CREATE TABLE upsertKafkaTable (
+  user_id BIGINT,
+  item_id BIGINT,
+  category_id BIGINT,
+  behaviour STRING,
+  ts TIMESTAMP(3),
+  PRIMARY KEY (user_id) NOT ENFORCED
+) WITH (
+  'connector' = 'upsert-kafka',
+  'topic' = 'user_behaviour',
+  'properties.bootstrap.servers' = 'localhost:9092',
+  'key.format' = 'json',
+  'value.format' = 'json',
+  'value.fields-include' = 'ALL'
+)
+{% endhighlight %}
+</div>
+</div>
+
+<span class="label label-danger">Attention</span> Make sure to define the 
primary key in the ddl.
+
+Connector Options
+----------------
+
+<table class="table table-bordered">
+    <thead>
+      <tr>
+      <th class="text-left" style="width: 25%">Option</th>
+      <th class="text-center" style="width: 8%">Required</th>
+      <th class="text-center" style="width: 7%">Default</th>
+      <th class="text-center" style="width: 10%">Type</th>
+      <th class="text-center" style="width: 50%">Description</th>
+    </tr>
+    </thead>
+    <tbody>
+    <tr>
+      <td><h5>connector</h5></td>
+      <td>required</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>Specify which connector to use, for the Upsert Kafka use: <code>'upsert-kafka'</code>.</td>
+    </tr>
+    <tr>
+      <td><h5>topic</h5></td>
+      <td>required</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>Topic name to read from or write to when the table is used as source or sink.</td>
+    </tr>
+    <tr>
+      <td><h5>properties.bootstrap.servers</h5></td>
+      <td>required</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>Comma separated list of Kafka brokers.</td>
+    </tr>
+    <tr>
+      <td><h5>key.format</h5></td>
+      <td>required</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>The format used to deserialize and serialize the key part of the Kafka messages that is derived from the primary key.
+      The supported formats are <code>'csv'</code>, <code>'json'</code>, <code>'avro'</code>.
+      Please refer to <a href="{% link dev/table/connectors/formats/index.md %}">Formats</a> page for more details and more format options.
+      </td>
+    </tr>
+    <tr>
+      <td><h5>value.format</h5></td>
+      <td>required</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>The format used to deserialize and serialize the value part of the Kafka messages.
+      The supported formats are <code>'csv'</code>, <code>'json'</code>, <code>'avro'</code>.
+      Please refer to <a href="{% link dev/table/connectors/formats/index.md %}">Formats</a> page for more details and more format options.
+      </td>
+    </tr>
+    <tr>
+       <td><h5>value.fields-include</h5></td>
+       <td>required</td>
+       <td style="word-wrap: break-word;"><code>'ALL'</code></td>
+       <td>String</td>
+       <td>Controls which field should end up in the value as well. Available value:
+       <ul>
+         <li><code>ALL</code>: the value part of the record contains all fields of the schema.</li>
+         <li><code>EXCEPT_KEY</code>: the value part of the record doesn't include primary key values.</li>
+       </ul>
+       </td>
+    </tr>
+    <tr>
+      <td><h5>sink.parallelism</h5></td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>Integer</td>
+      <td>Defines the parallelism of the Upsert kafka sink operator. By default, the parallelism is determined by the framework using the same parallelism of the upstream chained operator.</td>
+    </tr>
+    </tbody>
+</table>
+
+Features
+----------------
+
+### Primary Key Constraints
+
+The Upsert Kafka always works in the upsert fashion and requires to define the primary key in the ddl. With the assumption that records with the same key are ordered,
+the primary key semantic on the changelog source means the materialized changelog is unique on the primary keys. The primary key definition will also control
+which fields should end up in Kafka’s key.
+
+### Changelog Source
+
+When the Upsert Kafka works as the source, it produces the changelog stream from the topic. Generally speaking, the Upsert Kafka requires the topic is <a href="https://kafka.apache.org/documentation/#compaction"><code>'compacted'</code></a>
+and all data with the same key are in the same partition. To keep the data integrity, the Upsert Kafka disallows to specify the reading position, which means Upsert Kafka always uses the `earliest-offset`.
+
+### Changelog Sink
+
+The Upsert Kafka has the ability to keep the records with the same key into the same partition by <code>'HASH'</code> partitioner. At the same time, the upsert-kafka will
+always try their best to not send the UPDATE-BEFORE message to the storage comparing to the kafka with CDC format.
+
+### Consistency Guarantees
+
+By default, an Upsert Kafka sink ingests data with at-least-once guarantees into a Kafka topic if the query is executed with [checkpointing enabled]({% link dev/stream/state/checkpointing.md %}#enabling-and-configuring-checkpointing).
+
+With the at-least-once guarantees, it may get duplicate records but keep the order of the records. When the Upsert Kafka materializes the changelog, the duplicate records will
+not destroy of the final results. Therefore, at-least-once guarantees is enough.

Review comment:
       ```suggestion
   This means Flink may write duplicate records with the same key into the Kafka topic. But as the connector is working in upsert mode, the last record on the same key will take effect when reading back as a source. Therefore, the upsert-kafka connector achieves idempotent writes just like the [HBase sink]({{ site.baseurl }}/dev/table/connectors/hbase.html).
   ```
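
   For context, a minimal sketch of the checkpointing setup this paragraph assumes (the `SET` statement syntax is the SQL client's and may vary by Flink version; `some_source` is a hypothetical table):

   ```sql
   -- Enable periodic checkpoints; the upsert-kafka sink relies on them
   -- for its at-least-once guarantee.
   SET 'execution.checkpointing.interval' = '10s';

   -- On recovery, replayed records simply overwrite the same key, which
   -- is why duplicates do not corrupt the materialized result.
   INSERT INTO upsertKafkaTable
   SELECT user_id, item_id, category_id, behaviour, ts
   FROM some_source;
   ```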

##########
File path: docs/dev/table/connectors/upsert-kafka.md
##########
@@ -0,0 +1,183 @@
+---
+title: "Apache Upsert Kafka SQL Connector"
+nav-title: Upsert Kafka
+nav-parent_id: sql-connectors
+nav-pos: 2
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+<span class="label label-primary">Scan Source: Unbounded</span>
+<span class="label label-primary">Sink: Streaming Upsert Mode</span>
+
+* This will be replaced by the TOC
+{:toc}
+
+The Upsert Kafka connector allows for reading data from and writing data into Kafka topics in the upsert fashion.
+
+More precisely, the value part of the record is interpreted as an "UPDATE" of the last value for the same key.
+If the record doesn't exist yet, it will treat the record as an INSERT message. With the tombstone semantic in kafka,
+it will interpret the record whose value part is null as a DELETE message.
+
+
+Dependencies
+------------
+
+To use the connector, add the following Maven dependency to your project:
+{% highlight xml %}
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-connector-kafka_2.11</artifactId>
+  <version>1.12-SNAPSHOT</version>
+</dependency>
+{% endhighlight %}
+
+How to create an Upsert Kafka table
+----------------
+
+The example below shows how to create an Upsert Kafka table:
+
+<div class="codetabs" markdown="1">
+<div data-lang="SQL" markdown="1">
+{% highlight sql %}
+CREATE TABLE upsertKafkaTable (
+  user_id BIGINT,
+  item_id BIGINT,
+  category_id BIGINT,
+  behaviour STRING,
+  ts TIMESTAMP(3),
+  PRIMARY KEY (user_id) NOT ENFORCED
+) WITH (
+  'connector' = 'upsert-kafka',
+  'topic' = 'user_behaviour',
+  'properties.bootstrap.servers' = 'localhost:9092',
+  'key.format' = 'json',
+  'value.format' = 'json',
+  'value.fields-include' = 'ALL'
+)
+{% endhighlight %}
+</div>
+</div>
+
+<span class="label label-danger">Attention</span> Make sure to define the 
primary key in the ddl.
+
+Connector Options
+----------------
+
+<table class="table table-bordered">
+    <thead>
+      <tr>
+      <th class="text-left" style="width: 25%">Option</th>
+      <th class="text-center" style="width: 8%">Required</th>
+      <th class="text-center" style="width: 7%">Default</th>
+      <th class="text-center" style="width: 10%">Type</th>
+      <th class="text-center" style="width: 50%">Description</th>
+    </tr>
+    </thead>
+    <tbody>
+    <tr>
+      <td><h5>connector</h5></td>
+      <td>required</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>Specify which connector to use, for the Upsert Kafka use: <code>'upsert-kafka'</code>.</td>
+    </tr>
+    <tr>
+      <td><h5>topic</h5></td>
+      <td>required</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>Topic name to read from or write to when the table is used as source or sink.</td>
+    </tr>
+    <tr>
+      <td><h5>properties.bootstrap.servers</h5></td>
+      <td>required</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>Comma separated list of Kafka brokers.</td>
+    </tr>
+    <tr>
+      <td><h5>key.format</h5></td>
+      <td>required</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>The format used to deserialize and serialize the key part of the Kafka messages that is derived from the primary key.
+      The supported formats are <code>'csv'</code>, <code>'json'</code>, <code>'avro'</code>.
+      Please refer to <a href="{% link dev/table/connectors/formats/index.md %}">Formats</a> page for more details and more format options.
+      </td>
+    </tr>
+    <tr>
+      <td><h5>value.format</h5></td>
+      <td>required</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>The format used to deserialize and serialize the value part of the Kafka messages.
+      The supported formats are <code>'csv'</code>, <code>'json'</code>, <code>'avro'</code>.
+      Please refer to <a href="{% link dev/table/connectors/formats/index.md %}">Formats</a> page for more details and more format options.
+      </td>
+    </tr>
+    <tr>
+       <td><h5>value.fields-include</h5></td>
+       <td>required</td>
+       <td style="word-wrap: break-word;"><code>'ALL'</code></td>
+       <td>String</td>
+       <td>Controls which field should end up in the value as well. Available value:
+       <ul>
+         <li><code>ALL</code>: the value part of the record contains all fields of the schema.</li>
+         <li><code>EXCEPT_KEY</code>: the value part of the record doesn't include primary key values.</li>
+       </ul>
+       </td>
+    </tr>
+    <tr>
+      <td><h5>sink.parallelism</h5></td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>Integer</td>
+      <td>Defines the parallelism of the Upsert kafka sink operator. By default, the parallelism is determined by the framework using the same parallelism of the upstream chained operator.</td>

Review comment:
       ```suggestion
         <td>Defines the parallelism of the upsert-kafka sink operator. By default, the parallelism is determined by the framework using the same parallelism of the upstream chained operator.</td>
   ```

##########
File path: docs/dev/table/connectors/upsert-kafka.md
##########
@@ -0,0 +1,183 @@
+---
+title: "Apache Upsert Kafka SQL Connector"
+nav-title: Upsert Kafka
+nav-parent_id: sql-connectors
+nav-pos: 2
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+<span class="label label-primary">Scan Source: Unbounded</span>
+<span class="label label-primary">Sink: Streaming Upsert Mode</span>
+
+* This will be replaced by the TOC
+{:toc}
+
+The Upsert Kafka connector allows for reading data from and writing data into Kafka topics in the upsert fashion.
+
+More precisely, the value part of the record is interpreted as an "UPDATE" of the last value for the same key.
+If the record doesn't exist yet, it will treat the record as an INSERT message. With the tombstone semantic in kafka,
+it will interpret the record whose value part is null as a DELETE message.

Review comment:
       The Upsert Kafka connector allows for reading data from and writing data into Kafka topics in the upsert fashion.
   
   As a source, the upsert-kafka connector produces a changelog stream, where each data record represents an update or delete event. More precisely, the value in a data record is interpreted as an UPDATE of the last value for the same key, if any (if a corresponding key doesn’t exist yet, the update will be considered an INSERT). Using the table analogy, a data record in a changelog stream is interpreted as an UPSERT, aka INSERT/UPDATE, because any existing row with the same key is overwritten. Also, null values are interpreted in a special way: a record with a null value represents a “DELETE”.
   
   As a sink, the upsert-kafka connector can consume a changelog stream. It will write INSERT/UPDATE_AFTER data as normal Kafka message values, and write DELETE data as Kafka messages with null values (indicating a tombstone for the key). Flink will guarantee the message ordering on the primary key by partitioning data on the values of the primary key columns, so the update/delete messages on the same key will fall into the same partition.
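
   To make these semantics concrete, a small hypothetical sketch (record contents invented for illustration) of how the source would interpret records for the key `user_id = 1`:

   ```sql
   -- Reading the table declared earlier as a changelog source:
   SELECT user_id, behaviour FROM upsertKafkaTable;

   -- Given these Kafka records on key {"user_id": 1}, oldest first,
   -- the connector would emit:
   --   value = {"user_id": 1, "behaviour": "pv"}   -> INSERT
   --   value = {"user_id": 1, "behaviour": "buy"}  -> UPDATE (replaces the last value)
   --   value = null (tombstone)                    -> DELETE
   ```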

##########
File path: docs/dev/table/connectors/upsert-kafka.md
##########
@@ -0,0 +1,183 @@
+---
+title: "Apache Upsert Kafka SQL Connector"
+nav-title: Upsert Kafka
+nav-parent_id: sql-connectors
+nav-pos: 2
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+<span class="label label-primary">Scan Source: Unbounded</span>
+<span class="label label-primary">Sink: Streaming Upsert Mode</span>
+
+* This will be replaced by the TOC
+{:toc}
+
+The Upsert Kafka connector allows for reading data from and writing data into Kafka topics in the upsert fashion.
+
+More precisely, the value part of the record is interpreted as an "UPDATE" of the last value for the same key.
+If the record doesn't exist yet, it will treat the record as an INSERT message. With the tombstone semantic in kafka,
+it will interpret the record whose value part is null as a DELETE message.
+
+
+Dependencies
+------------
+
+To use the connector, add the following Maven dependency to your project:
+{% highlight xml %}
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-connector-kafka_2.11</artifactId>
+  <version>1.12-SNAPSHOT</version>
+</dependency>
+{% endhighlight %}
+
+How to create an Upsert Kafka table
+----------------
+
+The example below shows how to create an Upsert Kafka table:
+
+<div class="codetabs" markdown="1">
+<div data-lang="SQL" markdown="1">
+{% highlight sql %}
+CREATE TABLE upsertKafkaTable (
+  user_id BIGINT,
+  item_id BIGINT,
+  category_id BIGINT,
+  behaviour STRING,
+  ts TIMESTAMP(3),
+  PRIMARY KEY (user_id) NOT ENFORCED
+) WITH (
+  'connector' = 'upsert-kafka',
+  'topic' = 'user_behaviour',
+  'properties.bootstrap.servers' = 'localhost:9092',
+  'key.format' = 'json',
+  'value.format' = 'json',
+  'value.fields-include' = 'ALL'

Review comment:
       We don't need this as this is the default value. 

##########
File path: docs/dev/table/connectors/upsert-kafka.md
##########
@@ -0,0 +1,183 @@
+---
+title: "Apache Upsert Kafka SQL Connector"
+nav-title: Upsert Kafka
+nav-parent_id: sql-connectors
+nav-pos: 2
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+<span class="label label-primary">Scan Source: Unbounded</span>
+<span class="label label-primary">Sink: Streaming Upsert Mode</span>
+
+* This will be replaced by the TOC
+{:toc}
+
+The Upsert Kafka connector allows for reading data from and writing data into Kafka topics in the upsert fashion.
+
+More precisely, the value part of the record is interpreted as an "UPDATE" of the last value for the same key.
+If the record doesn't exist yet, it will treat the record as an INSERT message. With the tombstone semantic in kafka,
+it will interpret the record whose value part is null as a DELETE message.
+
+
+Dependencies
+------------
+
+To use the connector, add the following Maven dependency to your project:
+{% highlight xml %}
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-connector-kafka_2.11</artifactId>
+  <version>1.12-SNAPSHOT</version>
+</dependency>
+{% endhighlight %}
+
+How to create an Upsert Kafka table
+----------------
+
+The example below shows how to create an Upsert Kafka table:
+
+<div class="codetabs" markdown="1">
+<div data-lang="SQL" markdown="1">
+{% highlight sql %}
+CREATE TABLE upsertKafkaTable (
+  user_id BIGINT,
+  item_id BIGINT,
+  category_id BIGINT,
+  behaviour STRING,
+  ts TIMESTAMP(3),
+  PRIMARY KEY (user_id) NOT ENFORCED
+) WITH (
+  'connector' = 'upsert-kafka',
+  'topic' = 'user_behaviour',
+  'properties.bootstrap.servers' = 'localhost:9092',
+  'key.format' = 'json',
+  'value.format' = 'json',
+  'value.fields-include' = 'ALL'
+)
+{% endhighlight %}
+</div>
+</div>
+
+<span class="label label-danger">Attention</span> Make sure to define the 
primary key in the ddl.
+
+Connector Options
+----------------
+
+<table class="table table-bordered">
+    <thead>
+      <tr>
+      <th class="text-left" style="width: 25%">Option</th>
+      <th class="text-center" style="width: 8%">Required</th>
+      <th class="text-center" style="width: 7%">Default</th>
+      <th class="text-center" style="width: 10%">Type</th>
+      <th class="text-center" style="width: 50%">Description</th>
+    </tr>
+    </thead>
+    <tbody>
+    <tr>
+      <td><h5>connector</h5></td>
+      <td>required</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>Specify which connector to use, for the Upsert Kafka use: <code>'upsert-kafka'</code>.</td>
+    </tr>
+    <tr>
+      <td><h5>topic</h5></td>
+      <td>required</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>Topic name to read from or write to when the table is used as source or sink.</td>
+    </tr>
+    <tr>
+      <td><h5>properties.bootstrap.servers</h5></td>
+      <td>required</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>Comma separated list of Kafka brokers.</td>
+    </tr>
+    <tr>
+      <td><h5>key.format</h5></td>
+      <td>required</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>The format used to deserialize and serialize the key part of the Kafka messages that is derived from the primary key.
+      The supported formats are <code>'csv'</code>, <code>'json'</code>, <code>'avro'</code>.

Review comment:
       ```suggestion
         The supported formats include <code>'csv'</code>, <code>'json'</code>, <code>'avro'</code>.
   ```

##########
File path: docs/dev/table/connectors/upsert-kafka.md
##########
@@ -0,0 +1,183 @@
+---
+title: "Apache Upsert Kafka SQL Connector"
+nav-title: Upsert Kafka
+nav-parent_id: sql-connectors
+nav-pos: 2
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+<span class="label label-primary">Scan Source: Unbounded</span>
+<span class="label label-primary">Sink: Streaming Upsert Mode</span>
+
+* This will be replaced by the TOC
+{:toc}
+
+The Upsert Kafka connector allows for reading data from and writing data into Kafka topics in the upsert fashion.
+
+More precisely, the value part of the record is interpreted as an "UPDATE" of the last value for the same key.
+If the record doesn't exist yet, it will treat the record as an INSERT message. With the tombstone semantic in kafka,
+it will interpret the record whose value part is null as a DELETE message.
+
+
+Dependencies
+------------
+
+To use the connector, add the following Maven dependency to your project:
+{% highlight xml %}
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-connector-kafka_2.11</artifactId>
+  <version>1.12-SNAPSHOT</version>
+</dependency>
+{% endhighlight %}
+
+How to create an Upsert Kafka table
+----------------
+
+The example below shows how to create an Upsert Kafka table:
+
+<div class="codetabs" markdown="1">
+<div data-lang="SQL" markdown="1">
+{% highlight sql %}
+CREATE TABLE upsertKafkaTable (
+  user_id BIGINT,
+  item_id BIGINT,
+  category_id BIGINT,
+  behaviour STRING,
+  ts TIMESTAMP(3),
+  PRIMARY KEY (user_id) NOT ENFORCED
+) WITH (
+  'connector' = 'upsert-kafka',
+  'topic' = 'user_behaviour',
+  'properties.bootstrap.servers' = 'localhost:9092',
+  'key.format' = 'json',
+  'value.format' = 'json',
+  'value.fields-include' = 'ALL'
+)

Review comment:
       Please add a more concrete example of how to insert an aggregation result into this upsert-kafka table, and how to read from the upsert-kafka table.
   
   You can find examples in [FLIP-149](https://cwiki.apache.org/confluence/display/FLINK/FLIP-149%3A+Introduce+the+upsert-kafka+Connector#FLIP149:IntroducetheupsertkafkaConnector-Examples) and [this doc](https://docs.ksqldb.io/en/latest/tutorials/examples/).
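
   Such an example might look like the following sketch (topic, table, and column names are invented for illustration; the final wording is up to the PR author):

   ```sql
   -- A hypothetical append-only source of raw page views.
   CREATE TABLE pageviews (
     user_id BIGINT,
     page_id BIGINT
   ) WITH (
     'connector' = 'kafka',
     'topic' = 'pageviews',
     'properties.bootstrap.servers' = 'localhost:9092',
     'properties.group.id' = 'testGroup',
     'scan.startup.mode' = 'earliest-offset',
     'format' = 'json'
   );

   -- An upsert table keyed by user_id: the aggregation below keeps
   -- exactly one continuously updated row per user.
   CREATE TABLE pageviews_per_user (
     user_id BIGINT,
     pv BIGINT,
     PRIMARY KEY (user_id) NOT ENFORCED
   ) WITH (
     'connector' = 'upsert-kafka',
     'topic' = 'pageviews_per_user',
     'properties.bootstrap.servers' = 'localhost:9092',
     'key.format' = 'json',
     'value.format' = 'json'
   );

   -- Writing an aggregation result into the upsert-kafka table ...
   INSERT INTO pageviews_per_user
   SELECT user_id, COUNT(*) AS pv
   FROM pageviews
   GROUP BY user_id;

   -- ... and reading the materialized changelog back.
   SELECT * FROM pageviews_per_user;
   ```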

##########
File path: docs/dev/table/connectors/upsert-kafka.md
##########
@@ -0,0 +1,183 @@
+---
+title: "Apache Upsert Kafka SQL Connector"
+nav-title: Upsert Kafka
+nav-parent_id: sql-connectors
+nav-pos: 2
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+<span class="label label-primary">Scan Source: Unbounded</span>
+<span class="label label-primary">Sink: Streaming Upsert Mode</span>
+
+* This will be replaced by the TOC
+{:toc}
+
+The Upsert Kafka connector allows for reading data from and writing data into Kafka topics in the upsert fashion.
+
+More precisely, the value part of the record is interpreted as an "UPDATE" of the last value for the same key.
+If the record doesn't exist yet, it will treat the record as an INSERT message. With the tombstone semantic in kafka,
+it will interpret the record whose value part is null as a DELETE message.
+
+
+Dependencies
+------------
+
+To use the connector, add the following Maven dependency to your project:
+{% highlight xml %}
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-connector-kafka_2.11</artifactId>
+  <version>1.12-SNAPSHOT</version>
+</dependency>
+{% endhighlight %}
+
+How to create an Upsert Kafka table
+----------------
+
+The example below shows how to create an Upsert Kafka table:
+
+<div class="codetabs" markdown="1">
+<div data-lang="SQL" markdown="1">
+{% highlight sql %}
+CREATE TABLE upsertKafkaTable (
+  user_id BIGINT,
+  item_id BIGINT,
+  category_id BIGINT,
+  behaviour STRING,
+  ts TIMESTAMP(3),
+  PRIMARY KEY (user_id) NOT ENFORCED
+) WITH (
+  'connector' = 'upsert-kafka',
+  'topic' = 'user_behaviour',
+  'properties.bootstrap.servers' = 'localhost:9092',
+  'key.format' = 'json',
+  'value.format' = 'json',
+  'value.fields-include' = 'ALL'
+)
+{% endhighlight %}
+</div>
+</div>
+
+<span class="label label-danger">Attention</span> Make sure to define the 
primary key in the ddl.
+
+Connector Options
+----------------
+
+<table class="table table-bordered">
+    <thead>
+      <tr>
+      <th class="text-left" style="width: 25%">Option</th>
+      <th class="text-center" style="width: 8%">Required</th>
+      <th class="text-center" style="width: 7%">Default</th>
+      <th class="text-center" style="width: 10%">Type</th>
+      <th class="text-center" style="width: 50%">Description</th>
+    </tr>
+    </thead>
+    <tbody>
+    <tr>
+      <td><h5>connector</h5></td>
+      <td>required</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>Specify which connector to use, for the Upsert Kafka use: <code>'upsert-kafka'</code>.</td>
+    </tr>
+    <tr>
+      <td><h5>topic</h5></td>
+      <td>required</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>Topic name to read from or write to when the table is used as source or sink.</td>
+    </tr>
+    <tr>
+      <td><h5>properties.bootstrap.servers</h5></td>
+      <td>required</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>Comma separated list of Kafka brokers.</td>
+    </tr>
+    <tr>
+      <td><h5>key.format</h5></td>
+      <td>required</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>The format used to deserialize and serialize the key part of the Kafka messages that is derived from the primary key.
+      The supported formats are <code>'csv'</code>, <code>'json'</code>, <code>'avro'</code>.
+      Please refer to <a href="{% link dev/table/connectors/formats/index.md %}">Formats</a> page for more details and more format options.
+      </td>
+    </tr>
+    <tr>
+      <td><h5>value.format</h5></td>
+      <td>required</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>The format used to deserialize and serialize the value part of the Kafka messages.
+      The supported formats are <code>'csv'</code>, <code>'json'</code>, <code>'avro'</code>.
+      Please refer to <a href="{% link dev/table/connectors/formats/index.md %}">Formats</a> page for more details and more format options.
+      </td>
+    </tr>
+    <tr>
+       <td><h5>value.fields-include</h5></td>
+       <td>required</td>
+       <td style="word-wrap: break-word;"><code>'ALL'</code></td>
+       <td>String</td>
+       <td>Controls which field should end up in the value as well. Available value:
+       <ul>
+         <li><code>ALL</code>: the value part of the record contains all fields of the schema.</li>
+         <li><code>EXCEPT_KEY</code>: the value part of the record doesn't include primary key values.</li>

Review comment:
       ```suggestion
            <li><code>EXCEPT_KEY</code>: the value part of the record contains all fields of the schema except the key fields.</li>
   ```

##########
File path: docs/dev/table/connectors/upsert-kafka.md
##########
@@ -0,0 +1,183 @@
+---
+title: "Apache Upsert Kafka SQL Connector"
+nav-title: Upsert Kafka
+nav-parent_id: sql-connectors
+nav-pos: 2
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+<span class="label label-primary">Scan Source: Unbounded</span>
+<span class="label label-primary">Sink: Streaming Upsert Mode</span>
+
+* This will be replaced by the TOC
+{:toc}
+
+The Upsert Kafka connector allows for reading data from and writing data into Kafka topics in the upsert fashion.
+
+More precisely, the value part of the record is interpreted as an "UPDATE" of the last value for the same key.
+If the record doesn't exist yet, it will treat the record as an INSERT message. With the tombstone semantic in kafka,
+it will interpret the record whose value part is null as a DELETE message.
+
+
+Dependencies
+------------
+
+To use the connector, add the following Maven dependency to your project:
+{% highlight xml %}
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-connector-kafka_2.11</artifactId>
+  <version>1.12-SNAPSHOT</version>
+</dependency>
+{% endhighlight %}
+
+How to create an Upsert Kafka table
+----------------
+
+The example below shows how to create an Upsert Kafka table:
+
+<div class="codetabs" markdown="1">
+<div data-lang="SQL" markdown="1">
+{% highlight sql %}
+CREATE TABLE upsertKafkaTable (
+  user_id BIGINT,
+  item_id BIGINT,
+  category_id BIGINT,
+  behaviour STRING,
+  ts TIMESTAMP(3),
+  PRIMARY KEY (user_id) NOT ENFORCED
+) WITH (
+  'connector' = 'upsert-kafka',
+  'topic' = 'user_behaviour',
+  'properties.bootstrap.servers' = 'localhost:9092',
+  'key.format' = 'json',
+  'value.format' = 'json',
+  'value.fields-include' = 'ALL'
+)
+{% endhighlight %}
+</div>
+</div>
+
+<span class="label label-danger">Attention</span> Make sure to define the 
primary key in the ddl.
+
+Connector Options
+----------------
+
+<table class="table table-bordered">
+    <thead>
+      <tr>
+      <th class="text-left" style="width: 25%">Option</th>
+      <th class="text-center" style="width: 8%">Required</th>
+      <th class="text-center" style="width: 7%">Default</th>
+      <th class="text-center" style="width: 10%">Type</th>
+      <th class="text-center" style="width: 50%">Description</th>
+    </tr>
+    </thead>
+    <tbody>
+    <tr>
+      <td><h5>connector</h5></td>
+      <td>required</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>Specify which connector to use, for the Upsert Kafka use: <code>'upsert-kafka'</code>.</td>
+    </tr>
+    <tr>
+      <td><h5>topic</h5></td>
+      <td>required</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>Topic name to read from or write to when the table is used as source or sink.</td>

Review comment:
       ```suggestion
         <td>The Kafka topic name to read from and write to.</td>
   ```

##########
File path: docs/dev/table/connectors/upsert-kafka.md
##########
@@ -0,0 +1,183 @@
+---
+title: "Apache Upsert Kafka SQL Connector"
+nav-title: Upsert Kafka
+nav-parent_id: sql-connectors
+nav-pos: 2
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+<span class="label label-primary">Scan Source: Unbounded</span>
+<span class="label label-primary">Sink: Streaming Upsert Mode</span>
+
+* This will be replaced by the TOC
+{:toc}
+
+The Upsert Kafka connector allows for reading data from and writing data into Kafka topics in the upsert fashion.
+
+More precisely, the value part of the record is interpreted as an "UPDATE" of the last value for the same key.
+If the record doesn't exist yet, it will treat the record as an INSERT message. With the tombstone semantic in kafka,
+it will interpret the record whose value part is null as a DELETE message.
+
+
+Dependencies
+------------
+
+To use the connector, add the following Maven dependency to your project:
+{% highlight xml %}
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-connector-kafka_2.11</artifactId>
+  <version>1.12-SNAPSHOT</version>
+</dependency>
+{% endhighlight %}
+
+How to create an Upsert Kafka table
+----------------
+
+The example below shows how to create an Upsert Kafka table:
+
+<div class="codetabs" markdown="1">
+<div data-lang="SQL" markdown="1">
+{% highlight sql %}
+CREATE TABLE upsertKafkaTable (
+  user_id BIGINT,
+  item_id BIGINT,
+  category_id BIGINT,
+  behaviour STRING,
+  ts TIMESTAMP(3),
+  PRIMARY KEY (user_id) NOT ENFORCED
+) WITH (
+  'connector' = 'upsert-kafka',
+  'topic' = 'user_behaviour',
+  'properties.bootstrap.servers' = 'localhost:9092',
+  'key.format' = 'json',
+  'value.format' = 'json',
+  'value.fields-include' = 'ALL'
+)
+{% endhighlight %}
+</div>
+</div>
+
+<span class="label label-danger">Attention</span> Make sure to define the 
primary key in the ddl.
+
+Connector Options
+----------------
+
+<table class="table table-bordered">
+    <thead>
+      <tr>
+      <th class="text-left" style="width: 25%">Option</th>
+      <th class="text-center" style="width: 8%">Required</th>
+      <th class="text-center" style="width: 7%">Default</th>
+      <th class="text-center" style="width: 10%">Type</th>
+      <th class="text-center" style="width: 50%">Description</th>
+    </tr>
+    </thead>
+    <tbody>
+    <tr>
+      <td><h5>connector</h5></td>
+      <td>required</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>Specify which connector to use, for the Upsert Kafka use: <code>'upsert-kafka'</code>.</td>
+    </tr>
+    <tr>
+      <td><h5>topic</h5></td>
+      <td>required</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>Topic name to read from or write to when the table is used as source or sink.</td>
+    </tr>
+    <tr>
+      <td><h5>properties.bootstrap.servers</h5></td>
+      <td>required</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>Comma separated list of Kafka brokers.</td>
+    </tr>
+    <tr>
+      <td><h5>key.format</h5></td>
+      <td>required</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>The format used to deserialize and serialize the key part of the Kafka messages that is derived from the primary key.

Review comment:
       ```suggestion
         <td>The format used to deserialize and serialize the key part of the Kafka messages. The key part fields are specified by the PRIMARY KEY syntax.
   ```

##########
File path: docs/dev/table/connectors/upsert-kafka.md
##########
@@ -0,0 +1,183 @@
+---
+title: "Apache Upsert Kafka SQL Connector"
+nav-title: Upsert Kafka
+nav-parent_id: sql-connectors
+nav-pos: 2
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+<span class="label label-primary">Scan Source: Unbounded</span>
+<span class="label label-primary">Sink: Streaming Upsert Mode</span>
+
+* This will be replaced by the TOC
+{:toc}
+
+The Upsert Kafka connector allows for reading data from and writing data into Kafka topics in the upsert fashion.
+
+More precisely, the value part of the record is interpreted as an "UPDATE" of the last value for the same key.
+If the record doesn't exist yet, it will treat the record as an INSERT message. With the tombstone semantic in kafka,
+it will interpret the record whose value part is null as a DELETE message.
+
+
+Dependencies
+------------
+
+To use the connector, add the following Maven dependency to your project:
+{% highlight xml %}
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-connector-kafka_2.11</artifactId>
+  <version>1.12-SNAPSHOT</version>
+</dependency>
+{% endhighlight %}
+
+How to create an Upsert Kafka table
+----------------
+
+The example below shows how to create an Upsert Kafka table:
+
+<div class="codetabs" markdown="1">
+<div data-lang="SQL" markdown="1">
+{% highlight sql %}
+CREATE TABLE upsertKafkaTable (
+  user_id BIGINT,
+  item_id BIGINT,
+  category_id BIGINT,
+  behaviour STRING,
+  ts TIMESTAMP(3),
+  PRIMARY KEY (user_id) NOT ENFORCED
+) WITH (
+  'connector' = 'upsert-kafka',
+  'topic' = 'user_behaviour',
+  'properties.bootstrap.servers' = 'localhost:9092',
+  'key.format' = 'json',
+  'value.format' = 'json',
+  'value.fields-include' = 'ALL'
+)
+{% endhighlight %}
+</div>
+</div>
+
+<span class="label label-danger">Attention</span> Make sure to define the 
primary key in the ddl.
+
+Connector Options
+----------------
+
+<table class="table table-bordered">
+    <thead>
+      <tr>
+      <th class="text-left" style="width: 25%">Option</th>
+      <th class="text-center" style="width: 8%">Required</th>
+      <th class="text-center" style="width: 7%">Default</th>
+      <th class="text-center" style="width: 10%">Type</th>
+      <th class="text-center" style="width: 50%">Description</th>
+    </tr>
+    </thead>
+    <tbody>
+    <tr>
+      <td><h5>connector</h5></td>
+      <td>required</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>Specify which connector to use, for the Upsert Kafka use: <code>'upsert-kafka'</code>.</td>
+    </tr>
+    <tr>
+      <td><h5>topic</h5></td>
+      <td>required</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>Topic name to read from or write to when the table is used as source or sink.</td>
+    </tr>
+    <tr>
+      <td><h5>properties.bootstrap.servers</h5></td>
+      <td>required</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>Comma separated list of Kafka brokers.</td>
+    </tr>
+    <tr>
+      <td><h5>key.format</h5></td>
+      <td>required</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>The format used to deserialize and serialize the key part of Kafka messages, which is derived from the primary key.
+      The supported formats are <code>'csv'</code>, <code>'json'</code>, 
<code>'avro'</code>.
+      Please refer to <a href="{% link dev/table/connectors/formats/index.md 
%}">Formats</a> page for more details and more format options.
+      </td>
+    </tr>
+    <tr>
+      <td><h5>value.format</h5></td>
+      <td>required</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>The format used to deserialize and serialize the value part of the 
Kafka messages.
+      The supported formats are <code>'csv'</code>, <code>'json'</code>, 
<code>'avro'</code>.
+      Please refer to <a href="{% link dev/table/connectors/formats/index.md 
%}">Formats</a> page for more details and more format options.
+      </td>
+    </tr>
+    <tr>
+       <td><h5>value.fields-include</h5></td>
+       <td>optional</td>
+       <td style="word-wrap: break-word;"><code>'ALL'</code></td>
+       <td>String</td>
+       <td>Controls which fields should end up in the value part. Available values:
+       <ul>
+         <li><code>ALL</code>: the value part of the record contains all 
fields of the schema.</li>

Review comment:
       ```suggestion
        <li><code>ALL</code>: the value part of the record contains all fields of the schema, even if they are part of the key.</li>
   ```
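
   To make the difference concrete, here is a hypothetical record for the `upsertKafkaTable` example (assuming the `'json'` key and value formats; the payloads are illustrative, not captured output):

   ```
   key (derived from the PRIMARY KEY, identical in both modes):
     {"user_id": 1}
   value with 'value.fields-include' = 'ALL':
     {"user_id": 1, "item_id": 7, "category_id": 3, "behaviour": "buy", "ts": "2020-11-11 12:00:00"}
   value with 'value.fields-include' = 'EXCEPT_KEY':
     {"item_id": 7, "category_id": 3, "behaviour": "buy", "ts": "2020-11-11 12:00:00"}
   ```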

##########
File path: docs/dev/table/connectors/upsert-kafka.md
##########
@@ -0,0 +1,183 @@
+---
+title: "Apache Upsert Kafka SQL Connector"
+nav-title: Upsert Kafka
+nav-parent_id: sql-connectors
+nav-pos: 2
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+<span class="label label-primary">Scan Source: Unbounded</span>
+<span class="label label-primary">Sink: Streaming Upsert Mode</span>
+
+* This will be replaced by the TOC
+{:toc}
+
+The Upsert Kafka connector allows for reading data from and writing data into Kafka topics in an upsert fashion.
+
+More precisely, the value part of a record is interpreted as an "UPDATE" of the last value for the same key, if a value for that key already exists. If it doesn't exist yet, the record is treated as an INSERT message. Following Kafka's tombstone semantics, a record whose value part is null is interpreted as a DELETE message.
+
+
+Dependencies
+------------
+
+To use the connector, add the following Maven dependency to your project:
+{% highlight xml %}
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-connector-kafka_2.11</artifactId>
+  <version>1.12-SNAPSHOT</version>
+</dependency>
+{% endhighlight %}

Review comment:
       
   ```
   In order to set up the upsert-kafka connector, the following table provides dependency information for both projects using a build automation tool (such as Maven or SBT) and SQL Client with SQL JAR bundles.

   | Kafka Version | Maven dependency | SQL Client JAR |
   | :------------ | :--------------- | :------------- |
   | universal | `flink-connector-kafka{{site.scala_version_suffix}}` | {% if site.is_stable %} [Download](https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-kafka{{site.scala_version_suffix}}/{{site.version}}/flink-sql-connector-kafka{{site.scala_version_suffix}}-{{site.version}}.jar) {% else %} Only available for [stable releases]({{ site.stable_baseurl }}/dev/table/connectors/upsert-kafka.html) {% endif %} |
   ```

   1. Add the SQL Client JAR.
   2. Use variables instead of hard-coding the version.
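
   For example, the Maven dependency entry could then read as follows (a sketch reusing the `{{site.version}}` and `{{site.scala_version_suffix}}` variables that already appear in the table above):

   ```xml
   <dependency>
     <groupId>org.apache.flink</groupId>
     <!-- both placeholders are substituted by the docs build, so nothing is hard-coded -->
     <artifactId>flink-connector-kafka{{site.scala_version_suffix}}</artifactId>
     <version>{{site.version}}</version>
   </dependency>
   ```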
   
   

##########
File path: docs/dev/table/connectors/upsert-kafka.md
##########
@@ -0,0 +1,183 @@
+---
+title: "Apache Upsert Kafka SQL Connector"
+nav-title: Upsert Kafka
+nav-parent_id: sql-connectors
+nav-pos: 2
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+<span class="label label-primary">Scan Source: Unbounded</span>
+<span class="label label-primary">Sink: Streaming Upsert Mode</span>
+
+* This will be replaced by the TOC
+{:toc}
+
+The Upsert Kafka connector allows for reading data from and writing data into Kafka topics in an upsert fashion.
+
+More precisely, the value part of a record is interpreted as an "UPDATE" of the last value for the same key, if a value for that key already exists. If it doesn't exist yet, the record is treated as an INSERT message. Following Kafka's tombstone semantics, a record whose value part is null is interpreted as a DELETE message.
+
+
+Dependencies
+------------
+
+To use the connector, add the following Maven dependency to your project:
+{% highlight xml %}
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-connector-kafka_2.11</artifactId>
+  <version>1.12-SNAPSHOT</version>
+</dependency>
+{% endhighlight %}
+
+How to create an Upsert Kafka table
+----------------
+
+The example below shows how to create an Upsert Kafka table:
+
+<div class="codetabs" markdown="1">
+<div data-lang="SQL" markdown="1">
+{% highlight sql %}
+CREATE TABLE upsertKafkaTable (
+  user_id BIGINT,
+  item_id BIGINT,
+  category_id BIGINT,
+  behaviour STRING,
+  ts TIMESTAMP(3),
+  PRIMARY KEY (user_id) NOT ENFORCED
+) WITH (
+  'connector' = 'upsert-kafka',
+  'topic' = 'user_behaviour',
+  'properties.bootstrap.servers' = 'localhost:9092',
+  'key.format' = 'json',
+  'value.format' = 'json',
+  'value.fields-include' = 'ALL'
+)
+{% endhighlight %}
+</div>
+</div>
+
+<span class="label label-danger">Attention</span> Make sure to define the 
primary key in the ddl.
+
+Connector Options
+----------------
+
+<table class="table table-bordered">
+    <thead>
+      <tr>
+      <th class="text-left" style="width: 25%">Option</th>
+      <th class="text-center" style="width: 8%">Required</th>
+      <th class="text-center" style="width: 7%">Default</th>
+      <th class="text-center" style="width: 10%">Type</th>
+      <th class="text-center" style="width: 50%">Description</th>
+    </tr>
+    </thead>
+    <tbody>
+    <tr>
+      <td><h5>connector</h5></td>
+      <td>required</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>Specify which connector to use; for Upsert Kafka, use <code>'upsert-kafka'</code>.</td>
+    </tr>
+    <tr>
+      <td><h5>topic</h5></td>
+      <td>required</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>Topic name to read from or write to when the table is used as source 
or sink.</td>
+    </tr>
+    <tr>
+      <td><h5>properties.bootstrap.servers</h5></td>
+      <td>required</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>Comma-separated list of Kafka brokers.</td>
+    </tr>
+    <tr>
+      <td><h5>key.format</h5></td>
+      <td>required</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>The format used to deserialize and serialize the key part of Kafka messages, which is derived from the primary key.
+      The supported formats are <code>'csv'</code>, <code>'json'</code>, 
<code>'avro'</code>.
+      Please refer to <a href="{% link dev/table/connectors/formats/index.md 
%}">Formats</a> page for more details and more format options.
+      </td>
+    </tr>
+    <tr>
+      <td><h5>value.format</h5></td>
+      <td>required</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>The format used to deserialize and serialize the value part of the 
Kafka messages.
+      The supported formats are <code>'csv'</code>, <code>'json'</code>, 
<code>'avro'</code>.
+      Please refer to <a href="{% link dev/table/connectors/formats/index.md 
%}">Formats</a> page for more details and more format options.
+      </td>
+    </tr>
+    <tr>
+       <td><h5>value.fields-include</h5></td>
+       <td>optional</td>
+       <td style="word-wrap: break-word;"><code>'ALL'</code></td>
+       <td>String</td>
+       <td>Controls which fields should end up in the value part. Available values:
+       <ul>
+         <li><code>ALL</code>: the value part of the record contains all 
fields of the schema.</li>
+         <li><code>EXCEPT_KEY</code>: the value part of the record doesn't 
include primary key values.</li>
+       </ul>
+       </td>
+    </tr>
+    <tr>
+      <td><h5>sink.parallelism</h5></td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>Integer</td>
+      <td>Defines the parallelism of the Upsert Kafka sink operator. By default, the parallelism is determined by the framework, using the same parallelism as the upstream chained operator.</td>
+    </tr>
+    </tbody>
+</table>
+
+Features
+----------------
+
+### Primary Key Constraints
+
+The Upsert Kafka always works in the upsert fashion and requires to define the 
primary key in the ddl. With the assumption that records with the same key are 
ordered,

Review comment:
       ```suggestion
   The Upsert Kafka always works in the upsert fashion and requires defining the primary key in the DDL. With the assumption that records with the same key should be ordered in the same partition,
   ```
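
   As an illustration of these key semantics (a sketch only; `user_behaviour_source` is a hypothetical table, not part of this PR), every updating result row for a given `user_id` is written as an upsert on the Kafka key derived from the primary key:

   ```sql
   -- continuous query writing into the example table from this PR;
   -- each new aggregate per user_id overwrites the previous value for that key
   INSERT INTO upsertKafkaTable
   SELECT user_id, MAX(item_id), MAX(category_id), MAX(behaviour), MAX(ts)
   FROM user_behaviour_source
   GROUP BY user_id;
   ```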

##########
File path: docs/dev/table/connectors/upsert-kafka.md
##########
@@ -0,0 +1,183 @@
+---
+title: "Apache Upsert Kafka SQL Connector"
+nav-title: Upsert Kafka
+nav-parent_id: sql-connectors
+nav-pos: 2
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+<span class="label label-primary">Scan Source: Unbounded</span>
+<span class="label label-primary">Sink: Streaming Upsert Mode</span>
+
+* This will be replaced by the TOC
+{:toc}
+
+The Upsert Kafka connector allows for reading data from and writing data into Kafka topics in an upsert fashion.
+
+More precisely, the value part of a record is interpreted as an "UPDATE" of the last value for the same key, if a value for that key already exists. If it doesn't exist yet, the record is treated as an INSERT message. Following Kafka's tombstone semantics, a record whose value part is null is interpreted as a DELETE message.
+
+
+Dependencies
+------------
+
+To use the connector, add the following Maven dependency to your project:
+{% highlight xml %}
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-connector-kafka_2.11</artifactId>
+  <version>1.12-SNAPSHOT</version>
+</dependency>
+{% endhighlight %}
+
+How to create an Upsert Kafka table
+----------------
+
+The example below shows how to create an Upsert Kafka table:
+
+<div class="codetabs" markdown="1">
+<div data-lang="SQL" markdown="1">
+{% highlight sql %}
+CREATE TABLE upsertKafkaTable (
+  user_id BIGINT,
+  item_id BIGINT,
+  category_id BIGINT,
+  behaviour STRING,
+  ts TIMESTAMP(3),
+  PRIMARY KEY (user_id) NOT ENFORCED
+) WITH (
+  'connector' = 'upsert-kafka',
+  'topic' = 'user_behaviour',
+  'properties.bootstrap.servers' = 'localhost:9092',
+  'key.format' = 'json',
+  'value.format' = 'json',
+  'value.fields-include' = 'ALL'
+)
+{% endhighlight %}
+</div>
+</div>
+
+<span class="label label-danger">Attention</span> Make sure to define the 
primary key in the ddl.
+
+Connector Options
+----------------
+
+<table class="table table-bordered">
+    <thead>
+      <tr>
+      <th class="text-left" style="width: 25%">Option</th>
+      <th class="text-center" style="width: 8%">Required</th>
+      <th class="text-center" style="width: 7%">Default</th>
+      <th class="text-center" style="width: 10%">Type</th>
+      <th class="text-center" style="width: 50%">Description</th>
+    </tr>
+    </thead>
+    <tbody>
+    <tr>
+      <td><h5>connector</h5></td>
+      <td>required</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>Specify which connector to use; for Upsert Kafka, use <code>'upsert-kafka'</code>.</td>
+    </tr>
+    <tr>
+      <td><h5>topic</h5></td>
+      <td>required</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>Topic name to read from or write to when the table is used as source 
or sink.</td>
+    </tr>
+    <tr>
+      <td><h5>properties.bootstrap.servers</h5></td>
+      <td>required</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>Comma-separated list of Kafka brokers.</td>
+    </tr>
+    <tr>
+      <td><h5>key.format</h5></td>
+      <td>required</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>The format used to deserialize and serialize the key part of Kafka messages, which is derived from the primary key.
+      The supported formats are <code>'csv'</code>, <code>'json'</code>, 
<code>'avro'</code>.
+      Please refer to <a href="{% link dev/table/connectors/formats/index.md 
%}">Formats</a> page for more details and more format options.
+      </td>
+    </tr>
+    <tr>
+      <td><h5>value.format</h5></td>
+      <td>required</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>The format used to deserialize and serialize the value part of the 
Kafka messages.
+      The supported formats are <code>'csv'</code>, <code>'json'</code>, 
<code>'avro'</code>.
+      Please refer to <a href="{% link dev/table/connectors/formats/index.md 
%}">Formats</a> page for more details and more format options.
+      </td>
+    </tr>
+    <tr>
+       <td><h5>value.fields-include</h5></td>
+       <td>optional</td>
+       <td style="word-wrap: break-word;"><code>'ALL'</code></td>
+       <td>String</td>
+       <td>Controls which fields should end up in the value part. Available values:
+       <ul>
+         <li><code>ALL</code>: the value part of the record contains all 
fields of the schema.</li>
+         <li><code>EXCEPT_KEY</code>: the value part of the record doesn't 
include primary key values.</li>
+       </ul>
+       </td>
+    </tr>
+    <tr>
+      <td><h5>sink.parallelism</h5></td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>Integer</td>
+      <td>Defines the parallelism of the Upsert Kafka sink operator. By default, the parallelism is determined by the framework, using the same parallelism as the upstream chained operator.</td>
+    </tr>
+    </tbody>
+</table>
+
+Features
+----------------
+
+### Primary Key Constraints
+
+The Upsert Kafka always works in the upsert fashion and requires to define the 
primary key in the ddl. With the assumption that records with the same key are 
ordered,
+the primary key semantics on the changelog source mean that the materialized changelog is unique on the primary keys.
+The primary key definition also controls which fields should end up in Kafka’s key.
+
+### Changelog Source
+
+When the Upsert Kafka connector is used as a source, it produces a changelog stream from the topic. Generally speaking, it requires the topic to be <a href="https://kafka.apache.org/documentation/#compaction"><code>'compacted'</code></a>
+and all data with the same key to be in the same partition. To preserve data integrity, the connector does not allow specifying the reading position, which means it always uses the `earliest-offset`.
+
+### Changelog Sink
+
+The Upsert Kafka connector keeps records with the same key in the same partition by using a <code>'HASH'</code> partitioner. At the same time, compared to Kafka with a CDC format,
+upsert-kafka always tries its best not to send UPDATE_BEFORE messages to the storage.

Review comment:
       We have moved this part to the top.
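
       As a sketch of what consuming this changelog looks like (hypothetical query, not part of this PR): since the source materializes upserts and tombstones per key, each `user_id` contributes at most one row, so a downstream aggregate always reflects the latest value per key:

   ```sql
   -- continuous query over the upsert table defined in this PR;
   -- counts follow upserts and deletes as the per-key state changes
   SELECT category_id, COUNT(*) AS user_cnt
   FROM upsertKafkaTable
   GROUP BY category_id;
   ```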




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

