This is an automated email from the ASF dual-hosted git repository.
cshannon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq.git
The following commit(s) were added to refs/heads/master by this push:
new 4b51f8b AMQ-7132 - ActiveMQ reads lots of index pages upon startup
(after a graceful or ungraceful shutdown)
new 03ce997 Merge branch 'AMQ-7132'
4b51f8b is described below
commit 4b51f8b66caaa2443795c548f5f7e26cb3740969
Author: Alan Protasio <[email protected]>
AuthorDate: Thu Jan 10 21:20:57 2019 -0800
AMQ-7132 - ActiveMQ reads lots of index pages upon startup (after a
graceful or ungraceful shutdown)
---
.../activemq/management/SizeStatisticImpl.java | 18 ++
.../apache/activemq/store/kahadb/KahaDBStore.java | 21 +-
.../activemq/store/kahadb/MessageDatabase.java | 180 +++++++++++++--
.../activemq/broker/RecoveryStatsBrokerTest.java | 242 +++++++++++++++++++++
.../activemq/store/kahadb/KahaDBVersionTest.java | 5 +
.../store/kahadb/MKahaDBStoreLimitTest.java | 2 +-
.../org/apache/activemq/usage/StoreUsageTest.java | 2 +-
...flineSelectorConcurrentConsumeIndexUseTest.java | 2 +-
.../activemq/store/kahadb/KahaDBVersion6/db-1.log | Bin 0 -> 1048576 bytes
.../activemq/store/kahadb/KahaDBVersion6/db.data | Bin 0 -> 675840 bytes
.../activemq/store/kahadb/KahaDBVersion6/db.redo | Bin 0 -> 668944 bytes
.../org/apache/activemq/web/LocalBrokerFacade.java | 1 +
12 files changed, 438 insertions(+), 35 deletions(-)
diff --git
a/activemq-client/src/main/java/org/apache/activemq/management/SizeStatisticImpl.java
b/activemq-client/src/main/java/org/apache/activemq/management/SizeStatisticImpl.java
index e2bc033..9be0d11 100644
---
a/activemq-client/src/main/java/org/apache/activemq/management/SizeStatisticImpl.java
+++
b/activemq-client/src/main/java/org/apache/activemq/management/SizeStatisticImpl.java
@@ -92,6 +92,13 @@ public class SizeStatisticImpl extends StatisticImpl {
}
/**
+ * @param size the new maximum size of any step
+ */
+ public synchronized void setMaxSize(long size) {
+ maxSize = size;
+ }
+
+ /**
* @return the minimum size of any step
*/
public synchronized long getMinSize() {
@@ -99,12 +106,23 @@ public class SizeStatisticImpl extends StatisticImpl {
}
/**
+ * @param size the new minimum size of any step
+ */
+ public synchronized void setMinSize(long size) {
+ minSize = size;
+ }
+
+ /**
* @return the total size of all the steps added together
*/
public synchronized long getTotalSize() {
return totalSize;
}
+ public synchronized void setCount(long count) {
+ this.count = count;
+ }
+
/**
* @return the average size calculated by dividing the total size by the
* number of counts
diff --git
a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
index b5c466d..37f3b90 100644
---
a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
+++
b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
@@ -459,7 +459,9 @@ public class KahaDBStore extends MessageDatabase implements
PersistenceAdapter,
ackedAndPrepared.remove(id);
if (rollback) {
rolledBackAcks.add(id);
- incrementAndAddSizeToStoreStat(dest, 0);
+ pageFile.tx().execute(tx -> {
+ incrementAndAddSizeToStoreStat(tx, dest, 0);
+ });
}
}
} finally {
@@ -812,16 +814,19 @@ public class KahaDBStore extends MessageDatabase
implements PersistenceAdapter,
recoveredStatistics = pageFile.tx().execute(new
Transaction.CallableClosure<MessageStoreStatistics, IOException>() {
@Override
public MessageStoreStatistics execute(Transaction tx)
throws IOException {
- MessageStoreStatistics statistics = new
MessageStoreStatistics();
+ MessageStoreStatistics statistics =
getStoredMessageStoreStatistics(dest, tx);
// Iterate through all index entries to get the
size of each message
- StoredDestination sd = getStoredDestination(dest,
tx);
- for (Iterator<Entry<Location, Long>> iterator =
sd.locationIndex.iterator(tx); iterator.hasNext();) {
- int locationSize =
iterator.next().getKey().getSize();
- statistics.getMessageCount().increment();
-
statistics.getMessageSize().addSize(locationSize > 0 ? locationSize : 0);
+ if (statistics == null) {
+ StoredDestination sd =
getStoredDestination(dest, tx);
+ statistics = new MessageStoreStatistics();
+ for (Iterator<Entry<Location, Long>> iterator
= sd.locationIndex.iterator(tx); iterator.hasNext(); ) {
+ int locationSize =
iterator.next().getKey().getSize();
+ statistics.getMessageCount().increment();
+
statistics.getMessageSize().addSize(locationSize > 0 ? locationSize : 0);
+ }
}
- return statistics;
+ return statistics;
}
});
recoveredStatistics.getMessageCount().subtract(ackedAndPrepared.size());
diff --git
a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
index 89f694b..78d2bfa 100644
---
a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
+++
b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
@@ -134,7 +134,7 @@ public abstract class MessageDatabase extends
ServiceSupport implements BrokerSe
static final int OPEN_STATE = 2;
static final long NOT_ACKED = -1;
- static final int VERSION = 6;
+ static final int VERSION = 7;
static final byte COMPACTED_JOURNAL_FILE = DataFile.STANDARD_LOG_FILE + 1;
@@ -188,6 +188,7 @@ public abstract class MessageDatabase extends
ServiceSupport implements BrokerSe
} catch (EOFException expectedOnUpgrade) {
openwireVersion = OpenWireFormat.DEFAULT_LEGACY_VERSION;
}
+
LOG.info("KahaDB is version " + version);
}
@@ -863,7 +864,7 @@ public abstract class MessageDatabase extends
ServiceSupport implements BrokerSe
sd.messageIdIndex.remove(tx, keys.messageId);
metadata.producerSequenceIdTracker.rollback(keys.messageId);
undoCounter++;
- decrementAndSubSizeToStoreStat(key,
keys.location.getSize());
+ decrementAndSubSizeToStoreStat(tx, key, sd,
keys.location.getSize());
// TODO: do we need to modify the ack positions for the
pub sub case?
}
}
@@ -979,7 +980,7 @@ public abstract class MessageDatabase extends
ServiceSupport implements BrokerSe
sd.messageIdIndex.remove(tx, keys.messageId);
LOG.info("[" + sdEntry.getKey() + "] dropped: " +
keys.messageId + " at corrupt location: " + keys.location);
undoCounter++;
- decrementAndSubSizeToStoreStat(sdEntry.getKey(),
keys.location.getSize());
+ decrementAndSubSizeToStoreStat(tx,
sdEntry.getKey(), sdEntry.getValue(), keys.location.getSize());
// TODO: do we need to modify the ack positions
for the pub sub case?
}
} else {
@@ -1491,7 +1492,7 @@ public abstract class MessageDatabase extends
ServiceSupport implements BrokerSe
if (previous == null) {
previous = sd.messageIdIndex.put(tx, command.getMessageId(), id);
if (previous == null) {
- incrementAndAddSizeToStoreStat(command.getDestination(),
location.getSize());
+ incrementAndAddSizeToStoreStat(tx, command.getDestination(),
location.getSize());
sd.orderIndex.put(tx, priority, id, new
MessageKeys(command.getMessageId(), location));
if (sd.subscriptions != null && !sd.subscriptions.isEmpty(tx))
{
addAckLocationForNewMessage(tx, command.getDestination(),
sd, id);
@@ -1550,11 +1551,11 @@ public abstract class MessageDatabase extends
ServiceSupport implements BrokerSe
new MessageKeys(command.getMessageId(), location)
);
sd.locationIndex.put(tx, location, id);
- incrementAndAddSizeToStoreStat(command.getDestination(),
location.getSize());
+ incrementAndAddSizeToStoreStat(tx, command.getDestination(),
location.getSize());
if (previousKeys != null) {
//Remove the existing from the size
- decrementAndSubSizeToStoreStat(command.getDestination(),
previousKeys.location.getSize());
+ decrementAndSubSizeToStoreStat(tx, command.getDestination(),
previousKeys.location.getSize());
//update all the subscription metrics
if (enableSubscriptionStatistics && sd.ackPositions != null &&
location.getSize() != previousKeys.location.getSize()) {
@@ -1590,7 +1591,7 @@ public abstract class MessageDatabase extends
ServiceSupport implements BrokerSe
MessageKeys keys = sd.orderIndex.remove(tx, sequenceId);
if (keys != null) {
sd.locationIndex.remove(tx, keys.location);
- decrementAndSubSizeToStoreStat(command.getDestination(),
keys.location.getSize());
+ decrementAndSubSizeToStoreStat(tx,
command.getDestination(), keys.location.getSize());
recordAckMessageReferenceLocation(ackLocation,
keys.location);
metadata.lastUpdate = ackLocation;
} else if (LOG.isDebugEnabled()) {
@@ -1655,6 +1656,9 @@ public abstract class MessageDatabase extends
ServiceSupport implements BrokerSe
sd.messageIdIndex.unload(tx);
tx.free(sd.messageIdIndex.getPageId());
+ tx.free(sd.messageStoreStatistics.getPageId());
+ sd.messageStoreStatistics = null;
+
if (sd.subscriptions != null) {
sd.subscriptions.clear(tx);
sd.subscriptions.unload(tx);
@@ -2362,6 +2366,53 @@ public abstract class MessageDatabase extends
ServiceSupport implements BrokerSe
}
}
+ class StoredMessageStoreStatistics {
+ private PageFile pageFile;
+ private Page<MessageStoreStatistics> page;
+ private long pageId;
+ private AtomicBoolean loaded = new AtomicBoolean();
+ private MessageStoreStatisticsMarshaller
messageStoreStatisticsMarshaller = new MessageStoreStatisticsMarshaller();
+
+ StoredMessageStoreStatistics(PageFile pageFile, long pageId) {
+ this.pageId = pageId;
+ this.pageFile = pageFile;
+ }
+
+ StoredMessageStoreStatistics(PageFile pageFile, Page page) {
+ this(pageFile, page.getPageId());
+ }
+
+ public long getPageId() {
+ return pageId;
+ }
+
+ synchronized void load(Transaction tx) throws IOException {
+ if (loaded.compareAndSet(false, true)) {
+ page = tx.load(pageId, null);
+
+ if (page.getType() == Page.PAGE_FREE_TYPE) {
+ page.set(null);
+ tx.store(page, messageStoreStatisticsMarshaller, true);
+ }
+ }
+ page = tx.load(pageId, messageStoreStatisticsMarshaller);
+ }
+
+ synchronized MessageStoreStatistics get(Transaction tx) throws
IOException {
+ load(tx);
+ return page.get();
+ }
+
+ synchronized void put(Transaction tx, MessageStoreStatistics
storeStatistics) throws IOException {
+ if (page == null) {
+ page = tx.load(pageId, messageStoreStatisticsMarshaller);
+ }
+
+ page.set(storeStatistics);
+
+ tx.store(page, messageStoreStatisticsMarshaller, true);
+ }
+ }
class StoredDestination {
MessageOrderIndex orderIndex = new MessageOrderIndex();
@@ -2378,6 +2429,8 @@ public abstract class MessageDatabase extends
ServiceSupport implements BrokerSe
// Transient data used to track which Messages are no longer needed.
final HashSet<String> subscriptionCache = new LinkedHashSet<>();
+ StoredMessageStoreStatistics messageStoreStatistics;
+
public void trackPendingAdd(Long seq) {
orderIndex.trackPendingAdd(seq);
}
@@ -2392,6 +2445,38 @@ public abstract class MessageDatabase extends
ServiceSupport implements BrokerSe
}
}
+ protected class MessageStoreStatisticsMarshaller extends
VariableMarshaller<MessageStoreStatistics> {
+
+ @Override
+ public void writePayload(final MessageStoreStatistics object, final
DataOutput dataOut) throws IOException {
+ dataOut.writeBoolean(null != object);
+ if (object != null) {
+ dataOut.writeLong(object.getMessageCount().getCount());
+ dataOut.writeLong(object.getMessageSize().getTotalSize());
+ dataOut.writeLong(object.getMessageSize().getMaxSize());
+ dataOut.writeLong(object.getMessageSize().getMinSize());
+ dataOut.writeLong(object.getMessageSize().getCount());
+ }
+ }
+
+ @Override
+ public MessageStoreStatistics readPayload(final DataInput dataIn)
throws IOException {
+
+ if (!dataIn.readBoolean()) {
+ return null;
+ }
+
+ MessageStoreStatistics messageStoreStatistics = new
MessageStoreStatistics();
+
messageStoreStatistics.getMessageCount().setCount(dataIn.readLong());
+
messageStoreStatistics.getMessageSize().setTotalSize(dataIn.readLong());
+
messageStoreStatistics.getMessageSize().setMaxSize(dataIn.readLong());
+
messageStoreStatistics.getMessageSize().setMinSize(dataIn.readLong());
+
messageStoreStatistics.getMessageSize().setCount(dataIn.readLong());
+
+ return messageStoreStatistics;
+ }
+ }
+
protected class StoredDestinationMarshaller extends
VariableMarshaller<StoredDestination> {
final MessageKeysMarshaller messageKeysMarshaller = new
MessageKeysMarshaller();
@@ -2470,6 +2555,7 @@ public abstract class MessageDatabase extends
ServiceSupport implements BrokerSe
});
}
}
+
if (metadata.version >= 2) {
value.orderIndex.lowPriorityIndex = new BTreeIndex<>(pageFile,
dataIn.readLong());
value.orderIndex.highPriorityIndex = new
BTreeIndex<>(pageFile, dataIn.readLong());
@@ -2491,6 +2577,15 @@ public abstract class MessageDatabase extends
ServiceSupport implements BrokerSe
});
}
+ if (metadata.version >= 7) {
+ value.messageStoreStatistics = new
StoredMessageStoreStatistics(pageFile, dataIn.readLong());
+ } else {
+ pageFile.tx().execute(tx -> {
+ value.messageStoreStatistics = new
StoredMessageStoreStatistics(pageFile, tx.allocate());
+ value.messageStoreStatistics.load(tx);
+ });
+ }
+
return value;
}
@@ -2510,6 +2605,7 @@ public abstract class MessageDatabase extends
ServiceSupport implements BrokerSe
}
dataOut.writeLong(value.orderIndex.lowPriorityIndex.getPageId());
dataOut.writeLong(value.orderIndex.highPriorityIndex.getPageId());
+ dataOut.writeLong(value.messageStoreStatistics.getPageId());
}
}
@@ -2543,6 +2639,11 @@ public abstract class MessageDatabase extends
ServiceSupport implements BrokerSe
return rc;
}
+ protected MessageStoreStatistics
getStoredMessageStoreStatistics(KahaDestination destination, Transaction tx)
throws IOException {
+ StoredDestination sd = getStoredDestination(destination, tx);
+ return sd != null && sd.messageStoreStatistics != null ?
sd.messageStoreStatistics.get(tx) : null;
+ }
+
protected StoredDestination getExistingStoredDestination(KahaDestination
destination, Transaction tx) throws IOException {
String key = key(destination);
StoredDestination rc = storedDestinations.get(key);
@@ -2575,9 +2676,14 @@ public abstract class MessageDatabase extends
ServiceSupport implements BrokerSe
rc.ackPositions = new ListIndex<>(pageFile, tx.allocate());
rc.subLocations = new ListIndex<>(pageFile, tx.allocate());
}
+
+ rc.messageStoreStatistics = new
StoredMessageStoreStatistics(pageFile, tx.allocate());
+
metadata.destinations.put(tx, key, rc);
}
+ rc.messageStoreStatistics.load(tx);
+
// Configure the marshalers and load.
rc.orderIndex.load(tx);
@@ -2644,9 +2750,6 @@ public abstract class MessageDatabase extends
ServiceSupport implements BrokerSe
}
}
- // Configure the message references index
-
-
// Configure the subscription cache
for (Iterator<Entry<String, LastAck>> iterator =
rc.subscriptionAcks.iterator(tx); iterator.hasNext(); ) {
Entry<String, LastAck> entry = iterator.next();
@@ -2707,31 +2810,60 @@ public abstract class MessageDatabase extends
ServiceSupport implements BrokerSe
* @param kahaDestination
* @param size
*/
- protected void incrementAndAddSizeToStoreStat(KahaDestination
kahaDestination, long size) {
- incrementAndAddSizeToStoreStat(key(kahaDestination), size);
+ protected void incrementAndAddSizeToStoreStat(Transaction tx,
KahaDestination kahaDestination, long size) throws IOException {
+ StoredDestination sd = getStoredDestination(kahaDestination, tx);
+ incrementAndAddSizeToStoreStat(tx, key(kahaDestination), sd, size);
}
- protected void incrementAndAddSizeToStoreStat(String kahaDestKey, long
size) {
+ protected void incrementAndAddSizeToStoreStat(Transaction tx, String
kahaDestKey, StoredDestination sd, long size) throws IOException {
MessageStoreStatistics storeStats = getStoreStats(kahaDestKey);
if (storeStats != null) {
- storeStats.getMessageCount().increment();
- if (size > 0) {
- storeStats.getMessageSize().addSize(size);
+ incrementAndAddSizeToStoreStat(size, storeStats);
+ sd.messageStoreStatistics.put(tx, storeStats);
+ } else if (sd != null){
+ // During the recovery the storeStats is null
+ MessageStoreStatistics storedStoreStats =
sd.messageStoreStatistics.get(tx);
+ if (storedStoreStats == null) {
+ storedStoreStats = new MessageStoreStatistics();
}
+ incrementAndAddSizeToStoreStat(size, storedStoreStats);
+ sd.messageStoreStatistics.put(tx, storedStoreStats);
}
}
- protected void decrementAndSubSizeToStoreStat(KahaDestination
kahaDestination, long size) {
- decrementAndSubSizeToStoreStat(key(kahaDestination), size);
+ private void incrementAndAddSizeToStoreStat(final long size, final
MessageStoreStatistics storedStoreStats) {
+ storedStoreStats.getMessageCount().increment();
+ if (size > 0) {
+ storedStoreStats.getMessageSize().addSize(size);
+ }
}
- protected void decrementAndSubSizeToStoreStat(String kahaDestKey, long
size) {
+ protected void decrementAndSubSizeToStoreStat(Transaction tx,
KahaDestination kahaDestination, long size) throws IOException {
+ StoredDestination sd = getStoredDestination(kahaDestination, tx);
+ decrementAndSubSizeToStoreStat(tx, key(kahaDestination), sd,size);
+ }
+
+ protected void decrementAndSubSizeToStoreStat(Transaction tx, String
kahaDestKey, StoredDestination sd, long size) throws IOException {
MessageStoreStatistics storeStats = getStoreStats(kahaDestKey);
if (storeStats != null) {
- storeStats.getMessageCount().decrement();
- if (size > 0) {
- storeStats.getMessageSize().addSize(-size);
+ decrementAndSubSizeToStoreStat(size, storeStats);
+ sd.messageStoreStatistics.put(tx, storeStats);
+ } else if (sd != null){
+ // During the recovery the storeStats is null
+ MessageStoreStatistics storedStoreStats =
sd.messageStoreStatistics.get(tx);
+ if (storedStoreStats == null) {
+ storedStoreStats = new MessageStoreStatistics();
}
+ decrementAndSubSizeToStoreStat(size, storedStoreStats);
+ sd.messageStoreStatistics.put(tx, storedStoreStats);
+ }
+ }
+
+ private void decrementAndSubSizeToStoreStat(final long size, final
MessageStoreStatistics storedStoreStats) {
+ storedStoreStats.getMessageCount().decrement();
+
+ if (size > 0) {
+ storedStoreStats.getMessageSize().addSize(-size);
}
}
@@ -2936,7 +3068,7 @@ public abstract class MessageDatabase extends
ServiceSupport implements BrokerSe
sd.locationIndex.remove(tx, entry.getValue().location);
sd.messageIdIndex.remove(tx, entry.getValue().messageId);
sd.orderIndex.remove(tx, entry.getKey());
- decrementAndSubSizeToStoreStat(command.getDestination(),
entry.getValue().location.getSize());
+ decrementAndSubSizeToStoreStat(tx,
command.getDestination(), entry.getValue().location.getSize());
}
}
}
@@ -2990,7 +3122,7 @@ public abstract class MessageDatabase extends
ServiceSupport implements BrokerSe
sd.locationIndex.remove(tx, entry.getValue().location);
sd.messageIdIndex.remove(tx, entry.getValue().messageId);
sd.orderIndex.remove(tx, entry.getKey());
- decrementAndSubSizeToStoreStat(command.getDestination(),
entry.getValue().location.getSize());
+ decrementAndSubSizeToStoreStat(tx,
command.getDestination(), entry.getValue().location.getSize());
}
}
}
diff --git
a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/RecoveryStatsBrokerTest.java
b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/RecoveryStatsBrokerTest.java
new file mode 100644
index 0000000..3692574
--- /dev/null
+++
b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/RecoveryStatsBrokerTest.java
@@ -0,0 +1,242 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.broker;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.activemq.broker.region.Destination;
+import org.apache.activemq.broker.region.RegionBroker;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ConnectionInfo;
+import org.apache.activemq.command.ConsumerInfo;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.command.ProducerInfo;
+import org.apache.activemq.command.SessionInfo;
+import org.apache.activemq.store.MessageStoreStatistics;
+import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
+import org.apache.activemq.util.IOHelper;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.File;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.stream.Collectors;
+
+@RunWith(value = Parameterized.class)
+public class RecoveryStatsBrokerTest extends BrokerRestartTestSupport {
+
+ private RestartType restartType;
+
+ enum RestartType {
+ NORMAL,
+ FULL_RECOVERY,
+ UNCLEAN_SHUTDOWN
+ }
+
+ @Override
+ protected void configureBroker(BrokerService broker) throws Exception {
+ KahaDBPersistenceAdapter persistenceAdapter = new
KahaDBPersistenceAdapter();
+ persistenceAdapter.setJournalMaxFileLength(1024*1024);
+ //persistenceAdapter.setConcurrentStoreAndDispatchQueues(false);
+ persistenceAdapter.setDirectory(broker.getBrokerDataDirectory());
+ broker.setPersistenceAdapter(persistenceAdapter);
+ broker.setDestinationPolicy(policyMap);
+ }
+
+ protected void restartBroker(RestartType restartType) throws Exception {
+ if (restartType == RestartType.FULL_RECOVERY) {
+ stopBroker();
+ KahaDBPersistenceAdapter kahaDBPersistenceAdapter =
(KahaDBPersistenceAdapter) broker.getPersistenceAdapter();
+ File dir = kahaDBPersistenceAdapter.getDirectory();
+ if (dir != null) {
+ IOHelper.deleteFile(new File(dir, "db.data"));
+ }
+ broker.start();
+ } else if (restartType == RestartType.UNCLEAN_SHUTDOWN){
+ //Simulate an unclean shutdown
+
+ File dir = broker.getBrokerDataDirectory();
+ File backUpDir = new File(dir, "bk");
+ IOHelper.mkdirs(new File(dir, "bk"));
+
+ for (File f: dir.listFiles()) {
+ if (!f.isDirectory()) {
+ IOHelper.copyFile(f, new File(backUpDir, f.getName()));
+ }
+ }
+
+ stopBroker();
+
+ for (File f: backUpDir.listFiles()) {
+ if (!f.isDirectory()) {
+ IOHelper.copyFile(f, new File(dir, f.getName()));
+ }
+ }
+
+ broker.start();
+ } else {
+ restartBroker();
+ }
+ }
+
+ @Parameterized.Parameters(name="{0}")
+ public static Collection<Object[]> getTestParameters() {
+ return Arrays.asList(new Object[][] {
+ {RestartType.NORMAL},
+ {RestartType.FULL_RECOVERY},
+ {RestartType.UNCLEAN_SHUTDOWN},
+ });
+ }
+
+ public RecoveryStatsBrokerTest(RestartType restartType) {
+ this.restartType = restartType;
+ }
+
+ @Before
+ public void setUp() throws Exception {
+ super.setUp();
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ super.tearDown();
+ }
+
+ @Test(timeout = 60 * 1000)
+ public void testStaticsRecovery() throws Exception {
+ List<ActiveMQDestination> destinations = ImmutableList.of(new
ActiveMQQueue("TEST.A"), new ActiveMQQueue("TEST.B"));
+ Random random = new Random();
+ Map<ActiveMQDestination, Integer> consumedMessages = new HashMap<>();
+
+ destinations.forEach(destination -> consumedMessages.put(destination,
0));
+
+ int numberOfMessages = 10000;
+
+ StubConnection connection = createConnection();
+ ConnectionInfo connectionInfo = createConnectionInfo();
+ SessionInfo sessionInfo = createSessionInfo(connectionInfo);
+ ProducerInfo producerInfo = createProducerInfo(sessionInfo);
+ connection.send(connectionInfo);
+ connection.send(sessionInfo);
+ connection.send(producerInfo);
+
+ for (int i = 0; i < numberOfMessages; i++) {
+ for (ActiveMQDestination destination : destinations) {
+ Message message = createMessage(producerInfo, destination);
+ message.setPersistent(true);
+ message.setProducerId(message.getMessageId().getProducerId());
+ connection.request(message);
+ }
+ }
+
+ Map<ActiveMQDestination, MessageStoreStatistics> originalStatistics =
getCurrentStatistics(destinations);
+
+ checkStatistics(destinations, originalStatistics);
+
+ restartBroker(restartType);
+
+ checkStatistics(destinations, originalStatistics);
+
+ for (ActiveMQDestination destination : destinations) {
+ consume(destination, 100, false);
+ }
+
+ checkStatistics(destinations, originalStatistics);
+
+ restartBroker(restartType);
+
+ checkStatistics(destinations, originalStatistics);
+
+ for (ActiveMQDestination destination : destinations) {
+ int messagesToConsume = random.nextInt(numberOfMessages);
+ consume(destination, messagesToConsume, true);
+ consumedMessages.compute(destination, (key, value) -> value =
value + messagesToConsume);
+ }
+
+ originalStatistics = getCurrentStatistics(destinations);
+
+ for (ActiveMQDestination destination : destinations) {
+ int consumedCount = consumedMessages.get(destination);
+ assertEquals("", numberOfMessages - consumedCount,
originalStatistics.get(destination).getMessageCount().getCount());
+ }
+
+ checkStatistics(destinations, originalStatistics);
+
+ restartBroker(restartType);
+
+ checkStatistics(destinations, originalStatistics);
+ }
+
+ private Map<ActiveMQDestination, MessageStoreStatistics>
getCurrentStatistics(final List<ActiveMQDestination> destinations) {
+ return destinations.stream()
+ .map(destination -> getDestination(broker, destination))
+ .collect(Collectors.toMap(destination -> new
ActiveMQQueue(destination.getName()), destination2 ->
destination2.getMessageStore().getMessageStoreStatistics()));
+ }
+
+ private void consume(ActiveMQDestination destination, int
numberOfMessages, boolean shouldAck) throws Exception {
+ // Setup the consumer and receive the message.
+ StubConnection connection = createConnection();
+ ConnectionInfo connectionInfo = createConnectionInfo();
+ SessionInfo sessionInfo = createSessionInfo(connectionInfo);
+ connection.send(connectionInfo);
+ connection.send(sessionInfo);
+ ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo,
destination);
+ connection.send(consumerInfo);
+
+ // Then we should get the messages.
+ for (int i = 0; i < numberOfMessages; i++) {
+ Message m2 = receiveMessage(connection);
+ assertNotNull(m2);
+ if (shouldAck) {
+ MessageAck ack = createAck(consumerInfo, m2, 1,
MessageAck.STANDARD_ACK_TYPE);
+ connection.request(ack);
+ }
+ }
+
+ connection.request(closeConnectionInfo(connectionInfo));
+ }
+
+ private void checkStatistics(final List<ActiveMQDestination> destinations,
final Map<ActiveMQDestination, MessageStoreStatistics> originalStatistics) {
+ for (ActiveMQDestination destination : destinations) {
+ MessageStoreStatistics original =
originalStatistics.get(destination);
+ MessageStoreStatistics actual = getDestination(broker,
destination).getMessageStore().getMessageStoreStatistics();
+ assertEquals("Have Same Count",
original.getMessageCount().getCount(), actual.getMessageCount().getCount());
+ assertEquals("Have Same TotalSize",
original.getMessageSize().getTotalSize(), getDestination(broker,
destination).getMessageStore().getMessageStoreStatistics().getMessageSize().getTotalSize());
+ }
+ }
+
+ protected Destination getDestination(BrokerService target,
ActiveMQDestination destination) {
+ RegionBroker regionBroker = (RegionBroker) target.getRegionBroker();
+ if (destination.isTemporary()) {
+ return destination.isQueue() ?
regionBroker.getTempQueueRegion().getDestinationMap().get(destination) :
+
regionBroker.getTempTopicRegion().getDestinationMap().get(destination);
+ }
+ return destination.isQueue() ?
+
regionBroker.getQueueRegion().getDestinationMap().get(destination) :
+
regionBroker.getTopicRegion().getDestinationMap().get(destination);
+ }
+}
diff --git
a/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBVersionTest.java
b/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBVersionTest.java
index 0b643b9..85e785d 100644
---
a/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBVersionTest.java
+++
b/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBVersionTest.java
@@ -57,6 +57,7 @@ public class KahaDBVersionTest extends TestCase {
final static File VERSION_3_DB = new File(basedir +
"/src/test/resources/org/apache/activemq/store/kahadb/KahaDBVersion3");
final static File VERSION_4_DB = new File(basedir +
"/src/test/resources/org/apache/activemq/store/kahadb/KahaDBVersion4");
final static File VERSION_5_DB = new File(basedir +
"/src/test/resources/org/apache/activemq/store/kahadb/KahaDBVersion5");
+ final static File VERSION_6_DB = new File(basedir +
"/src/test/resources/org/apache/activemq/store/kahadb/KahaDBVersion6");
BrokerService broker = null;
@@ -133,6 +134,10 @@ public class KahaDBVersionTest extends TestCase {
doConvertRestartCycle(VERSION_5_DB);
}
+ public void testVersion6Conversion() throws Exception {
+ doConvertRestartCycle(VERSION_6_DB);
+ }
+
public void doConvertRestartCycle(File existingStore) throws Exception {
File testDir = new File("target/activemq-data/kahadb/versionDB");
diff --git
a/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/MKahaDBStoreLimitTest.java
b/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/MKahaDBStoreLimitTest.java
index 4a8eea9..9ebffca 100644
---
a/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/MKahaDBStoreLimitTest.java
+++
b/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/MKahaDBStoreLimitTest.java
@@ -177,7 +177,7 @@ public class MKahaDBStoreLimitTest {
FilteredKahaDBPersistenceAdapter filtered = new
FilteredKahaDBPersistenceAdapter();
StoreUsage storeUsage = new StoreUsage();
- storeUsage.setLimit(40*1024);
+ storeUsage.setLimit(44*1024);
filtered.setUsage(storeUsage);
filtered.setDestination(queueA);
diff --git
a/activemq-unit-tests/src/test/java/org/apache/activemq/usage/StoreUsageTest.java
b/activemq-unit-tests/src/test/java/org/apache/activemq/usage/StoreUsageTest.java
index defba3a..0c048b2 100644
---
a/activemq-unit-tests/src/test/java/org/apache/activemq/usage/StoreUsageTest.java
+++
b/activemq-unit-tests/src/test/java/org/apache/activemq/usage/StoreUsageTest.java
@@ -37,7 +37,7 @@ public class StoreUsageTest extends EmbeddedBrokerTestSupport
{
@Override
protected BrokerService createBroker() throws Exception {
BrokerService broker = super.createBroker();
- broker.getSystemUsage().getStoreUsage().setLimit(34 * 1024);
+ broker.getSystemUsage().getStoreUsage().setLimit(38 * 1024);
broker.deleteAllMessages();
return broker;
}
diff --git
a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubsOfflineSelectorConcurrentConsumeIndexUseTest.java
b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubsOfflineSelectorConcurrentConsumeIndexUseTest.java
index 0afc8da..8cf24e7 100644
---
a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubsOfflineSelectorConcurrentConsumeIndexUseTest.java
+++
b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubsOfflineSelectorConcurrentConsumeIndexUseTest.java
@@ -218,7 +218,7 @@ public class
DurableSubsOfflineSelectorConcurrentConsumeIndexUseTest extends org
assertTrue("no leak of pages, always use just 11",
Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
- return 11 == store.getPageFile().getPageCount() -
+ return 12 == store.getPageFile().getPageCount() -
store.getPageFile().getFreePageCount();
}
}, TimeUnit.SECONDS.toMillis(10)));
diff --git
a/activemq-unit-tests/src/test/resources/org/apache/activemq/store/kahadb/KahaDBVersion6/db-1.log
b/activemq-unit-tests/src/test/resources/org/apache/activemq/store/kahadb/KahaDBVersion6/db-1.log
new file mode 100644
index 0000000..34facec
Binary files /dev/null and
b/activemq-unit-tests/src/test/resources/org/apache/activemq/store/kahadb/KahaDBVersion6/db-1.log
differ
diff --git
a/activemq-unit-tests/src/test/resources/org/apache/activemq/store/kahadb/KahaDBVersion6/db.data
b/activemq-unit-tests/src/test/resources/org/apache/activemq/store/kahadb/KahaDBVersion6/db.data
new file mode 100644
index 0000000..6c71774
Binary files /dev/null and
b/activemq-unit-tests/src/test/resources/org/apache/activemq/store/kahadb/KahaDBVersion6/db.data
differ
diff --git
a/activemq-unit-tests/src/test/resources/org/apache/activemq/store/kahadb/KahaDBVersion6/db.redo
b/activemq-unit-tests/src/test/resources/org/apache/activemq/store/kahadb/KahaDBVersion6/db.redo
new file mode 100644
index 0000000..5cb7b87
Binary files /dev/null and
b/activemq-unit-tests/src/test/resources/org/apache/activemq/store/kahadb/KahaDBVersion6/db.redo
differ
diff --git
a/activemq-web/src/main/java/org/apache/activemq/web/LocalBrokerFacade.java
b/activemq-web/src/main/java/org/apache/activemq/web/LocalBrokerFacade.java
index 50ef17c..28af6a2 100644
--- a/activemq-web/src/main/java/org/apache/activemq/web/LocalBrokerFacade.java
+++ b/activemq-web/src/main/java/org/apache/activemq/web/LocalBrokerFacade.java
@@ -87,6 +87,7 @@ public class LocalBrokerFacade extends BrokerFacadeSupport {
}
}
+
private Destination unwrap(Destination dest) {
if (dest instanceof DestinationFilter) {
return unwrap(((DestinationFilter) dest).getNext());