This is an automated email from the ASF dual-hosted git repository.
lwclover pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new f8bf7bd38 Fixed improper punctuation in Chinese documents. (#5031)
f8bf7bd38 is described below
commit f8bf7bd38b11ca025c5795a8e4a03bd577f47273
Author: zhaows <[email protected]>
AuthorDate: Fri Sep 9 09:14:15 2022 +0800
Fixed improper punctuation in Chinese documents. (#5031)
Change the punctuation to Chinese.
---
docs/cn/BrokerContainer.md | 4 +-
docs/cn/Configuration_TLS.md | 8 +-
docs/cn/Deployment.md | 14 ++--
docs/cn/QuorumACK.md | 2 +-
docs/cn/README.md | 2 +-
docs/cn/SlaveActingMasterMode.md | 6 +-
docs/cn/architecture.md | 2 +-
docs/cn/best_practice.md | 4 +-
.../java/API_Reference_ DefaultPullConsumer.md | 62 +++++++-------
docs/cn/concept.md | 2 +-
docs/cn/controller/design.md | 98 +++++++++++-----------
docs/cn/controller/quick_start.md | 18 ++--
docs/cn/design.md | 6 +-
docs/cn/dledger/deploy_guide.md | 2 +-
docs/cn/features.md | 6 +-
docs/cn/msg_trace/user_guide.md | 2 +-
docs/cn/operation.md | 6 +-
..._Topic_Logic_Queue_\350\256\276\350\256\241.md" | 8 +-
docs/cn/statictopic/The_Scope_Of_Static_Topic.md | 10 +--
19 files changed, 131 insertions(+), 131 deletions(-)
diff --git a/docs/cn/BrokerContainer.md b/docs/cn/BrokerContainer.md
index c641014a9..3e741e230 100644
--- a/docs/cn/BrokerContainer.md
+++ b/docs/cn/BrokerContainer.md
@@ -96,7 +96,7 @@ usage: mqadmin removeBroker -b <arg> -c <arg> [-h] [-n <arg>]
## 存储变化
-storePathRootDir,
storePathCommitLog路径依然为MessageStoreConfig中配置值,需要注意的是同一个brokerContainer中的broker不能使用相同的storePathRootDir,
storePathCommitLog,否则不同的broker占用同一个存储目录,发生数据混乱。
+storePathRootDir,storePathCommitLog路径依然为MessageStoreConfig中配置值,需要注意的是同一个brokerContainer中的broker不能使用相同的storePathRootDir,storePathCommitLog,否则不同的broker占用同一个存储目录,发生数据混乱。
在文件删除策略上,仍然单个Broker的视角来进行删除,但MessageStoreConfig新增replicasPerDiskPartition参数和logicalDiskSpaceCleanForciblyThreshold。
@@ -149,4 +149,4 @@
InnerLoggerFactory.brokerIdentity.set(brokerIdentity.getCanonicalName())
通过线程名和线程本地变量区分可以参考org.apache.rocketmq.common.AbstractBrokerRunnable、org.apache.rocketmq.common.ThreadFactoryImpl以及各个ServiceThread中getServiceName的实现。
-参考文档:
[原RIP](https://github.com/apache/rocketmq/wiki/RIP-31-Support-RocketMQ-BrokerContainer)
\ No newline at end of file
+参考文档:[原RIP](https://github.com/apache/rocketmq/wiki/RIP-31-Support-RocketMQ-BrokerContainer)
\ No newline at end of file
diff --git a/docs/cn/Configuration_TLS.md b/docs/cn/Configuration_TLS.md
index 18a109b79..9ff03e53a 100644
--- a/docs/cn/Configuration_TLS.md
+++ b/docs/cn/Configuration_TLS.md
@@ -73,12 +73,12 @@ tls.client.trustCertPath=/opt/certFiles/ca.pem
编辑rocketmq/bin路径下的配置文件,使tls.properties配置生效.-Dtls.config.file的值需要替换为步骤2中创建的tls.peoperties文件的路径
-### 3.1 编辑runserver.sh,,在JAVA_OPT中增加以下内容:
+### 3.1 编辑runserver.sh,在JAVA_OPT中增加以下内容:
```shell
JAVA_OPT="${JAVA_OPT} -Dtls.server.mode=enforcing
-Dtls.config.file=/opt/rocketmq-4.9.3/conf/tls.properties"
```
-### 3.2 编辑runbroker.sh,在JAVA_OPT中增加以下内容:
+### 3.2 编辑runbroker.sh,在JAVA_OPT中增加以下内容:
```shell
JAVA_OPT="${JAVA_OPT} -Dorg.apache.rocketmq.remoting.ssl.mode=enforcing
-Dtls.config.file=/opt/rocketmq-4.9.3/conf/tls.properties -Dtls.enable=true"
@@ -86,7 +86,7 @@ JAVA_OPT="${JAVA_OPT}
-Dorg.apache.rocketmq.remoting.ssl.mode=enforcing -Dtls.co
# 4 客户端连接
-创建客户端使用的tlsclient.properties,并加入以下内容:
+创建客户端使用的tlsclient.properties,并加入以下内容:
```properties
# The store path of client-side private key
tls.client.keyPath=/opt/certFiles/client.key
@@ -103,7 +103,7 @@ JVM中需要加以下参数.tls.config.file的值需要使用之前创建的文
-Dtls.client.authServer=true -Dtls.enable=true -Dtls.test.mode.enable=false
-Dtls.config.file=/opt/certs/tlsclient.properties
```
-在客户端连接的代码中,需要将setUseTLS设置为true:
+在客户端连接的代码中,需要将setUseTLS设置为true:
```java
public class ExampleProducer {
public static void main(String[] args) throws Exception {
diff --git a/docs/cn/Deployment.md b/docs/cn/Deployment.md
index 5224e7eab..14529d111 100644
--- a/docs/cn/Deployment.md
+++ b/docs/cn/Deployment.md
@@ -25,7 +25,7 @@ The Name Server boot success...
### 第一步先启动broker
$ nohup sh bin/mqbroker -n localhost:9876 &
-### 验证broker是否启动成功, 比如, broker的ip是192.168.1.2 然后名字是broker-a
+### 验证broker是否启动成功,比如,broker的ip是192.168.1.2 然后名字是broker-a
$ tail -f ~/logs/rocketmqlogs/Broker.log
The broker[broker-a,192.169.1.2:10911] boot success...
```
@@ -36,12 +36,12 @@ The broker[broker-a,192.169.1.2:10911] boot success...
该模式是指所有节点都是master主节点(比如2个或3个主节点),没有slave从节点的模式。 这种模式的优缺点如下:
-- 优点:
+- 优点:
1. 配置简单。
2. 一个master节点的宕机或者重启(维护)对应用程序没有影响。
3.
当磁盘配置为RAID10时,消息不会丢失,因为RAID10磁盘非常可靠,即使机器不可恢复(消息异步刷盘模式的情况下,会丢失少量消息;如果消息是同步刷盘模式,不会丢失任何消息)。
4. 在这种模式下,性能是最高的。
-- 缺点:
+- 缺点:
1. 单台机器宕机时,本机未消费的消息,直到机器恢复后才会订阅,影响消息实时性。
多Master模式的启动步骤如下:
@@ -75,11 +75,11 @@ $ nohup sh mqbroker -n 192.168.1.1:9876 -c
$ROCKETMQ_HOME/conf/2m-noslave/broker
每个主节点配置多个从节点,多对主从。HA采用异步复制,主节点和从节点之间有短消息延迟(毫秒)。这种模式的优缺点如下:
-- 优点:
+- 优点:
1. 即使磁盘损坏,也只会丢失极少的消息,不影响消息的实时性能。
2. 同时,当主节点宕机时,消费者仍然可以消费从节点的消息,这个过程对应用本身是透明的,不需要人为干预。
3. 性能几乎与多Master模式一样高。
-- 缺点:
+- 缺点:
1. 主节点宕机、磁盘损坏时,会丢失少量消息。
多主多从模式的启动步骤如下:
@@ -119,11 +119,11 @@ $ nohup sh mqbroker -n 192.168.1.1:9876 -c
$ROCKETMQ_HOME/conf/2m-2s-async/broke
这种模式的优缺点如下:
-- 优点:
+- 优点:
1. 数据和服务都没有单点故障。
2. 在master节点关闭的情况下,消息也没有延迟。
3. 服务可用性和数据可用性非常高;
-- 缺点:
+- 缺点:
1. 这种模式下的性能略低于异步复制模式(大约低 10%)。
2. 发送单条消息的RT略高,目前版本,master节点宕机后,slave节点无法自动切换到master。
diff --git a/docs/cn/QuorumACK.md b/docs/cn/QuorumACK.md
index 4df517e85..cd97c92ec 100644
--- a/docs/cn/QuorumACK.md
+++ b/docs/cn/QuorumACK.md
@@ -69,4 +69,4 @@ private int calcNeedAckNums(int inSyncReplicas) {
** 用户需要设置正确的参数才能完成正确的向后兼容。举个例子,假设用户原集群为两副本同步复制,在没有修改任何参数的情况下,升级到RocketMQ
5的版本,由于totalReplicas、inSyncReplicas默认都为1,将降级为异步复制,如果需要和以前行为保持一致,则需要将totalReplicas和inSyncReplicas均设置为2。**
-参考文档:
[原RIP](https://github.com/apache/rocketmq/wiki/RIP-34-Support-quorum-write-and-adaptive-degradation-in-master-slave-architecture)
\ No newline at end of file
+参考文档:[原RIP](https://github.com/apache/rocketmq/wiki/RIP-34-Support-quorum-write-and-adaptive-degradation-in-master-slave-architecture)
\ No newline at end of file
diff --git a/docs/cn/README.md b/docs/cn/README.md
index 13ca985ae..a1b443ad3 100644
--- a/docs/cn/README.md
+++ b/docs/cn/README.md
@@ -1,7 +1,7 @@
Apache RocketMQ开发者指南
--------
-##### 这个开发者指南是帮助您快速了解,并使用 Apache RocketMQ
+##### 这个开发者指南是帮助您快速了解,并使用 Apache RocketMQ
### 1. 概念和特性
diff --git a/docs/cn/SlaveActingMasterMode.md b/docs/cn/SlaveActingMasterMode.md
index 7af7ff89f..b1e266f2b 100644
--- a/docs/cn/SlaveActingMasterMode.md
+++ b/docs/cn/SlaveActingMasterMode.md
@@ -14,7 +14,7 @@
- earliestMsgStoreTime
- endTransaction
-所有锁MQ相关操作,包括lock, unlock, lockBatch, unlockAll
+所有锁MQ相关操作,包括lock,unlock,lockBatch,unlockAll
具体影响为:
- 客户端无法获取位于该副本组的mq的锁,故当本地锁过期后,将无法消费该组的顺序消息
@@ -31,7 +31,7 @@
提出一个新的方案,Slave代理Master模式,作为Master-Slave部署模式的升级。在原先Master-Slave部署模式下,通过备代理主、轻量级心跳、副本组信息获取、broker预上线机制、二级消息逃逸等方式,当同组Master发生故障时,Slave将承担更加重要的作用,包括:
-- 当Master下线后,该组中brokerId最小的Slave会承担备读 以及 一些 客户端和管控会访问
但却只能在Master节点上完成的任务。包括且不限于searchOffset、maxOffset、minOffset、earliestMsgStoreTime、endTransaction以及所有锁MQ相关操作lock,
unlock, lockBatch, unlockAll。
+- 当Master下线后,该组中brokerId最小的Slave会承担备读 以及 一些 客户端和管控会访问
但却只能在Master节点上完成的任务。包括且不限于searchOffset、maxOffset、minOffset、earliestMsgStoreTime、endTransaction以及所有锁MQ相关操作lock,unlock,lockBatch,unlockAll。
-
当Master下线后,故障Broker组上的二级消息消费将不会中断,由该组中该组中brokerId最小的Slave承担起该任务,定时消息、Pop消息、事务消息等仍然可以正常运行。
-
当Master下线后,在Slave代理Master一段时间主后,然后当Master再次上线后,通过预上线机制,Master会自动完成元数据的反向同步后再上线,不会出现元数据回退,造成消息大量重复消费或二级消息大量重放。
@@ -161,4 +161,4 @@ Broker
客户端对新旧版本的nameserver和broker均无兼容性问题。
-参考文档:
[原RIP](https://github.com/apache/rocketmq/wiki/RIP-32-Slave-Acting-Master-Mode)
\ No newline at end of file
+参考文档:[原RIP](https://github.com/apache/rocketmq/wiki/RIP-32-Slave-Acting-Master-Mode)
\ No newline at end of file
diff --git a/docs/cn/architecture.md b/docs/cn/architecture.md
index cbb896b11..87e93d183 100644
--- a/docs/cn/architecture.md
+++ b/docs/cn/architecture.md
@@ -3,7 +3,7 @@
## 1 技术架构

-RocketMQ架构上主要分为四部分,如上图所示:
+RocketMQ架构上主要分为四部分,如上图所示:
-
Producer:消息发布的角色,支持分布式集群方式部署。Producer通过MQ的负载均衡模块选择相应的Broker集群队列进行消息投递,投递的过程支持快速失败并且低延迟。
diff --git a/docs/cn/best_practice.md b/docs/cn/best_practice.md
index f5c426ff1..5cc5b3764 100755
--- a/docs/cn/best_practice.md
+++ b/docs/cn/best_practice.md
@@ -185,7 +185,7 @@ msgId一定是全局唯一标识符,但是实际使用中,可能会存在相
| brokerIP2 | 跟 brokerIP1 一样 | 存在主从 broker 时,如果在
broker 主节点上配置了 brokerIP2 属性,broker 从节点会连接主节点配置的 brokerIP2 进行同步 |
| brokerName | null | broker 的名称
|
| brokerClusterName | DefaultCluster | 本
broker 所属的 Cluster 名称 |
-| brokerId | 0 | broker id, 0 表示
master, 其他的正整数表示 slave |
+| brokerId | 0 | broker id,0 表示
master,其他的正整数表示 slave |
| storePathRootDir | $HOME/store/ |
存储根路径 |
| storePathCommitLog | $HOME/store/commitlog/
| 存储 commit log 的路径
|
| mappedFileSizeCommitLog | 1024 * 1024 * 1024(1G) | commit log 的映射文件大小
|
@@ -209,7 +209,7 @@ msgId一定是全局唯一标识符,但是实际使用中,可能会存在相
### 5.1 客户端寻址方式
-RocketMQ可以令客户端找到Name Server, 然后通过Name
Server再找到Broker。如下所示有多种配置方式,优先级由高到低,高优先级会覆盖低优先级。
+RocketMQ可以令客户端找到Name Server,然后通过Name
Server再找到Broker。如下所示有多种配置方式,优先级由高到低,高优先级会覆盖低优先级。
- 代码中指定Name Server地址,多个namesrv地址之间用分号分割
diff --git a/docs/cn/client/java/API_Reference_ DefaultPullConsumer.md
b/docs/cn/client/java/API_Reference_ DefaultPullConsumer.md
index 64a2b7bd8..66857e2fd 100644
--- a/docs/cn/client/java/API_Reference_ DefaultPullConsumer.md
+++ b/docs/cn/client/java/API_Reference_ DefaultPullConsumer.md
@@ -4,11 +4,11 @@
1. `DefaultMQPullConsumer extends ClientConfig implements MQPullConsumer`
-2.
`DefaultMQPullConsumer`主动的从Broker拉取消息,主动权由应用控制,可以实现批量的消费消息。Pull方式取消息的过程需要用户自己写,首先通过打算消费的Topic拿到MessageQueue的集合,遍历MessageQueue集合,然后针对每个MessageQueue批量取消息,也可以自定义与控制offset位置。
+2.
`DefaultMQPullConsumer`主动的从Broker拉取消息,主动权由应用控制,可以实现批量的消费消息。Pull方式取消息的过程需要用户自己写,首先通过打算消费的Topic拿到MessageQueue的集合,遍历MessageQueue集合,然后针对每个MessageQueue批量取消息,也可以自定义与控制offset位置。
-3.
优势:consumer可以按需消费,不用担心自己处理能力,而broker堆积消息也会相对简单,无需记录每一个要发送消息的状态,只需要维护所有消息的队列和偏移量就可以了。所以对于慢消费,消息量有限且到来的速度不均匀的情况,pull模式比较合适消息延迟与忙等。
+3.
优势:consumer可以按需消费,不用担心自己处理能力,而broker堆积消息也会相对简单,无需记录每一个要发送消息的状态,只需要维护所有消息的队列和偏移量就可以了。所以对于慢消费,消息量有限且到来的速度不均匀的情况,pull模式比较合适消息延迟与忙等。
-4. 缺点:由于主动权在消费方,消费方无法及时获取最新的消息。比较适合不及时批处理场景。
+4. 缺点:由于主动权在消费方,消费方无法及时获取最新的消息。比较适合不及时批处理场景。
``` java
@@ -112,32 +112,32 @@ public class MQPullConsumer {
### 使用方法摘要
-|返回值|方法名称|方法描述|
-|-------|-------|------------|
-|MQAdmin接口method|-------|------------|
-|void|createTopic(String key, String newTopic, int
queueNum)|在broker上创建指定的topic|
-|void|createTopic(String key, String newTopic, int queueNum, int
topicSysFlag)|在broker上创建指定的topic|
-|long|earliestMsgStoreTime(MessageQueue mq)|查询最早的消息存储时间|
-|long|maxOffset(MessageQueue mq)|查询给定消息队列的最大offset|
-|long|minOffset(MessageQueue mq)|查询给定消息队列的最小offset|
-|QueryResult|queryMessage(String topic, String key, int maxNum, long begin,
long end)|按关键字查询消息|
-|long|searchOffset(MessageQueue mq, long timestamp)|查找指定时间的消息队列的物理offset|
-|MessageExt|viewMessage(String offsetMsgId)|根据给定的msgId查询消息|
-|MessageExt|public MessageExt viewMessage(String topic, String
msgId)|根据给定的msgId查询消息,并指定topic|
-|MQConsumer接口method|-------|------------|
-|Set<MessageQueue>|fetchSubscribeMessageQueues(String topic)|根据topic获取订阅的Queue|
-|void|sendMessageBack(final MessageExt msg, final int
delayLevel)|如果消息出来失败,可以发送回去延迟消费,delayLevel=DelayConf.DELAY_LEVEL|
-|void|sendMessageBack(final MessageExt msg, final int delayLevel, final String
brokerName)|如果消息出来失败,可以发送回去延迟消费,delayLevel=DelayConf.DELAY_LEVEL|
-|MQPullConsumer接口method|-------|------------|
-|long|fetchConsumeOffset(MessageQueue mq, boolean fromStore)|查询给定消息队列的最大offset|
-|PullResult |pull(final MessageQueue mq, final String subExpression, final
long offset,final int maxNums)|异步拉取制定匹配的消息|
-|PullResult| pull(final MessageQueue mq, final String subExpression, final
long offset,final int maxNums, final long timeout)|异步拉取制定匹配的消息|
-|PullResult|pull(final MessageQueue mq, final MessageSelector selector, final
long offset,final int
maxNums)|异步拉取制定匹配的消息,通过MessageSelector器来过滤消息,参考org.apache.rocketmq.common.filter.ExpressionType|
-|PullResult|pullBlockIfNotFound(final MessageQueue mq, final String
subExpression,final long offset, final int
maxNums)|异步拉取制定匹配的消息,如果没有消息讲block住,并指定超时时间consumerPullTimeoutMillis|
-|void|pullBlockIfNotFound(final MessageQueue mq, final String subExpression,
final long offset,final int maxNums, final PullCallback
pullCallback)|异步拉取制定匹配的消息,如果没有消息讲block住,并指定超时时间consumerPullTimeoutMillis,通过回调pullCallback来消费|
-|void|updateConsumeOffset(final MessageQueue mq, final long
offset)|更新指定mq的offset|
-|long|fetchMessageQueuesInBalance(String
topic)|根据topic获取订阅的Queue(是balance分配后的)|
-|void|void sendMessageBack(MessageExt msg, int delayLevel, String brokerName,
String
consumerGroup)|如果消息出来失败,可以发送回去延迟消费,delayLevel=DelayConf.DELAY_LEVEL,消息可能在同一个consumerGroup消费|
-|void|shutdown()|关闭当前消费者实例并释放相关资源|
-|void|start()|启动消费者|
+|返回值|方法名称| 方法描述
|
+|-------|-------|----------------------------------------------------------------------------------------|
+|MQAdmin接口method|-------| ------------
|
+|void|createTopic(String key, String newTopic, int queueNum)|
在broker上创建指定的topic
|
+|void|createTopic(String key, String newTopic, int queueNum, int
topicSysFlag)| 在broker上创建指定的topic
|
+|long|earliestMsgStoreTime(MessageQueue mq)| 查询最早的消息存储时间
|
+|long|maxOffset(MessageQueue mq)| 查询给定消息队列的最大offset
|
+|long|minOffset(MessageQueue mq)| 查询给定消息队列的最小offset
|
+|QueryResult|queryMessage(String topic, String key, int maxNum, long begin,
long end)| 按关键字查询消息
|
+|long|searchOffset(MessageQueue mq, long timestamp)| 查找指定时间的消息队列的物理offset
|
+|MessageExt|viewMessage(String offsetMsgId)| 根据给定的msgId查询消息
|
+|MessageExt|public MessageExt viewMessage(String topic, String msgId)|
根据给定的msgId查询消息,并指定topic
|
+|MQConsumer接口method|-------| ------------
|
+|Set<MessageQueue>|fetchSubscribeMessageQueues(String topic)|
根据topic获取订阅的Queue
|
+|void|sendMessageBack(final MessageExt msg, final int delayLevel)|
如果消息出来失败,可以发送回去延迟消费,delayLevel=DelayConf.DELAY_LEVEL
|
+|void|sendMessageBack(final MessageExt msg, final int delayLevel, final String
brokerName)| 如果消息出来失败,可以发送回去延迟消费,delayLevel=DelayConf.DELAY_LEVEL
|
+|MQPullConsumer接口method|-------| ------------
|
+|long|fetchConsumeOffset(MessageQueue mq, boolean fromStore)|
查询给定消息队列的最大offset
|
+|PullResult |pull(final MessageQueue mq, final String subExpression, final
long offset,final int maxNums)| 异步拉取制定匹配的消息
|
+|PullResult| pull(final MessageQueue mq, final String subExpression, final
long offset,final int maxNums, final long timeout)| 异步拉取制定匹配的消息
|
+|PullResult|pull(final MessageQueue mq, final MessageSelector selector, final
long offset,final int maxNums)|
异步拉取制定匹配的消息,通过MessageSelector器来过滤消息,参考org.apache.rocketmq.common.filter.ExpressionType
|
+|PullResult|pullBlockIfNotFound(final MessageQueue mq, final String
subExpression,final long offset, final int maxNums)|
异步拉取制定匹配的消息,如果没有消息讲block住,并指定超时时间consumerPullTimeoutMillis
|
+|void|pullBlockIfNotFound(final MessageQueue mq, final String subExpression,
final long offset,final int maxNums, final PullCallback pullCallback)|
异步拉取制定匹配的消息,如果没有消息讲block住,并指定超时时间consumerPullTimeoutMillis,通过回调pullCallback来消费
|
+|void|updateConsumeOffset(final MessageQueue mq, final long offset)|
更新指定mq的offset
|
+|long|fetchMessageQueuesInBalance(String topic)|
根据topic获取订阅的Queue(是balance分配后的)
|
+|void|void sendMessageBack(MessageExt msg, int delayLevel, String brokerName,
String consumerGroup)|
如果消息出来失败,可以发送回去延迟消费,delayLevel=DelayConf.DELAY_LEVEL,消息可能在同一个consumerGroup消费
|
+|void|shutdown()| 关闭当前消费者实例并释放相关资源
|
+|void|start()| 启动消费者
|
diff --git a/docs/cn/concept.md b/docs/cn/concept.md
index 9719cdd02..cb2c863bd 100644
--- a/docs/cn/concept.md
+++ b/docs/cn/concept.md
@@ -32,7 +32,7 @@ RocketMQ主要由 Producer、Broker、Consumer 三部分组成,其中Producer
同一类Consumer的集合,这类Consumer通常消费同一类消息且消费逻辑一致。消费者组使得在消息消费方面,实现负载均衡和容错的目标变得非常容易。要注意的是,消费者组的消费者实例必须订阅完全相同的Topic。RocketMQ
支持两种消息模式:集群消费(Clustering)和广播消费(Broadcasting)。
## 11 集群消费(Clustering)
-集群消费模式下,相同Consumer Group的每个Consumer实例平均分摊消息。
+集群消费模式下,相同Consumer Group的每个Consumer实例平均分摊消息。
## 12 广播消费(Broadcasting)
广播消费模式下,相同Consumer Group的每个Consumer实例都接收全量的消息。
diff --git a/docs/cn/controller/design.md b/docs/cn/controller/design.md
index fb0750bf3..a8d18dd67 100644
--- a/docs/cn/controller/design.md
+++ b/docs/cn/controller/design.md
@@ -5,7 +5,7 @@
- Raft 模式下,Broker组内副本数必须是三副本及以上,副本的ACK也必须遵循多数派协议。
- RocketMQ 存在两套 HA 复制流程,且 Raft 模式下的复制无法利用 RocketMQ 原生的存储能力。
-因此我们希望利用 DLedger 实现一个基于 Raft 的一致性模块(DLedger Controller),并当作一个可选的选主组件, 支持独立部署,
也可以嵌入在 Nameserver 中,Broker 通过与 Controller 的交互完成 Master 的选举, 从而解决上述问题, 我们将该新模式称为
Controller 模式。
+因此我们希望利用 DLedger 实现一个基于 Raft 的一致性模块(DLedger
Controller),并当作一个可选的选主组件,支持独立部署,也可以嵌入在 Nameserver 中,Broker 通过与 Controller 的交互完成
Master 的选举,从而解决上述问题,我们将该新模式称为 Controller 模式。
# 架构
@@ -13,44 +13,44 @@

-如图是 Controller 模式的核心架构, 介绍如下:
+如图是 Controller 模式的核心架构,介绍如下:
-- DledgerController: 利⽤ DLedger ,构建⼀个保证元数据强⼀致性的 DLedger Controller 控制器,利⽤ Raft
选举会选出⼀个 Active DLedger Controller 作为主控制器,DLedger Controller 可以内嵌在
Nameserver中,也可以独立的部署。其主要作用是, 用来存储和管理 Broker 的 SyncStateSet 列表, 并在某个 Broker 的
Master Broker 下线或⽹络隔离时,主动发出调度指令来切换 Broker 的 Master。
-- SyncStateSet: 主要表示⼀个 broker 副本组中跟上 Master 的 Slave 副本加上 Master 的集合。主要判断标准是
Master 和 Slave 之间的差距。当 Master 下线时,我们会从 SyncStateSet 列表中选出新的 Master。
SyncStateSet 列表的变更主要由 Master Broker 发起。Master通过定时任务判断和同步过程中完成 SyncStateSet
的Shrink 和 Expand,并向选举组件 Controller 发起 Alter SyncStateSet 请求。
-- AutoSwitchHAService: 一个新的 HAService, 在 DefaultHAService 的基础上, 支持 BrokerRole
的切换, 支持 Master 和 Slave 之间互相转换 (在 Controller 的控制下) 。此外, 该 HAService 统一了日志复制流程,
会在 HA HandShake 阶段进行日志的截断。
-- ReplicasManager: 作为一个中间组件, 起到承上启下的作用。对上, 可以定期同步来自 Controller 的控制指令, 对下,
可以定期监控 HAService 的状态, 并在合适的时间修改 SyncStateSet。ReplicasManager 会定期同步 Controller
中关于该 Broker 的元数据, 当 Controller 选举出一个新的 Master 的时候, ReplicasManager 能够感知到元数据的变化,
并进行 BrokerRole 的切换。
+- DledgerController:利⽤ DLedger ,构建⼀个保证元数据强⼀致性的 DLedger Controller 控制器,利⽤ Raft
选举会选出⼀个 Active DLedger Controller 作为主控制器,DLedger Controller 可以内嵌在
Nameserver中,也可以独立的部署。其主要作用是,用来存储和管理 Broker 的 SyncStateSet 列表,并在某个 Broker 的
Master Broker 下线或⽹络隔离时,主动发出调度指令来切换 Broker 的 Master。
+- SyncStateSet:主要表示⼀个 broker 副本组中跟上 Master 的 Slave 副本加上 Master 的集合。主要判断标准是
Master 和 Slave 之间的差距。当 Master 下线时,我们会从 SyncStateSet 列表中选出新的 Master。
SyncStateSet 列表的变更主要由 Master Broker 发起。Master通过定时任务判断和同步过程中完成 SyncStateSet
的Shrink 和 Expand,并向选举组件 Controller 发起 Alter SyncStateSet 请求。
+- AutoSwitchHAService:一个新的 HAService,在 DefaultHAService 的基础上,支持 BrokerRole
的切换,支持 Master 和 Slave 之间互相转换 (在 Controller 的控制下) 。此外,该 HAService 统一了日志复制流程,会在
HA HandShake 阶段进行日志的截断。
+- ReplicasManager:作为一个中间组件,起到承上启下的作用。对上,可以定期同步来自 Controller 的控制指令,对下,可以定期监控
HAService 的状态,并在合适的时间修改 SyncStateSet。ReplicasManager 会定期同步 Controller 中关于该
Broker 的元数据,当 Controller 选举出一个新的 Master 的时候,ReplicasManager 能够感知到元数据的变化,并进行
BrokerRole 的切换。
## DLedgerController 核心设计

-如图是 DledgerController 的核心设计:
+如图是 DledgerController 的核心设计:
-- DLedgerController 可以内嵌在 Namesrv 中, 也可以独立的部署。
-- Active DLedgerController 是 DLedger 选举出来的 Leader, 其会接受来自客户端的事件请求, 并通过 DLedger
发起共识, 最后应用到内存元数据状态机中。
-- Not Active DLedgerController, 也即 Follower 角色, 其会通过 DLedger 复制来自 Active
DLedgerController 的事件日志, 然后直接运用到状态机中。
+- DLedgerController 可以内嵌在 Namesrv 中,也可以独立的部署。
+- Active DLedgerController 是 DLedger 选举出来的 Leader,其会接受来自客户端的事件请求,并通过 DLedger
发起共识,最后应用到内存元数据状态机中。
+- Not Active DLedgerController,也即 Follower 角色,其会通过 DLedger 复制来自 Active
DLedgerController 的事件日志,然后直接运用到状态机中。
## 日志复制
### 基本概念与流程
-为了统一日志复制流程, 区分每一任 Master 的日志复制边界, 方便日志截断, 引入了 MasterEpoch 的概念, 代表当前 Master
的任期号 (类似 Raft Term 的含义)。
+为了统一日志复制流程,区分每一任 Master 的日志复制边界,方便日志截断,引入了 MasterEpoch 的概念,代表当前 Master 的任期号
(类似 Raft Term 的含义)。
-对于每一任 Master, 其都有 MasterEpoch 与 StartOffset, 分别代表该 Master 的任期号与起始日志位移。
+对于每一任 Master,其都有 MasterEpoch 与 StartOffset,分别代表该 Master 的任期号与起始日志位移。
-需要注意的是, MasterEpoch 是由 Controller 决定的, 且其是单调递增的。
+需要注意的是,MasterEpoch 是由 Controller 决定的,且其是单调递增的。
-此外, 我们还引入了 EpochFile, 用于存放 <Epoch, StartOffset> 序列。
+此外,我们还引入了 EpochFile,用于存放 <Epoch, StartOffset> 序列。
-**当⼀个 Broker 成为 Master,其会:**
+**当⼀个 Broker 成为 Master,其会:**
- 将 Commitlog 截断到最后⼀条消息的边界。
-- 同时最新将 <MasterEpoch , startoffset> 持久化到 EpochFile, startOffset 也即当前 CommitLog
的 MaxPhyOffset 。
+- 同时最新将 <MasterEpoch , startoffset> 持久化到 EpochFile,startOffset 也即当前 CommitLog
的 MaxPhyOffset 。
- 然后 HAService 监听连接,创建 HAConnection,配合 Slave 完成流程交互。
-**当一个 Broker 成为 Slave, 其会:**
+**当一个 Broker 成为 Slave,其会:**
Ready 阶段:
@@ -60,19 +60,19 @@ Ready 阶段:
Handshake 阶段:
-- 进⾏⽇志截断,这⾥关键在于 Slave 利⽤本地的 epoch 与 startOffset 和 Master 对⽐, 找到⽇志截断点,进⾏⽇志截断。
+- 进⾏⽇志截断,这⾥关键在于 Slave 利⽤本地的 epoch 与 startOffset 和 Master 对⽐,找到⽇志截断点,进⾏⽇志截断。
-Transfer 阶段:
+Transfer 阶段:
- 从 Master 同步日志。
### 截断算法
-具体的日志截断算法流程如下:
+具体的日志截断算法流程如下:
-- 在 HandShake 阶段, Slave 会从 Master 处获取 Master 的 EpochCache 。
+- 在 HandShake 阶段, Slave 会从 Master 处获取 Master 的 EpochCache 。
-- Slave ⽐较获取到的 Master EpochCahce <Startoffset,Endoffset>,从后往前依次和本地进行比对, 如果二者的
Epoch 与 StartOffset 相等, 则该 Epoch 有效,截断位点为两者中较⼩的 Endoffset,截断后修正⾃⼰的<Epoch ,
Startoffset> 信息,进⼊Transfer 阶 段;如果不相等,对比 Slave 前⼀个epoch,直到找到截断位点。
+- Slave ⽐较获取到的 Master EpochCahce <Startoffset,Endoffset>,从后往前依次和本地进行比对,如果二者的
Epoch 与 StartOffset 相等, 则该 Epoch 有效,截断位点为两者中较⼩的 Endoffset,截断后修正⾃⼰的<Epoch ,
Startoffset> 信息,进⼊Transfer 阶 段;如果不相等,对比 Slave 前⼀个epoch,直到找到截断位点。
```java
slave:TreeMap<Epoch, Pair<startOffset,endOffset>> epochMap;
@@ -95,15 +95,15 @@ while (iterator.hasNext()) {
### 复制流程
-由于 Ha 是基于流进行日志复制的, 我们无法分清日志的边界 (也即传输的一批日志可能横跨多个 MasterEpoch), Slave 无法感知到
MasterEpoch 的变化, 也就无法及时修改 EpochFile。
+由于 Ha 是基于流进行日志复制的,我们无法分清日志的边界 (也即传输的一批日志可能横跨多个 MasterEpoch),Slave 无法感知到
MasterEpoch 的变化,也就无法及时修改 EpochFile。
-因此, 我们做了如下改进:
+因此,我们做了如下改进:
Master 传输⽇志时,保证⼀次发送的⼀个 batch 是同⼀个 epoch 中的,⽽不能横跨多个 epoch。可以在WriteSocketService
中新增两个变量:
- currentTransferEpoch:代表当前 WriteSocketService.nextTransferFromWhere 对应在哪个
epoch 中
-- currentTransferEpochEndOffset: 对应 currentTransferEpoch 的 end offset.。如果
currentTransferEpoch == MaxEpoch,则 currentTransferEpochEndOffset= -1,表示没有界限。
+- currentTransferEpochEndOffset: 对应 currentTransferEpoch 的 end offset.。如果
currentTransferEpoch == MaxEpoch,则 currentTransferEpochEndOffset= -1,表示没有界限。
WriteSocketService 传输下⼀批⽇志时 (假设这⼀批⽇志总⼤⼩为 size),如果发现
@@ -113,60 +113,60 @@ nextTransferFromWhere + size >
currentTransferEpochEndOffset,则将 selectMapp
### 复制协议
-根据上文我们可以知道, AutoSwitchHaService 对日志复制划分为多个阶段, 下面介绍是该 HaService 的协议。
+根据上文我们可以知道,AutoSwitchHaService 对日志复制划分为多个阶段,下面介绍是该 HaService 的协议。
#### Handshake 阶段
-1.AutoSwitchHaClient (Slave) 会向 Master 发送 HandShake 包, 如下:
+1.AutoSwitchHaClient (Slave) 会向 Master 发送 HandShake 包,如下:

`current state(4byte) + Two flags(4byte) + slaveAddressLength(4byte) +
slaveAddress(50byte)`
-- Current state 代表当前的 HAConnectionState, 也即 HANDSHAKE。
+- Current state 代表当前的 HAConnectionState,也即 HANDSHAKE。
-- Two falgs 是两个状态标志位, 其中, isSyncFromLastFile 代表是否要从 Master 的最后一个文件开始复制,
isAsyncLearner 代表该 Slave 是否是异步复制, 并以 Learner 的形式接入 Master。
+- Two falgs 是两个状态标志位,其中,isSyncFromLastFile 代表是否要从 Master
的最后一个文件开始复制,isAsyncLearner 代表该 Slave 是否是异步复制,并以 Learner 的形式接入 Master。
-- slaveAddressLength 与 slaveAddress 代表了该 Slave 的地址, 用于后续加入 SyncStateSet 。
+- slaveAddressLength 与 slaveAddress 代表了该 Slave 的地址,用于后续加入 SyncStateSet 。
-2.AutoSwitchHaConnection (Master) 会向 Slave 回送 HandShake 包, 如下:
+2.AutoSwitchHaConnection (Master) 会向 Slave 回送 HandShake 包,如下:

`current state(4byte) + body size(4byte) + offset(8byte) + epoch(4byte) + body`
-- Current state 代表当前的 HAConnectionState, 也即 HANDSHAKE。
+- Current state 代表当前的 HAConnectionState,也即 HANDSHAKE。
- Body size 代表了 body 的长度。
- Offset 代表 Master 端日志的最大偏移量。
- Epoch 代表了 Master 的 Epoch 。
- Body 中传输的是 Master 端的 EpochEntryList 。
-Slave 收到 Master 回送的包后, 就会在本地进行上文阐述的日志截断流程。
+Slave 收到 Master 回送的包后,就会在本地进行上文阐述的日志截断流程。
#### Transfer 阶段
-1.AutoSwitchHaConnection (Master) 会不断的往 Slave 发送日志包, 如下:
+1.AutoSwitchHaConnection (Master) 会不断的往 Slave 发送日志包,如下:

`current state(4byte) + body size(4byte) + offset(8byte) + epoch(4byte) +
epochStartOffset(8byte) + additionalInfo(confirmOffset) (8byte)+ body`
-- Current state 代表当前的 HAConnectionState, 也即 Transfer 。
-- Body size 代表了 body 的长度。
-- Offset 当前这一批次的日志的起始偏移量。
-- Epoch: 代表当前这一批次日志所属的 MasterEpoch。
-- epochStartOffset: 代表当前这一批次日志的 MasterEpoch 对应的 StartOffset。
-- confirmOffset: 代表在 SyncStateSet 中的副本的最小偏移量。
-- Body: 日志。
+- Current state:代表当前的 HAConnectionState,也即 Transfer 。
+- Body size:代表了 body 的长度。
+- Offset:当前这一批次的日志的起始偏移量。
+- Epoch:代表当前这一批次日志所属的 MasterEpoch。
+- epochStartOffset:代表当前这一批次日志的 MasterEpoch 对应的 StartOffset。
+- confirmOffset:代表在 SyncStateSet 中的副本的最小偏移量。
+- Body:日志。
-2.AutoSwitchHaClient (Slave) 会向 Master 发送 ACK 包:
+2.AutoSwitchHaClient (Slave) 会向 Master 发送 ACK 包:

` current state(4byte) + maxOffset(8byte)`
-- Current state 代表当前的 HAConnectionState, 也即 Transfer 。
-- MaxOffset: 代表当前 Slave 的最大日志偏移量。
+- Current state:代表当前的 HAConnectionState,也即 Transfer 。
+- MaxOffset:代表当前 Slave 的最大日志偏移量。
## Master 选举
@@ -174,17 +174,17 @@ Slave 收到 Master 回送的包后, 就会在本地进行上文阐述的日志
ELectMaster 主要是在某 Broker 副本组的 Master 下线或不可访问时,重新从 SyncStateSet 列表⾥⾯选出⼀个新的
Master,该事件由 Controller ⾃身或者通过运维命令`electMaster` 发起Master选举。
-无论 Controller 是独立部署, 还是嵌入在 Namesrv 中, 其都会监听每个 Broker 的连接通道, 如果某个 Broker
channel inActive 了, 就会判断该 Broker 是否为 Master, 如果是, 则会触发选主的流程。
+无论 Controller 是独立部署,还是嵌入在 Namesrv 中,其都会监听每个 Broker 的连接通道,如果某个 Broker channel
inActive 了,就会判断该 Broker 是否为 Master,如果是,则会触发选主的流程。
选举 Master 的⽅式⽐较简单,我们只需要在该组 Broker 所对应的 SyncStateSet 列表中,挑选⼀个出来成为新的 Master
即可,并通过 DLedger 共识后应⽤到内存元数据,最后将结果通知对应的Broker副本组。
### SyncStateSet 变更
-SyncStateSet 是选主的重要依据, SyncStateSet 列表的变更主要由 Master Broker
发起。Master通过定时任务判断和同步过程中完成 SyncStateSet 的Shrink 和 Expand,并向选举组件 Controller 发起
Alter SyncStateSet 请求。
+SyncStateSet 是选主的重要依据,SyncStateSet 列表的变更主要由 Master Broker
发起。Master通过定时任务判断和同步过程中完成 SyncStateSet 的Shrink 和 Expand,并向选举组件 Controller 发起
Alter SyncStateSet 请求。
#### Shrink
-Shrink SyncStateSet ,指把 SyncStateSet 副本集合中那些与Master差距过⼤的副本移除,判断依据如下:
+Shrink SyncStateSet ,指把 SyncStateSet 副本集合中那些与Master差距过⼤的副本移除,判断依据如下:
- 增加 haMaxTimeSlaveNotCatchUp 参数 。
@@ -192,9 +192,9 @@ Shrink SyncStateSet ,指把 SyncStateSet 副本集合中那些与Master差距
- ReadSocketService 接收到 slaveAckOffset 时若 slaveAckOffset >=
lastMasterMaxOffset 则将lastCaughtUpTimeMs 更新为 lastTransferTimeMs。
-- Master 端通过定时任务扫描每一个 HaConnection, 如果 (cur_time -
connection.lastCaughtUpTimeMs) > haMaxTimeSlaveNotCatchUp,则该 Slave 是
Out-of-sync 的。
+- Master 端通过定时任务扫描每一个 HaConnection,如果 (cur_time -
connection.lastCaughtUpTimeMs) > haMaxTimeSlaveNotCatchUp,则该 Slave 是
Out-of-sync 的。
-- 如果检测到 Slave out of sync , master 会立刻向 Controller 上报SyncStateSet, 从而 Shrink
SyncStateSet。
+- 如果检测到 Slave out of sync ,master 会立刻向 Controller 上报SyncStateSet,从而 Shrink
SyncStateSet。
#### Expand
diff --git a/docs/cn/controller/quick_start.md
b/docs/cn/controller/quick_start.md
index 0bc20eb48..0b5e10eb5 100644
--- a/docs/cn/controller/quick_start.md
+++ b/docs/cn/controller/quick_start.md
@@ -4,7 +4,7 @@

-该文档主要介绍如何快速构建自动主从切换的 RocketMQ 集群,其架构如上图所示,
主要增加支持自动主从切换的Controller组件,其可以独立部署也可以内嵌在NameServer中。
+该文档主要介绍如何快速构建自动主从切换的 RocketMQ
集群,其架构如上图所示,主要增加支持自动主从切换的Controller组件,其可以独立部署也可以内嵌在NameServer中。
详细设计思路请参考 [设计思想](design.md).
@@ -39,7 +39,7 @@ $ sh bin/mqadmin getControllerMetaData -a localhost:9878
-a代表集群中任意一个Controller的地址
-至此, 启动成功,现在可以向集群收发消息,并进行切换测试了。
+至此,启动成功,现在可以向集群收发消息,并进行切换测试了。
如果需要关闭快速集群,可以执行:
@@ -47,11 +47,11 @@ $ sh bin/mqadmin getControllerMetaData -a localhost:9878
$ sh bin/controller/fast-try.sh stop
```
-对于快速部署,默认配置在 conf/controller/quick-start里面,默认的存储路径在 /tmp/rmqstore, 且会开启一个
Controller (嵌入在 Namesrv) 和两个 Broker。
+对于快速部署,默认配置在 conf/controller/quick-start里面,默认的存储路径在 /tmp/rmqstore,且会开启一个
Controller (嵌入在 Namesrv) 和两个 Broker。
### 查看 SyncStateSet
-可以通过运维工具查看 SyncStateSet:
+可以通过运维工具查看 SyncStateSet:
```shell
$ sh bin/mqadmin getSyncStateSet -a localhost:9878 -b broker-a
@@ -59,13 +59,13 @@ $ sh bin/mqadmin getSyncStateSet -a localhost:9878 -b
broker-a
-a 代表的是任意一个 Controller 的地址
-如果顺利的话, 可以看到以下内容:
+如果顺利的话,可以看到以下内容:

### 查看 BrokerEpoch
-可以通过运维工具查看 BrokerEpochEntry:
+可以通过运维工具查看 BrokerEpochEntry:
```shell
$ sh bin/mqadmin getBrokerEpoch -n localhost:9876 -b broker-a
@@ -73,7 +73,7 @@ $ sh bin/mqadmin getBrokerEpoch -n localhost:9876 -b broker-a
-n 代表的是任意一个 Namesrv 的地址
-如果顺利的话, 可以看到以下内容:
+如果顺利的话,可以看到以下内容:

@@ -81,7 +81,7 @@ $ sh bin/mqadmin getBrokerEpoch -n localhost:9876 -b broker-a
部署成功后,现在尝试进行 Master 切换。
-首先, kill 掉原 Master 的进程, 在上文的例子中, 就是使用端口 30911 的进程:
+首先,kill 掉原 Master 的进程,在上文的例子中,就是使用端口 30911 的进程:
```shell
#查找端口:
@@ -90,7 +90,7 @@ $ ps -ef|grep java|grep BrokerStartup|grep
./conf/controller/quick-start/broker-
$ kill -9 PID
```
-接着, 用 SyncStateSet admin 脚本查看:
+接着,用 SyncStateSet admin 脚本查看:
```shell
$ sh bin/mqadmin getSyncStateSet -a localhost:9878 -b broker-a
diff --git a/docs/cn/design.md b/docs/cn/design.md
index 94d06a2ae..00b4de379 100644
--- a/docs/cn/design.md
+++ b/docs/cn/design.md
@@ -10,7 +10,7 @@
#### 1.1 消息存储整体架构
消息存储架构图中主要有下面三个跟消息存储相关的文件构成。
-(1) CommitLog:消息主体以及元数据的存储主体,存储Producer端写入的消息主体内容,消息内容不是定长的。单个文件大小默认1G,
文件名长度为20位,左边补零,剩余为起始偏移量,比如00000000000000000000代表了第一个文件,起始偏移量为0,文件大小为1G=1073741824;当第一个文件写满了,第二个文件为00000000001073741824,起始偏移量为1073741824,以此类推。消息主要是顺序写入日志文件,当文件满了,写入下一个文件;
+(1) CommitLog:消息主体以及元数据的存储主体,存储Producer端写入的消息主体内容,消息内容不是定长的。单个文件大小默认1G,
文件名长度为20位,左边补零,剩余为起始偏移量,比如00000000000000000000代表了第一个文件,起始偏移量为0,文件大小为1G=1073741824;当第一个文件写满了,第二个文件为00000000001073741824,起始偏移量为1073741824,以此类推。消息主要是顺序写入日志文件,当文件满了,写入下一个文件;
(2)
ConsumeQueue:消息消费索引,引入的目的主要是提高消息消费的性能。由于RocketMQ是基于主题topic的订阅模式,消息消费是针对主题进行的,如果要遍历commitlog文件,根据topic检索消息是非常低效的。Consumer可根据ConsumeQueue来查找待消费的消息。其中,ConsumeQueue作为消费消息的索引,保存了指定Topic下的队列消息在CommitLog中的起始物理偏移量offset,消息大小size和消息Tag的HashCode值。consumequeue文件可以看成是基于topic的commitlog索引文件,故consumequeue文件夹的组织方式如下:topic/queue/file三层组织结构,具体存储路径为:$HOME/store/consumequeue/{topic}/{queueId}/{fileName}。同样consumequeue文件采取定长设计,每一个条目共20个字节,分别为8字节的commitlog物理偏移量、4字节的消息长度、8字节tag
hashcode,单个文件由30W个条目组成,可以像数组 [...]
@@ -43,7 +43,7 @@ RocketMQ消息队列集群主要包括NameServer、Broker(Master/Slave)、Produc
(4) 消息消费者Consumer根据2)中获取的路由信息,并再完成客户端的负载均衡后,选择其中的某一个或者某几个消息队列来拉取消息并进行消费。
-从上面1)~3)中可以看出在消息生产者,
Broker和NameServer之间都会发生通信(这里只说了MQ的部分通信),因此如何设计一个良好的网络通信模块在MQ中至关重要,它将决定RocketMQ集群整体的消息传输能力与最终的性能。
+从上面1)~3)中可以看出在消息生产者,Broker和NameServer之间都会发生通信(这里只说了MQ的部分通信),因此如何设计一个良好的网络通信模块在MQ中至关重要,它将决定RocketMQ集群整体的消息传输能力与最终的性能。
rocketmq-remoting 模块是
RocketMQ消息队列中负责网络通信的模块,它几乎被其他所有需要网络通信的模块(诸如rocketmq-client、rocketmq-broker、rocketmq-namesrv)所依赖和引用。为了实现客户端与服务器之间高效的数据请求与接收,RocketMQ消息队列自定义了通信协议并在Netty的基础之上扩展了通信模块。
#### 2.1 Remoting通信类结构
@@ -86,7 +86,7 @@ RocketMQ的RPC通信采用Netty组件作为底层通信库,同样也遵循了R

-从上面的框图中可以大致了解RocketMQ中NettyRemotingServer的Reactor 多线程模型。一个 Reactor
主线程(eventLoopGroupBoss,即为上面的1)负责监听
TCP网络连接请求,建立好连接,创建SocketChannel,并注册到selector上。RocketMQ的源码中会自动根据OS的类型选择NIO和Epoll,也可以通过参数配置),然后监听真正的网络数据。拿到网络数据后,再丢给Worker线程池(eventLoopGroupSelector,即为上面的“N”,源码中默认设置为3),在真正执行业务逻辑之前需要进行SSL验证、编解码、空闲检查、网络连接管理,这些工作交给defaultEventExecutorGroup(即为上面的“M1”,源码中默认设置为8)去做。而处理业务操作放在业务线程池中执行,根据
RomotingCommand 的业务请求码code去processorTable这个本地缓存变量中找到对应的
processor,然后封装成task任务后,提交给对应的业务processor处理线程池
来执行(sendM [...]
+从上面的框图中可以大致了解RocketMQ中NettyRemotingServer的Reactor 多线程模型。一个 Reactor
主线程(eventLoopGroupBoss,即为上面的1)负责监听
TCP网络连接请求,建立好连接,创建SocketChannel,并注册到selector上。RocketMQ的源码中会自动根据OS的类型选择NIO和Epoll,也可以通过参数配置),然后监听真正的网络数据。拿到网络数据后,再丢给Worker线程池(eventLoopGroupSelector,即为上面的“N”,源码中默认设置为3),在真正执行业务逻辑之前需要进行SSL验证、编解码、空闲检查、网络连接管理,这些工作交给defaultEventExecutorGroup(即为上面的“M1”,源码中默认设置为8)去做。而处理业务操作放在业务线程池中执行,根据
RomotingCommand 的业务请求码code去processorTable这个本地缓存变量中找到对应的
processor,然后封装成task任务后,提交给对应的业务processor处理线程�
��来执行(sendM [...]
线程数 | 线程名 | 线程具体说明
--- | --- | ---
diff --git a/docs/cn/dledger/deploy_guide.md b/docs/cn/dledger/deploy_guide.md
index ab69b58ce..2d04590e0 100644
--- a/docs/cn/dledger/deploy_guide.md
+++ b/docs/cn/dledger/deploy_guide.md
@@ -20,7 +20,7 @@ RocketMQ-on-DLedger Group 是可以水平扩展的,也即可以部署任意多
| enableDLegerCommitLog | 是否启动 DLedger | true |
| dLegerGroup | DLedger Raft Group的名字,建议和 brokerName 保持一致 | RaftNode00 |
| dLegerPeers | DLedger Group 内各节点的端口信息,同一个 Group 内的各个节点配置必须要保证一致 |
n0-127.0.0.1:40911;n1-127.0.0.1:40912;n2-127.0.0.1:40913 |
-| dLegerSelfId | 节点 id, 必须属于 dLegerPeers 中的一个;同 Group 内各个节点要唯一 | n0 |
+| dLegerSelfId | 节点 id,必须属于 dLegerPeers 中的一个;同 Group 内各个节点要唯一 | n0 |
| sendMessageThreadPoolNums | 发送线程个数,建议配置成 Cpu 核数 | 16 |
这里贴出 conf/dledger/broker-n0.conf 的配置举例。
diff --git a/docs/cn/features.md b/docs/cn/features.md
index e7b63864e..ab67683a2 100644
--- a/docs/cn/features.md
+++ b/docs/cn/features.md
@@ -56,9 +56,9 @@ RocketMQ会为每个消费组都设置一个Topic名称为“%RETRY%+consumerGro
## 10 消息重投
生产者在发送消息时,同步消息失败会重投,异步消息有重试,oneway没有任何保证。消息重投保证消息尽可能发送成功、不丢失,但可能会造成消息重复,消息重复在RocketMQ中是无法避免的问题。消息重复在一般情况下不会发生,当出现消息量大、网络抖动,消息重复就会是大概率事件。另外,生产者主动重发、consumer负载变化也会导致重复消息。如下方法可以设置消息重试策略:
--
retryTimesWhenSendFailed:同步发送失败重投次数,默认为2,因此生产者会最多尝试发送retryTimesWhenSendFailed +
1次。不会选择上次失败的broker,尝试向其他broker发送,最大程度保证消息不丢。超过重投次数,抛出异常,由客户端保证消息不丢。当出现RemotingException、MQClientException和部分MQBrokerException时会重投。
--
retryTimesWhenSendAsyncFailed:异步发送失败重试次数,异步重试不会选择其他broker,仅在同一个broker上做重试,不保证消息不丢。
--
retryAnotherBrokerWhenNotStoreOK:消息刷盘(主或备)超时或slave不可用(返回状态非SEND_OK),是否尝试发送到其他broker,默认false。十分重要消息可以开启。
+-
retryTimesWhenSendFailed:同步发送失败重投次数,默认为2,因此生产者会最多尝试发送retryTimesWhenSendFailed +
1次。不会选择上次失败的broker,尝试向其他broker发送,最大程度保证消息不丢。超过重投次数,抛出异常,由客户端保证消息不丢。当出现RemotingException、MQClientException和部分MQBrokerException时会重投。
+-
retryTimesWhenSendAsyncFailed:异步发送失败重试次数,异步重试不会选择其他broker,仅在同一个broker上做重试,不保证消息不丢。
+-
retryAnotherBrokerWhenNotStoreOK:消息刷盘(主或备)超时或slave不可用(返回状态非SEND_OK),是否尝试发送到其他broker,默认false。十分重要消息可以开启。
## 11 流量控制
生产者流控,因为broker处理能力达到瓶颈;消费者流控,因为消费能力达到瓶颈。
diff --git a/docs/cn/msg_trace/user_guide.md b/docs/cn/msg_trace/user_guide.md
index 3e304d8f2..d8314052b 100644
--- a/docs/cn/msg_trace/user_guide.md
+++ b/docs/cn/msg_trace/user_guide.md
@@ -5,7 +5,7 @@
| Producer端| Consumer端 | Broker端 |
| --- | --- | --- |
| 生产实例信息 | 消费实例信息 | 消息的Topic |
-| 发送消息时间 | 投递时间,投递轮次 | 消息存储位置 |
+| 发送消息时间 | 投递时间,投递轮次 | 消息存储位置 |
| 消息是否发送成功 | 消息是否消费成功 | 消息的Key值 |
| 发送耗时 | 消费耗时 | 消息的Tag值 |
diff --git a/docs/cn/operation.md b/docs/cn/operation.md
index f36936c35..4310da570 100644
--- a/docs/cn/operation.md
+++ b/docs/cn/operation.md
@@ -6,7 +6,7 @@
#### 1.1 单Master模式
-这种方式风险较大,一旦Broker重启或者宕机时,会导致整个服务不可用。不建议线上环境使用,可以用于本地测试。
+这种方式风险较大,一旦Broker重启或者宕机时,会导致整个服务不可用。不建议线上环境使用,可以用于本地测试。
##### 1)启动 NameServer
@@ -1396,13 +1396,13 @@ RocketMQ 5.0 开始支持自动主从切换的模式,可参考以下文档
解决方案:rocketmq默认策略是从消息队列尾部,即跳过历史消息。如果想消费历史消息,则需要设置:`org.apache.rocketmq.client.consumer.DefaultMQPushConsumer#setConsumeFromWhere`。常用的有以下三种配置:
-- 默认配置,一个新的订阅组第一次启动从队列的最后位置开始消费,后续再启动接着上次消费的进度开始消费,即跳过历史消息;
+- 默认配置,一个新的订阅组第一次启动从队列的最后位置开始消费,后续再启动接着上次消费的进度开始消费,即跳过历史消息;
```java
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
```
-- 一个新的订阅组第一次启动从队列的最前位置开始消费,后续再启动接着上次消费的进度开始消费,即消费Broker未过期的历史消息;
+- 一个新的订阅组第一次启动从队列的最前位置开始消费,后续再启动接着上次消费的进度开始消费,即消费Broker未过期的历史消息;
```java
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
diff --git
"a/docs/cn/statictopic/RocketMQ_Static_Topic_Logic_Queue_\350\256\276\350\256\241.md"
"b/docs/cn/statictopic/RocketMQ_Static_Topic_Logic_Queue_\350\256\276\350\256\241.md"
index ac1cc6b5f..67cb1d587 100644
---
"a/docs/cn/statictopic/RocketMQ_Static_Topic_Logic_Queue_\350\256\276\350\256\241.md"
+++
"b/docs/cn/statictopic/RocketMQ_Static_Topic_Logic_Queue_\350\256\276\350\256\241.md"
@@ -55,7 +55,7 @@ LogicQueue的思路就是为了解决这一问题。
- Second Leader Queue,某个『Logic Queue』次新映射的『Physical Queue』,也即最新一次切换之前的『Leader
Queue』
#### Scope 目标
-单集群固定 和 全网固定, 参考 [The_Scope_Of_Static_Topic](The_Scope_Of_Static_Topic.md)。
+单集群固定 和 全网固定,参考 [The_Scope_Of_Static_Topic](The_Scope_Of_Static_Topic.md)。
#### LogicQueue 目标
@@ -160,7 +160,7 @@ LogicQueue 的 Source of Truth 就是 LogicQueue 到 Physical Queue
的映射关
}
}
```
-上述示例的含义是:
+上述示例的含义是:
* broker02 拥有 LogicQueue 3
* LogicQueue 3 由 2 个 Physical Queue 组成
* 位点范围『0-1000』映射到 Physical Queue 『broker01-0』上面
@@ -261,7 +261,7 @@ SOT存储在Broker上,所以使用从 Broker开始。
遗留问题:
是否需要尊重 readQueueNums 和 writeQueueNums ?
-在LogicQueue这里,这个场景是没有意义的, 但依然保持尊重。
+在LogicQueue这里,这个场景是没有意义的,但依然保持尊重。
#### Client 解析数据
改动两个方法即可:
@@ -436,7 +436,7 @@ User 接口,使用范围广泛如多语言等,应该尽可能简单,把适
如果做在服务端,则可能产生交叉访问,在极端情况下,性能会非常差,举个例子:
100 个 Broker,相互交叉映射过一遍,则Admin客户端首先要向 100 个 Broker 发请求,然后这 100 个 Broker
为了获取远程信息,各自向其余 Broker 发请求。
其实际网络请求数就是 100 * 100 = 10000 个网络请求。放大效应十分明显。
-同时, 考虑到 Admin 接口,使用范围是有限的,无需考虑多语言适配等问题,可以把适配逻辑做在 Admin 客户端。
+同时,考虑到 Admin 接口,使用范围是有限的,无需考虑多语言适配等问题,可以把适配逻辑做在 Admin 客户端。
#### 远程读的性能问题
从实战经验来看,性能损耗几乎不计。
diff --git a/docs/cn/statictopic/The_Scope_Of_Static_Topic.md
b/docs/cn/statictopic/The_Scope_Of_Static_Topic.md
index 9c9973c9d..886e33e2f 100644
--- a/docs/cn/statictopic/The_Scope_Of_Static_Topic.md
+++ b/docs/cn/statictopic/The_Scope_Of_Static_Topic.md
@@ -13,7 +13,7 @@ RocketMQ的集群设计,是一个多集群、动态、零耦合的设计,具
- 开发用户对 Cluster 无感知
- 不同 Broker 之间没有任何关联
-这样的设计,在运维时带来了极大的便利,但也带来了一个问题:
+这样的设计,在运维时带来了极大的便利,但也带来了一个问题:
- Topic 的队列数无法固定
基于 Logic Queue 技术而实现的 Static Topic,就是用来解决『固定队列数量』的问题。
@@ -52,19 +52,19 @@ __logic__global
主要原因是,不想完全放弃 RocketMQ 『多集群、动态、零耦合』的设计优势。
而全网固定,则意味着彻底失去了这个优势。
-举1个『多活保序』的场景:
+举1个『多活保序』的场景:
- ClusterA 部署在 SiteA 内,创建 Static Topic 『TopicTest』,有50个队列。
- ClusterB 部署在 SiteB 内,创建 Static Topic 『TopicTest』,有50个队列。
对Nameserver稍作修改,支持传入标识符(比如为scope或者unitName),来获取特定范围内的 Topic Route。
正常情况下:
-- SiteA 的Producer和Consumer 只能看见 ClusterA 的 MessageQueue, brokerName为
"__logic__clusterA"。
-- SiteB 的Producer和Consumer 只能看见 ClusterB 的 MessageQueue, brokerName为
"__logic__clusterB"。
+- SiteA 的Producer和Consumer 只能看见 ClusterA 的 MessageQueue,brokerName为
"__logic__clusterA"。
+- SiteB 的Producer和Consumer 只能看见 ClusterB 的 MessageQueue,brokerName为
"__logic__clusterB"。
- 机房内就近访问,且机房内严格保序。
假设 SiteA 宕机,此时对Nameserver发指令允许全网读,也即忽略客户端传入的 Scope或者unitName 标识符:
-- SiteB 的 Producer 仍然看见并写入 ClusterB 的 MessageQueue, brokerName为
"__logic__clusterB"
+- SiteB 的 Producer 仍然看见并写入 ClusterB 的 MessageQueue,brokerName为
"__logic__clusterB"
- SiteB 的 Consumer 可以同时看见并读取 ClusterA 的 MessageQueue 和 ClusterB MessageQueue,
brokerName为 "__logic__clusterB" 和 "__logic__clusterA
- 在这种场景下,Consumer 在消费 ClusterB 数据的同时,同时去消费 ClusterA 未消费完的数据