This is an automated email from the ASF dual-hosted git repository.
lizhanhui pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new f7878d1b7 [ISSUE #5305] fix proxy TopicRouteService cache bug (#5306)
f7878d1b7 is described below
commit f7878d1b7cdf7ffe10339078a7dc40c1beb67642
Author: fuyou001 <[email protected]>
AuthorDate: Fri Oct 14 11:27:08 2022 +0800
[ISSUE #5305] fix proxy TopicRouteService cache bug (#5306)
* [ISSUE #5305] fix proxy TopicRouteService cache bug
* [ISSUE #5305] add log
* [ISSUE #5305] add cache unit test
---
.../proxy/service/route/TopicRouteService.java | 11 +++++
.../route/ClusterTopicRouteServiceTest.java | 49 ++++++++++++++++++++++
2 files changed, 60 insertions(+)
diff --git
a/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/TopicRouteService.java
b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/TopicRouteService.java
index b1e4517fe..85e5e39bd 100644
---
a/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/TopicRouteService.java
+++
b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/TopicRouteService.java
@@ -40,6 +40,7 @@ import org.apache.rocketmq.proxy.common.Address;
import org.apache.rocketmq.proxy.config.ConfigurationManager;
import org.apache.rocketmq.proxy.config.ProxyConfig;
import org.apache.rocketmq.proxy.service.mqclient.MQClientAPIFactory;
+import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.Nullable;
public abstract class TopicRouteService extends AbstractStartAndShutdown {
@@ -88,6 +89,16 @@ public abstract class TopicRouteService extends
AbstractStartAndShutdown {
throw e;
}
}
+
+ @Override public @Nullable MessageQueueView reload(@NonNull
String key,
+ @NonNull MessageQueueView oldValue) throws Exception {
+ try {
+ return load(key);
+ } catch (Exception e) {
+ log.warn(String.format("reload topic route from
namesrv. topic: %s", key), e);
+ return oldValue;
+ }
+ }
});
this.init();
diff --git
a/proxy/src/test/java/org/apache/rocketmq/proxy/service/route/ClusterTopicRouteServiceTest.java
b/proxy/src/test/java/org/apache/rocketmq/proxy/service/route/ClusterTopicRouteServiceTest.java
index 2a5d3189e..b0baa3e9b 100644
---
a/proxy/src/test/java/org/apache/rocketmq/proxy/service/route/ClusterTopicRouteServiceTest.java
+++
b/proxy/src/test/java/org/apache/rocketmq/proxy/service/route/ClusterTopicRouteServiceTest.java
@@ -17,14 +17,24 @@
package org.apache.rocketmq.proxy.service.route;
+import com.github.benmanes.caffeine.cache.CacheLoader;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.LoadingCache;
import com.google.common.net.HostAndPort;
import java.util.List;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.protocol.ResponseCode;
+import org.apache.rocketmq.common.thread.ThreadPoolMonitor;
import org.apache.rocketmq.proxy.common.Address;
import org.apache.rocketmq.proxy.service.BaseServiceTest;
+import static org.assertj.core.api.Assertions.assertThat;
import org.assertj.core.util.Lists;
+import org.checkerframework.checker.nullness.qual.NonNull;
+import org.checkerframework.checker.nullness.qual.Nullable;
import org.junit.Before;
import org.junit.Test;
@@ -67,4 +77,43 @@ public class ClusterTopicRouteServiceTest extends
BaseServiceTest {
assertEquals(1, proxyTopicRouteData.getBrokerDatas().size());
assertEquals(addressList,
proxyTopicRouteData.getBrokerDatas().get(0).getBrokerAddrs().get(MixAll.MASTER_ID));
}
+
+ @Test
+ public void testTopicRouteCaffeineCache() throws InterruptedException {
+ String key = "abc";
+ String value = key;
+ final AtomicBoolean throwException = new AtomicBoolean();
+ ThreadPoolExecutor cacheRefreshExecutor =
ThreadPoolMonitor.createAndMonitor(
+ 10, 10, 30L, TimeUnit.SECONDS, "test", 10);
+ LoadingCache<String /* topicName */, String> topicCache =
Caffeine.newBuilder().maximumSize(30).
+ refreshAfterWrite(2,
TimeUnit.SECONDS).executor(cacheRefreshExecutor).build(new CacheLoader<String,
String>() {
+ @Override public @Nullable String load(@NonNull String key)
throws Exception {
+ try {
+ if (throwException.get()) {
+ throw new RuntimeException();
+ } else {
+ throwException.set(true);
+ return value;
+ }
+ } catch (Exception e) {
+ if (TopicRouteHelper.isTopicNotExistError(e)) {
+ return "";
+ }
+ throw e;
+ }
+ }
+
+ @Override
+ public @Nullable String reload(@NonNull String key, @NonNull
String oldValue) throws Exception {
+ try {
+ return load(key);
+ } catch (Exception e) {
+ return oldValue;
+ }
+ }
+ });
+ assertThat(value).isEqualTo(topicCache.get(key));
+ TimeUnit.SECONDS.sleep(5);
+ assertThat(value).isEqualTo(topicCache.get(key));
+ }
}
\ No newline at end of file