This is an automated email from the ASF dual-hosted git repository.
lizhimin pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new f6249e5b3a [ISSEU #6426] Fix slave broker SubscriptionGroupConfig and
MessageRequestMode updating atomically (#8983)
f6249e5b3a is described below
commit f6249e5b3a171be1cd0051732be77fd55ee4eb97
Author: Aurora Twinkle <[email protected]>
AuthorDate: Mon Jan 13 14:45:20 2025 +0800
[ISSEU #6426] Fix slave broker SubscriptionGroupConfig and
MessageRequestMode updating atomically (#8983)
* fix[slave]:Make SubscriptionGroupConfig and MessageRequestMode updating
atomically
* add unit test
* fix ut
---------
Co-authored-by: duanlinlin <[email protected]>
---
.../rocketmq/broker/slave/SlaveSynchronize.java | 30 ++++-
.../broker/slave/SlaveSynchronizeAtomicTest.java | 141 +++++++++++++++++++++
2 files changed, 164 insertions(+), 7 deletions(-)
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/slave/SlaveSynchronize.java
b/broker/src/main/java/org/apache/rocketmq/broker/slave/SlaveSynchronize.java
index aa77b773ee..bfb5c9dcd0 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/slave/SlaveSynchronize.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/slave/SlaveSynchronize.java
@@ -17,6 +17,7 @@
package org.apache.rocketmq.broker.slave;
import java.io.IOException;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.commons.lang3.StringUtils;
@@ -30,8 +31,10 @@ import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import
org.apache.rocketmq.remoting.protocol.body.ConsumerOffsetSerializeWrapper;
import
org.apache.rocketmq.remoting.protocol.body.MessageRequestModeSerializeWrapper;
+import
org.apache.rocketmq.remoting.protocol.body.SetMessageRequestModeRequestBody;
import org.apache.rocketmq.remoting.protocol.body.SubscriptionGroupWrapper;
import
org.apache.rocketmq.remoting.protocol.body.TopicConfigAndMappingSerializeWrapper;
+import
org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig;
import org.apache.rocketmq.store.config.StorePathConfigHelper;
import org.apache.rocketmq.store.timer.TimerCheckpoint;
import org.apache.rocketmq.store.timer.TimerMetrics;
@@ -166,9 +169,16 @@ public class SlaveSynchronize {
this.brokerController.getSubscriptionGroupManager();
subscriptionGroupManager.getDataVersion().assignNewOne(
subscriptionWrapper.getDataVersion());
-
subscriptionGroupManager.getSubscriptionGroupTable().clear();
-
subscriptionGroupManager.getSubscriptionGroupTable().putAll(
- subscriptionWrapper.getSubscriptionGroupTable());
+
+ ConcurrentMap<String, SubscriptionGroupConfig>
curSubscriptionGroupTable =
+
subscriptionGroupManager.getSubscriptionGroupTable();
+ ConcurrentMap<String, SubscriptionGroupConfig>
newSubscriptionGroupTable =
+ subscriptionWrapper.getSubscriptionGroupTable();
+ // delete
+ curSubscriptionGroupTable.entrySet().removeIf(e ->
!newSubscriptionGroupTable.containsKey(e.getKey()));
+ // update
+
curSubscriptionGroupTable.putAll(newSubscriptionGroupTable);
+ // persist
subscriptionGroupManager.persist();
LOGGER.info("Update slave Subscription Group from master,
{}", masterAddrBak);
}
@@ -187,10 +197,16 @@ public class SlaveSynchronize {
MessageRequestModeManager messageRequestModeManager =
this.brokerController.getQueryAssignmentProcessor().getMessageRequestModeManager();
- messageRequestModeManager.getMessageRequestModeMap().clear();
- messageRequestModeManager.getMessageRequestModeMap().putAll(
-
messageRequestModeSerializeWrapper.getMessageRequestModeMap()
- );
+ ConcurrentHashMap<String, ConcurrentHashMap<String,
SetMessageRequestModeRequestBody>> curMessageRequestModeMap =
+ messageRequestModeManager.getMessageRequestModeMap();
+ ConcurrentHashMap<String, ConcurrentHashMap<String,
SetMessageRequestModeRequestBody>> newMessageRequestModeMap =
+
messageRequestModeSerializeWrapper.getMessageRequestModeMap();
+
+ // delete
+ curMessageRequestModeMap.entrySet().removeIf(e ->
!newMessageRequestModeMap.containsKey(e.getKey()));
+ // update
+ curMessageRequestModeMap.putAll(newMessageRequestModeMap);
+ // persist
messageRequestModeManager.persist();
LOGGER.info("Update slave Message Request Mode from master,
{}", masterAddrBak);
} catch (Exception e) {
diff --git
a/broker/src/test/java/org/apache/rocketmq/broker/slave/SlaveSynchronizeAtomicTest.java
b/broker/src/test/java/org/apache/rocketmq/broker/slave/SlaveSynchronizeAtomicTest.java
new file mode 100644
index 0000000000..75db22e7e7
--- /dev/null
+++
b/broker/src/test/java/org/apache/rocketmq/broker/slave/SlaveSynchronizeAtomicTest.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.broker.slave;
+
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.when;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ThreadLocalRandom;
+import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.broker.loadbalance.MessageRequestModeManager;
+import org.apache.rocketmq.broker.out.BrokerOuterAPI;
+import org.apache.rocketmq.broker.processor.QueryAssignmentProcessor;
+import org.apache.rocketmq.broker.subscription.SubscriptionGroupManager;
+import org.apache.rocketmq.broker.topic.TopicConfigManager;
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.common.BrokerConfig;
+import org.apache.rocketmq.remoting.exception.RemotingConnectException;
+import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
+import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
+import org.apache.rocketmq.remoting.netty.NettyClientConfig;
+import org.apache.rocketmq.remoting.netty.NettyServerConfig;
+import org.apache.rocketmq.remoting.protocol.DataVersion;
+import
org.apache.rocketmq.remoting.protocol.body.MessageRequestModeSerializeWrapper;
+import org.apache.rocketmq.remoting.protocol.body.SubscriptionGroupWrapper;
+import
org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig;
+import org.apache.rocketmq.store.config.MessageStoreConfig;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.Spy;
+import org.mockito.junit.MockitoJUnitRunner;
+
+@RunWith(MockitoJUnitRunner.class)
+public class SlaveSynchronizeAtomicTest {
+ @Spy
+ private BrokerController brokerController =
+ new BrokerController(new BrokerConfig(), new NettyServerConfig(),
new NettyClientConfig(),
+ new MessageStoreConfig());
+
+ private SlaveSynchronize slaveSynchronize;
+
+ @Mock
+ private BrokerOuterAPI brokerOuterAPI;
+
+ @Mock
+ private TopicConfigManager topicConfigManager;
+
+
+ @Mock
+ private SubscriptionGroupManager subscriptionGroupManager;
+
+ @Mock
+ private QueryAssignmentProcessor queryAssignmentProcessor;
+
+ @Mock
+ private MessageRequestModeManager messageRequestModeManager;
+
+
+ private static final String BROKER_ADDR = "127.0.0.1:10911";
+ private final SubscriptionGroupWrapper subscriptionGroupWrapper =
createSubscriptionGroupWrapper();
+ private final MessageRequestModeSerializeWrapper
requestModeSerializeWrapper = createMessageRequestModeWrapper();
+ private final DataVersion dataVersion = new DataVersion();
+
+ @Before
+ public void init() {
+ for (int i = 0; i < 100000; i++) {
+ subscriptionGroupWrapper.getSubscriptionGroupTable().put("group" +
i, new SubscriptionGroupConfig());
+ }
+ for (int i = 0; i < 100000; i++) {
+ requestModeSerializeWrapper.getMessageRequestModeMap().put("topic"
+ i, new ConcurrentHashMap<>());
+ }
+ when(brokerController.getBrokerOuterAPI()).thenReturn(brokerOuterAPI);
+
when(brokerController.getTopicConfigManager()).thenReturn(topicConfigManager);
+
when(brokerController.getSubscriptionGroupManager()).thenReturn(subscriptionGroupManager);
+
when(subscriptionGroupManager.getDataVersion()).thenReturn(dataVersion);
+ when(subscriptionGroupManager.getSubscriptionGroupTable()).thenReturn(
+ subscriptionGroupWrapper.getSubscriptionGroupTable());
+ slaveSynchronize = new SlaveSynchronize(brokerController);
+ slaveSynchronize.setMasterAddr(BROKER_ADDR);
+ }
+
+ private SubscriptionGroupWrapper createSubscriptionGroupWrapper() {
+ SubscriptionGroupWrapper wrapper = new SubscriptionGroupWrapper();
+ wrapper.setSubscriptionGroupTable(new ConcurrentHashMap<>());
+ DataVersion dataVersion = new DataVersion();
+ dataVersion.setStateVersion(1L);
+ wrapper.setDataVersion(dataVersion);
+ return wrapper;
+ }
+
+ private MessageRequestModeSerializeWrapper
createMessageRequestModeWrapper() {
+ MessageRequestModeSerializeWrapper wrapper = new
MessageRequestModeSerializeWrapper();
+ wrapper.setMessageRequestModeMap(new ConcurrentHashMap<>());
+ return wrapper;
+ }
+
+ @Test
+ public void testSyncAtomically()
+ throws RemotingConnectException, RemotingSendRequestException,
RemotingTimeoutException, MQBrokerException,
+ InterruptedException {
+
when(brokerOuterAPI.getAllSubscriptionGroupConfig(anyString())).thenReturn(subscriptionGroupWrapper);
+
when(brokerOuterAPI.getAllMessageRequestMode(anyString())).thenReturn(requestModeSerializeWrapper);
+
+ CountDownLatch countDownLatch = new CountDownLatch(1);
+ new Thread(() -> {
+ while (countDownLatch.getCount() > 0) {
+ dataVersion.nextVersion();
+ try {
+ slaveSynchronize.syncAll();
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ }).start();
+
+ for (int i = 0; i < 10000000; i++) {
+
Assert.assertTrue(subscriptionGroupWrapper.getSubscriptionGroupTable()
+ .containsKey("group" +
ThreadLocalRandom.current().nextInt(0, 100000)));
+
Assert.assertTrue(requestModeSerializeWrapper.getMessageRequestModeMap()
+ .containsKey("topic" +
ThreadLocalRandom.current().nextInt(0, 100000)));
+ }
+ countDownLatch.countDown();
+ }
+}