PatrickRen commented on code in PR #2938:
URL: https://github.com/apache/flink-cdc/pull/2938#discussion_r1558983285


##########
docs/content/pipelines/kafka-pipeline(ZH).md:
##########
@@ -0,0 +1,194 @@
+# Kafka Pipeline 连接器
+
+The Kafka Pipeline connector can be used as the *Data Sink* of a pipeline, writing data to [Kafka](https://kafka.apache.org). This document describes how to set up the Kafka Pipeline connector.
+
+## What can the connector do?
+* Automatic table creation
+* Schema change synchronization
+* Real-time data synchronization
+
+How to create Pipeline
+----------------
+
+A pipeline that reads data from MySQL and writes it to Kafka can be defined as follows:
+
+```yaml
+source:
+  type: mysql
+  name: MySQL Source
+  hostname: 127.0.0.1
+  port: 3306
+  username: admin
+  password: pass
+  tables: adb.\.*, bdb.user_table_[0-9]+, [app|web].order_\.*
+  server-id: 5401-5404
+
+sink:
+  type: kafka
+  name: Kafka Sink
+  properties.bootstrap.servers: PLAINTEXT://localhost:62510
+
+pipeline:
+  name: MySQL to Kafka Pipeline
+  parallelism: 2
+```
+
+Pipeline Connector Options
+----------------
+<div class="highlight">
+<table class="colwidths-auto docutils">
+   <thead>
+      <tr>
+        <th class="text-left" style="width: 25%">Option</th>
+        <th class="text-left" style="width: 8%">Required</th>
+        <th class="text-left" style="width: 7%">Default</th>
+        <th class="text-left" style="width: 10%">Type</th>
+        <th class="text-left" style="width: 50%">Description</th>
+      </tr>
+    </thead>
+    <tbody>
+    <tr>
+      <td>type</td>
+      <td>required</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>Specify the connector to use; this should be set to <code>'kafka'</code>.</td>
+    </tr>
+    <tr>
+      <td>name</td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>The name of the sink.</td>
+    </tr>
+    <tr>
+      <td>value.format</td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>The format used to serialize the value part of Kafka messages. Available options are <a href="https://debezium.io/documentation/reference/stable/integrations/serdes.html">debezium-json</a> and <a href="https://github.com/alibaba/canal/wiki">canal-json</a>; the default is <code>debezium-json</code>, and user-defined formats are not supported yet.</td>
+    </tr>
+    <tr>
+      <td>properties.bootstrap.servers</td>
+      <td>required</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>A list of host/port pairs used to establish the initial connection to the Kafka cluster.</td>
+    </tr>
+    <tr>
+      <td>topic</td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>If this parameter is configured, all events will be sent to this single topic.</td>
+    </tr>
+    <tr>
+      <td>sink.add-tableId-to-header-enabled</td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>Boolean</td>
+      <td>If this parameter is configured, each Kafka record will carry headers with keys <code>namespace</code>, <code>schemaName</code> and <code>tableName</code>, whose values are the corresponding strings from the event's TableId.</td>
+    </tr>
+    <tr>
+      <td>properties.*</td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>Pass options supported by Kafka to the pipeline; see <a href="https://kafka.apache.org/28/documentation.html#consumerconfigs">Kafka consumer options</a>.</td>
+    </tr>
+    </tbody>
+</table>    
+</div>
+
+Usage Notes
+--------
+
+* By default, events are written to the Kafka topic named after the upstream table's `namespace.schemaName.tableName` string; this can be changed with the pipeline's route feature.
+* If the `topic` parameter is configured, all events will be sent to that single topic.
+* If the target Kafka topic does not exist, it will be created automatically.
+
+Data Type Mapping
+----------------
+<div class="wy-table-responsive">
+<table class="colwidths-auto docutils">
+    <thead>
+      <tr>
+        <th class="text-left">CDC type</th>
+        <th class="text-left">Paimon type</th>

Review Comment:
   Should be JSON type?



##########
docs/content/pipelines/kafka-pipeline.md:
##########
@@ -0,0 +1,192 @@
+# Kafka Pipeline Connector
+
+The Kafka Pipeline connector can be used as the *Data Sink* of the pipeline, and writes data to [Kafka](https://kafka.apache.org). This document describes how to set up the Kafka Pipeline connector.
+
+## What can the connector do?
+* Data synchronization
+
+How to create Pipeline
+----------------
+
+A pipeline that reads data from MySQL and sinks to Kafka can be defined as follows:
+
+```yaml
+source:
+  type: mysql
+  name: MySQL Source
+  hostname: 127.0.0.1
+  port: 3306
+  username: admin
+  password: pass
+  tables: adb.\.*, bdb.user_table_[0-9]+, [app|web].order_\.*
+  server-id: 5401-5404
+
+sink:
+  type: kafka
+  name: Kafka Sink
+  properties.bootstrap.servers: PLAINTEXT://localhost:62510
+
+pipeline:
+  name: MySQL to Kafka Pipeline
+  parallelism: 2
+```
+
+Pipeline Connector Options
+----------------
+<div class="highlight">
+<table class="colwidths-auto docutils">
+   <thead>
+      <tr>
+        <th class="text-left" style="width: 25%">Option</th>
+        <th class="text-left" style="width: 8%">Required</th>
+        <th class="text-left" style="width: 7%">Default</th>
+        <th class="text-left" style="width: 10%">Type</th>
+        <th class="text-left" style="width: 50%">Description</th>
+      </tr>
+    </thead>
+    <tbody>
+    <tr>
+      <td>type</td>
+      <td>required</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>Specify the connector to use; here it should be <code>'kafka'</code>.</td>
+    </tr>
+    <tr>
+      <td>name</td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>The name of the sink.</td>
+    </tr>
+    <tr>
+      <td>value.format</td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>The format used to serialize the value part of Kafka messages. Available options are <a href="https://debezium.io/documentation/reference/stable/integrations/serdes.html">debezium-json</a> and <a href="https://github.com/alibaba/canal/wiki">canal-json</a>; the default is <code>debezium-json</code>, and user-defined formats are not supported yet.</td>
+    </tr>
+    <tr>
+      <td>properties.bootstrap.servers</td>
+      <td>required</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>A list of host/port pairs to use for establishing the initial connection to the Kafka cluster.</td>
+    </tr>
+    <tr>
+      <td>topic</td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>If this parameter is configured, all events will be sent to this topic.</td>
+    </tr>
+    <tr>
+      <td>sink.add-tableId-to-header-enabled</td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>Boolean</td>
+      <td>If this parameter is true, headers with keys 'namespace', 'schemaName' and 'tableName' will be added to each Kafka record. Default value is false.</td>
+    </tr>
+    <tr>
+      <td>table.properties.*</td>

Review Comment:
   ```suggestion
         <td>properties.*</td>
   ```



##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kakfa/json/ChangeLogJsonFormatFactory.java:
##########
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.kakfa.json;

Review Comment:
   ```suggestion
   package org.apache.flink.cdc.connectors.kafka.json;
   ```



##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kakfa/sink/KafkaDataSinkOptions.java:
##########
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.kakfa.sink;
+
+import org.apache.flink.cdc.common.configuration.ConfigOption;
+import org.apache.flink.cdc.common.configuration.description.Description;
+import org.apache.flink.cdc.connectors.kakfa.json.JsonSerializationType;
+import org.apache.flink.connector.base.DeliveryGuarantee;
+
+import static org.apache.flink.cdc.common.configuration.ConfigOptions.key;
+import static org.apache.flink.cdc.common.configuration.description.TextElement.text;
+
+/** Options for {@link KafkaDataSinkOptions}. */
+public class KafkaDataSinkOptions {
+
+    // Prefix for Kafka specific properties.
+    public static final String PROPERTIES_PREFIX = "properties.";
+
+    public static final ConfigOption<DeliveryGuarantee> DELIVERY_GUARANTEE =
+            key("sink.delivery-guarantee")
+                    .enumType(DeliveryGuarantee.class)
+                    .defaultValue(DeliveryGuarantee.AT_LEAST_ONCE)
+                    .withDescription("Optional delivery guarantee when committing.");
+
+    public static final ConfigOption<JsonSerializationType> VALUE_FORMAT =
+            key("value.format")
+                    .enumType(JsonSerializationType.class)
+                    .defaultValue(JsonSerializationType.DEBEZIUM_JSON)
+                    .withDescription(
+                            "Defines the format identifier for encoding value data, "
+                                    + "available options are `debezium-json` and `canal-json`, default option is `debezium-json`.");
+
+    public static final ConfigOption<String> SINK_PARTITIONER =

Review Comment:
   I'm concerned about the ordering of events here under different partitioners.
   
   Currently, before the sink, data change events are partitioned by table ID + primary key, which preserves the ordering of events with the same table ID and PK. If users choose `default` (the StickyPartitioner provided by Kafka) or `round-robin`, events from the same subtask might be distributed to different partitions, which breaks this ordering.
   
   What about only providing the fixed partitioner for now, and supporting more if we find that the feature is essential in the future?
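   
   For illustration, here is a minimal sketch of such a fixed partitioner, modeled on Flink's built-in `FlinkFixedPartitioner` (the class name `FixedEventPartitioner` is hypothetical and not part of this PR): every record produced by a given sink subtask is pinned to a single Kafka partition, so the table ID + primary key ordering established before the sink is preserved within that partition.
   
   ```java
   import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
   
   /**
    * Sketch of a fixed partitioner: all records produced by a given sink subtask
    * are written to one Kafka partition, so events that reach a subtask in order
    * also stay in order within that partition.
    */
   public class FixedEventPartitioner<T> extends FlinkKafkaPartitioner<T> {
   
       private int parallelInstanceId;
   
       @Override
       public void open(int parallelInstanceId, int parallelInstances) {
           // Remember which subtask this is; used to pick a stable target partition.
           this.parallelInstanceId = parallelInstanceId;
       }
   
       @Override
       public int partition(
               T record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
           // Pin all records of this subtask to the same partition of the target topic.
           return partitions[parallelInstanceId % partitions.length];
       }
   }
   ```
   
   This mirrors what `FlinkFixedPartitioner` already does, which is why restricting the sink to this strategy keeps the ordering guarantee intact.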



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
