This is an automated email from the ASF dual-hosted git repository.
dongyuanpan pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 9ce83452a6 [ISSUE #9105] Fix the issue of duplicate consumption in LMQ
(#9101)
9ce83452a6 is described below
commit 9ce83452a62f3fb910454bab92c092c83d561bdb
Author: rongtong <[email protected]>
AuthorDate: Mon Jan 6 10:51:58 2025 +0800
[ISSUE #9105] Fix the issue of duplicate consumption in LMQ (#9101)
* Fix the issue of duplicate consumption in LMQ
* Pass the checkstyle
* Pass the UTs
* Pass the check style
---
.../broker/longpolling/PopLongPollingService.java | 17 ++++-----
.../broker/offset/ConsumerOrderInfoManager.java | 2 +-
.../broker/processor/AdminBrokerProcessor.java | 6 ++--
.../broker/processor/PopBufferMergeService.java | 6 ++--
.../longpolling/PopLongPollingServiceTest.java | 42 +++++++++++-----------
.../offset/ConsumerOrderInfoManagerTest.java | 6 +---
6 files changed, 39 insertions(+), 40 deletions(-)
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PopLongPollingService.java
b/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PopLongPollingService.java
index 91185fbe94..e87a8e803f 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PopLongPollingService.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PopLongPollingService.java
@@ -52,7 +52,7 @@ public class PopLongPollingService extends ServiceThread {
LoggerFactory.getLogger(LoggerName.ROCKETMQ_POP_LOGGER_NAME);
private final BrokerController brokerController;
private final NettyRequestProcessor processor;
- private final ConcurrentHashMap<String, ConcurrentHashMap<String, Byte>>
topicCidMap;
+ private final ConcurrentLinkedHashMap<String, ConcurrentHashMap<String,
Byte>> topicCidMap;
private final ConcurrentLinkedHashMap<String,
ConcurrentSkipListSet<PopRequest>> pollingMap;
private long lastCleanTime = 0;
@@ -63,7 +63,8 @@ public class PopLongPollingService extends ServiceThread {
this.brokerController = brokerController;
this.processor = processor;
// 100000 topic default, 100000 lru topic + cid + qid
- this.topicCidMap = new
ConcurrentHashMap<>(brokerController.getBrokerConfig().getPopPollingMapSize());
+ this.topicCidMap = new ConcurrentLinkedHashMap.Builder<String,
ConcurrentHashMap<String, Byte>>()
+
.maximumWeightedCapacity(this.brokerController.getBrokerConfig().getPopPollingMapSize()
* 2L).build();
this.pollingMap = new ConcurrentLinkedHashMap.Builder<String,
ConcurrentSkipListSet<PopRequest>>()
.maximumWeightedCapacity(this.brokerController.getBrokerConfig().getPopPollingMapSize()).build();
this.notifyLast = notifyLast;
@@ -350,7 +351,7 @@ public class PopLongPollingService extends ServiceThread {
Map.Entry<String, ConcurrentHashMap<String, Byte>> entry =
topicCidMapIter.next();
String topic = entry.getKey();
if
(brokerController.getTopicConfigManager().selectTopicConfig(topic) == null) {
- POP_LOGGER.info("remove not exit topic {} in
topicCidMap!", topic);
+ POP_LOGGER.info("remove nonexistent topic {} in
topicCidMap!", topic);
topicCidMapIter.remove();
continue;
}
@@ -358,8 +359,8 @@ public class PopLongPollingService extends ServiceThread {
while (cidMapIter.hasNext()) {
Map.Entry<String, Byte> cidEntry = cidMapIter.next();
String cid = cidEntry.getKey();
- if
(!brokerController.getSubscriptionGroupManager().getSubscriptionGroupTable().containsKey(cid))
{
- POP_LOGGER.info("remove not exit sub {} of topic
{} in topicCidMap!", cid, topic);
+ if
(!brokerController.getSubscriptionGroupManager().containsSubscriptionGroup(cid))
{
+ POP_LOGGER.info("remove nonexistent subscription
group {} of topic {} in topicCidMap!", cid, topic);
cidMapIter.remove();
}
}
@@ -380,12 +381,12 @@ public class PopLongPollingService extends ServiceThread {
String topic = keyArray[0];
String cid = keyArray[1];
if
(brokerController.getTopicConfigManager().selectTopicConfig(topic) == null) {
- POP_LOGGER.info("remove not exit topic {} in
pollingMap!", topic);
+ POP_LOGGER.info("remove nonexistent topic {} in
pollingMap!", topic);
pollingMapIter.remove();
continue;
}
- if
(!brokerController.getSubscriptionGroupManager().getSubscriptionGroupTable().containsKey(cid))
{
- POP_LOGGER.info("remove not exit sub {} of topic {} in
pollingMap!", cid, topic);
+ if
(!brokerController.getSubscriptionGroupManager().containsSubscriptionGroup(cid))
{
+ POP_LOGGER.info("remove nonexistent subscription group
{} of topic {} in pollingMap!", cid, topic);
pollingMapIter.remove();
}
}
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOrderInfoManager.java
b/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOrderInfoManager.java
index 4eccc6c037..120f5b104c 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOrderInfoManager.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOrderInfoManager.java
@@ -281,7 +281,7 @@ public class ConsumerOrderInfoManager extends ConfigManager
{
continue;
}
- if
(this.brokerController.getSubscriptionGroupManager().getSubscriptionGroupTable().get(group)
== null) {
+ if
(!this.brokerController.getSubscriptionGroupManager().containsSubscriptionGroup(group))
{
iterator.remove();
log.info("Group not exist, Clean order info, {}:{}",
topicAtGroup, qs);
continue;
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
index 6bcf9aaa0f..6fb7584aa9 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
@@ -424,7 +424,7 @@ public class AdminBrokerProcessor implements
NettyRequestProcessor {
GetSubscriptionGroupConfigRequestHeader requestHeader =
(GetSubscriptionGroupConfigRequestHeader)
request.decodeCommandCustomHeader(GetSubscriptionGroupConfigRequestHeader.class);
final RemotingCommand response =
RemotingCommand.createResponseCommand(null);
- SubscriptionGroupConfig groupConfig =
this.brokerController.getSubscriptionGroupManager().getSubscriptionGroupTable().get(requestHeader.getGroup());
+ SubscriptionGroupConfig groupConfig =
this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(requestHeader.getGroup());
if (groupConfig == null) {
LOGGER.error("No group in this broker, client: {} group: {}",
ctx.channel().remoteAddress(), requestHeader.getGroup());
response.setCode(ResponseCode.SUBSCRIPTION_GROUP_NOT_EXIST);
@@ -2444,7 +2444,7 @@ public class AdminBrokerProcessor implements
NettyRequestProcessor {
}
// groupSysFlag
if (StringUtils.isNotEmpty(requestHeader.getConsumerGroup())) {
- SubscriptionGroupConfig groupConfig =
brokerController.getSubscriptionGroupManager().getSubscriptionGroupTable().get(requestHeader.getConsumerGroup());
+ SubscriptionGroupConfig groupConfig =
brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(requestHeader.getConsumerGroup());
if (groupConfig != null) {
request.addExtField("groupSysFlag",
String.valueOf(groupConfig.getGroupSysFlag()));
}
@@ -2933,7 +2933,7 @@ public class AdminBrokerProcessor implements
NettyRequestProcessor {
GetTopicConfigRequestHeader requestHeader =
(GetTopicConfigRequestHeader)
request.decodeCommandCustomHeader(GetTopicConfigRequestHeader.class);
final RemotingCommand response =
RemotingCommand.createResponseCommand(null);
- TopicConfig topicConfig =
this.brokerController.getTopicConfigManager().getTopicConfigTable().get(requestHeader.getTopic());
+ TopicConfig topicConfig =
this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());
if (topicConfig == null) {
LOGGER.error("No topic in this broker, client: {} topic: {}",
ctx.channel().remoteAddress(), requestHeader.getTopic());
//be care of the response code, should set "not-exist" explicitly
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopBufferMergeService.java
b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopBufferMergeService.java
index 05a92c54b1..820388b18d 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopBufferMergeService.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopBufferMergeService.java
@@ -197,12 +197,12 @@ public class PopBufferMergeService extends ServiceThread {
String topic = keyArray[0];
String cid = keyArray[1];
if
(brokerController.getTopicConfigManager().selectTopicConfig(topic) == null) {
- POP_LOGGER.info("[PopBuffer]remove not exit topic {} in
buffer!", topic);
+ POP_LOGGER.info("[PopBuffer]remove nonexistent topic {} in
buffer!", topic);
iterator.remove();
continue;
}
- if
(!brokerController.getSubscriptionGroupManager().getSubscriptionGroupTable().containsKey(cid))
{
- POP_LOGGER.info("[PopBuffer]remove not exit sub {} of topic {}
in buffer!", cid, topic);
+ if
(!brokerController.getSubscriptionGroupManager().containsSubscriptionGroup(cid))
{
+ POP_LOGGER.info("[PopBuffer]remove nonexistent subscription
group {} of topic {} in buffer!", cid, topic);
iterator.remove();
continue;
}
diff --git
a/broker/src/test/java/org/apache/rocketmq/broker/longpolling/PopLongPollingServiceTest.java
b/broker/src/test/java/org/apache/rocketmq/broker/longpolling/PopLongPollingServiceTest.java
index 1f064ec05d..003bf09842 100644
---
a/broker/src/test/java/org/apache/rocketmq/broker/longpolling/PopLongPollingServiceTest.java
+++
b/broker/src/test/java/org/apache/rocketmq/broker/longpolling/PopLongPollingServiceTest.java
@@ -55,20 +55,20 @@ public class PopLongPollingServiceTest {
@Mock
private BrokerController brokerController;
-
+
@Mock
private NettyRequestProcessor processor;
-
+
@Mock
private ChannelHandlerContext ctx;
-
+
@Mock
private ExecutorService pullMessageExecutor;
-
+
private PopLongPollingService popLongPollingService;
-
+
private final String defaultTopic = "defaultTopic";
-
+
@Before
public void init() {
BrokerConfig brokerConfig = new BrokerConfig();
@@ -76,7 +76,7 @@ public class PopLongPollingServiceTest {
when(brokerController.getBrokerConfig()).thenReturn(brokerConfig);
popLongPollingService = spy(new
PopLongPollingService(brokerController, processor, true));
}
-
+
@Test
public void testNotifyMessageArrivingWithRetryTopic() {
int queueId = 0;
@@ -84,31 +84,32 @@ public class PopLongPollingServiceTest {
popLongPollingService.notifyMessageArrivingWithRetryTopic(defaultTopic,
queueId);
verify(popLongPollingService,
times(1)).notifyMessageArrivingWithRetryTopic(defaultTopic, queueId, -1L, null,
0L, null, null);
}
-
+
@Test
public void testNotifyMessageArriving() {
int queueId = 0;
Long tagsCode = 123L;
long offset = 123L;
long msgStoreTime = System.currentTimeMillis();
- byte[] filterBitMap = new byte[]{0x01};
+ byte[] filterBitMap = new byte[] {0x01};
Map<String, String> properties = new ConcurrentHashMap<>();
doNothing().when(popLongPollingService).notifyMessageArriving(defaultTopic,
queueId, offset, tagsCode, msgStoreTime, filterBitMap, properties);
popLongPollingService.notifyMessageArrivingWithRetryTopic(defaultTopic,
queueId, offset, tagsCode, msgStoreTime, filterBitMap, properties);
verify(popLongPollingService).notifyMessageArriving(defaultTopic,
queueId, offset, tagsCode, msgStoreTime, filterBitMap, properties);
}
-
+
@Test
public void testNotifyMessageArrivingValidRequest() throws Exception {
String cid = "CID_1";
int queueId = 0;
- ConcurrentHashMap<String, ConcurrentHashMap<String, Byte>> topicCidMap
= new ConcurrentHashMap<>();
+ ConcurrentLinkedHashMap<String, ConcurrentHashMap<String, Byte>>
topicCidMap = new ConcurrentLinkedHashMap.Builder<String,
ConcurrentHashMap<String, Byte>>()
+ .maximumWeightedCapacity(10).build();
ConcurrentHashMap<String, Byte> cids = new ConcurrentHashMap<>();
cids.put(cid, (byte) 1);
topicCidMap.put(defaultTopic, cids);
popLongPollingService = new PopLongPollingService(brokerController,
processor, true);
ConcurrentLinkedHashMap<String, ConcurrentSkipListSet<PopRequest>>
pollingMap =
- new ConcurrentLinkedHashMap.Builder<String,
ConcurrentSkipListSet<PopRequest>>().maximumWeightedCapacity(this.brokerController.getBrokerConfig().getPopPollingMapSize()).build();
+ new ConcurrentLinkedHashMap.Builder<String,
ConcurrentSkipListSet<PopRequest>>().maximumWeightedCapacity(this.brokerController.getBrokerConfig().getPopPollingMapSize()).build();
Channel channel = mock(Channel.class);
when(channel.isActive()).thenReturn(true);
PopRequest popRequest = mock(PopRequest.class);
@@ -126,19 +127,19 @@ public class PopLongPollingServiceTest {
boolean actual =
popLongPollingService.notifyMessageArriving(defaultTopic, queueId, cid, null,
0, null, null);
assertFalse(actual);
}
-
+
@Test
public void testWakeUpNullRequest() {
assertFalse(popLongPollingService.wakeUp(null));
}
-
+
@Test
public void testWakeUpIncompleteRequest() {
PopRequest request = mock(PopRequest.class);
when(request.complete()).thenReturn(false);
assertFalse(popLongPollingService.wakeUp(request));
}
-
+
@Test
public void testWakeUpInactiveChannel() {
PopRequest request = mock(PopRequest.class);
@@ -150,7 +151,7 @@ public class PopLongPollingServiceTest {
when(brokerController.getPullMessageExecutor()).thenReturn(pullMessageExecutor);
assertTrue(popLongPollingService.wakeUp(request));
}
-
+
@Test
public void testWakeUpValidRequestWithException() throws Exception {
PopRequest request = mock(PopRequest.class);
@@ -168,7 +169,7 @@ public class PopLongPollingServiceTest {
captor.getValue().run();
verify(processor).processRequest(any(), any());
}
-
+
@Test
public void testPollingNotPolling() {
ChannelHandlerContext ctx = mock(ChannelHandlerContext.class);
@@ -180,7 +181,7 @@ public class PopLongPollingServiceTest {
PollingResult result = popLongPollingService.polling(ctx,
remotingCommand, requestHeader, subscriptionData, messageFilter);
assertEquals(PollingResult.NOT_POLLING, result);
}
-
+
@Test
public void testPollingServicePollingTimeout() throws
IllegalAccessException {
String cid = "CID_1";
@@ -194,7 +195,8 @@ public class PopLongPollingServiceTest {
when(requestHeader.getPollTime()).thenReturn(1000L);
when(requestHeader.getTopic()).thenReturn(defaultTopic);
when(requestHeader.getConsumerGroup()).thenReturn("defaultGroup");
- ConcurrentHashMap<String, ConcurrentHashMap<String, Byte>> topicCidMap
= new ConcurrentHashMap<>();
+ ConcurrentLinkedHashMap<String, ConcurrentHashMap<String, Byte>>
topicCidMap = new ConcurrentLinkedHashMap.Builder<String,
ConcurrentHashMap<String, Byte>>()
+ .maximumWeightedCapacity(10).build();
ConcurrentHashMap<String, Byte> cids = new ConcurrentHashMap<>();
cids.put(cid, (byte) 1);
topicCidMap.put(defaultTopic, cids);
@@ -202,7 +204,7 @@ public class PopLongPollingServiceTest {
PollingResult result = popLongPollingService.polling(ctx,
remotingCommand, requestHeader, subscriptionData, messageFilter);
assertEquals(PollingResult.POLLING_TIMEOUT, result);
}
-
+
@Test
public void testPollingPollingSuc() {
ChannelHandlerContext ctx = mock(ChannelHandlerContext.class);
diff --git
a/broker/src/test/java/org/apache/rocketmq/broker/offset/ConsumerOrderInfoManagerTest.java
b/broker/src/test/java/org/apache/rocketmq/broker/offset/ConsumerOrderInfoManagerTest.java
index 25b418c934..4414eda54e 100644
---
a/broker/src/test/java/org/apache/rocketmq/broker/offset/ConsumerOrderInfoManagerTest.java
+++
b/broker/src/test/java/org/apache/rocketmq/broker/offset/ConsumerOrderInfoManagerTest.java
@@ -21,7 +21,6 @@ import java.time.Duration;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.subscription.SubscriptionGroupManager;
@@ -29,7 +28,6 @@ import org.apache.rocketmq.broker.topic.TopicConfigManager;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.remoting.protocol.header.ExtraInfoUtil;
-import
org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig;
import org.assertj.core.util.Lists;
import org.junit.Before;
import org.junit.Test;
@@ -384,9 +382,7 @@ public class ConsumerOrderInfoManagerTest {
SubscriptionGroupManager subscriptionGroupManager =
mock(SubscriptionGroupManager.class);
when(brokerController.getSubscriptionGroupManager()).thenReturn(subscriptionGroupManager);
- ConcurrentMap<String, SubscriptionGroupConfig>
subscriptionGroupConfigConcurrentMap = new ConcurrentHashMap<>();
- subscriptionGroupConfigConcurrentMap.put(GROUP, new
SubscriptionGroupConfig());
-
when(subscriptionGroupManager.getSubscriptionGroupTable()).thenReturn(subscriptionGroupConfigConcurrentMap);
+
when(subscriptionGroupManager.containsSubscriptionGroup(GROUP)).thenReturn(true);
TopicConfig topicConfig = new TopicConfig(TOPIC);
when(topicConfigManager.selectTopicConfig(eq(TOPIC))).thenReturn(topicConfig);