Repository: activemq
Updated Branches:
  refs/heads/activemq-5.15.x a2dccbf84 -> 9845853b6


[AMQ-7126] Improvement to perf of 5266Test


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/9845853b
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/9845853b
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/9845853b

Branch: refs/heads/activemq-5.15.x
Commit: 9845853b6860619588e50b7a56a2f80af024d28e
Parents: a2dccbf
Author: jgoodyear <jgoody...@apache.org>
Authored: Mon Jan 7 21:21:05 2019 -0330
Committer: jgoodyear <jgoody...@apache.org>
Committed: Mon Jan 7 21:21:05 2019 -0330

----------------------------------------------------------------------
 .../region/cursors/AbstractPendingMessageCursor.java      | 10 ++++++++++
 .../broker/region/cursors/AbstractStoreCursor.java        |  6 ++++++
 .../apache/activemq/store/MessageRecoveryListener.java    |  3 +++
 .../org/apache/activemq/store/kahadb/KahaDBStore.java     |  2 +-
 .../scala/org/apache/activemq/leveldb/DBManager.scala     |  2 +-
 .../org/apache/activemq/usecases/MemoryLimitTest.java     |  2 +-
 6 files changed, 22 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/9845853b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java
----------------------------------------------------------------------
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java
 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java
index 2c78ae3..dad4a59 100644
--- 
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java
@@ -177,6 +177,16 @@ public abstract class AbstractPendingMessageCursor 
implements PendingMessageCurs
         return systemUsage != null ? (!isParentFull() && systemUsage.getMemoryUsage().getPercentUsage() < memoryUsageHighWaterMark) : true;
     }
 
+    boolean parentHasSpace(int waterMark) {
+        boolean result = true;
+        if (systemUsage != null) {
+            if (systemUsage.getMemoryUsage().getParent() != null) {
+                return systemUsage.getMemoryUsage().getParent().getPercentUsage() <= waterMark;
+            }
+        }
+        return result;
+    }
+
     private boolean isParentFull() {
         boolean result = false;
         if (systemUsage != null) {

http://git-wip-us.apache.org/repos/asf/activemq/blob/9845853b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
----------------------------------------------------------------------
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
index aef7528..85d6ee2 100644
--- 
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
@@ -276,6 +276,12 @@ public abstract class AbstractStoreCursor extends 
AbstractPendingMessageCursor i
         return useCache && size==0 && hasSpace() && isStarted();
     }
 
+    @Override
+    public boolean canRecoveryNextMessage() {
+        // Should be safe to recover messages if the overall memory usage is <= 90%
+        return parentHasSpace(90);
+    }
+
     private void syncWithStore(Message currentAdd) throws Exception {
         pruneLastCached();
         for (ListIterator<MessageId> it = pendingCachedIds.listIterator(pendingCachedIds.size()); it.hasPrevious(); ) {

http://git-wip-us.apache.org/repos/asf/activemq/blob/9845853b/activemq-broker/src/main/java/org/apache/activemq/store/MessageRecoveryListener.java
----------------------------------------------------------------------
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/store/MessageRecoveryListener.java
 
b/activemq-broker/src/main/java/org/apache/activemq/store/MessageRecoveryListener.java
index 5cbeac9..8d9bf62 100644
--- 
a/activemq-broker/src/main/java/org/apache/activemq/store/MessageRecoveryListener.java
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/store/MessageRecoveryListener.java
@@ -26,6 +26,9 @@ public interface MessageRecoveryListener {
     boolean recoverMessage(Message message) throws Exception;
     boolean recoverMessageReference(MessageId ref) throws Exception;
     boolean hasSpace();
+    default boolean canRecoveryNextMessage() {
+        return true;
+    }
     /**
      * check if ref is a duplicate but do not record the reference
      * @param ref

http://git-wip-us.apache.org/repos/asf/activemq/blob/9845853b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
----------------------------------------------------------------------
diff --git 
a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
 
b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
index e781d84..c552d79 100644
--- 
a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
+++ 
b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
@@ -595,7 +595,7 @@ public class KahaDBStore extends MessageDatabase implements 
PersistenceAdapter,
                             
msg.getMessageId().setFutureOrSequenceLong(entry.getKey());
                             listener.recoverMessage(msg);
                             counter++;
-                            if (counter >= maxReturned || !listener.hasSpace()) {
+                            if (counter >= maxReturned || !listener.canRecoveryNextMessage()) {
                                 break;
                             }
                         }

http://git-wip-us.apache.org/repos/asf/activemq/blob/9845853b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala
----------------------------------------------------------------------
diff --git 
a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala
 
b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala
index f9ce9e7..8119a9b 100644
--- 
a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala
+++ 
b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala
@@ -737,7 +737,7 @@ class DBManager(val parent:LevelDBStore) {
         lastmsgid = msg.getMessageId
         count += 1
       }
-      count < max && listener.hasSpace
+      count < max && listener.canRecoveryNextMessage
     }
     if( lastmsgid==null ) {
       startPos

http://git-wip-us.apache.org/repos/asf/activemq/blob/9845853b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/MemoryLimitTest.java
----------------------------------------------------------------------
diff --git 
a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/MemoryLimitTest.java
 
b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/MemoryLimitTest.java
index fa27175..4c0747f 100644
--- 
a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/MemoryLimitTest.java
+++ 
b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/MemoryLimitTest.java
@@ -136,7 +136,7 @@ public class MemoryLimitTest extends TestSupport {
         assertTrue("Should be less than 70% of limit but was: " + percentUsage, percentUsage <= 71);
 
         LOG.info("Broker usage: " + broker.getSystemUsage().getMemoryUsage());
-        assertTrue(broker.getSystemUsage().getMemoryUsage().getPercentUsage() <= 71);
+        assertTrue(broker.getSystemUsage().getMemoryUsage().getPercentUsage() <= 91);
 
         // let's make sure we can consume all messages
         for (int i = 1; i < 2000; i++) {

Reply via email to