This is an automated email from the ASF dual-hosted git repository.

HTHou pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iotdb-docs.git


The following commit(s) were added to refs/heads/main by this push:
     new d8a74774 docs: update Kafka programming guide (#1127)
d8a74774 is described below

commit d8a74774c32da70fae560c1515fa868f8b2f0800
Author: leto-b <[email protected]>
AuthorDate: Fri May 22 09:49:49 2026 +0800

    docs: update Kafka programming guide (#1127)
    
    * docs: update kafka programming guide
    
    * docs: refine kafka user guide wording
---
 .../Ecosystem-Integration/Programming-Kafka.md     | 241 ++++++++++++++-------
 .../Ecosystem-Integration/Programming-Kafka.md     | 241 ++++++++++++++-------
 .../Ecosystem-Integration/Programming-Kafka.md     | 241 ++++++++++++++-------
 .../Ecosystem-Integration/Programming-Kafka.md     | 241 ++++++++++++++-------
 4 files changed, 628 insertions(+), 336 deletions(-)

diff --git a/src/UserGuide/Master/Tree/Ecosystem-Integration/Programming-Kafka.md b/src/UserGuide/Master/Tree/Ecosystem-Integration/Programming-Kafka.md
index aab8d3d2..a0954508 100644
--- a/src/UserGuide/Master/Tree/Ecosystem-Integration/Programming-Kafka.md
+++ b/src/UserGuide/Master/Tree/Ecosystem-Integration/Programming-Kafka.md
@@ -21,98 +21,171 @@
 
 # Kafka
 
-[Apache Kafka](https://kafka.apache.org/) is an open-source distributed event streaming platform used by thousands of companies for high-performance data pipelines, streaming analytics, data integration, and mission-critical applications.
+[Apache Kafka](https://kafka.apache.org/) is an open-source distributed event streaming platform used by thousands of companies for high-performance data pipelines, streaming analytics, data integration, and mission-critical applications. IoTDB can subscribe to Kafka data through a Kafka Consumer and write the data to IoTDB by using the Session API.
 
-## 1. Coding Example
+This document introduces a simple data ingestion flow: an application writes messages to a Kafka topic, the Kafka Consumer consumes the messages and parses them into IoTDB time series data, and then writes the data to IoTDB.
 
-### 1.1 kafka Producer Producing Data Java Code Example
+## 1. Environment Preparation
+
+Before you start, make sure that the following environment is available:
+
+- JDK 8 or later
+- Maven 3.6 or later
+- Apache Kafka. For installation and startup, refer to the [Kafka official documentation](https://kafka.apache.org/documentation/)
+- An IoTDB service is running
+
+The default addresses used in the following examples are:
+
+| Service | Address |
+| --- | --- |
+| Kafka | `127.0.0.1:9092` |
+| IoTDB | `127.0.0.1:6667` |
+| IoTDB username | `root` |
+| IoTDB password | `root` |
+
+## 2. Add Dependencies
+
+Add the Kafka and IoTDB Session dependencies to your Maven `pom.xml`. It is recommended that the IoTDB dependency version matches your deployed IoTDB version.
+
+```xml
+<dependencies>
+    <dependency>
+        <groupId>org.apache.iotdb</groupId>
+        <artifactId>iotdb-session</artifactId>
+        <version>2.0.4</version>
+    </dependency>
+    <dependency>
+        <groupId>org.apache.kafka</groupId>
+        <artifactId>kafka-clients</artifactId>
+        <version>3.7.0</version>
+    </dependency>
+</dependencies>
+```
+
+For the complete example project, see [iotdb-extras/examples/kafka](https://github.com/apache/iotdb-extras/tree/master/examples/kafka).
+
+## 3. Kafka Message Format
+
+The sample program uses a string format to transfer one IoTDB data record:
+
+```text
+device,timestamp,measurements,types,values
+```
+
+The fields are described as follows:
+
+| Field | Description | Example |
+| --- | --- | --- |
+| `device` | IoTDB device path | `root.kafka.d0` |
+| `timestamp` | Timestamp in milliseconds | `1716180000000` |
+| `measurements` | Measurement names, separated by `:` when there are multiple values | `temperature:status` |
+| `types` | Data types, separated by `:` when there are multiple values | `DOUBLE:BOOLEAN` |
+| `values` | Data values, separated by `:` when there are multiple values | `36.5:true` |
+
+Single-measurement example:
+
+```text
+root.kafka.d0,1716180000000,temperature,DOUBLE,36.5
+```
+
+Multiple-measurement example:
+
+```text
+root.kafka.d0,1716180000000,temperature:status,DOUBLE:BOOLEAN,36.5:true
+```
+
+## 4. Produce Data to Kafka
+
+The following code shows the key logic for writing one IoTDB data record to the `Kafka-Test` topic:
 
 ```java
-    Properties props = new Properties();
-    props.put("bootstrap.servers", "127.0.0.1:9092");
-    props.put("key.serializer", StringSerializer.class);
-    props.put("value.serializer", StringSerializer.class);
-    KafkaProducer<String, String> producer = new KafkaProducer<>(props);
-    producer.send(
-        new ProducerRecord<>(
-            "Kafka-Test", "key", "root.kafka," + System.currentTimeMillis() + ",value,INT32,100"));
-    producer.close();
+String value = "root.kafka.d0,"
+    + System.currentTimeMillis()
+    + ",temperature:status,DOUBLE:BOOLEAN,36.5:true";
+
+producer.send(new ProducerRecord<>("Kafka-Test", "iotdb", value));
 ```
 
-### 1.2 kafka Consumer Receiving Data Java Code Example
+## 5. Consume Kafka Data and Write to IoTDB
+
+After the Kafka Consumer reads a message from the topic, it parses the device, timestamp, measurements, types, and values, and writes the data to IoTDB through SessionPool.
 
 ```java
-    Properties props = new Properties();
-    props.put("bootstrap.servers", "127.0.0.1:9092");
-    props.put("key.deserializer", StringDeserializer.class);
-    props.put("value.deserializer", StringDeserializer.class);
-    props.put("auto.offset.reset", "earliest");
-    props.put("group.id", "Kafka-Test");
-    KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(props);
-    kafkaConsumer.subscribe(Collections.singleton("Kafka-Test"));
-    ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofSeconds(1));
- ```
-
-### 1.3 Example of Java Code Stored in IoTDB Server
+String[] fields = record.value().split(",");
+String device = fields[0];
+long time = Long.parseLong(fields[1]);
+List<String> measurements = Arrays.asList(fields[2].split(":"));
+
+String[] typeNames = fields[3].split(":");
+String[] valueTexts = fields[4].split(":");
+
+List<TSDataType> types = new ArrayList<>();
+List<Object> values = new ArrayList<>();
+
+for (int i = 0; i < typeNames.length; i++) {
+  TSDataType type = TSDataType.valueOf(typeNames[i]);
+  types.add(type);
+
+  switch (type) {
+    case INT32:
+      values.add(Integer.parseInt(valueTexts[i]));
+      break;
+    case INT64:
+      values.add(Long.parseLong(valueTexts[i]));
+      break;
+    case FLOAT:
+      values.add(Float.parseFloat(valueTexts[i]));
+      break;
+    case DOUBLE:
+      values.add(Double.parseDouble(valueTexts[i]));
+      break;
+    case BOOLEAN:
+      values.add(Boolean.parseBoolean(valueTexts[i]));
+      break;
+    case TEXT:
+      values.add(valueTexts[i]);
+      break;
+    default:
+      throw new IllegalArgumentException("Unsupported data type: " + type);
+  }
+}
+
+pool.insertRecord(device, time, measurements, types, values);
+```
+
+The IoTDB `SessionPool` can be created as follows:
 
 ```java
-    SessionPool pool =
-        new SessionPool.Builder()
-            .host("127.0.0.1")
-            .port(6667)
-            .user("root")
-            .password("root")
-            .maxSize(3)
-            .build();
-    List<String> datas = new ArrayList<>(records.count());
-    for (ConsumerRecord<String, String> record : records) {
-      datas.add(record.value());
-    }
-    int size = datas.size();
-    List<String> deviceIds = new ArrayList<>(size);
-    List<Long> times = new ArrayList<>(size);
-    List<List<String>> measurementsList = new ArrayList<>(size);
-    List<List<TSDataType>> typesList = new ArrayList<>(size);
-    List<List<Object>> valuesList = new ArrayList<>(size);
-    for (String data : datas) {
-      String[] dataArray = data.split(",");
-      String device = dataArray[0];
-      long time = Long.parseLong(dataArray[1]);
-      List<String> measurements = Arrays.asList(dataArray[2].split(":"));
-      List<TSDataType> types = new ArrayList<>();
-      for (String type : dataArray[3].split(":")) {
-        types.add(TSDataType.valueOf(type));
-      }
-      List<Object> values = new ArrayList<>();
-      String[] valuesStr = dataArray[4].split(":");
-      for (int i = 0; i < valuesStr.length; i++) {
-        switch (types.get(i)) {
-          case INT64:
-            values.add(Long.parseLong(valuesStr[i]));
-            break;
-          case DOUBLE:
-            values.add(Double.parseDouble(valuesStr[i]));
-            break;
-          case INT32:
-            values.add(Integer.parseInt(valuesStr[i]));
-            break;
-          case TEXT:
-            values.add(valuesStr[i]);
-            break;
-          case FLOAT:
-            values.add(Float.parseFloat(valuesStr[i]));
-            break;
-          case BOOLEAN:
-            values.add(Boolean.parseBoolean(valuesStr[i]));
-            break;
-        }
-      }
-      deviceIds.add(device);
-      times.add(time);
-      measurementsList.add(measurements);
-      typesList.add(types);
-      valuesList.add(values);
-    }
-    pool.insertRecords(deviceIds, times, measurementsList, typesList, valuesList);
- ```
+SessionPool pool = new SessionPool.Builder()
+    .host("127.0.0.1")
+    .port(6667)
+    .user("root")
+    .password("root")
+    .maxSize(3)
+    .build();
+```
+
+## 6. Query the Result
+
+Connect to the IoTDB CLI:
+
+```bash
+./sbin/start-cli.sh -h 127.0.0.1 -p 6667 -u root -pw root
+```
+
+Run the query:
+
+```sql
+SELECT * FROM root.kafka.**;
+```
+
+The query result shows the time series data written by the Kafka Consumer, for example:
+
+```text
++-----------------------------+-------------------------+--------------------+
+|                         Time|root.kafka.d0.temperature|root.kafka.d0.status|
++-----------------------------+-------------------------+--------------------+
+|2024-05-20T10:00:00.000+08:00|                     36.5|                true|
++-----------------------------+-------------------------+--------------------+
+```
 
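The parsing step in section 5 of the guide above indexes `fields[0]` through `fields[4]` and `valueTexts[i]` without checking lengths, so a malformed message would fail with an `ArrayIndexOutOfBoundsException`. A minimal sketch of a validating parser follows, assuming only the message format from section 3. `KafkaRecordParser` and `ParsedRecord` are hypothetical names, not part of IoTDB or the example project, and type names are kept as plain strings so the sketch runs without the IoTDB dependency; real code would map them with `TSDataType.valueOf` as the guide does.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical helper for illustration; not an IoTDB or Kafka API. */
final class KafkaRecordParser {

  /** Parsed form of one "device,timestamp,measurements,types,values" message. */
  static final class ParsedRecord {
    final String device;
    final long timestamp;
    final List<String> measurements;
    final List<String> typeNames;
    final List<Object> values;

    ParsedRecord(String device, long timestamp, List<String> measurements,
        List<String> typeNames, List<Object> values) {
      this.device = device;
      this.timestamp = timestamp;
      this.measurements = measurements;
      this.typeNames = typeNames;
      this.values = values;
    }
  }

  private KafkaRecordParser() {}

  /** Parses one message and throws IllegalArgumentException if it is malformed. */
  static ParsedRecord parse(String message) {
    // A limit of -1 keeps trailing empty fields so they are counted, not dropped.
    String[] fields = message.split(",", -1);
    if (fields.length != 5) {
      throw new IllegalArgumentException(
          "Expected 5 comma-separated fields but got " + fields.length);
    }
    // NumberFormatException is a subclass of IllegalArgumentException.
    long timestamp = Long.parseLong(fields[1]);
    String[] measurements = fields[2].split(":", -1);
    String[] typeNames = fields[3].split(":", -1);
    String[] valueTexts = fields[4].split(":", -1);
    if (measurements.length != typeNames.length || typeNames.length != valueTexts.length) {
      throw new IllegalArgumentException(
          "measurements, types, and values must have the same number of entries");
    }
    List<Object> values = new ArrayList<>();
    for (int i = 0; i < typeNames.length; i++) {
      values.add(convert(typeNames[i], valueTexts[i]));
    }
    return new ParsedRecord(
        fields[0], timestamp, Arrays.asList(measurements), Arrays.asList(typeNames), values);
  }

  /** Converts one value according to its type name, mirroring the switch in the guide. */
  private static Object convert(String typeName, String text) {
    switch (typeName) {
      case "INT32":
        return Integer.parseInt(text);
      case "INT64":
        return Long.parseLong(text);
      case "FLOAT":
        return Float.parseFloat(text);
      case "DOUBLE":
        return Double.parseDouble(text);
      case "BOOLEAN":
        return Boolean.parseBoolean(text);
      case "TEXT":
        return text;
      default:
        throw new IllegalArgumentException("Unsupported data type: " + typeName);
    }
  }
}
```

Because the format splits on `,` and `:`, a `TEXT` value containing either character cannot be represented. In this sketch such a message is rejected by the field-count or length check instead of being silently misparsed.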
diff --git a/src/UserGuide/latest/Ecosystem-Integration/Programming-Kafka.md b/src/UserGuide/latest/Ecosystem-Integration/Programming-Kafka.md
index aab8d3d2..a0954508 100644
--- a/src/UserGuide/latest/Ecosystem-Integration/Programming-Kafka.md
+++ b/src/UserGuide/latest/Ecosystem-Integration/Programming-Kafka.md
@@ -21,98 +21,171 @@
 
 # Kafka
 
-[Apache Kafka](https://kafka.apache.org/) is an open-source distributed event streaming platform used by thousands of companies for high-performance data pipelines, streaming analytics, data integration, and mission-critical applications.
+[Apache Kafka](https://kafka.apache.org/) is an open-source distributed event streaming platform used by thousands of companies for high-performance data pipelines, streaming analytics, data integration, and mission-critical applications. IoTDB can subscribe to Kafka data through a Kafka Consumer and write the data to IoTDB by using the Session API.
 
-## 1. Coding Example
+This document introduces a simple data ingestion flow: an application writes messages to a Kafka topic, the Kafka Consumer consumes the messages and parses them into IoTDB time series data, and then writes the data to IoTDB.
 
-### 1.1 kafka Producer Producing Data Java Code Example
+## 1. Environment Preparation
+
+Before you start, make sure that the following environment is available:
+
+- JDK 8 or later
+- Maven 3.6 or later
+- Apache Kafka. For installation and startup, refer to the [Kafka official documentation](https://kafka.apache.org/documentation/)
+- An IoTDB service is running
+
+The default addresses used in the following examples are:
+
+| Service | Address |
+| --- | --- |
+| Kafka | `127.0.0.1:9092` |
+| IoTDB | `127.0.0.1:6667` |
+| IoTDB username | `root` |
+| IoTDB password | `root` |
+
+## 2. Add Dependencies
+
+Add the Kafka and IoTDB Session dependencies to your Maven `pom.xml`. It is recommended that the IoTDB dependency version matches your deployed IoTDB version.
+
+```xml
+<dependencies>
+    <dependency>
+        <groupId>org.apache.iotdb</groupId>
+        <artifactId>iotdb-session</artifactId>
+        <version>2.0.4</version>
+    </dependency>
+    <dependency>
+        <groupId>org.apache.kafka</groupId>
+        <artifactId>kafka-clients</artifactId>
+        <version>3.7.0</version>
+    </dependency>
+</dependencies>
+```
+
+For the complete example project, see [iotdb-extras/examples/kafka](https://github.com/apache/iotdb-extras/tree/master/examples/kafka).
+
+## 3. Kafka Message Format
+
+The sample program uses a string format to transfer one IoTDB data record:
+
+```text
+device,timestamp,measurements,types,values
+```
+
+The fields are described as follows:
+
+| Field | Description | Example |
+| --- | --- | --- |
+| `device` | IoTDB device path | `root.kafka.d0` |
+| `timestamp` | Timestamp in milliseconds | `1716180000000` |
+| `measurements` | Measurement names, separated by `:` when there are multiple values | `temperature:status` |
+| `types` | Data types, separated by `:` when there are multiple values | `DOUBLE:BOOLEAN` |
+| `values` | Data values, separated by `:` when there are multiple values | `36.5:true` |
+
+Single-measurement example:
+
+```text
+root.kafka.d0,1716180000000,temperature,DOUBLE,36.5
+```
+
+Multiple-measurement example:
+
+```text
+root.kafka.d0,1716180000000,temperature:status,DOUBLE:BOOLEAN,36.5:true
+```
+
+## 4. Produce Data to Kafka
+
+The following code shows the key logic for writing one IoTDB data record to the `Kafka-Test` topic:
 
 ```java
-    Properties props = new Properties();
-    props.put("bootstrap.servers", "127.0.0.1:9092");
-    props.put("key.serializer", StringSerializer.class);
-    props.put("value.serializer", StringSerializer.class);
-    KafkaProducer<String, String> producer = new KafkaProducer<>(props);
-    producer.send(
-        new ProducerRecord<>(
-            "Kafka-Test", "key", "root.kafka," + System.currentTimeMillis() + ",value,INT32,100"));
-    producer.close();
+String value = "root.kafka.d0,"
+    + System.currentTimeMillis()
+    + ",temperature:status,DOUBLE:BOOLEAN,36.5:true";
+
+producer.send(new ProducerRecord<>("Kafka-Test", "iotdb", value));
 ```
 
-### 1.2 kafka Consumer Receiving Data Java Code Example
+## 5. Consume Kafka Data and Write to IoTDB
+
+After the Kafka Consumer reads a message from the topic, it parses the device, timestamp, measurements, types, and values, and writes the data to IoTDB through SessionPool.
 
 ```java
-    Properties props = new Properties();
-    props.put("bootstrap.servers", "127.0.0.1:9092");
-    props.put("key.deserializer", StringDeserializer.class);
-    props.put("value.deserializer", StringDeserializer.class);
-    props.put("auto.offset.reset", "earliest");
-    props.put("group.id", "Kafka-Test");
-    KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(props);
-    kafkaConsumer.subscribe(Collections.singleton("Kafka-Test"));
-    ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofSeconds(1));
- ```
-
-### 1.3 Example of Java Code Stored in IoTDB Server
+String[] fields = record.value().split(",");
+String device = fields[0];
+long time = Long.parseLong(fields[1]);
+List<String> measurements = Arrays.asList(fields[2].split(":"));
+
+String[] typeNames = fields[3].split(":");
+String[] valueTexts = fields[4].split(":");
+
+List<TSDataType> types = new ArrayList<>();
+List<Object> values = new ArrayList<>();
+
+for (int i = 0; i < typeNames.length; i++) {
+  TSDataType type = TSDataType.valueOf(typeNames[i]);
+  types.add(type);
+
+  switch (type) {
+    case INT32:
+      values.add(Integer.parseInt(valueTexts[i]));
+      break;
+    case INT64:
+      values.add(Long.parseLong(valueTexts[i]));
+      break;
+    case FLOAT:
+      values.add(Float.parseFloat(valueTexts[i]));
+      break;
+    case DOUBLE:
+      values.add(Double.parseDouble(valueTexts[i]));
+      break;
+    case BOOLEAN:
+      values.add(Boolean.parseBoolean(valueTexts[i]));
+      break;
+    case TEXT:
+      values.add(valueTexts[i]);
+      break;
+    default:
+      throw new IllegalArgumentException("Unsupported data type: " + type);
+  }
+}
+
+pool.insertRecord(device, time, measurements, types, values);
+```
+
+The IoTDB `SessionPool` can be created as follows:
 
 ```java
-    SessionPool pool =
-        new SessionPool.Builder()
-            .host("127.0.0.1")
-            .port(6667)
-            .user("root")
-            .password("root")
-            .maxSize(3)
-            .build();
-    List<String> datas = new ArrayList<>(records.count());
-    for (ConsumerRecord<String, String> record : records) {
-      datas.add(record.value());
-    }
-    int size = datas.size();
-    List<String> deviceIds = new ArrayList<>(size);
-    List<Long> times = new ArrayList<>(size);
-    List<List<String>> measurementsList = new ArrayList<>(size);
-    List<List<TSDataType>> typesList = new ArrayList<>(size);
-    List<List<Object>> valuesList = new ArrayList<>(size);
-    for (String data : datas) {
-      String[] dataArray = data.split(",");
-      String device = dataArray[0];
-      long time = Long.parseLong(dataArray[1]);
-      List<String> measurements = Arrays.asList(dataArray[2].split(":"));
-      List<TSDataType> types = new ArrayList<>();
-      for (String type : dataArray[3].split(":")) {
-        types.add(TSDataType.valueOf(type));
-      }
-      List<Object> values = new ArrayList<>();
-      String[] valuesStr = dataArray[4].split(":");
-      for (int i = 0; i < valuesStr.length; i++) {
-        switch (types.get(i)) {
-          case INT64:
-            values.add(Long.parseLong(valuesStr[i]));
-            break;
-          case DOUBLE:
-            values.add(Double.parseDouble(valuesStr[i]));
-            break;
-          case INT32:
-            values.add(Integer.parseInt(valuesStr[i]));
-            break;
-          case TEXT:
-            values.add(valuesStr[i]);
-            break;
-          case FLOAT:
-            values.add(Float.parseFloat(valuesStr[i]));
-            break;
-          case BOOLEAN:
-            values.add(Boolean.parseBoolean(valuesStr[i]));
-            break;
-        }
-      }
-      deviceIds.add(device);
-      times.add(time);
-      measurementsList.add(measurements);
-      typesList.add(types);
-      valuesList.add(values);
-    }
-    pool.insertRecords(deviceIds, times, measurementsList, typesList, valuesList);
- ```
+SessionPool pool = new SessionPool.Builder()
+    .host("127.0.0.1")
+    .port(6667)
+    .user("root")
+    .password("root")
+    .maxSize(3)
+    .build();
+```
+
+## 6. Query the Result
+
+Connect to the IoTDB CLI:
+
+```bash
+./sbin/start-cli.sh -h 127.0.0.1 -p 6667 -u root -pw root
+```
+
+Run the query:
+
+```sql
+SELECT * FROM root.kafka.**;
+```
+
+The query result shows the time series data written by the Kafka Consumer, for example:
+
+```text
++-----------------------------+-------------------------+--------------------+
+|                         Time|root.kafka.d0.temperature|root.kafka.d0.status|
++-----------------------------+-------------------------+--------------------+
+|2024-05-20T10:00:00.000+08:00|                     36.5|                true|
++-----------------------------+-------------------------+--------------------+
+```
 
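Section 4 of the guide builds the message by string concatenation. The sketch below shows one way to assemble the same `device,timestamp,measurements,types,values` string from lists, rejecting tokens that contain the `,` or `:` delimiters. `KafkaRecordFormatter` is a hypothetical helper, not an API of Kafka or IoTDB; the resulting string would be passed to `ProducerRecord` as in the guide.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper for illustration; not an IoTDB or Kafka API. */
final class KafkaRecordFormatter {

  private KafkaRecordFormatter() {}

  /** Builds one message in the "device,timestamp,measurements,types,values" format. */
  static String format(String device, long timestamp, List<String> measurements,
      List<String> typeNames, List<?> values) {
    if (measurements.isEmpty()) {
      throw new IllegalArgumentException("At least one measurement is required");
    }
    if (measurements.size() != typeNames.size() || typeNames.size() != values.size()) {
      throw new IllegalArgumentException(
          "measurements, types, and values must have the same number of entries");
    }
    // Validate every token so the consumer can split the message unambiguously.
    for (String measurement : measurements) {
      checked(measurement);
    }
    for (String typeName : typeNames) {
      checked(typeName);
    }
    List<String> valueTexts = new ArrayList<>();
    for (Object value : values) {
      valueTexts.add(checked(String.valueOf(value)));
    }
    return String.join(",",
        checked(device),
        Long.toString(timestamp),
        String.join(":", measurements),
        String.join(":", typeNames),
        String.join(":", valueTexts));
  }

  /** Returns the token unchanged, or throws if it contains a delimiter character. */
  private static String checked(String token) {
    if (token.indexOf(',') >= 0 || token.indexOf(':') >= 0) {
      throw new IllegalArgumentException("Token must not contain ',' or ':': " + token);
    }
    return token;
  }
}
```

Keeping the delimiter check on the producer side means a bad value fails where it originates, before it reaches the topic.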
diff --git a/src/zh/UserGuide/Master/Tree/Ecosystem-Integration/Programming-Kafka.md b/src/zh/UserGuide/Master/Tree/Ecosystem-Integration/Programming-Kafka.md
index 214cc2c7..9b81a9d2 100644
--- a/src/zh/UserGuide/Master/Tree/Ecosystem-Integration/Programming-Kafka.md
+++ b/src/zh/UserGuide/Master/Tree/Ecosystem-Integration/Programming-Kafka.md
@@ -21,98 +21,171 @@
 
 # Kafka
 
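-[Apache Kafka](https://kafka.apache.org/) is an open-source distributed event streaming platform used by thousands of companies for high-performance data pipelines, streaming analytics, data integration, and mission-critical applications.
+[Apache Kafka](https://kafka.apache.org/) is an open-source distributed event streaming platform commonly used to build high-performance data pipelines, streaming analytics, and data integration systems. IoTDB can subscribe to data in Kafka through a Kafka Consumer and write the data to IoTDB by using the Session API.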
 
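-## 1. Coding Example
+This document describes a simple data ingestion flow: an application writes messages to a Kafka topic, a Kafka Consumer consumes the messages and parses them into IoTDB time series data, and the data is finally written to IoTDB.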
 
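-### 1.1 kafka Producer Producing Data Java Code Example
+## 1. Environment Preparation
+
+Before you start, make sure the following environment is ready:
+
+- JDK 8 or later
+- Maven 3.6 or later
+- Apache Kafka. For installation and startup, refer to the [Kafka official documentation](https://kafka.apache.org/documentation/)
+- A running IoTDB service
+
+The default addresses used in the following examples are:
+
+| Service | Address |
+| --- | --- |
+| Kafka | `127.0.0.1:9092` |
+| IoTDB | `127.0.0.1:6667` |
+| IoTDB username | `root` |
+| IoTDB password | `root` |
+
+## 2. Add Project Dependencies
+
+Add the Kafka and IoTDB Session dependencies to the `pom.xml` of your Maven project. The IoTDB dependency version should match the version of your deployed IoTDB.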
+
+```xml
+<dependencies>
+    <dependency>
+        <groupId>org.apache.iotdb</groupId>
+        <artifactId>iotdb-session</artifactId>
+        <version>2.0.4</version>
+    </dependency>
+    <dependency>
+        <groupId>org.apache.kafka</groupId>
+        <artifactId>kafka-clients</artifactId>
+        <version>3.7.0</version>
+    </dependency>
+</dependencies>
+```
+
+For the complete example project, see [iotdb-extras/examples/kafka](https://github.com/apache/iotdb-extras/tree/master/examples/kafka).
+
+## 3. Kafka Message Format
+
+The sample program uses a string format to transfer one IoTDB data record:
+
+```text
+device,timestamp,measurements,types,values
+```
+
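+The fields are described as follows:
+
+| Field | Description | Example |
+| --- | --- | --- |
+| `device` | IoTDB device path | `root.kafka.d0` |
+| `timestamp` | Timestamp in milliseconds | `1716180000000` |
+| `measurements` | Measurement names; multiple measurements are separated by `:` | `temperature:status` |
+| `types` | Data types; multiple types are separated by `:` | `DOUBLE:BOOLEAN` |
+| `values` | Data values; multiple values are separated by `:` | `36.5:true` |
+
+Single-measurement message example: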
+
+```text
+root.kafka.d0,1716180000000,temperature,DOUBLE,36.5
+```
+
+Multiple-measurement message example:
+
+```text
+root.kafka.d0,1716180000000,temperature:status,DOUBLE:BOOLEAN,36.5:true
+```
+
+## 4. Produce Data to Kafka
+
+The following code shows the key logic for writing one IoTDB data record to the `Kafka-Test` topic:
 
 ```java
-    Properties props = new Properties();
-    props.put("bootstrap.servers", "127.0.0.1:9092");
-    props.put("key.serializer", StringSerializer.class);
-    props.put("value.serializer", StringSerializer.class);
-    KafkaProducer<String, String> producer = new KafkaProducer<>(props);
-    producer.send(
-        new ProducerRecord<>(
-            "Kafka-Test", "key", "root.kafka," + System.currentTimeMillis() + ",value,INT32,100"));
-    producer.close();
+String value = "root.kafka.d0,"
+    + System.currentTimeMillis()
+    + ",temperature:status,DOUBLE:BOOLEAN,36.5:true";
+
+producer.send(new ProducerRecord<>("Kafka-Test", "iotdb", value));
 ```
 
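-### 1.2 kafka Consumer Receiving Data Java Code Example
+## 5. Consume Kafka Data and Write to IoTDB
+
+After the Kafka Consumer reads a message from the topic, it parses the device, timestamp, measurements, types, and values, and calls the IoTDB SessionPool to write the data.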
 
 ```java
-    Properties props = new Properties();
-    props.put("bootstrap.servers", "127.0.0.1:9092");
-    props.put("key.deserializer", StringDeserializer.class);
-    props.put("value.deserializer", StringDeserializer.class);
-    props.put("auto.offset.reset", "earliest");
-    props.put("group.id", "Kafka-Test");
-    KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(props);
-    kafkaConsumer.subscribe(Collections.singleton("Kafka-Test"));
-    ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofSeconds(1));
- ```
-
-### 1.3 Java Code Example for Storing Data in the IoTDB Server
+String[] fields = record.value().split(",");
+String device = fields[0];
+long time = Long.parseLong(fields[1]);
+List<String> measurements = Arrays.asList(fields[2].split(":"));
+
+String[] typeNames = fields[3].split(":");
+String[] valueTexts = fields[4].split(":");
+
+List<TSDataType> types = new ArrayList<>();
+List<Object> values = new ArrayList<>();
+
+for (int i = 0; i < typeNames.length; i++) {
+  TSDataType type = TSDataType.valueOf(typeNames[i]);
+  types.add(type);
+
+  switch (type) {
+    case INT32:
+      values.add(Integer.parseInt(valueTexts[i]));
+      break;
+    case INT64:
+      values.add(Long.parseLong(valueTexts[i]));
+      break;
+    case FLOAT:
+      values.add(Float.parseFloat(valueTexts[i]));
+      break;
+    case DOUBLE:
+      values.add(Double.parseDouble(valueTexts[i]));
+      break;
+    case BOOLEAN:
+      values.add(Boolean.parseBoolean(valueTexts[i]));
+      break;
+    case TEXT:
+      values.add(valueTexts[i]);
+      break;
+    default:
+      throw new IllegalArgumentException("Unsupported data type: " + type);
+  }
+}
+
+pool.insertRecord(device, time, measurements, types, values);
+```
+
+The IoTDB SessionPool can be created as follows:
 
 ```java
-    SessionPool pool =
-        new SessionPool.Builder()
-            .host("127.0.0.1")
-            .port(6667)
-            .user("root")
-            .password("root")
-            .maxSize(3)
-            .build();
-    List<String> datas = new ArrayList<>(records.count());
-    for (ConsumerRecord<String, String> record : records) {
-      datas.add(record.value());
-    }        
-    int size = datas.size();
-    List<String> deviceIds = new ArrayList<>(size);
-    List<Long> times = new ArrayList<>(size);
-    List<List<String>> measurementsList = new ArrayList<>(size);
-    List<List<TSDataType>> typesList = new ArrayList<>(size);
-    List<List<Object>> valuesList = new ArrayList<>(size);
-    for (String data : datas) {
-      String[] dataArray = data.split(",");
-      String device = dataArray[0];
-      long time = Long.parseLong(dataArray[1]);
-      List<String> measurements = Arrays.asList(dataArray[2].split(":"));
-      List<TSDataType> types = new ArrayList<>();
-      for (String type : dataArray[3].split(":")) {
-        types.add(TSDataType.valueOf(type));
-      }
-      List<Object> values = new ArrayList<>();
-      String[] valuesStr = dataArray[4].split(":");
-      for (int i = 0; i < valuesStr.length; i++) {
-        switch (types.get(i)) {
-          case INT64:
-            values.add(Long.parseLong(valuesStr[i]));
-            break;
-          case DOUBLE:
-            values.add(Double.parseDouble(valuesStr[i]));
-            break;
-          case INT32:
-            values.add(Integer.parseInt(valuesStr[i]));
-            break;
-          case TEXT:
-            values.add(valuesStr[i]);
-            break;
-          case FLOAT:
-            values.add(Float.parseFloat(valuesStr[i]));
-            break;
-          case BOOLEAN:
-            values.add(Boolean.parseBoolean(valuesStr[i]));
-            break;
-        }
-      }
-      deviceIds.add(device);
-      times.add(time);
-      measurementsList.add(measurements);
-      typesList.add(types);
-      valuesList.add(values);
-    }
-    pool.insertRecords(deviceIds, times, measurementsList, typesList, valuesList);
- ```
+SessionPool pool = new SessionPool.Builder()
+    .host("127.0.0.1")
+    .port(6667)
+    .user("root")
+    .password("root")
+    .maxSize(3)
+    .build();
+```
+
+## 6. Query the Written Result
+
+Connect to the IoTDB CLI:
+
+```bash
+./sbin/start-cli.sh -h 127.0.0.1 -p 6667 -u root -pw root
+```
+
+Run the query:
+
+```sql
+SELECT * FROM root.kafka.**;
+```
+
+The query result shows the time series data written by the Kafka Consumer, for example:
+
+```text
++-----------------------------+-------------------------+--------------------+
+|                         Time|root.kafka.d0.temperature|root.kafka.d0.status|
++-----------------------------+-------------------------+--------------------+
+|2024-05-20T10:00:00.000+08:00|                     36.5|                true|
++-----------------------------+-------------------------+--------------------+
+```
 
diff --git a/src/zh/UserGuide/latest/Ecosystem-Integration/Programming-Kafka.md b/src/zh/UserGuide/latest/Ecosystem-Integration/Programming-Kafka.md
index 214cc2c7..9b81a9d2 100644
--- a/src/zh/UserGuide/latest/Ecosystem-Integration/Programming-Kafka.md
+++ b/src/zh/UserGuide/latest/Ecosystem-Integration/Programming-Kafka.md
@@ -21,98 +21,171 @@
 
 # Kafka
 
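-[Apache Kafka](https://kafka.apache.org/) is an open-source distributed event streaming platform used by thousands of companies for high-performance data pipelines, streaming analytics, data integration, and mission-critical applications.
+[Apache Kafka](https://kafka.apache.org/) is an open-source distributed event streaming platform commonly used to build high-performance data pipelines, streaming analytics, and data integration systems. IoTDB can subscribe to data in Kafka through a Kafka Consumer and write the data to IoTDB by using the Session API.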
 
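-## 1. Coding Example
+This document describes a simple data ingestion flow: an application writes messages to a Kafka topic, a Kafka Consumer consumes the messages and parses them into IoTDB time series data, and the data is finally written to IoTDB.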
 
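-### 1.1 kafka Producer Producing Data Java Code Example
+## 1. Environment Preparation
+
+Before you start, make sure the following environment is ready:
+
+- JDK 8 or later
+- Maven 3.6 or later
+- Apache Kafka. For installation and startup, refer to the [Kafka official documentation](https://kafka.apache.org/documentation/)
+- A running IoTDB service
+
+The default addresses used in the following examples are:
+
+| Service | Address |
+| --- | --- |
+| Kafka | `127.0.0.1:9092` |
+| IoTDB | `127.0.0.1:6667` |
+| IoTDB username | `root` |
+| IoTDB password | `root` |
+
+## 2. Add Project Dependencies
+
+Add the Kafka and IoTDB Session dependencies to the `pom.xml` of your Maven project. The IoTDB dependency version should match the version of your deployed IoTDB.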
+
+```xml
+<dependencies>
+    <dependency>
+        <groupId>org.apache.iotdb</groupId>
+        <artifactId>iotdb-session</artifactId>
+        <version>2.0.4</version>
+    </dependency>
+    <dependency>
+        <groupId>org.apache.kafka</groupId>
+        <artifactId>kafka-clients</artifactId>
+        <version>3.7.0</version>
+    </dependency>
+</dependencies>
+```
+
+For the complete example project, see [iotdb-extras/examples/kafka](https://github.com/apache/iotdb-extras/tree/master/examples/kafka).
+
+## 3. Kafka Message Format
+
+The sample program uses a string format to transfer one IoTDB data record:
+
+```text
+device,timestamp,measurements,types,values
+```
+
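+The fields are described as follows:
+
+| Field | Description | Example |
+| --- | --- | --- |
+| `device` | IoTDB device path | `root.kafka.d0` |
+| `timestamp` | Timestamp in milliseconds | `1716180000000` |
+| `measurements` | Measurement names; multiple measurements are separated by `:` | `temperature:status` |
+| `types` | Data types; multiple types are separated by `:` | `DOUBLE:BOOLEAN` |
+| `values` | Data values; multiple values are separated by `:` | `36.5:true` |
+
+Single-measurement message example: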
+
+```text
+root.kafka.d0,1716180000000,temperature,DOUBLE,36.5
+```
+
+Multiple-measurement message example:
+
+```text
+root.kafka.d0,1716180000000,temperature:status,DOUBLE:BOOLEAN,36.5:true
+```
+
+## 4. Produce Data to Kafka
+
+The following code shows the key logic for writing one IoTDB data record to the `Kafka-Test` topic:
 
 ```java
-    Properties props = new Properties();
-    props.put("bootstrap.servers", "127.0.0.1:9092");
-    props.put("key.serializer", StringSerializer.class);
-    props.put("value.serializer", StringSerializer.class);
-    KafkaProducer<String, String> producer = new KafkaProducer<>(props);
-    producer.send(
-        new ProducerRecord<>(
-            "Kafka-Test", "key", "root.kafka," + System.currentTimeMillis() + ",value,INT32,100"));
-    producer.close();
+String value = "root.kafka.d0,"
+    + System.currentTimeMillis()
+    + ",temperature:status,DOUBLE:BOOLEAN,36.5:true";
+
+producer.send(new ProducerRecord<>("Kafka-Test", "iotdb", value));
 ```
 
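-### 1.2 kafka Consumer Receiving Data Java Code Example
+## 5. Consume Kafka Data and Write to IoTDB
+
+After the Kafka Consumer reads a message from the topic, it parses the device, timestamp, measurements, types, and values, and calls the IoTDB SessionPool to write the data.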
 
 ```java
-    Properties props = new Properties();
-    props.put("bootstrap.servers", "127.0.0.1:9092");
-    props.put("key.deserializer", StringDeserializer.class);
-    props.put("value.deserializer", StringDeserializer.class);
-    props.put("auto.offset.reset", "earliest");
-    props.put("group.id", "Kafka-Test");
-    KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(props);
-    kafkaConsumer.subscribe(Collections.singleton("Kafka-Test"));
-    ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofSeconds(1));
- ```
-
-### 1.3 Java Code Example for Storing Data in the IoTDB Server
+String[] fields = record.value().split(",");
+String device = fields[0];
+long time = Long.parseLong(fields[1]);
+List<String> measurements = Arrays.asList(fields[2].split(":"));
+
+String[] typeNames = fields[3].split(":");
+String[] valueTexts = fields[4].split(":");
+
+List<TSDataType> types = new ArrayList<>();
+List<Object> values = new ArrayList<>();
+
+for (int i = 0; i < typeNames.length; i++) {
+  TSDataType type = TSDataType.valueOf(typeNames[i]);
+  types.add(type);
+
+  switch (type) {
+    case INT32:
+      values.add(Integer.parseInt(valueTexts[i]));
+      break;
+    case INT64:
+      values.add(Long.parseLong(valueTexts[i]));
+      break;
+    case FLOAT:
+      values.add(Float.parseFloat(valueTexts[i]));
+      break;
+    case DOUBLE:
+      values.add(Double.parseDouble(valueTexts[i]));
+      break;
+    case BOOLEAN:
+      values.add(Boolean.parseBoolean(valueTexts[i]));
+      break;
+    case TEXT:
+      values.add(valueTexts[i]);
+      break;
+    default:
+      throw new IllegalArgumentException("Unsupported data type: " + type);
+  }
+}
+
+pool.insertRecord(device, time, measurements, types, values);
+```
+
+The IoTDB SessionPool can be created as follows:
 
 ```java
-    SessionPool pool =
-        new SessionPool.Builder()
-            .host("127.0.0.1")
-            .port(6667)
-            .user("root")
-            .password("root")
-            .maxSize(3)
-            .build();
-    List<String> datas = new ArrayList<>(records.count());
-    for (ConsumerRecord<String, String> record : records) {
-      datas.add(record.value());
-    }        
-    int size = datas.size();
-    List<String> deviceIds = new ArrayList<>(size);
-    List<Long> times = new ArrayList<>(size);
-    List<List<String>> measurementsList = new ArrayList<>(size);
-    List<List<TSDataType>> typesList = new ArrayList<>(size);
-    List<List<Object>> valuesList = new ArrayList<>(size);
-    for (String data : datas) {
-      String[] dataArray = data.split(",");
-      String device = dataArray[0];
-      long time = Long.parseLong(dataArray[1]);
-      List<String> measurements = Arrays.asList(dataArray[2].split(":"));
-      List<TSDataType> types = new ArrayList<>();
-      for (String type : dataArray[3].split(":")) {
-        types.add(TSDataType.valueOf(type));
-      }
-      List<Object> values = new ArrayList<>();
-      String[] valuesStr = dataArray[4].split(":");
-      for (int i = 0; i < valuesStr.length; i++) {
-        switch (types.get(i)) {
-          case INT64:
-            values.add(Long.parseLong(valuesStr[i]));
-            break;
-          case DOUBLE:
-            values.add(Double.parseDouble(valuesStr[i]));
-            break;
-          case INT32:
-            values.add(Integer.parseInt(valuesStr[i]));
-            break;
-          case TEXT:
-            values.add(valuesStr[i]);
-            break;
-          case FLOAT:
-            values.add(Float.parseFloat(valuesStr[i]));
-            break;
-          case BOOLEAN:
-            values.add(Boolean.parseBoolean(valuesStr[i]));
-            break;
-        }
-      }
-      deviceIds.add(device);
-      times.add(time);
-      measurementsList.add(measurements);
-      typesList.add(types);
-      valuesList.add(values);
-    }
-    pool.insertRecords(deviceIds, times, measurementsList, typesList, valuesList);
- ```
+SessionPool pool = new SessionPool.Builder()
+    .host("127.0.0.1")
+    .port(6667)
+    .user("root")
+    .password("root")
+    .maxSize(3)
+    .build();
+```
+
+## 6. Query the Written Result
+
+Connect to the IoTDB CLI:
+
+```bash
+./sbin/start-cli.sh -h 127.0.0.1 -p 6667 -u root -pw root
+```
+
+Run the query:
+
+```sql
+SELECT * FROM root.kafka.**;
+```
+
+The query result shows the time series data written by the Kafka Consumer, for example:
+
+```text
++-----------------------------+-------------------------+--------------------+
+|                         Time|root.kafka.d0.temperature|root.kafka.d0.status|
++-----------------------------+-------------------------+--------------------+
+|2024-05-20T10:00:00.000+08:00|                     36.5|                true|
++-----------------------------+-------------------------+--------------------+
+```
 
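The messages in the guide carry the timestamp as epoch milliseconds, while the sample query output in section 6 shows the `Time` column as an ISO-8601 string with a `+08:00` offset. A minimal sketch of that conversion using only `java.time` follows. `TimestampDisplay` is a hypothetical name, and the formatter pattern is chosen to mimic the sample output; it is an assumption, not taken from the IoTDB CLI source.

```java
import java.time.Instant;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

/** Hypothetical helper for illustration; not an IoTDB API. */
final class TimestampDisplay {

  // Pattern assumed from the sample output, e.g. 2024-05-20T10:00:00.000+08:00.
  private static final DateTimeFormatter CLI_STYLE =
      DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSSxxx");

  private TimestampDisplay() {}

  /** Renders epoch milliseconds as an ISO-8601 string at the given UTC offset. */
  static String toCliStyle(long epochMillis, ZoneOffset offset) {
    return OffsetDateTime.ofInstant(Instant.ofEpochMilli(epochMillis), offset)
        .format(CLI_STYLE);
  }
}
```

With an offset of `ZoneOffset.ofHours(8)`, the example timestamp `1716180000000` from section 3 renders as `2024-05-20T12:40:00.000+08:00`, and the `2024-05-20T10:00:00.000+08:00` row in the sample output corresponds to `1716170400000`. The producer snippet uses `System.currentTimeMillis()`, so the time in an actual query result depends on when the message was sent.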

