This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch refactor
in repository https://gitbox.apache.org/repos/asf/rocketmq-dashboard.git
The following commit(s) were added to refs/heads/refactor by this push:
new de152dd pref: optimize the response speed of the query api (#273)
de152dd is described below
commit de152dd6f3ae81136154a0fb1547cb83ce8be4f9
Author: Xu Yichi <[email protected]>
AuthorDate: Thu Mar 27 12:22:50 2025 +0800
pref: optimize the response speed of the query api (#273)
---
.../dashboard/service/impl/MessageServiceImpl.java | 19 ++-
.../dashboard/service/impl/TopicServiceImpl.java | 40 +++++--
.../support/AutoCloseConsumerWrapper.java | 132 +++++++++++++++++++++
.../util/AutoCloseConsumerWrapperTests.java | 84 +++++++++++++
4 files changed, 256 insertions(+), 19 deletions(-)
diff --git a/src/main/java/org/apache/rocketmq/dashboard/service/impl/MessageServiceImpl.java b/src/main/java/org/apache/rocketmq/dashboard/service/impl/MessageServiceImpl.java
index 16d0d4e..0586447 100644
--- a/src/main/java/org/apache/rocketmq/dashboard/service/impl/MessageServiceImpl.java
+++ b/src/main/java/org/apache/rocketmq/dashboard/service/impl/MessageServiceImpl.java
@@ -37,6 +37,7 @@ import org.apache.rocketmq.common.Pair;
import org.apache.rocketmq.common.message.MessageClientIDSetter;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.dashboard.support.AutoCloseConsumerWrapper;
import org.apache.rocketmq.remoting.protocol.body.Connection;
import org.apache.rocketmq.remoting.protocol.body.ConsumeMessageDirectlyResult;
import org.apache.rocketmq.remoting.protocol.body.ConsumerConnection;
@@ -127,11 +128,11 @@ public class MessageServiceImpl implements MessageService {
if (isEnableAcl) {
rpcHook = new AclClientRPCHook(new
SessionCredentials(configure.getAccessKey(), configure.getSecretKey()));
}
- DefaultMQPullConsumer consumer = buildDefaultMQPullConsumer(rpcHook,
configure.isUseTLS());
+ AutoCloseConsumerWrapper consumerWrapper = new
AutoCloseConsumerWrapper();
+ DefaultMQPullConsumer consumer = consumerWrapper.getConsumer(rpcHook,
configure.isUseTLS());
List<MessageView> messageViewList = Lists.newArrayList();
try {
String subExpression = "*";
- consumer.start();
Set<MessageQueue> mqs =
consumer.fetchSubscribeMessageQueues(topic);
for (MessageQueue mq : mqs) {
long minOffset = consumer.searchOffset(mq, begin);
@@ -188,8 +189,6 @@ public class MessageServiceImpl implements MessageService {
} catch (Exception e) {
Throwables.throwIfUnchecked(e);
throw new RuntimeException(e);
- } finally {
- consumer.shutdown();
}
}
@@ -263,7 +262,8 @@ public class MessageServiceImpl implements MessageService {
if (isEnableAcl) {
rpcHook = new AclClientRPCHook(new
SessionCredentials(configure.getAccessKey(), configure.getSecretKey()));
}
- DefaultMQPullConsumer consumer = buildDefaultMQPullConsumer(rpcHook,
configure.isUseTLS());
+ AutoCloseConsumerWrapper consumerWrapper = new
AutoCloseConsumerWrapper();
+ DefaultMQPullConsumer consumer = consumerWrapper.getConsumer(rpcHook,
configure.isUseTLS());
long total = 0;
List<QueueOffsetInfo> queueOffsetInfos = new ArrayList<>();
@@ -271,7 +271,6 @@ public class MessageServiceImpl implements MessageService {
List<MessageView> messageViews = new ArrayList<>();
try {
- consumer.start();
Collection<MessageQueue> messageQueues =
consumer.fetchSubscribeMessageQueues(query.getTopic());
int idx = 0;
for (MessageQueue messageQueue : messageQueues) {
@@ -394,8 +393,6 @@ public class MessageServiceImpl implements MessageService {
} catch (Exception e) {
Throwables.throwIfUnchecked(e);
throw new RuntimeException(e);
- } finally {
- consumer.shutdown();
}
}
@@ -405,14 +402,14 @@ public class MessageServiceImpl implements MessageService {
if (isEnableAcl) {
rpcHook = new AclClientRPCHook(new
SessionCredentials(configure.getAccessKey(), configure.getSecretKey()));
}
- DefaultMQPullConsumer consumer = buildDefaultMQPullConsumer(rpcHook,
configure.isUseTLS());
+ AutoCloseConsumerWrapper consumerWrapper = new
AutoCloseConsumerWrapper();
+ DefaultMQPullConsumer consumer = consumerWrapper.getConsumer(rpcHook,
configure.isUseTLS());
List<MessageView> messageViews = new ArrayList<>();
long offset = query.getPageNum() * query.getPageSize();
long total = 0;
try {
- consumer.start();
for (QueueOffsetInfo queueOffsetInfo : queueOffsetInfos) {
long start = queueOffsetInfo.getStart();
long end = queueOffsetInfo.getEnd();
@@ -462,8 +459,6 @@ public class MessageServiceImpl implements MessageService {
} catch (Exception e) {
Throwables.throwIfUnchecked(e);
throw new RuntimeException(e);
- } finally {
- consumer.shutdown();
}
}
diff --git a/src/main/java/org/apache/rocketmq/dashboard/service/impl/TopicServiceImpl.java b/src/main/java/org/apache/rocketmq/dashboard/service/impl/TopicServiceImpl.java
index 4f34fc6..3f4bf9a 100644
--- a/src/main/java/org/apache/rocketmq/dashboard/service/impl/TopicServiceImpl.java
+++ b/src/main/java/org/apache/rocketmq/dashboard/service/impl/TopicServiceImpl.java
@@ -73,6 +73,10 @@ import static org.apache.rocketmq.common.TopicAttributes.TOPIC_MESSAGE_TYPE_ATTR
@Service
public class TopicServiceImpl extends AbstractCommonService implements
TopicService {
+ private transient DefaultMQProducer systemTopicProducer;
+
+ private final Object producerLock = new Object();
+
@Autowired
private RMQConfigure configure;
@@ -297,18 +301,40 @@ public class TopicServiceImpl extends AbstractCommonService implements TopicServ
if (isEnableAcl) {
rpcHook = new AclClientRPCHook(new
SessionCredentials(configure.getAccessKey(), configure.getSecretKey()));
}
- DefaultMQProducer producer =
buildDefaultMQProducer(MixAll.SELF_TEST_PRODUCER_GROUP, rpcHook);
- producer.setInstanceName(String.valueOf(System.currentTimeMillis()));
- producer.setNamesrvAddr(configure.getNamesrvAddr());
+
+ // ensures thread safety
+ if (systemTopicProducer == null) {
+ synchronized (producerLock) {
+ if (systemTopicProducer == null) {
+ systemTopicProducer =
buildDefaultMQProducer(MixAll.SELF_TEST_PRODUCER_GROUP, rpcHook);
+ systemTopicProducer.setInstanceName("SystemTopicProducer-" + System.currentTimeMillis());
+ systemTopicProducer.setNamesrvAddr(configure.getNamesrvAddr());
+ try {
+ systemTopicProducer.start();
+ } catch (Exception e) {
+ systemTopicProducer = null;
+ Throwables.throwIfUnchecked(e);
+ throw new RuntimeException(e);
+ }
+ }
+ }
+ }
try {
- producer.start();
- return
producer.getDefaultMQProducerImpl().getmQClientFactory().getMQClientAPIImpl().getSystemTopicList(20000L);
+ return systemTopicProducer.getDefaultMQProducerImpl()
+ .getmQClientFactory()
+ .getMQClientAPIImpl()
+ .getSystemTopicList(20000L);
} catch (Exception e) {
+ // If the call fails, close and clean up the producer, and it will be re-created next time.
+ synchronized (producerLock) {
+ if (systemTopicProducer != null) {
+ systemTopicProducer.shutdown();
+ systemTopicProducer = null;
+ }
+ }
Throwables.throwIfUnchecked(e);
throw new RuntimeException(e);
- } finally {
- producer.shutdown();
}
}
diff --git a/src/main/java/org/apache/rocketmq/dashboard/support/AutoCloseConsumerWrapper.java b/src/main/java/org/apache/rocketmq/dashboard/support/AutoCloseConsumerWrapper.java
new file mode 100644
index 0000000..6f2b52e
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/dashboard/support/AutoCloseConsumerWrapper.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.dashboard.support;
+
+import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.remoting.RPCHook;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+public class AutoCloseConsumerWrapper {
+
+ private final Logger logger =
LoggerFactory.getLogger(GlobalRestfulResponseBodyAdvice.class);
+
+ private static final AtomicReference<DefaultMQPullConsumer> CONSUMER_REF =
new AtomicReference<>();
+ private final AtomicBoolean isTaskScheduled = new AtomicBoolean(false);
+ private final AtomicBoolean isClosing = new AtomicBoolean(false);
+ private static volatile Instant lastUsedTime = Instant.now();
+
+
+ private static final ScheduledExecutorService SCHEDULER =
+ Executors.newSingleThreadScheduledExecutor();
+
+ public AutoCloseConsumerWrapper() {
+ startIdleCheckTask();
+ }
+
+
+ public DefaultMQPullConsumer getConsumer(RPCHook rpcHook,Boolean useTLS) {
+ lastUsedTime = Instant.now();
+
+ DefaultMQPullConsumer consumer = CONSUMER_REF.get();
+ if (consumer == null) {
+ synchronized (this) {
+ consumer = CONSUMER_REF.get();
+ if (consumer == null) {
+ consumer = createNewConsumer(rpcHook,useTLS);
+ CONSUMER_REF.set(consumer);
+ }
+ try {
+ consumer.start();
+ } catch (MQClientException e) {
+ consumer.shutdown();
+ CONSUMER_REF.set(null);
+ throw new RuntimeException("Failed to start consumer", e);
+
+ }
+ }
+ }
+ return consumer;
+ }
+
+
+ protected DefaultMQPullConsumer createNewConsumer(RPCHook rpcHook, Boolean
useTLS) {
+ return new DefaultMQPullConsumer(MixAll.TOOLS_CONSUMER_GROUP, rpcHook)
{{
+ setUseTLS(useTLS);
+ }};
+ }
+
+ private void startIdleCheckTask() {
+ if (!isTaskScheduled.get()) {
+ synchronized (this) {
+ if (!isTaskScheduled.get()) {
+ SCHEDULER.scheduleWithFixedDelay(() -> {
+ try {
+ checkAndCloseIdleConsumer();
+ } catch (Exception e) {
+ logger.error("Idle check failed", e);
+ }
+ }, 1, 1, TimeUnit.MINUTES);
+
+ isTaskScheduled.set(true);
+ }
+ }
+ }
+ }
+
+ public void checkAndCloseIdleConsumer() {
+ if (shouldClose()) {
+ synchronized (this) {
+ if (shouldClose()) {
+ close();
+ }
+ }
+ }
+ }
+
+ private boolean shouldClose() {
+ long idleTimeoutMs = 60_000;
+ return CONSUMER_REF.get() != null &&
+ Duration.between(lastUsedTime, Instant.now()).toMillis() >
idleTimeoutMs;
+ }
+
+
+ public void close() {
+ if (isClosing.compareAndSet(false, true)) {
+ try {
+ DefaultMQPullConsumer consumer = CONSUMER_REF.getAndSet(null);
+ if (consumer != null) {
+ consumer.shutdown();
+ }
+ isTaskScheduled.set(false);
+ } finally {
+ isClosing.set(false);
+ }
+ }
+ }
+
+}
diff --git a/src/test/java/org/apache/rocketmq/dashboard/util/AutoCloseConsumerWrapperTests.java b/src/test/java/org/apache/rocketmq/dashboard/util/AutoCloseConsumerWrapperTests.java
new file mode 100644
index 0000000..ddd1533
--- /dev/null
+++ b/src/test/java/org/apache/rocketmq/dashboard/util/AutoCloseConsumerWrapperTests.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.dashboard.util;
+
+import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
+import org.apache.rocketmq.dashboard.support.AutoCloseConsumerWrapper;
+import org.apache.rocketmq.remoting.RPCHook;
+import java.lang.reflect.Field;
+import static org.mockito.Mockito.mock;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.junit.jupiter.MockitoExtension;
+import java.time.Instant;
+import static org.junit.jupiter.api.Assertions.*;
+import static org.mockito.Mockito.*;
+
+@ExtendWith(MockitoExtension.class)
+class AutoCloseConsumerWrapperTests {
+
+ private static class TestableWrapper extends AutoCloseConsumerWrapper {
+ private DefaultMQPullConsumer mockConsumer =
mock(DefaultMQPullConsumer.class);
+
+ @Override
+ protected DefaultMQPullConsumer createNewConsumer(RPCHook rpcHook,
Boolean useTLS) {
+ return mockConsumer;
+ }
+ }
+
+ @Test
+ void shouldReuseConsumerInstance() throws Exception {
+ TestableWrapper wrapper = new TestableWrapper();
+
+ DefaultMQPullConsumer first = wrapper.getConsumer(mock(RPCHook.class),
true);
+ assertNotNull(first);
+
+ DefaultMQPullConsumer second =
wrapper.getConsumer(mock(RPCHook.class), true);
+ assertSame(first, second);
+ }
+
+ @Test
+ void shouldHandleStartFailure() throws Exception {
+ TestableWrapper wrapper = new TestableWrapper();
+ doThrow(new MQClientException("Simulated error", null))
+ .when(wrapper.mockConsumer).start();
+
+ assertThrows(RuntimeException.class, () ->
+ wrapper.getConsumer(mock(RPCHook.class), true));
+
+ verify(wrapper.mockConsumer).shutdown();
+ }
+
+
+
+ @Test
+ void shouldCloseIdleConsumer() throws Exception {
+ TestableWrapper wrapper = new TestableWrapper();
+
+ wrapper.getConsumer(mock(RPCHook.class), true);
+
+ Field lastUsedTime =
AutoCloseConsumerWrapper.class.getDeclaredField("lastUsedTime");
+ lastUsedTime.setAccessible(true);
+ lastUsedTime.set(wrapper, Instant.now().minusSeconds(70));
+
+ wrapper.checkAndCloseIdleConsumer();
+
+ verify(wrapper.mockConsumer).shutdown();
+ }
+}