In real business scenarios, at least three types of business coexist:
1. An activity is running, and its stages are defined as 1-10, 11-30, 30-50, 50+;
2. An activity is running, and its stages are defined as 1-20, 21-100, 100+;
3. No activity is running, so no staged consumption is needed;
......
To handle these more complex scenarios, I redesigned
`MessageListenerStagedConcurrently` and `StageOffsetStore`.
`getStageDefinitionStrategies` can now define multiple strategies (for example,
the different stage definitions of types 1 and 2 above) and supports dynamic
configuration. A new `computeStrategy` lets users choose a strategy, and a new
`computeGroup` distinguishes different businesses that share the same
definition (for example, within type 1 above, merchant A has the ID `a` and
merchant B has the ID `b`).
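As a rough illustration of how these three methods could fit together, here is a minimal sketch. Only the method names come from this message; their signatures, the String ids, the `recordConsumed` helper, and the reading of each definition as a list of stage sizes are assumptions made for the example and may differ from the real interface.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative only: signatures and ids are assumptions, not the proposed API.
final class StrategySketch {

    // Strategy id -> sizes of the bounded stages (the open-ended last stage is implicit).
    static Map<String, List<Integer>> getStageDefinitionStrategies() {
        Map<String, List<Integer>> strategies = new HashMap<>();
        strategies.put("type1", Arrays.asList(10, 20, 20)); // type 1 above
        strategies.put("type2", Arrays.asList(20, 80));     // type 2 above
        return strategies;
    }

    // Chooses a strategy; a null result stands for "no staging" (type 3 above).
    static String computeStrategy(String businessType) {
        return getStageDefinitionStrategies().containsKey(businessType) ? businessType : null;
    }

    // Separates businesses that share one strategy, e.g. merchant A -> "a".
    static String computeGroup(String merchantId) {
        return merchantId;
    }

    // One independent counter per (strategy, group) pair.
    private final Map<String, AtomicInteger> offsets = new ConcurrentHashMap<>();

    // Hypothetical helper: counts one consumed message and returns the new count,
    // or -1 when the business type has no strategy.
    int recordConsumed(String businessType, String merchantId) {
        String strategy = computeStrategy(businessType);
        if (strategy == null) {
            return -1;
        }
        String key = strategy + "@" + computeGroup(merchantId);
        return offsets.computeIfAbsent(key, k -> new AtomicInteger()).incrementAndGet();
    }

    public static void main(String[] args) {
        StrategySketch sketch = new StrategySketch();
        sketch.recordConsumed("type1", "a");
        sketch.recordConsumed("type1", "a");
        // Merchant b shares the strategy but has its own progress.
        System.out.println(sketch.recordConsumed("type1", "b"));
    }
}
```

Keying progress by strategy and group keeps merchant `a` and merchant `b` from advancing each other's stages even though both use the same definition.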
For a better reading experience, see the GitHub issue:
https://github.com/apache/rocketmq/issues/2937







------------------ Original Message ------------------
From: "dev" <[email protected]>
Sent: June 5, 2021, 5:47
To: "dev" <[email protected]>

Subject: [DISCUSS] RIP-22 RocketMQ Stage Message



Hi RocketMQ Community,
This thread kicks off the discussion of RIP-22 RocketMQ Stage Message.

For a better reading experience, see the GitHub issue:
https://github.com/apache/rocketmq/issues/2937
The discussion will remain open until 0:00 on June 13, 2021.
Looking forward to your replies.





------------------ Original Message ------------------
From: "dev" <[email protected]>
Sent: June 5, 2021, 5:26
To: "dev" <[email protected]>

Subject: RIP 22 RocketMQ Stage Message



It seems that QQ Mail has broken the formatting of this email. For a better
reading experience, see the GitHub issue:
https://github.com/apache/rocketmq/issues/2937





------------------ Original Message ------------------
From: "dev" <[email protected]>
Sent: June 5, 2021, 5:02
To: "dev" <[email protected]>

Subject: RIP 22 RocketMQ Stage Message



RIP 22 RocketMQ Stage Message

Status
* Current State: Proposed
* Authors: [email protected]
* Shepherds: [email protected]
* Mailing List discussion: [email protected]
* Pull Request: #PR_NUMBER
* Released: 4.9.0

Background & Motivation
What do we need to do
* Will we add a new module?
  No
* Will we add new APIs?
  Yes
* Will we add a new feature?
  Yes
Why should we do that
* What benefits do the proposed changes bring?
  Let me use an example to illustrate the performance wasted by
`MessageListenerOrderly`. In an e-commerce scenario, suppose we use
`MessageListenerOrderly` to consume order messages. The first 100 messages
consumed earn different extra rewards, while messages after the 100th earn
none. `MessageListenerOrderly` guarantees the order of the first 100 messages
and also the order of every message after them, but ordering the messages after
the 100th is pointless. Those messages should instead be consumed concurrently
with `MessageListenerConcurrently` to improve performance.
  Furthermore, suppose messages 1-10 earn an extra laptop ("stage 1"),
messages 11-30 earn an extra tablet ("stage 2"), messages 31-100 earn an extra
mobile phone ("stage 3"), and the messages after the 100th form "stage 4".
Because the reward is the same within each stage, the messages of a stage can
be consumed concurrently, as with `MessageListenerConcurrently` (for example,
the 20 messages of stage 2 are consumed concurrently). We only need to
guarantee the order between stages: messages in stage 2 can be consumed only
after all messages in stage 1 have been consumed.
  A more concrete example makes this clearer. Suppose that consuming one
message takes 1s, there is a single read queue, and the thread pool size of
`MessageListenerConcurrently` is 20. Consuming 100 messages with
`MessageListenerOrderly` then takes `1 * 100 = 100s`. Consuming the same 100
messages concurrently in stages takes:
    Stage 1 (1-10): 10 / 20, rounded up = 1s
    Stage 2 (11-30): 20 / 20 = 1s
    Stage 3 (31-100): 70 / 20, rounded up = 4s
    Total time: 1 + 1 + 4 = 6s
    Compared with 100s, that is more than 16 times faster!
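The arithmetic above can be reproduced with a small sketch. It assumes, as the example does, that every message takes the same time and that a stage of `size` messages needs `ceil(size / threads)` rounds before the next stage may start. The class and method names are made up for illustration and are not part of RocketMQ.

```java
import java.util.Arrays;
import java.util.List;

// Back-of-the-envelope model of the timing example; not RocketMQ code.
final class StagedTimingSketch {

    // Total seconds when stages run one after another and each stage is
    // consumed by `threads` workers in parallel.
    static long stagedSeconds(List<Integer> stageSizes, int threads, long secondsPerMessage) {
        long total = 0;
        for (int size : stageSizes) {
            long rounds = (size + threads - 1) / threads; // ceil(size / threads)
            total += rounds * secondsPerMessage;
        }
        return total;
    }

    // Total seconds when every message is consumed strictly one by one.
    static long orderlySeconds(int messages, long secondsPerMessage) {
        return messages * secondsPerMessage;
    }

    public static void main(String[] args) {
        List<Integer> stages = Arrays.asList(10, 20, 70); // stages 1 to 3
        System.out.println(stagedSeconds(stages, 20, 1)); // 1 + 1 + 4 = 6
        System.out.println(orderlySeconds(100, 1));       // 100
    }
}
```

The model ignores scheduling and network overhead, so it only shows where the speed-up comes from, not what a real consumer would measure.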
  To sum up, I propose `MessageListenerStagedConcurrently`, which guarantees
order between stages while allowing concurrent consumption within a stage.
Compared with `MessageListenerOrderly`, it performs much better; compared with
`MessageListenerConcurrently`, it preserves the ordering that is actually
needed. The same idea of staged concurrency could also be applied elsewhere,
for example in the producer or in disk flushing, so feel free to use your
imagination.

Goals
  Greatly improve the performance of ordered messages.

Non-Goals
* What problem is this proposal NOT designed to solve?
  This proposal does not change the original ordered-message interface.
* Are there any limits of this proposal?
  1. To maximize the performance of ordered messages, users need to define
stages sensibly.
  2. Like `MessageListenerOrderly`, `MessageListenerStagedConcurrently` only
guarantees order within each queue (partition).

Changes
  We need to add code to the common, client, and broker components, including
a priority concurrency framework and consumer choices. Read the sections below
for more details about Stage Message for RocketMQ.

Interface Design/Change
```java
public interface MessageListenerStagedConcurrently extends MessageListener {

    ConsumeOrderlyStatus consumeMessage(final List<MessageExt> msgs,
        final ConsumeStagedConcurrentlyContext context);

    /**
     * If this returns an empty collection, {@link MessageListenerStagedConcurrently} will
     * degenerate into {@link MessageListenerConcurrently}; if it returns a collection whose
     * elements are all 1, {@link MessageListenerStagedConcurrently} will temporarily evolve
     * into {@link MessageListenerOrderly}.
     */
    List<Integer> getStageDefinitions();

    /**
     * Can be used to reset the current stage by CAS.
     */
    void resetCurrentStageOffsetIfNeed(final String topic, final AtomicInteger currentStageOffset);
}
```
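To make the Javadoc of `getStageDefinitions` more concrete, the sketch below reads the returned list as stage sizes, which is an assumption inferred from the "all elements are 1" remark. The helper names `stageIndexOf` and `effectiveMode` are hypothetical and exist only for this illustration.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Illustrative helpers; they are not part of the proposed interface.
final class StageDefinitionSketch {

    // Returns the 0-based stage that holds the message at `offset`, where
    // `offset` counts the messages consumed before it. Offsets beyond the
    // defined stages fall into an implicit final stage (index = list size).
    static int stageIndexOf(List<Integer> stageSizes, int offset) {
        int upperBound = 0;
        for (int i = 0; i < stageSizes.size(); i++) {
            upperBound += stageSizes.get(i);
            if (offset < upperBound) {
                return i;
            }
        }
        return stageSizes.size();
    }

    // Classifies the two degenerate cases described in the Javadoc.
    static String effectiveMode(List<Integer> stageSizes) {
        if (stageSizes.isEmpty()) {
            return "CONCURRENTLY"; // no stages: plain concurrent consumption
        }
        for (int size : stageSizes) {
            if (size != 1) {
                return "STAGED";
            }
        }
        return "ORDERLY"; // one message per stage: strict order
    }

    public static void main(String[] args) {
        List<Integer> sizes = Arrays.asList(10, 20, 70);
        System.out.println(stageIndexOf(sizes, 10));                // message 11 -> stage index 1
        System.out.println(effectiveMode(Collections.emptyList())); // CONCURRENTLY
        System.out.println(effectiveMode(Arrays.asList(1, 1, 1)));  // ORDERLY
    }
}
```
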
```java
/**
 * Refer to {@link org.apache.rocketmq.client.consumer.store.OffsetStore}, manage the stage
 * consumption progress.
 */
public interface StageOffsetStore {

    void load() throws MQClientException;

    void updateStageOffset(final MessageQueue mq, final int stageOffset, final boolean increaseOnly);

    int readStageOffset(final MessageQueue mq, final ReadOffsetType type);

    void persistAll(final Set<MessageQueue> mqs);

    void persist(final MessageQueue mq);

    void removeStageOffset(MessageQueue mq);

    Map<MessageQueue, Integer> cloneStageOffsetTable(String topic);

    void updateConsumeStageOffsetToBroker(MessageQueue mq, int stageOffset, boolean isOneway)
        throws RemotingException, MQBrokerException, InterruptedException, MQClientException;
}
```
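One way to picture the `increaseOnly` flag of `updateStageOffset` is the in-memory sketch below. It is not the proposed implementation: it uses a plain String key instead of `MessageQueue`, leaves out loading and persistence, returns -1 for an unknown queue, and assumes that `increaseOnly` means "never move the stored offset backwards".

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

// Simplified stand-in for a stage offset table; all names are illustrative.
final class InMemoryStageOffsetSketch {

    private final ConcurrentMap<String, AtomicInteger> table = new ConcurrentHashMap<>();

    void updateStageOffset(String queueKey, int stageOffset, boolean increaseOnly) {
        AtomicInteger current = table.computeIfAbsent(queueKey, k -> new AtomicInteger(stageOffset));
        if (!increaseOnly) {
            current.set(stageOffset); // unconditional overwrite
            return;
        }
        // CAS loop: only replace the stored value with a larger one.
        int seen = current.get();
        while (stageOffset > seen && !current.compareAndSet(seen, stageOffset)) {
            seen = current.get();
        }
    }

    // Returns -1 when nothing is stored for the queue (an assumption of this sketch).
    int readStageOffset(String queueKey) {
        AtomicInteger current = table.get(queueKey);
        return current == null ? -1 : current.get();
    }

    void removeStageOffset(String queueKey) {
        table.remove(queueKey);
    }

    public static void main(String[] args) {
        InMemoryStageOffsetSketch store = new InMemoryStageOffsetSketch();
        store.updateStageOffset("topicA-queue0", 5, true);
        store.updateStageOffset("topicA-queue0", 3, true); // ignored, 3 < 5
        System.out.println(store.readStageOffset("topicA-queue0"));
    }
}
```
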
* Method behavior changes
  Nothing specific.
* CLI command changes
  Nothing specific.
* Log format or content changes
  Nothing specific.

Compatibility, Deprecation, and Migration Plan
* Are backward and forward compatibility taken into consideration?
  The original ordered-message interface is not changed.
* Are there deprecated APIs?
  Nothing specific.
* How do we do migration?
  Nothing specific.

Implementation Outline
We will implement the proposed changes in 3 phases.
Phase 1
  Implement the Stage Message feature in the Consumer.
Phase 2
  Implement the Stage Message feature in the Producer (to be honest, I haven't
thought this through yet; I'm looking forward to your ideas).
Phase 3
  Implement the Stage Message feature in the Broker (I'm looking forward to
your ideas here, too).

Rejected Alternatives
* How do the alternatives solve the issue you proposed?
  Nothing specific.
* Pros and cons of the alternatives
  Nothing specific.
* Why should we reject the above alternatives?
