This is an automated email from the ASF dual-hosted git repository.
cshannon pushed a commit to branch activemq-5.15.x
in repository https://gitbox.apache.org/repos/asf/activemq.git
The following commit(s) were added to refs/heads/activemq-5.15.x by this push:
new 64d8538 AMQ-7136 - Improve recovery of durable subscription metrics
in KahaDB
64d8538 is described below
commit 64d8538b49bbd1dec328b9bdb6ce40e735d57af7
Author: Christopher L. Shannon (cshannon) <[email protected]>
AuthorDate: Tue Jan 15 14:10:11 2019 -0500
AMQ-7136 - Improve recovery of durable subscription metrics in KahaDB
Updated metrics recovery to only have to iterate over the order index 1
time to recovery the pending metrics for the subscriptions instead of
making a pass over the index once per subscription
(cherry picked from commit c3714457f11633231a6e925f09028686db04e423)
---
.../apache/activemq/store/kahadb/KahaDBStore.java | 20 ++++++--
.../activemq/store/kahadb/MessageDatabase.java | 56 +++++++++++++++++++++-
.../kahadb/KahaDBDurableMessageRecoveryTest.java | 30 +++++++-----
3 files changed, 87 insertions(+), 19 deletions(-)
diff --git
a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
index cbbf9b6..9fd0351 100644
---
a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
+++
b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
@@ -39,6 +39,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.activemq.broker.ConnectionContext;
@@ -960,7 +961,6 @@ public class KahaDBStore extends MessageDatabase implements
PersistenceAdapter,
protected void recoverMessageStoreSubMetrics() throws IOException {
if (isEnableSubscriptionStatistics()) {
-
final MessageStoreSubscriptionStatistics statistics =
getMessageStoreSubStatistics();
indexLock.writeLock().lock();
try {
@@ -968,19 +968,29 @@ public class KahaDBStore extends MessageDatabase
implements PersistenceAdapter,
@Override
public void execute(Transaction tx) throws IOException
{
StoredDestination sd = getStoredDestination(dest,
tx);
+
+ List<String> subscriptionKeys = new ArrayList<>();
for (Iterator<Entry<String,
KahaSubscriptionCommand>> iterator = sd.subscriptions
.iterator(tx); iterator.hasNext();) {
Entry<String, KahaSubscriptionCommand> entry =
iterator.next();
- String subscriptionKey = entry.getKey();
- LastAck cursorPos = getLastAck(tx, sd,
subscriptionKey);
+ final String subscriptionKey = entry.getKey();
+ final LastAck cursorPos = getLastAck(tx, sd,
subscriptionKey);
if (cursorPos != null) {
- long size = getStoredMessageSize(tx, sd,
subscriptionKey);
+ //add the subscriptions to a list for
recovering pending sizes below
+ subscriptionKeys.add(subscriptionKey);
+ //recover just the count here as that is
fast
statistics.getMessageCount(subscriptionKey)
.setCount(getStoredMessageCount(tx, sd, subscriptionKey));
-
statistics.getMessageSize(subscriptionKey).addSize(size > 0 ? size : 0);
}
}
+
+ //Recover the message sizes for each subscription
by iterating only 1 time over the order index
+ //to speed up recovery
+ final Map<String, AtomicLong>
subPendingMessageSizes = getStoredMessageSize(tx, sd, subscriptionKeys);
+ subPendingMessageSizes.forEach((k,v) -> {
+ statistics.getMessageSize(k).addSize(v.get() >
0 ? v.get() : 0);
+ });
}
});
} finally {
diff --git
a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
index 83650e8..83d3fff 100644
---
a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
+++
b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
@@ -46,7 +46,6 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.SortedSet;
-import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -3003,6 +3002,61 @@ public abstract class MessageDatabase extends
ServiceSupport implements BrokerSe
return 0;
}
+ /**
+ * Recovers durable subscription pending message size with only 1 pass
over the order index on recovery
+ * instead of iterating over the index once per subscription
+ *
+ * @param tx
+ * @param sd
+ * @param subscriptionKeys
+ * @return
+ * @throws IOException
+ */
+ protected Map<String, AtomicLong> getStoredMessageSize(Transaction tx,
StoredDestination sd, List<String> subscriptionKeys) throws IOException {
+
+ final Map<String, AtomicLong> subPendingMessageSizes = new HashMap<>();
+ final Map<String, SequenceSet> messageSequencesMap = new HashMap<>();
+
+ if (sd.ackPositions != null) {
+ Long recoveryPosition = null;
+ //Go through each subscription and find matching ackPositions and
their first
+ //position to find the initial recovery position which is the
first message across all subs
+ //that needs to still be acked
+ for (String subscriptionKey : subscriptionKeys) {
+ subPendingMessageSizes.put(subscriptionKey, new AtomicLong());
+ final SequenceSet messageSequences = sd.ackPositions.get(tx,
subscriptionKey);
+ if (messageSequences != null && !messageSequences.isEmpty()) {
+ final long head = messageSequences.getHead().getFirst();
+ recoveryPosition = recoveryPosition != null ?
Math.min(recoveryPosition, head) : head;
+ //cache the SequenceSet to speed up recovery of metrics
below and avoid a second index hit
+ messageSequencesMap.put(subscriptionKey, messageSequences);
+ }
+ }
+ recoveryPosition = recoveryPosition != null ? recoveryPosition : 0;
+
+ final Iterator<Entry<Long, MessageKeys>> iterator =
sd.orderIndex.iterator(tx,
+ new MessageOrderCursor(recoveryPosition));
+
+ //iterate through all messages starting at the recovery position
to recover metrics
+ while (iterator.hasNext()) {
+ final Entry<Long, MessageKeys> messageEntry = iterator.next();
+
+ //For each message in the index check if each subscription
needs to ack the message still
+ //if the ackPositions SequenceSet contains the message then it
has not been acked and should be
+ //added to the pending metrics for that subscription
+ for (Entry<String, SequenceSet> seqEntry :
messageSequencesMap.entrySet()) {
+ final String subscriptionKey = seqEntry.getKey();
+ final SequenceSet messageSequences =
messageSequencesMap.get(subscriptionKey);
+ if (messageSequences.contains(messageEntry.getKey())) {
+
subPendingMessageSizes.get(subscriptionKey).addAndGet(messageEntry.getValue().location.getSize());
+ }
+ }
+ }
+ }
+
+ return subPendingMessageSizes;
+ }
+
protected long getStoredMessageSize(Transaction tx, StoredDestination sd,
String subscriptionKey) throws IOException {
long locationSize = 0;
diff --git
a/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBDurableMessageRecoveryTest.java
b/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBDurableMessageRecoveryTest.java
index 519648e..cf22b27 100644
---
a/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBDurableMessageRecoveryTest.java
+++
b/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBDurableMessageRecoveryTest.java
@@ -55,9 +55,9 @@ import org.junit.runners.Parameterized.Parameters;
@RunWith(Parameterized.class)
public class KahaDBDurableMessageRecoveryTest {
- @Parameters(name = "{0}")
+ @Parameters(name = "recoverIndex={0},enableSubscriptionStats={1}")
public static Collection<Object[]> data() {
- return Arrays.asList(new Object[][] { { false }, { true } });
+ return Arrays.asList(new Object[][] { { false, false }, { false, true
}, { true, false }, { true, true } });
}
@Rule
@@ -66,6 +66,7 @@ public class KahaDBDurableMessageRecoveryTest {
private URI brokerConnectURI;
private boolean recoverIndex;
+ private boolean enableSubscriptionStats;
@Before
public void setUpBroker() throws Exception {
@@ -81,9 +82,10 @@ public class KahaDBDurableMessageRecoveryTest {
/**
* @param deleteIndex
*/
- public KahaDBDurableMessageRecoveryTest(boolean recoverIndex) {
+ public KahaDBDurableMessageRecoveryTest(boolean recoverIndex, boolean
enableSubscriptionStats) {
super();
this.recoverIndex = recoverIndex;
+ this.enableSubscriptionStats = enableSubscriptionStats;
}
protected void startBroker(boolean recoverIndex) throws Exception {
@@ -105,6 +107,7 @@ public class KahaDBDurableMessageRecoveryTest {
KahaDBPersistenceAdapter adapter = (KahaDBPersistenceAdapter)
brokerService.getPersistenceAdapter();
adapter.setForceRecoverIndex(forceRecoverIndex);
+ adapter.setEnableSubscriptionStatistics(enableSubscriptionStats);
// set smaller size for test
adapter.setJournalMaxFileLength(1024 * 20);
@@ -210,10 +213,12 @@ public class KahaDBDurableMessageRecoveryTest {
assertTrue(Wait.waitFor(() -> 8 == getPendingMessageCount(topic,
"clientId1", "sub1"), 3000, 500));
assertTrue(Wait.waitFor(() -> 10 == getPendingMessageCount(topic,
"clientId1", "sub2"), 3000, 500));
- //Verify the pending size is less for sub1
- assertTrue(getPendingMessageSize(topic, "clientId1", "sub1") > 0);
- assertTrue(getPendingMessageSize(topic, "clientId1", "sub2") > 0);
- assertTrue(getPendingMessageSize(topic, "clientId1", "sub1") <
getPendingMessageSize(topic, "clientId1", "sub2"));
+ // Verify the pending size is less for sub1
+ final long sub1PendingSizeBeforeRestart = getPendingMessageSize(topic,
"clientId1", "sub1");
+ final long sub2PendingSizeBeforeRestart = getPendingMessageSize(topic,
"clientId1", "sub2");
+ assertTrue(sub1PendingSizeBeforeRestart > 0);
+ assertTrue(sub2PendingSizeBeforeRestart > 0);
+ assertTrue(sub1PendingSizeBeforeRestart <
sub2PendingSizeBeforeRestart);
subscriber1.close();
subscriber2.close();
@@ -223,10 +228,9 @@ public class KahaDBDurableMessageRecoveryTest {
assertTrue(Wait.waitFor(() -> 8 == getPendingMessageCount(topic,
"clientId1", "sub1"), 3000, 500));
assertTrue(Wait.waitFor(() -> 10 == getPendingMessageCount(topic,
"clientId1", "sub2"), 3000, 500));
- //Verify the pending size is less for sub1
- assertTrue(getPendingMessageSize(topic, "clientId1", "sub1") > 0);
- assertTrue(getPendingMessageSize(topic, "clientId1", "sub2") > 0);
- assertTrue(getPendingMessageSize(topic, "clientId1", "sub1") <
getPendingMessageSize(topic, "clientId1", "sub2"));
+ // Verify the pending size is less for sub1
+ assertEquals(sub1PendingSizeBeforeRestart,
getPendingMessageSize(topic, "clientId1", "sub1"));
+ assertEquals(sub2PendingSizeBeforeRestart,
getPendingMessageSize(topic, "clientId1", "sub2"));
// Recreate subscriber and try and receive the other 8 messages
session = getSession(ActiveMQSession.AUTO_ACKNOWLEDGE);
@@ -293,7 +297,7 @@ public class KahaDBDurableMessageRecoveryTest {
subscriber2.close();
restartBroker(recoverIndex);
- //Manually recover subscription and verify proper messages are loaded
+ // Manually recover subscription and verify proper messages are loaded
final Topic brokerTopic = (Topic) broker.getDestination(topic);
final TopicMessageStore store = (TopicMessageStore)
brokerTopic.getMessageStore();
final AtomicInteger sub1Recovered = new AtomicInteger();
@@ -348,7 +352,7 @@ public class KahaDBDurableMessageRecoveryTest {
}
});
- //Verify proper number of messages are recovered
+ // Verify proper number of messages are recovered
assertEquals(8, sub1Recovered.get());
assertEquals(10, sub2Recovered.get());
}