Repository: activemq
Updated Branches:
  refs/heads/activemq-5.15.x a2dccbf84 -> 9845853b6
[AMQ-7126] Improvement to perf of 5266Test Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/9845853b Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/9845853b Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/9845853b Branch: refs/heads/activemq-5.15.x Commit: 9845853b6860619588e50b7a56a2f80af024d28e Parents: a2dccbf Author: jgoodyear <jgoody...@apache.org> Authored: Mon Jan 7 21:21:05 2019 -0330 Committer: jgoodyear <jgoody...@apache.org> Committed: Mon Jan 7 21:21:05 2019 -0330 ---------------------------------------------------------------------- .../region/cursors/AbstractPendingMessageCursor.java | 10 ++++++++++ .../broker/region/cursors/AbstractStoreCursor.java | 6 ++++++ .../apache/activemq/store/MessageRecoveryListener.java | 3 +++ .../org/apache/activemq/store/kahadb/KahaDBStore.java | 2 +- .../scala/org/apache/activemq/leveldb/DBManager.scala | 2 +- .../org/apache/activemq/usecases/MemoryLimitTest.java | 2 +- 6 files changed, 22 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/9845853b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java index 2c78ae3..dad4a59 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java @@ -177,6 +177,16 @@ public abstract class AbstractPendingMessageCursor implements PendingMessageCurs return systemUsage != null ? 
(!isParentFull() && systemUsage.getMemoryUsage().getPercentUsage() < memoryUsageHighWaterMark) : true; } + boolean parentHasSpace(int waterMark) { + boolean result = true; + if (systemUsage != null) { + if (systemUsage.getMemoryUsage().getParent() != null) { + return systemUsage.getMemoryUsage().getParent().getPercentUsage() <= waterMark; + } + } + return result; + } + private boolean isParentFull() { boolean result = false; if (systemUsage != null) { http://git-wip-us.apache.org/repos/asf/activemq/blob/9845853b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java index aef7528..85d6ee2 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java @@ -276,6 +276,12 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i return useCache && size==0 && hasSpace() && isStarted(); } + @Override + public boolean canRecoveryNextMessage() { + // Should be safe to recovery messages if the overall memory usage if < 90% + return parentHasSpace(90); + } + private void syncWithStore(Message currentAdd) throws Exception { pruneLastCached(); for (ListIterator<MessageId> it = pendingCachedIds.listIterator(pendingCachedIds.size()); it.hasPrevious(); ) { http://git-wip-us.apache.org/repos/asf/activemq/blob/9845853b/activemq-broker/src/main/java/org/apache/activemq/store/MessageRecoveryListener.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/store/MessageRecoveryListener.java 
b/activemq-broker/src/main/java/org/apache/activemq/store/MessageRecoveryListener.java index 5cbeac9..8d9bf62 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/store/MessageRecoveryListener.java +++ b/activemq-broker/src/main/java/org/apache/activemq/store/MessageRecoveryListener.java @@ -26,6 +26,9 @@ public interface MessageRecoveryListener { boolean recoverMessage(Message message) throws Exception; boolean recoverMessageReference(MessageId ref) throws Exception; boolean hasSpace(); + default boolean canRecoveryNextMessage() { + return true; + } /** * check if ref is a duplicate but do not record the reference * @param ref http://git-wip-us.apache.org/repos/asf/activemq/blob/9845853b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java ---------------------------------------------------------------------- diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java index e781d84..c552d79 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java @@ -595,7 +595,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter, msg.getMessageId().setFutureOrSequenceLong(entry.getKey()); listener.recoverMessage(msg); counter++; - if (counter >= maxReturned || !listener.hasSpace()) { + if (counter >= maxReturned || !listener.canRecoveryNextMessage()) { break; } } http://git-wip-us.apache.org/repos/asf/activemq/blob/9845853b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala ---------------------------------------------------------------------- diff --git a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala 
index f9ce9e7..8119a9b 100644 --- a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala +++ b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala @@ -737,7 +737,7 @@ class DBManager(val parent:LevelDBStore) { lastmsgid = msg.getMessageId count += 1 } - count < max && listener.hasSpace + count < max && listener.canRecoveryNextMessage } if( lastmsgid==null ) { startPos http://git-wip-us.apache.org/repos/asf/activemq/blob/9845853b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/MemoryLimitTest.java ---------------------------------------------------------------------- diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/MemoryLimitTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/MemoryLimitTest.java index fa27175..4c0747f 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/MemoryLimitTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/MemoryLimitTest.java @@ -136,7 +136,7 @@ public class MemoryLimitTest extends TestSupport { assertTrue("Should be less than 70% of limit but was: " + percentUsage, percentUsage <= 71); LOG.info("Broker usage: " + broker.getSystemUsage().getMemoryUsage()); - assertTrue(broker.getSystemUsage().getMemoryUsage().getPercentUsage() <= 71); + assertTrue(broker.getSystemUsage().getMemoryUsage().getPercentUsage() <= 91); // let's make sure we can consume all messages for (int i = 1; i < 2000; i++) {