This is an automated email from the ASF dual-hosted git repository.

lizhanhui pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new e5a71bb95 using caffeine replace guava cache (#4669)
e5a71bb95 is described below

commit e5a71bb95f6b8c1dcb4e44d5948469629da3833b
Author: fuyou001 <[email protected]>
AuthorDate: Thu Aug 11 20:16:34 2022 +0800

    using caffeine replace guava cache (#4669)
    
    * [ISSUE #4668] using caffeine replace guava cache
    
    * [ISSUE #4668] fix cache miss
    
    * [ISSUE #4725] fix enforce-ban-circular-dependencies  error
---
 pom.xml                                            |  6 ++++
 proxy/pom.xml                                      | 10 +++++++
 .../proxy/service/route/TopicRouteService.java     | 33 ++++++++++++++++++----
 3 files changed, 43 insertions(+), 6 deletions(-)

diff --git a/pom.xml b/pom.xml
index cf5ceae72..73bc7ab69 100644
--- a/pom.xml
+++ b/pom.xml
@@ -129,6 +129,7 @@
         <protobuf.version>3.20.1</protobuf.version>
         <disruptor.version>1.2.10</disruptor.version>
         <org.relection.version>0.9.11</org.relection.version>
+        <caffeine.version>2.9.3</caffeine.version>
 
         <!-- Test dependencies -->
         <junit.version>4.13.2</junit.version>
@@ -835,6 +836,11 @@
                 <artifactId>disruptor</artifactId>
                 <version>${disruptor.version}</version>
             </dependency>
+            <dependency>
+                <groupId>com.github.ben-manes.caffeine</groupId>
+                <artifactId>caffeine</artifactId>
+                <version>${caffeine.version}</version>
+            </dependency>
         </dependencies>
     </dependencyManagement>
 
diff --git a/proxy/pom.xml b/proxy/pom.xml
index 4511bc7fa..a72f30c82 100644
--- a/proxy/pom.xml
+++ b/proxy/pom.xml
@@ -84,6 +84,16 @@
             <groupId>ch.qos.logback</groupId>
             <artifactId>logback-classic</artifactId>
         </dependency>
+        <dependency>
+            <groupId>com.github.ben-manes.caffeine</groupId>
+            <artifactId>caffeine</artifactId>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.checkerframework</groupId>
+                    <artifactId>checker-qual</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
     </dependencies>
 
 </project>
\ No newline at end of file
diff --git 
a/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/TopicRouteService.java
 
b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/TopicRouteService.java
index d8909a3f1..b1e4517fe 100644
--- 
a/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/TopicRouteService.java
+++ 
b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/TopicRouteService.java
@@ -16,8 +16,9 @@
  */
 package org.apache.rocketmq.proxy.service.route;
 
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.LoadingCache;
+import com.github.benmanes.caffeine.cache.CacheLoader;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.LoadingCache;
 import java.time.Duration;
 import java.util.List;
 import java.util.concurrent.Executors;
@@ -39,6 +40,7 @@ import org.apache.rocketmq.proxy.common.Address;
 import org.apache.rocketmq.proxy.config.ConfigurationManager;
 import org.apache.rocketmq.proxy.config.ProxyConfig;
 import org.apache.rocketmq.proxy.service.mqclient.MQClientAPIFactory;
+import org.checkerframework.checker.nullness.qual.Nullable;
 
 public abstract class TopicRouteService extends AbstractStartAndShutdown {
     private static final InternalLogger log = 
InternalLoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME);
@@ -48,6 +50,8 @@ public abstract class TopicRouteService extends 
AbstractStartAndShutdown {
     protected final LoadingCache<String /* topicName */, MessageQueueView> 
topicCache;
     protected final ScheduledExecutorService scheduledExecutorService;
     protected final ThreadPoolExecutor cacheRefreshExecutor;
+    private final TopicRouteCacheLoader topicRouteCacheLoader = new 
TopicRouteCacheLoader();
+
 
     public TopicRouteService(MQClientAPIFactory mqClientAPIFactory) {
         ProxyConfig config = ConfigurationManager.getProxyConfig();
@@ -64,10 +68,27 @@ public abstract class TopicRouteService extends 
AbstractStartAndShutdown {
             config.getTopicRouteServiceThreadPoolQueueCapacity()
         );
         this.mqClientAPIFactory = mqClientAPIFactory;
-        this.topicCache = CacheBuilder.newBuilder()
-            .maximumSize(config.getTopicRouteServiceCacheMaxNum())
-            
.refreshAfterWrite(config.getTopicRouteServiceCacheExpiredInSeconds(), 
TimeUnit.SECONDS)
-            .build(new TopicRouteCacheLoader());
+
+        this.topicCache = 
Caffeine.newBuilder().maximumSize(config.getTopicRouteServiceCacheMaxNum()).
+            
refreshAfterWrite(config.getTopicRouteServiceCacheExpiredInSeconds(), 
TimeUnit.SECONDS).
+            executor(cacheRefreshExecutor).build(new CacheLoader<String, 
MessageQueueView>() {
+                @Override public @Nullable MessageQueueView load(String topic) 
throws Exception {
+                    try {
+                        TopicRouteData topicRouteData = 
topicRouteCacheLoader.loadTopicRouteData(topic);
+                        if (isTopicRouteValid(topicRouteData)) {
+                            MessageQueueView tmp = new MessageQueueView(topic, 
topicRouteData);
+                            log.info("load topic route from namesrv. topic: 
{}, queue: {}", topic, tmp);
+                            return tmp;
+                        }
+                        return MessageQueueView.WRAPPED_EMPTY_QUEUE;
+                    } catch (Exception e) {
+                        if (TopicRouteHelper.isTopicNotExistError(e)) {
+                            return MessageQueueView.WRAPPED_EMPTY_QUEUE;
+                        }
+                        throw e;
+                    }
+                }
+            });
 
         this.init();
     }

Reply via email to