https://issues.apache.org/jira/browse/AMQ-5923

Adding metrics to track the pending message size for a queue and for
subscribers.  This is useful because it exposes not only the pending
count but also the total size of the messages left to consume.  This
commit also improves the message size store tests.


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/734fb7dd
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/734fb7dd
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/734fb7dd

Branch: refs/heads/master
Commit: 734fb7dda35285ada7bc57642215077e08c88e80
Parents: b17cc37
Author: Christopher L. Shannon (cshannon) <christopher.l.shan...@gmail.com>
Authored: Fri Aug 21 18:58:08 2015 +0000
Committer: Christopher L. Shannon (cshannon) <christopher.l.shan...@gmail.com>
Committed: Wed Sep 9 18:12:15 2015 +0000

----------------------------------------------------------------------
 .../broker/region/PrefetchSubscription.java     |   7 +
 .../apache/activemq/broker/region/Queue.java    |  13 +
 .../activemq/broker/region/Subscription.java    |   5 +
 .../broker/region/TopicSubscription.java        |   7 +
 .../region/cursors/AbstractStoreCursor.java     |  27 +
 .../cursors/FilePendingMessageCursor.java       |  21 +-
 .../region/cursors/OrderedPendingList.java      |  35 +-
 .../broker/region/cursors/PendingList.java      |   3 +
 .../region/cursors/PendingMessageCursor.java    |  66 +--
 .../region/cursors/PendingMessageHelper.java    |  68 +++
 .../region/cursors/PrioritizedPendingList.java  |  31 +-
 .../cursors/QueueDispatchPendingList.java       |   5 +
 .../region/cursors/QueueStorePrefetch.java      |  33 +-
 .../cursors/StoreDurableSubscriberCursor.java   |   9 +
 .../broker/region/cursors/StoreQueueCursor.java |  25 +
 .../region/cursors/TopicStorePrefetch.java      |  24 +-
 .../region/cursors/VMPendingMessageCursor.java  |  60 +-
 .../java/org/apache/activemq/store/PList.java   |   2 +
 .../activemq/store/ProxyTopicMessageStore.java  |  10 +
 .../activemq/store/TopicMessageStore.java       |   2 +
 .../store/memory/MemoryTopicMessageStore.java   |  10 +
 .../activemq/store/memory/MemoryTopicSub.java   |  23 +-
 .../store/jdbc/JDBCTopicMessageStore.java       |  31 +-
 .../store/journal/JournalTopicMessageStore.java |  24 +-
 .../activemq/store/kahadb/KahaDBStore.java      |  24 +
 .../activemq/store/kahadb/MessageDatabase.java  |  26 +
 .../activemq/store/kahadb/TempKahaDBStore.java  |   5 +
 .../store/kahadb/disk/index/ListIndex.java      |  20 +-
 .../store/kahadb/disk/index/ListNode.java       |  13 +-
 .../activemq/store/kahadb/plist/PListImpl.java  |  67 ++-
 .../apache/activemq/leveldb/LevelDBStore.scala  |   6 +
 .../region/QueueDuplicatesFromStoreTest.java    |   7 +
 .../region/SubscriptionAddRemoveQueueTest.java  |  42 ++
 .../AbstractPendingMessageCursorTest.java       | 547 +++++++++++++++++++
 .../cursors/KahaDBPendingMessageCursorTest.java | 126 +++++
 .../cursors/MemoryPendingMessageCursorTest.java | 145 +++++
 .../MultiKahaDBPendingMessageCursorTest.java    |  60 ++
 .../region/cursors/OrderPendingListTest.java    |  10 +
 .../store/AbstractMessageStoreSizeStatTest.java | 244 ++-------
 .../store/AbstractStoreStatTestSupport.java     | 268 +++++++++
 .../kahadb/KahaDBMessageStoreSizeStatTest.java  |  22 +-
 .../MultiKahaDBMessageStoreSizeStatTest.java    |  50 +-
 .../memory/MemoryMessageStoreSizeStatTest.java  |  22 +-
 43 files changed, 1919 insertions(+), 326 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/734fb7dd/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
----------------------------------------------------------------------
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
index ef1b372..4e688cc 100755
--- 
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
@@ -582,6 +582,13 @@ public abstract class PrefetchSubscription extends 
AbstractSubscription {
     }
 
     @Override
+    public long getPendingMessageSize() {
+        synchronized (pendingLock) {
+            return pending.messageSize();
+        }
+    }
+
+    @Override
     public int getDispatchedQueueSize() {
         return dispatched.size();
     }

http://git-wip-us.apache.org/repos/asf/activemq/blob/734fb7dd/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
----------------------------------------------------------------------
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
index c9823e1..b0b609b 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
@@ -927,6 +927,19 @@ public class Queue extends BaseDestination implements 
Task, UsageListener, Index
         return msg;
     }
 
+    public long getPendingMessageSize() {
+        messagesLock.readLock().lock();
+        try{
+            return messages.messageSize();
+        } finally {
+            messagesLock.readLock().unlock();
+        }
+    }
+
+    public long getPendingMessageCount() {
+         return this.destinationStatistics.getMessages().getCount();
+    }
+
     @Override
     public String toString() {
         return destination.getQualifiedName() + ", subscriptions=" + 
consumers.size()

http://git-wip-us.apache.org/repos/asf/activemq/blob/734fb7dd/activemq-broker/src/main/java/org/apache/activemq/broker/region/Subscription.java
----------------------------------------------------------------------
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Subscription.java
 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Subscription.java
index 9452b99..4a8b341 100755
--- 
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Subscription.java
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Subscription.java
@@ -119,6 +119,11 @@ public interface Subscription extends SubscriptionRecovery 
{
     int getPendingQueueSize();
 
     /**
+     * @return size of the messages pending delivery
+     */
+    long getPendingMessageSize();
+
+    /**
      * @return number of messages dispatched to the client
      */
     int getDispatchedQueueSize();

http://git-wip-us.apache.org/repos/asf/activemq/blob/734fb7dd/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
----------------------------------------------------------------------
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
index d3e683d..e1c8a95 100755
--- 
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
@@ -419,6 +419,13 @@ public class TopicSubscription extends 
AbstractSubscription {
     }
 
     @Override
+    public long getPendingMessageSize() {
+        synchronized (matchedListMutex) {
+            return matched.messageSize();
+        }
+    }
+
+    @Override
     public int getDispatchedQueueSize() {
         return (int)(getSubscriptionStatistics().getDispatched().getCount() -
                 prefetchExtension.get() - 
getSubscriptionStatistics().getDequeues().getCount());

http://git-wip-us.apache.org/repos/asf/activemq/blob/734fb7dd/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
----------------------------------------------------------------------
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
index 4bdd7f6..05e4b1f 100644
--- 
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
@@ -23,6 +23,7 @@ import java.util.concurrent.CancellationException;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+
 import org.apache.activemq.broker.region.Destination;
 import org.apache.activemq.broker.region.MessageReference;
 import org.apache.activemq.broker.region.Subscription;
@@ -49,6 +50,8 @@ public abstract class AbstractStoreCursor extends 
AbstractPendingMessageCursor i
     final MessageId[] lastCachedIds = new MessageId[2];
     protected boolean hadSpace = false;
 
+
+
     protected AbstractStoreCursor(Destination destination) {
         super((destination != null ? 
destination.isPrioritizedMessages():false));
         this.regionDestination=destination;
@@ -60,6 +63,7 @@ public abstract class AbstractStoreCursor extends 
AbstractPendingMessageCursor i
     }
 
 
+    @Override
     public final synchronized void start() throws Exception{
         if (!isStarted()) {
             super.start();
@@ -78,6 +82,7 @@ public abstract class AbstractStoreCursor extends 
AbstractPendingMessageCursor i
         resetSize();
     }
 
+    @Override
     public final synchronized void stop() throws Exception {
         resetBatch();
         super.stop();
@@ -85,6 +90,7 @@ public abstract class AbstractStoreCursor extends 
AbstractPendingMessageCursor i
     }
 
 
+    @Override
     public final boolean recoverMessage(Message message) throws Exception {
         return recoverMessage(message,false);
     }
@@ -136,6 +142,7 @@ public abstract class AbstractStoreCursor extends 
AbstractPendingMessageCursor i
         duplicatesFromStore.clear();
     }
 
+    @Override
     public final synchronized void reset() {
         if (batchList.isEmpty()) {
             try {
@@ -150,6 +157,7 @@ public abstract class AbstractStoreCursor extends 
AbstractPendingMessageCursor i
     }
 
 
+    @Override
     public synchronized void release() {
         clearIterator(false);
     }
@@ -173,6 +181,7 @@ public abstract class AbstractStoreCursor extends 
AbstractPendingMessageCursor i
     }
 
 
+    @Override
     public final synchronized boolean hasNext() {
         if (batchList.isEmpty()) {
             try {
@@ -187,6 +196,7 @@ public abstract class AbstractStoreCursor extends 
AbstractPendingMessageCursor i
     }
 
 
+    @Override
     public final synchronized MessageReference next() {
         MessageReference result = null;
         if (!this.batchList.isEmpty()&&this.iterator.hasNext()) {
@@ -199,6 +209,7 @@ public abstract class AbstractStoreCursor extends 
AbstractPendingMessageCursor i
         return result;
     }
 
+    @Override
     public synchronized boolean addMessageLast(MessageReference node) throws 
Exception {
         boolean disableCache = false;
         if (hasSpace()) {
@@ -333,12 +344,14 @@ public abstract class AbstractStoreCursor extends 
AbstractPendingMessageCursor i
     }
 
 
+    @Override
     public synchronized void addMessageFirst(MessageReference node) throws 
Exception {
         setCacheEnabled(false);
         size++;
     }
 
 
+    @Override
     public final synchronized void remove() {
         size--;
         if (iterator!=null) {
@@ -350,6 +363,7 @@ public abstract class AbstractStoreCursor extends 
AbstractPendingMessageCursor i
     }
 
 
+    @Override
     public final synchronized void remove(MessageReference node) {
         if (batchList.remove(node) != null) {
             size--;
@@ -358,11 +372,13 @@ public abstract class AbstractStoreCursor extends 
AbstractPendingMessageCursor i
     }
 
 
+    @Override
     public final synchronized void clear() {
         gc();
     }
 
 
+    @Override
     public synchronized void gc() {
         for (MessageReference msg : batchList) {
             rollback(msg.getMessageId());
@@ -374,6 +390,7 @@ public abstract class AbstractStoreCursor extends 
AbstractPendingMessageCursor i
         setCacheEnabled(false);
     }
 
+    @Override
     protected final synchronized void fillBatch() {
         if (LOG.isTraceEnabled()) {
             LOG.trace("{} fillBatch", this);
@@ -395,17 +412,20 @@ public abstract class AbstractStoreCursor extends 
AbstractPendingMessageCursor i
     }
 
 
+    @Override
     public final synchronized boolean isEmpty() {
         // negative means more messages added to store through queue.send 
since last reset
         return size == 0;
     }
 
 
+    @Override
     public final synchronized boolean hasMessagesBufferedToDeliver() {
         return !batchList.isEmpty();
     }
 
 
+    @Override
     public final synchronized int size() {
         if (size < 0) {
             this.size = getStoreSize();
@@ -414,6 +434,11 @@ public abstract class AbstractStoreCursor extends 
AbstractPendingMessageCursor i
     }
 
     @Override
+    public final synchronized long messageSize() {
+        return getStoreMessageSize();
+    }
+
+    @Override
     public String toString() {
         return super.toString() + ":" + 
regionDestination.getActiveMQDestination().getPhysicalName() + 
",batchResetNeeded=" + batchResetNeeded
                     + ",size=" + this.size + ",cacheEnabled=" + 
isCacheEnabled()
@@ -428,6 +453,8 @@ public abstract class AbstractStoreCursor extends 
AbstractPendingMessageCursor i
 
     protected abstract int getStoreSize();
 
+    protected abstract long getStoreMessageSize();
+
     protected abstract boolean isStoreEmpty();
 
     public Subscription getSubscription() {

http://git-wip-us.apache.org/repos/asf/activemq/blob/734fb7dd/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
----------------------------------------------------------------------
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
index 7512e39..3f3f33b 100755
--- 
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
@@ -44,8 +44,8 @@ import org.apache.activemq.util.ByteSequence;
 /**
  * persist pending messages pending message (messages awaiting dispatch to a
  * consumer) cursor
- * 
- * 
+ *
+ *
  */
 public class FilePendingMessageCursor extends AbstractPendingMessageCursor 
implements UsageListener {
     static final Logger LOG = 
LoggerFactory.getLogger(FilePendingMessageCursor.class);
@@ -198,15 +198,15 @@ public class FilePendingMessageCursor extends 
AbstractPendingMessageCursor imple
 
     /**
      * add message to await dispatch
-     * 
+     *
      * @param node
-     * @throws Exception 
+     * @throws Exception
      */
     @Override
     public synchronized boolean addMessageLast(MessageReference node) throws 
Exception {
         return tryAddMessageLast(node, 0);
     }
-    
+
     @Override
     public synchronized boolean tryAddMessageLast(MessageReference node, long 
maxWaitTime) throws Exception {
         if (!node.isExpired()) {
@@ -252,7 +252,7 @@ public class FilePendingMessageCursor extends 
AbstractPendingMessageCursor imple
 
     /**
      * add message to await dispatch
-     * 
+     *
      * @param node
      */
     @Override
@@ -356,6 +356,11 @@ public class FilePendingMessageCursor extends 
AbstractPendingMessageCursor imple
         return memoryList.size() + (isDiskListEmpty() ? 0 : 
(int)getDiskList().size());
     }
 
+    @Override
+    public synchronized long messageSize() {
+        return memoryList.messageSize() + (isDiskListEmpty() ? 0 : 
(int)getDiskList().messageSize());
+    }
+
     /**
      * clear all pending messages
      */
@@ -389,6 +394,7 @@ public class FilePendingMessageCursor extends 
AbstractPendingMessageCursor imple
         super.setSystemUsage(usageManager);
     }
 
+    @Override
     public void onUsageChanged(Usage usage, int oldPercentUsage, int 
newPercentUsage) {
         if (newPercentUsage >= getMemoryUsageHighWaterMark()) {
             synchronized (this) {
@@ -497,10 +503,12 @@ public class FilePendingMessageCursor extends 
AbstractPendingMessageCursor imple
             }
         }
 
+        @Override
         public boolean hasNext() {
             return iterator.hasNext();
         }
 
+        @Override
         public MessageReference next() {
             try {
                 PListEntry entry = iterator.next();
@@ -513,6 +521,7 @@ public class FilePendingMessageCursor extends 
AbstractPendingMessageCursor imple
             }
         }
 
+        @Override
         public void remove() {
             iterator.remove();
         }

http://git-wip-us.apache.org/repos/asf/activemq/blob/734fb7dd/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/OrderedPendingList.java
----------------------------------------------------------------------
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/OrderedPendingList.java
 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/OrderedPendingList.java
index 9bf9588..31870b1 100644
--- 
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/OrderedPendingList.java
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/OrderedPendingList.java
@@ -25,13 +25,23 @@ import java.util.Map;
 
 import org.apache.activemq.broker.region.MessageReference;
 import org.apache.activemq.command.MessageId;
+import org.apache.activemq.management.SizeStatisticImpl;
 
 public class OrderedPendingList implements PendingList {
 
     private PendingNode root = null;
     private PendingNode tail = null;
     private final Map<MessageId, PendingNode> map = new HashMap<MessageId, 
PendingNode>();
+    private final SizeStatisticImpl messageSize;
+    private final PendingMessageHelper pendingMessageHelper;
 
+    public OrderedPendingList() {
+        messageSize = new SizeStatisticImpl("messageSize", "The size in bytes 
of the pending messages");
+        messageSize.setEnabled(true);
+        pendingMessageHelper = new PendingMessageHelper(map, messageSize);
+    }
+
+    @Override
     public PendingNode addMessageFirst(MessageReference message) {
         PendingNode node = new PendingNode(this, message);
         if (root == null) {
@@ -41,10 +51,11 @@ public class OrderedPendingList implements PendingList {
             root.linkBefore(node);
             root = node;
         }
-        this.map.put(message.getMessageId(), node);
+        pendingMessageHelper.addToMap(message, node);
         return node;
     }
 
+    @Override
     public PendingNode addMessageLast(MessageReference message) {
         PendingNode node = new PendingNode(this, message);
         if (root == null) {
@@ -53,29 +64,35 @@ public class OrderedPendingList implements PendingList {
             tail.linkAfter(node);
         }
         tail = node;
-        this.map.put(message.getMessageId(), node);
+        pendingMessageHelper.addToMap(message, node);
         return node;
     }
 
+    @Override
     public void clear() {
         this.root = null;
         this.tail = null;
         this.map.clear();
+        this.messageSize.reset();
     }
 
+    @Override
     public boolean isEmpty() {
         return this.map.isEmpty();
     }
 
+    @Override
     public Iterator<MessageReference> iterator() {
         return new Iterator<MessageReference>() {
             private PendingNode current = null;
             private PendingNode next = root;
 
+            @Override
             public boolean hasNext() {
                 return next != null;
             }
 
+            @Override
             public MessageReference next() {
                 MessageReference result = null;
                 this.current = this.next;
@@ -84,31 +101,39 @@ public class OrderedPendingList implements PendingList {
                 return result;
             }
 
+            @Override
             public void remove() {
                 if (this.current != null && this.current.getMessage() != null) 
{
-                    map.remove(this.current.getMessage().getMessageId());
+                    
pendingMessageHelper.removeFromMap(this.current.getMessage());
                 }
                 removeNode(this.current);
             }
         };
     }
 
+    @Override
     public PendingNode remove(MessageReference message) {
         PendingNode node = null;
         if (message != null) {
-            node = this.map.remove(message.getMessageId());
+            node = pendingMessageHelper.removeFromMap(message);
             removeNode(node);
         }
         return node;
     }
 
+    @Override
     public int size() {
         return this.map.size();
     }
 
+    @Override
+    public long messageSize() {
+        return this.messageSize.getTotalSize();
+    }
+
     void removeNode(PendingNode node) {
         if (node != null) {
-            map.remove(node.getMessage().getMessageId());
+            pendingMessageHelper.removeFromMap(node.getMessage());
             if (root == node) {
                 root = (PendingNode) node.getNext();
             }

http://git-wip-us.apache.org/repos/asf/activemq/blob/734fb7dd/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/PendingList.java
----------------------------------------------------------------------
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/PendingList.java
 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/PendingList.java
index 153d8bd..adfa78e 100644
--- 
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/PendingList.java
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/PendingList.java
@@ -73,6 +73,8 @@ public interface PendingList extends 
Iterable<MessageReference> {
      */
     public int size();
 
+    public long messageSize();
+
     /**
      * Returns an iterator over the pending Messages.  The subclass controls 
how
      * the returned iterator actually traverses the list of pending messages 
allowing
@@ -81,6 +83,7 @@ public interface PendingList extends 
Iterable<MessageReference> {
      *
      * @return an Iterator that returns MessageReferences contained in this 
list.
      */
+    @Override
     public Iterator<MessageReference> iterator();
 
     /**

http://git-wip-us.apache.org/repos/asf/activemq/blob/734fb7dd/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java
----------------------------------------------------------------------
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java
 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java
index 06d59f1..bf7fd7a 100755
--- 
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java
@@ -30,14 +30,14 @@ import org.apache.activemq.usage.SystemUsage;
 /**
  * Interface to pending message (messages awaiting disptach to a consumer)
  * cursor
- * 
- * 
+ *
+ *
  */
 public interface PendingMessageCursor extends Service {
 
     /**
      * Add a destination
-     * 
+     *
      * @param context
      * @param destination
      * @throws Exception
@@ -46,7 +46,7 @@ public interface PendingMessageCursor extends Service {
 
     /**
      * remove a destination
-     * 
+     *
      * @param context
      * @param destination
      * @throws Exception
@@ -60,7 +60,7 @@ public interface PendingMessageCursor extends Service {
 
     /**
      * check if a Destination is Empty for this cursor
-     * 
+     *
      * @param destination
      * @return true id the Destination is empty
      */
@@ -79,7 +79,7 @@ public interface PendingMessageCursor extends Service {
 
     /**
      * add message to await dispatch
-     * 
+     *
      * @param node
      * @return boolean true if successful, false if cursor traps a duplicate
      * @throws IOException
@@ -89,9 +89,9 @@ public interface PendingMessageCursor extends Service {
 
     /**
      * add message to await dispatch - if it can
-     * 
+     *
      * @param node
-     * @param maxWaitTime 
+     * @param maxWaitTime
      * @return true if successful
      * @throws IOException
      * @throws Exception
@@ -100,7 +100,7 @@ public interface PendingMessageCursor extends Service {
 
     /**
      * add message to await dispatch
-     * 
+     *
      * @param node
      * @throws Exception
      */
@@ -108,7 +108,7 @@ public interface PendingMessageCursor extends Service {
 
     /**
      * Add a message recovered from a retroactive policy
-     * 
+     *
      * @param node
      * @throws Exception
      */
@@ -134,6 +134,8 @@ public interface PendingMessageCursor extends Service {
      */
     int size();
 
+    long messageSize();
+
     /**
      * clear all pending messages
      */
@@ -142,7 +144,7 @@ public interface PendingMessageCursor extends Service {
     /**
      * Informs the Broker if the subscription needs to intervention to recover
      * it's state e.g. DurableTopicSubscriber may do
-     * 
+     *
      * @return true if recovery required
      */
     boolean isRecoveryRequired();
@@ -154,7 +156,7 @@ public interface PendingMessageCursor extends Service {
 
     /**
      * Set the max batch size
-     * 
+     *
      * @param maxBatchSize
      */
     void setMaxBatchSize(int maxBatchSize);
@@ -167,7 +169,7 @@ public interface PendingMessageCursor extends Service {
 
     /**
      * remove a node
-     * 
+     *
      * @param node
      */
     void remove(MessageReference node);
@@ -179,7 +181,7 @@ public interface PendingMessageCursor extends Service {
 
     /**
      * Set the UsageManager
-     * 
+     *
      * @param systemUsage
      * @see org.apache.activemq.usage.SystemUsage
      */
@@ -204,7 +206,7 @@ public interface PendingMessageCursor extends Service {
      * @return true if the cursor is full
      */
     boolean isFull();
-    
+
     /**
      * @return true if the cursor has space to page messages into
      */
@@ -217,41 +219,41 @@ public interface PendingMessageCursor extends Service {
 
     /**
      * destroy the cursor
-     * 
+     *
      * @throws Exception
      */
     void destroy() throws Exception;
 
     /**
      * Page in a restricted number of messages and increment the reference 
count
-     * 
+     *
      * @param maxItems
      * @return a list of paged in messages
      */
     LinkedList<MessageReference> pageInList(int maxItems);
-    
+
     /**
      * set the maximum number of producers to track at one time
      * @param value
      */
     void setMaxProducersToAudit(int value);
-    
+
     /**
      * @return the maximum number of producers to audit
      */
     int getMaxProducersToAudit();
-    
+
     /**
      * Set the maximum depth of message ids to track
-     * @param depth 
+     * @param depth
      */
     void setMaxAuditDepth(int depth);
-    
+
     /**
      * @return the audit depth
      */
     int getMaxAuditDepth();
-    
+
     /**
      * @return the enableAudit
      */
@@ -260,37 +262,37 @@ public interface PendingMessageCursor extends Service {
      * @param enableAudit the enableAudit to set
      */
     public void setEnableAudit(boolean enableAudit);
-    
+
     /**
-     * @return true if the underlying state of this cursor 
+     * @return true if the underlying state of this cursor
      * disappears when the broker shuts down
      */
     public boolean isTransient();
-    
-    
+
+
     /**
      * set the audit
      * @param audit
      */
     public void setMessageAudit(ActiveMQMessageAudit audit);
-    
-    
+
+
     /**
      * @return the audit - could be null
      */
     public ActiveMQMessageAudit getMessageAudit();
-    
+
     /**
      * use a cache to improve performance
      * @param useCache
      */
     public void setUseCache(boolean useCache);
-    
+
     /**
      * @return true if a cache may be used
      */
     public boolean isUseCache();
-    
+
     /**
      * remove from auditing the message id
      * @param id

http://git-wip-us.apache.org/repos/asf/activemq/blob/734fb7dd/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageHelper.java
----------------------------------------------------------------------
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageHelper.java
 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageHelper.java
new file mode 100644
index 0000000..f28d61b
--- /dev/null
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageHelper.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.region.cursors;
+
+import java.util.Map;
+
+import org.apache.activemq.broker.region.MessageReference;
+import org.apache.activemq.command.MessageId;
+import org.apache.activemq.management.SizeStatisticImpl;
+
+/**
+ *
+ *
+ */
+public class PendingMessageHelper {
+
+    private final Map<MessageId, PendingNode> map;
+    private final SizeStatisticImpl messageSize;
+
+    public PendingMessageHelper(Map<MessageId, PendingNode> map,
+            SizeStatisticImpl messageSize) {
+        super();
+        this.map = map;
+        this.messageSize = messageSize;
+    }
+
+    public void addToMap(MessageReference message, PendingNode node) {
+        PendingNode previous = this.map.put(message.getMessageId(), node);
+        if (previous != null) {
+            try {
+                messageSize.addSize(-previous.getMessage().getSize());
+            } catch (Exception e) {
+              //expected for NullMessageReference
+            }
+        }
+        try {
+            messageSize.addSize(message.getSize());
+        } catch (Exception e) {
+          //expected for NullMessageReference
+        }
+    }
+
+    public PendingNode removeFromMap(MessageReference message) {
+        PendingNode removed = this.map.remove(message.getMessageId());
+        if (removed != null) {
+            try {
+                messageSize.addSize(-removed.getMessage().getSize());
+            } catch (Exception e) {
+                //expected for NullMessageReference
+            }
+        }
+        return removed;
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/734fb7dd/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/PrioritizedPendingList.java
----------------------------------------------------------------------
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/PrioritizedPendingList.java
 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/PrioritizedPendingList.java
index 9235b2c..70eaa53 100644
--- 
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/PrioritizedPendingList.java
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/PrioritizedPendingList.java
@@ -25,50 +25,64 @@ import java.util.Map;
 
 import org.apache.activemq.broker.region.MessageReference;
 import org.apache.activemq.command.MessageId;
+import org.apache.activemq.management.SizeStatisticImpl;
 
 public class PrioritizedPendingList implements PendingList {
 
     private static final Integer MAX_PRIORITY = 10;
     private final OrderedPendingList[] lists = new 
OrderedPendingList[MAX_PRIORITY];
     private final Map<MessageId, PendingNode> map = new HashMap<MessageId, 
PendingNode>();
+    private final SizeStatisticImpl messageSize;
+    private final PendingMessageHelper pendingMessageHelper;
+
 
     public PrioritizedPendingList() {
         for (int i = 0; i < MAX_PRIORITY; i++) {
             this.lists[i] = new OrderedPendingList();
         }
+        messageSize = new SizeStatisticImpl("messageSize", "The size in bytes 
of the pending messages");
+        messageSize.setEnabled(true);
+        pendingMessageHelper = new PendingMessageHelper(map, messageSize);
     }
 
+    @Override
     public PendingNode addMessageFirst(MessageReference message) {
         PendingNode node = getList(message).addMessageFirst(message);
-        this.map.put(message.getMessageId(), node);
+        this.pendingMessageHelper.addToMap(message, node);
         return node;
     }
 
+    @Override
     public PendingNode addMessageLast(MessageReference message) {
         PendingNode node = getList(message).addMessageLast(message);
-        this.map.put(message.getMessageId(), node);
+        this.pendingMessageHelper.addToMap(message, node);
         return node;
     }
 
+    @Override
     public void clear() {
         for (int i = 0; i < MAX_PRIORITY; i++) {
             this.lists[i].clear();
         }
         this.map.clear();
+        this.messageSize.reset();
     }
 
+    @Override
     public boolean isEmpty() {
         return this.map.isEmpty();
     }
 
+    @Override
     public Iterator<MessageReference> iterator() {
         return new PrioritizedPendingListIterator();
     }
 
+    @Override
     public PendingNode remove(MessageReference message) {
         PendingNode node = null;
         if (message != null) {
-            node = this.map.remove(message.getMessageId());
+            node = this.pendingMessageHelper.removeFromMap(message);
             if (node != null) {
                 node.getList().removeNode(node);
             }
@@ -76,11 +90,17 @@ public class PrioritizedPendingList implements PendingList {
         return node;
     }
 
+    @Override
     public int size() {
         return this.map.size();
     }
 
     @Override
+    public long messageSize() {
+        return this.messageSize.getTotalSize();
+    }
+
+    @Override
     public String toString() {
         return "PrioritizedPendingList(" + System.identityHashCode(this) + ")";
     }
@@ -111,10 +131,12 @@ public class PrioritizedPendingList implements 
PendingList {
                 }
             }
         }
+        @Override
         public boolean hasNext() {
             return list.size() > index;
         }
 
+        @Override
         public MessageReference next() {
             PendingNode node = list.get(this.index);
             this.currentIndex = this.index;
@@ -122,10 +144,11 @@ public class PrioritizedPendingList implements 
PendingList {
             return node.getMessage();
         }
 
+        @Override
         public void remove() {
             PendingNode node = list.get(this.currentIndex);
             if (node != null) {
-                map.remove(node.getMessage().getMessageId());
+                pendingMessageHelper.removeFromMap(node.getMessage());
                 node.getList().removeNode(node);
             }
         }

http://git-wip-us.apache.org/repos/asf/activemq/blob/734fb7dd/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/QueueDispatchPendingList.java
----------------------------------------------------------------------
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/QueueDispatchPendingList.java
 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/QueueDispatchPendingList.java
index 380569e..cdddd4c 100644
--- 
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/QueueDispatchPendingList.java
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/QueueDispatchPendingList.java
@@ -97,6 +97,11 @@ public class QueueDispatchPendingList implements PendingList 
{
     }
 
     @Override
+    public long messageSize() {
+        return pagedInPendingDispatch.messageSize() + 
redeliveredWaitingDispatch.messageSize();
+    }
+
+    @Override
     public Iterator<MessageReference> iterator() {
         return new Iterator<MessageReference>() {
 

http://git-wip-us.apache.org/repos/asf/activemq/blob/734fb7dd/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java
----------------------------------------------------------------------
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java
 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java
index 9fb73c5..b10b2e2 100755
--- 
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java
@@ -32,14 +32,14 @@ import org.slf4j.LoggerFactory;
 /**
  * persist pending messages pending message (messages awaiting dispatch to a
  * consumer) cursor
- * 
- * 
+ *
+ *
  */
 class QueueStorePrefetch extends AbstractStoreCursor {
     private static final Logger LOG = 
LoggerFactory.getLogger(QueueStorePrefetch.class);
     private final MessageStore store;
     private final Broker broker;
-   
+
     /**
      * Construct it
      * @param queue
@@ -51,6 +51,7 @@ class QueueStorePrefetch extends AbstractStoreCursor {
 
     }
 
+    @Override
     public boolean recoverMessageReference(MessageId messageReference) throws 
Exception {
         Message msg = this.store.getMessage(messageReference);
         if (msg != null) {
@@ -62,36 +63,46 @@ class QueueStorePrefetch extends AbstractStoreCursor {
         }
     }
 
-   
-        
+
+
     @Override
     protected synchronized int getStoreSize() {
         try {
             int result = this.store.getMessageCount();
             return result;
-            
+
         } catch (IOException e) {
             LOG.error("Failed to get message count", e);
             throw new RuntimeException(e);
         }
     }
-    
+
+    @Override
+    protected synchronized long getStoreMessageSize() {
+        try {
+            return this.store.getMessageSize();
+        } catch (IOException e) {
+            LOG.error("Failed to get message size", e);
+            throw new RuntimeException(e);
+        }
+    }
+
     @Override
     protected synchronized boolean isStoreEmpty() {
         try {
             return this.store.isEmpty();
-            
+
         } catch (Exception e) {
             LOG.error("Failed to get message count", e);
             throw new RuntimeException(e);
         }
     }
-    
+
     @Override
     protected void resetBatch() {
         this.store.resetBatching();
     }
-    
+
     @Override
     protected void setBatch(MessageId messageId) throws Exception {
         if (LOG.isTraceEnabled()) {
@@ -101,7 +112,7 @@ class QueueStorePrefetch extends AbstractStoreCursor {
         batchResetNeeded = false;
     }
 
-    
+
     @Override
     protected void doFillBatch() throws Exception {
         hadSpace = this.hasSpace();

http://git-wip-us.apache.org/repos/asf/activemq/blob/734fb7dd/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java
----------------------------------------------------------------------
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java
 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java
index 32000f5..9d723b8 100755
--- 
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java
@@ -303,6 +303,15 @@ public class StoreDurableSubscriberCursor extends 
AbstractPendingMessageCursor {
     }
 
     @Override
+    public synchronized long messageSize() {
+        long total = 0; // running sum across every store prefetch cursor
+        for (PendingMessageCursor prefetch : storePrefetches) {
+            total += prefetch.messageSize();
+        }
+        return total;
+    }
+
+    @Override
     public void setMaxBatchSize(int newMaxBatchSize) {
         for (PendingMessageCursor storePrefetch : storePrefetches) {
             storePrefetch.setMaxBatchSize(newMaxBatchSize);

http://git-wip-us.apache.org/repos/asf/activemq/blob/734fb7dd/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java
----------------------------------------------------------------------
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java
 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java
index 5b072a6..caa93b6 100755
--- 
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java
@@ -51,6 +51,7 @@ public class StoreQueueCursor extends 
AbstractPendingMessageCursor {
         currentCursor = persistent;
     }
 
+    @Override
     public synchronized void start() throws Exception {
         started = true;
         super.start();
@@ -73,6 +74,7 @@ public class StoreQueueCursor extends 
AbstractPendingMessageCursor {
         pendingCount = persistent.size() + nonPersistent.size();
     }
 
+    @Override
     public synchronized void stop() throws Exception {
         started = false;
         if (nonPersistent != null) {
@@ -87,6 +89,7 @@ public class StoreQueueCursor extends 
AbstractPendingMessageCursor {
         pendingCount = 0;
     }
 
+    @Override
     public synchronized boolean addMessageLast(MessageReference node) throws 
Exception {
         boolean result = true;
         if (node != null) {
@@ -104,6 +107,7 @@ public class StoreQueueCursor extends 
AbstractPendingMessageCursor {
         return result;
     }
 
+    @Override
     public synchronized void addMessageFirst(MessageReference node) throws 
Exception {
         if (node != null) {
             Message msg = node.getMessage();
@@ -119,10 +123,12 @@ public class StoreQueueCursor extends 
AbstractPendingMessageCursor {
         }
     }
 
+    @Override
     public synchronized void clear() {
         pendingCount = 0;
     }
 
+    @Override
     public synchronized boolean hasNext() {
         try {
             getNextCursor();
@@ -133,11 +139,13 @@ public class StoreQueueCursor extends 
AbstractPendingMessageCursor {
        return currentCursor != null ? currentCursor.hasNext() : false;
     }
 
+    @Override
     public synchronized MessageReference next() {
         MessageReference result = currentCursor != null ? currentCursor.next() 
: null;
         return result;
     }
 
+    @Override
     public synchronized void remove() {
         if (currentCursor != null) {
             currentCursor.remove();
@@ -145,6 +153,7 @@ public class StoreQueueCursor extends 
AbstractPendingMessageCursor {
         pendingCount--;
     }
 
+    @Override
     public synchronized void remove(MessageReference node) {
         if (!node.isPersistent()) {
             nonPersistent.remove(node);
@@ -154,18 +163,21 @@ public class StoreQueueCursor extends 
AbstractPendingMessageCursor {
         pendingCount--;
     }
 
+    @Override
     public synchronized void reset() {
         nonPersistent.reset();
         persistent.reset();
         pendingCount = persistent.size() + nonPersistent.size();
     }
 
+    @Override
     public void release() {
         nonPersistent.release();
         persistent.release();
     }
 
 
+    @Override
     public synchronized int size() {
         if (pendingCount < 0) {
             pendingCount = persistent.size() + nonPersistent.size();
@@ -173,6 +185,12 @@ public class StoreQueueCursor extends 
AbstractPendingMessageCursor {
         return pendingCount;
     }
 
+    @Override
+    public synchronized long messageSize() {
+        return persistent.messageSize() + nonPersistent.messageSize();
+    }
+
+    @Override
     public synchronized boolean isEmpty() {
         // if negative, more messages arrived in store since last reset so non 
empty
         return pendingCount == 0;
@@ -185,6 +203,7 @@ public class StoreQueueCursor extends 
AbstractPendingMessageCursor {
      * @see org.apache.activemq.broker.region.cursors.PendingMessageCursor
      * @return true if recovery required
      */
+    @Override
     public boolean isRecoveryRequired() {
         return false;
     }
@@ -203,6 +222,7 @@ public class StoreQueueCursor extends 
AbstractPendingMessageCursor {
         this.nonPersistent = nonPersistent;
     }
 
+    @Override
     public void setMaxBatchSize(int maxBatchSize) {
         persistent.setMaxBatchSize(maxBatchSize);
         if (nonPersistent != null) {
@@ -212,6 +232,7 @@ public class StoreQueueCursor extends 
AbstractPendingMessageCursor {
     }
 
 
+    @Override
     public void setMaxProducersToAudit(int maxProducersToAudit) {
         super.setMaxProducersToAudit(maxProducersToAudit);
         if (persistent != null) {
@@ -222,6 +243,7 @@ public class StoreQueueCursor extends 
AbstractPendingMessageCursor {
         }
     }
 
+    @Override
     public void setMaxAuditDepth(int maxAuditDepth) {
         super.setMaxAuditDepth(maxAuditDepth);
         if (persistent != null) {
@@ -232,6 +254,7 @@ public class StoreQueueCursor extends 
AbstractPendingMessageCursor {
         }
     }
 
+    @Override
     public void setEnableAudit(boolean enableAudit) {
         super.setEnableAudit(enableAudit);
         if (persistent != null) {
@@ -266,6 +289,7 @@ public class StoreQueueCursor extends 
AbstractPendingMessageCursor {
 
 
 
+    @Override
     public synchronized void gc() {
         if (persistent != null) {
             persistent.gc();
@@ -276,6 +300,7 @@ public class StoreQueueCursor extends 
AbstractPendingMessageCursor {
         pendingCount = persistent.size() + nonPersistent.size();
     }
 
+    @Override
     public void setSystemUsage(SystemUsage usageManager) {
         super.setSystemUsage(usageManager);
         if (persistent != null) {

http://git-wip-us.apache.org/repos/asf/activemq/blob/734fb7dd/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java
----------------------------------------------------------------------
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java
 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java
index 811531e..c3f788f 100755
--- 
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java
@@ -30,8 +30,8 @@ import org.slf4j.LoggerFactory;
 /**
  * persist pendingCount messages pendingCount message (messages awaiting 
disptach
  * to a consumer) cursor
- * 
- * 
+ *
+ *
  */
 class TopicStorePrefetch extends AbstractStoreCursor {
     private static final Logger LOG = 
LoggerFactory.getLogger(TopicStorePrefetch.class);
@@ -59,14 +59,17 @@ class TopicStorePrefetch extends AbstractStoreCursor {
         this.storeHasMessages=this.size > 0;
     }
 
+    @Override
     public boolean recoverMessageReference(MessageId messageReference) throws 
Exception {
         // shouldn't get called
         throw new RuntimeException("Not supported");
     }
 
+    @Override
     public synchronized void addMessageFirst(MessageReference node) throws 
Exception {
         batchList.addMessageFirst(node);
         size++;
+        // NOTE(review): size stat not updated here (addSize call disabled) - confirm
     }
 
     @Override
@@ -88,7 +91,7 @@ class TopicStorePrefetch extends AbstractStoreCursor {
             }
             storeHasMessages = true;
         }
-        return recovered;      
+        return recovered;
     }
 
     @Override
@@ -100,7 +103,18 @@ class TopicStorePrefetch extends AbstractStoreCursor {
             throw new RuntimeException(e);
         }
     }
-    
+
+    /** Message size the store reports for this subscriber; failures are rethrown. */
+    @Override
+    protected synchronized long getStoreMessageSize() {
+        try {
+            return store.getMessageSize(clientId, subscriberName);
+        } catch (Exception e) {
+            LOG.error("{} Failed to get the outstanding message size from the store", this, e);
+            throw new RuntimeException(e);
+        }
+    }
+
     @Override
     protected synchronized boolean isStoreEmpty() {
         try {
@@ -111,7 +125,7 @@ class TopicStorePrefetch extends AbstractStoreCursor {
         }
     }
 
-            
+
     @Override
     protected void resetBatch() {
         this.store.resetBatching(clientId, subscriberName);

http://git-wip-us.apache.org/repos/asf/activemq/blob/734fb7dd/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/VMPendingMessageCursor.java
----------------------------------------------------------------------
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/VMPendingMessageCursor.java
 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/VMPendingMessageCursor.java
index 15c61df..75be766 100755
--- 
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/VMPendingMessageCursor.java
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/VMPendingMessageCursor.java
@@ -20,6 +20,7 @@ import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
+
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.broker.region.Destination;
 import org.apache.activemq.broker.region.MessageReference;
@@ -28,13 +29,13 @@ import 
org.apache.activemq.broker.region.QueueMessageReference;
 /**
  * hold pending messages in a linked list (messages awaiting disptach to a
  * consumer) cursor
- * 
- * 
+ *
+ *
  */
 public class VMPendingMessageCursor extends AbstractPendingMessageCursor {
     private final PendingList list;
     private Iterator<MessageReference> iter;
-    
+
     public VMPendingMessageCursor(boolean prioritizedMessages) {
         super(prioritizedMessages);
         if (this.prioritizedMessages) {
@@ -44,7 +45,8 @@ public class VMPendingMessageCursor extends 
AbstractPendingMessageCursor {
         }
     }
 
-    
+
+    @Override
     public synchronized List<MessageReference> remove(ConnectionContext 
context, Destination destination)
             throws Exception {
         List<MessageReference> rc = new ArrayList<MessageReference>();
@@ -62,7 +64,8 @@ public class VMPendingMessageCursor extends 
AbstractPendingMessageCursor {
     /**
      * @return true if there are no pending messages
      */
-    
+
+    @Override
     public synchronized boolean isEmpty() {
         if (list.isEmpty()) {
             return true;
@@ -85,7 +88,8 @@ public class VMPendingMessageCursor extends 
AbstractPendingMessageCursor {
     /**
      * reset the cursor
      */
-    
+
+    @Override
     public synchronized void reset() {
         iter = list.iterator();
         last = null;
@@ -93,10 +97,11 @@ public class VMPendingMessageCursor extends 
AbstractPendingMessageCursor {
 
     /**
      * add message to await dispatch
-     * 
+     *
      * @param node
      */
-    
+
+    @Override
     public synchronized boolean addMessageLast(MessageReference node) {
         node.incrementReferenceCount();
         list.addMessageLast(node);
@@ -105,10 +110,11 @@ public class VMPendingMessageCursor extends 
AbstractPendingMessageCursor {
 
     /**
      * add message to await dispatch
-     * 
+     *
      * @param node
      */
-    
+
+    @Override
     public synchronized void addMessageFirst(MessageReference node) {
         node.incrementReferenceCount();
         list.addMessageFirst(node);
@@ -117,7 +123,8 @@ public class VMPendingMessageCursor extends 
AbstractPendingMessageCursor {
     /**
      * @return true if there pending messages to dispatch
      */
-    
+
+    @Override
     public synchronized boolean hasNext() {
         return iter.hasNext();
     }
@@ -125,7 +132,8 @@ public class VMPendingMessageCursor extends 
AbstractPendingMessageCursor {
     /**
      * @return the next pending message
      */
-    
+
+    @Override
     public synchronized MessageReference next() {
         last = iter.next();
         if (last != null) {
@@ -137,7 +145,8 @@ public class VMPendingMessageCursor extends 
AbstractPendingMessageCursor {
     /**
      * remove the message at the cursor position
      */
-    
+
+    @Override
     public synchronized void remove() {
         if (last != null) {
             last.decrementReferenceCount();
@@ -148,15 +157,22 @@ public class VMPendingMessageCursor extends 
AbstractPendingMessageCursor {
     /**
      * @return the number of pending messages
      */
-    
+
+    @Override
     public synchronized int size() {
         return list.size();
     }
 
+    @Override
+    public synchronized long messageSize() {
+        return list.messageSize();
+    }
+
     /**
      * clear all pending messages
      */
-    
+
+    @Override
     public synchronized void clear() {
         for (Iterator<MessageReference> i = list.iterator(); i.hasNext();) {
             MessageReference ref = i.next();
@@ -165,7 +181,8 @@ public class VMPendingMessageCursor extends 
AbstractPendingMessageCursor {
         list.clear();
     }
 
-    
+
+    @Override
     public synchronized void remove(MessageReference node) {
         list.remove(node);
         node.decrementReferenceCount();
@@ -173,11 +190,12 @@ public class VMPendingMessageCursor extends 
AbstractPendingMessageCursor {
 
     /**
      * Page in a restricted number of messages
-     * 
+     *
      * @param maxItems
      * @return a list of paged in messages
      */
-    
+
+    @Override
     public LinkedList<MessageReference> pageInList(int maxItems) {
         LinkedList<MessageReference> result = new 
LinkedList<MessageReference>();
         for (Iterator<MessageReference>i = list.iterator();i.hasNext();) {
@@ -191,12 +209,14 @@ public class VMPendingMessageCursor extends 
AbstractPendingMessageCursor {
         return result;
     }
 
-    
+
+    @Override
     public boolean isTransient() {
         return true;
     }
 
-    
+
+    @Override
     public void destroy() throws Exception {
         super.destroy();
         clear();

http://git-wip-us.apache.org/repos/asf/activemq/blob/734fb7dd/activemq-broker/src/main/java/org/apache/activemq/store/PList.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/store/PList.java 
b/activemq-broker/src/main/java/org/apache/activemq/store/PList.java
index 7438963..efe29ac 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/store/PList.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/store/PList.java
@@ -41,6 +41,8 @@ public interface PList {
 
     long size();
 
+    long messageSize();
+
     public interface PListIterator extends Iterator<PListEntry> {
         void release();
     }

http://git-wip-us.apache.org/repos/asf/activemq/blob/734fb7dd/activemq-broker/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java
----------------------------------------------------------------------
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java
 
b/activemq-broker/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java
index 5c59158..6e79358 100755
--- 
a/activemq-broker/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java
@@ -209,6 +209,7 @@ public class ProxyTopicMessageStore implements 
TopicMessageStore {
         return delegate.isPrioritizedMessages();
     }
 
+    @Override
     public void updateMessage(Message message) throws IOException {
         delegate.updateMessage(message);
     }
@@ -223,4 +224,13 @@ public class ProxyTopicMessageStore implements 
TopicMessageStore {
         return delegate.getMessageStoreStatistics();
     }
 
+    /* (non-Javadoc)
+     * @see 
org.apache.activemq.store.TopicMessageStore#getMessageSize(java.lang.String, 
java.lang.String)
+     */
+    @Override
+    public long getMessageSize(String clientId, String subscriberName)
+            throws IOException {
+        return delegate.getMessageSize(clientId, subscriberName);
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/734fb7dd/activemq-broker/src/main/java/org/apache/activemq/store/TopicMessageStore.java
----------------------------------------------------------------------
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/store/TopicMessageStore.java
 
b/activemq-broker/src/main/java/org/apache/activemq/store/TopicMessageStore.java
index 163b184..a55118f 100755
--- 
a/activemq-broker/src/main/java/org/apache/activemq/store/TopicMessageStore.java
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/store/TopicMessageStore.java
@@ -102,6 +102,8 @@ public interface TopicMessageStore extends MessageStore {
      */
     int getMessageCount(String clientId, String subscriberName) throws 
IOException;
 
+    long getMessageSize(String clientId, String subscriberName) throws 
IOException;
+
     /**
      * Finds the subscriber entry for the given consumer info
      *

http://git-wip-us.apache.org/repos/asf/activemq/blob/734fb7dd/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryTopicMessageStore.java
----------------------------------------------------------------------
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryTopicMessageStore.java
 
b/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryTopicMessageStore.java
index 76199d7..ae693f1 100755
--- 
a/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryTopicMessageStore.java
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryTopicMessageStore.java
@@ -146,6 +146,16 @@ public class MemoryTopicMessageStore extends 
MemoryMessageStore implements Topic
     }
 
     @Override
+    public synchronized long getMessageSize(String clientId, String subscriberName) throws IOException {
+        // a subscription that is not registered reports a size of zero
+        SubscriptionKey key = new SubscriptionKey(clientId, subscriberName);
+        MemoryTopicSub subscription = topicSubMap.get(key);
+        if (subscription == null) {
+            return 0;
+        }
+        return subscription.messageSize();
+    }
+
+    @Override
     public synchronized void recoverNextMessages(String clientId, String 
subscriptionName, int maxReturned, MessageRecoveryListener listener) throws 
Exception {
         MemoryTopicSub sub = this.topicSubMap.get(new 
SubscriptionKey(clientId, subscriptionName));
         if (sub != null) {

http://git-wip-us.apache.org/repos/asf/activemq/blob/734fb7dd/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryTopicSub.java
----------------------------------------------------------------------
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryTopicSub.java
 
b/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryTopicSub.java
index ec3807e..fc986f2 100755
--- 
a/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryTopicSub.java
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryTopicSub.java
@@ -26,8 +26,8 @@ import org.apache.activemq.store.MessageRecoveryListener;
 
 /**
  * A holder for a durable subscriber
- * 
- * 
+ *
+ *
  */
 class MemoryTopicSub {
 
@@ -58,9 +58,20 @@ class MemoryTopicSub {
         return map.size();
     }
 
+    synchronized long messageSize() {
+        long total = 0;
+        // walk every entry and add the size its message reports
+        Iterator<Entry<MessageId, Message>> entries = map.entrySet().iterator();
+        while (entries.hasNext()) {
+            total += entries.next().getValue().getSize();
+        }
+
+        return total;
+    }
+
     synchronized void recoverSubscription(MessageRecoveryListener listener) 
throws Exception {
-        for (Iterator iter = map.entrySet().iterator(); iter.hasNext();) {
-            Map.Entry entry = (Entry)iter.next();
+        for (Iterator<Entry<MessageId, Message>> iter = 
map.entrySet().iterator(); iter.hasNext();) {
+            Entry<MessageId, Message> entry = iter.next();
             Object msg = entry.getValue();
             if (msg.getClass() == MessageId.class) {
                 listener.recoverMessageReference((MessageId)msg);
@@ -76,8 +87,8 @@ class MemoryTopicSub {
         // the message table is a synchronizedMap - so just have to synchronize
         // here
         int count = 0;
-        for (Iterator iter = map.entrySet().iterator(); iter.hasNext() && 
count < maxReturned;) {
-            Map.Entry entry = (Entry)iter.next();
+        for (Iterator<Entry<MessageId, Message>> iter = 
map.entrySet().iterator(); iter.hasNext() && count < maxReturned;) {
+            Entry<MessageId, Message> entry = iter.next();
             if (pastLackBatch) {
                 count++;
                 Object msg = entry.getValue();

http://git-wip-us.apache.org/repos/asf/activemq/blob/734fb7dd/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java
----------------------------------------------------------------------
diff --git 
a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java
 
b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java
index a0cb133..3bff9b2 100755
--- 
a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java
+++ 
b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java
@@ -44,7 +44,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * 
+ *
  */
 public class JDBCTopicMessageStore extends JDBCMessageStore implements 
TopicMessageStore {
 
@@ -57,7 +57,8 @@ public class JDBCTopicMessageStore extends JDBCMessageStore 
implements TopicMess
                PROPERTY_SEQUENCE_ID_CACHE_SIZE, "1000"), 10);
     private final ReentrantReadWriteLock sequenceIdCacheSizeLock = new 
ReentrantReadWriteLock();
     private Map<MessageId, long[]> sequenceIdCache = new 
LinkedHashMap<MessageId, long[]>() {
-         protected boolean removeEldestEntry(Map.Entry<MessageId, long[]> 
eldest) {
+         @Override
+        protected boolean removeEldestEntry(Map.Entry<MessageId, long[]> 
eldest) {
            return size() > SEQUENCE_ID_CACHE_SIZE;
         }
     };
@@ -67,6 +68,7 @@ public class JDBCTopicMessageStore extends JDBCMessageStore 
implements TopicMess
         super(persistenceAdapter, adapter, wireFormat, topic, audit);
     }
 
+    @Override
     public void acknowledge(ConnectionContext context, String clientId, String 
subscriptionName, MessageId messageId, MessageAck ack) throws IOException {
         if (ack != null && ack.isUnmatchedAck()) {
             if (LOG.isTraceEnabled()) {
@@ -110,16 +112,19 @@ public class JDBCTopicMessageStore extends 
JDBCMessageStore implements TopicMess
     /**
      * @throws Exception
      */
+    @Override
     public void recoverSubscription(String clientId, String subscriptionName, 
final MessageRecoveryListener listener) throws Exception {
         TransactionContext c = persistenceAdapter.getTransactionContext();
         try {
             adapter.doRecoverSubscription(c, destination, clientId, 
subscriptionName, new JDBCMessageRecoveryListener() {
+                @Override
                 public boolean recoverMessage(long sequenceId, byte[] data) 
throws Exception {
                     Message msg = (Message)wireFormat.unmarshal(new 
ByteSequence(data));
                     msg.getMessageId().setBrokerSequenceId(sequenceId);
                     return listener.recoverMessage(msg);
                 }
 
+                @Override
                 public boolean recoverMessageReference(String reference) 
throws Exception {
                     return listener.recoverMessageReference(new 
MessageId(reference));
                 }
@@ -149,16 +154,19 @@ public class JDBCTopicMessageStore extends 
JDBCMessageStore implements TopicMess
             return perPriority[javax.jms.Message.DEFAULT_PRIORITY];
         }
 
+        @Override
         public String toString() {
             return Arrays.deepToString(perPriority);
         }
 
+        @Override
         public Iterator<LastRecoveredEntry> iterator() {
             return new PriorityIterator();
         }
 
         class PriorityIterator implements Iterator<LastRecoveredEntry> {
             int current = 9;
+            @Override
             public boolean hasNext() {
                 for (int i=current; i>=0; i--) {
                     if (perPriority[i].hasMessages()) {
@@ -169,10 +177,12 @@ public class JDBCTopicMessageStore extends 
JDBCMessageStore implements TopicMess
                 return false;
             }
 
+            @Override
             public LastRecoveredEntry next() {
                 return perPriority[current];
             }
 
+            @Override
             public void remove() {
                 throw new RuntimeException("not implemented");
             }
@@ -188,6 +198,7 @@ public class JDBCTopicMessageStore extends JDBCMessageStore 
implements TopicMess
             this.priority = priority;
         }
 
+        @Override
         public String toString() {
             return priority + "-" + stored + ":" + recovered;
         }
@@ -213,6 +224,7 @@ public class JDBCTopicMessageStore extends JDBCMessageStore 
implements TopicMess
             this.maxMessages = maxMessages;
         }
 
+        @Override
         public boolean recoverMessage(long sequenceId, byte[] data) throws 
Exception {
             if (delegate.hasSpace() && recoveredCount < maxMessages) {
                 Message msg = (Message) wireFormat.unmarshal(new 
ByteSequence(data));
@@ -226,6 +238,7 @@ public class JDBCTopicMessageStore extends JDBCMessageStore 
implements TopicMess
             return false;
         }
 
+        @Override
         public boolean recoverMessageReference(String reference) throws 
Exception {
             return delegate.recoverMessageReference(new MessageId(reference));
         }
@@ -244,6 +257,7 @@ public class JDBCTopicMessageStore extends JDBCMessageStore 
implements TopicMess
         }
     }
 
+    @Override
     public synchronized void recoverNextMessages(final String clientId, final 
String subscriptionName, final int maxReturned, final MessageRecoveryListener 
listener)
             throws Exception {
         //Duration duration = new Duration("recoverNextMessages");
@@ -253,7 +267,7 @@ public class JDBCTopicMessageStore extends JDBCMessageStore 
implements TopicMess
         if (!subscriberLastRecoveredMap.containsKey(key)) {
            subscriberLastRecoveredMap.put(key, new LastRecovered());
         }
-        final LastRecovered lastRecovered = 
subscriberLastRecoveredMap.get(key);        
+        final LastRecovered lastRecovered = 
subscriberLastRecoveredMap.get(key);
         LastRecoveredAwareListener recoveredAwareListener = new 
LastRecoveredAwareListener(listener, maxReturned);
         try {
             if (LOG.isTraceEnabled()) {
@@ -293,6 +307,7 @@ public class JDBCTopicMessageStore extends JDBCMessageStore 
implements TopicMess
         }
     }
 
+    @Override
     public void resetBatching(String clientId, String subscriptionName) {
         String key = getSubscriptionKey(clientId, subscriptionName);
         if (!pendingCompletion.contains(key))  {
@@ -330,6 +345,7 @@ public class JDBCTopicMessageStore extends JDBCMessageStore 
implements TopicMess
         }
     }
 
+    @Override
     public void addSubscription(SubscriptionInfo subscriptionInfo, boolean 
retroactive) throws IOException {
         TransactionContext c = persistenceAdapter.getTransactionContext();
         try {
@@ -347,6 +363,7 @@ public class JDBCTopicMessageStore extends JDBCMessageStore 
implements TopicMess
      * @see 
org.apache.activemq.store.TopicMessageStore#lookupSubscription(String,
      *      String)
      */
+    @Override
     public SubscriptionInfo lookupSubscription(String clientId, String 
subscriptionName) throws IOException {
         TransactionContext c = persistenceAdapter.getTransactionContext();
         try {
@@ -359,6 +376,7 @@ public class JDBCTopicMessageStore extends JDBCMessageStore 
implements TopicMess
         }
     }
 
+    @Override
     public void deleteSubscription(String clientId, String subscriptionName) 
throws IOException {
         TransactionContext c = persistenceAdapter.getTransactionContext();
         try {
@@ -372,6 +390,7 @@ public class JDBCTopicMessageStore extends JDBCMessageStore 
implements TopicMess
         }
     }
 
+    @Override
     public SubscriptionInfo[] getAllSubscriptions() throws IOException {
         TransactionContext c = persistenceAdapter.getTransactionContext();
         try {
@@ -384,6 +403,7 @@ public class JDBCTopicMessageStore extends JDBCMessageStore 
implements TopicMess
         }
     }
 
+    @Override
     public int getMessageCount(String clientId, String subscriberName) throws 
IOException {
         //Duration duration = new Duration("getMessageCount");
         int result = 0;
@@ -403,6 +423,11 @@ public class JDBCTopicMessageStore extends JDBCMessageStore implements TopicMess
         return result;
     }
 
+    @Override
+    public long getMessageSize(String clientId, String subscriberName) throws IOException {
+        return 0;
+    }
+
     protected String getSubscriptionKey(String clientId, String subscriberName) {
         String result = clientId + ":";
         result += subscriberName != null ? subscriberName : "NOT_SET";

http://git-wip-us.apache.org/repos/asf/activemq/blob/734fb7dd/activemq-jdbc-store/src/main/java/org/apache/activemq/store/journal/JournalTopicMessageStore.java
----------------------------------------------------------------------
diff --git a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/journal/JournalTopicMessageStore.java b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/journal/JournalTopicMessageStore.java
index 51d9693..aa0cb5d 100755
--- a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/journal/JournalTopicMessageStore.java
+++ b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/journal/JournalTopicMessageStore.java
@@ -38,8 +38,8 @@ import org.slf4j.LoggerFactory;
 
 /**
  * A MessageStore that uses a Journal to store it's messages.
- * 
- * 
+ *
+ *
  */
 public class JournalTopicMessageStore extends JournalMessageStore implements 
TopicMessageStore {
 
@@ -54,12 +54,14 @@ public class JournalTopicMessageStore extends 
JournalMessageStore implements Top
         this.longTermStore = checkpointStore;
     }
 
+    @Override
     public void recoverSubscription(String clientId, String subscriptionName, 
MessageRecoveryListener listener)
         throws Exception {
         this.peristenceAdapter.checkpoint(true, true);
         longTermStore.recoverSubscription(clientId, subscriptionName, 
listener);
     }
 
+    @Override
     public void recoverNextMessages(String clientId, String subscriptionName, 
int maxReturned,
                                     MessageRecoveryListener listener) throws 
Exception {
         this.peristenceAdapter.checkpoint(true, true);
@@ -67,21 +69,25 @@ public class JournalTopicMessageStore extends 
JournalMessageStore implements Top
 
     }
 
+    @Override
     public SubscriptionInfo lookupSubscription(String clientId, String 
subscriptionName) throws IOException {
         return longTermStore.lookupSubscription(clientId, subscriptionName);
     }
 
+    @Override
     public void addSubscription(SubscriptionInfo subscriptionInfo, boolean 
retroactive) throws IOException {
         this.peristenceAdapter.checkpoint(true, true);
         longTermStore.addSubscription(subscriptionInfo, retroactive);
     }
 
+    @Override
     public void addMessage(ConnectionContext context, Message message) throws 
IOException {
         super.addMessage(context, message);
     }
 
     /**
      */
+    @Override
     public void acknowledge(ConnectionContext context, String clientId, String 
subscriptionName,
                             final MessageId messageId, MessageAck originalAck) 
throws IOException {
         final boolean debug = LOG.isDebugEnabled();
@@ -111,6 +117,7 @@ public class JournalTopicMessageStore extends 
JournalMessageStore implements Top
             }
             transactionStore.acknowledge(this, ack, location);
             context.getTransaction().addSynchronization(new Synchronization() {
+                @Override
                 public void afterCommit() throws Exception {
                     if (debug) {
                         LOG.debug("Transacted acknowledge commit for: " + 
messageId + ", at: " + location);
@@ -121,6 +128,7 @@ public class JournalTopicMessageStore extends 
JournalMessageStore implements Top
                     }
                 }
 
+                @Override
                 public void afterRollback() throws Exception {
                     if (debug) {
                         LOG.debug("Transacted acknowledge rollback for: " + 
messageId + ", at: " + location);
@@ -159,6 +167,7 @@ public class JournalTopicMessageStore extends 
JournalMessageStore implements Top
         }
     }
 
+    @Override
     public RecordLocation checkpoint() throws IOException {
 
         final HashMap<SubscriptionKey, MessageId> cpAckedLastAckLocations;
@@ -170,6 +179,7 @@ public class JournalTopicMessageStore extends 
JournalMessageStore implements Top
         }
 
         return super.checkpoint(new Callback() {
+            @Override
             public void execute() throws Exception {
 
                 // Checkpoint the acknowledged messages.
@@ -193,19 +203,29 @@ public class JournalTopicMessageStore extends JournalMessageStore implements Top
         return longTermStore;
     }
 
+    @Override
     public void deleteSubscription(String clientId, String subscriptionName) throws IOException {
         longTermStore.deleteSubscription(clientId, subscriptionName);
     }
 
+    @Override
     public SubscriptionInfo[] getAllSubscriptions() throws IOException {
         return longTermStore.getAllSubscriptions();
     }
 
+    @Override
     public int getMessageCount(String clientId, String subscriberName) throws IOException {
         this.peristenceAdapter.checkpoint(true, true);
         return longTermStore.getMessageCount(clientId, subscriberName);
     }
 
+    @Override
+    public long getMessageSize(String clientId, String subscriberName) throws IOException {
+        this.peristenceAdapter.checkpoint(true, true);
+        return longTermStore.getMessageSize(clientId, subscriberName);
+    }
+
+    @Override
     public void resetBatching(String clientId, String subscriptionName) {
         longTermStore.resetBatching(clientId, subscriptionName);
     }

http://git-wip-us.apache.org/repos/asf/activemq/blob/734fb7dd/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
index 84aba07..bd45394 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
@@ -892,6 +892,30 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
             }
         }
 
+
+        @Override
+        public long getMessageSize(String clientId, String subscriptionName) throws IOException {
+            final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
+            indexLock.writeLock().lock();
+            try {
+                return pageFile.tx().execute(new Transaction.CallableClosure<Integer, IOException>() {
+                    @Override
+                    public Integer execute(Transaction tx) throws IOException {
+                        StoredDestination sd = getStoredDestination(dest, tx);
+                        LastAck cursorPos = getLastAck(tx, sd, subscriptionKey);
+                        if (cursorPos == null) {
+                            // The subscription might not exist.
+                            return 0;
+                        }
+
+                        return (int) getStoredMessageSize(tx, sd, subscriptionKey);
+                    }
+                });
+            } finally {
+                indexLock.writeLock().unlock();
+            }
+        }
+
         @Override
         public void recoverSubscription(String clientId, String subscriptionName, final MessageRecoveryListener listener)
                 throws Exception {

http://git-wip-us.apache.org/repos/asf/activemq/blob/734fb7dd/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
index ac767a7..b3fcfaa 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
@@ -2536,6 +2536,32 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
         return 0;
     }
 
+    public long getStoredMessageSize(Transaction tx, StoredDestination sd, String subscriptionKey) throws IOException {
+        SequenceSet messageSequences = sd.ackPositions.get(tx, subscriptionKey);
+        long locationSize = 0;
+        if (messageSequences != null) {
+            Iterator<Long> sequences = messageSequences.iterator();
+
+            while (sequences.hasNext()) {
+                Long sequenceId = sequences.next();
+                //the last item is the next marker
+                if (!sequences.hasNext()) {
+                    break;
+                }
+                Iterator<Entry<Location, Long>> iterator = sd.locationIndex.iterator(tx);
+                while (iterator.hasNext()) {
+                    Entry<Location, Long> entry = iterator.next();
+                    if (entry.getValue() == sequenceId - 1) {
+                        locationSize += entry.getKey().getSize();
+                        break;
+                    }
+
+                }
+            }
+        }
+
+        return locationSize;
+    }
     protected String key(KahaDestination destination) {
         return destination.getType().getNumber() + ":" + destination.getName();
     }

http://git-wip-us.apache.org/repos/asf/activemq/blob/734fb7dd/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/TempKahaDBStore.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/TempKahaDBStore.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/TempKahaDBStore.java
index 04d74b6..920fc53 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/TempKahaDBStore.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/TempKahaDBStore.java
@@ -409,6 +409,11 @@ public class TempKahaDBStore extends TempMessageDatabase implements PersistenceA
         }
 
         @Override
+        public long getMessageSize(String clientId, String subscriptionName) throws IOException {
+            return 0;
+        }
+
+        @Override
         public void recoverSubscription(String clientId, String subscriptionName, final MessageRecoveryListener listener) throws Exception {
             final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
             synchronized(indexMutex) {

http://git-wip-us.apache.org/repos/asf/activemq/blob/734fb7dd/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/index/ListIndex.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/index/ListIndex.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/index/ListIndex.java
index 82379ea..79ac7d1 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/index/ListIndex.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/index/ListIndex.java
@@ -20,6 +20,7 @@ import java.io.IOException;
 import java.lang.ref.WeakReference;
 import java.util.Iterator;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 
@@ -59,6 +60,7 @@ public class ListIndex<Key,Value> implements Index<Key,Value> 
{
         this(pageFile, page.getPageId());
     }
 
+    @Override
     synchronized public void load(Transaction tx) throws IOException {
         if (loaded.compareAndSet(false, true)) {
             LOG.debug("loading");
@@ -81,15 +83,22 @@ public class ListIndex<Key,Value> implements Index<Key,Value> {
                 ListNode<Key, Value> node = loadNode(tx, getHeadPageId());
                 setTailPageId(getHeadPageId());
                 size.addAndGet(node.size(tx));
+                onLoad(node, tx);
                 while (node.getNext() != NOT_SET ) {
                     node = loadNode(tx, node.getNext());
                     size.addAndGet(node.size(tx));
+                    onLoad(node, tx);
                     setTailPageId(node.getPageId());
                 }
             }
         }
     }
 
+    protected void onLoad(ListNode<Key, Value> node, Transaction tx) {
+
+    }
+
+    @Override
     synchronized public void unload(Transaction tx) {
         if (loaded.compareAndSet(true, false)) {
         }
@@ -103,6 +112,7 @@ public class ListIndex<Key,Value> implements 
Index<Key,Value> {
         return loadNode(tx, getTailPageId());
     }
 
+    @Override
     synchronized public boolean containsKey(Transaction tx, Key key) throws 
IOException {
         assertLoaded();
 
@@ -123,6 +133,7 @@ public class ListIndex<Key,Value> implements 
Index<Key,Value> {
     private Map.Entry<Key, Value> lastGetEntryCache = null;
     private WeakReference<Transaction> lastCacheTxSrc = new 
WeakReference<Transaction>(null);
 
+    @Override
     @SuppressWarnings({ "rawtypes", "unchecked" })
     synchronized public Value get(Transaction tx, Key key) throws IOException {
         assertLoaded();
@@ -144,6 +155,7 @@ public class ListIndex<Key,Value> implements 
Index<Key,Value> {
      *
      * @return the old value contained in the list if one exists or null.
      */
+    @Override
     @SuppressWarnings({ "rawtypes" })
     synchronized public Value put(Transaction tx, Key key, Value value) throws 
IOException {
 
@@ -211,6 +223,7 @@ public class ListIndex<Key,Value> implements 
Index<Key,Value> {
         return null;
     }
 
+    @Override
     @SuppressWarnings("rawtypes")
     synchronized public Value remove(Transaction tx, Key key) throws 
IOException {
         assertLoaded();
@@ -252,15 +265,17 @@ public class ListIndex<Key,Value> implements Index<Key,Value> {
         return null;
     }
 
-    public void onRemove() {
+    public void onRemove(Entry<Key, Value> removed) {
         size.decrementAndGet();
         flushCache();
     }
 
+    @Override
     public boolean isTransient() {
         return false;
     }
 
+    @Override
     synchronized public void clear(Transaction tx) throws IOException {
         for (Iterator<ListNode<Key,Value>> iterator = listNodeIterator(tx); iterator.hasNext(); ) {
             ListNode<Key,Value>candidate = iterator.next();
@@ -280,6 +295,7 @@ public class ListIndex<Key,Value> implements 
Index<Key,Value> {
         return getHead(tx).isEmpty(tx);
     }
 
+    @Override
     synchronized public Iterator<Map.Entry<Key,Value>> iterator(final 
Transaction tx) throws IOException {
         return getHead(tx).iterator(tx);
     }
@@ -346,6 +362,7 @@ public class ListIndex<Key,Value> implements 
Index<Key,Value> {
     public Marshaller<Key> getKeyMarshaller() {
         return keyMarshaller;
     }
+    @Override
     public void setKeyMarshaller(Marshaller<Key> keyMarshaller) {
         this.keyMarshaller = keyMarshaller;
     }
@@ -353,6 +370,7 @@ public class ListIndex<Key,Value> implements 
Index<Key,Value> {
     public Marshaller<Value> getValueMarshaller() {
         return valueMarshaller;
     }
+    @Override
     public void setValueMarshaller(Marshaller<Value> valueMarshaller) {
         this.valueMarshaller = valueMarshaller;
     }

Reply via email to