This is an automated email from the ASF dual-hosted git repository.
jbonofre pushed a commit to branch activemq-5.18.x
in repository https://gitbox.apache.org/repos/asf/activemq.git
The following commit(s) were added to refs/heads/activemq-5.18.x by this push:
new 3b0a646af AMQ-9452: unwrap BaseDestination to access queue/topic message
3b0a646af is described below
commit 3b0a646af24cef2335f8a7c9a48332dc5bcf6bb5
Author: Grzegorz Kochanski <[email protected]>
AuthorDate: Thu Mar 14 09:32:40 2024 +0100
AMQ-9452: unwrap BaseDestination to access queue/topic message
Change-Id: Ic05002ecb428e2aa5abeb9dc3e499e3aae550051
(cherry picked from commit 4d40023968fc334982f15e9c9623b1d74d308931)
---
.../org/apache/activemq/plugin/StatisticsBroker.java | 5 +++++
.../activemq/plugin/BrokerStatisticsPluginTest.java | 19 +++++++++++++++++--
2 files changed, 22 insertions(+), 2 deletions(-)
diff --git a/activemq-broker/src/main/java/org/apache/activemq/plugin/StatisticsBroker.java b/activemq-broker/src/main/java/org/apache/activemq/plugin/StatisticsBroker.java
index 92bb14cbd..15230ad64 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/plugin/StatisticsBroker.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/plugin/StatisticsBroker.java
@@ -34,6 +34,7 @@ import org.apache.activemq.broker.ProducerBrokerExchange;
import org.apache.activemq.broker.jmx.BrokerViewMBean;
import org.apache.activemq.broker.jmx.SubscriptionViewMBean;
import org.apache.activemq.broker.region.Destination;
+import org.apache.activemq.broker.region.DestinationFilter;
import org.apache.activemq.broker.region.DestinationStatistics;
import org.apache.activemq.broker.region.Queue;
import org.apache.activemq.broker.region.RegionBroker;
@@ -141,6 +142,10 @@ public class StatisticsBroker extends BrokerFilter {
statsMessage.setLong("consumerCount",
stats.getConsumers().getCount());
statsMessage.setLong("producerCount",
stats.getProducers().getCount());
if (includeFirstMessageTimestamp) {
+ //AMQ-9452: unwrap BaseDestination
+ while (dest instanceof DestinationFilter) {
+ dest = ((DestinationFilter) dest).getNext();
+ }
if (dest instanceof Queue) {
((Queue) dest).doBrowse(tempFirstMessage, 1);
}
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/plugin/BrokerStatisticsPluginTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/plugin/BrokerStatisticsPluginTest.java
index 9a8a5d263..40f59007a 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/plugin/BrokerStatisticsPluginTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/plugin/BrokerStatisticsPluginTest.java
@@ -17,6 +17,8 @@
package org.apache.activemq.plugin;
import java.net.URI;
+import java.util.Set;
+import java.util.stream.Collectors;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.MapMessage;
@@ -26,12 +28,17 @@ import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
+
import junit.framework.TestCase;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerFactory;
import org.apache.activemq.broker.BrokerPlugin;
+import org.apache.activemq.broker.BrokerPluginSupport;
import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.Destination;
+import org.apache.activemq.broker.region.DestinationFilter;
+import org.apache.activemq.command.ActiveMQDestination;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -210,6 +217,7 @@ public class BrokerStatisticsPluginTest extends TestCase{
assertTrue(reply.getMapNames().hasMoreElements());
assertEquals(1, reply.getLong("size"));
assertTrue(reply.getJMSTimestamp() > 0);
+ assertTrue(reply.getLong("firstMessageTimestamp") > 0);
// Assert that we got the brokerInTime for the first message in queue as value of key "firstMessageTimestamp"
assertTrue(System.currentTimeMillis() >=
reply.getLong("firstMessageTimestamp"));
assertEquals(Message.DEFAULT_PRIORITY, reply.getJMSPriority());
@@ -269,8 +277,15 @@ public class BrokerStatisticsPluginTest extends TestCase{
protected BrokerService createBroker() throws Exception {
BrokerService answer = new BrokerService();
- BrokerPlugin[] plugins = new BrokerPlugin[1];
- plugins[0] = new StatisticsBrokerPlugin();
+ BrokerPlugin[] plugins = new BrokerPlugin[2];
+ //AMQ-9452: proxy destinations with DestinationFilter
+ plugins[0] = new BrokerPluginSupport() {
+ @Override
+ public Set<Destination> getDestinations(ActiveMQDestination
destination) {
+ return
super.getDestinations(destination).stream().map(DestinationFilter::new).collect(Collectors.toSet());
+ }
+ };
+ plugins[1] = new StatisticsBrokerPlugin();
answer.setPlugins(plugins);
answer.setDeleteAllMessagesOnStartup(true);
answer.addConnector("tcp://localhost:0");