This is an automated email from the ASF dual-hosted git repository.

djwang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/cloudberry-site.git


The following commit(s) were added to refs/heads/main by this push:
     new 0e7b055c docs: add kafka fdw docs (#231)
0e7b055c is described below

commit 0e7b055c045e685c7a3a4ce2a451a30f7146964e
Author: TomShawn <[email protected]>
AuthorDate: Mon Feb 10 11:15:55 2025 +0800

    docs: add kafka fdw docs (#231)
---
 docs/data-loading/index.md                         |   1 +
 .../data-loading/load-data-from-kafka-using-fdw.md | 166 +++++++++++++++++++++
 .../data-loading/load-data-from-kafka-using-fdw.md | 165 ++++++++++++++++++++
 .../current/data-loading/load-data-overview.md     |   3 +-
 sidebars.ts                                        |   2 +-
 5 files changed, 335 insertions(+), 2 deletions(-)

diff --git a/docs/data-loading/index.md b/docs/data-loading/index.md
index 8fb72187..2715239e 100644
--- a/docs/data-loading/index.md
+++ b/docs/data-loading/index.md
@@ -26,6 +26,7 @@ Cloudberry Database offers multiple data loading solutions, 
and you can select d
 | [`gpfdist`](/docs/data-loading/load-data-using-gpfdist.md)    | Local host 
files or files accessible via internal network   | • TXT<br />• CSV<br />• Any 
delimited text format supported by the `FORMAT` clause<br />• XML and JSON 
(requires conversion to text format via YAML configuration file) | Yes      |   
            |
 | [Batch loading using `gpload`](/docs/data-loading/load-data-using-gpload.md) 
(with `gpfdist` as the underlying worker) | Local host files or files 
accessible via internal network   | • TXT<br />• CSV<br />• Any delimited text 
format supported by the `FORMAT` clause<br />• XML and JSON (require conversion 
to text format via YAML configuration file) | Yes      |
 | [Creating external web 
tables](/docs/data-loading/load-data-from-web-services.md)         | Data 
pulled from network services or from any source accessible by command lines | • 
TXT<br />• CSV                                             | Yes      |
+| [Kafka FDW](/docs/data-loading/load-data-from-kafka-using-fdw.md)         | 
Streaming data from Apache Kafka | • JSON<br />• CSV                            
                 | No      |
 
 ## Learn more
 
diff --git a/docs/data-loading/load-data-from-kafka-using-fdw.md 
b/docs/data-loading/load-data-from-kafka-using-fdw.md
new file mode 100644
index 00000000..74547793
--- /dev/null
+++ b/docs/data-loading/load-data-from-kafka-using-fdw.md
@@ -0,0 +1,166 @@
+---
+title: Load Data from Kafka Using Kafka FDW
+---
+
+# Load Data from Kafka Using Kafka FDW
+
+Kafka Foreign Data Wrapper (FDW) allows Apache Cloudberry to connect directly 
to Apache Kafka, enabling it to read and process Kafka data as external tables. 
This integration improves the efficiency, flexibility, and reliability of 
real-time Kafka data processing, enhancing data operations and business 
performance.
+
+Apache Cloudberry supports using Kafka FDW to create external tables and 
import data.
+
+See the Kafka FDW repository for more information: 
[https://github.com/cloudberry-contrib/kafka_fdw](https://github.com/cloudberry-contrib/kafka_fdw).
 Note that this repo is contributed by the community members, but it is not 
maintained by the PPMC as an official community project.
+
+## Basic usage
+
+- Create the `kafka_fdw` extension:
+
+    ``` sql
+    postgres=# CREATE EXTENSION kafka_fdw;
+    CREATE EXTENSION
+    ```
+
+- Create an external server and specify Kafka's cluster address. You need to 
replace `localhost:9092` with your Kafka cluster address.
+
+    ``` sql
+    CREATE SERVER kafka_server
+    FOREIGN DATA WRAPPER kafka_fdw
+    OPTIONS (mpp_execute 'all segments', brokers 'localhost:9092');
+    ```
+
+- Create user mapping:
+
+    ``` sql
+    CREATE USER MAPPING FOR PUBLIC SERVER kafka_server;
+    ```
+
+- Create an external table:
+
+    When creating an external table, you need to specify two metadata columns: 
`partition` and `offset`, which identify the partition and offset of messages 
in a Kafka topic. Here is an example:
+
+    ``` sql
+    CREATE FOREIGN TABLE kafka_test (
+        part int OPTIONS (partition 'true'),
+        offs bigint OPTIONS (offset 'true'),
+        some_int int,
+        some_text text,
+        some_date date,
+        some_time timestamp
+    )
+    SERVER kafka_server OPTIONS
+        (format 'csv', topic 'contrib_regress_csv', batch_size '1000', 
buffer_delay '1000');
+    ```
+
+    Parameter description:
+
+    - `batch_size`: The size of data read from Kafka at once.
+    - `buffer_delay`: The timeout for getting data from Kafka.
+
+## Supported data formats
+
+Currently, `CSV` and `JSON` data formats are supported.
+
+## Query
+
+You can specify the message partition and offset in your query by using the 
`partition` or `offset` column condition:
+
+``` sql
+SELECT * FROM kafka_test WHERE part = 0 AND offs > 1000 LIMIT 60;
+```
+
+You can also specify multiple conditions:
+
+``` sql
+SELECT * FROM kafka_test WHERE (part = 0 AND offs > 100) OR (part = 1 AND offs 
> 300) OR (part = 3 AND offs > 700);
+```
+
+## Message producer
+
+Currently, Kafka FDW supports inserting data into external tables, which acts 
as a message producer for Kafka. You only need to use the `INSERT` statement.
+
+``` sql
+INSERT INTO kafka_test(part, some_int, some_text)
+VALUES
+    (0, 5464565, 'some text goes into partition 0'),
+    (1, 5464565, 'some text goes into partition 1'),
+    (0, 5464565, 'some text goes into partition 0'),
+    (3, 5464565, 'some text goes into partition 3'),
+    (NULL, 5464565, 'some text goes into partition selected by kafka');
+```
+
+When inserting data, you can specify `partition` to specify which partition to 
insert into.
+
+## Data import
+
+To use Kafka FDW for data import, you can create custom functions, such as the 
`INSERT INTO SELECT` statement. The basic principle is to fetch all data from 
the external table and insert it into the target table sequentially.
+
+Here is a simple example, which you can modify according to your needs:
+
+``` sql
+CREATE OR REPLACE FUNCTION import_kafka_data(
+  src_table_name text,
+  dest_table_name text,
+  dest_table_columns text[]
+) RETURNS void AS $$
+
+DECLARE
+    current_row RECORD;
+    current_part integer;
+    current_offs bigint;
+    max_off bigint;
+    import_progress_table_name text;
+    max_off_result bigint;
+BEGIN
+
+    import_progress_table_name := src_table_name || '_import_progress';
+
+    -- Creates progress record table.
+    EXECUTE FORMAT('CREATE TABLE IF NOT EXISTS %I (part integer PRIMARY KEY, 
offs bigint NOT NULL)', import_progress_table_name);
+
+    -- The number of partitions in the topic table might change, so 
reinitialize before each import.
+    EXECUTE FORMAT('INSERT INTO %I SELECT DISTINCT part, 0 FROM %I ON CONFLICT 
(part) DO NOTHING', import_progress_table_name, src_table_name);
+
+    -- Imports data partition by partition.
+    FOR current_row IN
+        EXECUTE FORMAT('SELECT part, offs FROM %I', import_progress_table_name)
+    LOOP
+        current_part := current_row.part;
+        current_offs := current_row.offs;
+
+        -- Gets the maximum offset for the current partition.
+        EXECUTE FORMAT('SELECT MAX(offs) FROM %I WHERE part = %s', 
src_table_name, current_part) INTO max_off_result;
+        max_off := max_off_result;
+
+        -- Skips if there is no new data.
+        IF max_off+1 = current_offs THEN
+            CONTINUE;
+        END IF;
+
+        -- Imports data.
+        EXECUTE FORMAT('
+            INSERT INTO %I (%s)
+            SELECT %s
+            FROM %I
+            WHERE part = %s AND offs >= %s AND offs <= %s',
+            dest_table_name,
+            array_to_string(dest_table_columns, ', '),
+            array_to_string(dest_table_columns, ', '),
+            src_table_name,
+            current_part,
+            current_offs,
+            max_off
+        );        
+
+        -- Updates import progress.
+        EXECUTE FORMAT('UPDATE %I SET offs = %s WHERE part = %s', 
import_progress_table_name, max_off + 1, current_part);
+    END LOOP;
+
+    RETURN;
+END;
+$$ LANGUAGE plpgsql;
+```
+
+When executing the query, call this function, passing in the external table 
name, target table name, and the fields to be imported. Here is an example:
+
+``` sql
+SELECT import_kafka_data('kafka_test', 'dest_table_fdw', ARRAY['some_int', 
'some_text', 'some_date', 'some_time']);
+```
diff --git 
a/i18n/zh/docusaurus-plugin-content-docs/current/data-loading/load-data-from-kafka-using-fdw.md
 
b/i18n/zh/docusaurus-plugin-content-docs/current/data-loading/load-data-from-kafka-using-fdw.md
new file mode 100644
index 00000000..d600e74d
--- /dev/null
+++ 
b/i18n/zh/docusaurus-plugin-content-docs/current/data-loading/load-data-from-kafka-using-fdw.md
@@ -0,0 +1,165 @@
+---
+title: 从 Kafka 加载数据
+---
+
+# 从 Kafka 加载数据
+
+Kafka Foreign Data Wrapper (FDW) 提供了 Apache Cloudberry 与 Apache Kafka 
连接的能力,使得数据库能够直接从 Kafka 中读取数据,并将其作为外部表来处理。Apache Cloudberry 用户可以更高效、灵活、可靠地处理 
Kafka 中的实时数据,从而提高数据处理能力和业务效率。
+
+Apache Cloudberry 支持使用 Kafka FDW 来创建外部表以及导入数据。
+
+更多信息,参考 Kafka FDW 
仓库:[https://github.com/cloudberry-contrib/kafka_fdw](https://github.com/cloudberry-contrib/kafka_fdw)。注意,该仓库由社区成员贡献,但不是由
 PPMC 维护的官方社区项目。
+
+## 基本使用
+
+- 创建插件。
+
+    ``` sql
+    CREATE EXTENSION kafka_fdw;
+    ```
+
+- 创建外部服务器,指定 Kafka 的集群地址。你需要将 `localhost:9092` 替换为你的 Kafka 集群地址。
+
+    ``` sql
+    CREATE SERVER kafka_server
+    FOREIGN DATA WRAPPER kafka_fdw
+    OPTIONS (mpp_execute 'all segments', brokers 'localhost:9092');
+    ```
+
+- 创建 user mapping。
+
+    ``` sql
+    CREATE USER MAPPING FOR PUBLIC SERVER kafka_server;
+    ```
+
+- 创建外部表
+
+    创建外部表时,必须指定两个元数据信息列 `partition` 和 `offset`,用于标识 Kafka 中的一个 Topic 
的消息所属的分区和偏移。下面是一个示例:
+
+    ``` sql
+    CREATE FOREIGN TABLE kafka_test (
+        part int OPTIONS (partition 'true'),
+        offs bigint OPTIONS (offset 'true'),
+        some_int int,
+        some_text text,
+        some_date date,
+        some_time timestamp
+    )
+    SERVER kafka_server OPTIONS
+        (format 'csv', topic 'contrib_regress_csv', batch_size '1000', 
buffer_delay '1000');
+    ```
+
+    参数说明:
+
+    - `batch_size`:从 Kafka 读取一次数据的量。
+    - `buffer_delay`:从 Kafka 获取数据的超时时间。
+
+## 支持的数据格式
+
+目前支持 `CSV` 和 `JSON` 两种数据格式。
+
+## 查询
+
+可以在查询的时候指定消息的分区和偏移,指定 `partition` 或 `offset`:
+
+``` sql
+SELECT * FROM kafka_test WHERE part = 0 AND offs > 1000 LIMIT 60;
+```
+
+也可以指定多个条件:
+
+``` sql
+SELECT * FROM kafka_test WHERE (part = 0 AND offs > 100) OR (part = 1 AND offs 
> 300) OR (part = 3 AND offs > 700);
+```
+
+## 消息生产者
+
+目前 Kafka FDW 支持向外表中插入数据,即作为了 Kafka 的消息生产者。只需要使用 INSERT 语句即可。
+
+``` sql
+INSERT INTO kafka_test(part, some_int, some_text)
+VALUES
+    (0, 5464565, 'some text goes into partition 0'),
+    (1, 5464565, 'some text goes into partition 1'),
+    (0, 5464565, 'some text goes into partition 0'),
+    (3, 5464565, 'some text goes into partition 3'),
+    (NULL, 5464565, 'some text goes into partition selected by kafka');
+```
+
+插入的时候可以指定 `partition` 表示插入到哪个分区。
+
+## 数据导入功能
+
+如果想要通过 kafka FDW 实现类似数据导入的功能,您可以通过自定义函数来实现,例如 `insert into select` 
语句,基本原理是将外表中的所有数据依次取出来插入到目标表中。
+
+下面是一个简单的示例,您可以根据实际情况对此函数进行修改:
+
+``` sql
+CREATE OR REPLACE FUNCTION import_kafka_data(
+  src_table_name text,
+  dest_table_name text,
+  dest_table_columns text[]
+) RETURNS void AS $$
+
+DECLARE
+    current_row RECORD;
+    current_part integer;
+    current_offs bigint;
+    max_off bigint;
+    import_progress_table_name text;
+    max_off_result bigint;
+BEGIN
+
+    import_progress_table_name := src_table_name || '_import_progress';
+
+    -- 创建进度记录表
+    EXECUTE FORMAT('CREATE TABLE IF NOT EXISTS %I (part integer PRIMARY KEY, 
offs bigint NOT NULL)', import_progress_table_name);
+
+    -- 表的 topic 的 partition 数量有可能发生变化,所以每次导入前都要重新初始化
+    EXECUTE FORMAT('INSERT INTO %I SELECT DISTINCT part, 0 FROM %I ON CONFLICT 
(part) DO NOTHING', import_progress_table_name, src_table_name);
+
+    -- 逐个分区导入数据
+    FOR current_row IN
+        EXECUTE FORMAT('SELECT part, offs FROM %I', import_progress_table_name)
+    LOOP
+        current_part := current_row.part;
+        current_offs := current_row.offs;
+
+        -- 获取当前分区的最大 offset
+        EXECUTE FORMAT('SELECT MAX(offs) FROM %I WHERE part = %s', 
src_table_name, current_part) INTO max_off_result;
+        max_off := max_off_result;
+
+        -- 没有新数据跳过
+        IF max_off+1 = current_offs THEN
+            CONTINUE;
+        END IF;
+
+        -- 导入数据
+        EXECUTE FORMAT('
+            INSERT INTO %I (%s)
+            SELECT %s
+            FROM %I
+            WHERE part = %s AND offs >= %s AND offs <= %s',
+            dest_table_name,
+            array_to_string(dest_table_columns, ', '),
+            array_to_string(dest_table_columns, ', '),
+            src_table_name,
+            current_part,
+            current_offs,
+            max_off
+        );        
+
+        -- 更新导入进度
+        EXECUTE FORMAT('UPDATE %I SET offs = %s WHERE part = %s', 
import_progress_table_name, max_off + 1, current_part);
+    END LOOP;
+
+    RETURN;
+END;
+$$ LANGUAGE plpgsql;
+```
+
+执行的时候只需要调用这个函数,传入外表名称、目标表名称、需要导入的字段即可,如下:
+
+``` sql
+SELECT import_kafka_data('kafka_test', 'dest_table_fdw', ARRAY['some_int', 
'some_text', 'some_date', 'some_time']);
+```
diff --git 
a/i18n/zh/docusaurus-plugin-content-docs/current/data-loading/load-data-overview.md
 
b/i18n/zh/docusaurus-plugin-content-docs/current/data-loading/load-data-overview.md
index fba6330d..0d614272 100644
--- 
a/i18n/zh/docusaurus-plugin-content-docs/current/data-loading/load-data-overview.md
+++ 
b/i18n/zh/docusaurus-plugin-content-docs/current/data-loading/load-data-overview.md
@@ -25,4 +25,5 @@ Cloudberry Database 提供了多种数据加载解决方案,你可以根据不
 | [`file://` 
协议](/i18n/zh/docusaurus-plugin-content-docs/current/data-loading/load-data-using-file-protocol.md)
         | 本地文件系统(本地 Segment 主机,仅超级用户可访问) | • TXT<br />• CSV                    
                         | 是      |
 | 
[`gpfdist`](/i18n/zh/docusaurus-plugin-content-docs/current/data-loading/load-data-using-gpfdist.md)
    |  本地主机文件或者通过内网可访问的文件  | • TXT<br />• CSV<br />• `FORMAT` 子句支持的任意分隔文本格式<br 
/>• XML 和 JSON(需要通过 YAML 配置文件转换为文本格式) | 是     |               |
 | [使用 `gpload` 
批量加载](/i18n/zh/docusaurus-plugin-content-docs/current/data-loading/load-data-using-gpload.md)(使用
 `gpfdists` 为底层工作组件) |  本地主机文件或者可通过内网访问的文件  | • TXT<br />• CSV<br />• `FORMAT` 
子句支持的任意分隔文本格式<br />• XML 和 JSON(需要通过 YAML 配置文件转换为文本格式) | 是      |
-| [创建外部 Web 
表](/i18n/zh/docusaurus-plugin-content-docs/current/data-loading/load-data-from-web-services.md)
         | 从网络服务或可通过命令行访问的任意来源提取的数据 | • TXT<br />• CSV                          
                   | 是      |
\ No newline at end of file
+| [创建外部 Web 
表](/i18n/zh/docusaurus-plugin-content-docs/current/data-loading/load-data-from-web-services.md)
         | 从网络服务或可通过命令行访问的任意来源提取的数据 | • TXT<br />• CSV                          
                   | 是      |
+| [Kafka 
FDW](/i18n/zh/docusaurus-plugin-content-docs/current/data-loading/load-data-from-kafka-using-fdw.md)
         | 从 Apache Kafka 流式加载数据 | • JSON<br />• CSV                            
                 | 否      |
\ No newline at end of file
diff --git a/sidebars.ts b/sidebars.ts
index 767f7f83..af96165a 100644
--- a/sidebars.ts
+++ b/sidebars.ts
@@ -49,7 +49,7 @@ const sidebars: SidebarsConfig = {
         label: 'Load Data from Local Files',
         items: ['data-loading/load-data-using-copy', 
'data-loading/load-data-using-gpfdist', 
'data-loading/load-data-using-file-protocol','data-loading/load-data-using-gpload']
       },
-      'data-loading/load-data-from-web-services']
+      'data-loading/load-data-from-web-services', 
'data-loading/load-data-from-kafka-using-fdw']
     },
 
     {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to