klion26 commented on a change in pull request #13410:
URL: https://github.com/apache/flink/pull/13410#discussion_r495574668



##########
File path: docs/dev/connectors/kafka.zh.md
##########
@@ -23,90 +23,32 @@ specific language governing permissions and limitations
 under the License.
 -->
 
+Flink 提供了 [Apache Kafka](https://kafka.apache.org) 连接器,用于向 Kafka topic 
中读取或者写入数据,可提供精确一次的处理语义。
+
 * This will be replaced by the TOC
 {:toc}
 
-此连接器提供了访问 [Apache Kafka](https://kafka.apache.org/) 事件流的服务。
-
-Flink 提供了专门的 Kafka 连接器,向 Kafka topic 中读取或者写入数据。Flink Kafka Consumer 集成了 Flink 
的 Checkpoint 机制,可提供 exactly-once 的处理语义。为此,Flink 并不完全依赖于跟踪 Kafka 
消费组的偏移量,而是在内部跟踪和检查偏移量。
-
-根据你的用例和环境选择相应的包(maven artifact id)和类名。对于大多数用户来说,使用 `FlinkKafkaConsumer`( 
`flink-connector-kafka` 的一部分)是比较合适的。
-
-<table class="table table-bordered">
-  <thead>
-    <tr>
-      <th class="text-left">Maven 依赖</th>
-      <th class="text-left">自从哪个版本<br>开始支持</th>
-      <th class="text-left">消费者和<br>生产者的类名称</th>
-      <th class="text-left">Kafka 版本</th>
-      <th class="text-left">注意</th>
-    </tr>
-  </thead>
-  <tbody>
-    <tr>
-        <td>flink-connector-kafka{{ site.scala_version_suffix }}</td>
-        <td>1.7.0</td>
-        <td>FlinkKafkaConsumer<br>
-        FlinkKafkaProducer</td>
-        <td>>= 1.0.0</td>
-        <td>
-        这个通用的 Kafka 连接器尽力与 Kafka client 的最新版本保持同步。该连接器使用的 Kafka client 版本可能会在 
Flink 版本之间发生变化。从 Flink 1.9 版本开始,它使用 Kafka 2.2.0 client。当前 Kafka 客户端向后兼容 0.10.0 
或更高版本的 Kafka broker。
-        但是对于 Kafka 0.11.x 和 0.10.x 版本,我们建议你分别使用专用的 
flink-connector-kafka-0.11{{ site.scala_version_suffix }} 和 
flink-connector-kafka-0.10{{ site.scala_version_suffix }} 连接器。
-        </td>
-    </tr>
-  </tbody>
-</table>
-
-接着,在你的 maven 项目中导入连接器:
-
-{% highlight xml %}
-<dependency>
-  <groupId>org.apache.flink</groupId>
-  <artifactId>flink-connector-kafka{{ site.scala_version_suffix }}</artifactId>
-  <version>{{ site.version }}</version>
-</dependency>
-{% endhighlight %}
-
-请注意:目前流连接器还不是二进制分发的一部分。
-[在此处]({{ site.baseurl 
}}/zh/dev/project-configuration.html)可以了解到如何链接它们以实现在集群中执行。
-
-## 安装 Apache Kafka
-
-* 按照 [ Kafka 
快速入门](https://kafka.apache.org/documentation.html#quickstart)的说明下载代码并启动 Kafka 
服务器(每次启动应用程序之前都需要启动 Zookeeper 和 Kafka server)。
-* 如果 Kafka 和 Zookeeper 服务器运行在远端机器上,那么必须要将 `config/server.properties` 文件中的 
`advertised.host.name`属性设置为远端设备的 IP 地址。
-
-## Kafka 1.0.0+ 连接器
-
-从 Flink 1.7 开始,有一个新的通用 Kafka 连接器,它不跟踪特定的 Kafka 主版本。相反,它是在 Flink 发布时跟踪最新版本的 
Kafka。
-如果你的 Kafka broker 版本是 1.0.0 或 更新的版本,你应该使用这个 Kafka 连接器。
-如果你使用的是 Kafka 的旧版本( 0.11 或 0.10 ),那么你应该使用与 Kafka broker 版本相对应的连接器。
-
-### 兼容性
-
-通过 Kafka client API 和 broker 的兼容性保证,通用的 Kafka 连接器兼容较旧和较新的 Kafka broker。
-它兼容 Kafka broker 0.11.0 或者更高版本,具体兼容性取决于所使用的功能。有关 Kafka 兼容性的详细信息,请参考 [Kafka 
文档](https://kafka.apache.org/protocol.html#protocol_compatibility)。
-
-### 将 Kafka Connector 从 0.11 迁移到通用版本
+## 依赖
 
-以便执行迁移,请参考 [升级 Jobs 和 Flink 版本指南]({{ site.baseurl }}/zh/ops/upgrading.html):
-* 在全程中使用 Flink 1.9 或更新版本。
-* 不要同时升级 Flink 和 Operator。
-* 确保你的 Job 中所使用的 Kafka Consumer 和 Kafka Producer 分配了唯一的标识符(uid)。
-* 使用 stop with savepoint 的特性来执行 savepoint(例如,使用 `stop --withSavepoint`)[CLI 
命令]({{ site.baseurl }}/zh/ops/cli.html)。
-
-### 用法
-
-要使用通用的 Kafka 连接器,请为它添加依赖关系:
+Apache Flink 集成了通用的 Kafka 连接器,它会尽力与 Kafka client 的最新版本保持同步。
+该连接器使用的 Kafka client 版本可能会在 Flink 版本之间发生变化。
+当前 Kafka client 向后兼容 0.10.0 或更高版本的 Kafka broker。
+有关 Kafka 兼容性的更多细节,请参考  [Kafka 
官方文档](https://kafka.apache.org/protocol.html#protocol_compatibility)。
 
+<div class="codetabs" markdown="1">
+<div data-lang="universal" markdown="1">
 {% highlight xml %}
 <dependency>
-  <groupId>org.apache.flink</groupId>
-  <artifactId>flink-connector-kafka{{ site.scala_version_suffix }}</artifactId>
-  <version>{{ site.version }}</version>
+       <groupId>org.apache.flink</groupId>
+       <artifactId>flink-connector-kafka{{ site.scala_version_suffix 
}}</artifactId>
+       <version>{{ site.version }}</version>
 </dependency>
-{% endhighlight %}
+{% endhighlight %} 
+</div>
+</div>
 
-然后,实例化 source( `FlinkKafkaConsumer`)和 sink( 
`FlinkKafkaProducer`)。除了从模块和类名中删除了特定的 Kafka 版本外,这个 API 向后兼容 Kafka 0.11 版本的 
connector。
+Flink 目前的流连接器还不是二进制发行版的一部分。
+[在此处]({{ site.baseurl 
}}/zh/dev/project-configuration.html)可以了解到如何链接它们以实现在集群中执行。

Review comment:
       1 链接建议使用 `{% link %}` 的方式,参考邮件列表 [1]
   2 `如何链接它们以实现在集群中执行` 这个还能否优化下呢,现在知道是什么意思,但是读起来还有点拗口
   
   [1] 
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Reminder-Prefer-link-tag-in-documentation-td42362.html

##########
File path: docs/dev/connectors/kafka.zh.md
##########
@@ -431,39 +350,42 @@ stream.addSink(myProducer);
 {% highlight scala %}
 val stream: DataStream[String] = ...
 
+Properties properties = new Properties
+properties.setProperty("bootstrap.servers", "localhost:9092")
+
 val myProducer = new FlinkKafkaProducer[String](
-        "localhost:9092",         // broker 列表
         "my-topic",               // 目标 topic
-        new SimpleStringSchema)   // 序列化 schema
-
-// 0.10+ 版本的 Kafka 允许在将记录写入 Kafka 时附加记录的事件时间戳;
-// 此方法不适用于早期版本的 Kafka
-myProducer.setWriteTimestampToKafka(true)
+        new SimpleStringSchema(), // 序列化 schema
+        properties,               // producer 配置
+        FlinkKafkaProducer.Semantic.EXACTLY_ONCE) // 容错
 
 stream.addSink(myProducer)
 {% endhighlight %}
 </div>
 </div>
 
-上面的例子演示了创建 Flink Kafka Producer 来将流消息写入单个 Kafka 目标 topic 的基本用法。
-对于更高级的用法,这还有其他构造函数变体允许提供以下内容:
+## `SerializationSchema`
 
- * *提供自定义属性*:producer 允许为内部 `KafkaProducer` 提供自定义属性配置。有关如何配置 Kafka Producer 
的详细信息,请参阅  [Apache Kafka 文档](https://kafka.apache.org/documentation.html)。
- * *自定义分区器*:要将消息分配给特定的分区,可以向构造函数提供一个 `FlinkKafkaPartitioner` 
的实现。这个分区器将被流中的每条记录调用,以确定消息应该发送到目标 topic 的哪个具体分区里。有关详细信息,请参阅 [Kafka Producer 
分区方案](#kafka-producer-分区方案)。
- * *高级的序列化 schema*:与 consumer 类似,producer 还允许使用名为 `KeyedSerializationSchema` 
的高级序列化 schema,该 schema 允许单独序列化 key 和 value。它还允许覆盖目标 topic,以便 producer 
实例可以将数据发送到多个 topic。
+Flink Kafka Producer 需要知道如何将 Java/Scala 对象转化为二进制数据。
 
-### Kafka Producer 分区方案
+`KafkaSerializationSchema` 允许用户指定这样的 schema。它会为每个记录调用 `ProducerRecord<byte[], 
byte[]> serialize(T element, @Nullable Long timestamp)` 方法,产生一个写入到 Kafka 的 
`ProducerRecord`。
 
-默认情况下,如果没有为 Flink Kafka Producer 指定自定义分区程序,则 producer 将使用 
`FlinkFixedPartitioner` 为每个 Flink Kafka Producer 并行子任务映射到单个 Kafka 
分区(即,接收子任务接收到的所有消息都将位于同一个 Kafka 分区中)。
+用户可以对如何将数据写到Kafka进行细粒度的控制。你可以通过 producer record:

Review comment:
       ```suggestion
   用户可以对如何将数据写到 Kafka 进行细粒度的控制。你可以通过 producer record:
   ```

##########
File path: docs/dev/connectors/kafka.zh.md
##########
@@ -23,90 +23,32 @@ specific language governing permissions and limitations
 under the License.
 -->
 
+Flink 提供了 [Apache Kafka](https://kafka.apache.org) 连接器,用于向 Kafka topic 
中读取或者写入数据,可提供精确一次的处理语义。
+
 * This will be replaced by the TOC
 {:toc}
 
-此连接器提供了访问 [Apache Kafka](https://kafka.apache.org/) 事件流的服务。
-
-Flink 提供了专门的 Kafka 连接器,向 Kafka topic 中读取或者写入数据。Flink Kafka Consumer 集成了 Flink 
的 Checkpoint 机制,可提供 exactly-once 的处理语义。为此,Flink 并不完全依赖于跟踪 Kafka 
消费组的偏移量,而是在内部跟踪和检查偏移量。
-
-根据你的用例和环境选择相应的包(maven artifact id)和类名。对于大多数用户来说,使用 `FlinkKafkaConsumer`( 
`flink-connector-kafka` 的一部分)是比较合适的。
-
-<table class="table table-bordered">
-  <thead>
-    <tr>
-      <th class="text-left">Maven 依赖</th>
-      <th class="text-left">自从哪个版本<br>开始支持</th>
-      <th class="text-left">消费者和<br>生产者的类名称</th>
-      <th class="text-left">Kafka 版本</th>
-      <th class="text-left">注意</th>
-    </tr>
-  </thead>
-  <tbody>
-    <tr>
-        <td>flink-connector-kafka{{ site.scala_version_suffix }}</td>
-        <td>1.7.0</td>
-        <td>FlinkKafkaConsumer<br>
-        FlinkKafkaProducer</td>
-        <td>>= 1.0.0</td>
-        <td>
-        这个通用的 Kafka 连接器尽力与 Kafka client 的最新版本保持同步。该连接器使用的 Kafka client 版本可能会在 
Flink 版本之间发生变化。从 Flink 1.9 版本开始,它使用 Kafka 2.2.0 client。当前 Kafka 客户端向后兼容 0.10.0 
或更高版本的 Kafka broker。
-        但是对于 Kafka 0.11.x 和 0.10.x 版本,我们建议你分别使用专用的 
flink-connector-kafka-0.11{{ site.scala_version_suffix }} 和 
flink-connector-kafka-0.10{{ site.scala_version_suffix }} 连接器。
-        </td>
-    </tr>
-  </tbody>
-</table>
-
-接着,在你的 maven 项目中导入连接器:
-
-{% highlight xml %}
-<dependency>
-  <groupId>org.apache.flink</groupId>
-  <artifactId>flink-connector-kafka{{ site.scala_version_suffix }}</artifactId>
-  <version>{{ site.version }}</version>
-</dependency>
-{% endhighlight %}
-
-请注意:目前流连接器还不是二进制分发的一部分。
-[在此处]({{ site.baseurl 
}}/zh/dev/project-configuration.html)可以了解到如何链接它们以实现在集群中执行。
-
-## 安装 Apache Kafka
-
-* 按照 [ Kafka 
快速入门](https://kafka.apache.org/documentation.html#quickstart)的说明下载代码并启动 Kafka 
服务器(每次启动应用程序之前都需要启动 Zookeeper 和 Kafka server)。
-* 如果 Kafka 和 Zookeeper 服务器运行在远端机器上,那么必须要将 `config/server.properties` 文件中的 
`advertised.host.name`属性设置为远端设备的 IP 地址。
-
-## Kafka 1.0.0+ 连接器
-
-从 Flink 1.7 开始,有一个新的通用 Kafka 连接器,它不跟踪特定的 Kafka 主版本。相反,它是在 Flink 发布时跟踪最新版本的 
Kafka。
-如果你的 Kafka broker 版本是 1.0.0 或 更新的版本,你应该使用这个 Kafka 连接器。
-如果你使用的是 Kafka 的旧版本( 0.11 或 0.10 ),那么你应该使用与 Kafka broker 版本相对应的连接器。
-
-### 兼容性
-
-通过 Kafka client API 和 broker 的兼容性保证,通用的 Kafka 连接器兼容较旧和较新的 Kafka broker。
-它兼容 Kafka broker 0.11.0 或者更高版本,具体兼容性取决于所使用的功能。有关 Kafka 兼容性的详细信息,请参考 [Kafka 
文档](https://kafka.apache.org/protocol.html#protocol_compatibility)。
-
-### 将 Kafka Connector 从 0.11 迁移到通用版本
+## 依赖
 
-以便执行迁移,请参考 [升级 Jobs 和 Flink 版本指南]({{ site.baseurl }}/zh/ops/upgrading.html):
-* 在全程中使用 Flink 1.9 或更新版本。
-* 不要同时升级 Flink 和 Operator。
-* 确保你的 Job 中所使用的 Kafka Consumer 和 Kafka Producer 分配了唯一的标识符(uid)。
-* 使用 stop with savepoint 的特性来执行 savepoint(例如,使用 `stop --withSavepoint`)[CLI 
命令]({{ site.baseurl }}/zh/ops/cli.html)。
-
-### 用法
-
-要使用通用的 Kafka 连接器,请为它添加依赖关系:
+Apache Flink 集成了通用的 Kafka 连接器,它会尽力与 Kafka client 的最新版本保持同步。
+该连接器使用的 Kafka client 版本可能会在 Flink 版本之间发生变化。

Review comment:
       建议这些放到一行,否则渲染之后,两行中间会有空格

##########
File path: docs/dev/connectors/kafka.zh.md
##########
@@ -391,38 +304,44 @@ val properties = new Properties()
 properties.setProperty("bootstrap.servers", "localhost:9092")
 properties.setProperty("group.id", "test")
 
-val myConsumer = new FlinkKafkaConsumer[String]("topic", new 
SimpleStringSchema(), properties)
-myConsumer.assignTimestampsAndWatermarks(new CustomWatermarkEmitter())
-stream = env
-    .addSource(myConsumer)
-    .print()
+val myConsumer =
+    new FlinkKafkaConsumer("topic", new SimpleStringSchema(), properties);
+myConsumer.assignTimestampsAndWatermarks(
+    WatermarkStrategy.
+        .forBoundedOutOfOrderness(Duration.ofSeconds(20)))
+
+val stream = env.addSource(myConsumer)
 {% endhighlight %}
 </div>
 </div>
 
-在内部,每个 Kafka 分区执行一个 assigner 实例。当指定了这样的 assigner 时,对于从 Kafka 读取的每条消息,调用 
`extractTimestamp(T element, long previousElementTimestamp)` 来为记录分配时间戳,并为 
`Watermark getCurrentWatermark()`(定期形式)或 `Watermark checkAndGetNextWatermark(T 
lastElement, long extractedTimestamp)`(打点形式)以确定是否应该发出新的 watermark 以及使用哪个时间戳。
-
-**请注意**:如果 watermark assigner 依赖于从 Kafka 读取的消息来上涨其 watermark 
(通常就是这种情况),那么所有主题和分区都需要有连续的消息流。否则,整个应用程序的 watermark 
将无法上涨,所有基于时间的算子(例如时间窗口或带有计时器的函数)也无法运行。单个的 Kafka 分区也会导致这种反应。这是一个已在计划中的 Flink 
改进,目的是为了防止这种情况发生(请见[FLINK-5479: Per-partition watermarks in FlinkKafkaConsumer 
should consider idle 
partitions](https://issues.apache.org/jira/browse/FLINK-5479))。同时,可能的解决方法是将*心跳消息*发送到所有
 consumer 的分区里,从而上涨空闲分区的 watermark。
+**请注意**:如果 watermark assigner 依赖于从 Kafka 读取的消息来上涨其 watermark 
(通常就是这种情况),那么所有主题和分区都需要有连续的消息流。否则,整个应用程序的 watermark 
将无法上涨,所有基于时间的算子(例如时间窗口或带有计时器的函数)也无法运行。单个的 Kafka 分区也会导致这种反应。考虑设置适当的 [idelness 
timeouts]({{ site.baseurl 
}}/dev/event_timestamps_watermarks.html#dealing-with-idle-sources) 来缓解这个问题。

Review comment:
       链接换成 `{% link %}` 的形式

##########
File path: docs/dev/connectors/kafka.zh.md
##########
@@ -120,38 +62,30 @@ Flink 的 Kafka consumer 称为 `FlinkKafkaConsumer`。它提供对一个或多
   - "bootstrap.servers"(以逗号分隔的 Kafka broker 列表)
   - "group.id" 消费组 ID
 
-示例:
-
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 {% highlight java %}
 Properties properties = new Properties();
 properties.setProperty("bootstrap.servers", "localhost:9092");
 properties.setProperty("group.id", "test");
 DataStream<String> stream = env
-  .addSource(new FlinkKafkaConsumer<>("topic", new SimpleStringSchema(), 
properties));
+    .addSource(new FlinkKafkaConsumer<>("topic", new SimpleStringSchema(), 
properties));
 {% endhighlight %}
 </div>
 <div data-lang="scala" markdown="1">
 {% highlight scala %}
 val properties = new Properties()
 properties.setProperty("bootstrap.servers", "localhost:9092")
 properties.setProperty("group.id", "test")
-stream = env
+val stream = env
     .addSource(new FlinkKafkaConsumer[String]("topic", new 
SimpleStringSchema(), properties))
-    .print()
 {% endhighlight %}
 </div>
 </div>
 
 ### `DeserializationSchema`
 
-Flink Kafka Consumer 需要知道如何将 Kafka 中的二进制数据转换为 Java 或者 Scala 
对象。`DeserializationSchema` 允许用户指定这样的 schema,为每条 Kafka 消息调用 `T 
deserialize(byte[] message)` 方法,传递来自 Kafka 的值。
-
-从 `AbstractDeserializationSchema` 开始通常很有帮助,它负责将生成的 Java 或 Scala 类型描述为 Flink 
的类型系统。
-用户如果要自己去实现一个`DeserializationSchema`,需要自己去实现 `getProducedType(...)`方法。
-
-为了访问 Kafka 消息的 key、value 和元数据,`KafkaDeserializationSchema` 具有以下反序列化方法 `T 
deserialize(ConsumerRecord<byte[], byte[]> record)`。
+Flink Kafka Consumer 需要知道如何将 Kafka 中的二进制数据转换为 Java 或者 Scala 
对象。`KafkaDeserializationSchema` 允许用户指定这样的 schema,为每条 Kafka 消息调用 `T 
deserialize(ConsumerRecord<byte[], byte[]> record)` 方法,传递来自 Kafka 的值。

Review comment:
       “为每条 Kafka 消息调用 `T deserialize(ConsumerRecord<byte[], byte[]> record)` 
方法,传递来自 Kafka 的值” 这里如果改成 “每条 Kafka 中的消息会调用 `T 
deserialize(ConsumerRecord<byte[], byte[]> record)` 反序列化” 这样会好一些吗?

##########
File path: docs/dev/connectors/kafka.zh.md
##########
@@ -23,90 +23,32 @@ specific language governing permissions and limitations
 under the License.
 -->
 
+Flink 提供了 [Apache Kafka](https://kafka.apache.org) 连接器,用于向 Kafka topic 
中读取或者写入数据,可提供精确一次的处理语义。
+
 * This will be replaced by the TOC
 {:toc}
 
-此连接器提供了访问 [Apache Kafka](https://kafka.apache.org/) 事件流的服务。
-
-Flink 提供了专门的 Kafka 连接器,向 Kafka topic 中读取或者写入数据。Flink Kafka Consumer 集成了 Flink 
的 Checkpoint 机制,可提供 exactly-once 的处理语义。为此,Flink 并不完全依赖于跟踪 Kafka 
消费组的偏移量,而是在内部跟踪和检查偏移量。
-
-根据你的用例和环境选择相应的包(maven artifact id)和类名。对于大多数用户来说,使用 `FlinkKafkaConsumer`( 
`flink-connector-kafka` 的一部分)是比较合适的。
-
-<table class="table table-bordered">
-  <thead>
-    <tr>
-      <th class="text-left">Maven 依赖</th>
-      <th class="text-left">自从哪个版本<br>开始支持</th>
-      <th class="text-left">消费者和<br>生产者的类名称</th>
-      <th class="text-left">Kafka 版本</th>
-      <th class="text-left">注意</th>
-    </tr>
-  </thead>
-  <tbody>
-    <tr>
-        <td>flink-connector-kafka{{ site.scala_version_suffix }}</td>
-        <td>1.7.0</td>
-        <td>FlinkKafkaConsumer<br>
-        FlinkKafkaProducer</td>
-        <td>>= 1.0.0</td>
-        <td>
-        这个通用的 Kafka 连接器尽力与 Kafka client 的最新版本保持同步。该连接器使用的 Kafka client 版本可能会在 
Flink 版本之间发生变化。从 Flink 1.9 版本开始,它使用 Kafka 2.2.0 client。当前 Kafka 客户端向后兼容 0.10.0 
或更高版本的 Kafka broker。
-        但是对于 Kafka 0.11.x 和 0.10.x 版本,我们建议你分别使用专用的 
flink-connector-kafka-0.11{{ site.scala_version_suffix }} 和 
flink-connector-kafka-0.10{{ site.scala_version_suffix }} 连接器。
-        </td>
-    </tr>
-  </tbody>
-</table>
-
-接着,在你的 maven 项目中导入连接器:
-
-{% highlight xml %}
-<dependency>
-  <groupId>org.apache.flink</groupId>
-  <artifactId>flink-connector-kafka{{ site.scala_version_suffix }}</artifactId>
-  <version>{{ site.version }}</version>
-</dependency>
-{% endhighlight %}
-
-请注意:目前流连接器还不是二进制分发的一部分。
-[在此处]({{ site.baseurl 
}}/zh/dev/project-configuration.html)可以了解到如何链接它们以实现在集群中执行。
-
-## 安装 Apache Kafka
-
-* 按照 [ Kafka 
快速入门](https://kafka.apache.org/documentation.html#quickstart)的说明下载代码并启动 Kafka 
服务器(每次启动应用程序之前都需要启动 Zookeeper 和 Kafka server)。
-* 如果 Kafka 和 Zookeeper 服务器运行在远端机器上,那么必须要将 `config/server.properties` 文件中的 
`advertised.host.name`属性设置为远端设备的 IP 地址。
-
-## Kafka 1.0.0+ 连接器
-
-从 Flink 1.7 开始,有一个新的通用 Kafka 连接器,它不跟踪特定的 Kafka 主版本。相反,它是在 Flink 发布时跟踪最新版本的 
Kafka。
-如果你的 Kafka broker 版本是 1.0.0 或 更新的版本,你应该使用这个 Kafka 连接器。
-如果你使用的是 Kafka 的旧版本( 0.11 或 0.10 ),那么你应该使用与 Kafka broker 版本相对应的连接器。
-
-### 兼容性
-
-通过 Kafka client API 和 broker 的兼容性保证,通用的 Kafka 连接器兼容较旧和较新的 Kafka broker。
-它兼容 Kafka broker 0.11.0 或者更高版本,具体兼容性取决于所使用的功能。有关 Kafka 兼容性的详细信息,请参考 [Kafka 
文档](https://kafka.apache.org/protocol.html#protocol_compatibility)。
-
-### 将 Kafka Connector 从 0.11 迁移到通用版本
+## 依赖
 
-以便执行迁移,请参考 [升级 Jobs 和 Flink 版本指南]({{ site.baseurl }}/zh/ops/upgrading.html):
-* 在全程中使用 Flink 1.9 或更新版本。
-* 不要同时升级 Flink 和 Operator。
-* 确保你的 Job 中所使用的 Kafka Consumer 和 Kafka Producer 分配了唯一的标识符(uid)。
-* 使用 stop with savepoint 的特性来执行 savepoint(例如,使用 `stop --withSavepoint`)[CLI 
命令]({{ site.baseurl }}/zh/ops/cli.html)。
-
-### 用法
-
-要使用通用的 Kafka 连接器,请为它添加依赖关系:
+Apache Flink 集成了通用的 Kafka 连接器,它会尽力与 Kafka client 的最新版本保持同步。
+该连接器使用的 Kafka client 版本可能会在 Flink 版本之间发生变化。
+当前 Kafka client 向后兼容 0.10.0 或更高版本的 Kafka broker。

Review comment:
       这里应该是`向前兼容`?

##########
File path: docs/dev/connectors/kafka.zh.md
##########
@@ -302,8 +217,6 @@ Flink Kafka Consumer 支持发现动态创建的 Kafka 分区,并使用精准
 
 默认情况下,是禁用了分区发现的。若要启用它,请在提供的属性配置中为 `flink.partition-discovery.interval-millis` 
设置大于 0 的值,表示发现分区的间隔是以毫秒为单位的。
 
-<span class="label label-danger">局限性</span> 当从 Flink 1.3.x 之前的 Flink 版本的 
savepoint 恢复 consumer 
时,分区发现无法在恢复运行时启用。如果启用了,那么还原将会失败并且出现异常。在这种情况下,为了使用分区发现,请首先在 Flink 1.3.x 中使用 
savepoint,然后再从 savepoint 中恢复。
-
 #### Topic 发现

Review comment:
       既然更像这个文档了,也按照 
[wiki](https://cwiki.apache.org/confluence/display/FLINK/Flink+Translation+Specifications)
 的方式,给所有标题添加一下 <a> 标签吧,可以仿照英文版的 url 进行添加
   

##########
File path: docs/dev/connectors/kafka.zh.md
##########
@@ -273,24 +205,7 @@ 
myConsumer.setStartFromSpecificOffsets(specificStartOffsets)
 
 因此,设置 checkpoint 的间隔定义了程序在发生故障时最多需要返回多少。
 
-要使用容错的 Kafka Consumer,需要在执行环境中启用拓扑的 checkpointing。
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight java %}
-final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-env.enableCheckpointing(5000); // 每隔 5000 毫秒 执行一次 checkpoint
-{% endhighlight %}
-</div>
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-val env = StreamExecutionEnvironment.getExecutionEnvironment()
-env.enableCheckpointing(5000) // 每隔 5000 毫秒 执行一次 checkpoint
-{% endhighlight %}
-</div>
-</div>
-
-另请注意,只有当可用的 slots 足够时,Flink 才能重新启动。因此,如果拓扑由于丢失了 TaskManager 
而失败,那么之后必须要一直有足够可用的 solt。Flink on YARN 支持自动重启丢失的 YARN 容器。
+要使用容错的 Kafka Consumer,需要在 [执行环境]({{ site.baseurl 
}}/ops/config.html#execution-checkpointing-interval) 中启用拓扑的 checkpointing。

Review comment:
       1 `要使用容错的 Kafka Consumer` 这里能否优化下呢,读起来有点拗口
   2 链接建议使用 `{% link %}` 的形式

##########
File path: docs/dev/connectors/kafka.zh.md
##########
@@ -23,90 +23,32 @@ specific language governing permissions and limitations
 under the License.
 -->
 
+Flink 提供了 [Apache Kafka](https://kafka.apache.org) 连接器,用于向 Kafka topic 
中读取或者写入数据,可提供精确一次的处理语义。

Review comment:
       `用于向 Kafka topic 中读取或者写入` 这里需要把这句话拆开成 `从xxx读取` `向xxx 写入`吗?现在读起来更像 
`向xxx读取`有一点奇怪

##########
File path: docs/dev/connectors/kafka.zh.md
##########
@@ -512,6 +434,17 @@ Flink 通过 Kafka 连接器提供了一流的支持,可以对 Kerberos 配置
 
 有关 Kerberos 安全性 Flink 配置的更多信息,请参见[这里]({{ site.baseurl 
}}/zh/ops/config.html)。你也可以在[这里]({{ site.baseurl 
}}/zh/ops/security-kerberos.html)进一步了解 Flink 如何在内部设置基于 kerberos 的安全性。
 
+## 升级到最近的连接器版本
+
+通用的升级步骤概述见 [升级 Jobs 和 Flink 版本指南]({{ site.baseurl }}/ops/upgrading.html)。对于 
Kafka,你还需要遵循这些步骤:

Review comment:
       链接替换成 `{% link %}` 

##########
File path: docs/dev/connectors/kafka.zh.md
##########
@@ -431,39 +350,42 @@ stream.addSink(myProducer);
 {% highlight scala %}
 val stream: DataStream[String] = ...
 
+Properties properties = new Properties
+properties.setProperty("bootstrap.servers", "localhost:9092")
+
 val myProducer = new FlinkKafkaProducer[String](
-        "localhost:9092",         // broker 列表
         "my-topic",               // 目标 topic
-        new SimpleStringSchema)   // 序列化 schema
-
-// 0.10+ 版本的 Kafka 允许在将记录写入 Kafka 时附加记录的事件时间戳;
-// 此方法不适用于早期版本的 Kafka
-myProducer.setWriteTimestampToKafka(true)
+        new SimpleStringSchema(), // 序列化 schema
+        properties,               // producer 配置
+        FlinkKafkaProducer.Semantic.EXACTLY_ONCE) // 容错
 
 stream.addSink(myProducer)
 {% endhighlight %}
 </div>
 </div>
 
-上面的例子演示了创建 Flink Kafka Producer 来将流消息写入单个 Kafka 目标 topic 的基本用法。
-对于更高级的用法,这还有其他构造函数变体允许提供以下内容:
+## `SerializationSchema`
 
- * *提供自定义属性*:producer 允许为内部 `KafkaProducer` 提供自定义属性配置。有关如何配置 Kafka Producer 
的详细信息,请参阅  [Apache Kafka 文档](https://kafka.apache.org/documentation.html)。
- * *自定义分区器*:要将消息分配给特定的分区,可以向构造函数提供一个 `FlinkKafkaPartitioner` 
的实现。这个分区器将被流中的每条记录调用,以确定消息应该发送到目标 topic 的哪个具体分区里。有关详细信息,请参阅 [Kafka Producer 
分区方案](#kafka-producer-分区方案)。
- * *高级的序列化 schema*:与 consumer 类似,producer 还允许使用名为 `KeyedSerializationSchema` 
的高级序列化 schema,该 schema 允许单独序列化 key 和 value。它还允许覆盖目标 topic,以便 producer 
实例可以将数据发送到多个 topic。
+Flink Kafka Producer 需要知道如何将 Java/Scala 对象转化为二进制数据。
 
-### Kafka Producer 分区方案
+`KafkaSerializationSchema` 允许用户指定这样的 schema。它会为每个记录调用 `ProducerRecord<byte[], 
byte[]> serialize(T element, @Nullable Long timestamp)` 方法,产生一个写入到 Kafka 的 
`ProducerRecord`。
 
-默认情况下,如果没有为 Flink Kafka Producer 指定自定义分区程序,则 producer 将使用 
`FlinkFixedPartitioner` 为每个 Flink Kafka Producer 并行子任务映射到单个 Kafka 
分区(即,接收子任务接收到的所有消息都将位于同一个 Kafka 分区中)。
+用户可以对如何将数据写到Kafka进行细粒度的控制。你可以通过 producer record:
 
-可以通过扩展 `FlinkKafkaPartitioner` 类来实现自定义分区程序。所有 Kafka 版本的构造函数都允许在实例化 producer 
时提供自定义分区程序。
-注意:分区器实现必须是可序列化的,因为它们将在 Flink 节点之间传输。此外,请记住分区器中的任何状态都将在作业失败时丢失,因为分区器不是 
producer 的 checkpoint 状态的一部分。
-
-也可以完全避免使用分区器,并简单地让 Kafka 通过其附加 key 写入的消息进行分区(使用提供的序列化 schema 为每条记录确定分区)。
-为此,在实例化 producer 时提供 `null` 自定义分区程序,提供 `null` 作为自定义分区器是很重要的; 
如上所述,如果未指定自定义分区程序,则默认使用 `FlinkFixedPartitioner`。
+* 设置 header 值
+* 为每个 record 定义 key
+* 指定数据的自定义分区
 
 ### Kafka Producer 和容错
 
+启用 Flink 的checkpointing 后,`FlinkKafkaProducer` 可以提供精确一次的语义保证。

Review comment:
       ```suggestion
   启用 Flink 的 checkpointing 后,`FlinkKafkaProducer` 可以提供精确一次的语义保证。
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to