This is an automated email from the ASF dual-hosted git repository.
lizhimin pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new c4227b235 [ISSUE #5348] [RIP-48] Support server-side offset management
in broadcast consumption mode (#5349)
c4227b235 is described below
commit c4227b23575747edeac690f9e964dadfbd428fc2
Author: lizhimins <[email protected]>
AuthorDate: Fri Oct 21 16:24:12 2022 +0800
[ISSUE #5348] [RIP-48] Support server-side offset management in broadcast
consumption mode (#5349)
* Support server-side offset management in broadcast consumption mode
* Fix unit test NPE and offset store test
* Fix fast encode decode test
Co-authored-by: 斜阳 <[email protected]>
---
.../apache/rocketmq/broker/BrokerController.java | 15 ++
.../broker/offset/BroadcastOffsetManager.java | 242 +++++++++++++++++++++
.../broker/offset/BroadcastOffsetStore.java | 55 +++++
.../processor/DefaultPullMessageResultHandler.java | 4 +
.../broker/processor/PullMessageProcessor.java | 85 +++++++-
.../broker/offset/BroadcastOffsetManagerTest.java | 163 ++++++++++++++
.../broker/offset/BroadcastOffsetStoreTest.java | 31 +++
.../broker/processor/PullMessageProcessorTest.java | 22 ++
.../org/apache/rocketmq/common/BrokerConfig.java | 30 +++
.../rocketmq/common/protocol/RequestSource.java | 40 ++++
.../protocol/header/PullMessageRequestHeader.java | 41 ++++
11 files changed, 726 insertions(+), 2 deletions(-)
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
index 717a08021..657234e26 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
@@ -66,6 +66,7 @@ import
org.apache.rocketmq.broker.longpolling.PullRequestHoldService;
import org.apache.rocketmq.broker.metrics.BrokerMetricsManager;
import org.apache.rocketmq.broker.mqtrace.ConsumeMessageHook;
import org.apache.rocketmq.broker.mqtrace.SendMessageHook;
+import org.apache.rocketmq.broker.offset.BroadcastOffsetManager;
import org.apache.rocketmq.broker.offset.ConsumerOffsetManager;
import org.apache.rocketmq.broker.offset.ConsumerOrderInfoManager;
import org.apache.rocketmq.broker.offset.LmqConsumerOffsetManager;
@@ -168,6 +169,7 @@ public class BrokerController {
private final NettyClientConfig nettyClientConfig;
protected final MessageStoreConfig messageStoreConfig;
protected final ConsumerOffsetManager consumerOffsetManager;
+ protected final BroadcastOffsetManager broadcastOffsetManager;
protected final ConsumerManager consumerManager;
protected final ConsumerFilterManager consumerFilterManager;
protected final ConsumerOrderInfoManager consumerOrderInfoManager;
@@ -296,6 +298,7 @@ public class BrokerController {
this.setStoreHost(new
InetSocketAddress(this.getBrokerConfig().getBrokerIP1(), getListenPort()));
this.brokerStatsManager = messageStoreConfig.isEnableLmq() ? new
LmqBrokerStatsManager(this.brokerConfig.getBrokerClusterName(),
this.brokerConfig.isEnableDetailStat()) : new
BrokerStatsManager(this.brokerConfig.getBrokerClusterName(),
this.brokerConfig.isEnableDetailStat());
this.consumerOffsetManager = messageStoreConfig.isEnableLmq() ? new
LmqConsumerOffsetManager(this) : new ConsumerOffsetManager(this);
+ this.broadcastOffsetManager = new BroadcastOffsetManager(this);
this.topicConfigManager = messageStoreConfig.isEnableLmq() ? new
LmqTopicConfigManager(this) : new TopicConfigManager(this);
this.topicQueueMappingManager = new TopicQueueMappingManager(this);
this.pullMessageProcessor = new PullMessageProcessor(this);
@@ -1170,6 +1173,10 @@ public class BrokerController {
return consumerOffsetManager;
}
+ public BroadcastOffsetManager getBroadcastOffsetManager() {
+ return broadcastOffsetManager;
+ }
+
public MessageStoreConfig getMessageStoreConfig() {
return messageStoreConfig;
}
@@ -1277,6 +1284,10 @@ public class BrokerController {
this.fileWatchService.shutdown();
}
+ if (this.broadcastOffsetManager != null) {
+ this.broadcastOffsetManager.shutdown();
+ }
+
if (this.messageStore != null) {
this.messageStore.shutdown();
}
@@ -1503,6 +1514,10 @@ public class BrokerController {
this.brokerFastFailure.start();
}
+ if (this.broadcastOffsetManager != null) {
+ this.broadcastOffsetManager.start();
+ }
+
if (this.escapeBridge != null) {
this.escapeBridge.start();
}
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/offset/BroadcastOffsetManager.java
b/broker/src/main/java/org/apache/rocketmq/broker/offset/BroadcastOffsetManager.java
new file mode 100644
index 000000000..16e70eed2
--- /dev/null
+++
b/broker/src/main/java/org/apache/rocketmq/broker/offset/BroadcastOffsetManager.java
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.broker.offset;
+
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.common.BrokerConfig;
+import org.apache.rocketmq.common.ServiceThread;
+
+/**
+ * Manages the pull offsets of broadcast consumers.
+ * Currently used to support switching a remoting client between proxy and broker.
+ */
+public class BroadcastOffsetManager extends ServiceThread {
+ private static final String TOPIC_GROUP_SEPARATOR = "@";
+ private final BrokerController brokerController;
+ private final BrokerConfig brokerConfig;
+
+ /**
+ * k: topic@groupId
+ * v: the pull offsets of all clients for all queues
+ */
+ protected final ConcurrentHashMap<String /* topic@groupId */,
BroadcastOffsetData> offsetStoreMap =
+ new ConcurrentHashMap<>();
+
+ public BroadcastOffsetManager(BrokerController brokerController) {
+ this.brokerController = brokerController;
+ this.brokerConfig = brokerController.getBrokerConfig();
+ }
+
+ public void updateOffset(String topic, String group, int queueId, long
offset, String clientId, boolean fromProxy) {
+ BroadcastOffsetData broadcastOffsetData =
offsetStoreMap.computeIfAbsent(
+ buildKey(topic, group), key -> new BroadcastOffsetData(topic,
group));
+
+ broadcastOffsetData.clientOffsetStore.compute(clientId, (clientIdKey,
broadcastTimedOffsetStore) -> {
+ if (broadcastTimedOffsetStore == null) {
+ broadcastTimedOffsetStore = new
BroadcastTimedOffsetStore(fromProxy);
+ }
+
+ broadcastTimedOffsetStore.timestamp = System.currentTimeMillis();
+ broadcastTimedOffsetStore.fromProxy = fromProxy;
+ broadcastTimedOffsetStore.offsetStore.updateOffset(queueId,
offset, true);
+ return broadcastTimedOffsetStore;
+ });
+ }
+
+ /**
+ * Cases in which an init offset is needed:
+ * 1. the client switches from proxy to broker
+ * 2. the client switches from broker to proxy
+ * 3. the client connects to proxy for the first time
+ *
+ * @return -1 means no init offset, use the queueOffset in
pullRequestHeader
+ */
+ public Long queryInitOffset(String topic, String groupId, int queueId,
String clientId, long requestOffset,
+ boolean fromProxy) {
+
+ BroadcastOffsetData broadcastOffsetData =
offsetStoreMap.get(buildKey(topic, groupId));
+ if (broadcastOffsetData == null) {
+ if (fromProxy && requestOffset < 0) {
+ return getOffset(null, topic, groupId, queueId);
+ } else {
+ return -1L;
+ }
+ }
+
+ final AtomicLong offset = new AtomicLong(-1L);
+ broadcastOffsetData.clientOffsetStore.compute(clientId, (clientIdK,
offsetStore) -> {
+ if (offsetStore == null) {
+ offsetStore = new BroadcastTimedOffsetStore(fromProxy);
+ }
+
+ if (offsetStore.fromProxy && requestOffset < 0) {
+ // when from proxy and requestOffset is -1
+ // means proxy need a init offset to pull message
+ offset.set(getOffset(offsetStore, topic, groupId, queueId));
+ return offsetStore;
+ }
+
+ if (offsetStore.fromProxy == fromProxy) {
+ return offsetStore;
+ }
+
+ offset.set(getOffset(offsetStore, topic, groupId, queueId));
+ return offsetStore;
+ });
+ return offset.get();
+ }
+
+ private long getOffset(BroadcastTimedOffsetStore offsetStore, String
topic, String groupId, int queueId) {
+ long storeOffset = -1;
+ if (offsetStore != null) {
+ storeOffset = offsetStore.offsetStore.readOffset(queueId);
+ }
+ if (storeOffset < 0) {
+ storeOffset =
+
brokerController.getConsumerOffsetManager().queryOffset(broadcastGroupId(groupId),
topic, queueId);
+ }
+ if (storeOffset < 0) {
+ if
(!this.brokerController.getMessageStore().checkInDiskByConsumeOffset(topic,
queueId, 0)) {
+ storeOffset = 0;
+ } else {
+ storeOffset =
brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId, true);
+ }
+ }
+ return storeOffset;
+ }
+
+ /**
+ * 1. scan for expired offsets
+ * 2. calculate the min offset across all clients of one topic@group,
+ * and then commit it as the consumer offset of group@broadcast
+ */
+ protected void scanOffsetData() {
+ for (String k : offsetStoreMap.keySet()) {
+ BroadcastOffsetData broadcastOffsetData = offsetStoreMap.get(k);
+ if (broadcastOffsetData == null) {
+ continue;
+ }
+
+ Map<Integer, Long> queueMinOffset = new HashMap<>();
+
+ for (String clientId :
broadcastOffsetData.clientOffsetStore.keySet()) {
+ broadcastOffsetData.clientOffsetStore
+ .computeIfPresent(clientId, (clientIdKey,
broadcastTimedOffsetStore) -> {
+ long interval = System.currentTimeMillis() -
broadcastTimedOffsetStore.timestamp;
+ boolean clientIsOnline =
brokerController.getConsumerManager().findChannel(broadcastOffsetData.group,
clientId) != null;
+ if (clientIsOnline || interval <
Duration.ofSeconds(brokerConfig.getBroadcastOffsetExpireSecond()).toMillis()) {
+ Set<Integer> queueSet =
broadcastTimedOffsetStore.offsetStore.queueList();
+ for (Integer queue : queueSet) {
+ long offset =
broadcastTimedOffsetStore.offsetStore.readOffset(queue);
+ offset =
Math.min(queueMinOffset.getOrDefault(queue, offset), offset);
+ queueMinOffset.put(queue, offset);
+ }
+ }
+ if (clientIsOnline && interval >=
Duration.ofSeconds(brokerConfig.getBroadcastOffsetExpireMaxSecond()).toMillis())
{
+ return null;
+ }
+ if (!clientIsOnline && interval >=
Duration.ofSeconds(brokerConfig.getBroadcastOffsetExpireSecond()).toMillis()) {
+ return null;
+ }
+ return broadcastTimedOffsetStore;
+ });
+ }
+
+ offsetStoreMap.computeIfPresent(k, (key, broadcastOffsetDataVal)
-> {
+ if (broadcastOffsetDataVal.clientOffsetStore.isEmpty()) {
+ return null;
+ }
+ return broadcastOffsetDataVal;
+ });
+
+ queueMinOffset.forEach((queueId, offset) ->
+
this.brokerController.getConsumerOffsetManager().commitOffset("BroadcastOffset",
+ broadcastGroupId(broadcastOffsetData.group),
broadcastOffsetData.topic, queueId, offset));
+ }
+ }
+
+ private String buildKey(String topic, String group) {
+ return topic + TOPIC_GROUP_SEPARATOR + group;
+ }
+
+ /**
+ * @param group group of users
+ * @return the groupId used to commit offset
+ */
+ private static String broadcastGroupId(String group) {
+ return group + TOPIC_GROUP_SEPARATOR + "broadcast";
+ }
+
+ @Override
+ public String getServiceName() {
+ return "BroadcastOffsetManager";
+ }
+
+ @Override
+ public void run() {
+ while (!this.isStopped()) {
+ this.waitForRunning(Duration.ofSeconds(5).toMillis());
+ }
+ }
+
+ @Override
+ protected void onWaitEnd() {
+ this.scanOffsetData();
+ }
+
+ public static class BroadcastOffsetData {
+ private final String topic;
+ private final String group;
+ private final ConcurrentHashMap<String /* clientId */,
BroadcastTimedOffsetStore> clientOffsetStore;
+
+ public BroadcastOffsetData(String topic, String group) {
+ this.topic = topic;
+ this.group = group;
+ this.clientOffsetStore = new ConcurrentHashMap<>();
+ }
+ }
+
+ public static class BroadcastTimedOffsetStore {
+
+ /**
+ * the timestamp of the last update
+ */
+ private volatile long timestamp;
+
+ /**
+ * marks whether the offset of this client was last updated via proxy
+ */
+ private volatile boolean fromProxy;
+
+ /**
+ * the pulled offset of each queue
+ */
+ private final BroadcastOffsetStore offsetStore;
+
+ public BroadcastTimedOffsetStore(boolean fromProxy) {
+ this.timestamp = System.currentTimeMillis();
+ this.fromProxy = fromProxy;
+ this.offsetStore = new BroadcastOffsetStore();
+ }
+ }
+}
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/offset/BroadcastOffsetStore.java
b/broker/src/main/java/org/apache/rocketmq/broker/offset/BroadcastOffsetStore.java
new file mode 100644
index 000000000..3770e576a
--- /dev/null
+++
b/broker/src/main/java/org/apache/rocketmq/broker/offset/BroadcastOffsetStore.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.broker.offset;
+
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.rocketmq.common.MixAll;
+
+public class BroadcastOffsetStore {
+
+ private final ConcurrentMap<Integer, AtomicLong> offsetTable = new
ConcurrentHashMap<>();
+
+ public void updateOffset(int queueId, long offset, boolean increaseOnly) {
+ AtomicLong offsetOld = this.offsetTable.get(queueId);
+ if (null == offsetOld) {
+ offsetOld = this.offsetTable.putIfAbsent(queueId, new
AtomicLong(offset));
+ }
+
+ if (null != offsetOld) {
+ if (increaseOnly) {
+ MixAll.compareAndIncreaseOnly(offsetOld, offset);
+ } else {
+ offsetOld.set(offset);
+ }
+ }
+ }
+
+ public long readOffset(int queueId) {
+ AtomicLong offset = this.offsetTable.get(queueId);
+ if (offset != null) {
+ return offset.get();
+ }
+ return -1L;
+ }
+
+ public Set<Integer> queueList() {
+ return offsetTable.keySet();
+ }
+}
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/processor/DefaultPullMessageResultHandler.java
b/broker/src/main/java/org/apache/rocketmq/broker/processor/DefaultPullMessageResultHandler.java
index ac6fa88bc..2d15139d4 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/processor/DefaultPullMessageResultHandler.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/processor/DefaultPullMessageResultHandler.java
@@ -71,6 +71,10 @@ public class DefaultPullMessageResultHandler implements
PullMessageResultHandler
final MessageFilter messageFilter,
RemotingCommand response) {
+ PullMessageProcessor processor =
brokerController.getPullMessageProcessor();
+ processor.updateBroadcastPulledOffset(requestHeader.getTopic(),
requestHeader.getConsumerGroup(),
+ requestHeader.getQueueId(), requestHeader, channel, response,
getMessageResult.getNextBeginOffset());
+
final PullMessageResponseHeader responseHeader =
(PullMessageResponseHeader) response.readCustomHeader();
switch (response.getCode()) {
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
b/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
index 700ce55d7..e3a818953 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
@@ -22,7 +22,9 @@ import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
+import java.util.Objects;
import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.broker.client.ClientChannelInfo;
import org.apache.rocketmq.broker.client.ConsumerGroupInfo;
import org.apache.rocketmq.broker.filter.ConsumerFilterData;
import org.apache.rocketmq.broker.filter.ConsumerFilterManager;
@@ -42,9 +44,11 @@ import org.apache.rocketmq.common.help.FAQUrl;
import org.apache.rocketmq.common.protocol.ForbiddenType;
import org.apache.rocketmq.common.protocol.NamespaceUtil;
import org.apache.rocketmq.common.protocol.RequestCode;
+import org.apache.rocketmq.common.protocol.RequestSource;
import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.common.protocol.header.PullMessageRequestHeader;
import org.apache.rocketmq.common.protocol.header.PullMessageResponseHeader;
+import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
import org.apache.rocketmq.common.rpc.RpcClientUtils;
@@ -480,8 +484,15 @@ public class PullMessageProcessor implements
NettyRequestProcessor {
getMessageResult.setMaxOffset(messageStore.getMaxOffsetInQueue(topic, queueId));
getMessageResult.setSuggestPullingFromSlave(false);
} else {
- getMessageResult = messageStore.getMessage(
- group, topic, queueId, requestHeader.getQueueOffset(),
requestHeader.getMaxMsgNums(), messageFilter);
+ long broadcastInitOffset = queryBroadcastPullInitOffset(topic,
group, queueId, requestHeader, channel);
+ if (broadcastInitOffset >= 0) {
+ getMessageResult = new GetMessageResult();
+ getMessageResult.setStatus(GetMessageStatus.OFFSET_RESET);
+ getMessageResult.setNextBeginOffset(broadcastInitOffset);
+ } else {
+ getMessageResult = messageStore.getMessage(
+ group, topic, queueId, requestHeader.getQueueOffset(),
requestHeader.getMaxMsgNums(), messageFilter);
+ }
}
if (getMessageResult != null) {
@@ -736,4 +747,74 @@ public class PullMessageProcessor implements
NettyRequestProcessor {
public void setPullMessageResultHandler(PullMessageResultHandler
pullMessageResultHandler) {
this.pullMessageResultHandler = pullMessageResultHandler;
}
+
+ private boolean isBroadcast(boolean proxyPullBroadcast, ConsumerGroupInfo
consumerGroupInfo) {
+ return proxyPullBroadcast ||
+ consumerGroupInfo != null
+ &&
MessageModel.BROADCASTING.equals(consumerGroupInfo.getMessageModel())
+ &&
ConsumeType.CONSUME_PASSIVELY.equals(consumerGroupInfo.getConsumeType());
+ }
+
+ protected void updateBroadcastPulledOffset(String topic, String group, int
queueId,
+ PullMessageRequestHeader requestHeader, Channel channel,
RemotingCommand response, long nextBeginOffset) {
+
+ if (response == null ||
!this.brokerController.getBrokerConfig().isEnableBroadcastOffsetStore()) {
+ return;
+ }
+
+ boolean proxyPullBroadcast = Objects.equals(
+ RequestSource.PROXY_FOR_BROADCAST.getValue(),
requestHeader.getRequestSource());
+ ConsumerGroupInfo consumerGroupInfo =
this.brokerController.getConsumerManager().getConsumerGroupInfo(group);
+
+ if (isBroadcast(proxyPullBroadcast, consumerGroupInfo)) {
+ long offset = requestHeader.getQueueOffset();
+ if (ResponseCode.PULL_OFFSET_MOVED == response.getCode()) {
+ offset = nextBeginOffset;
+ }
+ String clientId;
+ if (proxyPullBroadcast) {
+ clientId = requestHeader.getProxyFrowardClientId();
+ } else {
+ ClientChannelInfo clientChannelInfo =
consumerGroupInfo.findChannel(channel);
+ if (clientChannelInfo == null) {
+ return;
+ }
+ clientId = clientChannelInfo.getClientId();
+ }
+ this.brokerController.getBroadcastOffsetManager()
+ .updateOffset(topic, group, queueId, offset, clientId,
proxyPullBroadcast);
+ }
+ }
+
+ /**
+ * Returns -1 when the pull request is not a broadcast pull or no init offset is needed
+ */
+ protected long queryBroadcastPullInitOffset(String topic, String group,
int queueId,
+ PullMessageRequestHeader requestHeader, Channel channel) {
+
+ if
(!this.brokerController.getBrokerConfig().isEnableBroadcastOffsetStore()) {
+ return -1L;
+ }
+
+ ConsumerGroupInfo consumerGroupInfo =
this.brokerController.getConsumerManager().getConsumerGroupInfo(group);
+ boolean proxyPullBroadcast = Objects.equals(
+ RequestSource.PROXY_FOR_BROADCAST.getValue(),
requestHeader.getRequestSource());
+
+ if (isBroadcast(proxyPullBroadcast, consumerGroupInfo)) {
+ String clientId;
+ if (proxyPullBroadcast) {
+ clientId = requestHeader.getProxyFrowardClientId();
+ } else {
+ ClientChannelInfo clientChannelInfo =
consumerGroupInfo.findChannel(channel);
+ if (clientChannelInfo == null) {
+ return -1;
+ }
+ clientId = clientChannelInfo.getClientId();
+ }
+
+ return this.brokerController.getBroadcastOffsetManager()
+ .queryInitOffset(topic, group, queueId, clientId,
requestHeader.getQueueOffset(), proxyPullBroadcast);
+ }
+ return -1L;
+ }
}
diff --git
a/broker/src/test/java/org/apache/rocketmq/broker/offset/BroadcastOffsetManagerTest.java
b/broker/src/test/java/org/apache/rocketmq/broker/offset/BroadcastOffsetManagerTest.java
new file mode 100644
index 000000000..9dc00f9d6
--- /dev/null
+++
b/broker/src/test/java/org/apache/rocketmq/broker/offset/BroadcastOffsetManagerTest.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.broker.offset;
+
+import java.time.Duration;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.broker.client.ClientChannelInfo;
+import org.apache.rocketmq.broker.client.ConsumerManager;
+import org.apache.rocketmq.common.BrokerConfig;
+import org.apache.rocketmq.store.MessageStore;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.stubbing.Answer;
+
+import static org.awaitility.Awaitility.await;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class BroadcastOffsetManagerTest {
+
+ private final AtomicLong maxOffset = new AtomicLong(10L);
+ private final AtomicLong commitOffset = new AtomicLong(-1);
+
+ private final ConsumerOffsetManager consumerOffsetManager =
mock(ConsumerOffsetManager.class);
+ private final ConsumerManager consumerManager =
mock(ConsumerManager.class);
+ private final BrokerConfig brokerConfig = new BrokerConfig();
+ private final Set<String> onlineClientIdSet = new HashSet<>();
+ private BroadcastOffsetManager broadcastOffsetManager;
+
+ @Before
+ public void before() {
+ brokerConfig.setEnableBroadcastOffsetStore(true);
+ brokerConfig.setBroadcastOffsetExpireSecond(1);
+ brokerConfig.setBroadcastOffsetExpireMaxSecond(5);
+ BrokerController brokerController = mock(BrokerController.class);
+ when(brokerController.getBrokerConfig()).thenReturn(brokerConfig);
+
+
when(brokerController.getConsumerManager()).thenReturn(consumerManager);
+ doAnswer((Answer<ClientChannelInfo>) mock -> {
+ String clientId = mock.getArgument(1);
+ if (onlineClientIdSet.contains(clientId)) {
+ return new ClientChannelInfo(null);
+ }
+ return null;
+ }).when(consumerManager).findChannel(anyString(), anyString());
+
+ doAnswer((Answer<Long>) mock -> commitOffset.get())
+ .when(consumerOffsetManager).queryOffset(anyString(), anyString(),
anyInt());
+ doAnswer((Answer<Void>) mock -> {
+ commitOffset.set(mock.getArgument(4));
+ return null;
+ }).when(consumerOffsetManager).commitOffset(anyString(), anyString(),
anyString(), anyInt(), anyLong());
+
when(brokerController.getConsumerOffsetManager()).thenReturn(consumerOffsetManager);
+
+ MessageStore messageStore = mock(MessageStore.class);
+ doAnswer((Answer<Long>) mock -> maxOffset.get())
+ .when(messageStore).getMaxOffsetInQueue(anyString(), anyInt(),
anyBoolean());
+ when(brokerController.getMessageStore()).thenReturn(messageStore);
+
+ broadcastOffsetManager = new BroadcastOffsetManager(brokerController);
+ }
+
+ @Test
+ public void testBroadcastOffsetSwitch() {
+ // client1 connect to broker
+ onlineClientIdSet.add("client1");
+ long offset = broadcastOffsetManager.queryInitOffset("group", "topic",
0, "client1", 0, false);
+ Assert.assertEquals(-1, offset);
+ broadcastOffsetManager.updateOffset("group", "topic", 0, 10,
"client1", false);
+ offset = broadcastOffsetManager.queryInitOffset("group", "topic", 0,
"client1", 11, false);
+ Assert.assertEquals(-1, offset);
+ broadcastOffsetManager.updateOffset("group", "topic", 0, 11,
"client1", false);
+
+ // client1 connect to proxy
+ offset = broadcastOffsetManager.queryInitOffset("group", "topic", 0,
"client1", -1, true);
+ Assert.assertEquals(11, offset);
+ broadcastOffsetManager.updateOffset("group", "topic", 0, 11,
"client1", true);
+ offset = broadcastOffsetManager.queryInitOffset("group", "topic", 0,
"client1", 11, true);
+ Assert.assertEquals(-1, offset);
+ broadcastOffsetManager.updateOffset("group", "topic", 0, 12,
"client1", true);
+
+ broadcastOffsetManager.scanOffsetData();
+ Assert.assertEquals(12L, commitOffset.get());
+
+ // client2 connect to proxy
+ onlineClientIdSet.add("client2");
+ offset = broadcastOffsetManager.queryInitOffset("group", "topic", 0,
"client2", -1, true);
+ Assert.assertEquals(12, offset);
+ broadcastOffsetManager.updateOffset("group", "topic", 0, 12,
"client2", true);
+ offset = broadcastOffsetManager.queryInitOffset("group", "topic", 0,
"client2", 11, true);
+ Assert.assertEquals(-1, offset);
+ broadcastOffsetManager.updateOffset("group", "topic", 0, 13,
"client2", true);
+
+ broadcastOffsetManager.scanOffsetData();
+ Assert.assertEquals(12L, commitOffset.get());
+
+ // client1 connect to broker
+ offset = broadcastOffsetManager.queryInitOffset("group", "topic", 0,
"client1", 20, false);
+ Assert.assertEquals(12, offset);
+ broadcastOffsetManager.updateOffset("group", "topic", 0, 12,
"client1", false);
+ offset = broadcastOffsetManager.queryInitOffset("group", "topic", 0,
"client1", 12, false);
+ Assert.assertEquals(-1, offset);
+
+ onlineClientIdSet.clear();
+
+ maxOffset.set(30L);
+
+ // client3 connect to broker
+ onlineClientIdSet.add("client3");
+ offset = broadcastOffsetManager.queryInitOffset("group", "topic", 0,
"client3", 30, false);
+ Assert.assertEquals(-1, offset);
+ broadcastOffsetManager.updateOffset("group", "topic", 0, 30,
"client3", false);
+
+
await().atMost(Duration.ofSeconds(brokerConfig.getBroadcastOffsetExpireSecond()
+ 1)).until(() -> {
+ broadcastOffsetManager.scanOffsetData();
+ return commitOffset.get() == 30L;
+ });
+ }
+
+ @Test
+ public void testBroadcastOffsetExpire() {
+ onlineClientIdSet.add("client1");
+ broadcastOffsetManager.updateOffset(
+ "group", "topic", 0, 10, "client1", false);
+ onlineClientIdSet.clear();
+
+
await().atMost(Duration.ofSeconds(brokerConfig.getBroadcastOffsetExpireSecond()
+ 1)).until(() -> {
+ broadcastOffsetManager.scanOffsetData();
+ return broadcastOffsetManager.offsetStoreMap.isEmpty();
+ });
+
+ onlineClientIdSet.add("client1");
+ broadcastOffsetManager.updateOffset(
+ "group", "topic", 0, 10, "client1", false);
+
await().atMost(Duration.ofSeconds(brokerConfig.getBroadcastOffsetExpireMaxSecond()
+ 1)).until(() -> {
+ broadcastOffsetManager.scanOffsetData();
+ return broadcastOffsetManager.offsetStoreMap.isEmpty();
+ });
+ }
+}
\ No newline at end of file
diff --git
a/broker/src/test/java/org/apache/rocketmq/broker/offset/BroadcastOffsetStoreTest.java
b/broker/src/test/java/org/apache/rocketmq/broker/offset/BroadcastOffsetStoreTest.java
new file mode 100644
index 000000000..ef830b9e9
--- /dev/null
+++
b/broker/src/test/java/org/apache/rocketmq/broker/offset/BroadcastOffsetStoreTest.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.broker.offset;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+public class BroadcastOffsetStoreTest {
+
+ @Test
+ public void testBasicOffsetStore() {
+ BroadcastOffsetStore offsetStore = new BroadcastOffsetStore();
+ offsetStore.updateOffset(0, 100L, false);
+ offsetStore.updateOffset(1, 200L, false);
+ Assert.assertEquals(100L, offsetStore.readOffset(0));
+ }
+}
\ No newline at end of file
diff --git
a/broker/src/test/java/org/apache/rocketmq/broker/processor/PullMessageProcessorTest.java
b/broker/src/test/java/org/apache/rocketmq/broker/processor/PullMessageProcessorTest.java
index e20acb0cf..2398fee87 100644
---
a/broker/src/test/java/org/apache/rocketmq/broker/processor/PullMessageProcessorTest.java
+++
b/broker/src/test/java/org/apache/rocketmq/broker/processor/PullMessageProcessorTest.java
@@ -18,8 +18,10 @@ package org.apache.rocketmq.broker.processor;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
+import java.lang.reflect.Method;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.client.ClientChannelInfo;
+import org.apache.rocketmq.broker.client.ConsumerGroupInfo;
import org.apache.rocketmq.broker.filter.ExpressionMessageFilter;
import org.apache.rocketmq.broker.mqtrace.ConsumeMessageContext;
import org.apache.rocketmq.broker.mqtrace.ConsumeMessageHook;
@@ -42,6 +44,7 @@ import org.apache.rocketmq.store.GetMessageResult;
import org.apache.rocketmq.store.GetMessageStatus;
import org.apache.rocketmq.store.MessageStore;
import org.apache.rocketmq.store.config.MessageStoreConfig;
+import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -206,6 +209,25 @@ public class PullMessageProcessorTest {
assertThat(response.getCode()).isEqualTo(ResponseCode.NO_PERMISSION);
}
+ @Test
+ public void testIfBroadcast() throws Exception { // exercises PullMessageProcessor's isBroadcast(boolean, ConsumerGroupInfo) through reflection
+ Class<? extends PullMessageProcessor> clazz =
pullMessageProcessor.getClass();
+ Method method = clazz.getDeclaredMethod("isBroadcast", boolean.class,
ConsumerGroupInfo.class);
+ method.setAccessible(true); // NOTE(review): implies the method is not public — confirm in PullMessageProcessor
+
+ ConsumerGroupInfo consumerGroupInfo = new ConsumerGroupInfo("GID-1",
+ ConsumeType.CONSUME_PASSIVELY, MessageModel.CLUSTERING,
ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
+ Assert.assertTrue((Boolean) method.invoke(pullMessageProcessor, true,
consumerGroupInfo)); // case 1: flag=true with a CLUSTERING group is expected to return true
+
+ ConsumerGroupInfo consumerGroupInfo2 = new ConsumerGroupInfo("GID-2",
+ ConsumeType.CONSUME_ACTIVELY, MessageModel.BROADCASTING,
ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
+ Assert.assertFalse((Boolean) method.invoke(pullMessageProcessor,
false, consumerGroupInfo2)); // case 2: flag=false, BROADCASTING but CONSUME_ACTIVELY is expected to return false
+
+ ConsumerGroupInfo consumerGroupInfo3 = new ConsumerGroupInfo("GID-3",
+ ConsumeType.CONSUME_PASSIVELY, MessageModel.BROADCASTING,
ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
+ Assert.assertTrue((Boolean) method.invoke(pullMessageProcessor, false,
consumerGroupInfo3)); // case 3: flag=false, BROADCASTING and CONSUME_PASSIVELY is expected to return true
+ }
+
private RemotingCommand createPullMsgCommand(int requestCode) {
PullMessageRequestHeader requestHeader = new
PullMessageRequestHeader();
requestHeader.setCommitOffset(123L);
diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
index f39741e26..677b87ff5 100644
--- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
+++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
@@ -202,6 +202,12 @@ public class BrokerConfig extends BrokerIdentity {
private boolean enableNetWorkFlowControl = false;
+ private boolean enableBroadcastOffsetStore = true; // defaults to true
+
+ private long broadcastOffsetExpireSecond = 2 * 60; // 120; unit is presumably seconds, going by the field name — confirm at the point of use
+
+ private long broadcastOffsetExpireMaxSecond = 5 * 60; // 300; NOTE(review): presumably an upper bound related to the expiry above — confirm at the point of use
+
private int popPollingSize = 1024;
private int popPollingMapSize = 100000;
// 20w cost 200M heap memory.
@@ -1421,6 +1427,30 @@ public class BrokerConfig extends BrokerIdentity {
this.useServerSideResetOffset = useServerSideResetOffset;
}
+ public boolean isEnableBroadcastOffsetStore() { // returns the enableBroadcastOffsetStore field unchanged
+ return enableBroadcastOffsetStore;
+ }
+
+ public void setEnableBroadcastOffsetStore(boolean
enableBroadcastOffsetStore) {
+ this.enableBroadcastOffsetStore = enableBroadcastOffsetStore; // plain assignment
+ }
+
+ public long getBroadcastOffsetExpireSecond() { // returns the broadcastOffsetExpireSecond field unchanged
+ return broadcastOffsetExpireSecond;
+ }
+
+ public void setBroadcastOffsetExpireSecond(long
broadcastOffsetExpireSecond) {
+ this.broadcastOffsetExpireSecond = broadcastOffsetExpireSecond; // stored as given; zero or negative values are not rejected here
+ }
+
+ public long getBroadcastOffsetExpireMaxSecond() { // returns the broadcastOffsetExpireMaxSecond field unchanged
+ return broadcastOffsetExpireMaxSecond;
+ }
+
+ public void setBroadcastOffsetExpireMaxSecond(long
broadcastOffsetExpireMaxSecond) {
+ this.broadcastOffsetExpireMaxSecond = broadcastOffsetExpireMaxSecond; // stored as given; not compared against broadcastOffsetExpireSecond
+ }
+
public MetricsExporterType getMetricsExporterType() {
return metricsExporterType;
}
diff --git
a/common/src/main/java/org/apache/rocketmq/common/protocol/RequestSource.java
b/common/src/main/java/org/apache/rocketmq/common/protocol/RequestSource.java
new file mode 100644
index 000000000..ebe61c2aa
--- /dev/null
+++
b/common/src/main/java/org/apache/rocketmq/common/protocol/RequestSource.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.common.protocol;
+
+public enum RequestSource { // integer-coded request origin; NOTE(review): presumably the code carried in PullMessageRequestHeader.requestSource — confirm
+
+ SDK(-1), // the only negative code; presumably a request sent directly by a client SDK — confirm
+ PROXY_FOR_ORDER(0),
+ PROXY_FOR_BROADCAST(1),
+ PROXY_FOR_STREAM(2);
+
+ public static final String SYSTEM_PROPERTY_KEY = "rocketmq.requestSource"; // NOTE(review): the name suggests a system property key; no reader of it is visible in this chunk
+ private final int value; // the integer code passed to the constructor
+
+ RequestSource(int value) {
+ this.value = value;
+ }
+
+ public int getValue() { // returns the integer code of this constant
+ return value;
+ }
+
+ public static boolean isValid(Integer value) { // null-safe range check on a raw integer code
+ return null != value && value >= -1 && value <
RequestSource.values().length - 1; // accepts -1 .. values().length - 2, i.e. -1..2 for the four constants; this matches the declared codes only while they stay contiguous starting at -1
+ }
+}
diff --git
a/common/src/main/java/org/apache/rocketmq/common/protocol/header/PullMessageRequestHeader.java
b/common/src/main/java/org/apache/rocketmq/common/protocol/header/PullMessageRequestHeader.java
index 317dc5f4e..751cb8ea3 100644
---
a/common/src/main/java/org/apache/rocketmq/common/protocol/header/PullMessageRequestHeader.java
+++
b/common/src/main/java/org/apache/rocketmq/common/protocol/header/PullMessageRequestHeader.java
@@ -31,6 +31,7 @@ import org.apache.rocketmq.remoting.protocol.FastCodesHeader;
import io.netty.buffer.ByteBuf;
public class PullMessageRequestHeader extends TopicQueueRequestHeader
implements FastCodesHeader {
+
@CFNotNull
private String consumerGroup;
@CFNotNull
@@ -56,6 +57,16 @@ public class PullMessageRequestHeader extends
TopicQueueRequestHeader implements
@CFNullable
private Integer maxMsgBytes;
+ /**
+ * Marks the source of this pull request. NOTE(review): presumably a RequestSource code — confirm.
+ */
+ private Integer requestSource; // boxed Integer; the header serialisation passes it to writeIfNotNull under the key "requestSource"
+
+ /**
+ * The real client id when the request comes from a proxy.
+ */
+ private String proxyFrowardClientId; // "Froward" is a misspelling of "Forward"; the same spelling is the field key used when the header is written and parsed, so renaming it would change the wire format
+
@Override
public void checkFields() throws RemotingCommandException {
}
@@ -74,6 +85,8 @@ public class PullMessageRequestHeader extends
TopicQueueRequestHeader implements
writeIfNotNull(out, "subVersion", subVersion);
writeIfNotNull(out, "expressionType", expressionType);
writeIfNotNull(out, "maxMsgBytes", maxMsgBytes);
+ writeIfNotNull(out, "requestSource", requestSource);
+ writeIfNotNull(out, "proxyFrowardClientId", proxyFrowardClientId);
writeIfNotNull(out, "lo", lo);
writeIfNotNull(out, "ns", ns);
writeIfNotNull(out, "nsd", nsd);
@@ -143,6 +156,16 @@ public class PullMessageRequestHeader extends
TopicQueueRequestHeader implements
this.maxMsgBytes = Integer.parseInt(str);
}
+ str = fields.get("requestSource");
+ if (str != null) {
+ this.requestSource = Integer.parseInt(str);
+ }
+
+ str = fields.get("proxyFrowardClientId");
+ if (str != null) {
+ this.proxyFrowardClientId = str;
+ }
+
str = fields.get("lo");
if (str != null) {
this.lo = Boolean.parseBoolean(str);
@@ -269,6 +292,22 @@ public class PullMessageRequestHeader extends
TopicQueueRequestHeader implements
this.maxMsgBytes = maxMsgBytes;
}
+ public Integer getRequestSource() { // may return null: the field is a boxed Integer that parsing assigns only when the "requestSource" key is present
+ return requestSource;
+ }
+
+ public void setRequestSource(Integer requestSource) {
+ this.requestSource = requestSource; // stored as given; no range validation is applied here
+ }
+
+ public String getProxyFrowardClientId() { // accessor name keeps the field's "Froward" spelling
+ return proxyFrowardClientId;
+ }
+
+ public void setProxyFrowardClientId(String proxyFrowardClientId) {
+ this.proxyFrowardClientId = proxyFrowardClientId; // plain assignment; null is not rejected
+ }
+
@Override
public String toString() {
return MoreObjects.toStringHelper(this)
@@ -284,6 +323,8 @@ public class PullMessageRequestHeader extends
TopicQueueRequestHeader implements
.add("subscription", subscription)
.add("subVersion", subVersion)
.add("expressionType", expressionType)
+ .add("requestSource", requestSource)
+ .add("proxyFrowardClientId", proxyFrowardClientId)
.toString();
}
}