This is an automated email from the ASF dual-hosted git repository.

jbonofre pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq.git


The following commit(s) were added to refs/heads/main by this push:
     new 9167a79  StatisticsBrokerPlugin: Add feat: request destination 
firstMessageTimestamp
     new bd7f391  Merge pull request #749 from stolsvik/main
9167a79 is described below

commit 9167a79b79e4c121cfe0a5b82456f52bf3ecc3c7
Author: Endre Stølsvik <[email protected]>
AuthorDate: Mon Jan 24 23:46:36 2022 +0100

    StatisticsBrokerPlugin: Add feat: request destination firstMessageTimestamp
    
    Adding a feature (STATS_FIRST_MESSAGE_TIMESTAMP) to the
    StatisticsBrokerPlugin's destination statistics for getting the
    timestamp of the first message in the destination(s) being queried: If
    you set the property StatisticsBroker.STATS_FIRST_MESSAGE_TIMESTAMP on
    the query message to anything (e.g. boolean true), a long value
    "firstMessageTimestamp" will be added to the statistics reply
    message(s) whenever the destination holds at least one message. Since
    the reply message has JMSTimestamp set, which is the broker's
    now-timestamp, you may also calculate the age of the first message in
    milliseconds on the query side. The key name was chosen since that is
    the name of the corresponding feature in Artemis.
    
    This extension of the existing feature is implemented to be as
    non-intrusive as possible, adding very little runtime cost if not
    requested. It also seems like the runtime cost for enabling this
    feature, thus finding and adding the firstMessageTimestamp, is small.
    
    While at it, also slightly improving an existing feature
    (STATS_DENOTE_END_LIST) where a reply to a destination query can be
    "null terminated": After sending the relevant replies, the
    StatisticsBroker also sends an empty message. This feature is relevant
    if the query is a wildcard query, thus returning multiple messages: The
    empty message denotes the end of the replies. However, to activate this
    feature, a somewhat complicated query destination had to be constructed.
    Adopting the solution for the other StatisticsBroker feature where you
    may reset the broker statistics by adding a property to the query
    message, this null-termination feature now /also/ checks for the
    presence of this query modifier STATS_DENOTE_END_LIST as a property.
    (This property based solution was thus also adopted for the present
    'firstMessageTimestamp' solution, as it was found much more intuitive).
    
    Added tests for both the STATS_FIRST_MESSAGE_TIMESTAMP query modifier,
    and the improved STATS_DENOTE_END_LIST property-based query modifier.
    
    Had to make the Topic.doBrowse(List browseList, int max) public - the
    corresponding method for Queue was already public.
    
    Made the evaluation of whether this is a StatisticsBroker-relevant
    message a microscopic bit more performant (exiting faster if not
    relevant): To the initial test of whether the message is relevant, which
    only checked for replyTo being set, a check for
    'destination.startsWith("ActiveMQ.Statistics")' was added. Only if so,
    the rest of the evaluations kick in. Also using 'string.startsWith(..)'
    instead of the verbose 'string.regionMatches(..)'.
    
    Removed an unused import in PartitionBrokerTest.java, as IntelliJ
    complained about not finding it.
---
 .../org/apache/activemq/broker/region/Topic.java   |  2 +-
 .../apache/activemq/plugin/StatisticsBroker.java   | 46 +++++++++++---
 .../activemq/partition/PartitionBrokerTest.java    |  1 -
 .../plugin/BrokerStatisticsPluginTest.java         | 71 +++++++++++++++++++++-
 4 files changed, 107 insertions(+), 13 deletions(-)

diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java
index c1078e3..834cd14 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java
@@ -654,7 +654,7 @@ public class Topic extends BaseDestination implements Task {
         return result.toArray(new Message[result.size()]);
     }
 
-    private void doBrowse(final List<Message> browseList, final int max) {
+    public void doBrowse(final List<Message> browseList, final int max) {
         try {
             if (topicStore != null) {
                 final List<Message> toExpire = new ArrayList<Message>();
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/plugin/StatisticsBroker.java
 
b/activemq-broker/src/main/java/org/apache/activemq/plugin/StatisticsBroker.java
index 7476c3e..92bb14c 100644
--- 
a/activemq-broker/src/main/java/org/apache/activemq/plugin/StatisticsBroker.java
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/plugin/StatisticsBroker.java
@@ -18,6 +18,8 @@ package org.apache.activemq.plugin;
 
 import java.io.File;
 import java.net.URI;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Set;
 
 import javax.jms.JMSException;
@@ -33,7 +35,9 @@ import org.apache.activemq.broker.jmx.BrokerViewMBean;
 import org.apache.activemq.broker.jmx.SubscriptionViewMBean;
 import org.apache.activemq.broker.region.Destination;
 import org.apache.activemq.broker.region.DestinationStatistics;
+import org.apache.activemq.broker.region.Queue;
 import org.apache.activemq.broker.region.RegionBroker;
+import org.apache.activemq.broker.region.Topic;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQMapMessage;
 import org.apache.activemq.command.Message;
@@ -54,11 +58,17 @@ import org.slf4j.LoggerFactory;
  */
 public class StatisticsBroker extends BrokerFilter {
     private static Logger LOG = 
LoggerFactory.getLogger(StatisticsBroker.class);
+    static final String STATS_PREFIX = "ActiveMQ.Statistics";
+
     static final String STATS_DESTINATION_PREFIX = 
"ActiveMQ.Statistics.Destination";
     static final String STATS_BROKER_PREFIX = "ActiveMQ.Statistics.Broker";
     static final String STATS_BROKER_RESET_HEADER = 
"ActiveMQ.Statistics.Broker.Reset";
     static final String STATS_SUBSCRIPTION_PREFIX = 
"ActiveMQ.Statistics.Subscription";
-    static final String STATS_DENOTE_END_LIST = STATS_DESTINATION_PREFIX + 
".List.End.With.Null";
+
+    // Query-message properties controlling features of Destination-query 
replies:
+    static final String STATS_DENOTE_END_LIST = 
"ActiveMQ.Statistics.Destination.List.End.With.Null";
+    static final String STATS_FIRST_MESSAGE_TIMESTAMP = 
"ActiveMQ.Statistics.Destination.Include.First.Message.Timestamp";
+
     private static final IdGenerator ID_GENERATOR = new IdGenerator();
     private final LongSequenceGenerator messageIdGenerator = new 
LongSequenceGenerator();
     protected final ProducerId advisoryProducerId = new ProducerId();
@@ -85,26 +95,27 @@ public class StatisticsBroker extends BrokerFilter {
     public void send(ProducerBrokerExchange producerExchange, Message 
messageSend) throws Exception {
         ActiveMQDestination msgDest = messageSend.getDestination();
         ActiveMQDestination replyTo = messageSend.getReplyTo();
-        if (replyTo != null) {
+        if ((replyTo != null) && 
(msgDest.getPhysicalName().startsWith(STATS_PREFIX))) {
             String physicalName = msgDest.getPhysicalName();
-            boolean destStats = physicalName.regionMatches(true, 0, 
STATS_DESTINATION_PREFIX, 0,
-                    STATS_DESTINATION_PREFIX.length());
-            boolean brokerStats = physicalName.regionMatches(true, 0, 
STATS_BROKER_PREFIX, 0, STATS_BROKER_PREFIX
-                    .length());
-            boolean subStats = physicalName.regionMatches(true, 0, 
STATS_SUBSCRIPTION_PREFIX, 0, STATS_SUBSCRIPTION_PREFIX
-                    .length());
+            boolean destStats = 
physicalName.startsWith(STATS_DESTINATION_PREFIX);
+            boolean brokerStats = physicalName.startsWith(STATS_BROKER_PREFIX);
+            boolean subStats = 
physicalName.startsWith(STATS_SUBSCRIPTION_PREFIX);
             BrokerService brokerService = getBrokerService();
             RegionBroker regionBroker = (RegionBroker) 
brokerService.getRegionBroker();
             if (destStats) {
-                String destinationName = 
physicalName.substring(STATS_DESTINATION_PREFIX.length(), 
physicalName.length());
+                String destinationName = 
physicalName.substring(STATS_DESTINATION_PREFIX.length());
                 if (destinationName.startsWith(".")) {
                     destinationName = destinationName.substring(1);
                 }
                 String destinationQuery = 
destinationName.replace(STATS_DENOTE_END_LIST,"");
-                boolean endListMessage = 
!destinationName.equals(destinationQuery);
+                boolean endListMessage = 
!destinationName.equals(destinationQuery)
+                        || 
messageSend.getProperties().containsKey(STATS_DENOTE_END_LIST);
                 ActiveMQDestination queryDestination = 
ActiveMQDestination.createDestination(destinationQuery,msgDest.getDestinationType());
                 Set<Destination> destinations = 
getDestinations(queryDestination);
 
+                boolean includeFirstMessageTimestamp = 
messageSend.getProperties().containsKey(STATS_FIRST_MESSAGE_TIMESTAMP);
+                List<Message> tempFirstMessage = includeFirstMessageTimestamp 
? new ArrayList<>(1) : null;
+
                 for (Destination dest : destinations) {
                     DestinationStatistics stats = 
dest.getDestinationStatistics();
                     if (stats != null) {
@@ -129,6 +140,21 @@ public class StatisticsBroker extends BrokerFilter {
                         statsMessage.setDouble("minEnqueueTime", 
stats.getProcessTime().getMinTime());
                         statsMessage.setLong("consumerCount", 
stats.getConsumers().getCount());
                         statsMessage.setLong("producerCount", 
stats.getProducers().getCount());
+                        if (includeFirstMessageTimestamp) {
+                            if (dest instanceof Queue) {
+                                ((Queue) dest).doBrowse(tempFirstMessage, 1);
+                            }
+                            else if (dest instanceof Topic) {
+                                ((Topic) dest).doBrowse(tempFirstMessage, 1);
+                            }
+                            if (!tempFirstMessage.isEmpty()) {
+                                Message message = tempFirstMessage.get(0);
+                                // NOTICE: Client-side, you may get the broker 
"now" Timestamp by msg.getJMSTimestamp()
+                                // This allows for calculating age.
+                                statsMessage.setLong("firstMessageTimestamp", 
message.getBrokerInTime());
+                                tempFirstMessage.clear();
+                            }
+                        }
                         
statsMessage.setJMSCorrelationID(messageSend.getCorrelationId());
                         sendStats(producerExchange.getConnectionContext(), 
statsMessage, replyTo);
                     }
diff --git 
a/activemq-partition/src/test/java/org/apache/activemq/partition/PartitionBrokerTest.java
 
b/activemq-partition/src/test/java/org/apache/activemq/partition/PartitionBrokerTest.java
index cc406d8..1b49f0b 100644
--- 
a/activemq-partition/src/test/java/org/apache/activemq/partition/PartitionBrokerTest.java
+++ 
b/activemq-partition/src/test/java/org/apache/activemq/partition/PartitionBrokerTest.java
@@ -17,7 +17,6 @@
 package org.apache.activemq.partition;
 
 import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.AutoFailTestSupport;
 import org.apache.activemq.broker.BrokerPlugin;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.TransportConnector;
diff --git 
a/activemq-unit-tests/src/test/java/org/apache/activemq/plugin/BrokerStatisticsPluginTest.java
 
b/activemq-unit-tests/src/test/java/org/apache/activemq/plugin/BrokerStatisticsPluginTest.java
index b003a16..9a8a5d2 100644
--- 
a/activemq-unit-tests/src/test/java/org/apache/activemq/plugin/BrokerStatisticsPluginTest.java
+++ 
b/activemq-unit-tests/src/test/java/org/apache/activemq/plugin/BrokerStatisticsPluginTest.java
@@ -17,7 +17,6 @@
 package org.apache.activemq.plugin;
 
 import java.net.URI;
-
 import javax.jms.Connection;
 import javax.jms.ConnectionFactory;
 import javax.jms.MapMessage;
@@ -153,6 +152,76 @@ public class BrokerStatisticsPluginTest extends TestCase{
         */
     }
 
+    public void testDestinationStatsWithNullTermination() throws Exception {
+        Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+        Queue replyTo = session.createTemporaryQueue();
+        MessageConsumer consumer = session.createConsumer(replyTo);
+        Queue testQueue = session.createQueue("Test.Queue");
+        MessageProducer producer = session.createProducer(null);
+        Queue query = 
session.createQueue(StatisticsBroker.STATS_DESTINATION_PREFIX + "." + 
testQueue.getQueueName());
+        Message msg = session.createMessage();
+        // Instruct to terminate query reply with a null-message
+        msg.setBooleanProperty(StatisticsBroker.STATS_DENOTE_END_LIST, true);
+
+        producer.send(testQueue, msg);
+
+        msg.setJMSReplyTo(replyTo);
+        producer.send(query, msg);
+        MapMessage reply = (MapMessage) consumer.receive(10 * 1000);
+        assertNotNull(reply);
+        assertTrue(reply.getMapNames().hasMoreElements());
+        assertEquals(1, reply.getLong("size"));
+        assertTrue(reply.getJMSTimestamp() > 0);
+        assertEquals(Message.DEFAULT_PRIORITY, reply.getJMSPriority());
+
+        /*
+        for (Enumeration e = reply.getMapNames(); e.hasMoreElements();) {
+            String name = e.nextElement().toString();
+            System.err.println(name+"="+reply.getObject(name));
+        }
+         */
+
+        // Assert that we got a null-termination
+        MapMessage nullReply = (MapMessage) consumer.receive(10 * 1000);
+        assertNotNull(nullReply);
+        // No props in null-message
+        assertFalse(nullReply.getMapNames().hasMoreElements());
+        assertTrue(nullReply.getJMSTimestamp() > 0);
+        assertEquals(Message.DEFAULT_PRIORITY, nullReply.getJMSPriority());
+    }
+
+    public void testDestinationStatsWithFirstMessageTimestamp() throws 
Exception {
+        Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+        Queue replyTo = session.createTemporaryQueue();
+        MessageConsumer consumer = session.createConsumer(replyTo);
+        Queue testQueue = session.createQueue("Test.Queue");
+        MessageProducer producer = session.createProducer(null);
+        Queue query = 
session.createQueue(StatisticsBroker.STATS_DESTINATION_PREFIX + "." + 
testQueue.getQueueName());
+        Message msg = session.createMessage();
+        // Instruct to include timestamp of first message in the queue
+        msg.setBooleanProperty(StatisticsBroker.STATS_FIRST_MESSAGE_TIMESTAMP, 
true);
+
+        producer.send(testQueue, msg);
+
+        msg.setJMSReplyTo(replyTo);
+        producer.send(query, msg);
+        MapMessage reply = (MapMessage) consumer.receive(10 * 1000);
+        assertNotNull(reply);
+        assertTrue(reply.getMapNames().hasMoreElements());
+        assertEquals(1, reply.getLong("size"));
+        assertTrue(reply.getJMSTimestamp() > 0);
+        // Assert that we got the brokerInTime for the first message in queue 
as value of key "firstMessageTimestamp"
+        assertTrue(System.currentTimeMillis() >= 
reply.getLong("firstMessageTimestamp"));
+        assertEquals(Message.DEFAULT_PRIORITY, reply.getJMSPriority());
+
+        /*
+        for (Enumeration e = reply.getMapNames(); e.hasMoreElements();) {
+            String name = e.nextElement().toString();
+            System.err.println(name+"="+reply.getObject(name));
+        }
+         */
+    }
+
     @SuppressWarnings("unused")
     public void testSubscriptionStats() throws Exception{
         Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);

Reply via email to