It seems that the QQ mailbox garbled the formatting of this email. For a better
reading experience, see the GitHub issue:
https://github.com/apache/rocketmq/issues/2937
------------------ Original Message ------------------
From: "dev" <[email protected]>;
Date: June 5, 2021 (Saturday), 5:02 PM
To: "dev" <[email protected]>;
Subject: RIP 22 RocketMQ Stage Message
RIP 22 RocketMQ Stage Message
Status
* Current State: Proposed
* Authors: [email protected]
* Shepherds: [email protected]
* Mailing List discussion: [email protected]
* Pull Request: #PR_NUMBER
* Released: 4.9.0
Background & Motivation
What do we need to do
* Will we add a new module?
  No
* Will we add new APIs?
  Yes
* Will we add a new feature?
  Yes
Why should we do that
* What benefits do the proposed changes bring?
  Let me give an example of the performance wasted by
`MessageListenerOrderly`. In an e-commerce scenario, we use
`MessageListenerOrderly` to consume order messages. The first 100 consumed
messages (a consumed message is called an "MQ" below) earn different extra
rewards, while MQs after the 100th earn none. `MessageListenerOrderly`
guarantees the order of the first 100 MQs and also the order of those after the
100th, but ordering the MQs after the 100th is meaningless. For those MQs we
should therefore use `MessageListenerConcurrently` and consume them
concurrently to improve performance.
  Furthermore, suppose that MQs 1-10 earn an additional laptop (called
"stage 1"), MQs 11-30 earn an additional tablet (called "stage 2"), MQs 31-100
earn an additional mobile phone (called "stage 3"), and the MQs after the 100th
form "stage 4". Because every MQ within a stage earns the same reward, we can
use `MessageListenerConcurrently` to consume concurrently within each stage
(for example, the 20 MQs of stage 2 are consumed concurrently). We only need to
guarantee the order between stages: the MQs in stage 2 may be consumed only
after all MQs in stage 1 have been consumed.
  Hard to understand? No problem, here is a more concrete example. Suppose
that consuming one MQ takes 1s, there is a single read queue, and the thread
pool size of `MessageListenerConcurrently` is 20. Consuming 100 MQs with
`MessageListenerOrderly` then takes `1 * 100 = 100s`. Consuming the same 100
MQs stage by stage with `MessageListenerConcurrently` runs each stage in
batches of up to 20, so a stage of n MQs takes ceil(n / 20) seconds:
   Stage 1 (1-10): ceil(10 / 20) = 1s
   Stage 2 (11-30): ceil(20 / 20) = 1s
   Stage 3 (31-100): ceil(70 / 20) = 4s
   Total time: 1 + 1 + 4 = 6s
   Compared with 100s, that is more than 16 times faster!
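The estimate above can be reproduced with a few lines of Java. This is only a
back-of-the-envelope sketch: the class and method names (`StageTimeEstimate`,
`stageSeconds`, `totalSeconds`) are made up for illustration, and it assumes
that every batch of up to `poolSize` MQs runs fully in parallel and takes
exactly the per-message time.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical helper that mirrors the arithmetic of the example; not RocketMQ code. */
final class StageTimeEstimate {

    /** Seconds for one stage: batches of at most poolSize messages run in parallel. */
    static long stageSeconds(int stageSize, int poolSize, long secondsPerMessage) {
        long batches = (stageSize + poolSize - 1) / poolSize; // integer ceil(stageSize / poolSize)
        return batches * secondsPerMessage;
    }

    /** Stages run strictly one after another, so the total is the sum over all stages. */
    static long totalSeconds(List<Integer> stageSizes, int poolSize, long secondsPerMessage) {
        long total = 0;
        for (int size : stageSizes) {
            total += stageSeconds(size, poolSize, secondsPerMessage);
        }
        return total;
    }

    public static void main(String[] args) {
        long staged = totalSeconds(Arrays.asList(10, 20, 70), 20, 1);
        long orderly = 100 * 1; // one message at a time
        System.out.println("staged=" + staged + "s, orderly=" + orderly + "s");
        // prints "staged=6s, orderly=100s"
    }
}
```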
   To sum up, I propose `MessageListenerStagedConcurrently`, which
guarantees the order between stages while consuming concurrently within each
stage. Compared with `MessageListenerOrderly`, its performance is greatly
improved; compared with `MessageListenerConcurrently`, it keeps the order that
is actually necessary. Of course, this idea of `staged concurrency` could also
be applied to the producer, disk flushing and other places, so feel free to use
your imagination.
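To make the idea of staged concurrency concrete, here is a minimal sketch in
plain Java. It does not use any RocketMQ API and is not the proposed
implementation. `StagedConcurrencySketch` and `consumeInStages` are
hypothetical names, and the stage sizes are passed in as a simple list. Each
stage is submitted to a thread pool as a whole, and the next stage starts only
after the previous one has completed.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;

/** Hypothetical illustration of "ordered between stages, concurrent within a stage". */
final class StagedConcurrencySketch {

    /**
     * Items inside one stage run concurrently on the pool; the next stage starts only
     * after every item of the current stage has finished. Items beyond the last defined
     * stage are handled as one final concurrent stage.
     */
    static <T> void consumeInStages(List<T> items, List<Integer> stageSizes,
                                    ExecutorService pool, Consumer<T> handler)
            throws InterruptedException {
        int from = 0;
        for (int i = 0; i <= stageSizes.size() && from < items.size(); i++) {
            int to = i < stageSizes.size()
                ? Math.min(items.size(), from + stageSizes.get(i))
                : items.size();
            List<Callable<Void>> tasks = new ArrayList<>();
            for (T item : items.subList(from, to)) {
                tasks.add(() -> {
                    handler.accept(item);
                    return null;
                });
            }
            // invokeAll blocks until every task of this stage has completed.
            // Handler exceptions end up in the returned futures and are ignored here.
            pool.invokeAll(tasks);
            from = to;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        List<Integer> items = new ArrayList<>();
        for (int i = 1; i <= 100; i++) {
            items.add(i);
        }
        ExecutorService pool = Executors.newFixedThreadPool(20);
        try {
            consumeInStages(items, Arrays.asList(10, 20, 70), pool,
                item -> System.out.println(Thread.currentThread().getName() + " consumed " + item));
        } finally {
            pool.shutdown();
        }
    }
}
```

The order of lines printed within one stage varies from run to run, but no
item of a later stage is printed before all items of the earlier stages.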
Goals
   Greatly improve the performance of ordered-message consumption.
Non-Goals
* What problem is this proposal NOT designed to solve?
  It does not change the original ordered-message interface.
* Are there any limitations to this proposal?
  1. To maximize ordered-message performance, users need to define stages
sensibly.
  2. Just like `MessageListenerOrderly`, `MessageListenerStagedConcurrently`
only guarantees order within each queue (partition).
Changes
   We need to add code to the common, client and broker components,
including a priority concurrency framework and consumer options. Read the
sections below for more details about Stage Message for RocketMQ.
Interface Design/Change
```java
public interface MessageListenerStagedConcurrently extends MessageListener {
    ConsumeOrderlyStatus consumeMessage(final List<MessageExt> msgs,
        final ConsumeStagedConcurrentlyContext context);

    /**
     * If this returns an empty collection, {@link MessageListenerStagedConcurrently} will
     * degenerate into {@link MessageListenerConcurrently}; if it returns a collection whose
     * elements are all 1, {@link MessageListenerStagedConcurrently} will temporarily evolve
     * into {@link MessageListenerOrderly}.
     */
    List<Integer> getStageDefinitions();

    /**
     * Can be used to reset the current stage by CAS.
     */
    void resetCurrentStageOffsetIfNeed(final String topic, final AtomicInteger currentStageOffset);
}
```
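The Javadoc of `getStageDefinitions()` suggests that each element is the
number of messages in one stage. Under that assumption, which the proposal
does not spell out, the stage of a message could be found as in the sketch
below. `StageLookupSketch` and `stageIndexOf` are hypothetical names, not part
of the proposed API.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/** Hypothetical helper; assumes each stage definition is the size of one stage. */
final class StageLookupSketch {

    /**
     * Returns the zero-based index of the stage that contains the message at the given
     * zero-based position, or stageSizes.size() when the position lies past every stage.
     */
    static int stageIndexOf(int position, List<Integer> stageSizes) {
        int end = 0;
        for (int i = 0; i < stageSizes.size(); i++) {
            end += stageSizes.get(i);
            if (position < end) {
                return i;
            }
        }
        return stageSizes.size();
    }

    public static void main(String[] args) {
        List<Integer> stages = Arrays.asList(10, 20, 70);
        System.out.println(stageIndexOf(9, stages));   // 0: still in stage 1
        System.out.println(stageIndexOf(10, stages));  // 1: first message of stage 2
        System.out.println(stageIndexOf(100, stages)); // 3: past all defined stages
        // Empty definitions: every message is past all stages (fully concurrent).
        System.out.println(stageIndexOf(5, Collections.<Integer>emptyList())); // 0
        // All-ones definitions: every message is its own stage (fully ordered).
        System.out.println(stageIndexOf(2, Arrays.asList(1, 1, 1))); // 2
    }
}
```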
```java
/**
 * Manages the stage consumption progress, modeled on
 * {@link org.apache.rocketmq.client.consumer.store.OffsetStore}.
 */
public interface StageOffsetStore {
    void load() throws MQClientException;

    void updateStageOffset(final MessageQueue mq, final int stageOffset, final boolean increaseOnly);

    int readStageOffset(final MessageQueue mq, final ReadOffsetType type);

    void persistAll(final Set<MessageQueue> mqs);

    void persist(final MessageQueue mq);

    void removeStageOffset(MessageQueue mq);

    Map<MessageQueue, Integer> cloneStageOffsetTable(String topic);

    void updateConsumeStageOffsetToBroker(MessageQueue mq, int stageOffset, boolean isOneway)
        throws RemotingException, MQBrokerException, InterruptedException, MQClientException;
}
```
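As a rough illustration of what `updateStageOffset` with `increaseOnly` might
do, here is an in-memory sketch. It is an assumption, not the proposed
implementation: the class name `InMemoryStageOffsetSketch` is hypothetical, a
plain `String` key stands in for `MessageQueue`, and returning -1 for an
unknown queue is a guess modeled on how `OffsetStore` behaves.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical in-memory stage offset table; a String key stands in for MessageQueue. */
final class InMemoryStageOffsetSketch {

    private final ConcurrentMap<String, AtomicInteger> table = new ConcurrentHashMap<>();

    /** With increaseOnly the stored offset never moves backwards; otherwise it is overwritten. */
    void updateStageOffset(String queueKey, int stageOffset, boolean increaseOnly) {
        AtomicInteger current = table.computeIfAbsent(queueKey, k -> new AtomicInteger(stageOffset));
        if (increaseOnly) {
            current.accumulateAndGet(stageOffset, Math::max); // atomic "keep the larger value"
        } else {
            current.set(stageOffset);
        }
    }

    /** Returns -1 when no offset is known for the queue (assumed convention). */
    int readStageOffset(String queueKey) {
        AtomicInteger current = table.get(queueKey);
        return current == null ? -1 : current.get();
    }

    public static void main(String[] args) {
        InMemoryStageOffsetSketch store = new InMemoryStageOffsetSketch();
        store.updateStageOffset("TopicA-queue0", 5, true);
        store.updateStageOffset("TopicA-queue0", 3, true);  // ignored, 3 < 5
        System.out.println(store.readStageOffset("TopicA-queue0")); // 5
        store.updateStageOffset("TopicA-queue0", 3, false); // forced reset
        System.out.println(store.readStageOffset("TopicA-queue0")); // 3
    }
}
```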
* Method behavior changes
  Nothing specific.
* CLI command changes
  Nothing specific.
* Log format or content changes
  Nothing specific.
Compatibility, Deprecation, and Migration Plan
* Are backward and forward compatibility taken into consideration?
  The original ordered-message interface is not changed.
* Are there deprecated APIs?
  Nothing specific.
* How do we do migration?
  Nothing specific.
Implementation Outline
We will implement the proposed changes in 3 phases.
Phase 1
   Implement the Stage Message feature in the Consumer
Phase 2
   Implement the Stage Message feature in the Producer (to be honest, I
haven't thought about it yet. I'm looking forward to your ideas, ha ha)
Phase 3
   Implement the Stage Message feature in the Broker (looking forward to your
ideas, too)
Rejected Alternatives
* How do the alternatives solve the issue you proposed?
  Nothing specific.
* Pros and cons of the alternatives
  Nothing specific.
* Why should we reject the above alternatives?