Erik1288 opened a new issue, #5516:
URL: https://github.com/apache/rocketmq/issues/5516

   The issue tracker is used for bug reporting purposes **ONLY**, whereas feature 
requests need to follow the [RIP 
process](https://github.com/apache/rocketmq/wiki/RocketMQ-Improvement-Proposal).
 To avoid unnecessary duplication, please check whether there is a previous 
issue before filing a new one.
   
   It is recommended to start a discussion thread in the [mailing 
lists](http://rocketmq.apache.org/about/contact/) in cases of discussing your 
deployment plan, API clarification, and other non-bug-reporting issues.
   We welcome any friendly suggestions, bug fixes, collaboration, and other 
improvements.
   
   Please ensure that your bug report is clear and self-contained. Otherwise, 
it would take additional rounds of communication, thus more time, to understand 
the problem itself.
   
   Generally, fixing an issue goes through the following steps:
   1. Understand the issue reported;
   1. Reproduce the unexpected behavior locally;
   1. Perform root cause analysis to identify the underlying problem;
   1. Create test cases to cover the identified problem;
   1. Work out a solution to rectify the behavior and make the newly created 
test cases pass;
   1. Make a pull request and go through peer review;
   
   As a result, it would be very helpful yet challenging if you could provide 
an isolated project reproducing your reported issue. Anyway, please ensure your 
issue report is informative enough for the community to pick up. At a minimum, 
include the following hints:
   
   **FEATURE REQUEST**
   
   1. Please describe the feature you are requesting.
   In the current design of the RocketMQ API, CRUD operations are performed on a 
single entity at a time. For example, a SEND_MESSAGE RPC can only send messages 
for one (Topic, Queue) per call. Requests for message consumption and offset 
commits have the same limitation.
   To meet the high-throughput requirements of data streaming, we need to design 
a Batch-Protocol so that the core streaming RPCs can handle multiple entities 
at a time.
   
   Objective:
   Support sending a batch of sub-requests IN ONE RPC, especially for 
data-streaming-related RPCs such as sending messages, consuming messages, and 
managing consumer offsets.
   
   2. Provide any additional detail on your proposed use case for this feature.
   Customize the remoting command body to carry child requests or child responses, as sketched after the layout below.
   
   BatchRequest:
     Batch Packet Size
     Batch Header Length
     Batch Header
     ChildRequest:
     [
         Child1 Packet Size, Child1 Header Length, Child1 Header, Child1 Body
         Child2 Packet Size, Child2 Header Length, Child2 Header, Child2 Body
         Child3 Packet Size, Child3 Header Length, Child3 Header, Child3 Body
     ]
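   A minimal sketch of how the flat layout above could be produced on the client side. The framing assumption (a leading 4-byte packet-size field that covers the header-length field, header, and body of each child) is an assumption based on the layout description, and encodeChild/encodeChildren are illustrative helpers, not existing RocketMQ APIs:
   
   ```java
   import java.nio.ByteBuffer;
   import java.util.List;
   
   // Illustrative helpers, not part of the existing RocketMQ API.
   public final class BatchBodyEncoder {
   
       /** Builds one child frame: [packet size][header length][header][body]. */
       public static byte[] encodeChild(byte[] header, byte[] body) {
           int packetSize = 4 + header.length + body.length;     // header-length field + header + body
           ByteBuffer buf = ByteBuffer.allocate(4 + packetSize);  // plus the leading packet-size field
           buf.putInt(packetSize);
           buf.putInt(header.length);
           buf.put(header);
           buf.put(body);
           return buf.array();
       }
   
       /** Tiles encoded child frames back to back; the result becomes BatchRequest.body. */
       public static byte[] encodeChildren(List<byte[]> childFrames) {
           int total = childFrames.stream().mapToInt(f -> f.length).sum();
           ByteBuffer buf = ByteBuffer.allocate(total);
           childFrames.forEach(buf::put);
           return buf.array();
       }
   }
   ```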
   
   3. Indicate the importance of this issue to you (blocker, must-have, 
should-have, nice-to-have). Are you currently using any workarounds to address 
this issue?
   
   4. If there are some sub-tasks involved, use -[] for each sub-task and 
create a corresponding issue to map to the sub-task:
   
   Detailed design:
   
   1. Background
   In the current design of the RocketMQ API, every operation is a CRUD on a single object. For example, one SEND_MESSAGE RPC can only send messages for a single (Topic, Queue); even with the existing Batch support it can only send one batch message for one (Topic, Queue), and cannot cover multiple (Topic, Queue) pairs in a single call. Message consumption, offset commits, and similar requests have the same limitation: each call operates on exactly one object.
   To meet the high-throughput requirements of stream computing, we want a simple Batch-Protocol design so that the core streaming RPCs can operate on multiple objects in one call, improving RPC efficiency.
   
   2. Parent/Child Request and Response Design
   
   2.1 Basic Design
   Keep the existing Remoting protocol unchanged and introduce the concept of parent/child requests and responses.
   
   A parent request carries multiple child requests of the same type. When the Broker recognizes a parent request, it parses out the embedded child requests, invokes each child request in turn to obtain its child response, and once the data is ready, merges all the child responses into a single parent response and returns it to the client in one shot.
   For example, suppose we want to send messages to 20 different (Topic, Queue) pairs. The client merges the 20 "child send requests" into one "parent send request" and sends it to the Broker in a single call. After receiving the "parent send request", the Broker processes the child requests asynchronously, one by one. Once all the "child send requests" have been handled (both synchronous and asynchronous flush are supported), it merges all the "child send responses" into one "parent send response" and returns it to the client, which then processes each child response in turn.
   
   What previously took 20 RPC round trips now takes only 1.
   
   2.2 Protocol
   RocketMQ's wire protocol is a common length-field-based protocol.
   
   This design is only an extension of the existing Remoting protocol; the Remoting protocol itself is not modified.
   
   Child requests and child responses are customizations built on top of the native RemotingCommand.
   
   After the change, a request or response can still be divided into 4 parts: "Packet Size, Header Length, Header, Body", where Packet Size determines the total length of the 4 parts. The whole packet is still decoded through RemotingCommand.
   
   The body of a parent request, BatchRequest.body, consists of multiple serialized ChildRequests laid out flat in the Body (byte[]):
   
   BatchRequest:
     Batch Packet Size
     Batch Header Length
     Batch Header
     ChildRequest:
     [
         Child1 Packet Size, Child1 Header Length, Child1 Header, Child1 Body
         Child2 Packet Size, Child2 Header Length, Child2 Header, Child2 Body
         Child3 Packet Size, Child3 Header Length, Child3 Header, Child3 Body
     ]
   
   
   The child requests laid out flat in the byte[] can be split apart by exploiting the length-field-based nature of the protocol.
   Using the length given by Child1 Packet Size, the whole "Child1 Header Length, Child1 Header, Child1 Body" section is located; the cursor is then advanced to probe Child2, and finally Child1, Child2 and Child3 are decoded in order.
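   A minimal sketch of this splitting step, under the same framing assumption as the encoder sketch earlier (the leading 4-byte packet size covers everything after it within the child frame):
   
   ```java
   import java.nio.ByteBuffer;
   import java.util.ArrayList;
   import java.util.List;
   
   // Illustrative helper, not part of the existing RocketMQ API.
   public final class BatchBodyDecoder {
   
       /** Splits BatchRequest.body (or BatchResponse.body) into the individual child frames. */
       public static List<byte[]> splitChildren(byte[] batchBody) {
           List<byte[]> childFrames = new ArrayList<>();
           ByteBuffer buf = ByteBuffer.wrap(batchBody);
           while (buf.remaining() >= 4) {
               int packetSize = buf.getInt();            // ChildN Packet Size
               byte[] frame = new byte[4 + packetSize];
               // Keep the packet-size field with the frame so each child round-trips as-is.
               ByteBuffer.wrap(frame).putInt(packetSize);
               buf.get(frame, 4, packetSize);            // header length + header + body of this child
               childFrames.add(frame);                   // the cursor now points at the next child
           }
           return childFrames;
       }
   }
   ```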
   
   The nested structure of the parent/child response BatchResponse is identical to that of the parent/child request.
   
   2.3 Building the Request
   A parent request contains multiple child requests. For example, a BatchRequest may carry several ChildRequests that pull messages from (topic1, queue0), (topic1, queue1), (topic2, queue0) and (topic2, queue1) respectively.
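   A sketch of constructing such a parent pull request, reusing the illustrative BatchBodyEncoder above; ChildPullRequest, serializeHeader and the JSON header format are hypothetical placeholders for whatever header codec would actually be used:
   
   ```java
   import java.nio.charset.StandardCharsets;
   import java.util.List;
   
   // Illustrative only: these names are not part of the existing RocketMQ API.
   public final class BatchPullRequestBuilder {
   
       record ChildPullRequest(String topic, int queueId, long queueOffset, int maxMsgNums) { }
   
       /** Encodes one child frame per (topic, queue) and tiles them into BatchRequest.body. */
       public static byte[] buildBatchBody(List<ChildPullRequest> children) {
           List<byte[]> childFrames = children.stream()
               .map(c -> BatchBodyEncoder.encodeChild(serializeHeader(c), new byte[0]))  // pull requests carry no body
               .toList();
           return BatchBodyEncoder.encodeChildren(childFrames);
       }
   
       /** Hypothetical per-child header serialization; a real design would reuse the existing header codec. */
       private static byte[] serializeHeader(ChildPullRequest c) {
           String json = String.format(
               "{\"topic\":\"%s\",\"queueId\":%d,\"queueOffset\":%d,\"maxMsgNums\":%d}",
               c.topic(), c.queueId(), c.queueOffset(), c.maxMsgNums());
           return json.getBytes(StandardCharsets.UTF_8);
       }
   
       public static void main(String[] args) {
           byte[] body = buildBatchBody(List.of(
               new ChildPullRequest("topic1", 0, 0L, 32),
               new ChildPullRequest("topic1", 1, 0L, 32),
               new ChildPullRequest("topic2", 0, 0L, 32),
               new ChildPullRequest("topic2", 1, 0L, 32)));
           System.out.println("BatchRequest.body length = " + body.length);
       }
   }
   ```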
   
   2.4 Handling the Request
   When a Processor handles a BatchRequest, it recognizes it as a batch request and parses the child requests out of the byte[] BatchRequest.body field into a child request list, childRequests; this step is called ParseChildRequests. It then iterates over childRequests, pulls the corresponding messages for each ChildRequest, and produces a matching ChildResponse for each ChildRequest.
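   A sketch of this broker-side loop, reusing the illustrative BatchBodyDecoder from 2.2; handleChild stands in for the real per-child pull logic:
   
   ```java
   import java.util.ArrayList;
   import java.util.List;
   
   // Illustrative processor loop, not the actual broker Processor code.
   public final class BatchRequestProcessorSketch {
   
       /** ParseChildRequests, then handle each child and collect its ChildResponse frame. */
       public List<byte[]> processBatch(byte[] batchRequestBody) {
           List<byte[]> childRequests = BatchBodyDecoder.splitChildren(batchRequestBody);
   
           List<byte[]> childResponses = new ArrayList<>(childRequests.size());
           for (byte[] childRequest : childRequests) {
               childResponses.add(handleChild(childRequest));  // e.g. pull the messages for this (topic, queue)
           }
           return childResponses;
       }
   
       /** Placeholder for the real per-child handling. */
       private byte[] handleChild(byte[] childRequest) {
           return childRequest;  // echoes the request just to keep the sketch self-contained
       }
   }
   ```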
   
   2.5 Building the Response
   After all the child requests have been handled, a child response list List<RemotingCommand> childResponses is produced. Under the Batch Protocol, one parent request must end up with exactly one parent response BatchResponse, so all the childResponses are serialized into BatchResponse.body; this step is called mergeChildResponses. The BatchResponse is then sent back to the client over TCP via Netty.
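   A sketch of mergeChildResponses and the write-back, again under the illustrative framing above; buildBatchResponse is a placeholder for constructing the parent command with its own Batch Header:
   
   ```java
   import io.netty.channel.ChannelHandlerContext;
   import java.util.List;
   
   // Illustrative only; the real parent response would be a RemotingCommand-based BatchResponse.
   public final class BatchResponseMerger {
   
       /** mergeChildResponses: tile every encoded ChildResponse into BatchResponse.body. */
       public static byte[] mergeChildResponses(List<byte[]> childResponseFrames) {
           return BatchBodyEncoder.encodeChildren(childResponseFrames);
       }
   
       /** Writes the single parent response back to the client over the existing Netty channel. */
       public static void reply(ChannelHandlerContext ctx, List<byte[]> childResponseFrames) {
           Object batchResponse = buildBatchResponse(mergeChildResponses(childResponseFrames));
           ctx.writeAndFlush(batchResponse);
       }
   
       /** Placeholder: wrap the merged body in the parent frame (Batch Packet Size, Header Length, Header). */
       private static Object buildBatchResponse(byte[] batchBody) {
           return batchBody;
       }
   }
   ```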
   
   2.6 Handling the Response
   After the client receives the BatchResponse, it performs ParseChildResponses, the counterpart of ParseChildRequests: the byte[] BatchResponse.body field is parsed into a childResponses list, and each (childRequest, childResponse) pair is then handled according to the specific business type.
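   A sketch of the client-side counterpart, assuming (as the design above implies) that the Broker returns the child responses in request order:
   
   ```java
   import java.util.List;
   
   // Illustrative client-side handling, assuming children come back in request order.
   public final class BatchResponseHandlerSketch {
   
       /** ParseChildResponses, then business-specific handling of every (childRequest, childResponse) pair. */
       public static void onBatchResponse(List<byte[]> childRequests, byte[] batchResponseBody) {
           List<byte[]> childResponses = BatchBodyDecoder.splitChildren(batchResponseBody);
           for (int i = 0; i < childResponses.size(); i++) {
               handlePair(childRequests.get(i), childResponses.get(i));
           }
       }
   
       /** Placeholder: e.g. complete the per-child future, update pull offsets, hand messages to listeners. */
       private static void handlePair(byte[] childRequest, byte[] childResponse) {
       }
   }
   ```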
   
   3. Merge Strategies
   3.1 Basic Merge Strategy
   This strategy is easy to describe: once all the asynchronous child responses have completed, merge all the results into the parent response. It applies to every request type except message Pull.
   
   For example, with synchronous flush, the Broker handles 20 "child send requests" at the same time, so 20 Future objects are produced, each representing the asynchronous result of one child request. Only after all 20 Futures have completed (i.e. all have been woken up by the flush thread) can the 20 results be merged into a single "parent response" and returned to the client.
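   The basic merge strategy maps naturally onto CompletableFuture.allOf; a minimal sketch, assuming each child send is completed asynchronously (for example by the flush thread) with its encoded ChildResponse frame:
   
   ```java
   import java.util.List;
   import java.util.concurrent.CompletableFuture;
   import java.util.stream.Collectors;
   
   // Illustrative merge: complete the parent only after every child future has completed.
   public final class BasicMergeStrategy {
   
       public static CompletableFuture<byte[]> mergeAll(List<CompletableFuture<byte[]>> childFutures) {
           return CompletableFuture
               .allOf(childFutures.toArray(new CompletableFuture[0]))
               .thenApply(ignored -> {
                   List<byte[]> childResponses = childFutures.stream()
                       .map(CompletableFuture::join)   // safe here: allOf guarantees completion
                       .collect(Collectors.toList());
                   return BatchBodyEncoder.encodeChildren(childResponses);  // parent BatchResponse.body
               });
       }
   }
   ```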
   
   3.2 Pull Strategy
   The pull strategy is used only when pulling messages.
   
   The semantics of Pull are more complex than Send. Imagine the Broker handling 20 "child pull requests" at the same time: 18 of them have already fetched data, but 2 of them find no data and fall into long polling. If the basic merge strategy were applied to Pull, the whole request would end up waiting for those 2 long-polling responses. That is clearly not what we want. Pull should be as timely as possible: as soon as any "child pull request" has data, or a certain amount of data has accumulated, the Broker should return the results without waiting for the few "child pull requests" stuck in long polling.
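   A sketch of such an eager pull merge: the parent completes as soon as enough child pulls have data, or once every child has finished, instead of waiting for the children stuck in long polling. The minReadyChildren threshold and the exact trigger condition are assumptions, not part of the proposal:
   
   ```java
   import java.util.ArrayList;
   import java.util.List;
   import java.util.concurrent.CompletableFuture;
   import java.util.concurrent.ConcurrentLinkedQueue;
   import java.util.concurrent.atomic.AtomicInteger;
   
   // Illustrative pull merge: return early instead of waiting for long-polling children.
   public final class PullMergeStrategySketch {
   
       public static CompletableFuture<List<byte[]>> mergeEagerly(
               List<CompletableFuture<byte[]>> childFutures, int minReadyChildren) {
   
           CompletableFuture<List<byte[]>> parent = new CompletableFuture<>();
           ConcurrentLinkedQueue<byte[]> ready = new ConcurrentLinkedQueue<>();
           AtomicInteger finished = new AtomicInteger();
   
           for (CompletableFuture<byte[]> child : childFutures) {
               child.thenAccept(body -> {
                   if (body != null && body.length > 0) {
                       ready.add(body);                       // this child pull found data
                   }
                   int done = finished.incrementAndGet();
                   // Complete once enough children have data, or once every child has finished;
                   // later complete() calls are no-ops, so racing callbacks are harmless.
                   if (ready.size() >= minReadyChildren || done == childFutures.size()) {
                       parent.complete(new ArrayList<>(ready));
                   }
               });
           }
           return parent;
       }
   }
   ```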
   
   Another key point of pulling is zero copy.
   For a single Response, zero copy splits the 4 parts of the response into two groups:
   Copied group: Packet Size, Header Length, Header
   Zero-copy group: Body
   The copied group lives in a JVM heap buffer, while the zero-copy group uses a memory map.
   
   For a parent response BatchResponse, implementing zero copy is more complicated, because BatchResponse.body consists of serialized ChildResponses laid out flat in a byte[], and a ChildResponse contains memory that does not live on the JVM heap.
   The response can roughly be decomposed into:
   
   Batch Packet Size, 
   Batch Header Length, 
   Batch Header, 
   [ 
       Child1 Packet Size, Child1 Header Length, Child1 Header, Child1 Body, 
       Child2 Packet Size, Child2 Header Length, Child2 Header, Child2 Body, 
       Child3 Packet Size, Child3 Header Length, Child3 Header, Child3 Body 
   ]
   
   Child1 Body, Child2 Body and Child3 Body go through zero copy, while all the remaining parts go through the heap.
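   One way to express this heap vs. zero-copy split is a Netty CompositeByteBuf that mixes heap components (the size and header parts) with components wrapping the memory-mapped child bodies; a minimal sketch, assuming the child bodies are already available as MappedByteBuffer slices of the store files:
   
   ```java
   import io.netty.buffer.ByteBuf;
   import io.netty.buffer.CompositeByteBuf;
   import io.netty.buffer.Unpooled;
   import java.nio.MappedByteBuffer;
   import java.util.List;
   
   // Illustrative assembly of the parent response frame without copying the child bodies.
   public final class ZeroCopyBatchResponseSketch {
   
       public static ByteBuf assemble(byte[] batchHeaderPart,
                                      List<byte[]> childHeaderParts,
                                      List<MappedByteBuffer> childBodies) {
           CompositeByteBuf frame = Unpooled.compositeBuffer();
           // Batch Packet Size + Batch Header Length + Batch Header: small, heap-backed.
           frame.addComponent(true, Unpooled.wrappedBuffer(batchHeaderPart));
           for (int i = 0; i < childHeaderParts.size(); i++) {
               // ChildN Packet Size + Header Length + Header: heap-backed.
               frame.addComponent(true, Unpooled.wrappedBuffer(childHeaderParts.get(i)));
               // ChildN Body: wraps the memory-mapped region, no copy onto the heap.
               frame.addComponent(true, Unpooled.wrappedBuffer(childBodies.get(i)));
           }
           return frame;  // written to the channel as a single frame; bodies stay off-heap
       }
   }
   ```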
   

