This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong-website.git


The following commit(s) were added to refs/heads/master by this push:
     new 3ea57453fb [INLONG-651][Doc] Add latest dataproxy doc on MQ sink model 
(#652)
3ea57453fb is described below

commit 3ea57453fb5f0d39438c0762054c93b66981aacb
Author: woofyzhao <[email protected]>
AuthorDate: Mon Dec 19 16:51:57 2022 +0800

    [INLONG-651][Doc] Add latest dataproxy doc on MQ sink model (#652)
    
    Co-authored-by: Charles Zhang <[email protected]>
---
 .../how_to_write_plugin_dataproxy.md               |  84 ++++++++++++++++++++
 docs/design_and_concept/img/dataproxy_mq_sink.png  | Bin 0 -> 60721 bytes
 docs/modules/dataproxy/configuration.md            |  37 ++++-----
 .../how_to_write_plugin_dataproxy.md               |  85 +++++++++++++++++++++
 .../design_and_concept/img/dataproxy_mq_sink.png   | Bin 0 -> 60721 bytes
 .../current/modules/dataproxy/configuration.md     |  37 ++++-----
 .../current/modules/dataproxy/quick_start.md       |   6 +-
 7 files changed, 214 insertions(+), 35 deletions(-)

diff --git a/docs/design_and_concept/how_to_write_plugin_dataproxy.md 
b/docs/design_and_concept/how_to_write_plugin_dataproxy.md
new file mode 100644
index 0000000000..164b35bb51
--- /dev/null
+++ b/docs/design_and_concept/how_to_write_plugin_dataproxy.md
@@ -0,0 +1,84 @@
+---
+title: DataProxy Plugin
+sidebar_position: 6
+---
+
+## Overview
+
+DataProxy implements an abstract, unified MQ (Message Queue) sink model, so
that developers can easily extend MQ sink types under the standard
MessageQueueZoneSink. By default, Apache Pulsar, Apache Kafka, and InLong TubeMQ
are already integrated. This article guides developers through extending new MQ
types accordingly.
+
+## Concepts and Models
+
+DataProxy is a message-flow framework based on Apache Flume, built from its
`Source` + `Channel` + `Sink` components. Here we focus on the sink layer alone.
+
+- `MessageQueueZoneSink`: The standard MQ sink provided by DataProxy,
intended to support all kinds of MQ types.
+- `MessageQueueHandler`: The abstract MQ handler interface that deals with
connecting to, sending data to, and disconnecting from the MQ cluster.
+- `EventHandler`: The interface for converting the MQ message header and body
when required, for example to convert the data protocol.
+
+When a new MQ cluster type needs to be integrated, developers should at least
implement the MessageQueueHandler interface as a plugin. Optionally, they can
also implement the EventHandler interface to transform data as required. The
plugin classes can be specified and pulled from the Manager as configuration
information, so that new MQ types can be extended on the fly.
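Since the handler class arrives as a configuration string, the plugin mechanism boils down to instantiating a class by name. The sketch below is illustrative only: `SimpleHandler` is a trimmed-down stand-in for InLong's real MessageQueueHandler interface, and `NoopHandler` is a hypothetical stub, not a real InLong class.

```java
// Illustrative sketch: SimpleHandler is a trimmed-down stand-in for InLong's
// MessageQueueHandler; NoopHandler is a hypothetical stub implementation.
interface SimpleHandler {
    void start();
    boolean send(String event);
    void stop();
}

class NoopHandler implements SimpleHandler {
    public void start() {}
    public boolean send(String event) { return true; }
    public void stop() {}
}

public class PluginLoader {
    // Instantiate a handler from a class name, the way a Manager-delivered
    // configuration entry could name the plugin class to load.
    public static SimpleHandler load(String className) throws Exception {
        return (SimpleHandler) Class.forName(className)
                .getDeclaredConstructor().newInstance();
    }
}
```

Because the class name is just data, swapping in a new MQ handler needs no change to the sink itself, only a configuration update.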
+
+## Demonstration
+
+The concepts introduced above can be represented by the following figure:
+![](img/dataproxy_mq_sink.png)
+
+## Development Process
+
+In the rest of the article we use Kafka with the ProtoBuffer message format
as an example. Here's what to do:
+- Implement the MessageQueueHandler subclass plugin, namely KafkaHandler,
with its init / start / stop / send methods.
+- Implement the EventHandler interface as ProtoBufferEventHandler, with its
parseHeader / parseBody methods.
+
+## Plugin Implementation
+
+### MessageQueueHandler
+```java
+public class KafkaHandler implements MessageQueueHandler {
+
+    private EventHandler handler;
+    
+    @Override
+    public void init(CacheClusterConfig config, MessageQueueZoneSinkContext 
sinkContext) {
+        // initialize configuration and event handler
+    }
+    
+    @Override
+    public void start() {
+        // create Kafka Producer
+    }
+
+    @Override
+    public void stop() {
+       // close Kafka Producer
+    }
+
+    @Override
+    public boolean send(BatchPackProfile event) {
+        // process and send data
+    }
+}
+```
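To make the lifecycle above concrete, here is a hedged, self-contained sketch of the same shape. An in-memory list stands in for the real Kafka producer, and `Profile` is a simplified stand-in for InLong's BatchPackProfile; none of these names are the actual InLong API.

```java
import java.util.ArrayList;
import java.util.List;

// Trimmed-down stand-in for InLong's BatchPackProfile, for illustration only.
class Profile {
    final byte[] body;
    Profile(byte[] body) { this.body = body; }
}

public class SketchKafkaHandler {
    // An in-memory list stands in for the real KafkaProducer in this sketch.
    private List<byte[]> producer;

    public void start() {
        producer = new ArrayList<>();  // real code: create the Kafka producer
    }

    public boolean send(Profile event) {
        if (producer == null) {
            return false;              // handler not started
        }
        producer.add(event.body);      // real code: hand the record to Kafka
        return true;
    }

    public void stop() {
        producer = null;               // real code: close the producer
    }

    public int sentCount() {
        return producer == null ? 0 : producer.size();
    }
}
```

The key point the skeleton encodes is ordering: start() must have created the producer before send() can succeed, and stop() releases it.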
+
+### EventHandler
+```java
+public class ProtoBufferEventHandler implements EventHandler {
+
+    @Override
+    public Map<String, String> parseHeader(IdTopicConfig idConfig, 
BatchPackProfile profile, String nodeId,
+            INLONG_COMPRESSED_TYPE compressType) {
+        // retrieve, process and convert event header
+    }
+
+    @Override
+    public byte[] parseBody(IdTopicConfig idConfig, BatchPackProfile profile, 
INLONG_COMPRESSED_TYPE compressType)
+            throws IOException {
+        // retrieve and repack event to ProtoBuffer message
+    }
+}
+```
+(See the full implementation of
org.apache.inlong.dataproxy.sink.mq.kafka.KafkaHandler in the inlong-dataproxy
module.)
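As an illustration of what a parseHeader-style method typically assembles, the sketch below builds a header map from event metadata. The signature is deliberately simplified (the real method also receives IdTopicConfig and a compression type), and the key names here are assumptions, not InLong's actual attribute keys.

```java
import java.util.HashMap;
import java.util.Map;

public class HeaderSketch {
    // Simplified parseHeader-style helper; the key names are illustrative
    // assumptions, not InLong's real message attribute keys.
    public static Map<String, String> buildHeader(String groupId,
            String streamId, String nodeId) {
        Map<String, String> header = new HashMap<>();
        header.put("groupId", groupId);
        header.put("streamId", streamId);
        header.put("nodeId", nodeId);
        header.put("dataTime", String.valueOf(System.currentTimeMillis()));
        return header;
    }
}
```

parseBody then serializes the event payload into the target wire format (ProtoBuffer in this example), so header and body conversion stay cleanly separated.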
+
+## Plugin Configuration
+
+### dataproxy.conf
+
+For the sink configuration, please refer to the [Sink Configuration
Example](modules/dataproxy/configuration.md)
\ No newline at end of file
diff --git a/docs/design_and_concept/img/dataproxy_mq_sink.png 
b/docs/design_and_concept/img/dataproxy_mq_sink.png
new file mode 100644
index 0000000000..7d44fd79d0
Binary files /dev/null and b/docs/design_and_concept/img/dataproxy_mq_sink.png 
differ
diff --git a/docs/modules/dataproxy/configuration.md 
b/docs/modules/dataproxy/configuration.md
index e345ce4806..9610344ce9 100644
--- a/docs/modules/dataproxy/configuration.md
+++ b/docs/modules/dataproxy/configuration.md
@@ -157,30 +157,33 @@ The time interval between data flush from memory to disk, 
in seconds
 - Sink configuration example:
 
 ```shell
-agent1.sinks.meta-sink-more1.channel = ch-msg1
+agent1.sinks.mq-sink-msg1.channel = ch-msg1
 The upstream channel name of the sink
 
-agent1.sinks.meta-sink-more1.type = org.apache.flume.sink.MetaSink
-The sink class is implemented, where the message is implemented to push data 
to the tube cluster
+agent1.sinks.mq-sink-msg1.type = 
org.apache.inlong.dataproxy.sink.mq.MessageQueueZoneSink
+The sink class implementation; this sink pushes messages to the configured MQ
cluster
 
-agent1.sinks.meta-sink-more1.master-host-port-list =
-Tube cluster master node list
+agent1.sinks.mq-sink-msg1.maxThreads = 2
+The maximum number of threads for sending messages
 
-agent1.sinks.meta-sink-more1.send_timeout = 30000
-Timeout limit when sending to tube
+agent1.sinks.mq-sink-msg1.dispatchTimeout = 2000
+Timeout when dispatching messages
 
-agent1.sinks.meta-sink-more1.stat-interval-sec = 60
-Sink indicator statistics interval time, in seconds
+agent1.sinks.mq-sink-msg1.dispatchMaxPackCount = 256
+Dispatch queue max pack count
 
-agent1.sinks.meta-sink-more1.thread-num = 8
-Sink class sends messages to the worker thread, 8 means to start 8 concurrent 
threads
+agent1.sinks.mq-sink-msg1.dispatchMaxPackSize = 3276800
+Dispatch queue max pack size
 
-agent1.sinks.meta-sink-more1.client-id-cache = true
-agent id cache, used to check the data reported by the agent to remove 
duplicates
+agent1.sinks.mq-sink-msg1.maxBufferQueueSize=131072
+Maximum size of the dispatch buffer queue
 
-agent1.sinks.meta-sink-more1.max-survived-time = 300000
-Maximum cache time
+agent1.sinks.mq-sink-msg1.processInterval=100
+Interval between retries
 
-agent1.sinks.meta-sink-more1.max-survived-size = 3000000
-Maximum number of caches
+agent1.sinks.mq-sink-msg1.reloadInterval=60000
+Interval to reload remote configuration
+
+agent1.sinks.mq-sink-msg1.producer.compressionType=SNAPPY
+Data compression type
 ```
\ No newline at end of file
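The example above interleaves each key with a prose description; the `key = value` lines themselves are ordinary Java properties, so a component can read them as such. A minimal sketch, where the embedded string simply repeats a few keys from the example:

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

public class SinkConfigSketch {
    // A few sink keys from the example above, without the description lines.
    static final String CONF =
            "agent1.sinks.mq-sink-msg1.channel = ch-msg1\n"
          + "agent1.sinks.mq-sink-msg1.type = "
          + "org.apache.inlong.dataproxy.sink.mq.MessageQueueZoneSink\n"
          + "agent1.sinks.mq-sink-msg1.maxThreads = 2\n"
          + "agent1.sinks.mq-sink-msg1.producer.compressionType = SNAPPY\n";

    public static Properties load() {
        Properties p = new Properties();
        try {
            p.load(new StringReader(CONF));
        } catch (IOException e) {   // cannot happen for an in-memory string
            throw new UncheckedIOException(e);
        }
        return p;
    }
}
```

Note that all values come back as strings; numeric settings such as maxThreads must be parsed by the consumer.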
diff --git 
a/i18n/zh-CN/docusaurus-plugin-content-docs/current/design_and_concept/how_to_write_plugin_dataproxy.md
 
b/i18n/zh-CN/docusaurus-plugin-content-docs/current/design_and_concept/how_to_write_plugin_dataproxy.md
new file mode 100644
index 0000000000..f9eaa66cf2
--- /dev/null
+++ 
b/i18n/zh-CN/docusaurus-plugin-content-docs/current/design_and_concept/how_to_write_plugin_dataproxy.md
@@ -0,0 +1,85 @@
+---
+title: DataProxy 插件
+sidebar_position: 6
+---
+
+## 总览
+
+DataProxy 实现了统一抽象 MQ (Message Queue) Sink 模型,支持在标准 MessageQueueZoneSink 
下方便灵活添加不同类型的 MQ 流向,默认支持 Apache Pulsar、Apache Kafka 和 InLong 
TubeMQ。本文将指导开发者如何扩展支持新的 MQ 类型。
+
+## 概念和模型
+
+DataProxy 是基于 Apache Flume 的数据接收和流转框架,采用 `Source` + `Channel` + `Sink` 
架构模型,本文重点关注 Sink 部分。
+
+- `MessageQueueZoneSink`: DataProxy 的标准 MQ Sink,所有类型消息队列统一在此 Sink 下扩展和支持
+- `MessageQueueHandler`: 具体某类 MQ 的连接、数据发送、关闭的处理接口,默认使用 PulsarHandler 实现
+- `EventHandler`: 消息打包器接口,可用于发送下游 MQ 之前转换 Header/Body 协议, 默认透传
+
+当扩展新的 MQ 类型时,需要开发至少实现 MessageQueueHandler 作为插件,如果需要转换数据协议,可以同时扩展 EventHandler 
做相应的处理。MessageQueueHandler 和 EventHandler 作为元数据配置由 Manager 下发,达到灵活部署扩展的效果。 
+
+
+## 数据流图示
+
+上述相关模块和接口在数据流转过程中的关系可以用下图表示:
+![](img/dataproxy_mq_sink.png)
+
+## 开发流程
+
+以扩展 Kafka 类型 MQ 和 发送 ProtoBuffer 消息为例
+- 首先开发 MessageQueueHandler 的子类插件 KafkaHandler, 实现 init / start / stop / send
接口逻辑
+- 按需要实现 EventHandler 接口 parseHeader / parseBody 插件逻辑, 如 ProtoBufferEventHandler
+
+## 接口
+
+### MessageQueueHandler
+```java
+public class KafkaHandler implements MessageQueueHandler {
+
+    private EventHandler handler;
+    
+    @Override
+    public void init(CacheClusterConfig config, MessageQueueZoneSinkContext 
sinkContext) {
+        // 初始化配置和EventHandler
+    }
+    
+    @Override
+    public void start() {
+        // 创建 Kafka Producer
+    }
+
+    @Override
+    public void stop() {
+       // 关闭 Kafka Producer
+    }
+
+    @Override
+    public boolean send(BatchPackProfile event) {
+        // 处理并发送
+    }
+}
+```
+
+### EventHandler
+```java
+public class ProtoBufferEventHandler implements EventHandler {
+
+    @Override
+    public Map<String, String> parseHeader(IdTopicConfig idConfig, 
BatchPackProfile profile, String nodeId,
+            INLONG_COMPRESSED_TYPE compressType) {
+        // 处理、转换消息头部
+    }
+
+    @Override
+    public byte[] parseBody(IdTopicConfig idConfig, BatchPackProfile profile, 
INLONG_COMPRESSED_TYPE compressType)
+            throws IOException {
+        // 处理、转换消息体为pb
+    }
+}
+```
+(完整示例参考 Inlong 代码库 org.apache.inlong.dataproxy.sink.mq.kafka.KafkaHandler 实现)
+
+## 插件配置
+
+### dataproxy.conf
+
+Sink 部分配置参考 [Sink 配置示例](modules/dataproxy/configuration.md)
diff --git 
a/i18n/zh-CN/docusaurus-plugin-content-docs/current/design_and_concept/img/dataproxy_mq_sink.png
 
b/i18n/zh-CN/docusaurus-plugin-content-docs/current/design_and_concept/img/dataproxy_mq_sink.png
new file mode 100644
index 0000000000..7d44fd79d0
Binary files /dev/null and 
b/i18n/zh-CN/docusaurus-plugin-content-docs/current/design_and_concept/img/dataproxy_mq_sink.png
 differ
diff --git 
a/i18n/zh-CN/docusaurus-plugin-content-docs/current/modules/dataproxy/configuration.md
 
b/i18n/zh-CN/docusaurus-plugin-content-docs/current/modules/dataproxy/configuration.md
index 865dfdd38a..8178828e45 100644
--- 
a/i18n/zh-CN/docusaurus-plugin-content-docs/current/modules/dataproxy/configuration.md
+++ 
b/i18n/zh-CN/docusaurus-plugin-content-docs/current/modules/dataproxy/configuration.md
@@ -157,31 +157,34 @@ agent1.channels.ch-msg5.fsyncInterval = 5
 - Sink 配置示例:
 
 ```shell
-agent1.sinks.meta-sink-more1.channel = ch-msg1
+agent1.sinks.mq-sink-msg1.channel = ch-msg1
 sink的上游channel名称
 
-agent1.sinks.meta-sink-more1.type = org.apache.flume.sink.MetaSink
-sink类实现,此处实现消息向tube集群推送数据
+agent1.sinks.mq-sink-msg1.type = 
org.apache.inlong.dataproxy.sink.mq.MessageQueueZoneSink
+sink类实现,此处为统一MQ模型Sink
 
-agent1.sinks.meta-sink-more1.master-host-port-list = 
-tube集群master节点列表
+agent1.sinks.mq-sink-msg1.maxThreads = 2
+最大发送线程数
 
-agent1.sinks.meta-sink-more1.send_timeout = 30000
-发送到tube时超时时间限制
+agent1.sinks.mq-sink-msg1.dispatchTimeout = 2000
+Dispatch队列超时时间
 
-agent1.sinks.meta-sink-more1.stat-interval-sec = 60
-sink指标统计间隔时间,单位秒
+agent1.sinks.mq-sink-msg1.dispatchMaxPackCount = 256
+Dispatch队列最大包数量
 
-agent1.sinks.meta-sink-more1.thread-num = 8
-Sink类发送消息的工作线程,8表示启动8个并发线程
+agent1.sinks.mq-sink-msg1.dispatchMaxPackSize = 3276800
+Dispatch队列最大包Size
 
-agent1.sinks.meta-sink-more1.client-id-cache = true
-agent id缓存,用于检查agent上报数据去重
+agent1.sinks.mq-sink-msg1.maxBufferQueueSize=131072
+Dispatch队列最大长度
 
-agent1.sinks.meta-sink-more1.max-survived-time = 300000
-缓存最大时间
+agent1.sinks.mq-sink-msg1.processInterval=100
+重试发送间隔
 
-agent1.sinks.meta-sink-more1.max-survived-size = 3000000
-缓存最大个数
+agent1.sinks.mq-sink-msg1.reloadInterval=60000
+配置重新加载时间间隔
+
+agent1.sinks.mq-sink-msg1.producer.compressionType=SNAPPY
+数据压缩算法
 ```
     
\ No newline at end of file
diff --git 
a/i18n/zh-CN/docusaurus-plugin-content-docs/current/modules/dataproxy/quick_start.md
 
b/i18n/zh-CN/docusaurus-plugin-content-docs/current/modules/dataproxy/quick_start.md
index a0731a10f1..5e416ea1f4 100644
--- 
a/i18n/zh-CN/docusaurus-plugin-content-docs/current/modules/dataproxy/quick_start.md
+++ 
b/i18n/zh-CN/docusaurus-plugin-content-docs/current/modules/dataproxy/quick_start.md
@@ -27,8 +27,12 @@ audit.proxys=127.0.0.1:10081
 bash +x bin/dataproxy-start.sh
 # 或者
 bash +x bin/dataproxy-start.sh pulsar
+
 # 如果使用 Inlong TubeMQ 来缓存数据
-# bash +x bin/dataproxy-start.sh tube
+# bash +x bin/dataproxy-start.sh tubemq
+
+# 如果使用 Kafka 来缓存数据
+# bash +x bin/dataproxy-start.sh kafka
 ```
 
 ## 检查
