This is an automated email from the ASF dual-hosted git repository.
cshannon pushed a commit to branch activemq-5.15.x
in repository https://gitbox.apache.org/repos/asf/activemq.git
The following commit(s) were added to refs/heads/activemq-5.15.x by this push:
new 0b88dab AMQ-7129 - fix durable message size statistics with
individual ack
0b88dab is described below
commit 0b88dabb40c7c29a1b86d9413a701cf31cc2ceda
Author: Christopher L. Shannon (cshannon) <[email protected]>
AuthorDate: Fri Jan 11 09:56:03 2019 -0500
AMQ-7129 - fix durable message size statistics with individual ack
Make sure that the pending message size for a durable sub only includes
messages that are part of the ack range
(cherry picked from commit fa2daa25e9acd3f37bb1ee0d37717d2383e67a85)
---
.../apache/activemq/store/kahadb/MessageDatabase.java | 13 ++++++++++---
.../store/kahadb/KahaDBDurableMessageRecoveryTest.java | 17 +++++++++++++++++
2 files changed, 27 insertions(+), 3 deletions(-)
diff --git
a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
index 8bb902d..8030bc4 100644
---
a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
+++
b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
@@ -3011,16 +3011,23 @@ public abstract class MessageDatabase extends
ServiceSupport implements BrokerSe
SequenceSet messageSequences = sd.ackPositions.get(tx,
subscriptionKey);
if (messageSequences != null) {
- Sequence head = messageSequences.getHead();
- if (head != null) {
+ if (!messageSequences.isEmpty()) {
+ final Sequence head = messageSequences.getHead();
+
//get an iterator over the order index starting at the
first unacked message
//and go over each message to add up the size
Iterator<Entry<Long, MessageKeys>> iterator =
sd.orderIndex.iterator(tx,
new MessageOrderCursor(head.getFirst()));
+ final boolean contiguousRange = messageSequences.size() ==
1;
while (iterator.hasNext()) {
Entry<Long, MessageKeys> entry = iterator.next();
- locationSize += entry.getValue().location.getSize();
+ //Verify sequence contains the key
+ //if contiguous we just add all starting with the
first but if not
+ //we need to check if the id is part of the range -
could happen if individual ack mode was used
+ if (contiguousRange ||
messageSequences.contains(entry.getKey())) {
+ locationSize +=
entry.getValue().location.getSize();
+ }
}
}
}
diff --git
a/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBDurableMessageRecoveryTest.java
b/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBDurableMessageRecoveryTest.java
index a44e8c0..519648e 100644
---
a/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBDurableMessageRecoveryTest.java
+++
b/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBDurableMessageRecoveryTest.java
@@ -209,6 +209,12 @@ public class KahaDBDurableMessageRecoveryTest {
// Verify there are 8 messages left still and restart broker
assertTrue(Wait.waitFor(() -> 8 == getPendingMessageCount(topic,
"clientId1", "sub1"), 3000, 500));
assertTrue(Wait.waitFor(() -> 10 == getPendingMessageCount(topic,
"clientId1", "sub2"), 3000, 500));
+
+ //Verify the pending size is less for sub1
+ assertTrue(getPendingMessageSize(topic, "clientId1", "sub1") > 0);
+ assertTrue(getPendingMessageSize(topic, "clientId1", "sub2") > 0);
+ assertTrue(getPendingMessageSize(topic, "clientId1", "sub1") <
getPendingMessageSize(topic, "clientId1", "sub2"));
+
subscriber1.close();
subscriber2.close();
restartBroker(recoverIndex);
@@ -217,6 +223,11 @@ public class KahaDBDurableMessageRecoveryTest {
assertTrue(Wait.waitFor(() -> 8 == getPendingMessageCount(topic,
"clientId1", "sub1"), 3000, 500));
assertTrue(Wait.waitFor(() -> 10 == getPendingMessageCount(topic,
"clientId1", "sub2"), 3000, 500));
+ //Verify the pending size is less for sub1
+ assertTrue(getPendingMessageSize(topic, "clientId1", "sub1") > 0);
+ assertTrue(getPendingMessageSize(topic, "clientId1", "sub2") > 0);
+ assertTrue(getPendingMessageSize(topic, "clientId1", "sub1") <
getPendingMessageSize(topic, "clientId1", "sub2"));
+
// Recreate subscriber and try and receive the other 8 messages
session = getSession(ActiveMQSession.AUTO_ACKNOWLEDGE);
subscriber1 = session.createDurableSubscriber(topic, "sub1");
@@ -347,4 +358,10 @@ public class KahaDBDurableMessageRecoveryTest {
final TopicMessageStore store = (TopicMessageStore)
brokerTopic.getMessageStore();
return store.getMessageCount(clientId, subId);
}
+
+ protected long getPendingMessageSize(ActiveMQTopic topic, String clientId,
String subId) throws Exception {
+ final Topic brokerTopic = (Topic) broker.getDestination(topic);
+ final TopicMessageStore store = (TopicMessageStore)
brokerTopic.getMessageStore();
+ return store.getMessageSize(clientId, subId);
+ }
}