This is an automated email from the ASF dual-hosted git repository.

mattrpav pushed a commit to branch activemq-5.19.x
in repository https://gitbox.apache.org/repos/asf/activemq.git


The following commit(s) were added to refs/heads/activemq-5.19.x by this push:
     new 30e1af80df [AMQ-8463] New Feature: Advanced Message Flow statistics
30e1af80df is described below

commit 30e1af80dfa38acf5804061b9133159ad8ad92f5
Author: Matt Pavlovich <[email protected]>
AuthorDate: Fri Dec 13 12:04:20 2024 -0600

    [AMQ-8463] New Feature: Advanced Message Flow statistics
    
    Co-authored-by: Christopher L. Shannon <[email protected]>
    (cherry picked from commit 7f8cf0e7d977341b42f9b374a98292a586f95e79)
---
 .../activemq/broker/jmx/DestinationView.java       |  63 ++++++++++
 .../activemq/broker/jmx/DestinationViewMBean.java  |  33 ++++++
 .../activemq/broker/region/BaseDestination.java    |  14 ++-
 .../apache/activemq/broker/region/Destination.java |   5 +-
 .../activemq/broker/region/DestinationFilter.java  |  10 ++
 .../broker/region/DestinationStatistics.java       | 104 ++++++++--------
 .../broker/region/DurableTopicSubscription.java    |   6 +
 .../org/apache/activemq/broker/region/Queue.java   |  18 ++-
 .../org/apache/activemq/broker/region/Topic.java   |   6 +
 .../activemq/broker/region/TopicSubscription.java  |   6 +
 .../activemq/broker/region/policy/PolicyEntry.java |  13 ++
 .../store/MessageStoreSubscriptionStatistics.java  |   2 +-
 .../management/JCAConnectionPoolStatsImpl.java     |   6 +-
 .../management/JCAConnectionStatsImpl.java         |   4 +-
 .../activemq/management/JMSEndpointStatsImpl.java  |  10 +-
 .../activemq/management/JMSSessionStatsImpl.java   |   8 +-
 .../activemq/management/MessageFlowStats.java      |  32 +++++
 .../activemq/management/MessageFlowStatsImpl.java  | 118 +++++++++++++++++++
 .../apache/activemq/management/StatisticImpl.java  |  33 +++---
 .../org/apache/activemq/management/StatsImpl.java  |  60 ++++------
 .../activemq/management/UnsampledStatistic.java    |  27 +++++
 .../management/UnsampledStatisticImpl.java         |  70 +++++++++++
 .../activemq/management/UnsampledStatsImpl.java    |  41 +++++++
 .../management/UnsampledStatisticsTest.java        |  72 +++++++++++
 .../java/org/apache/activemq/PolicyEntryTest.java  |  15 +++
 ...ryTest-policy-advancedMessageStatistics-mod.xml |  36 ++++++
 ...yEntryTest-policy-advancedMessageStatistics.xml |  36 ++++++
 .../network/NetworkAdvancedStatisticsTest.java     | 131 ++++++++++++++++++---
 .../localBroker-advancedNetworkStatistics.xml      |  28 +++--
 .../remoteBroker-advancedNetworkStatistics.xml     |   8 +-
 30 files changed, 861 insertions(+), 154 deletions(-)

diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java
 
b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java
index 7c77d9aa3f..5e0ab749e4 100644
--- 
a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java
@@ -30,6 +30,8 @@ import javax.jms.Connection;
 import javax.jms.InvalidSelectorException;
 import javax.jms.MessageProducer;
 import javax.jms.Session;
+import java.util.function.Function;
+import java.util.function.Supplier;
 import javax.management.MalformedObjectNameException;
 import javax.management.ObjectName;
 import javax.management.openmbean.CompositeData;
@@ -52,6 +54,8 @@ import org.apache.activemq.command.ActiveMQTextMessage;
 import org.apache.activemq.command.Message;
 import org.apache.activemq.filter.BooleanExpression;
 import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
+import org.apache.activemq.management.MessageFlowStats;
+import org.apache.activemq.management.UnsampledStatistic;
 import org.apache.activemq.selector.SelectorParser;
 import org.apache.activemq.store.MessageStore;
 import org.apache.activemq.util.URISupport;
@@ -620,4 +624,63 @@ public class DestinationView implements 
DestinationViewMBean {
         return 
destination.getDestinationStatistics().getNetworkDequeues().getCount();
     }
 
+    @Override
+    public boolean isAdvancedMessageStatisticsEnabled() {
+        return destination.isAdvancedMessageStatisticsEnabled();
+    }
+
+    @Override
+    public void setAdvancedMessageStatisticsEnabled(boolean 
advancedMessageStatisticsEnabled) {
+        
destination.setAdvancedMessageStatisticsEnabled(advancedMessageStatisticsEnabled);
+    }
+
+    @Override
+    public long getEnqueuedMessageBrokerInTime() {
+        return 
getMessageFlowStat(MessageFlowStats::getEnqueuedMessageBrokerInTime, 0L);
+    }
+
+    @Override
+    public String getEnqueuedMessageClientId() {
+        return 
getMessageFlowStat(MessageFlowStats::getEnqueuedMessageClientID, null);
+    }
+
+    @Override
+    public String getEnqueuedMessageId() {
+        return getMessageFlowStat(MessageFlowStats::getEnqueuedMessageID, 
null);
+    }
+
+    @Override
+    public long getEnqueuedMessageTimestamp() {
+        return 
getMessageFlowStat(MessageFlowStats::getEnqueuedMessageTimestamp, 0L);
+    }
+
+    @Override
+    public long getDequeuedMessageBrokerInTime() {
+        return 
getMessageFlowStat(MessageFlowStats::getDequeuedMessageBrokerInTime, 0L);
+    }
+
+    @Override
+    public long getDequeuedMessageBrokerOutTime() {
+        return 
getMessageFlowStat(MessageFlowStats::getDequeuedMessageBrokerOutTime, 0L);
+    }
+
+    @Override
+    public String getDequeuedMessageClientId() {
+        return 
getMessageFlowStat(MessageFlowStats::getDequeuedMessageClientID, null);
+    }
+
+    @Override
+    public String getDequeuedMessageId() {
+        return getMessageFlowStat(MessageFlowStats::getDequeuedMessageID, 
null);
+    }
+
+    @Override
+    public long getDequeuedMessageTimestamp() {
+        return 
getMessageFlowStat(MessageFlowStats::getDequeuedMessageTimestamp, 0L);
+    }
+
+    private <T> T getMessageFlowStat(Function<MessageFlowStats, 
UnsampledStatistic<T>> f, T defVal) {
+        final var stats = 
destination.getDestinationStatistics().getMessageFlowStats();
+        return stats != null ? f.apply(stats).getValue() : defVal;
+    }
 }
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java
 
b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java
index 68861e89f4..431424f604 100644
--- 
a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java
@@ -493,4 +493,37 @@ public interface DestinationViewMBean {
 
     @MBeanInfo("Number of messages acknowledged from the destination via 
network connection")
     long getNetworkDequeues();
+   
+    @MBeanInfo("Query Advanced Message Statistics flag")
+    boolean isAdvancedMessageStatisticsEnabled();
+
+    @MBeanInfo("Toggle Advanced Message Statistics flag")
+    void setAdvancedMessageStatisticsEnabled(boolean 
advancedMessageStatisticsEnabled);
+
+    @MBeanInfo("Broker in time (ms) of last enqueued message to the 
destination")
+    long getEnqueuedMessageBrokerInTime();
+
+    @MBeanInfo("ClientID of last enqueued message to the destination")
+    String getEnqueuedMessageClientId();
+
+    @MBeanInfo("MessageID of last enqueued message to the destination")
+    String getEnqueuedMessageId();
+
+    @MBeanInfo("Message timestamp in (ms) of last enqueued message to the 
destination")
+    long getEnqueuedMessageTimestamp();
+
+    @MBeanInfo("Broker in time (ms) of last dequeued message to the 
destination")
+    long getDequeuedMessageBrokerInTime();
+
+    @MBeanInfo("Broker out time (ms) of last dequeued message to the 
destination")
+    long getDequeuedMessageBrokerOutTime();
+
+    @MBeanInfo("ClientID of last dequeued message to the destination")
+    String getDequeuedMessageClientId();
+
+    @MBeanInfo("MessageID of last dequeued message to the destination")
+    String getDequeuedMessageId();
+
+    @MBeanInfo("Message timestamp in (ms) of last dequeued message to the 
destination")
+    long getDequeuedMessageTimestamp();
 }
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
index e3b72e1d62..7be5f52815 100644
--- 
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
@@ -827,7 +827,7 @@ public abstract class BaseDestination implements 
Destination {
     @Override
     public void markForGC(long timeStamp) {
         if (isGcIfInactive() && this.lastActiveTime == 0 && isActive() == false
-                && destinationStatistics.messages.getCount() == 0 && 
getInactiveTimeoutBeforeGC() > 0l) {
+                && destinationStatistics.getMessages().getCount() == 0 && 
getInactiveTimeoutBeforeGC() > 0l) {
             this.lastActiveTime = timeStamp;
         }
     }
@@ -836,7 +836,7 @@ public abstract class BaseDestination implements 
Destination {
     public boolean canGC() {
         boolean result = false;
         final long currentLastActiveTime = this.lastActiveTime;
-        if (isGcIfInactive() && currentLastActiveTime != 0l && 
destinationStatistics.messages.getCount() == 0L ) {
+        if (isGcIfInactive() && currentLastActiveTime != 0l && 
destinationStatistics.getMessages().getCount() == 0L ) {
             if ((System.currentTimeMillis() - currentLastActiveTime) >= 
getInactiveTimeoutBeforeGC()) {
                 result = true;
             }
@@ -880,6 +880,16 @@ public abstract class BaseDestination implements 
Destination {
         this.advancedNetworkStatisticsEnabled = 
advancedNetworkStatisticsEnabled;
     }
 
+    @Override
+    public boolean isAdvancedMessageStatisticsEnabled() {
+        return this.destinationStatistics.isAdvancedMessageStatisticsEnabled();
+    }
+
+    @Override
+    public void setAdvancedMessageStatisticsEnabled(boolean 
advancedMessageStatisticsEnabled) {
+        
this.destinationStatistics.setAdvancedMessageStatisticsEnabled(advancedMessageStatisticsEnabled);
+    }
+
     @Override
     public abstract List<Subscription> getConsumers();
 
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Destination.java
 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Destination.java
index 45e3de7b3c..22ba14894b 100644
--- 
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Destination.java
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Destination.java
@@ -259,9 +259,12 @@ public interface Destination extends Service, Task, 
Message.MessageDestination {
 
     void setSendDuplicateFromStoreToDLQ(boolean sendDuplicateFromStoreToDLQ);
 
-    // [AMQ-9437]
     boolean isAdvancedNetworkStatisticsEnabled();
 
     void setAdvancedNetworkStatisticsEnabled(boolean 
advancedNetworkStatisticsEnabled);
 
+    boolean isAdvancedMessageStatisticsEnabled();
+
+    void setAdvancedMessageStatisticsEnabled(boolean 
advancedMessageStatisticsEnabled);
+
 }
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java
 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java
index 85ef367a77..1ab96560ac 100644
--- 
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java
@@ -419,6 +419,16 @@ public class DestinationFilter implements Destination {
         
next.setAdvancedNetworkStatisticsEnabled(advancedNetworkStatisticsEnabled);
     }
 
+    @Override
+    public boolean isAdvancedMessageStatisticsEnabled() {
+        return next.isAdvancedMessageStatisticsEnabled();
+    }
+
+    @Override
+    public void setAdvancedMessageStatisticsEnabled(boolean 
advancedMessageStatisticsEnabled) {
+        
next.setAdvancedMessageStatisticsEnabled(advancedMessageStatisticsEnabled);
+    }
+
     public void deleteSubscription(ConnectionContext context, SubscriptionKey 
key) throws Exception {
         if (next instanceof DestinationFilter) {
             DestinationFilter filter = (DestinationFilter) next;
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationStatistics.java
 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationStatistics.java
index dc6b17dfaf..86833b86f8 100644
--- 
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationStatistics.java
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationStatistics.java
@@ -14,41 +14,43 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.activemq.broker.region;
 
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicReference;
 import org.apache.activemq.management.CountStatisticImpl;
 import org.apache.activemq.management.PollCountStatisticImpl;
 import org.apache.activemq.management.StatsImpl;
 import org.apache.activemq.management.*;
 
 /**
- * The J2EE Statistics for the a Destination.
- *
- *
+ * The Statistics for a Destination.
  */
 public class DestinationStatistics extends StatsImpl {
 
-    protected CountStatisticImpl enqueues;
-    protected CountStatisticImpl dequeues;
-    protected CountStatisticImpl forwards;
-    protected CountStatisticImpl consumers;
-    protected CountStatisticImpl producers;
-    protected CountStatisticImpl messages;
-    protected PollCountStatisticImpl messagesCached;
-    protected CountStatisticImpl dispatched;
-    protected CountStatisticImpl duplicateFromStore;
-    protected CountStatisticImpl inflight;
-    protected CountStatisticImpl expired;
-    protected TimeStatisticImpl processTime;
-    protected CountStatisticImpl blockedSends;
-    protected TimeStatisticImpl blockedTime;
-    protected SizeStatisticImpl messageSize;
-    protected CountStatisticImpl maxUncommittedExceededCount;
-
-    // [AMQ-9437] Advanced Statistics are optionally enabled
-    protected CountStatisticImpl networkEnqueues;
-    protected CountStatisticImpl networkDequeues;
+    private final CountStatisticImpl enqueues;
+    private final CountStatisticImpl dequeues;
+    private final CountStatisticImpl forwards;
+    private final CountStatisticImpl consumers;
+    private final CountStatisticImpl producers;
+    private final CountStatisticImpl messages;
+    private final PollCountStatisticImpl messagesCached;
+    private final CountStatisticImpl dispatched;
+    private final CountStatisticImpl duplicateFromStore;
+    private final CountStatisticImpl inflight;
+    private final CountStatisticImpl expired;
+    private final TimeStatisticImpl processTime;
+    private final CountStatisticImpl blockedSends;
+    private final TimeStatisticImpl blockedTime;
+    private final SizeStatisticImpl messageSize;
+    private final CountStatisticImpl maxUncommittedExceededCount;
+
+    // [AMQ-9437] Advanced Network Statistics
+    private final CountStatisticImpl networkEnqueues;
+    private final CountStatisticImpl networkDequeues;
+
+    // [AMQ-8463] Advanced Message Statistics are disabled by default
+    private final AtomicReference<MessageFlowStatsImpl> messageFlowStats = new 
AtomicReference<>();
 
     public DestinationStatistics() {
 
@@ -75,25 +77,6 @@ public class DestinationStatistics extends StatsImpl {
 
         networkEnqueues = new CountStatisticImpl("networkEnqueues", "The 
number of messages that have been sent to the destination via network 
connection");
         networkDequeues = new CountStatisticImpl("networkDequeues", "The 
number of messages that have been acknowledged from the destination via network 
connection");
-
-        addStatistic("enqueues", enqueues);
-        addStatistic("dispatched", dispatched);
-        addStatistic("dequeues", dequeues);
-        addStatistic("duplicateFromStore", duplicateFromStore);
-        addStatistic("inflight", inflight);
-        addStatistic("expired", expired);
-        addStatistic("consumers", consumers);
-        addStatistic("producers", producers);
-        addStatistic("messages", messages);
-        addStatistic("messagesCached", messagesCached);
-        addStatistic("processTime", processTime);
-        addStatistic("blockedSends",blockedSends);
-        addStatistic("blockedTime",blockedTime);
-        addStatistic("messageSize",messageSize);
-        addStatistic("maxUncommittedExceededCount", 
maxUncommittedExceededCount);
-
-        addStatistic("networkEnqueues", networkEnqueues);
-        addStatistic("networkDequeues", networkDequeues);
     }
 
     public CountStatisticImpl getEnqueues() {
@@ -132,10 +115,6 @@ public class DestinationStatistics extends StatsImpl {
         return messages;
     }
 
-    public void setMessagesCached(PollCountStatisticImpl messagesCached) {
-        this.messagesCached = messagesCached;
-    }
-
     public CountStatisticImpl getDispatched() {
         return dispatched;
     }
@@ -170,6 +149,10 @@ public class DestinationStatistics extends StatsImpl {
         return networkDequeues;
     }
 
+    public MessageFlowStats getMessageFlowStats() {
+        return messageFlowStats.get();
+    }
+
     public void reset() {
         if (this.isDoReset()) {
             super.reset();
@@ -186,6 +169,8 @@ public class DestinationStatistics extends StatsImpl {
             maxUncommittedExceededCount.reset();
             networkEnqueues.reset();
             networkDequeues.reset();
+            Optional.ofNullable(messageFlowStats.get())
+                .ifPresent(MessageFlowStatsImpl::reset);
         }
     }
 
@@ -208,9 +193,13 @@ public class DestinationStatistics extends StatsImpl {
         messageSize.setEnabled(enabled);
         maxUncommittedExceededCount.setEnabled(enabled);
 
-        // [AMQ-9437] Advanced Statistics
+        // [AMQ-9437] Advanced Network Statistics
         networkEnqueues.setEnabled(enabled);
         networkDequeues.setEnabled(enabled);
+
+        // [AMQ-8463] Advanced Message Statistics
+        Optional.ofNullable(messageFlowStats.get())
+            .ifPresent(stats -> stats.setEnabled(enabled));
     }
 
     public void setParent(DestinationStatistics parent) {
@@ -233,6 +222,7 @@ public class DestinationStatistics extends StatsImpl {
             
maxUncommittedExceededCount.setParent(parent.maxUncommittedExceededCount);
             networkEnqueues.setParent(parent.networkEnqueues);
             networkDequeues.setParent(parent.networkDequeues);
+            // [AMQ-8463] Advanced Message Statistics does not have a parent.
         } else {
             enqueues.setParent(null);
             dispatched.setParent(null);
@@ -252,7 +242,25 @@ public class DestinationStatistics extends StatsImpl {
             maxUncommittedExceededCount.setParent(null);
             networkEnqueues.setParent(null);
             networkDequeues.setParent(null);
+            // [AMQ-8463] Advanced Message Statistics does not have a parent.
         }
     }
 
+    // This is the only method that can mutate the messageFlowStats state
+    public synchronized void setAdvancedMessageStatisticsEnabled(boolean 
enabled) {
+        if(!enabled) {
+            this.messageFlowStats.set(null);
+            return;
+        }
+
+        if(this.messageFlowStats.get() == null) {
+            MessageFlowStatsImpl tmpMessageFlowStatsImpl = new 
MessageFlowStatsImpl();
+            tmpMessageFlowStatsImpl.setEnabled(true);
+            this.messageFlowStats.set(tmpMessageFlowStatsImpl);
+        }
+    }
+
+    public boolean isAdvancedMessageStatisticsEnabled() {
+        return this.messageFlowStats.get() != null;
+    }
 }
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
index 1236342cb8..13106cd0f3 100644
--- 
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
@@ -42,6 +42,7 @@ import org.apache.activemq.command.MessageAck;
 import org.apache.activemq.command.MessageDispatch;
 import org.apache.activemq.command.MessageId;
 import org.apache.activemq.command.RemoveInfo;
+import org.apache.activemq.management.MessageFlowStats;
 import org.apache.activemq.store.TopicMessageStore;
 import org.apache.activemq.transaction.Synchronization;
 import org.apache.activemq.usage.SystemUsage;
@@ -374,6 +375,11 @@ public class DurableTopicSubscription extends 
PrefetchSubscription implements Us
             
if(((Destination)node.getRegionDestination()).isAdvancedNetworkStatisticsEnabled()
 && getContext() != null && getContext().isNetworkConnection()) {
                 
((Destination)node.getRegionDestination()).getDestinationStatistics().getNetworkDequeues().add(ack.getMessageCount());
             }
+
+            final MessageFlowStats tmpMessageFlowStats = 
((Destination)node.getRegionDestination()).getDestinationStatistics().getMessageFlowStats();
+            if(tmpMessageFlowStats != null) {
+                tmpMessageFlowStats.dequeueStats(context.getClientId(), 
node.getMessageId().toString(), node.getMessage().getTimestamp(), 
node.getMessage().getBrokerInTime(), node.getMessage().getBrokerOutTime());
+            }
         }
     }
 
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
index 1d443f6c81..c87b37c1b6 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
@@ -1913,11 +1913,17 @@ public class Queue extends BaseDestination implements 
Task, UsageListener, Index
     private void dropMessage(ConnectionContext context, QueueMessageReference 
reference) {
         //use dropIfLive so we only process the statistics at most one time
         if (reference.dropIfLive()) {
-            getDestinationStatistics().getDequeues().increment();
-            getDestinationStatistics().getMessages().decrement();
+            destinationStatistics.getDequeues().increment();
+            destinationStatistics.getMessages().decrement();
+
+            final var tmpMessageFlowStats = 
destinationStatistics.getMessageFlowStats();
+            if(tmpMessageFlowStats != null) {
+                Message tmpMessage = reference.getMessage();
+                tmpMessageFlowStats.dequeueStats(context.getClientId(), 
tmpMessage.getMessageId().toString(), tmpMessage.getTimestamp(), 
tmpMessage.getBrokerInTime(), tmpMessage.getBrokerOutTime()); 
+            }
 
             if(isAdvancedNetworkStatisticsEnabled() && context.getConnection() 
!= null && context.getConnection().isNetworkConnection()) {
-                getDestinationStatistics().getNetworkDequeues().increment();
+                destinationStatistics.getNetworkDequeues().increment();
             }
 
             pagedInMessagesLock.writeLock().lock();
@@ -1971,10 +1977,16 @@ public class Queue extends BaseDestination implements 
Task, UsageListener, Index
 
     final void messageSent(final ConnectionContext context, final Message msg) 
throws Exception {
         pendingSends.decrementAndGet();
+
         destinationStatistics.getEnqueues().increment();
         destinationStatistics.getMessages().increment();
         destinationStatistics.getMessageSize().addSize(msg.getSize());
 
+        final var tmpMessageFlowStats = 
destinationStatistics.getMessageFlowStats();
+        if(tmpMessageFlowStats != null) {
+            tmpMessageFlowStats.enqueueStats(context.getClientId(), 
msg.getMessageId().toString(), msg.getTimestamp(), msg.getBrokerInTime());
+        }
+
         if(isAdvancedNetworkStatisticsEnabled() && context.getConnection() != 
null && context.getConnection().isNetworkConnection()) {
             destinationStatistics.getNetworkEnqueues().increment();
         }
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java
index 8c3c8bb36d..92e50c04bf 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java
@@ -51,6 +51,7 @@ import org.apache.activemq.command.Response;
 import org.apache.activemq.command.SubscriptionInfo;
 import org.apache.activemq.filter.MessageEvaluationContext;
 import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
+import org.apache.activemq.management.MessageFlowStats;
 import org.apache.activemq.store.MessageRecoveryListener;
 import org.apache.activemq.store.NoLocalSubscriptionAware;
 import org.apache.activemq.store.PersistenceAdapter;
@@ -779,6 +780,11 @@ public class Topic extends BaseDestination implements Task 
{
         // destinationStatistics.getMessages().increment();
         destinationStatistics.getEnqueues().increment();
 
+        final var tmpMessageFlowStats = 
destinationStatistics.getMessageFlowStats();
+        if(tmpMessageFlowStats != null) {
+            tmpMessageFlowStats.enqueueStats(context.getClientId(), 
message.getMessageId().toString(), message.getTimestamp(), 
message.getBrokerInTime());
+        }
+
         if(isAdvancedNetworkStatisticsEnabled() && context != null && 
context.isNetworkConnection()) {
             destinationStatistics.getNetworkEnqueues().increment();
         }
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
index 613c4a0f69..73a5113765 100644
--- 
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
@@ -25,6 +25,7 @@ import 
org.apache.activemq.broker.region.cursors.VMPendingMessageCursor;
 import org.apache.activemq.broker.region.policy.MessageEvictionStrategy;
 import org.apache.activemq.broker.region.policy.OldestMessageEvictionStrategy;
 import org.apache.activemq.command.*;
+import org.apache.activemq.management.MessageFlowStats;
 import org.apache.activemq.thread.Scheduler;
 import org.apache.activemq.transaction.Synchronization;
 import org.apache.activemq.transport.TransmitCallback;
@@ -453,6 +454,11 @@ public class TopicSubscription extends 
AbstractSubscription {
                 
destination.getDestinationStatistics().getNetworkDequeues().add(count);
             }
         }
+
+        final var tmpMessageFlowStats = 
destination.getDestinationStatistics().getMessageFlowStats();
+        if(tmpMessageFlowStats != null) {
+            tmpMessageFlowStats.dequeueStats(context.getClientId(), 
ack.getLastMessageId().toString());
+        }
         if (ack.isExpiredAck()) {
             destination.getDestinationStatistics().getExpired().add(count);
         }
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
index f5ab784179..8ae052574b 100644
--- 
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
@@ -107,6 +107,8 @@ public class PolicyEntry extends DestinationMapEntry {
     private int maxDestinations = -1;
     private boolean useTopicSubscriptionInflightStats = true;
     private boolean advancedNetworkStatisticsEnabled = false; // [AMQ-9437]
+    private boolean advancedMessageStatisticsEnabled = false; // [AMQ-8463]
+
     /*
      * percentage of in-flight messages above which optimize message store is 
disabled
      */
@@ -309,6 +311,9 @@ public class PolicyEntry extends DestinationMapEntry {
         if (isUpdate("advancedNetworkStatisticsEnabled", includedProperties)) {
             
destination.setAdvancedNetworkStatisticsEnabled(isAdvancedNetworkStatisticsEnabled());
         }
+        if (isUpdate("advancedMessageStatisticsEnabled", includedProperties)) {
+            
destination.setAdvancedMessageStatisticsEnabled(isAdvancedMessageStatisticsEnabled());
+        }
     }
 
     public void baseConfiguration(Broker broker, BaseDestination destination) {
@@ -1187,4 +1192,12 @@ public class PolicyEntry extends DestinationMapEntry {
     public void setAdvancedNetworkStatisticsEnabled(boolean 
advancedNetworkStatisticsEnabled) {
         this.advancedNetworkStatisticsEnabled = 
advancedNetworkStatisticsEnabled;
     }
+
+    public boolean isAdvancedMessageStatisticsEnabled() {
+        return this.advancedMessageStatisticsEnabled;
+    }
+
+    public void setAdvancedMessageStatisticsEnabled(boolean 
advancedMessageStatisticsEnabled) {
+        this.advancedMessageStatisticsEnabled = 
advancedMessageStatisticsEnabled;
+    }
 }
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/store/MessageStoreSubscriptionStatistics.java
 
b/activemq-broker/src/main/java/org/apache/activemq/store/MessageStoreSubscriptionStatistics.java
index 8be581cc61..b04596585d 100644
--- 
a/activemq-broker/src/main/java/org/apache/activemq/store/MessageStoreSubscriptionStatistics.java
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/store/MessageStoreSubscriptionStatistics.java
@@ -92,7 +92,7 @@ public class MessageStoreSubscriptionStatistics extends 
AbstractMessageStoreStat
     private class SubscriptionStatistics extends 
AbstractMessageStoreStatistics {
 
         public SubscriptionStatistics() {
-            this(MessageStoreSubscriptionStatistics.this.enabled);
+            this(MessageStoreSubscriptionStatistics.this.isEnabled());
         }
 
         /**
diff --git 
a/activemq-client/src/main/java/org/apache/activemq/management/JCAConnectionPoolStatsImpl.java
 
b/activemq-client/src/main/java/org/apache/activemq/management/JCAConnectionPoolStatsImpl.java
index 4f77310011..f1aef71db2 100644
--- 
a/activemq-client/src/main/java/org/apache/activemq/management/JCAConnectionPoolStatsImpl.java
+++ 
b/activemq-client/src/main/java/org/apache/activemq/management/JCAConnectionPoolStatsImpl.java
@@ -16,6 +16,8 @@
  */
 package org.apache.activemq.management;
 
+import java.util.Set;
+
 /**
  * Statistics for a JCA connection pool
  * 
@@ -39,9 +41,7 @@ public class JCAConnectionPoolStatsImpl extends 
JCAConnectionStatsImpl {
         this.waitingThreadCount = waitingThreadCount;
 
         // lets add named stats
-        addStatistic("freePoolSize", freePoolSize);
-        addStatistic("poolSize", poolSize);
-        addStatistic("waitingThreadCount", waitingThreadCount);
+        addStatistics(Set.of(freePoolSize, poolSize, waitingThreadCount));
     }
 
     public CountStatisticImpl getCloseCount() {
diff --git 
a/activemq-client/src/main/java/org/apache/activemq/management/JCAConnectionStatsImpl.java
 
b/activemq-client/src/main/java/org/apache/activemq/management/JCAConnectionStatsImpl.java
index 1ffd7e6eca..fc9a806d7d 100644
--- 
a/activemq-client/src/main/java/org/apache/activemq/management/JCAConnectionStatsImpl.java
+++ 
b/activemq-client/src/main/java/org/apache/activemq/management/JCAConnectionStatsImpl.java
@@ -16,6 +16,7 @@
  */
 package org.apache.activemq.management;
 
+import java.util.Set;
 
 /**
  * Statistics for a JCA connection
@@ -35,8 +36,7 @@ public class JCAConnectionStatsImpl extends StatsImpl {
         this.useTime = useTime;
 
         // lets add named stats
-        addStatistic("waitTime", waitTime);
-        addStatistic("useTime", useTime);
+        addStatistics(Set.of(waitTime, useTime));
     }
 
     public String getConnectionFactory() {
diff --git 
a/activemq-client/src/main/java/org/apache/activemq/management/JMSEndpointStatsImpl.java
 
b/activemq-client/src/main/java/org/apache/activemq/management/JMSEndpointStatsImpl.java
index e0aa0c862c..027af27cc8 100644
--- 
a/activemq-client/src/main/java/org/apache/activemq/management/JMSEndpointStatsImpl.java
+++ 
b/activemq-client/src/main/java/org/apache/activemq/management/JMSEndpointStatsImpl.java
@@ -21,6 +21,8 @@ import javax.jms.MessageConsumer;
 import javax.jms.MessageProducer;
 import javax.jms.Session;
 
+import java.util.Set;
+
 import org.apache.activemq.util.IndentPrinter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -77,11 +79,7 @@ public class JMSEndpointStatsImpl extends StatsImpl {
         this.messageRateTime = messageRateTime;
 
         // lets add named stats
-        addStatistic("messageCount", messageCount);
-        addStatistic("pendingMessageCount", pendingMessageCount);
-        addStatistic("expiredMessageCount", expiredMessageCount);
-        addStatistic("messageWaitTime", messageWaitTime);
-        addStatistic("messageRateTime", messageRateTime);
+        addStatistics(Set.of(messageCount, pendingMessageCount, 
expiredMessageCount, messageWaitTime, messageRateTime));
     }
 
     public synchronized void reset() {
@@ -128,7 +126,7 @@ public class JMSEndpointStatsImpl extends StatsImpl {
     }
 
     public void onMessage() {
-        if (enabled) {
+        if (isEnabled()) {
             long start = messageCount.getLastSampleTime();
             messageCount.increment();
             long end = messageCount.getLastSampleTime();
diff --git 
a/activemq-client/src/main/java/org/apache/activemq/management/JMSSessionStatsImpl.java
 
b/activemq-client/src/main/java/org/apache/activemq/management/JMSSessionStatsImpl.java
index ec8be9ac2b..f7778d73cd 100644
--- 
a/activemq-client/src/main/java/org/apache/activemq/management/JMSSessionStatsImpl.java
+++ 
b/activemq-client/src/main/java/org/apache/activemq/management/JMSSessionStatsImpl.java
@@ -17,6 +17,7 @@
 package org.apache.activemq.management;
 
 import java.util.List;
+import java.util.Set;
 
 import org.apache.activemq.ActiveMQMessageConsumer;
 import org.apache.activemq.ActiveMQMessageProducer;
@@ -54,12 +55,7 @@ public class JMSSessionStatsImpl extends StatsImpl {
                                                      "Time taken to process a 
message (thoughtput rate)");
 
         // lets add named stats
-        addStatistic("messageCount", messageCount);
-        addStatistic("pendingMessageCount", pendingMessageCount);
-        addStatistic("expiredMessageCount", expiredMessageCount);
-        addStatistic("messageWaitTime", messageWaitTime);
-        addStatistic("durableSubscriptionCount", durableSubscriptionCount);
-        addStatistic("messageRateTime", messageRateTime);
+        addStatistics(Set.of(messageCount, pendingMessageCount, 
expiredMessageCount, messageWaitTime, durableSubscriptionCount, 
messageRateTime));
     }
 
     public JMSProducerStatsImpl[] getProducers() {
diff --git 
a/activemq-client/src/main/java/org/apache/activemq/management/MessageFlowStats.java
 
b/activemq-client/src/main/java/org/apache/activemq/management/MessageFlowStats.java
new file mode 100644
index 0000000000..bbb742b4ec
--- /dev/null
+++ 
b/activemq-client/src/main/java/org/apache/activemq/management/MessageFlowStats.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.management;
+
+public interface MessageFlowStats {
+    UnsampledStatistic<Long> getEnqueuedMessageBrokerInTime();
+    UnsampledStatistic<String> getEnqueuedMessageClientID();
+    UnsampledStatistic<String> getEnqueuedMessageID();
+    UnsampledStatistic<Long> getEnqueuedMessageTimestamp();
+    UnsampledStatistic<Long> getDequeuedMessageBrokerInTime();
+    UnsampledStatistic<Long> getDequeuedMessageBrokerOutTime();
+    UnsampledStatistic<String> getDequeuedMessageClientID();
+    UnsampledStatistic<String> getDequeuedMessageID();
+    UnsampledStatistic<Long> getDequeuedMessageTimestamp();
+    void enqueueStats(String clientID, String messageID, long 
messageTimestamp, long messageBrokerInTime);
+    void dequeueStats(String clientID, String messageID);
+    void dequeueStats(String clientID, String messageID, long 
messageTimestamp, long messageBrokerInTime, long messageBrokerOutTime);
+}
diff --git 
a/activemq-client/src/main/java/org/apache/activemq/management/MessageFlowStatsImpl.java
 
b/activemq-client/src/main/java/org/apache/activemq/management/MessageFlowStatsImpl.java
new file mode 100644
index 0000000000..3833f2bd79
--- /dev/null
+++ 
b/activemq-client/src/main/java/org/apache/activemq/management/MessageFlowStatsImpl.java
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.management;
+
+import java.util.Set;
+
+public class MessageFlowStatsImpl extends UnsampledStatsImpl implements 
MessageFlowStats, Statistic, Resettable {
+
+    private final UnsampledStatisticImpl<Long> enqueuedMessageBrokerInTime;
+    private final UnsampledStatisticImpl<String> enqueuedMessageClientID;
+    private final UnsampledStatisticImpl<String> enqueuedMessageID;
+    private final UnsampledStatisticImpl<Long> enqueuedMessageTimestamp;
+    private final UnsampledStatisticImpl<Long> dequeuedMessageBrokerInTime;
+    private final UnsampledStatisticImpl<Long> dequeuedMessageBrokerOutTime;
+    private final UnsampledStatisticImpl<String> dequeuedMessageClientID;
+    private final UnsampledStatisticImpl<String> dequeuedMessageID;
+    private final UnsampledStatisticImpl<Long> dequeuedMessageTimestamp;
+
+    public MessageFlowStatsImpl() {
+        super();
+
+        enqueuedMessageBrokerInTime = new 
UnsampledStatisticImpl<>("enqueuedMessageBrokerInTime", "ms", "Broker in time 
(ms) of last enqueued message to the destination", Long.valueOf(0l));
+        enqueuedMessageClientID = new 
UnsampledStatisticImpl<>("enqueuedMessageClientID", "id", "ClientID of last 
enqueued message to the destination", null);
+        enqueuedMessageID = new UnsampledStatisticImpl<>("enqueuedMessageID", 
"id", "MessageID of last enqueued message to the destination", null);
+        enqueuedMessageTimestamp = new 
UnsampledStatisticImpl<>("enqueuedMessageTimestamp", "ms", "Message timestamp 
of last enqueued message to the destination", Long.valueOf(0l));
+
+        dequeuedMessageBrokerInTime = new 
UnsampledStatisticImpl<>("dequeuedMessageBrokerInTime", "ms", "Broker in time 
(ms) of last dequeued message to the destination", Long.valueOf(0l));
+        dequeuedMessageBrokerOutTime = new 
UnsampledStatisticImpl<>("dequeuedMessageBrokerOutTime", "ms", "Broker out time 
(ms) of last dequeued message to the destination", Long.valueOf(0l));
+        dequeuedMessageClientID = new 
UnsampledStatisticImpl<>("dequeuedMessageClientID", "id", "ClientID of last 
dequeued message to the destination", null);
+        dequeuedMessageID = new UnsampledStatisticImpl<>("dequeuedMessageID", 
"id", "MessageID of last dequeued message to the destination", null);
+        dequeuedMessageTimestamp = new 
UnsampledStatisticImpl<>("dequeuedMessageTimestamp", "ms", "Message timestamp 
of last dequeued message to the destination", Long.valueOf(0l));
+
+        addStatistics(Set.of(enqueuedMessageBrokerInTime, 
enqueuedMessageClientID, enqueuedMessageID, enqueuedMessageTimestamp,
+                dequeuedMessageBrokerInTime, dequeuedMessageBrokerOutTime, 
dequeuedMessageClientID, dequeuedMessageID, dequeuedMessageTimestamp));
+    }
+
+    @Override
+    public UnsampledStatistic<Long> getEnqueuedMessageBrokerInTime() {
+        return enqueuedMessageBrokerInTime;
+    }
+
+    @Override
+    public UnsampledStatistic<String> getEnqueuedMessageClientID() {
+        return enqueuedMessageClientID;
+    }
+
+    @Override
+    public UnsampledStatistic<String> getEnqueuedMessageID() {
+        return enqueuedMessageID;
+    }
+
+    @Override
+    public UnsampledStatistic<Long> getEnqueuedMessageTimestamp() {
+        return enqueuedMessageTimestamp;
+    }
+
+    @Override
+    public UnsampledStatistic<Long> getDequeuedMessageBrokerInTime() {
+        return dequeuedMessageBrokerInTime;
+    }
+
+    @Override
+    public UnsampledStatistic<Long> getDequeuedMessageBrokerOutTime() {
+        return dequeuedMessageBrokerOutTime;
+    }
+
+    @Override
+    public UnsampledStatistic<String> getDequeuedMessageClientID() {
+        return dequeuedMessageClientID;
+    }
+
+    @Override
+    public UnsampledStatistic<String> getDequeuedMessageID() {
+        return dequeuedMessageID;
+    }
+
+    @Override
+    public UnsampledStatistic<Long> getDequeuedMessageTimestamp() {
+        return dequeuedMessageTimestamp;
+    }
+
+    @Override
+    public synchronized void enqueueStats(String clientID, String messageID, 
long messageTimestamp, long messageBrokerInTime) {
+        enqueuedMessageClientID.setValue(clientID);
+        enqueuedMessageID.setValue(messageID);
+        enqueuedMessageTimestamp.setValue(messageTimestamp);
+        enqueuedMessageBrokerInTime.setValue(messageBrokerInTime);
+    }
+
+    @Override
+    public synchronized void dequeueStats(String clientID, String messageID) {
+        dequeuedMessageClientID.setValue(clientID);
+        dequeuedMessageID.setValue(messageID);
+    }
+
+    @Override
+    public synchronized void dequeueStats(String clientID, String messageID, 
long messageTimestamp, long messageBrokerInTime, long messageBrokerOutTime) {
+        dequeuedMessageClientID.setValue(clientID);
+        dequeuedMessageID.setValue(messageID);
+        dequeuedMessageTimestamp.setValue(messageTimestamp);
+        dequeuedMessageBrokerInTime.setValue(messageBrokerInTime);
+        dequeuedMessageBrokerOutTime.setValue(messageBrokerOutTime);
+    }
+}
diff --git 
a/activemq-client/src/main/java/org/apache/activemq/management/StatisticImpl.java
 
b/activemq-client/src/main/java/org/apache/activemq/management/StatisticImpl.java
index 1dbcc80c69..9dbf5a019c 100644
--- 
a/activemq-client/src/main/java/org/apache/activemq/management/StatisticImpl.java
+++ 
b/activemq-client/src/main/java/org/apache/activemq/management/StatisticImpl.java
@@ -15,22 +15,19 @@
  * limitations under the License.
  */
 package org.apache.activemq.management;
-
 /**
- * Base class for a Statistic implementation
- * 
- * 
+ * A thread-safe class for a Statistic implementation
  */
 public class StatisticImpl implements Statistic, Resettable {
 
-    protected boolean enabled;
+    private volatile boolean enabled;
 
-    private String name;
-    private String unit;
-    private String description;
-    private long startTime;
-    private long lastSampleTime;
-    private boolean doReset = true;
+    private final String name;
+    private final String unit;
+    private final String description;
+    private volatile long startTime;
+    private volatile long lastSampleTime;
+    private volatile boolean doReset = true;
 
     public StatisticImpl(String name, String unit, String description) {
         this.name = name;
@@ -40,6 +37,14 @@ public class StatisticImpl implements Statistic, Resettable {
         this.lastSampleTime = this.startTime;
     }
 
+    protected StatisticImpl(String name, String unit, String description, long 
startTime, long lastSampleTime) {
+        this.name = name;
+        this.unit = unit;
+        this.description = description;
+        this.startTime = startTime;
+        this.lastSampleTime = lastSampleTime;
+    }
+
     public synchronized void reset() {
         if(isDoReset()) {
             this.startTime = System.currentTimeMillis();
@@ -51,7 +56,8 @@ public class StatisticImpl implements Statistic, Resettable {
         this.lastSampleTime = System.currentTimeMillis();
     }
 
-    public synchronized String toString() {
+    public String toString() {
+        // NOTE: Do not double-lock here as appendFieldDescription performs the 
lock
         StringBuffer buffer = new StringBuffer();
         buffer.append(name);
         buffer.append("{");
@@ -93,7 +99,7 @@ public class StatisticImpl implements Statistic, Resettable {
     public void setEnabled(boolean enabled) {
         this.enabled = enabled;
     }
-    
+
     /**
      * @return the doReset
      */
@@ -108,7 +114,6 @@ public class StatisticImpl implements Statistic, Resettable 
{
         this.doReset = doReset;
     }
 
-
     protected synchronized void appendFieldDescription(StringBuffer buffer) {
         buffer.append(" unit: ");
         buffer.append(this.unit);
diff --git 
a/activemq-client/src/main/java/org/apache/activemq/management/StatsImpl.java 
b/activemq-client/src/main/java/org/apache/activemq/management/StatsImpl.java
index f3ae60fb07..46d24a685f 100644
--- 
a/activemq-client/src/main/java/org/apache/activemq/management/StatsImpl.java
+++ 
b/activemq-client/src/main/java/org/apache/activemq/management/StatsImpl.java
@@ -16,18 +16,15 @@
  */
 package org.apache.activemq.management;
 
-import java.util.ArrayList;
-import java.util.List;
+import java.util.Collection;
 import java.util.Set;
 import java.util.concurrent.CopyOnWriteArraySet;
 /**
- * Base class for a Stats implementation
- * 
- * 
+ * A thread-safe class for a Stats implementation
  */
 public class StatsImpl extends StatisticImpl implements Stats, Resettable {
     //use a Set instead of a Map - to conserve Space
-    private Set<StatisticImpl> set;
+    protected final Set<StatisticImpl> set;
 
     public StatsImpl() {
         this(new CopyOnWriteArraySet<StatisticImpl>());
@@ -38,44 +35,35 @@ public class StatsImpl extends StatisticImpl implements 
Stats, Resettable {
         this.set = set;
     }
 
-    public void reset() {
-        Statistic[] stats = getStatistics();
-        int size = stats.length;
-        for (int i = 0; i < size; i++) {
-            Statistic stat = stats[i];
-            if (stat instanceof Resettable) {
-                Resettable r = (Resettable) stat;
-                r.reset();
-            }
-        }
+    public synchronized void reset() {
+        this.set.stream()
+            .filter(Resettable.class::isInstance)
+            .map(Resettable.class::cast)
+            .forEach(resetStat -> resetStat.reset());
     }
 
-    public Statistic getStatistic(String name) {
-        for (StatisticImpl stat : this.set) {
-            if (stat.getName() != null && stat.getName().equals(name)) {
-                return stat;
-            }
-        }
-        return null;
+    public synchronized Statistic getStatistic(String name) {
+        return this.set.stream().filter(s -> 
s.getName().equals(name)).findFirst().orElse(null);
     }
 
-    public String[] getStatisticNames() {
-        List<String> names = new ArrayList<String>();
-        for (StatisticImpl stat : this.set) {
-            names.add(stat.getName());
-        }
-        String[] answer = new String[names.size()];
-        names.toArray(answer);
-        return answer;
+    public synchronized String[] getStatisticNames() {
+        return 
this.set.stream().map(StatisticImpl::getName).toArray(String[]::new);
     }
 
-    public Statistic[] getStatistics() {
-        Statistic[] answer = new Statistic[this.set.size()];
-        set.toArray(answer);
-        return answer;
+    public synchronized Statistic[] getStatistics() {
+        return this.set.toArray(new Statistic[this.set.size()]);
     }
 
-    protected void addStatistic(String name, StatisticImpl statistic) {
+    @Deprecated(forRemoval = true, since = "6.2.0")
+    protected synchronized void addStatistic(String name, StatisticImpl 
statistic) {
         this.set.add(statistic);
     }
+
+    protected void addStatistics(Collection<StatisticImpl> statistics) {
+        this.set.addAll(statistics);
+    }
+
+    protected void removeStatistics(Collection<StatisticImpl> statistics) {
+        this.set.removeAll(statistics); 
+    }
 }
diff --git 
a/activemq-client/src/main/java/org/apache/activemq/management/UnsampledStatistic.java
 
b/activemq-client/src/main/java/org/apache/activemq/management/UnsampledStatistic.java
new file mode 100644
index 0000000000..1aff3bc14e
--- /dev/null
+++ 
b/activemq-client/src/main/java/org/apache/activemq/management/UnsampledStatistic.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.activemq.management;
+
+/**
+ * A Statistic without sampleTime or startTime.
+ */
+public interface UnsampledStatistic<T> extends Statistic, Resettable {
+    public T getValue();
+    public void setValue(T value);
+}
diff --git 
a/activemq-client/src/main/java/org/apache/activemq/management/UnsampledStatisticImpl.java
 
b/activemq-client/src/main/java/org/apache/activemq/management/UnsampledStatisticImpl.java
new file mode 100644
index 0000000000..45ca70bacc
--- /dev/null
+++ 
b/activemq-client/src/main/java/org/apache/activemq/management/UnsampledStatisticImpl.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.management;
+
+/**
+ * An UnsampledStatistic<T> implementation
+ */
+public class UnsampledStatisticImpl<T> extends StatisticImpl implements 
UnsampledStatistic<T> {
+
+    private volatile T value;
+    private final T defaultValue;
+
+    public UnsampledStatisticImpl(String name, String unit, String 
description, T defaultValue) {
+        super(name, unit, description, 0l, 0l);
+        this.value = defaultValue;
+        this.defaultValue = defaultValue;
+    }
+
+    @Override
+    public void reset() {
+        if (isDoReset()) {
+            value = defaultValue;
+        }
+    }
+
+    @Override
+    protected void updateSampleTime() {}
+
+    @Override
+    public long getStartTime() {
+        return 0l;
+    }
+
+    @Override
+    public long getLastSampleTime() {
+        return 0l;
+    }
+
+    @Override
+    public T getValue() {
+        return value;
+    }
+
+    @Override
+    public void setValue(T value) {
+        if (isEnabled()) {
+            this.value = value;
+        }
+    }
+
+    protected void appendFieldDescription(StringBuffer buffer) {
+        buffer.append(" value: ");
+        buffer.append(value);
+        super.appendFieldDescription(buffer);
+    }
+}
diff --git 
a/activemq-client/src/main/java/org/apache/activemq/management/UnsampledStatsImpl.java
 
b/activemq-client/src/main/java/org/apache/activemq/management/UnsampledStatsImpl.java
new file mode 100644
index 0000000000..10d5facec2
--- /dev/null
+++ 
b/activemq-client/src/main/java/org/apache/activemq/management/UnsampledStatsImpl.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.management;
+
+public class UnsampledStatsImpl extends StatsImpl {
+
+    public UnsampledStatsImpl() {
+        super();
+    }
+
+    @Override
+    protected void updateSampleTime() {}
+
+    @Override
+    public long getStartTime() {
+        return 0;
+    }
+    @Override
+    public long getLastSampleTime() {
+        return 0;
+    }
+
+    @Override
+    public synchronized void setEnabled(boolean enabled) {
+        set.stream().forEach(stat -> stat.setEnabled(enabled));
+    }
+}
diff --git 
a/activemq-client/src/test/java/org/apache/activemq/management/UnsampledStatisticsTest.java
 
b/activemq-client/src/test/java/org/apache/activemq/management/UnsampledStatisticsTest.java
new file mode 100644
index 0000000000..f53207b3a9
--- /dev/null
+++ 
b/activemq-client/src/test/java/org/apache/activemq/management/UnsampledStatisticsTest.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.activemq.management;
+
+import static org.junit.Assert.*;
+
+import org.junit.Test;
+
+public class UnsampledStatisticsTest {
+
+    @Test
+    public void testUnsampledStatisticsEnabledTest() {
+        UnsampledStatisticImpl<Long> longStatistic = new 
UnsampledStatisticImpl<>("longStat", "long", "A long statistic", 
Long.valueOf(0l));
+        longStatistic.setEnabled(true);
+        longStatistic.setValue(Long.MAX_VALUE);
+
+        UnsampledStatisticImpl<String> stringStatistic = new 
UnsampledStatisticImpl<>("stringStat", "chars", "A string statistic", null);
+        stringStatistic.setEnabled(true);
+        stringStatistic.setValue("Hello World!");
+
+        assertEquals("A long statistic", longStatistic.getDescription());
+        assertEquals(Long.valueOf(0l), 
Long.valueOf(longStatistic.getLastSampleTime()));
+        assertEquals("longStat", longStatistic.getName());
+        assertEquals(Long.valueOf(0l), 
Long.valueOf(longStatistic.getStartTime()));
+        assertEquals("long", longStatistic.getUnit());
+        assertEquals(Long.valueOf(Long.MAX_VALUE), longStatistic.getValue());
+        assertTrue(longStatistic.toString().contains("value: " + 
Long.MAX_VALUE));
+        longStatistic.reset();
+        assertEquals(Long.valueOf(0l), longStatistic.getValue());
+        assertTrue(longStatistic.toString().contains("value: 0"));
+
+        longStatistic.reset();
+        longStatistic.setEnabled(false);
+        assertFalse(longStatistic.isEnabled());
+        longStatistic.setValue(12345678l);
+        assertEquals(Long.valueOf(0l), longStatistic.getValue());
+
+        assertEquals("A string statistic", stringStatistic.getDescription());
+        assertEquals(Long.valueOf(0l), 
Long.valueOf(stringStatistic.getLastSampleTime()));
+        assertEquals("stringStat", stringStatistic.getName());
+        assertEquals(Long.valueOf(0l), 
Long.valueOf(stringStatistic.getStartTime()));
+        assertEquals("chars", stringStatistic.getUnit());
+        assertEquals("Hello World!", stringStatistic.getValue());
+        assertTrue(stringStatistic.toString().contains("value: Hello World!"));
+        stringStatistic.reset();
+        assertNull(stringStatistic.getValue());
+        assertTrue(stringStatistic.toString().contains("value: null"));
+
+        stringStatistic.reset();
+        stringStatistic.setEnabled(false);
+        assertFalse(stringStatistic.isEnabled());
+        stringStatistic.setValue("This should be ignored");
+        assertNull(stringStatistic.getValue());
+    }
+
+}
diff --git 
a/activemq-runtime-config/src/test/java/org/apache/activemq/PolicyEntryTest.java
 
b/activemq-runtime-config/src/test/java/org/apache/activemq/PolicyEntryTest.java
index 4f077018ac..e7abddd8da 100644
--- 
a/activemq-runtime-config/src/test/java/org/apache/activemq/PolicyEntryTest.java
+++ 
b/activemq-runtime-config/src/test/java/org/apache/activemq/PolicyEntryTest.java
@@ -57,6 +57,18 @@ public class PolicyEntryTest extends 
RuntimeConfigTestSupport {
         verifyBooleanField("AMQ.8397", "sendDuplicateFromStoreToDLQ", true);
     }
 
+    @Test
+    public void testModAdvancedMessageStatistics() throws Exception {
+        final String brokerConfig = configurationSeed + "-policy-ml-broker";
+        applyNewConfig(brokerConfig, configurationSeed + 
"-policy-advancedMessageStatistics");
+        startBroker(brokerConfig);
+        assertTrue("broker alive", brokerService.isStarted());
+
+        verifyBooleanField("AMQ.8463", "advancedMessageStatisticsEnabled", 
false);
+        applyNewConfig(brokerConfig, configurationSeed + 
"-policy-advancedMessageStatistics-mod", SLEEP);
+        verifyBooleanField("AMQ.8463", "advancedMessageStatisticsEnabled", 
true);
+    }
+
     @Test
     public void testModAdvancedNetworkStatistics() throws Exception {
         final String brokerConfig = configurationSeed + "-policy-ml-broker";
@@ -133,6 +145,9 @@ public class PolicyEntryTest extends 
RuntimeConfigTestSupport {
             session.createConsumer(session.createQueue(dest));
 
             switch(fieldName) {
+            case "advancedMessageStatisticsEnabled":
+                assertEquals(value, 
brokerService.getRegionBroker().getDestinationMap().get(new 
ActiveMQQueue(dest)).isAdvancedMessageStatisticsEnabled());
+                break;
             case "advancedNetworkStatisticsEnabled":
                 assertEquals(value, 
brokerService.getRegionBroker().getDestinationMap().get(new 
ActiveMQQueue(dest)).isAdvancedNetworkStatisticsEnabled());
                 break;
diff --git 
a/activemq-runtime-config/src/test/resources/org/apache/activemq/policyEntryTest-policy-advancedMessageStatistics-mod.xml
 
b/activemq-runtime-config/src/test/resources/org/apache/activemq/policyEntryTest-policy-advancedMessageStatistics-mod.xml
new file mode 100644
index 0000000000..4f56b3185d
--- /dev/null
+++ 
b/activemq-runtime-config/src/test/resources/org/apache/activemq/policyEntryTest-policy-advancedMessageStatistics-mod.xml
@@ -0,0 +1,36 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<beans
+        xmlns="http://www.springframework.org/schema/beans"
+        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+        xsi:schemaLocation="http://www.springframework.org/schema/beans 
http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
+  http://activemq.apache.org/schema/core 
http://activemq.apache.org/schema/core/activemq-core.xsd">
+
+  <broker xmlns="http://activemq.apache.org/schema/core" start="false" 
persistent="false">
+    <plugins>
+      <runtimeConfigurationPlugin checkPeriod="1000" />
+    </plugins>
+    <destinationPolicy>
+      <policyMap>
+        <policyEntries>
+          <policyEntry queue="AMQ.8463" 
advancedMessageStatisticsEnabled="true"/>
+        </policyEntries>
+      </policyMap>
+    </destinationPolicy>
+  </broker>
+</beans>
diff --git 
a/activemq-runtime-config/src/test/resources/org/apache/activemq/policyEntryTest-policy-advancedMessageStatistics.xml
 
b/activemq-runtime-config/src/test/resources/org/apache/activemq/policyEntryTest-policy-advancedMessageStatistics.xml
new file mode 100644
index 0000000000..8b9166c778
--- /dev/null
+++ 
b/activemq-runtime-config/src/test/resources/org/apache/activemq/policyEntryTest-policy-advancedMessageStatistics.xml
@@ -0,0 +1,36 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<beans
+        xmlns="http://www.springframework.org/schema/beans"
+        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+        xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
+  http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">
+
+  <broker xmlns="http://activemq.apache.org/schema/core" start="false" persistent="false">
+    <plugins>
+      <runtimeConfigurationPlugin checkPeriod="1000" />
+    </plugins>
+    <destinationPolicy>
+      <policyMap>
+        <policyEntries>
+          <policyEntry queue="AMQ.8463"/>
+        </policyEntries>
+      </policyMap>
+    </destinationPolicy>
+  </broker>
+</beans>
diff --git 
a/activemq-unit-tests/src/test/java/org/apache/activemq/network/NetworkAdvancedStatisticsTest.java
 
b/activemq-unit-tests/src/test/java/org/apache/activemq/network/NetworkAdvancedStatisticsTest.java
index e5c755dc9d..4922cd2ba1 100644
--- 
a/activemq-unit-tests/src/test/java/org/apache/activemq/network/NetworkAdvancedStatisticsTest.java
+++ 
b/activemq-unit-tests/src/test/java/org/apache/activemq/network/NetworkAdvancedStatisticsTest.java
@@ -17,20 +17,29 @@
 package org.apache.activemq.network;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
-
+import java.net.URI;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
 
+import javax.jms.JMSException;
 import javax.jms.Message;
 import javax.jms.MessageConsumer;
 import javax.jms.MessageListener;
 import javax.jms.MessageProducer;
+import javax.jms.Session;
 
+import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQQueue;
 import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.management.MessageFlowStats;
 import org.apache.activemq.util.Wait;
 import org.apache.activemq.util.Wait.Condition;
 import org.junit.Test;
@@ -44,8 +53,9 @@ public class NetworkAdvancedStatisticsTest extends 
BaseNetworkTest {
     @Parameterized.Parameters(name="includedDestination={0}, 
excludedDestination={1}")
     public static Collection<Object[]> data() {
         return Arrays.asList(new Object[][] {
-                { new ActiveMQTopic("include.test.bar"), new 
ActiveMQTopic("exclude.test.bar")},
-                { new ActiveMQQueue("include.test.foo"), new 
ActiveMQQueue("exclude.test.foo")}});
+                { new ActiveMQTopic("include.test.durable"), new 
ActiveMQTopic("exclude.test.durable"), true},
+                { new ActiveMQTopic("include.test.nondurable"), new 
ActiveMQTopic("exclude.test.nondurable"), false},
+                { new ActiveMQQueue("include.test.foo"), new 
ActiveMQQueue("exclude.test.foo"), false}});
     }
 
     protected static final int MESSAGE_COUNT = 10;
@@ -55,15 +65,38 @@ public class NetworkAdvancedStatisticsTest extends 
BaseNetworkTest {
 
     private final ActiveMQDestination includedDestination;
     private final ActiveMQDestination excludedDestination;
+    private final boolean durable;
 
-    public NetworkAdvancedStatisticsTest(ActiveMQDestination 
includedDestionation, ActiveMQDestination excludedDestination) {
+    public NetworkAdvancedStatisticsTest(ActiveMQDestination 
includedDestionation, ActiveMQDestination excludedDestination, boolean durable) 
{
         this.includedDestination = includedDestionation;
         this.excludedDestination = excludedDestination;
+        this.durable = durable;
     }
 
     @Override
     protected void doSetUp(boolean deleteAllMessages) throws Exception {
-        super.doSetUp(deleteAllMessages);
+        remoteBroker = createRemoteBroker();
+        remoteBroker.setDeleteAllMessagesOnStartup(deleteAllMessages);
+        remoteBroker.start();
+        remoteBroker.waitUntilStarted();
+        localBroker = createLocalBroker();
+        localBroker.setDeleteAllMessagesOnStartup(deleteAllMessages);
+        localBroker.start();
+        localBroker.waitUntilStarted();
+        URI localURI = localBroker.getVmConnectorURI();
+        ActiveMQConnectionFactory fac = new 
ActiveMQConnectionFactory(localURI);
+        fac.setAlwaysSyncSend(true);
+        fac.setDispatchAsync(false);
+        localConnection = fac.createConnection();
+        localConnection.setClientID("localClientId");
+
+        URI remoteURI = remoteBroker.getVmConnectorURI();
+        fac = new ActiveMQConnectionFactory(remoteURI);
+        remoteConnection = fac.createConnection();
+        remoteConnection.setClientID("remoteClientId");
+
+        localSession = localConnection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+        remoteSession = remoteConnection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
     }
 
     @Override
@@ -81,32 +114,55 @@ public class NetworkAdvancedStatisticsTest extends 
BaseNetworkTest {
     public void testNetworkAdvancedStatistics() throws Exception {
 
         // create a remote durable consumer to create demand
+        final Map<String, Message> receivedMessages = new 
ConcurrentHashMap<>();
+        final Collection<Exception> receivedExceptions = new 
ConcurrentLinkedQueue<>();
+
         MessageConsumer remoteConsumer;
-        if(includedDestination.isTopic()) {
+        if(includedDestination.isTopic() && durable) {
             remoteConsumer = 
remoteSession.createDurableSubscriber(ActiveMQTopic.class.cast(includedDestination),
 consumerName);
         } else {
             remoteConsumer = remoteSession.createConsumer(includedDestination);
-            remoteConsumer.setMessageListener(new MessageListener() {          
      
-                @Override
-                public void onMessage(Message message) {
-                }
-            });
         }
-        Thread.sleep(1000);
+        remoteConsumer.setMessageListener(new MessageListener() {
+            @Override
+            public void onMessage(Message message) {
+                try {
+                    receivedMessages.put(message.getJMSMessageID(), message);
+                } catch (JMSException e) {
+                    receivedExceptions.add(e);
+                }
+            }
+        });
+
+        localConnection.start();
+        remoteConnection.start();
 
         MessageProducer producer = 
localSession.createProducer(includedDestination);
+        String lastIncludedSentMessageID = null;
         for (int i = 0; i < MESSAGE_COUNT; i++) {
             Message test = localSession.createTextMessage("test-" + i);
             producer.send(test);
+            lastIncludedSentMessageID = test.getJMSMessageID();
         }
-        Thread.sleep(1000);
 
         MessageProducer producerExcluded = 
localSession.createProducer(excludedDestination);
+        String lastExcludedSentMessageID = null;
         for (int i = 0; i < MESSAGE_COUNT; i++) {
             Message test = localSession.createTextMessage("test-" + i);
             producerExcluded.send(test);
+            lastExcludedSentMessageID = test.getJMSMessageID();
         }
-        Thread.sleep(1000);
+
+        assertTrue(Wait.waitFor(new Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                // The number of message that remain is due to the exclude queue
+                return receivedMessages.size() == MESSAGE_COUNT;
+            }
+        }, 10000, 500));
+
+        assertTrue(receivedExceptions.isEmpty());
+        assertEquals(Integer.valueOf(MESSAGE_COUNT), 
Integer.valueOf(receivedMessages.size()));
 
         //Make sure stats are correct for local -> remote
         assertEquals(MESSAGE_COUNT, 
localBroker.getDestination(includedDestination).getDestinationStatistics().getEnqueues().getCount());
@@ -119,7 +175,36 @@ public class NetworkAdvancedStatisticsTest extends 
BaseNetworkTest {
         assertEquals(MESSAGE_COUNT, 
remoteBroker.getDestination(includedDestination).getDestinationStatistics().getNetworkEnqueues().getCount());
         assertEquals(0, 
remoteBroker.getDestination(includedDestination).getDestinationStatistics().getNetworkDequeues().getCount());
 
-        // Make sure stats do not increment for local-only
+        // Advanced Message status - enqueue
+        MessageFlowStats localBrokerIncludedMessageFlowStats = 
localBroker.getDestination(includedDestination).getDestinationStatistics().getMessageFlowStats();
+        MessageFlowStats localBrokerExcludedMessageFlowStats = 
localBroker.getDestination(excludedDestination).getDestinationStatistics().getMessageFlowStats();
+        MessageFlowStats remoteBrokerExcludedMessageFlowStats = 
remoteBroker.getDestination(excludedDestination).getDestinationStatistics().getMessageFlowStats();
+
+        assertEquals(lastIncludedSentMessageID, 
localBrokerIncludedMessageFlowStats.getEnqueuedMessageID().getValue());
+        
assertNotNull(localBrokerIncludedMessageFlowStats.getEnqueuedMessageBrokerInTime().getValue());
+        
assertTrue(localBrokerIncludedMessageFlowStats.getEnqueuedMessageBrokerInTime().getValue()
 > 0l);
+        
assertNotNull(localBrokerIncludedMessageFlowStats.getEnqueuedMessageTimestamp().getValue());
+        
assertTrue(localBrokerIncludedMessageFlowStats.getEnqueuedMessageTimestamp().getValue()
 > 0l);
+        assertEquals("localClientId", 
localBrokerIncludedMessageFlowStats.getEnqueuedMessageClientID().getValue());
+
+        // Advanced Message status - dequeue
+        assertEquals(lastIncludedSentMessageID, 
localBrokerIncludedMessageFlowStats.getDequeuedMessageID().getValue());
+        
assertNotNull(localBrokerIncludedMessageFlowStats.getDequeuedMessageBrokerInTime().getValue());
+        
assertNotNull(localBrokerIncludedMessageFlowStats.getDequeuedMessageBrokerOutTime().getValue());
+        
assertTrue(localBrokerIncludedMessageFlowStats.getDequeuedMessageClientID().getValue().startsWith("networkConnector"));
+        
assertNotNull(localBrokerIncludedMessageFlowStats.getDequeuedMessageTimestamp().getValue());
+
+        if(includedDestination.isTopic() && !durable) {
+            assertEquals(Long.valueOf(0l), 
localBrokerIncludedMessageFlowStats.getDequeuedMessageBrokerInTime().getValue());
+            assertEquals(Long.valueOf(0l), 
localBrokerIncludedMessageFlowStats.getDequeuedMessageBrokerOutTime().getValue());
+            assertEquals(Long.valueOf(0l), 
localBrokerIncludedMessageFlowStats.getDequeuedMessageTimestamp().getValue());
+        } else {
+            
assertTrue(localBrokerIncludedMessageFlowStats.getDequeuedMessageBrokerInTime().getValue()
 > 0l);
+            
assertTrue(localBrokerIncludedMessageFlowStats.getDequeuedMessageBrokerOutTime().getValue()
 > 0l);
+            
assertTrue(localBrokerIncludedMessageFlowStats.getDequeuedMessageTimestamp().getValue()
 > 0l);
+        }
+
+        // Make sure stats do not increment for local-only excluded destinations
         assertEquals(MESSAGE_COUNT, 
localBroker.getDestination(excludedDestination).getDestinationStatistics().getEnqueues().getCount());
         assertEquals(0, 
localBroker.getDestination(excludedDestination).getDestinationStatistics().getForwards().getCount());
         assertEquals(0, 
localBroker.getDestination(excludedDestination).getDestinationStatistics().getNetworkDequeues().getCount());
@@ -129,6 +214,21 @@ public class NetworkAdvancedStatisticsTest extends 
BaseNetworkTest {
         assertEquals(0, 
remoteBroker.getDestination(excludedDestination).getDestinationStatistics().getForwards().getCount());
         assertEquals(0, 
remoteBroker.getDestination(excludedDestination).getDestinationStatistics().getNetworkEnqueues().getCount());
         assertEquals(0, 
remoteBroker.getDestination(excludedDestination).getDestinationStatistics().getNetworkDequeues().getCount());
+        assertEquals(lastExcludedSentMessageID, 
localBrokerExcludedMessageFlowStats.getEnqueuedMessageID().getValue());
+        
assertNull(localBrokerExcludedMessageFlowStats.getDequeuedMessageID().getValue());
+
+        // Advanced Message status - enqueue
+        
assertNull(remoteBrokerExcludedMessageFlowStats.getEnqueuedMessageID().getValue());
+        assertEquals(Long.valueOf(0l), 
remoteBrokerExcludedMessageFlowStats.getEnqueuedMessageBrokerInTime().getValue());
+        assertEquals(Long.valueOf(0l), 
remoteBrokerExcludedMessageFlowStats.getEnqueuedMessageTimestamp().getValue());
+        
assertNull(remoteBrokerExcludedMessageFlowStats.getEnqueuedMessageClientID().getValue());
+
+        // Advanced Message status - dequeue
+        assertNull(lastIncludedSentMessageID, 
remoteBrokerExcludedMessageFlowStats.getDequeuedMessageID().getValue());
+        assertEquals(Long.valueOf(0l), 
remoteBrokerExcludedMessageFlowStats.getDequeuedMessageBrokerInTime().getValue());
+        assertEquals(Long.valueOf(0l), 
remoteBrokerExcludedMessageFlowStats.getDequeuedMessageBrokerOutTime().getValue());
+        assertEquals(Long.valueOf(0l), 
remoteBrokerExcludedMessageFlowStats.getDequeuedMessageTimestamp().getValue());
+        
assertNull(remoteBrokerExcludedMessageFlowStats.getDequeuedMessageClientID().getValue());
 
         if(includedDestination.isTopic()) {
             assertTrue(Wait.waitFor(new Condition() {
@@ -164,4 +264,5 @@ public class NetworkAdvancedStatisticsTest extends 
BaseNetworkTest {
             }
         }));
     }
+
 }
diff --git 
a/activemq-unit-tests/src/test/resources/org/apache/activemq/network/localBroker-advancedNetworkStatistics.xml
 
b/activemq-unit-tests/src/test/resources/org/apache/activemq/network/localBroker-advancedNetworkStatistics.xml
index b543169f6e..f17fa9bcc5 100644
--- 
a/activemq-unit-tests/src/test/resources/org/apache/activemq/network/localBroker-advancedNetworkStatistics.xml
+++ 
b/activemq-unit-tests/src/test/resources/org/apache/activemq/network/localBroker-advancedNetworkStatistics.xml
@@ -28,11 +28,11 @@
     <destinationPolicy>
       <policyMap>
         <policyEntries>
-          <policyEntry queue="exclude.>" 
advancedNetworkStatisticsEnabled="true"/>
-          <policyEntry queue="include.>" 
advancedNetworkStatisticsEnabled="true"/> 
+          <policyEntry queue="exclude.>" 
advancedMessageStatisticsEnabled="true" 
advancedNetworkStatisticsEnabled="true"/>
+          <policyEntry queue="include.>" 
advancedMessageStatisticsEnabled="true" 
advancedNetworkStatisticsEnabled="true"/>
           <policyEntry topic="ActiveMQ.Advisory.>" />
-          <policyEntry topic="exclude.>" 
advancedNetworkStatisticsEnabled="true"/>
-          <policyEntry topic="include.>" 
advancedNetworkStatisticsEnabled="true"/> 
+          <policyEntry topic="exclude.>" 
advancedMessageStatisticsEnabled="true" 
advancedNetworkStatisticsEnabled="true"/>
+          <policyEntry topic="include.>" 
advancedMessageStatisticsEnabled="true" 
advancedNetworkStatisticsEnabled="true"/>
         </policyEntries>
       </policyMap>
     </destinationPolicy>
@@ -43,17 +43,19 @@
          conduitSubscriptions = "true"
          decreaseNetworkConsumerPriority = "false"
          name="networkConnector">
-       <dynamicallyIncludedDestinations>
-               <queue physicalName="include.test.foo"/>
-               <topic physicalName="include.test.bar"/>
-       </dynamicallyIncludedDestinations>
-       <excludedDestinations>
-               <queue physicalName="exclude.test.foo"/>
-               <topic physicalName="exclude.test.bar"/>
-       </excludedDestinations>
+         <dynamicallyIncludedDestinations>
+            <queue physicalName="include.test.foo"/>
+            <topic physicalName="include.test.durable"/>
+            <topic physicalName="include.test.nondurable"/>
+        </dynamicallyIncludedDestinations>
+        <excludedDestinations>
+            <queue physicalName="exclude.test.foo"/>
+            <topic physicalName="exclude.test.durable"/>
+            <topic physicalName="exclude.test.nondurable"/>
+        </excludedDestinations>
       </networkConnector>
     </networkConnectors>
-    
+
     <transportConnectors>
       <transportConnector uri="tcp://localhost:61616"/>
     </transportConnectors>
diff --git 
a/activemq-unit-tests/src/test/resources/org/apache/activemq/network/remoteBroker-advancedNetworkStatistics.xml
 
b/activemq-unit-tests/src/test/resources/org/apache/activemq/network/remoteBroker-advancedNetworkStatistics.xml
index a9cf93f73a..d2b3c0f33d 100644
--- 
a/activemq-unit-tests/src/test/resources/org/apache/activemq/network/remoteBroker-advancedNetworkStatistics.xml
+++ 
b/activemq-unit-tests/src/test/resources/org/apache/activemq/network/remoteBroker-advancedNetworkStatistics.xml
@@ -28,11 +28,11 @@
     <destinationPolicy>
       <policyMap>
         <policyEntries>
-          <policyEntry queue="exclude.>" 
advancedNetworkStatisticsEnabled="true"/>
-          <policyEntry queue="include.>" 
advancedNetworkStatisticsEnabled="true"/> 
+          <policyEntry queue="exclude.>" 
advancedMessageStatisticsEnabled="true" 
advancedNetworkStatisticsEnabled="true"/>
+          <policyEntry queue="include.>" 
advancedMessageStatisticsEnabled="true" 
advancedNetworkStatisticsEnabled="true"/>
           <policyEntry topic="ActiveMQ.Advisory.>" />
-          <policyEntry topic="exclude.>" 
advancedNetworkStatisticsEnabled="true"/>
-          <policyEntry topic="include.>" 
advancedNetworkStatisticsEnabled="true"/> 
+          <policyEntry topic="exclude.>" 
advancedMessageStatisticsEnabled="true" 
advancedNetworkStatisticsEnabled="true"/>
+          <policyEntry topic="include.>" 
advancedMessageStatisticsEnabled="true" 
advancedNetworkStatisticsEnabled="true"/>
         </policyEntries>
       </policyMap>
     </destinationPolicy>


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact


Reply via email to