This is an automated email from the ASF dual-hosted git repository.

lollipop pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 7e018520ef Add expireAfterAccess for cache (#7247)
7e018520ef is described below

commit 7e018520ef707a841c66c55d621f6560d03b631b
Author: Zhouxiang Zhan <[email protected]>
AuthorDate: Fri Aug 25 09:49:22 2023 +0800

    Add expireAfterAccess for cache (#7247)
    
    Add expireAfterAccess for cache
---
 .../apache/rocketmq/proxy/config/ProxyConfig.java  | 59 ++++++++++++++++------
 .../service/metadata/ClusterMetadataService.java   |  6 ++-
 .../proxy/service/route/TopicRouteService.java     | 14 +++--
 3 files changed, 56 insertions(+), 23 deletions(-)

diff --git 
a/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java 
b/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java
index 76a2439196..2994893d71 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java
@@ -155,14 +155,17 @@ public class ProxyConfig implements ConfigFile {
     private int consumerProcessorThreadPoolQueueCapacity = 10000;
 
     private boolean useEndpointPortFromRequest = false;
-    private int topicRouteServiceCacheExpiredInSeconds = 20;
+
+    private int topicRouteServiceCacheExpiredSeconds = 300;
+    private int topicRouteServiceCacheRefreshSeconds = 20;
     private int topicRouteServiceCacheMaxNum = 20000;
     private int topicRouteServiceThreadPoolNums = PROCESSOR_NUMBER;
     private int topicRouteServiceThreadPoolQueueCapacity = 5000;
-
-    private int topicConfigCacheExpiredInSeconds = 20;
+    private int topicConfigCacheExpiredSeconds = 300;
+    private int topicConfigCacheRefreshSeconds = 20;
     private int topicConfigCacheMaxNum = 20000;
-    private int subscriptionGroupConfigCacheExpiredInSeconds = 20;
+    private int subscriptionGroupConfigCacheExpiredSeconds = 300;
+    private int subscriptionGroupConfigCacheRefreshSeconds = 20;
     private int subscriptionGroupConfigCacheMaxNum = 20000;
     private int metadataThreadPoolNums = 3;
     private int metadataThreadPoolQueueCapacity = 100000;
@@ -794,12 +797,20 @@ public class ProxyConfig implements ConfigFile {
         this.consumerProcessorThreadPoolQueueCapacity = 
consumerProcessorThreadPoolQueueCapacity;
     }
 
-    public int getTopicRouteServiceCacheExpiredInSeconds() {
-        return topicRouteServiceCacheExpiredInSeconds;
+    public int getTopicRouteServiceCacheExpiredSeconds() {
+        return topicRouteServiceCacheExpiredSeconds;
+    }
+
+    public void setTopicRouteServiceCacheExpiredSeconds(int 
topicRouteServiceCacheExpiredSeconds) {
+        this.topicRouteServiceCacheExpiredSeconds = 
topicRouteServiceCacheExpiredSeconds;
     }
 
-    public void setTopicRouteServiceCacheExpiredInSeconds(int 
topicRouteServiceCacheExpiredInSeconds) {
-        this.topicRouteServiceCacheExpiredInSeconds = 
topicRouteServiceCacheExpiredInSeconds;
+    public int getTopicRouteServiceCacheRefreshSeconds() {
+        return topicRouteServiceCacheRefreshSeconds;
+    }
+
+    public void setTopicRouteServiceCacheRefreshSeconds(int 
topicRouteServiceCacheRefreshSeconds) {
+        this.topicRouteServiceCacheRefreshSeconds = 
topicRouteServiceCacheRefreshSeconds;
     }
 
     public int getTopicRouteServiceCacheMaxNum() {
@@ -826,12 +837,20 @@ public class ProxyConfig implements ConfigFile {
         this.topicRouteServiceThreadPoolQueueCapacity = 
topicRouteServiceThreadPoolQueueCapacity;
     }
 
-    public int getTopicConfigCacheExpiredInSeconds() {
-        return topicConfigCacheExpiredInSeconds;
+    public int getTopicConfigCacheRefreshSeconds() {
+        return topicConfigCacheRefreshSeconds;
+    }
+
+    public void setTopicConfigCacheRefreshSeconds(int 
topicConfigCacheRefreshSeconds) {
+        this.topicConfigCacheRefreshSeconds = topicConfigCacheRefreshSeconds;
+    }
+
+    public int getTopicConfigCacheExpiredSeconds() {
+        return topicConfigCacheExpiredSeconds;
     }
 
-    public void setTopicConfigCacheExpiredInSeconds(int 
topicConfigCacheExpiredInSeconds) {
-        this.topicConfigCacheExpiredInSeconds = 
topicConfigCacheExpiredInSeconds;
+    public void setTopicConfigCacheExpiredSeconds(int 
topicConfigCacheExpiredSeconds) {
+        this.topicConfigCacheExpiredSeconds = topicConfigCacheExpiredSeconds;
     }
 
     public int getTopicConfigCacheMaxNum() {
@@ -842,12 +861,20 @@ public class ProxyConfig implements ConfigFile {
         this.topicConfigCacheMaxNum = topicConfigCacheMaxNum;
     }
 
-    public int getSubscriptionGroupConfigCacheExpiredInSeconds() {
-        return subscriptionGroupConfigCacheExpiredInSeconds;
+    public int getSubscriptionGroupConfigCacheRefreshSeconds() {
+        return subscriptionGroupConfigCacheRefreshSeconds;
+    }
+
+    public void setSubscriptionGroupConfigCacheRefreshSeconds(int 
subscriptionGroupConfigCacheRefreshSeconds) {
+        this.subscriptionGroupConfigCacheRefreshSeconds = 
subscriptionGroupConfigCacheRefreshSeconds;
+    }
+
+    public int getSubscriptionGroupConfigCacheExpiredSeconds() {
+        return subscriptionGroupConfigCacheExpiredSeconds;
     }
 
-    public void setSubscriptionGroupConfigCacheExpiredInSeconds(int 
subscriptionGroupConfigCacheExpiredInSeconds) {
-        this.subscriptionGroupConfigCacheExpiredInSeconds = 
subscriptionGroupConfigCacheExpiredInSeconds;
+    public void setSubscriptionGroupConfigCacheExpiredSeconds(int 
subscriptionGroupConfigCacheExpiredSeconds) {
+        this.subscriptionGroupConfigCacheExpiredSeconds = 
subscriptionGroupConfigCacheExpiredSeconds;
     }
 
     public int getSubscriptionGroupConfigCacheMaxNum() {
diff --git 
a/proxy/src/main/java/org/apache/rocketmq/proxy/service/metadata/ClusterMetadataService.java
 
b/proxy/src/main/java/org/apache/rocketmq/proxy/service/metadata/ClusterMetadataService.java
index bc9582ad81..d34a0efd9e 100644
--- 
a/proxy/src/main/java/org/apache/rocketmq/proxy/service/metadata/ClusterMetadataService.java
+++ 
b/proxy/src/main/java/org/apache/rocketmq/proxy/service/metadata/ClusterMetadataService.java
@@ -69,11 +69,13 @@ public class ClusterMetadataService extends 
AbstractStartAndShutdown implements
         );
         this.topicConfigCache = CacheBuilder.newBuilder()
             .maximumSize(config.getTopicConfigCacheMaxNum())
-            .refreshAfterWrite(config.getTopicConfigCacheExpiredInSeconds(), 
TimeUnit.SECONDS)
+            .expireAfterAccess(config.getTopicConfigCacheExpiredSeconds(), 
TimeUnit.SECONDS)
+            .refreshAfterWrite(config.getTopicConfigCacheRefreshSeconds(), 
TimeUnit.SECONDS)
             .build(new ClusterTopicConfigCacheLoader());
         this.subscriptionGroupConfigCache = CacheBuilder.newBuilder()
             .maximumSize(config.getSubscriptionGroupConfigCacheMaxNum())
-            
.refreshAfterWrite(config.getSubscriptionGroupConfigCacheExpiredInSeconds(), 
TimeUnit.SECONDS)
+            
.expireAfterAccess(config.getSubscriptionGroupConfigCacheExpiredSeconds(), 
TimeUnit.SECONDS)
+            
.refreshAfterWrite(config.getSubscriptionGroupConfigCacheRefreshSeconds(), 
TimeUnit.SECONDS)
             .build(new ClusterSubscriptionGroupConfigCacheLoader());
 
         this.init();
diff --git 
a/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/TopicRouteService.java
 
b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/TopicRouteService.java
index e012a5465a..84348adc32 100644
--- 
a/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/TopicRouteService.java
+++ 
b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/TopicRouteService.java
@@ -68,10 +68,13 @@ public abstract class TopicRouteService extends 
AbstractStartAndShutdown {
         );
         this.mqClientAPIFactory = mqClientAPIFactory;
 
-        this.topicCache = 
Caffeine.newBuilder().maximumSize(config.getTopicRouteServiceCacheMaxNum()).
-            
refreshAfterWrite(config.getTopicRouteServiceCacheExpiredInSeconds(), 
TimeUnit.SECONDS).
-            executor(cacheRefreshExecutor).build(new CacheLoader<String, 
MessageQueueView>() {
-                @Override public @Nullable MessageQueueView load(String topic) 
throws Exception {
+        this.topicCache = 
Caffeine.newBuilder().maximumSize(config.getTopicRouteServiceCacheMaxNum())
+            
.expireAfterAccess(config.getTopicRouteServiceCacheExpiredSeconds(), 
TimeUnit.SECONDS)
+            
.refreshAfterWrite(config.getTopicRouteServiceCacheRefreshSeconds(), 
TimeUnit.SECONDS)
+            .executor(cacheRefreshExecutor)
+            .build(new CacheLoader<String, MessageQueueView>() {
+                @Override
+                public @Nullable MessageQueueView load(String topic) throws 
Exception {
                     try {
                         TopicRouteData topicRouteData = 
mqClientAPIFactory.getClient().getTopicRouteInfoFromNameServer(topic, 
Duration.ofSeconds(3).toMillis());
                         return buildMessageQueueView(topic, topicRouteData);
@@ -83,7 +86,8 @@ public abstract class TopicRouteService extends 
AbstractStartAndShutdown {
                     }
                 }
 
-                @Override public @Nullable MessageQueueView reload(@NonNull 
String key,
+                @Override
+                public @Nullable MessageQueueView reload(@NonNull String key,
                     @NonNull MessageQueueView oldValue) throws Exception {
                     try {
                         return load(key);

Reply via email to