This is an automated email from the ASF dual-hosted git repository.
lizhanhui pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new e5a71bb95 using caffeine replace guava cache (#4669)
e5a71bb95 is described below
commit e5a71bb95f6b8c1dcb4e44d5948469629da3833b
Author: fuyou001 <[email protected]>
AuthorDate: Thu Aug 11 20:16:34 2022 +0800
using caffeine replace guava cache (#4669)
* [ISSUE #4668] using caffeine replace guava cache
* [ISSUE #4668] fix cache miss
* [ISSUE #4725] fix enforce-ban-circular-dependencies error
---
pom.xml | 6 ++++
proxy/pom.xml | 10 +++++++
.../proxy/service/route/TopicRouteService.java | 33 ++++++++++++++++++----
3 files changed, 43 insertions(+), 6 deletions(-)
diff --git a/pom.xml b/pom.xml
index cf5ceae72..73bc7ab69 100644
--- a/pom.xml
+++ b/pom.xml
@@ -129,6 +129,7 @@
<protobuf.version>3.20.1</protobuf.version>
<disruptor.version>1.2.10</disruptor.version>
<org.relection.version>0.9.11</org.relection.version>
+ <caffeine.version>2.9.3</caffeine.version>
<!-- Test dependencies -->
<junit.version>4.13.2</junit.version>
@@ -835,6 +836,11 @@
<artifactId>disruptor</artifactId>
<version>${disruptor.version}</version>
</dependency>
+ <dependency>
+ <groupId>com.github.ben-manes.caffeine</groupId>
+ <artifactId>caffeine</artifactId>
+ <version>${caffeine.version}</version>
+ </dependency>
</dependencies>
</dependencyManagement>
diff --git a/proxy/pom.xml b/proxy/pom.xml
index 4511bc7fa..a72f30c82 100644
--- a/proxy/pom.xml
+++ b/proxy/pom.xml
@@ -84,6 +84,16 @@
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
</dependency>
+ <dependency>
+ <groupId>com.github.ben-manes.caffeine</groupId>
+ <artifactId>caffeine</artifactId>
+ <exclusions>
+ <exclusion>
+ <groupId>org.checkerframework</groupId>
+ <artifactId>checker-qual</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
</dependencies>
</project>
\ No newline at end of file
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/TopicRouteService.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/TopicRouteService.java
index d8909a3f1..b1e4517fe 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/TopicRouteService.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/TopicRouteService.java
@@ -16,8 +16,9 @@
*/
package org.apache.rocketmq.proxy.service.route;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.LoadingCache;
+import com.github.benmanes.caffeine.cache.CacheLoader;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.LoadingCache;
import java.time.Duration;
import java.util.List;
import java.util.concurrent.Executors;
@@ -39,6 +40,7 @@ import org.apache.rocketmq.proxy.common.Address;
import org.apache.rocketmq.proxy.config.ConfigurationManager;
import org.apache.rocketmq.proxy.config.ProxyConfig;
import org.apache.rocketmq.proxy.service.mqclient.MQClientAPIFactory;
+import org.checkerframework.checker.nullness.qual.Nullable;
public abstract class TopicRouteService extends AbstractStartAndShutdown {
private static final InternalLogger log =
InternalLoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME);
@@ -48,6 +50,8 @@ public abstract class TopicRouteService extends AbstractStartAndShutdown {
protected final LoadingCache<String /* topicName */, MessageQueueView>
topicCache;
protected final ScheduledExecutorService scheduledExecutorService;
protected final ThreadPoolExecutor cacheRefreshExecutor;
+ private final TopicRouteCacheLoader topicRouteCacheLoader = new
TopicRouteCacheLoader();
+
public TopicRouteService(MQClientAPIFactory mqClientAPIFactory) {
ProxyConfig config = ConfigurationManager.getProxyConfig();
@@ -64,10 +68,27 @@ public abstract class TopicRouteService extends AbstractStartAndShutdown {
config.getTopicRouteServiceThreadPoolQueueCapacity()
);
this.mqClientAPIFactory = mqClientAPIFactory;
- this.topicCache = CacheBuilder.newBuilder()
- .maximumSize(config.getTopicRouteServiceCacheMaxNum())
-
.refreshAfterWrite(config.getTopicRouteServiceCacheExpiredInSeconds(),
TimeUnit.SECONDS)
- .build(new TopicRouteCacheLoader());
+
+ this.topicCache =
Caffeine.newBuilder().maximumSize(config.getTopicRouteServiceCacheMaxNum()).
+
refreshAfterWrite(config.getTopicRouteServiceCacheExpiredInSeconds(),
TimeUnit.SECONDS).
+ executor(cacheRefreshExecutor).build(new CacheLoader<String,
MessageQueueView>() {
+ @Override public @Nullable MessageQueueView load(String topic)
throws Exception {
+ try {
+ TopicRouteData topicRouteData =
topicRouteCacheLoader.loadTopicRouteData(topic);
+ if (isTopicRouteValid(topicRouteData)) {
+ MessageQueueView tmp = new MessageQueueView(topic,
topicRouteData);
+ log.info("load topic route from namesrv. topic:
{}, queue: {}", topic, tmp);
+ return tmp;
+ }
+ return MessageQueueView.WRAPPED_EMPTY_QUEUE;
+ } catch (Exception e) {
+ if (TopicRouteHelper.isTopicNotExistError(e)) {
+ return MessageQueueView.WRAPPED_EMPTY_QUEUE;
+ }
+ throw e;
+ }
+ }
+ });
this.init();
}