Repository: activemq
Updated Branches:
  refs/heads/master 903dec615 -> 13ec99493


Revert "https://issues.apache.org/jira/browse/AMQ-4495 - revisit. Reinstate 
check for space on pagein, so that highWaterMark is respected and full state is 
not reached, hense pfc is not triggered in error"

This reverts commit d8cf54b0a9eee4b86db1ffef2cb3dd1171067307.


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/13ec9949
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/13ec9949
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/13ec9949

Branch: refs/heads/master
Commit: 13ec9949397848c57653845b35e8003f8c490ebd
Parents: 903dec6
Author: gtully <[email protected]>
Authored: Mon Mar 7 16:26:25 2016 +0000
Committer: gtully <[email protected]>
Committed: Mon Mar 7 16:26:25 2016 +0000

----------------------------------------------------------------------
 .../apache/activemq/broker/region/Queue.java    |   4 +-
 .../region/cursors/AbstractStoreCursor.java     |   4 +-
 .../region/cursors/QueueStorePrefetch.java      |  11 +-
 .../broker/region/cursors/StoreQueueCursor.java |   2 +-
 .../region/cursors/TopicStorePrefetch.java      |  15 +-
 .../store/jdbc/JDBCMessageRecoveryListener.java |   1 -
 .../activemq/store/jdbc/JDBCMessageStore.java   |  37 ++--
 .../store/jdbc/JDBCTopicMessageStore.java       |   8 -
 .../store/jdbc/adapter/DefaultJDBCAdapter.java  |  13 +-
 .../activemq/store/kahadb/KahaDBStore.java      |   2 +-
 .../org/apache/activemq/leveldb/DBManager.scala |   2 +-
 .../StoreQueueCursorNoDuplicateTest.java        |  14 +-
 .../cursors/StoreQueueCursorOrderTest.java      |  10 +-
 .../org/apache/activemq/bugs/AMQ4930Test.java   |   2 +-
 .../activemq/usecases/MemoryLimitPfcTest.java   | 213 -------------------
 .../activemq/usecases/MemoryLimitTest.java      |  13 +-
 .../activemq/usecases/QueueBrowsingTest.java    |   4 +-
 17 files changed, 75 insertions(+), 280 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/13ec9949/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
----------------------------------------------------------------------
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
index eb1e812..3b1f85f 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
@@ -636,8 +636,8 @@ public class Queue extends BaseDestination implements Task, 
UsageListener, Index
             if (isProducerFlowControl() && context.isProducerFlowControl()) {
                 if (warnOnProducerFlowControl) {
                     warnOnProducerFlowControl = false;
-                    LOG.info("Usage Manager Memory Limit ({}) reached (%{}) on 
{}, size {}. Producers will be throttled to the rate at which messages are 
removed from this destination to prevent flooding it. See 
http://activemq.apache.org/producer-flow-control.html for more info.",
-                                    memoryUsage.getLimit(), 
memoryUsage.getPercentUsage(), getActiveMQDestination().getQualifiedName(), 
destinationStatistics.getMessages().getCount());
+                    LOG.info("Usage Manager Memory Limit ({}) reached on {}, 
size {}. Producers will be throttled to the rate at which messages are removed 
from this destination to prevent flooding it. See 
http://activemq.apache.org/producer-flow-control.html for more info.",
+                                    memoryUsage.getLimit(), 
getActiveMQDestination().getQualifiedName(), 
destinationStatistics.getMessages().getCount());
                 }
 
                 if (!context.isNetworkConnection() && 
systemUsage.isSendFailIfNoSpace()) {

http://git-wip-us.apache.org/repos/asf/activemq/blob/13ec9949/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
----------------------------------------------------------------------
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
index d84379d..06bae97 100644
--- 
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
@@ -48,6 +48,8 @@ public abstract class AbstractStoreCursor extends 
AbstractPendingMessageCursor i
     private static int SYNC_ADD = 0;
     private static int ASYNC_ADD = 1;
     final MessageId[] lastCachedIds = new MessageId[2];
+    protected boolean hadSpace = false;
+
 
 
     protected AbstractStoreCursor(Destination destination) {
@@ -399,7 +401,7 @@ public abstract class AbstractStoreCursor extends 
AbstractPendingMessageCursor i
             resetBatch();
             this.batchResetNeeded = false;
         }
-        if (this.batchList.isEmpty() && this.size >0 && hasSpace()) {
+        if (this.batchList.isEmpty() && this.size >0) {
             try {
                 doFillBatch();
             } catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/activemq/blob/13ec9949/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java
----------------------------------------------------------------------
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java
 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java
index dacae78..b10b2e2 100755
--- 
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java
@@ -38,14 +38,16 @@ import org.slf4j.LoggerFactory;
 class QueueStorePrefetch extends AbstractStoreCursor {
     private static final Logger LOG = 
LoggerFactory.getLogger(QueueStorePrefetch.class);
     private final MessageStore store;
+    private final Broker broker;
 
     /**
      * Construct it
      * @param queue
      */
-    public QueueStorePrefetch(Queue queue) {
+    public QueueStorePrefetch(Queue queue, Broker broker) {
         super(queue);
         this.store = queue.getMessageStore();
+        this.broker = broker;
 
     }
 
@@ -113,8 +115,11 @@ class QueueStorePrefetch extends AbstractStoreCursor {
 
     @Override
     protected void doFillBatch() throws Exception {
-        this.store.recoverNextMessages(this.maxBatchSize, this);
-        dealWithDuplicates(); // without the index lock
+        hadSpace = this.hasSpace();
+        if (!broker.getBrokerService().isPersistent() || hadSpace) {
+            this.store.recoverNextMessages(this.maxBatchSize, this);
+            dealWithDuplicates(); // without the index lock
+        }
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/activemq/blob/13ec9949/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java
----------------------------------------------------------------------
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java
 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java
index e6de82e..7f26b43 100755
--- 
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java
@@ -47,7 +47,7 @@ public class StoreQueueCursor extends 
AbstractPendingMessageCursor {
         super((queue != null ? queue.isPrioritizedMessages():false));
         this.broker=broker;
         this.queue = queue;
-        this.persistent = new QueueStorePrefetch(queue);
+        this.persistent = new QueueStorePrefetch(queue, broker);
         currentCursor = persistent;
     }
 

http://git-wip-us.apache.org/repos/asf/activemq/blob/13ec9949/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java
----------------------------------------------------------------------
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java
 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java
index 1a6a851..35ec3ed 100755
--- 
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java
@@ -40,6 +40,7 @@ class TopicStorePrefetch extends AbstractStoreCursor {
     private final String subscriberName;
     private final Subscription subscription;
     private byte lastRecoveredPriority = 9;
+    private boolean storeHasMessages = false;
 
     /**
      * @param topic
@@ -55,6 +56,7 @@ class TopicStorePrefetch extends AbstractStoreCursor {
         this.maxProducersToAudit=32;
         this.maxAuditDepth=10000;
         resetSize();
+        this.storeHasMessages=this.size > 0;
     }
 
     @Override
@@ -71,6 +73,11 @@ class TopicStorePrefetch extends AbstractStoreCursor {
         //this.messageSize.addSize(node.getMessage().getSize());
     }
 
+    @Override
+    public final synchronized boolean addMessageLast(MessageReference node) 
throws Exception {
+        this.storeHasMessages = super.addMessageLast(node);
+        return this.storeHasMessages;
+    }
 
     @Override
     public synchronized boolean recoverMessage(Message message, boolean 
cached) throws Exception {
@@ -83,6 +90,7 @@ class TopicStorePrefetch extends AbstractStoreCursor {
             if (recovered && !cached) {
                 lastRecoveredPriority = message.getPriority();
             }
+            storeHasMessages = true;
         }
         return recovered;
     }
@@ -126,8 +134,13 @@ class TopicStorePrefetch extends AbstractStoreCursor {
 
     @Override
     protected void doFillBatch() throws Exception {
+        // avoid repeated  trips to the store if there is nothing of interest
+        this.storeHasMessages = false;
         this.store.recoverNextMessages(clientId, subscriberName,
                 maxBatchSize, this);
+        if (!this.storeHasMessages && (!this.batchList.isEmpty() || 
!hadSpace)) {
+            this.storeHasMessages = true;
+        }
     }
 
     public byte getLastRecoveredPriority() {
@@ -145,6 +158,6 @@ class TopicStorePrefetch extends AbstractStoreCursor {
 
     @Override
     public String toString() {
-        return "TopicStorePrefetch(" + clientId + "," + subscriberName + ") " 
+ this.subscription.getConsumerInfo().getConsumerId() + " - " + 
super.toString();
+        return "TopicStorePrefetch(" + clientId + "," + subscriberName + 
",storeHasMessages=" + this.storeHasMessages +") " + 
this.subscription.getConsumerInfo().getConsumerId() + " - " + super.toString();
     }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/13ec9949/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageRecoveryListener.java
----------------------------------------------------------------------
diff --git 
a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageRecoveryListener.java
 
b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageRecoveryListener.java
index 5ade773..07f4816 100755
--- 
a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageRecoveryListener.java
+++ 
b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageRecoveryListener.java
@@ -24,5 +24,4 @@ package org.apache.activemq.store.jdbc;
 public interface JDBCMessageRecoveryListener {
     boolean recoverMessage(long sequenceId, byte[] message) throws Exception;
     boolean recoverMessageReference(String reference) throws Exception;
-    boolean hasSpace();
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/13ec9949/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java
----------------------------------------------------------------------
diff --git 
a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java
 
b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java
index 27313f4..175002a 100755
--- 
a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java
+++ 
b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java
@@ -279,10 +279,6 @@ public class JDBCMessageStore extends AbstractMessageStore 
{
                 public boolean recoverMessageReference(String reference) 
throws Exception {
                     return listener.recoverMessageReference(new 
MessageId(reference));
                 }
-
-                public boolean hasSpace() {
-                    return listener.hasSpace();
-                }
             });
         } catch (SQLException e) {
             JDBCPersistenceAdapter.log("JDBC Failure: ", e);
@@ -341,25 +337,24 @@ public class JDBCMessageStore extends 
AbstractMessageStore {
             adapter.doRecoverNextMessages(c, destination, 
perPriorityLastRecovered, minPendingSequeunceId(),
                     maxReturned, isPrioritizedMessages(), new 
JDBCMessageRecoveryListener() {
 
-                        public boolean recoverMessage(long sequenceId, byte[] 
data) throws Exception {
-                            Message msg = (Message)wireFormat.unmarshal(new 
ByteSequence(data));
-                            msg.getMessageId().setBrokerSequenceId(sequenceId);
-                            
msg.getMessageId().setFutureOrSequenceLong(sequenceId);
-                            listener.recoverMessage(msg);
-                            trackLastRecovered(sequenceId, msg.getPriority());
-                            return true;
-                        }
-
-                        public boolean recoverMessageReference(String 
reference) throws Exception {
-                            listener.recoverMessageReference(new 
MessageId(reference));
-                            return true;
-                        }
+                public boolean recoverMessage(long sequenceId, byte[] data) 
throws Exception {
+                        Message msg = (Message)wireFormat.unmarshal(new 
ByteSequence(data));
+                        msg.getMessageId().setBrokerSequenceId(sequenceId);
+                        msg.getMessageId().setFutureOrSequenceLong(sequenceId);
+                        listener.recoverMessage(msg);
+                        trackLastRecovered(sequenceId, msg.getPriority());
+                        return true;
+                }
 
-                        public boolean hasSpace() {
-                            return listener.hasSpace();
-                        }
+                public boolean recoverMessageReference(String reference) 
throws Exception {
+                    if (listener.hasSpace()) {
+                        listener.recoverMessageReference(new 
MessageId(reference));
+                        return true;
+                    }
+                    return false;
+                }
 
-                    });
+            });
         } catch (SQLException e) {
             JDBCPersistenceAdapter.log("JDBC Failure: ", e);
         } finally {

http://git-wip-us.apache.org/repos/asf/activemq/blob/13ec9949/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java
----------------------------------------------------------------------
diff --git 
a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java
 
b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java
index 7203f92..3bff9b2 100755
--- 
a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java
+++ 
b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java
@@ -129,10 +129,6 @@ public class JDBCTopicMessageStore extends 
JDBCMessageStore implements TopicMess
                     return listener.recoverMessageReference(new 
MessageId(reference));
                 }
 
-                public boolean hasSpace() {
-                    return listener.hasSpace();
-                }
-
             });
         } catch (SQLException e) {
             JDBCPersistenceAdapter.log("JDBC Failure: ", e);
@@ -242,10 +238,6 @@ public class JDBCTopicMessageStore extends 
JDBCMessageStore implements TopicMess
             return false;
         }
 
-        public boolean hasSpace() {
-            return delegate.hasSpace();
-        }
-
         @Override
         public boolean recoverMessageReference(String reference) throws 
Exception {
             return delegate.recoverMessageReference(new MessageId(reference));

http://git-wip-us.apache.org/repos/asf/activemq/blob/13ec9949/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java
----------------------------------------------------------------------
diff --git 
a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java
 
b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java
index 6fe83c8..facf969 100755
--- 
a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java
+++ 
b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java
@@ -37,6 +37,7 @@ import org.apache.activemq.command.XATransactionId;
 import org.apache.activemq.store.jdbc.JDBCAdapter;
 import org.apache.activemq.store.jdbc.JDBCMessageIdScanListener;
 import org.apache.activemq.store.jdbc.JDBCMessageRecoveryListener;
+import org.apache.activemq.store.jdbc.JDBCMessageStore;
 import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
 import org.apache.activemq.store.jdbc.JdbcMemoryTransactionStore;
 import org.apache.activemq.store.jdbc.Statements;
@@ -632,13 +633,13 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
             rs = s.executeQuery();
             int count = 0;
             if (this.statements.isUseExternalMessageReferences()) {
-                while (rs.next() && count < maxReturned && 
listener.hasSpace()) {
+                while (rs.next() && count < maxReturned) {
                     if (listener.recoverMessageReference(rs.getString(1))) {
                         count++;
                     }
                 }
             } else {
-                while (rs.next() && count < maxReturned && 
listener.hasSpace()) {
+                while (rs.next() && count < maxReturned) {
                     if (listener.recoverMessage(rs.getLong(1), 
getBinaryData(rs, 2))) {
                         count++;
                     }
@@ -669,13 +670,13 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
             rs = s.executeQuery();
             int count = 0;
             if (this.statements.isUseExternalMessageReferences()) {
-                while (rs.next() && count < maxReturned && listener.hasSpace() 
) {
+                while (rs.next() && count < maxReturned) {
                     if (listener.recoverMessageReference(rs.getString(1))) {
                         count++;
                     }
                 }
             } else {
-                while (rs.next() && count < maxReturned  && 
listener.hasSpace()) {
+                while (rs.next() && count < maxReturned) {
                     if (listener.recoverMessage(rs.getLong(1), 
getBinaryData(rs, 2))) {
                         count++;
                     }
@@ -1143,7 +1144,7 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
             rs = s.executeQuery();
             int count = 0;
             if (this.statements.isUseExternalMessageReferences()) {
-                while (rs.next() && count < maxReturned && 
listener.hasSpace()) {
+                while (rs.next() && count < maxReturned) {
                     if (listener.recoverMessageReference(rs.getString(1))) {
                         count++;
                     } else {
@@ -1152,7 +1153,7 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
                     }
                 }
             } else {
-                while (rs.next() && count < maxReturned && 
listener.hasSpace()) {
+                while (rs.next() && count < maxReturned) {
                     if (listener.recoverMessage(rs.getLong(1), 
getBinaryData(rs, 2))) {
                         count++;
                     } else {

http://git-wip-us.apache.org/repos/asf/activemq/blob/13ec9949/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
----------------------------------------------------------------------
diff --git 
a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
 
b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
index 69319a0..e1c1df4 100644
--- 
a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
+++ 
b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
@@ -585,7 +585,7 @@ public class KahaDBStore extends MessageDatabase implements 
PersistenceAdapter {
                             
msg.getMessageId().setFutureOrSequenceLong(entry.getKey());
                             listener.recoverMessage(msg);
                             counter++;
-                            if (counter >= maxReturned || listener.hasSpace() 
== false) {
+                            if (counter >= maxReturned) {
                                 break;
                             }
                         }

http://git-wip-us.apache.org/repos/asf/activemq/blob/13ec9949/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala
----------------------------------------------------------------------
diff --git 
a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala
 
b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala
index 09fd350..b0051cc 100644
--- 
a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala
+++ 
b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala
@@ -737,7 +737,7 @@ class DBManager(val parent:LevelDBStore) {
         lastmsgid = msg.getMessageId
         count += 1
       }
-      count < max && listener.hasSpace
+      count < max
     }
     if( lastmsgid==null ) {
       startPos

http://git-wip-us.apache.org/repos/asf/activemq/blob/13ec9949/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorNoDuplicateTest.java
----------------------------------------------------------------------
diff --git 
a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorNoDuplicateTest.java
 
b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorNoDuplicateTest.java
index 7680ca9..2406e88 100644
--- 
a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorNoDuplicateTest.java
+++ 
b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorNoDuplicateTest.java
@@ -27,7 +27,6 @@ import org.apache.activemq.broker.region.Queue;
 import org.apache.activemq.command.ActiveMQQueue;
 import org.apache.activemq.command.ActiveMQTextMessage;
 import org.apache.activemq.command.ConsumerInfo;
-import org.apache.activemq.command.MessageAck;
 import org.apache.activemq.command.MessageId;
 import org.apache.activemq.store.MessageStore;
 import org.apache.activemq.store.PersistenceAdapter;
@@ -83,14 +82,10 @@ public class StoreQueueCursorNoDuplicateTest extends 
TestCase {
         queueMessageStore.start();
         queueMessageStore.registerIndexListener(null);
 
-        QueueStorePrefetch underTest = new QueueStorePrefetch(queue);
+        QueueStorePrefetch underTest = new QueueStorePrefetch(queue, 
brokerService.getBroker());
         SystemUsage systemUsage = new SystemUsage();
-
-        ActiveMQTextMessage sampleMessage = getMessage(0);
-        int unitSize = sampleMessage.getSize();
-
         // ensure memory limit is reached
-        systemUsage.getMemoryUsage().setLimit(unitSize * count);
+        systemUsage.getMemoryUsage().setLimit(messageBytesSize * (count + 2));
         underTest.setSystemUsage(systemUsage);
         underTest.setEnableAudit(false);
         underTest.start();
@@ -115,11 +110,8 @@ public class StoreQueueCursorNoDuplicateTest extends 
TestCase {
             ref.decrementReferenceCount();
             underTest.remove();
             LOG.info("Received message: {} with body: {}",
-                    ref.getMessageId(), ((ActiveMQTextMessage) 
ref.getMessage()).getText());
+                     ref.getMessageId(), 
((ActiveMQTextMessage)ref.getMessage()).getText());
             assertEquals(dequeueCount++, 
ref.getMessageId().getProducerSequenceId());
-
-            // memory store keeps a message ref that needs releasing to free 
usage
-            queueMessageStore.removeMessage(contextNotInTx, new 
MessageAck(ref.getMessage(), MessageAck.STANDARD_ACK_TYPE, 1));
         }
         underTest.release();
         assertEquals(count, dequeueCount);

http://git-wip-us.apache.org/repos/asf/activemq/blob/13ec9949/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorOrderTest.java
----------------------------------------------------------------------
diff --git 
a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorOrderTest.java
 
b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorOrderTest.java
index 92c646b..90b8428 100644
--- 
a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorOrderTest.java
+++ 
b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorOrderTest.java
@@ -89,7 +89,7 @@ public class StoreQueueCursorOrderTest {
         queueMessageStore.start();
         queueMessageStore.registerIndexListener(null);
 
-        QueueStorePrefetch underTest = new QueueStorePrefetch(queue);
+        QueueStorePrefetch underTest = new QueueStorePrefetch(queue, 
brokerService.getBroker());
         SystemUsage systemUsage = new SystemUsage();
         // ensure memory limit is reached
         systemUsage.getMemoryUsage().setLimit(messageBytesSize * 1);
@@ -154,7 +154,7 @@ public class StoreQueueCursorOrderTest {
         queueMessageStore.start();
         queueMessageStore.registerIndexListener(null);
 
-        QueueStorePrefetch underTest = new QueueStorePrefetch(queue);
+        QueueStorePrefetch underTest = new QueueStorePrefetch(queue, 
brokerService.getBroker());
         SystemUsage systemUsage = new SystemUsage();
         // ensure memory limit is reached
         systemUsage.getMemoryUsage().setLimit(messageBytesSize * 1);
@@ -222,7 +222,7 @@ public class StoreQueueCursorOrderTest {
         queueMessageStore.start();
         queueMessageStore.registerIndexListener(null);
 
-        QueueStorePrefetch underTest = new QueueStorePrefetch(queue);
+        QueueStorePrefetch underTest = new QueueStorePrefetch(queue, 
brokerService.getBroker());
         SystemUsage systemUsage = new SystemUsage();
         // ensure memory limit is reached
         systemUsage.getMemoryUsage().setLimit(messageBytesSize * 1);
@@ -299,7 +299,7 @@ public class StoreQueueCursorOrderTest {
         queueMessageStore.start();
         queueMessageStore.registerIndexListener(null);
 
-        QueueStorePrefetch underTest = new QueueStorePrefetch(queue);
+        QueueStorePrefetch underTest = new QueueStorePrefetch(queue, 
brokerService.getBroker());
         SystemUsage systemUsage = new SystemUsage();
         // ensure memory limit is reached
         systemUsage.getMemoryUsage().setLimit(messageBytesSize * (count + 6));
@@ -392,7 +392,7 @@ public class StoreQueueCursorOrderTest {
         queueMessageStore.start();
         queueMessageStore.registerIndexListener(null);
 
-        QueueStorePrefetch underTest = new QueueStorePrefetch(queue);
+        QueueStorePrefetch underTest = new QueueStorePrefetch(queue, 
brokerService.getBroker());
         SystemUsage systemUsage = new SystemUsage();
         // ensure memory limit is reached
         systemUsage.getMemoryUsage().setLimit(messageBytesSize * 5);

http://git-wip-us.apache.org/repos/asf/activemq/blob/13ec9949/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4930Test.java
----------------------------------------------------------------------
diff --git 
a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4930Test.java 
b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4930Test.java
index 8f6fbb2..e65ad91 100644
--- 
a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4930Test.java
+++ 
b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4930Test.java
@@ -46,7 +46,7 @@ public class AMQ4930Test extends TestCase {
     protected void configureBroker() throws Exception {
         broker.setDeleteAllMessagesOnStartup(true);
         broker.setAdvisorySupport(false);
-        broker.getSystemUsage().getMemoryUsage().setLimit(100*1024*1024);
+        broker.getSystemUsage().getMemoryUsage().setLimit(1*1024*1024);
 
         PolicyMap pMap = new PolicyMap();
         PolicyEntry policy = new PolicyEntry();

http://git-wip-us.apache.org/repos/asf/activemq/blob/13ec9949/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/MemoryLimitPfcTest.java
----------------------------------------------------------------------
diff --git 
a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/MemoryLimitPfcTest.java
 
b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/MemoryLimitPfcTest.java
deleted file mode 100644
index 5b2dc23..0000000
--- 
a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/MemoryLimitPfcTest.java
+++ /dev/null
@@ -1,213 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.usecases;
-
-import java.util.Arrays;
-import java.util.concurrent.TimeUnit;
-import javax.jms.BytesMessage;
-import javax.jms.Connection;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.Queue;
-import javax.jms.Session;
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.TestSupport;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.region.Destination;
-import org.apache.activemq.broker.region.policy.PolicyEntry;
-import org.apache.activemq.broker.region.policy.PolicyMap;
-import org.apache.activemq.command.ActiveMQQueue;
-import org.apache.activemq.util.ProducerThread;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-@RunWith(value = Parameterized.class)
-public class MemoryLimitPfcTest extends TestSupport {
-    private static final Logger LOG = 
LoggerFactory.getLogger(MemoryLimitPfcTest.class);
-    final String payload = new String(new byte[100 * 1024]);
-    protected BrokerService broker;
-
-    @Parameterized.Parameter
-    public PersistenceAdapterChoice persistenceAdapterChoice;
-
-    @Parameterized.Parameters(name="store={0}")
-    public static Iterable<Object[]> getTestParameters() {
-        return Arrays.asList(new Object[][]{{PersistenceAdapterChoice.KahaDB}, 
{PersistenceAdapterChoice.LevelDB}, {PersistenceAdapterChoice.JDBC}});
-    }
-
-    protected BrokerService createBroker() throws Exception {
-        BrokerService broker = new BrokerService();
-        broker.getSystemUsage().getMemoryUsage().setLimit(1 * 1024 * 1024); 
//1MB
-        broker.setDeleteAllMessagesOnStartup(true);
-
-        PolicyMap policyMap = new PolicyMap();
-        PolicyEntry policyEntry = new PolicyEntry();
-        policyEntry.setExpireMessagesPeriod(0); // when this fires it will 
consume 2*pageSize mem which will throw the test
-        policyMap.put(new ActiveMQQueue(">"), policyEntry);
-        broker.setDestinationPolicy(policyMap);
-
-        LOG.info("Starting broker with persistenceAdapterChoice " + 
persistenceAdapterChoice.toString());
-        setPersistenceAdapter(broker, persistenceAdapterChoice);
-
-        return broker;
-    }
-
-    @Override
-    @Before
-    public void setUp() throws Exception {
-        if (broker == null) {
-            broker = createBroker();
-        }
-        broker.start();
-        broker.waitUntilStarted();
-    }
-
-    @Override
-    @After
-    public void tearDown() throws Exception {
-        if (broker != null) {
-            broker.stop();
-            broker.waitUntilStopped();
-        }
-    }
-
-
-    @Test(timeout = 120000)
-    public void testStopCachingDispatchNoPfc() throws Exception {
-
-        ActiveMQConnectionFactory factory = new 
ActiveMQConnectionFactory("vm://localhost?jms.prefetchPolicy.all=10");
-        factory.setOptimizeAcknowledge(true);
-        Connection conn = factory.createConnection();
-        conn.start();
-        Session sess = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
-        Queue queue = sess.createQueue("STORE");
-        final ProducerThread producer = new ProducerThread(sess, queue) {
-            @Override
-            protected Message createMessage(int i) throws Exception {
-                BytesMessage bytesMessage = session.createBytesMessage();
-                bytesMessage.writeBytes(payload.getBytes());
-                return bytesMessage;
-            }
-        };
-        producer.setMessageCount(200);
-        producer.start();
-        producer.join();
-
-        Thread.sleep(1000);
-
-        // assert we didn't break high watermark (70%) usage
-        final Destination dest = broker.getDestination((ActiveMQQueue) queue);
-        LOG.info("Destination usage: " + dest.getMemoryUsage());
-        int percentUsage = dest.getMemoryUsage().getPercentUsage();
-        assertTrue("Should be less than 70% of limit but was: " + 
percentUsage, percentUsage <= 80);
-        LOG.info("Broker usage: " + broker.getSystemUsage().getMemoryUsage());
-        assertTrue(broker.getSystemUsage().getMemoryUsage().getPercentUsage() 
<= 80);
-
-        assertFalse("cache disabled", 
((org.apache.activemq.broker.region.Queue) 
dest).getMessages().isCacheEnabled());
-
-        // consume one message
-        MessageConsumer consumer = sess.createConsumer(queue);
-        Message msg = consumer.receive(5000);
-        msg.acknowledge();
-
-        LOG.info("Destination usage after consume one: " + 
dest.getMemoryUsage());
-
-        // ensure we can send more messages
-        final ProducerThread secondProducer = new ProducerThread(sess, queue) {
-                    @Override
-                    protected Message createMessage(int i) throws Exception {
-                        BytesMessage bytesMessage = 
session.createBytesMessage();
-                        bytesMessage.writeBytes(payload.getBytes());
-                        return bytesMessage;
-                    }
-                };
-        secondProducer.setMessageCount(100);
-        secondProducer.start();
-        secondProducer.join();
-
-        LOG.info("Broker usage: " + broker.getSystemUsage().getMemoryUsage());
-        assertTrue(broker.getSystemUsage().getMemoryUsage().getPercentUsage() 
<= 100);
-
-        // let's make sure we can consume all messages
-        for (int i = 1; i < 300; i++) {
-            msg = consumer.receive(5000);
-            if (msg == null) {
-                dumpAllThreads("NoMessage");
-            }
-            assertNotNull("Didn't receive message " + i, msg);
-            msg.acknowledge();
-        }
-    }
-
-    @Test(timeout = 120000)
-    public void testConsumeFromTwoAfterPageInToOne() throws Exception {
-
-        ActiveMQConnectionFactory factory = new 
ActiveMQConnectionFactory("vm://localhost?jms.prefetchPolicy.all=10");
-        factory.setOptimizeAcknowledge(true);
-        Connection conn = factory.createConnection();
-        conn.start();
-        Session sess = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
-        final ProducerThread producer = new ProducerThread(sess, 
sess.createQueue("STORE.1")) {
-            @Override
-            protected Message createMessage(int i) throws Exception {
-                return session.createTextMessage(payload + "::" + i);
-            }
-        };
-        producer.setMessageCount(20);
-
-        final ProducerThread producer2 = new ProducerThread(sess, 
sess.createQueue("STORE.2")) {
-            @Override
-            protected Message createMessage(int i) throws Exception {
-                return session.createTextMessage(payload + "::" + i);
-            }
-        };
-        producer2.setMessageCount(20);
-
-        producer.start();
-        producer2.start();
-
-        producer.join();
-        producer2.join();
-
-        LOG.info("before consumer1, broker % mem usage: " + 
broker.getSystemUsage().getMemoryUsage().getPercentUsage());
-
-        MessageConsumer consumer = 
sess.createConsumer(sess.createQueue("STORE.1"));
-        Message msg = null;
-        for (int i=0; i<10; i++) {
-            msg = consumer.receive(5000);
-            LOG.info("% mem usage: " + 
broker.getSystemUsage().getMemoryUsage().getPercentUsage());
-            msg.acknowledge();
-        }
-
-        TimeUnit.SECONDS.sleep(2);
-        LOG.info("Before consumer2, Broker % mem usage: " + 
broker.getSystemUsage().getMemoryUsage().getPercentUsage());
-
-        MessageConsumer consumer2 = 
sess.createConsumer(sess.createQueue("STORE.2"));
-        for (int i=0; i<10; i++) {
-            msg = consumer2.receive(5000);
-            LOG.info("% mem usage: " + 
broker.getSystemUsage().getMemoryUsage().getPercentUsage());
-            msg.acknowledge();
-        }
-
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/activemq/blob/13ec9949/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/MemoryLimitTest.java
----------------------------------------------------------------------
diff --git 
a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/MemoryLimitTest.java
 
b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/MemoryLimitTest.java
index d3af604..760876c 100644
--- 
a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/MemoryLimitTest.java
+++ 
b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/MemoryLimitTest.java
@@ -133,9 +133,18 @@ public class MemoryLimitTest extends TestSupport {
         Message msg = consumer.receive(5000);
         msg.acknowledge();
 
-        assertTrue("Should be less than 70% of limit but was: " + 
percentUsage, percentUsage <= 71);
+        // this should free some space and allow us to get new batch of 
messages in the memory
+        // exceeding the limit
+        assertTrue("Limit is exceeded", Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                LOG.info("Destination usage: " + dest.getMemoryUsage());
+                return dest.getMemoryUsage().getPercentUsage() >= 200;
+            }
+        }));
+
         LOG.info("Broker usage: " + broker.getSystemUsage().getMemoryUsage());
-        assertTrue(broker.getSystemUsage().getMemoryUsage().getPercentUsage() 
<= 71);
+        assertTrue(broker.getSystemUsage().getMemoryUsage().getPercentUsage() 
>= 200);
 
         // let's make sure we can consume all messages
         for (int i = 1; i < 2000; i++) {

http://git-wip-us.apache.org/repos/asf/activemq/blob/13ec9949/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/QueueBrowsingTest.java
----------------------------------------------------------------------
diff --git 
a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/QueueBrowsingTest.java
 
b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/QueueBrowsingTest.java
index 05540a5..29b6e72 100644
--- 
a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/QueueBrowsingTest.java
+++ 
b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/QueueBrowsingTest.java
@@ -182,7 +182,7 @@ public class QueueBrowsingTest {
 
     @Test
     public void testMemoryLimit() throws Exception {
-        broker.getSystemUsage().getMemoryUsage().setLimit((maxPageSize + 10) * 
4 * 1024);
+        broker.getSystemUsage().getMemoryUsage().setLimit(16 * 1024);
 
         int messageToSend = 370;
 
@@ -211,6 +211,6 @@ public class QueueBrowsingTest {
         }
 
         browser.close();
-        assertTrue("got at least maxPageSize, received: " + received, received 
>= maxPageSize);
+        assertTrue("got at least maxPageSize", received >= maxPageSize);
     }
 }

Reply via email to