This is an automated email from the ASF dual-hosted git repository.
lizhimins pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new da9b26bd2d [ISSUE #10438] Fix buildTopicConfigSerializeWrapper
exposing live topicConfigTable reference (#10439)
da9b26bd2d is described below
commit da9b26bd2d715e57d0ae13cadc9c8765d01413cb
Author: Chuan <[email protected]>
AuthorDate: Thu Jun 11 14:52:43 2026 +0800
[ISSUE #10438] Fix buildTopicConfigSerializeWrapper exposing live
topicConfigTable reference (#10439)
---
.../rocketmq/broker/topic/TopicConfigManager.java | 2 +-
.../broker/topic/TopicConfigManagerTest.java | 36 +++++++++++++++++++++-
2 files changed, 36 insertions(+), 2 deletions(-)
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java
b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java
index b481242b12..488cedef79 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java
@@ -628,10 +628,10 @@ public class TopicConfigManager extends ConfigManager {
public TopicConfigSerializeWrapper buildTopicConfigSerializeWrapper() {
TopicConfigSerializeWrapper topicConfigSerializeWrapper = new
TopicConfigSerializeWrapper();
- topicConfigSerializeWrapper.setTopicConfigTable(this.topicConfigTable);
DataVersion dataVersionCopy = new DataVersion();
dataVersionCopy.assignNewOne(this.dataVersion);
topicConfigSerializeWrapper.setDataVersion(dataVersionCopy);
+ topicConfigSerializeWrapper.setTopicConfigTable(new
ConcurrentHashMap<>(this.topicConfigTable));
return topicConfigSerializeWrapper;
}
diff --git
a/broker/src/test/java/org/apache/rocketmq/broker/topic/TopicConfigManagerTest.java
b/broker/src/test/java/org/apache/rocketmq/broker/topic/TopicConfigManagerTest.java
index af1066a4d0..085c3f9cfb 100644
---
a/broker/src/test/java/org/apache/rocketmq/broker/topic/TopicConfigManagerTest.java
+++
b/broker/src/test/java/org/apache/rocketmq/broker/topic/TopicConfigManagerTest.java
@@ -23,6 +23,7 @@ import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicLong;
@@ -39,6 +40,7 @@ import org.apache.rocketmq.common.constant.PermName;
import org.apache.rocketmq.common.utils.QueueTypeUtils;
import org.apache.rocketmq.remoting.protocol.DataVersion;
import
org.apache.rocketmq.remoting.protocol.body.TopicConfigAndMappingSerializeWrapper;
+import org.apache.rocketmq.remoting.protocol.body.TopicConfigSerializeWrapper;
import org.apache.rocketmq.store.DefaultMessageStore;
import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.junit.Assert;
@@ -403,7 +405,7 @@ public class TopicConfigManagerTest {
}
@Test
- public void
testBuildSerializeWrapperUpdatesDataVersionWhenSplitRegistrationEnabled() {
+ public void testSplitRegistrationBumpsVersion() {
brokerController.getBrokerConfig().setEnableSplitRegistration(true);
long counterBefore =
topicConfigManager.getDataVersion().getCounter().get();
@@ -414,4 +416,36 @@ public class TopicConfigManagerTest {
Assert.assertEquals(counterBefore + 1, counterAfter);
Assert.assertEquals(counterAfter,
wrapper.getDataVersion().getCounter().get());
}
+
+ @Test
+ public void testSerializeWrapperIsSnapshot() {
+ topicConfigManager.getTopicConfigTable().clear();
+
+ for (int i = 0; i < 10; i++) {
+ TopicConfig tc = new TopicConfig("BaseTopic-" + i, 4, 4,
+ PermName.PERM_READ | PermName.PERM_WRITE);
+ topicConfigManager.getTopicConfigTable().put(tc.getTopicName(),
tc);
+ }
+
+ TopicConfigSerializeWrapper wrapper =
topicConfigManager.buildTopicConfigSerializeWrapper();
+ ConcurrentMap<String, TopicConfig> wrapperTable =
wrapper.getTopicConfigTable();
+ int sizeAtBuild = wrapperTable.size();
+ long versionAtBuild = wrapper.getDataVersion().getCounter().get();
+
+ // Wrapper should hold an independent snapshot, not the live reference
+ Assert.assertNotSame(topicConfigManager.getTopicConfigTable(),
wrapperTable);
+
+ // Simulate a concurrent topic update after the wrapper was built
+ TopicConfig newTopic = new TopicConfig("NewTopic-AfterBuild", 8, 8,
+ PermName.PERM_READ | PermName.PERM_WRITE);
+ topicConfigManager.getTopicConfigTable().put(newTopic.getTopicName(),
newTopic);
+ topicConfigManager.updateDataVersion();
+
+ // The wrapper's table should NOT see the new topic
+ Assert.assertFalse(wrapperTable.containsKey("NewTopic-AfterBuild"));
+ Assert.assertEquals(sizeAtBuild, wrapperTable.size());
+
+ // The wrapper's version should reflect a consistent point-in-time
snapshot
+ Assert.assertEquals(versionAtBuild,
wrapper.getDataVersion().getCounter().get());
+ }
}