ni-ze commented on code in PR #6531:
URL: https://github.com/apache/rocketmq/pull/6531#discussion_r1156620877
##########
store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueStore.java:
##########
@@ -542,11 +542,12 @@ public ConcurrentMap<String, TopicConfig>
getTopicConfigs() {
}
public Optional<TopicConfig> getTopicConfig(String topic) {
- if (this.topicConfigTable == null) {
+ TopicConfig config = this.getConfigFunc.apply(topic);
+ if (config == null) {
Review Comment:
The field getConfigFunc maybe empty.
##########
broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java:
##########
@@ -737,8 +737,7 @@ public boolean initialize() throws
CloneNotSupportedException {
if (result) {
try {
- DefaultMessageStore defaultMessageStore = new
DefaultMessageStore(this.messageStoreConfig, this.brokerStatsManager,
this.messageArrivingListener, this.brokerConfig);
-
defaultMessageStore.setTopicConfigTable(topicConfigManager.getTopicConfigTable());
+ DefaultMessageStore defaultMessageStore = new
DefaultMessageStore(this.messageStoreConfig, this.brokerStatsManager,
this.messageArrivingListener, this.brokerConfig, (ConcurrentMap<String,
TopicConfig>)
Collections.unmodifiableMap(topicConfigManager.getTopicConfigTable()));
Review Comment:
UnmodifiableMap can not convert to ConcurrentMap.
##########
store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java:
##########
@@ -204,7 +205,9 @@ public class DefaultMessageStore implements MessageStore {
Executors.newSingleThreadScheduledExecutor(new
ThreadFactoryImpl("StoreCleanQueueScheduledThread"));
public DefaultMessageStore(final MessageStoreConfig messageStoreConfig,
final BrokerStatsManager brokerStatsManager,
- final MessageArrivingListener messageArrivingListener, final
BrokerConfig brokerConfig) throws IOException {
+ final MessageArrivingListener messageArrivingListener, final
BrokerConfig brokerConfig,
+ final Function<String, TopicConfig> getConfigFunc,
+ final Function<?, Map<String, TopicConfig>> getTopicConfigTable)
throws IOException {
Review Comment:
IMO, It is not necessary to use two Function, topicConfigTable is fine here,
and make topicConfigTable as a field in DefaultMessageStore, getTopicConfigs
and getTopicConfig(String) method use this field to find what they need instead
of pass them into consumeQueueStore.
##########
store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java:
##########
@@ -2049,18 +2052,16 @@ public void assignOffset(MessageExtBrokerInner msg,
short messageNum) {
}
}
- @Override
public ConcurrentMap<String, TopicConfig> getTopicConfigs() {
return this.consumeQueueStore.getTopicConfigs();
}
- @Override
public Optional<TopicConfig> getTopicConfig(String topic) {
return this.consumeQueueStore.getTopicConfig(topic);
}
public void setTopicConfigTable(ConcurrentMap<String, TopicConfig>
topicConfigTable) {
- this.consumeQueueStore.setTopicConfigTable(topicConfigTable);
+ this.consumeQueueStore.setTopicConfigFunction(topic ->
topicConfigTable != null ? topicConfigTable.get(topic) : null);
Review Comment:
this method can be removed, because the place reference to it is deleted in
BrokerController.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]