ni-ze commented on code in PR #6531:
URL: https://github.com/apache/rocketmq/pull/6531#discussion_r1156620877


##########
store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueStore.java:
##########
@@ -542,11 +542,12 @@ public ConcurrentMap<String, TopicConfig> 
getTopicConfigs() {
     }
 
     public Optional<TopicConfig> getTopicConfig(String topic) {
-        if (this.topicConfigTable == null) {
+        TopicConfig config = this.getConfigFunc.apply(topic);
+        if (config == null) {

Review Comment:
   The field getConfigFunc maybe empty.



##########
broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java:
##########
@@ -737,8 +737,7 @@ public boolean initialize() throws 
CloneNotSupportedException {
 
         if (result) {
             try {
-                DefaultMessageStore defaultMessageStore = new 
DefaultMessageStore(this.messageStoreConfig, this.brokerStatsManager, 
this.messageArrivingListener, this.brokerConfig);
-                
defaultMessageStore.setTopicConfigTable(topicConfigManager.getTopicConfigTable());
+                DefaultMessageStore defaultMessageStore = new 
DefaultMessageStore(this.messageStoreConfig, this.brokerStatsManager, 
this.messageArrivingListener, this.brokerConfig, (ConcurrentMap<String, 
TopicConfig>) 
Collections.unmodifiableMap(topicConfigManager.getTopicConfigTable()));

Review Comment:
   UnmodifiableMap can not convert to ConcurrentMap.



##########
store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java:
##########
@@ -204,7 +205,9 @@ public class DefaultMessageStore implements MessageStore {
         Executors.newSingleThreadScheduledExecutor(new 
ThreadFactoryImpl("StoreCleanQueueScheduledThread"));
 
     public DefaultMessageStore(final MessageStoreConfig messageStoreConfig, 
final BrokerStatsManager brokerStatsManager,
-        final MessageArrivingListener messageArrivingListener, final 
BrokerConfig brokerConfig) throws IOException {
+        final MessageArrivingListener messageArrivingListener, final 
BrokerConfig brokerConfig,
+        final Function<String, TopicConfig> getConfigFunc,
+        final Function<?, Map<String, TopicConfig>> getTopicConfigTable) 
throws IOException {

Review Comment:
   IMO, It is not necessary to use two Function, topicConfigTable is fine here, 
and make topicConfigTable as a field in  DefaultMessageStore, getTopicConfigs 
and getTopicConfig(String) method use this field to find what they need instead 
of pass them into consumeQueueStore.



##########
store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java:
##########
@@ -2049,18 +2052,16 @@ public void assignOffset(MessageExtBrokerInner msg, 
short messageNum) {
         }
     }
 
-    @Override
     public ConcurrentMap<String, TopicConfig> getTopicConfigs() {
         return this.consumeQueueStore.getTopicConfigs();
     }
 
-    @Override
     public Optional<TopicConfig> getTopicConfig(String topic) {
         return this.consumeQueueStore.getTopicConfig(topic);
     }
 
     public void setTopicConfigTable(ConcurrentMap<String, TopicConfig> 
topicConfigTable) {
-        this.consumeQueueStore.setTopicConfigTable(topicConfigTable);
+        this.consumeQueueStore.setTopicConfigFunction(topic -> 
topicConfigTable != null ? topicConfigTable.get(topic) : null);

Review Comment:
   this method can be removed, because the place reference to it is deleted in 
BrokerController.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to