This is an automated email from the ASF dual-hosted git repository.
mattrpav pushed a commit to branch activemq-5.19.x
in repository https://gitbox.apache.org/repos/asf/activemq.git
The following commit(s) were added to refs/heads/activemq-5.19.x by this push:
new 30e1af80df [AMQ-8463] New Feature: Advanced Message Flow statistics
30e1af80df is described below
commit 30e1af80dfa38acf5804061b9133159ad8ad92f5
Author: Matt Pavlovich <[email protected]>
AuthorDate: Fri Dec 13 12:04:20 2024 -0600
[AMQ-8463] New Feature: Advanced Message Flow statistics
Co-authored-by: Christopher L. Shannon <[email protected]>
(cherry picked from commit 7f8cf0e7d977341b42f9b374a98292a586f95e79)
---
.../activemq/broker/jmx/DestinationView.java | 63 ++++++++++
.../activemq/broker/jmx/DestinationViewMBean.java | 33 ++++++
.../activemq/broker/region/BaseDestination.java | 14 ++-
.../apache/activemq/broker/region/Destination.java | 5 +-
.../activemq/broker/region/DestinationFilter.java | 10 ++
.../broker/region/DestinationStatistics.java | 104 ++++++++--------
.../broker/region/DurableTopicSubscription.java | 6 +
.../org/apache/activemq/broker/region/Queue.java | 18 ++-
.../org/apache/activemq/broker/region/Topic.java | 6 +
.../activemq/broker/region/TopicSubscription.java | 6 +
.../activemq/broker/region/policy/PolicyEntry.java | 13 ++
.../store/MessageStoreSubscriptionStatistics.java | 2 +-
.../management/JCAConnectionPoolStatsImpl.java | 6 +-
.../management/JCAConnectionStatsImpl.java | 4 +-
.../activemq/management/JMSEndpointStatsImpl.java | 10 +-
.../activemq/management/JMSSessionStatsImpl.java | 8 +-
.../activemq/management/MessageFlowStats.java | 32 +++++
.../activemq/management/MessageFlowStatsImpl.java | 118 +++++++++++++++++++
.../apache/activemq/management/StatisticImpl.java | 33 +++---
.../org/apache/activemq/management/StatsImpl.java | 60 ++++------
.../activemq/management/UnsampledStatistic.java | 27 +++++
.../management/UnsampledStatisticImpl.java | 70 +++++++++++
.../activemq/management/UnsampledStatsImpl.java | 41 +++++++
.../management/UnsampledStatisticsTest.java | 72 +++++++++++
.../java/org/apache/activemq/PolicyEntryTest.java | 15 +++
...ryTest-policy-advancedMessageStatistics-mod.xml | 36 ++++++
...yEntryTest-policy-advancedMessageStatistics.xml | 36 ++++++
.../network/NetworkAdvancedStatisticsTest.java | 131 ++++++++++++++++++---
.../localBroker-advancedNetworkStatistics.xml | 28 +++--
.../remoteBroker-advancedNetworkStatistics.xml | 8 +-
30 files changed, 861 insertions(+), 154 deletions(-)
diff --git
a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java
b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java
index 7c77d9aa3f..5e0ab749e4 100644
---
a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java
+++
b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java
@@ -30,6 +30,8 @@ import javax.jms.Connection;
import javax.jms.InvalidSelectorException;
import javax.jms.MessageProducer;
import javax.jms.Session;
+import java.util.function.Function;
+import java.util.function.Supplier;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
import javax.management.openmbean.CompositeData;
@@ -52,6 +54,8 @@ import org.apache.activemq.command.ActiveMQTextMessage;
import org.apache.activemq.command.Message;
import org.apache.activemq.filter.BooleanExpression;
import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
+import org.apache.activemq.management.MessageFlowStats;
+import org.apache.activemq.management.UnsampledStatistic;
import org.apache.activemq.selector.SelectorParser;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.util.URISupport;
@@ -620,4 +624,63 @@ public class DestinationView implements
DestinationViewMBean {
return
destination.getDestinationStatistics().getNetworkDequeues().getCount();
}
+ @Override
+ public boolean isAdvancedMessageStatisticsEnabled() {
+ return destination.isAdvancedMessageStatisticsEnabled();
+ }
+
+ @Override
+ public void setAdvancedMessageStatisticsEnabled(boolean
advancedMessageStatisticsEnabled) {
+
destination.setAdvancedMessageStatisticsEnabled(advancedMessageStatisticsEnabled);
+ }
+
+ @Override
+ public long getEnqueuedMessageBrokerInTime() {
+ return
getMessageFlowStat(MessageFlowStats::getEnqueuedMessageBrokerInTime, 0L);
+ }
+
+ @Override
+ public String getEnqueuedMessageClientId() {
+ return
getMessageFlowStat(MessageFlowStats::getEnqueuedMessageClientID, null);
+ }
+
+ @Override
+ public String getEnqueuedMessageId() {
+ return getMessageFlowStat(MessageFlowStats::getEnqueuedMessageID,
null);
+ }
+
+ @Override
+ public long getEnqueuedMessageTimestamp() {
+ return
getMessageFlowStat(MessageFlowStats::getEnqueuedMessageTimestamp, 0L);
+ }
+
+ @Override
+ public long getDequeuedMessageBrokerInTime() {
+ return
getMessageFlowStat(MessageFlowStats::getDequeuedMessageBrokerInTime, 0L);
+ }
+
+ @Override
+ public long getDequeuedMessageBrokerOutTime() {
+ return
getMessageFlowStat(MessageFlowStats::getDequeuedMessageBrokerOutTime, 0L);
+ }
+
+ @Override
+ public String getDequeuedMessageClientId() {
+ return
getMessageFlowStat(MessageFlowStats::getDequeuedMessageClientID, null);
+ }
+
+ @Override
+ public String getDequeuedMessageId() {
+ return getMessageFlowStat(MessageFlowStats::getDequeuedMessageID,
null);
+ }
+
+ @Override
+ public long getDequeuedMessageTimestamp() {
+ return
getMessageFlowStat(MessageFlowStats::getDequeuedMessageTimestamp, 0L);
+ }
+
+ private <T> T getMessageFlowStat(Function<MessageFlowStats,
UnsampledStatistic<T>> f, T defVal) {
+ final var stats =
destination.getDestinationStatistics().getMessageFlowStats();
+ return stats != null ? f.apply(stats).getValue() : defVal;
+ }
}
diff --git
a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java
b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java
index 68861e89f4..431424f604 100644
---
a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java
+++
b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java
@@ -493,4 +493,37 @@ public interface DestinationViewMBean {
@MBeanInfo("Number of messages acknowledged from the destination via
network connection")
long getNetworkDequeues();
+
+ @MBeanInfo("Query Advanced Message Statistics flag")
+ boolean isAdvancedMessageStatisticsEnabled();
+
+ @MBeanInfo("Toggle Advanced Message Statistics flag")
+ void setAdvancedMessageStatisticsEnabled(boolean
advancedMessageStatisticsEnabled);
+
+ @MBeanInfo("Broker in time (ms) of last enqueued message to the
destination")
+ long getEnqueuedMessageBrokerInTime();
+
+ @MBeanInfo("ClientID of last enqueued message to the destination")
+ String getEnqueuedMessageClientId();
+
+ @MBeanInfo("MessageID of last enqueued message to the destination")
+ String getEnqueuedMessageId();
+
+ @MBeanInfo("Message timestamp in (ms) of last enqueued message to the
destination")
+ long getEnqueuedMessageTimestamp();
+
+ @MBeanInfo("Broker in time (ms) of last dequeued message to the
destination")
+ long getDequeuedMessageBrokerInTime();
+
+ @MBeanInfo("Broker out time (ms) of last dequeued message to the
destination")
+ long getDequeuedMessageBrokerOutTime();
+
+ @MBeanInfo("ClientID of last dequeued message to the destination")
+ String getDequeuedMessageClientId();
+
+ @MBeanInfo("MessageID of last dequeued message to the destination")
+ String getDequeuedMessageId();
+
+ @MBeanInfo("Message timestamp in (ms) of last dequeued message to the
destination")
+ long getDequeuedMessageTimestamp();
}
diff --git
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
index e3b72e1d62..7be5f52815 100644
---
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
+++
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
@@ -827,7 +827,7 @@ public abstract class BaseDestination implements
Destination {
@Override
public void markForGC(long timeStamp) {
if (isGcIfInactive() && this.lastActiveTime == 0 && isActive() == false
- && destinationStatistics.messages.getCount() == 0 &&
getInactiveTimeoutBeforeGC() > 0l) {
+ && destinationStatistics.getMessages().getCount() == 0 &&
getInactiveTimeoutBeforeGC() > 0l) {
this.lastActiveTime = timeStamp;
}
}
@@ -836,7 +836,7 @@ public abstract class BaseDestination implements
Destination {
public boolean canGC() {
boolean result = false;
final long currentLastActiveTime = this.lastActiveTime;
- if (isGcIfInactive() && currentLastActiveTime != 0l &&
destinationStatistics.messages.getCount() == 0L ) {
+ if (isGcIfInactive() && currentLastActiveTime != 0l &&
destinationStatistics.getMessages().getCount() == 0L ) {
if ((System.currentTimeMillis() - currentLastActiveTime) >=
getInactiveTimeoutBeforeGC()) {
result = true;
}
@@ -880,6 +880,16 @@ public abstract class BaseDestination implements
Destination {
this.advancedNetworkStatisticsEnabled =
advancedNetworkStatisticsEnabled;
}
+ @Override
+ public boolean isAdvancedMessageStatisticsEnabled() {
+ return this.destinationStatistics.isAdvancedMessageStatisticsEnabled();
+ }
+
+ @Override
+ public void setAdvancedMessageStatisticsEnabled(boolean
advancedMessageStatisticsEnabled) {
+
this.destinationStatistics.setAdvancedMessageStatisticsEnabled(advancedMessageStatisticsEnabled);
+ }
+
@Override
public abstract List<Subscription> getConsumers();
diff --git
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Destination.java
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Destination.java
index 45e3de7b3c..22ba14894b 100644
---
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Destination.java
+++
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Destination.java
@@ -259,9 +259,12 @@ public interface Destination extends Service, Task,
Message.MessageDestination {
void setSendDuplicateFromStoreToDLQ(boolean sendDuplicateFromStoreToDLQ);
- // [AMQ-9437]
boolean isAdvancedNetworkStatisticsEnabled();
void setAdvancedNetworkStatisticsEnabled(boolean
advancedNetworkStatisticsEnabled);
+ boolean isAdvancedMessageStatisticsEnabled();
+
+ void setAdvancedMessageStatisticsEnabled(boolean
advancedMessageStatisticsEnabled);
+
}
diff --git
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java
index 85ef367a77..1ab96560ac 100644
---
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java
+++
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java
@@ -419,6 +419,16 @@ public class DestinationFilter implements Destination {
next.setAdvancedNetworkStatisticsEnabled(advancedNetworkStatisticsEnabled);
}
+ @Override
+ public boolean isAdvancedMessageStatisticsEnabled() {
+ return next.isAdvancedMessageStatisticsEnabled();
+ }
+
+ @Override
+ public void setAdvancedMessageStatisticsEnabled(boolean
advancedMessageStatisticsEnabled) {
+
next.setAdvancedMessageStatisticsEnabled(advancedMessageStatisticsEnabled);
+ }
+
public void deleteSubscription(ConnectionContext context, SubscriptionKey
key) throws Exception {
if (next instanceof DestinationFilter) {
DestinationFilter filter = (DestinationFilter) next;
diff --git
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationStatistics.java
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationStatistics.java
index dc6b17dfaf..86833b86f8 100644
---
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationStatistics.java
+++
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationStatistics.java
@@ -14,41 +14,43 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.apache.activemq.broker.region;
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicReference;
import org.apache.activemq.management.CountStatisticImpl;
import org.apache.activemq.management.PollCountStatisticImpl;
import org.apache.activemq.management.StatsImpl;
import org.apache.activemq.management.*;
/**
- * The J2EE Statistics for the a Destination.
- *
- *
+ * The Statistics for a Destination.
*/
public class DestinationStatistics extends StatsImpl {
- protected CountStatisticImpl enqueues;
- protected CountStatisticImpl dequeues;
- protected CountStatisticImpl forwards;
- protected CountStatisticImpl consumers;
- protected CountStatisticImpl producers;
- protected CountStatisticImpl messages;
- protected PollCountStatisticImpl messagesCached;
- protected CountStatisticImpl dispatched;
- protected CountStatisticImpl duplicateFromStore;
- protected CountStatisticImpl inflight;
- protected CountStatisticImpl expired;
- protected TimeStatisticImpl processTime;
- protected CountStatisticImpl blockedSends;
- protected TimeStatisticImpl blockedTime;
- protected SizeStatisticImpl messageSize;
- protected CountStatisticImpl maxUncommittedExceededCount;
-
- // [AMQ-9437] Advanced Statistics are optionally enabled
- protected CountStatisticImpl networkEnqueues;
- protected CountStatisticImpl networkDequeues;
+ private final CountStatisticImpl enqueues;
+ private final CountStatisticImpl dequeues;
+ private final CountStatisticImpl forwards;
+ private final CountStatisticImpl consumers;
+ private final CountStatisticImpl producers;
+ private final CountStatisticImpl messages;
+ private final PollCountStatisticImpl messagesCached;
+ private final CountStatisticImpl dispatched;
+ private final CountStatisticImpl duplicateFromStore;
+ private final CountStatisticImpl inflight;
+ private final CountStatisticImpl expired;
+ private final TimeStatisticImpl processTime;
+ private final CountStatisticImpl blockedSends;
+ private final TimeStatisticImpl blockedTime;
+ private final SizeStatisticImpl messageSize;
+ private final CountStatisticImpl maxUncommittedExceededCount;
+
+ // [AMQ-9437] Advanced Network Statistics
+ private final CountStatisticImpl networkEnqueues;
+ private final CountStatisticImpl networkDequeues;
+
+ // [AMQ-8463] Advanced Message Statistics are disabled by default
+ private final AtomicReference<MessageFlowStatsImpl> messageFlowStats = new
AtomicReference<>();
public DestinationStatistics() {
@@ -75,25 +77,6 @@ public class DestinationStatistics extends StatsImpl {
networkEnqueues = new CountStatisticImpl("networkEnqueues", "The
number of messages that have been sent to the destination via network
connection");
networkDequeues = new CountStatisticImpl("networkDequeues", "The
number of messages that have been acknowledged from the destination via network
connection");
-
- addStatistic("enqueues", enqueues);
- addStatistic("dispatched", dispatched);
- addStatistic("dequeues", dequeues);
- addStatistic("duplicateFromStore", duplicateFromStore);
- addStatistic("inflight", inflight);
- addStatistic("expired", expired);
- addStatistic("consumers", consumers);
- addStatistic("producers", producers);
- addStatistic("messages", messages);
- addStatistic("messagesCached", messagesCached);
- addStatistic("processTime", processTime);
- addStatistic("blockedSends",blockedSends);
- addStatistic("blockedTime",blockedTime);
- addStatistic("messageSize",messageSize);
- addStatistic("maxUncommittedExceededCount",
maxUncommittedExceededCount);
-
- addStatistic("networkEnqueues", networkEnqueues);
- addStatistic("networkDequeues", networkDequeues);
}
public CountStatisticImpl getEnqueues() {
@@ -132,10 +115,6 @@ public class DestinationStatistics extends StatsImpl {
return messages;
}
- public void setMessagesCached(PollCountStatisticImpl messagesCached) {
- this.messagesCached = messagesCached;
- }
-
public CountStatisticImpl getDispatched() {
return dispatched;
}
@@ -170,6 +149,10 @@ public class DestinationStatistics extends StatsImpl {
return networkDequeues;
}
+ public MessageFlowStats getMessageFlowStats() {
+ return messageFlowStats.get();
+ }
+
public void reset() {
if (this.isDoReset()) {
super.reset();
@@ -186,6 +169,8 @@ public class DestinationStatistics extends StatsImpl {
maxUncommittedExceededCount.reset();
networkEnqueues.reset();
networkDequeues.reset();
+ Optional.ofNullable(messageFlowStats.get())
+ .ifPresent(MessageFlowStatsImpl::reset);
}
}
@@ -208,9 +193,13 @@ public class DestinationStatistics extends StatsImpl {
messageSize.setEnabled(enabled);
maxUncommittedExceededCount.setEnabled(enabled);
- // [AMQ-9437] Advanced Statistics
+ // [AMQ-9437] Advanced Network Statistics
networkEnqueues.setEnabled(enabled);
networkDequeues.setEnabled(enabled);
+
+ // [AMQ-8463] Advanced Message Statistics
+ Optional.ofNullable(messageFlowStats.get())
+ .ifPresent(stats -> stats.setEnabled(enabled));
}
public void setParent(DestinationStatistics parent) {
@@ -233,6 +222,7 @@ public class DestinationStatistics extends StatsImpl {
maxUncommittedExceededCount.setParent(parent.maxUncommittedExceededCount);
networkEnqueues.setParent(parent.networkEnqueues);
networkDequeues.setParent(parent.networkDequeues);
+ // [AMQ-8463] Advanced Message Statistics does not have a parent.
} else {
enqueues.setParent(null);
dispatched.setParent(null);
@@ -252,7 +242,25 @@ public class DestinationStatistics extends StatsImpl {
maxUncommittedExceededCount.setParent(null);
networkEnqueues.setParent(null);
networkDequeues.setParent(null);
+ // [AMQ-8463] Advanced Message Statistics does not have a parent.
}
}
+ // This is the only method that can mutate the messageFlowStats state
+ public synchronized void setAdvancedMessageStatisticsEnabled(boolean
enabled) {
+ if(!enabled) {
+ this.messageFlowStats.set(null);
+ return;
+ }
+
+ if(this.messageFlowStats.get() == null) {
+ MessageFlowStatsImpl tmpMessageFlowStatsImpl = new
MessageFlowStatsImpl();
+ tmpMessageFlowStatsImpl.setEnabled(true);
+ this.messageFlowStats.set(tmpMessageFlowStatsImpl);
+ }
+ }
+
+ public boolean isAdvancedMessageStatisticsEnabled() {
+ return this.messageFlowStats.get() != null;
+ }
}
diff --git
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
index 1236342cb8..13106cd0f3 100644
---
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
+++
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
@@ -42,6 +42,7 @@ import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageDispatch;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.RemoveInfo;
+import org.apache.activemq.management.MessageFlowStats;
import org.apache.activemq.store.TopicMessageStore;
import org.apache.activemq.transaction.Synchronization;
import org.apache.activemq.usage.SystemUsage;
@@ -374,6 +375,11 @@ public class DurableTopicSubscription extends
PrefetchSubscription implements Us
if(((Destination)node.getRegionDestination()).isAdvancedNetworkStatisticsEnabled()
&& getContext() != null && getContext().isNetworkConnection()) {
((Destination)node.getRegionDestination()).getDestinationStatistics().getNetworkDequeues().add(ack.getMessageCount());
}
+
+ final MessageFlowStats tmpMessageFlowStats =
((Destination)node.getRegionDestination()).getDestinationStatistics().getMessageFlowStats();
+ if(tmpMessageFlowStats != null) {
+ tmpMessageFlowStats.dequeueStats(context.getClientId(),
node.getMessageId().toString(), node.getMessage().getTimestamp(),
node.getMessage().getBrokerInTime(), node.getMessage().getBrokerOutTime());
+ }
}
}
diff --git
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
index 1d443f6c81..c87b37c1b6 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
@@ -1913,11 +1913,17 @@ public class Queue extends BaseDestination implements
Task, UsageListener, Index
private void dropMessage(ConnectionContext context, QueueMessageReference
reference) {
//use dropIfLive so we only process the statistics at most one time
if (reference.dropIfLive()) {
- getDestinationStatistics().getDequeues().increment();
- getDestinationStatistics().getMessages().decrement();
+ destinationStatistics.getDequeues().increment();
+ destinationStatistics.getMessages().decrement();
+
+ final var tmpMessageFlowStats =
destinationStatistics.getMessageFlowStats();
+ if(tmpMessageFlowStats != null) {
+ Message tmpMessage = reference.getMessage();
+ tmpMessageFlowStats.dequeueStats(context.getClientId(),
tmpMessage.getMessageId().toString(), tmpMessage.getTimestamp(),
tmpMessage.getBrokerInTime(), tmpMessage.getBrokerOutTime());
+ }
if(isAdvancedNetworkStatisticsEnabled() && context.getConnection()
!= null && context.getConnection().isNetworkConnection()) {
- getDestinationStatistics().getNetworkDequeues().increment();
+ destinationStatistics.getNetworkDequeues().increment();
}
pagedInMessagesLock.writeLock().lock();
@@ -1971,10 +1977,16 @@ public class Queue extends BaseDestination implements
Task, UsageListener, Index
final void messageSent(final ConnectionContext context, final Message msg)
throws Exception {
pendingSends.decrementAndGet();
+
destinationStatistics.getEnqueues().increment();
destinationStatistics.getMessages().increment();
destinationStatistics.getMessageSize().addSize(msg.getSize());
+ final var tmpMessageFlowStats =
destinationStatistics.getMessageFlowStats();
+ if(tmpMessageFlowStats != null) {
+ tmpMessageFlowStats.enqueueStats(context.getClientId(),
msg.getMessageId().toString(), msg.getTimestamp(), msg.getBrokerInTime());
+ }
+
if(isAdvancedNetworkStatisticsEnabled() && context.getConnection() !=
null && context.getConnection().isNetworkConnection()) {
destinationStatistics.getNetworkEnqueues().increment();
}
diff --git
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java
index 8c3c8bb36d..92e50c04bf 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java
@@ -51,6 +51,7 @@ import org.apache.activemq.command.Response;
import org.apache.activemq.command.SubscriptionInfo;
import org.apache.activemq.filter.MessageEvaluationContext;
import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
+import org.apache.activemq.management.MessageFlowStats;
import org.apache.activemq.store.MessageRecoveryListener;
import org.apache.activemq.store.NoLocalSubscriptionAware;
import org.apache.activemq.store.PersistenceAdapter;
@@ -779,6 +780,11 @@ public class Topic extends BaseDestination implements Task
{
// destinationStatistics.getMessages().increment();
destinationStatistics.getEnqueues().increment();
+ final var tmpMessageFlowStats =
destinationStatistics.getMessageFlowStats();
+ if(tmpMessageFlowStats != null) {
+ tmpMessageFlowStats.enqueueStats(context.getClientId(),
message.getMessageId().toString(), message.getTimestamp(),
message.getBrokerInTime());
+ }
+
if(isAdvancedNetworkStatisticsEnabled() && context != null &&
context.isNetworkConnection()) {
destinationStatistics.getNetworkEnqueues().increment();
}
diff --git
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
index 613c4a0f69..73a5113765 100644
---
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
+++
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
@@ -25,6 +25,7 @@ import
org.apache.activemq.broker.region.cursors.VMPendingMessageCursor;
import org.apache.activemq.broker.region.policy.MessageEvictionStrategy;
import org.apache.activemq.broker.region.policy.OldestMessageEvictionStrategy;
import org.apache.activemq.command.*;
+import org.apache.activemq.management.MessageFlowStats;
import org.apache.activemq.thread.Scheduler;
import org.apache.activemq.transaction.Synchronization;
import org.apache.activemq.transport.TransmitCallback;
@@ -453,6 +454,11 @@ public class TopicSubscription extends
AbstractSubscription {
destination.getDestinationStatistics().getNetworkDequeues().add(count);
}
}
+
+ final var tmpMessageFlowStats =
destination.getDestinationStatistics().getMessageFlowStats();
+ if(tmpMessageFlowStats != null) {
+ tmpMessageFlowStats.dequeueStats(context.getClientId(),
ack.getLastMessageId().toString());
+ }
if (ack.isExpiredAck()) {
destination.getDestinationStatistics().getExpired().add(count);
}
diff --git
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
index f5ab784179..8ae052574b 100644
---
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
+++
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
@@ -107,6 +107,8 @@ public class PolicyEntry extends DestinationMapEntry {
private int maxDestinations = -1;
private boolean useTopicSubscriptionInflightStats = true;
private boolean advancedNetworkStatisticsEnabled = false; // [AMQ-9437]
+ private boolean advancedMessageStatisticsEnabled = false; // [AMQ-8463]
+
/*
* percentage of in-flight messages above which optimize message store is
disabled
*/
@@ -309,6 +311,9 @@ public class PolicyEntry extends DestinationMapEntry {
if (isUpdate("advancedNetworkStatisticsEnabled", includedProperties)) {
destination.setAdvancedNetworkStatisticsEnabled(isAdvancedNetworkStatisticsEnabled());
}
+ if (isUpdate("advancedMessageStatisticsEnabled", includedProperties)) {
+
destination.setAdvancedMessageStatisticsEnabled(isAdvancedMessageStatisticsEnabled());
+ }
}
public void baseConfiguration(Broker broker, BaseDestination destination) {
@@ -1187,4 +1192,12 @@ public class PolicyEntry extends DestinationMapEntry {
public void setAdvancedNetworkStatisticsEnabled(boolean
advancedNetworkStatisticsEnabled) {
this.advancedNetworkStatisticsEnabled =
advancedNetworkStatisticsEnabled;
}
+
+ public boolean isAdvancedMessageStatisticsEnabled() {
+ return this.advancedMessageStatisticsEnabled;
+ }
+
+ public void setAdvancedMessageStatisticsEnabled(boolean
advancedMessageStatisticsEnabled) {
+ this.advancedMessageStatisticsEnabled =
advancedMessageStatisticsEnabled;
+ }
}
diff --git
a/activemq-broker/src/main/java/org/apache/activemq/store/MessageStoreSubscriptionStatistics.java
b/activemq-broker/src/main/java/org/apache/activemq/store/MessageStoreSubscriptionStatistics.java
index 8be581cc61..b04596585d 100644
---
a/activemq-broker/src/main/java/org/apache/activemq/store/MessageStoreSubscriptionStatistics.java
+++
b/activemq-broker/src/main/java/org/apache/activemq/store/MessageStoreSubscriptionStatistics.java
@@ -92,7 +92,7 @@ public class MessageStoreSubscriptionStatistics extends
AbstractMessageStoreStat
private class SubscriptionStatistics extends
AbstractMessageStoreStatistics {
public SubscriptionStatistics() {
- this(MessageStoreSubscriptionStatistics.this.enabled);
+ this(MessageStoreSubscriptionStatistics.this.isEnabled());
}
/**
diff --git
a/activemq-client/src/main/java/org/apache/activemq/management/JCAConnectionPoolStatsImpl.java
b/activemq-client/src/main/java/org/apache/activemq/management/JCAConnectionPoolStatsImpl.java
index 4f77310011..f1aef71db2 100644
---
a/activemq-client/src/main/java/org/apache/activemq/management/JCAConnectionPoolStatsImpl.java
+++
b/activemq-client/src/main/java/org/apache/activemq/management/JCAConnectionPoolStatsImpl.java
@@ -16,6 +16,8 @@
*/
package org.apache.activemq.management;
+import java.util.Set;
+
/**
* Statistics for a JCA connection pool
*
@@ -39,9 +41,7 @@ public class JCAConnectionPoolStatsImpl extends
JCAConnectionStatsImpl {
this.waitingThreadCount = waitingThreadCount;
// lets add named stats
- addStatistic("freePoolSize", freePoolSize);
- addStatistic("poolSize", poolSize);
- addStatistic("waitingThreadCount", waitingThreadCount);
+ addStatistics(Set.of(freePoolSize, poolSize, waitingThreadCount));
}
public CountStatisticImpl getCloseCount() {
diff --git
a/activemq-client/src/main/java/org/apache/activemq/management/JCAConnectionStatsImpl.java
b/activemq-client/src/main/java/org/apache/activemq/management/JCAConnectionStatsImpl.java
index 1ffd7e6eca..fc9a806d7d 100644
---
a/activemq-client/src/main/java/org/apache/activemq/management/JCAConnectionStatsImpl.java
+++
b/activemq-client/src/main/java/org/apache/activemq/management/JCAConnectionStatsImpl.java
@@ -16,6 +16,7 @@
*/
package org.apache.activemq.management;
+import java.util.Set;
/**
* Statistics for a JCA connection
@@ -35,8 +36,7 @@ public class JCAConnectionStatsImpl extends StatsImpl {
this.useTime = useTime;
// lets add named stats
- addStatistic("waitTime", waitTime);
- addStatistic("useTime", useTime);
+ addStatistics(Set.of(waitTime, useTime));
}
public String getConnectionFactory() {
diff --git
a/activemq-client/src/main/java/org/apache/activemq/management/JMSEndpointStatsImpl.java
b/activemq-client/src/main/java/org/apache/activemq/management/JMSEndpointStatsImpl.java
index e0aa0c862c..027af27cc8 100644
---
a/activemq-client/src/main/java/org/apache/activemq/management/JMSEndpointStatsImpl.java
+++
b/activemq-client/src/main/java/org/apache/activemq/management/JMSEndpointStatsImpl.java
@@ -21,6 +21,8 @@ import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
+import java.util.Set;
+
import org.apache.activemq.util.IndentPrinter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -77,11 +79,7 @@ public class JMSEndpointStatsImpl extends StatsImpl {
this.messageRateTime = messageRateTime;
// lets add named stats
- addStatistic("messageCount", messageCount);
- addStatistic("pendingMessageCount", pendingMessageCount);
- addStatistic("expiredMessageCount", expiredMessageCount);
- addStatistic("messageWaitTime", messageWaitTime);
- addStatistic("messageRateTime", messageRateTime);
+ addStatistics(Set.of(messageCount, pendingMessageCount,
expiredMessageCount, messageWaitTime, messageRateTime));
}
public synchronized void reset() {
@@ -128,7 +126,7 @@ public class JMSEndpointStatsImpl extends StatsImpl {
}
public void onMessage() {
- if (enabled) {
+ if (isEnabled()) {
long start = messageCount.getLastSampleTime();
messageCount.increment();
long end = messageCount.getLastSampleTime();
diff --git
a/activemq-client/src/main/java/org/apache/activemq/management/JMSSessionStatsImpl.java
b/activemq-client/src/main/java/org/apache/activemq/management/JMSSessionStatsImpl.java
index ec8be9ac2b..f7778d73cd 100644
---
a/activemq-client/src/main/java/org/apache/activemq/management/JMSSessionStatsImpl.java
+++
b/activemq-client/src/main/java/org/apache/activemq/management/JMSSessionStatsImpl.java
@@ -17,6 +17,7 @@
package org.apache.activemq.management;
import java.util.List;
+import java.util.Set;
import org.apache.activemq.ActiveMQMessageConsumer;
import org.apache.activemq.ActiveMQMessageProducer;
@@ -54,12 +55,7 @@ public class JMSSessionStatsImpl extends StatsImpl {
"Time taken to process a
message (thoughtput rate)");
// lets add named stats
- addStatistic("messageCount", messageCount);
- addStatistic("pendingMessageCount", pendingMessageCount);
- addStatistic("expiredMessageCount", expiredMessageCount);
- addStatistic("messageWaitTime", messageWaitTime);
- addStatistic("durableSubscriptionCount", durableSubscriptionCount);
- addStatistic("messageRateTime", messageRateTime);
+ addStatistics(Set.of(messageCount, pendingMessageCount,
expiredMessageCount, messageWaitTime, durableSubscriptionCount,
messageRateTime));
}
public JMSProducerStatsImpl[] getProducers() {
diff --git
a/activemq-client/src/main/java/org/apache/activemq/management/MessageFlowStats.java
b/activemq-client/src/main/java/org/apache/activemq/management/MessageFlowStats.java
new file mode 100644
index 0000000000..bbb742b4ec
--- /dev/null
+++
b/activemq-client/src/main/java/org/apache/activemq/management/MessageFlowStats.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.management;
+
+public interface MessageFlowStats {
+ UnsampledStatistic<Long> getEnqueuedMessageBrokerInTime();
+ UnsampledStatistic<String> getEnqueuedMessageClientID();
+ UnsampledStatistic<String> getEnqueuedMessageID();
+ UnsampledStatistic<Long> getEnqueuedMessageTimestamp();
+ UnsampledStatistic<Long> getDequeuedMessageBrokerInTime();
+ UnsampledStatistic<Long> getDequeuedMessageBrokerOutTime();
+ UnsampledStatistic<String> getDequeuedMessageClientID();
+ UnsampledStatistic<String> getDequeuedMessageID();
+ UnsampledStatistic<Long> getDequeuedMessageTimestamp();
+ void enqueueStats(String clientID, String messageID, long
messageTimestamp, long messageBrokerInTime);
+ void dequeueStats(String clientID, String messageID);
+ void dequeueStats(String clientID, String messageID, long
messageTimestamp, long messageBrokerInTime, long messageBrokerOutTime);
+}
diff --git
a/activemq-client/src/main/java/org/apache/activemq/management/MessageFlowStatsImpl.java
b/activemq-client/src/main/java/org/apache/activemq/management/MessageFlowStatsImpl.java
new file mode 100644
index 0000000000..3833f2bd79
--- /dev/null
+++
b/activemq-client/src/main/java/org/apache/activemq/management/MessageFlowStatsImpl.java
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.management;
+
+import java.util.Set;
+
+public class MessageFlowStatsImpl extends UnsampledStatsImpl implements
MessageFlowStats, Statistic, Resettable {
+
+ private final UnsampledStatisticImpl<Long> enqueuedMessageBrokerInTime;
+ private final UnsampledStatisticImpl<String> enqueuedMessageClientID;
+ private final UnsampledStatisticImpl<String> enqueuedMessageID;
+ private final UnsampledStatisticImpl<Long> enqueuedMessageTimestamp;
+ private final UnsampledStatisticImpl<Long> dequeuedMessageBrokerInTime;
+ private final UnsampledStatisticImpl<Long> dequeuedMessageBrokerOutTime;
+ private final UnsampledStatisticImpl<String> dequeuedMessageClientID;
+ private final UnsampledStatisticImpl<String> dequeuedMessageID;
+ private final UnsampledStatisticImpl<Long> dequeuedMessageTimestamp;
+
+ public MessageFlowStatsImpl() {
+ super();
+
+ enqueuedMessageBrokerInTime = new
UnsampledStatisticImpl<>("enqueuedMessageBrokerInTime", "ms", "Broker in time
(ms) of last enqueued message to the destination", Long.valueOf(0l));
+ enqueuedMessageClientID = new
UnsampledStatisticImpl<>("enqueuedMessageClientID", "id", "ClientID of last
enqueued message to the destination", null);
+ enqueuedMessageID = new UnsampledStatisticImpl<>("enqueuedMessageID",
"id", "MessageID of last enqueued message to the destination", null);
+ enqueuedMessageTimestamp = new
UnsampledStatisticImpl<>("enqueuedMessageTimestamp", "ms", "Message timestamp
of last enqueued message to the destination", Long.valueOf(0l));
+
+ dequeuedMessageBrokerInTime = new
UnsampledStatisticImpl<>("dequeuedMessageBrokerInTime", "ms", "Broker in time
(ms) of last dequeued message to the destination", Long.valueOf(0l));
+ dequeuedMessageBrokerOutTime = new
UnsampledStatisticImpl<>("dequeuedMessageBrokerOutTime", "ms", "Broker out time
(ms) of last dequeued message to the destination", Long.valueOf(0l));
+ dequeuedMessageClientID = new
UnsampledStatisticImpl<>("dequeuedMessageClientID", "id", "ClientID of last
dequeued message to the destination", null);
+ dequeuedMessageID = new UnsampledStatisticImpl<>("dequeuedMessageID",
"id", "MessageID of last dequeued message to the destination", null);
+ dequeuedMessageTimestamp = new
UnsampledStatisticImpl<>("dequeuedMessageTimestamp", "ms", "Message timestamp
of last dequeued message to the destination", Long.valueOf(0l));
+
+ addStatistics(Set.of(enqueuedMessageBrokerInTime,
enqueuedMessageClientID, enqueuedMessageID, enqueuedMessageTimestamp,
+ dequeuedMessageBrokerInTime, dequeuedMessageBrokerOutTime,
dequeuedMessageClientID, dequeuedMessageID, dequeuedMessageTimestamp));
+ }
+
+ @Override
+ public UnsampledStatistic<Long> getEnqueuedMessageBrokerInTime() {
+ return enqueuedMessageBrokerInTime;
+ }
+
+ @Override
+ public UnsampledStatistic<String> getEnqueuedMessageClientID() {
+ return enqueuedMessageClientID;
+ }
+
+ @Override
+ public UnsampledStatistic<String> getEnqueuedMessageID() {
+ return enqueuedMessageID;
+ }
+
+ @Override
+ public UnsampledStatistic<Long> getEnqueuedMessageTimestamp() {
+ return enqueuedMessageTimestamp;
+ }
+
+ @Override
+ public UnsampledStatistic<Long> getDequeuedMessageBrokerInTime() {
+ return dequeuedMessageBrokerInTime;
+ }
+
+ @Override
+ public UnsampledStatistic<Long> getDequeuedMessageBrokerOutTime() {
+ return dequeuedMessageBrokerOutTime;
+ }
+
+ @Override
+ public UnsampledStatistic<String> getDequeuedMessageClientID() {
+ return dequeuedMessageClientID;
+ }
+
+ @Override
+ public UnsampledStatistic<String> getDequeuedMessageID() {
+ return dequeuedMessageID;
+ }
+
+ @Override
+ public UnsampledStatistic<Long> getDequeuedMessageTimestamp() {
+ return dequeuedMessageTimestamp;
+ }
+
+ @Override
+ public synchronized void enqueueStats(String clientID, String messageID,
long messageTimestamp, long messageBrokerInTime) {
+ enqueuedMessageClientID.setValue(clientID);
+ enqueuedMessageID.setValue(messageID);
+ enqueuedMessageTimestamp.setValue(messageTimestamp);
+ enqueuedMessageBrokerInTime.setValue(messageBrokerInTime);
+ }
+
+ @Override
+ public synchronized void dequeueStats(String clientID, String messageID) {
+ dequeuedMessageClientID.setValue(clientID);
+ dequeuedMessageID.setValue(messageID);
+ }
+
+ @Override
+ public synchronized void dequeueStats(String clientID, String messageID,
long messageTimestamp, long messageBrokerInTime, long messageBrokerOutTime) {
+ dequeuedMessageClientID.setValue(clientID);
+ dequeuedMessageID.setValue(messageID);
+ dequeuedMessageTimestamp.setValue(messageTimestamp);
+ dequeuedMessageBrokerInTime.setValue(messageBrokerInTime);
+ dequeuedMessageBrokerOutTime.setValue(messageBrokerOutTime);
+ }
+}
diff --git
a/activemq-client/src/main/java/org/apache/activemq/management/StatisticImpl.java
b/activemq-client/src/main/java/org/apache/activemq/management/StatisticImpl.java
index 1dbcc80c69..9dbf5a019c 100644
---
a/activemq-client/src/main/java/org/apache/activemq/management/StatisticImpl.java
+++
b/activemq-client/src/main/java/org/apache/activemq/management/StatisticImpl.java
@@ -15,22 +15,19 @@
* limitations under the License.
*/
package org.apache.activemq.management;
-
/**
- * Base class for a Statistic implementation
- *
- *
+ * A thread-safe class for a Statistic implementation
*/
public class StatisticImpl implements Statistic, Resettable {
- protected boolean enabled;
+ private volatile boolean enabled;
- private String name;
- private String unit;
- private String description;
- private long startTime;
- private long lastSampleTime;
- private boolean doReset = true;
+ private final String name;
+ private final String unit;
+ private final String description;
+ private volatile long startTime;
+ private volatile long lastSampleTime;
+ private volatile boolean doReset = true;
public StatisticImpl(String name, String unit, String description) {
this.name = name;
@@ -40,6 +37,14 @@ public class StatisticImpl implements Statistic, Resettable {
this.lastSampleTime = this.startTime;
}
+ protected StatisticImpl(String name, String unit, String description, long
startTime, long lastSampleTime) {
+ this.name = name;
+ this.unit = unit;
+ this.description = description;
+ this.startTime = startTime;
+ this.lastSampleTime = lastSampleTime;
+ }
+
public synchronized void reset() {
if(isDoReset()) {
this.startTime = System.currentTimeMillis();
@@ -51,7 +56,8 @@ public class StatisticImpl implements Statistic, Resettable {
this.lastSampleTime = System.currentTimeMillis();
}
- public synchronized String toString() {
+ public String toString() {
+ // NOTE: Do not double-lock here as appendFieldDescription performs the
lock
StringBuffer buffer = new StringBuffer();
buffer.append(name);
buffer.append("{");
@@ -93,7 +99,7 @@ public class StatisticImpl implements Statistic, Resettable {
public void setEnabled(boolean enabled) {
this.enabled = enabled;
}
-
+
/**
* @return the doReset
*/
@@ -108,7 +114,6 @@ public class StatisticImpl implements Statistic, Resettable
{
this.doReset = doReset;
}
-
protected synchronized void appendFieldDescription(StringBuffer buffer) {
buffer.append(" unit: ");
buffer.append(this.unit);
diff --git
a/activemq-client/src/main/java/org/apache/activemq/management/StatsImpl.java
b/activemq-client/src/main/java/org/apache/activemq/management/StatsImpl.java
index f3ae60fb07..46d24a685f 100644
---
a/activemq-client/src/main/java/org/apache/activemq/management/StatsImpl.java
+++
b/activemq-client/src/main/java/org/apache/activemq/management/StatsImpl.java
@@ -16,18 +16,15 @@
*/
package org.apache.activemq.management;
-import java.util.ArrayList;
-import java.util.List;
+import java.util.Collection;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
/**
- * Base class for a Stats implementation
- *
- *
+ * A thread-safe class for a Stats implementation
*/
public class StatsImpl extends StatisticImpl implements Stats, Resettable {
//use a Set instead of a Map - to conserve Space
- private Set<StatisticImpl> set;
+ protected final Set<StatisticImpl> set;
public StatsImpl() {
this(new CopyOnWriteArraySet<StatisticImpl>());
@@ -38,44 +35,35 @@ public class StatsImpl extends StatisticImpl implements
Stats, Resettable {
this.set = set;
}
- public void reset() {
- Statistic[] stats = getStatistics();
- int size = stats.length;
- for (int i = 0; i < size; i++) {
- Statistic stat = stats[i];
- if (stat instanceof Resettable) {
- Resettable r = (Resettable) stat;
- r.reset();
- }
- }
+ public synchronized void reset() {
+ this.set.stream()
+ .filter(Resettable.class::isInstance)
+ .map(Resettable.class::cast)
+ .forEach(resetStat -> resetStat.reset());
}
- public Statistic getStatistic(String name) {
- for (StatisticImpl stat : this.set) {
- if (stat.getName() != null && stat.getName().equals(name)) {
- return stat;
- }
- }
- return null;
+ public synchronized Statistic getStatistic(String name) {
+ return this.set.stream().filter(s ->
s.getName().equals(name)).findFirst().orElse(null);
}
- public String[] getStatisticNames() {
- List<String> names = new ArrayList<String>();
- for (StatisticImpl stat : this.set) {
- names.add(stat.getName());
- }
- String[] answer = new String[names.size()];
- names.toArray(answer);
- return answer;
+ public synchronized String[] getStatisticNames() {
+ return
this.set.stream().map(StatisticImpl::getName).toArray(String[]::new);
}
- public Statistic[] getStatistics() {
- Statistic[] answer = new Statistic[this.set.size()];
- set.toArray(answer);
- return answer;
+ public synchronized Statistic[] getStatistics() {
+ return this.set.toArray(new Statistic[this.set.size()]);
}
- protected void addStatistic(String name, StatisticImpl statistic) {
+ @Deprecated(forRemoval = true, since = "6.2.0")
+ protected synchronized void addStatistic(String name, StatisticImpl
statistic) {
this.set.add(statistic);
}
+
+ protected void addStatistics(Collection<StatisticImpl> statistics) {
+ this.set.addAll(statistics);
+ }
+
+ protected void removeStatistics(Collection<StatisticImpl> statistics) {
+ this.set.removeAll(statistics);
+ }
}
diff --git
a/activemq-client/src/main/java/org/apache/activemq/management/UnsampledStatistic.java
b/activemq-client/src/main/java/org/apache/activemq/management/UnsampledStatistic.java
new file mode 100644
index 0000000000..1aff3bc14e
--- /dev/null
+++
b/activemq-client/src/main/java/org/apache/activemq/management/UnsampledStatistic.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.activemq.management;
+
+/**
+ * A Statistic without sampleTime or startTime.
+ */
+public interface UnsampledStatistic<T> extends Statistic, Resettable {
+ public T getValue();
+ public void setValue(T value);
+}
diff --git
a/activemq-client/src/main/java/org/apache/activemq/management/UnsampledStatisticImpl.java
b/activemq-client/src/main/java/org/apache/activemq/management/UnsampledStatisticImpl.java
new file mode 100644
index 0000000000..45ca70bacc
--- /dev/null
+++
b/activemq-client/src/main/java/org/apache/activemq/management/UnsampledStatisticImpl.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.management;
+
+/**
+ * An UnsampledStatistic<T> implementation
+ */
+public class UnsampledStatisticImpl<T> extends StatisticImpl implements
UnsampledStatistic<T> {
+
+ private volatile T value;
+ private final T defaultValue;
+
+ public UnsampledStatisticImpl(String name, String unit, String
description, T defaultValue) {
+ super(name, unit, description, 0l, 0l);
+ this.value = defaultValue;
+ this.defaultValue = defaultValue;
+ }
+
+ @Override
+ public void reset() {
+ if (isDoReset()) {
+ value = defaultValue;
+ }
+ }
+
+ @Override
+ protected void updateSampleTime() {}
+
+ @Override
+ public long getStartTime() {
+ return 0l;
+ }
+
+ @Override
+ public long getLastSampleTime() {
+ return 0l;
+ }
+
+ @Override
+ public T getValue() {
+ return value;
+ }
+
+ @Override
+ public void setValue(T value) {
+ if (isEnabled()) {
+ this.value = value;
+ }
+ }
+
+ protected void appendFieldDescription(StringBuffer buffer) {
+ buffer.append(" value: ");
+ buffer.append(value);
+ super.appendFieldDescription(buffer);
+ }
+}
diff --git
a/activemq-client/src/main/java/org/apache/activemq/management/UnsampledStatsImpl.java
b/activemq-client/src/main/java/org/apache/activemq/management/UnsampledStatsImpl.java
new file mode 100644
index 0000000000..10d5facec2
--- /dev/null
+++
b/activemq-client/src/main/java/org/apache/activemq/management/UnsampledStatsImpl.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.management;
+
+public class UnsampledStatsImpl extends StatsImpl {
+
+ public UnsampledStatsImpl() {
+ super();
+ }
+
+ @Override
+ protected void updateSampleTime() {}
+
+ @Override
+ public long getStartTime() {
+ return 0;
+ }
+ @Override
+ public long getLastSampleTime() {
+ return 0;
+ }
+
+ @Override
+ public synchronized void setEnabled(boolean enabled) {
+ set.stream().forEach(stat -> stat.setEnabled(enabled));
+ }
+}
diff --git
a/activemq-client/src/test/java/org/apache/activemq/management/UnsampledStatisticsTest.java
b/activemq-client/src/test/java/org/apache/activemq/management/UnsampledStatisticsTest.java
new file mode 100644
index 0000000000..f53207b3a9
--- /dev/null
+++
b/activemq-client/src/test/java/org/apache/activemq/management/UnsampledStatisticsTest.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.activemq.management;
+
+import static org.junit.Assert.*;
+
+import org.junit.Test;
+
+public class UnsampledStatisticsTest {
+
+ @Test
+ public void testUnsampledStatisticsEnabledTest() {
+ UnsampledStatisticImpl<Long> longStatistic = new
UnsampledStatisticImpl<>("longStat", "long", "A long statistic",
Long.valueOf(0l));
+ longStatistic.setEnabled(true);
+ longStatistic.setValue(Long.MAX_VALUE);
+
+ UnsampledStatisticImpl<String> stringStatistic = new
UnsampledStatisticImpl<>("stringStat", "chars", "A string statistic", null);
+ stringStatistic.setEnabled(true);
+ stringStatistic.setValue("Hello World!");
+
+ assertEquals("A long statistic", longStatistic.getDescription());
+ assertEquals(Long.valueOf(0l),
Long.valueOf(longStatistic.getLastSampleTime()));
+ assertEquals("longStat", longStatistic.getName());
+ assertEquals(Long.valueOf(0l),
Long.valueOf(longStatistic.getStartTime()));
+ assertEquals("long", longStatistic.getUnit());
+ assertEquals(Long.valueOf(Long.MAX_VALUE), longStatistic.getValue());
+ assertTrue(longStatistic.toString().contains("value: " +
Long.MAX_VALUE));
+ longStatistic.reset();
+ assertEquals(Long.valueOf(0l), longStatistic.getValue());
+ assertTrue(longStatistic.toString().contains("value: 0"));
+
+ longStatistic.reset();
+ longStatistic.setEnabled(false);
+ assertFalse(longStatistic.isEnabled());
+ longStatistic.setValue(12345678l);
+ assertEquals(Long.valueOf(0l), longStatistic.getValue());
+
+ assertEquals("A string statistic", stringStatistic.getDescription());
+ assertEquals(Long.valueOf(0l),
Long.valueOf(stringStatistic.getLastSampleTime()));
+ assertEquals("stringStat", stringStatistic.getName());
+ assertEquals(Long.valueOf(0l),
Long.valueOf(stringStatistic.getStartTime()));
+ assertEquals("chars", stringStatistic.getUnit());
+ assertEquals("Hello World!", stringStatistic.getValue());
+ assertTrue(stringStatistic.toString().contains("value: Hello World!"));
+ stringStatistic.reset();
+ assertNull(stringStatistic.getValue());
+ assertTrue(stringStatistic.toString().contains("value: null"));
+
+ stringStatistic.reset();
+ stringStatistic.setEnabled(false);
+ assertFalse(stringStatistic.isEnabled());
+ stringStatistic.setValue("This should be ignored");
+ assertNull(stringStatistic.getValue());
+ }
+
+}
diff --git
a/activemq-runtime-config/src/test/java/org/apache/activemq/PolicyEntryTest.java
b/activemq-runtime-config/src/test/java/org/apache/activemq/PolicyEntryTest.java
index 4f077018ac..e7abddd8da 100644
---
a/activemq-runtime-config/src/test/java/org/apache/activemq/PolicyEntryTest.java
+++
b/activemq-runtime-config/src/test/java/org/apache/activemq/PolicyEntryTest.java
@@ -57,6 +57,18 @@ public class PolicyEntryTest extends
RuntimeConfigTestSupport {
verifyBooleanField("AMQ.8397", "sendDuplicateFromStoreToDLQ", true);
}
+ @Test
+ public void testModAdvancedMessageStatistics() throws Exception {
+ final String brokerConfig = configurationSeed + "-policy-ml-broker";
+ applyNewConfig(brokerConfig, configurationSeed +
"-policy-advancedMessageStatistics");
+ startBroker(brokerConfig);
+ assertTrue("broker alive", brokerService.isStarted());
+
+ verifyBooleanField("AMQ.8463", "advancedMessageStatisticsEnabled",
false);
+ applyNewConfig(brokerConfig, configurationSeed +
"-policy-advancedMessageStatistics-mod", SLEEP);
+ verifyBooleanField("AMQ.8463", "advancedMessageStatisticsEnabled",
true);
+ }
+
@Test
public void testModAdvancedNetworkStatistics() throws Exception {
final String brokerConfig = configurationSeed + "-policy-ml-broker";
@@ -133,6 +145,9 @@ public class PolicyEntryTest extends
RuntimeConfigTestSupport {
session.createConsumer(session.createQueue(dest));
switch(fieldName) {
+ case "advancedMessageStatisticsEnabled":
+ assertEquals(value,
brokerService.getRegionBroker().getDestinationMap().get(new
ActiveMQQueue(dest)).isAdvancedMessageStatisticsEnabled());
+ break;
case "advancedNetworkStatisticsEnabled":
assertEquals(value,
brokerService.getRegionBroker().getDestinationMap().get(new
ActiveMQQueue(dest)).isAdvancedNetworkStatisticsEnabled());
break;
diff --git
a/activemq-runtime-config/src/test/resources/org/apache/activemq/policyEntryTest-policy-advancedMessageStatistics-mod.xml
b/activemq-runtime-config/src/test/resources/org/apache/activemq/policyEntryTest-policy-advancedMessageStatistics-mod.xml
new file mode 100644
index 0000000000..4f56b3185d
--- /dev/null
+++
b/activemq-runtime-config/src/test/resources/org/apache/activemq/policyEntryTest-policy-advancedMessageStatistics-mod.xml
@@ -0,0 +1,36 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<beans
+ xmlns="http://www.springframework.org/schema/beans"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
+ http://activemq.apache.org/schema/core
http://activemq.apache.org/schema/core/activemq-core.xsd">
+
+ <broker xmlns="http://activemq.apache.org/schema/core" start="false"
persistent="false">
+ <plugins>
+ <runtimeConfigurationPlugin checkPeriod="1000" />
+ </plugins>
+ <destinationPolicy>
+ <policyMap>
+ <policyEntries>
+ <policyEntry queue="AMQ.8463"
advancedMessageStatisticsEnabled="true"/>
+ </policyEntries>
+ </policyMap>
+ </destinationPolicy>
+ </broker>
+</beans>
diff --git
a/activemq-runtime-config/src/test/resources/org/apache/activemq/policyEntryTest-policy-advancedMessageStatistics.xml
b/activemq-runtime-config/src/test/resources/org/apache/activemq/policyEntryTest-policy-advancedMessageStatistics.xml
new file mode 100644
index 0000000000..8b9166c778
--- /dev/null
+++
b/activemq-runtime-config/src/test/resources/org/apache/activemq/policyEntryTest-policy-advancedMessageStatistics.xml
@@ -0,0 +1,36 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<beans
+ xmlns="http://www.springframework.org/schema/beans"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
+ http://activemq.apache.org/schema/core
http://activemq.apache.org/schema/core/activemq-core.xsd">
+
+ <broker xmlns="http://activemq.apache.org/schema/core" start="false"
persistent="false">
+ <plugins>
+ <runtimeConfigurationPlugin checkPeriod="1000" />
+ </plugins>
+ <destinationPolicy>
+ <policyMap>
+ <policyEntries>
+ <policyEntry queue="AMQ.8463"/>
+ </policyEntries>
+ </policyMap>
+ </destinationPolicy>
+ </broker>
+</beans>
diff --git
a/activemq-unit-tests/src/test/java/org/apache/activemq/network/NetworkAdvancedStatisticsTest.java
b/activemq-unit-tests/src/test/java/org/apache/activemq/network/NetworkAdvancedStatisticsTest.java
index e5c755dc9d..4922cd2ba1 100644
---
a/activemq-unit-tests/src/test/java/org/apache/activemq/network/NetworkAdvancedStatisticsTest.java
+++
b/activemq-unit-tests/src/test/java/org/apache/activemq/network/NetworkAdvancedStatisticsTest.java
@@ -17,20 +17,29 @@
package org.apache.activemq.network;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
-
+import java.net.URI;
import java.util.Arrays;
import java.util.Collection;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
+import javax.jms.Session;
+import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.management.MessageFlowStats;
import org.apache.activemq.util.Wait;
import org.apache.activemq.util.Wait.Condition;
import org.junit.Test;
@@ -44,8 +53,9 @@ public class NetworkAdvancedStatisticsTest extends
BaseNetworkTest {
@Parameterized.Parameters(name="includedDestination={0},
excludedDestination={1}")
public static Collection<Object[]> data() {
return Arrays.asList(new Object[][] {
- { new ActiveMQTopic("include.test.bar"), new
ActiveMQTopic("exclude.test.bar")},
- { new ActiveMQQueue("include.test.foo"), new
ActiveMQQueue("exclude.test.foo")}});
+ { new ActiveMQTopic("include.test.durable"), new
ActiveMQTopic("exclude.test.durable"), true},
+ { new ActiveMQTopic("include.test.nondurable"), new
ActiveMQTopic("exclude.test.nondurable"), false},
+ { new ActiveMQQueue("include.test.foo"), new
ActiveMQQueue("exclude.test.foo"), false}});
}
protected static final int MESSAGE_COUNT = 10;
@@ -55,15 +65,38 @@ public class NetworkAdvancedStatisticsTest extends
BaseNetworkTest {
private final ActiveMQDestination includedDestination;
private final ActiveMQDestination excludedDestination;
+ private final boolean durable;
- public NetworkAdvancedStatisticsTest(ActiveMQDestination
includedDestionation, ActiveMQDestination excludedDestination) {
+ public NetworkAdvancedStatisticsTest(ActiveMQDestination
includedDestionation, ActiveMQDestination excludedDestination, boolean durable)
{
this.includedDestination = includedDestionation;
this.excludedDestination = excludedDestination;
+ this.durable = durable;
}
@Override
protected void doSetUp(boolean deleteAllMessages) throws Exception {
- super.doSetUp(deleteAllMessages);
+ remoteBroker = createRemoteBroker();
+ remoteBroker.setDeleteAllMessagesOnStartup(deleteAllMessages);
+ remoteBroker.start();
+ remoteBroker.waitUntilStarted();
+ localBroker = createLocalBroker();
+ localBroker.setDeleteAllMessagesOnStartup(deleteAllMessages);
+ localBroker.start();
+ localBroker.waitUntilStarted();
+ URI localURI = localBroker.getVmConnectorURI();
+ ActiveMQConnectionFactory fac = new
ActiveMQConnectionFactory(localURI);
+ fac.setAlwaysSyncSend(true);
+ fac.setDispatchAsync(false);
+ localConnection = fac.createConnection();
+ localConnection.setClientID("localClientId");
+
+ URI remoteURI = remoteBroker.getVmConnectorURI();
+ fac = new ActiveMQConnectionFactory(remoteURI);
+ remoteConnection = fac.createConnection();
+ remoteConnection.setClientID("remoteClientId");
+
+ localSession = localConnection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+ remoteSession = remoteConnection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
}
@Override
@@ -81,32 +114,55 @@ public class NetworkAdvancedStatisticsTest extends
BaseNetworkTest {
public void testNetworkAdvancedStatistics() throws Exception {
// create a remote durable consumer to create demand
+ final Map<String, Message> receivedMessages = new
ConcurrentHashMap<>();
+ final Collection<Exception> receivedExceptions = new
ConcurrentLinkedQueue<>();
+
MessageConsumer remoteConsumer;
- if(includedDestination.isTopic()) {
+ if(includedDestination.isTopic() && durable) {
remoteConsumer =
remoteSession.createDurableSubscriber(ActiveMQTopic.class.cast(includedDestination),
consumerName);
} else {
remoteConsumer = remoteSession.createConsumer(includedDestination);
- remoteConsumer.setMessageListener(new MessageListener() {
- @Override
- public void onMessage(Message message) {
- }
- });
}
- Thread.sleep(1000);
+ remoteConsumer.setMessageListener(new MessageListener() {
+ @Override
+ public void onMessage(Message message) {
+ try {
+ receivedMessages.put(message.getJMSMessageID(), message);
+ } catch (JMSException e) {
+ receivedExceptions.add(e);
+ }
+ }
+ });
+
+ localConnection.start();
+ remoteConnection.start();
MessageProducer producer =
localSession.createProducer(includedDestination);
+ String lastIncludedSentMessageID = null;
for (int i = 0; i < MESSAGE_COUNT; i++) {
Message test = localSession.createTextMessage("test-" + i);
producer.send(test);
+ lastIncludedSentMessageID = test.getJMSMessageID();
}
- Thread.sleep(1000);
MessageProducer producerExcluded =
localSession.createProducer(excludedDestination);
+ String lastExcludedSentMessageID = null;
for (int i = 0; i < MESSAGE_COUNT; i++) {
Message test = localSession.createTextMessage("test-" + i);
producerExcluded.send(test);
+ lastExcludedSentMessageID = test.getJMSMessageID();
}
- Thread.sleep(1000);
+
+ assertTrue(Wait.waitFor(new Condition() {
+ @Override
+ public boolean isSatisified() throws Exception {
+ // Wait until the remote consumer has received every message sent to the included destination
+ return receivedMessages.size() == MESSAGE_COUNT;
+ }
+ }, 10000, 500));
+
+ assertTrue(receivedExceptions.isEmpty());
+ assertEquals(Integer.valueOf(MESSAGE_COUNT),
Integer.valueOf(receivedMessages.size()));
//Make sure stats are correct for local -> remote
assertEquals(MESSAGE_COUNT,
localBroker.getDestination(includedDestination).getDestinationStatistics().getEnqueues().getCount());
@@ -119,7 +175,36 @@ public class NetworkAdvancedStatisticsTest extends
BaseNetworkTest {
assertEquals(MESSAGE_COUNT,
remoteBroker.getDestination(includedDestination).getDestinationStatistics().getNetworkEnqueues().getCount());
assertEquals(0,
remoteBroker.getDestination(includedDestination).getDestinationStatistics().getNetworkDequeues().getCount());
- // Make sure stats do not increment for local-only
+ // Advanced Message status - enqueue
+ MessageFlowStats localBrokerIncludedMessageFlowStats =
localBroker.getDestination(includedDestination).getDestinationStatistics().getMessageFlowStats();
+ MessageFlowStats localBrokerExcludedMessageFlowStats =
localBroker.getDestination(excludedDestination).getDestinationStatistics().getMessageFlowStats();
+ MessageFlowStats remoteBrokerExcludedMessageFlowStats =
remoteBroker.getDestination(excludedDestination).getDestinationStatistics().getMessageFlowStats();
+
+ assertEquals(lastIncludedSentMessageID,
localBrokerIncludedMessageFlowStats.getEnqueuedMessageID().getValue());
+
assertNotNull(localBrokerIncludedMessageFlowStats.getEnqueuedMessageBrokerInTime().getValue());
+
assertTrue(localBrokerIncludedMessageFlowStats.getEnqueuedMessageBrokerInTime().getValue()
> 0l);
+
assertNotNull(localBrokerIncludedMessageFlowStats.getEnqueuedMessageTimestamp().getValue());
+
assertTrue(localBrokerIncludedMessageFlowStats.getEnqueuedMessageTimestamp().getValue()
> 0l);
+ assertEquals("localClientId",
localBrokerIncludedMessageFlowStats.getEnqueuedMessageClientID().getValue());
+
+ // Advanced Message status - dequeue
+ assertEquals(lastIncludedSentMessageID,
localBrokerIncludedMessageFlowStats.getDequeuedMessageID().getValue());
+
assertNotNull(localBrokerIncludedMessageFlowStats.getDequeuedMessageBrokerInTime().getValue());
+
assertNotNull(localBrokerIncludedMessageFlowStats.getDequeuedMessageBrokerOutTime().getValue());
+
assertTrue(localBrokerIncludedMessageFlowStats.getDequeuedMessageClientID().getValue().startsWith("networkConnector"));
+
assertNotNull(localBrokerIncludedMessageFlowStats.getDequeuedMessageTimestamp().getValue());
+
+ if(includedDestination.isTopic() && !durable) {
+ assertEquals(Long.valueOf(0l),
localBrokerIncludedMessageFlowStats.getDequeuedMessageBrokerInTime().getValue());
+ assertEquals(Long.valueOf(0l),
localBrokerIncludedMessageFlowStats.getDequeuedMessageBrokerOutTime().getValue());
+ assertEquals(Long.valueOf(0l),
localBrokerIncludedMessageFlowStats.getDequeuedMessageTimestamp().getValue());
+ } else {
+
assertTrue(localBrokerIncludedMessageFlowStats.getDequeuedMessageBrokerInTime().getValue()
> 0l);
+
assertTrue(localBrokerIncludedMessageFlowStats.getDequeuedMessageBrokerOutTime().getValue()
> 0l);
+
assertTrue(localBrokerIncludedMessageFlowStats.getDequeuedMessageTimestamp().getValue()
> 0l);
+ }
+
+ // Make sure stats do not increment for local-only excluded
destinations
assertEquals(MESSAGE_COUNT,
localBroker.getDestination(excludedDestination).getDestinationStatistics().getEnqueues().getCount());
assertEquals(0,
localBroker.getDestination(excludedDestination).getDestinationStatistics().getForwards().getCount());
assertEquals(0,
localBroker.getDestination(excludedDestination).getDestinationStatistics().getNetworkDequeues().getCount());
@@ -129,6 +214,21 @@ public class NetworkAdvancedStatisticsTest extends
BaseNetworkTest {
assertEquals(0,
remoteBroker.getDestination(excludedDestination).getDestinationStatistics().getForwards().getCount());
assertEquals(0,
remoteBroker.getDestination(excludedDestination).getDestinationStatistics().getNetworkEnqueues().getCount());
assertEquals(0,
remoteBroker.getDestination(excludedDestination).getDestinationStatistics().getNetworkDequeues().getCount());
+ assertEquals(lastExcludedSentMessageID,
localBrokerExcludedMessageFlowStats.getEnqueuedMessageID().getValue());
+
assertNull(localBrokerExcludedMessageFlowStats.getDequeuedMessageID().getValue());
+
+ // Advanced Message status - enqueue
+
assertNull(remoteBrokerExcludedMessageFlowStats.getEnqueuedMessageID().getValue());
+ assertEquals(Long.valueOf(0l),
remoteBrokerExcludedMessageFlowStats.getEnqueuedMessageBrokerInTime().getValue());
+ assertEquals(Long.valueOf(0l),
remoteBrokerExcludedMessageFlowStats.getEnqueuedMessageTimestamp().getValue());
+
assertNull(remoteBrokerExcludedMessageFlowStats.getEnqueuedMessageClientID().getValue());
+
+ // Advanced Message status - dequeue
+ assertNull(lastIncludedSentMessageID,
remoteBrokerExcludedMessageFlowStats.getDequeuedMessageID().getValue());
+ assertEquals(Long.valueOf(0l),
remoteBrokerExcludedMessageFlowStats.getDequeuedMessageBrokerInTime().getValue());
+ assertEquals(Long.valueOf(0l),
remoteBrokerExcludedMessageFlowStats.getDequeuedMessageBrokerOutTime().getValue());
+ assertEquals(Long.valueOf(0l),
remoteBrokerExcludedMessageFlowStats.getDequeuedMessageTimestamp().getValue());
+
assertNull(remoteBrokerExcludedMessageFlowStats.getDequeuedMessageClientID().getValue());
if(includedDestination.isTopic()) {
assertTrue(Wait.waitFor(new Condition() {
@@ -164,4 +264,5 @@ public class NetworkAdvancedStatisticsTest extends
BaseNetworkTest {
}
}));
}
+
}
diff --git
a/activemq-unit-tests/src/test/resources/org/apache/activemq/network/localBroker-advancedNetworkStatistics.xml
b/activemq-unit-tests/src/test/resources/org/apache/activemq/network/localBroker-advancedNetworkStatistics.xml
index b543169f6e..f17fa9bcc5 100644
---
a/activemq-unit-tests/src/test/resources/org/apache/activemq/network/localBroker-advancedNetworkStatistics.xml
+++
b/activemq-unit-tests/src/test/resources/org/apache/activemq/network/localBroker-advancedNetworkStatistics.xml
@@ -28,11 +28,11 @@
<destinationPolicy>
<policyMap>
<policyEntries>
- <policyEntry queue="exclude.>"
advancedNetworkStatisticsEnabled="true"/>
- <policyEntry queue="include.>"
advancedNetworkStatisticsEnabled="true"/>
+ <policyEntry queue="exclude.>"
advancedMessageStatisticsEnabled="true"
advancedNetworkStatisticsEnabled="true"/>
+ <policyEntry queue="include.>"
advancedMessageStatisticsEnabled="true"
advancedNetworkStatisticsEnabled="true"/>
<policyEntry topic="ActiveMQ.Advisory.>" />
- <policyEntry topic="exclude.>"
advancedNetworkStatisticsEnabled="true"/>
- <policyEntry topic="include.>"
advancedNetworkStatisticsEnabled="true"/>
+ <policyEntry topic="exclude.>"
advancedMessageStatisticsEnabled="true"
advancedNetworkStatisticsEnabled="true"/>
+ <policyEntry topic="include.>"
advancedMessageStatisticsEnabled="true"
advancedNetworkStatisticsEnabled="true"/>
</policyEntries>
</policyMap>
</destinationPolicy>
@@ -43,17 +43,19 @@
conduitSubscriptions = "true"
decreaseNetworkConsumerPriority = "false"
name="networkConnector">
- <dynamicallyIncludedDestinations>
- <queue physicalName="include.test.foo"/>
- <topic physicalName="include.test.bar"/>
- </dynamicallyIncludedDestinations>
- <excludedDestinations>
- <queue physicalName="exclude.test.foo"/>
- <topic physicalName="exclude.test.bar"/>
- </excludedDestinations>
+ <dynamicallyIncludedDestinations>
+ <queue physicalName="include.test.foo"/>
+ <topic physicalName="include.test.durable"/>
+ <topic physicalName="include.test.nondurable"/>
+ </dynamicallyIncludedDestinations>
+ <excludedDestinations>
+ <queue physicalName="exclude.test.foo"/>
+ <topic physicalName="exclude.test.durable"/>
+ <topic physicalName="exclude.test.nondurable"/>
+ </excludedDestinations>
</networkConnector>
</networkConnectors>
-
+
<transportConnectors>
<transportConnector uri="tcp://localhost:61616"/>
</transportConnectors>
diff --git
a/activemq-unit-tests/src/test/resources/org/apache/activemq/network/remoteBroker-advancedNetworkStatistics.xml
b/activemq-unit-tests/src/test/resources/org/apache/activemq/network/remoteBroker-advancedNetworkStatistics.xml
index a9cf93f73a..d2b3c0f33d 100644
---
a/activemq-unit-tests/src/test/resources/org/apache/activemq/network/remoteBroker-advancedNetworkStatistics.xml
+++
b/activemq-unit-tests/src/test/resources/org/apache/activemq/network/remoteBroker-advancedNetworkStatistics.xml
@@ -28,11 +28,11 @@
<destinationPolicy>
<policyMap>
<policyEntries>
- <policyEntry queue="exclude.>"
advancedNetworkStatisticsEnabled="true"/>
- <policyEntry queue="include.>"
advancedNetworkStatisticsEnabled="true"/>
+ <policyEntry queue="exclude.>"
advancedMessageStatisticsEnabled="true"
advancedNetworkStatisticsEnabled="true"/>
+ <policyEntry queue="include.>"
advancedMessageStatisticsEnabled="true"
advancedNetworkStatisticsEnabled="true"/>
<policyEntry topic="ActiveMQ.Advisory.>" />
- <policyEntry topic="exclude.>"
advancedNetworkStatisticsEnabled="true"/>
- <policyEntry topic="include.>"
advancedNetworkStatisticsEnabled="true"/>
+ <policyEntry topic="exclude.>"
advancedMessageStatisticsEnabled="true"
advancedNetworkStatisticsEnabled="true"/>
+ <policyEntry topic="include.>"
advancedMessageStatisticsEnabled="true"
advancedNetworkStatisticsEnabled="true"/>
</policyEntries>
</policyMap>
</destinationPolicy>
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact