This is an automated email from the ASF dual-hosted git repository.
lollipop pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 7e018520ef Add expireAfterAccess for cache (#7247)
7e018520ef is described below
commit 7e018520ef707a841c66c55d621f6560d03b631b
Author: Zhouxiang Zhan <[email protected]>
AuthorDate: Fri Aug 25 09:49:22 2023 +0800
Add expireAfterAccess for cache (#7247)
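Add expireAfterAccess to the proxy's topic route, topic config and
subscription group config caches. Each cache gets a new
*CacheExpiredSeconds setting (default 300) that drives expireAfterAccess,
and a new *CacheRefreshSeconds setting (default 20) that drives
refreshAfterWrite. The previous *CacheExpiredInSeconds properties, which
were used as the refresh interval, are removed.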
---
.../apache/rocketmq/proxy/config/ProxyConfig.java | 59 ++++++++++++++++------
.../service/metadata/ClusterMetadataService.java | 6 ++-
.../proxy/service/route/TopicRouteService.java | 14 +++--
3 files changed, 56 insertions(+), 23 deletions(-)
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java b/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java
index 76a2439196..2994893d71 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java
@@ -155,14 +155,17 @@ public class ProxyConfig implements ConfigFile {
private int consumerProcessorThreadPoolQueueCapacity = 10000;
private boolean useEndpointPortFromRequest = false;
- private int topicRouteServiceCacheExpiredInSeconds = 20;
+
+ private int topicRouteServiceCacheExpiredSeconds = 300;
+ private int topicRouteServiceCacheRefreshSeconds = 20;
private int topicRouteServiceCacheMaxNum = 20000;
private int topicRouteServiceThreadPoolNums = PROCESSOR_NUMBER;
private int topicRouteServiceThreadPoolQueueCapacity = 5000;
-
- private int topicConfigCacheExpiredInSeconds = 20;
+ private int topicConfigCacheExpiredSeconds = 300;
+ private int topicConfigCacheRefreshSeconds = 20;
private int topicConfigCacheMaxNum = 20000;
- private int subscriptionGroupConfigCacheExpiredInSeconds = 20;
+ private int subscriptionGroupConfigCacheExpiredSeconds = 300;
+ private int subscriptionGroupConfigCacheRefreshSeconds = 20;
private int subscriptionGroupConfigCacheMaxNum = 20000;
private int metadataThreadPoolNums = 3;
private int metadataThreadPoolQueueCapacity = 100000;
@@ -794,12 +797,20 @@ public class ProxyConfig implements ConfigFile {
this.consumerProcessorThreadPoolQueueCapacity = consumerProcessorThreadPoolQueueCapacity;
}
- public int getTopicRouteServiceCacheExpiredInSeconds() {
- return topicRouteServiceCacheExpiredInSeconds;
+ public int getTopicRouteServiceCacheExpiredSeconds() {
+ return topicRouteServiceCacheExpiredSeconds;
+ }
+
+ public void setTopicRouteServiceCacheExpiredSeconds(int topicRouteServiceCacheExpiredSeconds) {
+ this.topicRouteServiceCacheExpiredSeconds = topicRouteServiceCacheExpiredSeconds;
}
- public void setTopicRouteServiceCacheExpiredInSeconds(int topicRouteServiceCacheExpiredInSeconds) {
- this.topicRouteServiceCacheExpiredInSeconds = topicRouteServiceCacheExpiredInSeconds;
+ public int getTopicRouteServiceCacheRefreshSeconds() {
+ return topicRouteServiceCacheRefreshSeconds;
+ }
+
+ public void setTopicRouteServiceCacheRefreshSeconds(int topicRouteServiceCacheRefreshSeconds) {
+ this.topicRouteServiceCacheRefreshSeconds = topicRouteServiceCacheRefreshSeconds;
}
public int getTopicRouteServiceCacheMaxNum() {
@@ -826,12 +837,20 @@ public class ProxyConfig implements ConfigFile {
this.topicRouteServiceThreadPoolQueueCapacity = topicRouteServiceThreadPoolQueueCapacity;
}
- public int getTopicConfigCacheExpiredInSeconds() {
- return topicConfigCacheExpiredInSeconds;
+ public int getTopicConfigCacheRefreshSeconds() {
+ return topicConfigCacheRefreshSeconds;
+ }
+
+ public void setTopicConfigCacheRefreshSeconds(int topicConfigCacheRefreshSeconds) {
+ this.topicConfigCacheRefreshSeconds = topicConfigCacheRefreshSeconds;
+ }
+
+ public int getTopicConfigCacheExpiredSeconds() {
+ return topicConfigCacheExpiredSeconds;
}
- public void setTopicConfigCacheExpiredInSeconds(int topicConfigCacheExpiredInSeconds) {
- this.topicConfigCacheExpiredInSeconds = topicConfigCacheExpiredInSeconds;
+ public void setTopicConfigCacheExpiredSeconds(int topicConfigCacheExpiredSeconds) {
+ this.topicConfigCacheExpiredSeconds = topicConfigCacheExpiredSeconds;
}
public int getTopicConfigCacheMaxNum() {
@@ -842,12 +861,20 @@ public class ProxyConfig implements ConfigFile {
this.topicConfigCacheMaxNum = topicConfigCacheMaxNum;
}
- public int getSubscriptionGroupConfigCacheExpiredInSeconds() {
- return subscriptionGroupConfigCacheExpiredInSeconds;
+ public int getSubscriptionGroupConfigCacheRefreshSeconds() {
+ return subscriptionGroupConfigCacheRefreshSeconds;
+ }
+
+ public void setSubscriptionGroupConfigCacheRefreshSeconds(int subscriptionGroupConfigCacheRefreshSeconds) {
+ this.subscriptionGroupConfigCacheRefreshSeconds = subscriptionGroupConfigCacheRefreshSeconds;
+ }
+
+ public int getSubscriptionGroupConfigCacheExpiredSeconds() {
+ return subscriptionGroupConfigCacheExpiredSeconds;
}
- public void setSubscriptionGroupConfigCacheExpiredInSeconds(int subscriptionGroupConfigCacheExpiredInSeconds) {
- this.subscriptionGroupConfigCacheExpiredInSeconds = subscriptionGroupConfigCacheExpiredInSeconds;
+ public void setSubscriptionGroupConfigCacheExpiredSeconds(int subscriptionGroupConfigCacheExpiredSeconds) {
+ this.subscriptionGroupConfigCacheExpiredSeconds = subscriptionGroupConfigCacheExpiredSeconds;
}
public int getSubscriptionGroupConfigCacheMaxNum() {
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/metadata/ClusterMetadataService.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/metadata/ClusterMetadataService.java
index bc9582ad81..d34a0efd9e 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/metadata/ClusterMetadataService.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/metadata/ClusterMetadataService.java
@@ -69,11 +69,13 @@ public class ClusterMetadataService extends AbstractStartAndShutdown implements
);
this.topicConfigCache = CacheBuilder.newBuilder()
.maximumSize(config.getTopicConfigCacheMaxNum())
- .refreshAfterWrite(config.getTopicConfigCacheExpiredInSeconds(), TimeUnit.SECONDS)
+ .expireAfterAccess(config.getTopicConfigCacheExpiredSeconds(), TimeUnit.SECONDS)
+ .refreshAfterWrite(config.getTopicConfigCacheRefreshSeconds(), TimeUnit.SECONDS)
.build(new ClusterTopicConfigCacheLoader());
this.subscriptionGroupConfigCache = CacheBuilder.newBuilder()
.maximumSize(config.getSubscriptionGroupConfigCacheMaxNum())
- .refreshAfterWrite(config.getSubscriptionGroupConfigCacheExpiredInSeconds(), TimeUnit.SECONDS)
+ .expireAfterAccess(config.getSubscriptionGroupConfigCacheExpiredSeconds(), TimeUnit.SECONDS)
+ .refreshAfterWrite(config.getSubscriptionGroupConfigCacheRefreshSeconds(), TimeUnit.SECONDS)
.build(new ClusterSubscriptionGroupConfigCacheLoader());
this.init();
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/TopicRouteService.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/TopicRouteService.java
index e012a5465a..84348adc32 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/TopicRouteService.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/TopicRouteService.java
@@ -68,10 +68,13 @@ public abstract class TopicRouteService extends AbstractStartAndShutdown {
);
this.mqClientAPIFactory = mqClientAPIFactory;
- this.topicCache = Caffeine.newBuilder().maximumSize(config.getTopicRouteServiceCacheMaxNum()).
- refreshAfterWrite(config.getTopicRouteServiceCacheExpiredInSeconds(), TimeUnit.SECONDS).
- executor(cacheRefreshExecutor).build(new CacheLoader<String, MessageQueueView>() {
- @Override public @Nullable MessageQueueView load(String topic) throws Exception {
+ this.topicCache = Caffeine.newBuilder().maximumSize(config.getTopicRouteServiceCacheMaxNum())
+ .expireAfterAccess(config.getTopicRouteServiceCacheExpiredSeconds(), TimeUnit.SECONDS)
+ .refreshAfterWrite(config.getTopicRouteServiceCacheRefreshSeconds(), TimeUnit.SECONDS)
+ .executor(cacheRefreshExecutor)
+ .build(new CacheLoader<String, MessageQueueView>() {
+ @Override
+ public @Nullable MessageQueueView load(String topic) throws Exception {
try {
TopicRouteData topicRouteData = mqClientAPIFactory.getClient().getTopicRouteInfoFromNameServer(topic, Duration.ofSeconds(3).toMillis());
return buildMessageQueueView(topic, topicRouteData);
@@ -83,7 +86,8 @@ public abstract class TopicRouteService extends AbstractStartAndShutdown {
}
}
- @Override public @Nullable MessageQueueView reload(@NonNull String key,
+ @Override
+ public @Nullable MessageQueueView reload(@NonNull String key,
@NonNull MessageQueueView oldValue) throws Exception {
try {
return load(key);