Repository: qpid-broker-j
Updated Branches:
  refs/heads/master 9e49b5d85 -> 700349d1a


QPID-7776: [Java Broker] Add FlowToDisk Queue overflow policy.


Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/700349d1
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/700349d1
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/700349d1

Branch: refs/heads/master
Commit: 700349d1a929ea22e3ce1626ab397e36d8677d11
Parents: 9e49b5d
Author: Keith Wall <[email protected]>
Authored: Wed May 17 15:00:35 2017 +0100
Committer: Keith Wall <[email protected]>
Committed: Wed May 17 16:48:40 2017 +0100

----------------------------------------------------------------------
 .../qpid/server/model/OverflowPolicy.java       |   3 +-
 .../org/apache/qpid/server/model/Queue.java     |   5 +-
 .../apache/qpid/server/queue/AbstractQueue.java |   7 +-
 .../queue/FlowToDiskOverflowPolicyHandler.java  | 172 +++++++++++++++++++
 .../FlowToDiskOverflowPolicyHandlerFactory.java |  42 +++++
 .../server/queue/NoneOverflowPolicyHandler.java |   2 +-
 .../server/queue/OverflowPolicyHandler.java     |   2 +-
 ...roducerFlowControlOverflowPolicyHandler.java |   6 +-
 .../server/queue/RingOverflowPolicyHandler.java |   2 +-
 .../FlowToDiskOverflowPolicyHandlerTest.java    | 137 +++++++++++++++
 ...cerFlowControlOverflowPolicyHandlerTest.java |   6 +-
 .../queue/RingOverflowPolicyHandlerTest.java    |   8 +-
 .../qpid/test/utils/AmqpManagementFacade.java   |  93 ++++++++--
 .../qpid/test/utils/QpidBrokerTestCase.java     |   5 +-
 .../qpid/server/queue/FlowToDiskTest.java       |  90 ++++++++++
 test-profiles/CPPExcludes                       |   2 +
 test-profiles/JavaTransientExcludes             |   1 +
 17 files changed, 550 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/700349d1/broker-core/src/main/java/org/apache/qpid/server/model/OverflowPolicy.java
----------------------------------------------------------------------
diff --git 
a/broker-core/src/main/java/org/apache/qpid/server/model/OverflowPolicy.java 
b/broker-core/src/main/java/org/apache/qpid/server/model/OverflowPolicy.java
index 49369de..57f334d 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/model/OverflowPolicy.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/model/OverflowPolicy.java
@@ -24,5 +24,6 @@ public enum OverflowPolicy
 {
     NONE,
     RING,
-    PRODUCER_FLOW_CONTROL
+    PRODUCER_FLOW_CONTROL,
+    FLOW_TO_DISK
 }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/700349d1/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java 
b/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
index 7e154ed..5d33119 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
@@ -266,12 +266,15 @@ public interface Queue<X extends Queue<X>> extends 
ConfiguredObject<X>,
 
     @ManagedAttribute(defaultValue = "${queue.defaultOverflowPolicy}",
             description = "Queue overflow policy."
-                          + " Current options are ProducerFlowControl, Ring 
and None."
+                          + " Current options are ProducerFlowControl, Ring, 
FlowToDisk, and None."
                           + " ProducerFlowControl overflow policy - when queue 
message number or size of messages"
                           + " in queue exceeds maximum, the producing sessions 
are blocked until queue depth falls"
                           + " below the resume threshold."
                           + " Ring overflow policy - when queue message number 
or size of messages in queue exceeds"
                           + " maximum, oldest messages are discarded."
+                          + " FlowToDisk overflow policy - when queue message 
number or size of messages"
+                          + " in queue exceeds maximum, new incoming messages 
are written to disk and immediately"
+                          + " evicted from memory."
                           + " None overflow policy - queue capacity is 
unbounded, the attributes defining the limits for"
                           + " maximum message number and maximum number of 
bytes are not applied.",
             mandatory = true)

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/700349d1/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
----------------------------------------------------------------------
diff --git 
a/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java 
b/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
index 31b77e7..640bc13 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
@@ -51,7 +51,6 @@ import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
 import java.util.zip.GZIPInputStream;
 import java.util.zip.GZIPOutputStream;
 
@@ -1165,7 +1164,7 @@ public abstract class AbstractQueue<X extends 
AbstractQueue<X>>
             {
                 action.performAction(entry);
             }
-            _overflowPolicyHandler.checkOverflow();
+            _overflowPolicyHandler.checkOverflow(entry);
         }
 
     }
@@ -1766,7 +1765,7 @@ public abstract class AbstractQueue<X extends 
AbstractQueue<X>>
     @Override
     public void checkCapacity()
     {
-        _overflowPolicyHandler.checkOverflow();
+        _overflowPolicyHandler.checkOverflow(null);
     }
 
     void notifyConsumers(QueueEntry entry)
@@ -2950,7 +2949,7 @@ public abstract class AbstractQueue<X extends 
AbstractQueue<X>>
         if (oldOverflowPolicy != newOverflowPolicy)
         {
             _overflowPolicyHandler = 
createOverflowPolicyHandler(newOverflowPolicy);
-            _overflowPolicyHandler.checkOverflow();
+            _overflowPolicyHandler.checkOverflow(null);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/700349d1/broker-core/src/main/java/org/apache/qpid/server/queue/FlowToDiskOverflowPolicyHandler.java
----------------------------------------------------------------------
diff --git 
a/broker-core/src/main/java/org/apache/qpid/server/queue/FlowToDiskOverflowPolicyHandler.java
 
b/broker-core/src/main/java/org/apache/qpid/server/queue/FlowToDiskOverflowPolicyHandler.java
new file mode 100644
index 0000000..59b2166
--- /dev/null
+++ 
b/broker-core/src/main/java/org/apache/qpid/server/queue/FlowToDiskOverflowPolicyHandler.java
@@ -0,0 +1,172 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+package org.apache.qpid.server.queue;
+
+import org.apache.qpid.server.message.MessageDeletedException;
+import org.apache.qpid.server.message.MessageReference;
+import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.model.AbstractConfigurationChangeListener;
+import org.apache.qpid.server.model.ConfiguredObject;
+import org.apache.qpid.server.model.OverflowPolicy;
+import org.apache.qpid.server.model.Queue;
+
+public class FlowToDiskOverflowPolicyHandler implements OverflowPolicyHandler
+{
+    private final Handler _handler;
+
+    FlowToDiskOverflowPolicyHandler(final Queue<?> queue)
+    {
+        _handler = new Handler(queue);
+        queue.addChangeListener(_handler);
+    }
+
+    @Override
+    public void checkOverflow(final QueueEntry newlyEnqueued)
+    {
+        _handler.checkOverflow(newlyEnqueued);
+
+    }
+
+    private static class Handler extends AbstractConfigurationChangeListener 
implements OverflowPolicyHandler
+    {
+        private final Queue<?> _queue;
+        private boolean _limitsChanged;
+
+        private Handler(final Queue<?> queue)
+        {
+            _queue = queue;
+        }
+
+        @Override
+        public void checkOverflow(final QueueEntry newlyEnqueued)
+        {
+            long maximumQueueDepthBytes = _queue.getMaximumQueueDepthBytes();
+            long maximumQueueDepthMessages = 
_queue.getMaximumQueueDepthMessages();
+            if (maximumQueueDepthBytes >= 0L || maximumQueueDepthMessages >= 
0L)
+            {
+                if (newlyEnqueued == null)
+                {
+                    flowTailToDiskIfNecessary(maximumQueueDepthBytes, 
maximumQueueDepthMessages);
+                }
+                else
+                {
+                    flowNewEntryToDiskIfNecessary(newlyEnqueued, 
maximumQueueDepthBytes, maximumQueueDepthMessages);
+                }
+            }
+        }
+
+        @Override
+        public void attributeSet(final ConfiguredObject<?> object,
+                                 final String attributeName,
+                                 final Object oldAttributeValue,
+                                 final Object newAttributeValue)
+        {
+            super.attributeSet(object, attributeName, oldAttributeValue, 
newAttributeValue);
+            if (Queue.MAXIMUM_QUEUE_DEPTH_BYTES.equals(attributeName)
+                || Queue.MAXIMUM_QUEUE_DEPTH_MESSAGES.equals(attributeName))
+            {
+                _limitsChanged = true;
+            }
+        }
+
+        @Override
+        public void bulkChangeEnd(final ConfiguredObject<?> object)
+        {
+            super.bulkChangeEnd(object);
+            if (_queue.getOverflowPolicy() == OverflowPolicy.FLOW_TO_DISK)
+            {
+                if (_limitsChanged)
+                {
+                    _limitsChanged = false;
+                    
flowTailToDiskIfNecessary(_queue.getMaximumQueueDepthBytes(), 
_queue.getMaximumQueueDepthMessages());
+                }
+            }
+            else
+            {
+                _queue.removeChangeListener(this);
+            }
+        }
+
+        private void flowTailToDiskIfNecessary(final long 
maximumQueueDepthBytes, final long maximumQueueDepthMessages)
+        {
+            final long queueDepthBytes = 
_queue.getQueueDepthBytesIncludingHeader();
+            final long queueDepthMessages = _queue.getQueueDepthMessages();
+
+            if ((maximumQueueDepthBytes >= 0L && queueDepthBytes > 
maximumQueueDepthBytes) ||
+                (maximumQueueDepthMessages >= 0L && queueDepthMessages > 
maximumQueueDepthMessages))
+            {
+
+                long cumulativeDepthBytes = 0;
+                long cumulativeDepthMessages = 0;
+
+                QueueEntryIterator queueEntryIterator = 
_queue.queueEntryIterator();
+                while (queueEntryIterator.advance())
+                {
+                    QueueEntry node = queueEntryIterator.getNode();
+
+                    if (node != null && !node.isDeleted())
+                    {
+                        ServerMessage message = node.getMessage();
+                        if (message != null)
+                        {
+                            cumulativeDepthMessages++;
+                            cumulativeDepthBytes += 
message.getSizeIncludingHeader();
+
+                            if (cumulativeDepthBytes > maximumQueueDepthBytes
+                                || cumulativeDepthMessages > 
maximumQueueDepthMessages)
+                            {
+                                flowToDisk(message);
+                            }
+                        }
+                    }
+                }
+            }
+        }
+
+        private void flowNewEntryToDiskIfNecessary(final QueueEntry 
newlyEnqueued,
+                                                   final long 
maximumQueueDepthBytes,
+                                                   final long 
maximumQueueDepthMessages)
+        {
+            final long queueDepthBytes = 
_queue.getQueueDepthBytesIncludingHeader();
+            final long queueDepthMessages = _queue.getQueueDepthMessages();
+
+            if ((maximumQueueDepthBytes >= 0L && queueDepthBytes > 
maximumQueueDepthBytes) ||
+                (maximumQueueDepthMessages >= 0L && queueDepthMessages > 
maximumQueueDepthMessages))
+            {
+                ServerMessage message = newlyEnqueued.getMessage();
+                if (message != null)
+                {
+                    flowToDisk(message);
+                }
+            }
+        }
+
+        private void flowToDisk(final ServerMessage message)
+        {
+            try (MessageReference messageReference = message.newReference())
+            {
+                message.getStoredMessage().flowToDisk();
+            }
+            catch (MessageDeletedException mde)
+            {
+                // pass
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/700349d1/broker-core/src/main/java/org/apache/qpid/server/queue/FlowToDiskOverflowPolicyHandlerFactory.java
----------------------------------------------------------------------
diff --git 
a/broker-core/src/main/java/org/apache/qpid/server/queue/FlowToDiskOverflowPolicyHandlerFactory.java
 
b/broker-core/src/main/java/org/apache/qpid/server/queue/FlowToDiskOverflowPolicyHandlerFactory.java
new file mode 100644
index 0000000..abcf98d
--- /dev/null
+++ 
b/broker-core/src/main/java/org/apache/qpid/server/queue/FlowToDiskOverflowPolicyHandlerFactory.java
@@ -0,0 +1,42 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.qpid.server.queue;
+
+import org.apache.qpid.server.logging.EventLogger;
+import org.apache.qpid.server.model.OverflowPolicy;
+import org.apache.qpid.server.model.Queue;
+import org.apache.qpid.server.plugin.PluggableService;
+
+@SuppressWarnings("unused")
+@PluggableService
+public class FlowToDiskOverflowPolicyHandlerFactory implements 
OverflowPolicyHandlerFactory
+{
+    @Override
+    public String getType()
+    {
+        return OverflowPolicy.FLOW_TO_DISK.name();
+    }
+
+    @Override
+    public OverflowPolicyHandler create(final Queue<?> queue, final 
EventLogger eventLogger)
+    {
+        return new FlowToDiskOverflowPolicyHandler(queue);
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/700349d1/broker-core/src/main/java/org/apache/qpid/server/queue/NoneOverflowPolicyHandler.java
----------------------------------------------------------------------
diff --git 
a/broker-core/src/main/java/org/apache/qpid/server/queue/NoneOverflowPolicyHandler.java
 
b/broker-core/src/main/java/org/apache/qpid/server/queue/NoneOverflowPolicyHandler.java
index 41aa0ac..61314b4 100644
--- 
a/broker-core/src/main/java/org/apache/qpid/server/queue/NoneOverflowPolicyHandler.java
+++ 
b/broker-core/src/main/java/org/apache/qpid/server/queue/NoneOverflowPolicyHandler.java
@@ -22,7 +22,7 @@ package org.apache.qpid.server.queue;
 public class NoneOverflowPolicyHandler implements OverflowPolicyHandler
 {
     @Override
-    public void checkOverflow()
+    public void checkOverflow(final QueueEntry newlyEnqueued)
     {
         // noop
     }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/700349d1/broker-core/src/main/java/org/apache/qpid/server/queue/OverflowPolicyHandler.java
----------------------------------------------------------------------
diff --git 
a/broker-core/src/main/java/org/apache/qpid/server/queue/OverflowPolicyHandler.java
 
b/broker-core/src/main/java/org/apache/qpid/server/queue/OverflowPolicyHandler.java
index 5d8e9ae..8c221b3 100644
--- 
a/broker-core/src/main/java/org/apache/qpid/server/queue/OverflowPolicyHandler.java
+++ 
b/broker-core/src/main/java/org/apache/qpid/server/queue/OverflowPolicyHandler.java
@@ -23,5 +23,5 @@ package org.apache.qpid.server.queue;
 
 public interface OverflowPolicyHandler
 {
-    void checkOverflow();
+    void checkOverflow(final QueueEntry newlyEnqueued);
 }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/700349d1/broker-core/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlOverflowPolicyHandler.java
----------------------------------------------------------------------
diff --git 
a/broker-core/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlOverflowPolicyHandler.java
 
b/broker-core/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlOverflowPolicyHandler.java
index 89b678f..1e58087 100644
--- 
a/broker-core/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlOverflowPolicyHandler.java
+++ 
b/broker-core/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlOverflowPolicyHandler.java
@@ -52,9 +52,9 @@ public class ProducerFlowControlOverflowPolicyHandler 
implements OverflowPolicyH
     }
 
     @Override
-    public void checkOverflow()
+    public void checkOverflow(final QueueEntry newlyEnqueued)
     {
-        _handler.checkOverflow();
+        _handler.checkOverflow(newlyEnqueued);
     }
 
     private static class Handler extends AbstractConfigurationChangeListener 
implements OverflowPolicyHandler
@@ -79,7 +79,7 @@ public class ProducerFlowControlOverflowPolicyHandler 
implements OverflowPolicyH
         }
 
         @Override
-        public void checkOverflow()
+        public void checkOverflow(final QueueEntry newlyEnqueued)
         {
             long maximumQueueDepthBytes = _queue.getMaximumQueueDepthBytes();
             long maximumQueueDepthMessages = 
_queue.getMaximumQueueDepthMessages();

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/700349d1/broker-core/src/main/java/org/apache/qpid/server/queue/RingOverflowPolicyHandler.java
----------------------------------------------------------------------
diff --git 
a/broker-core/src/main/java/org/apache/qpid/server/queue/RingOverflowPolicyHandler.java
 
b/broker-core/src/main/java/org/apache/qpid/server/queue/RingOverflowPolicyHandler.java
index 9bbc523..a810c6b 100644
--- 
a/broker-core/src/main/java/org/apache/qpid/server/queue/RingOverflowPolicyHandler.java
+++ 
b/broker-core/src/main/java/org/apache/qpid/server/queue/RingOverflowPolicyHandler.java
@@ -36,7 +36,7 @@ public class RingOverflowPolicyHandler implements 
OverflowPolicyHandler
     }
 
     @Override
-    public void checkOverflow()
+    public void checkOverflow(final QueueEntry newlyEnqueued)
     {
         final long maximumQueueDepthMessages = 
_queue.getMaximumQueueDepthMessages();
         final long maximumQueueDepthBytes = _queue.getMaximumQueueDepthBytes();

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/700349d1/broker-core/src/test/java/org/apache/qpid/server/queue/FlowToDiskOverflowPolicyHandlerTest.java
----------------------------------------------------------------------
diff --git 
a/broker-core/src/test/java/org/apache/qpid/server/queue/FlowToDiskOverflowPolicyHandlerTest.java
 
b/broker-core/src/test/java/org/apache/qpid/server/queue/FlowToDiskOverflowPolicyHandlerTest.java
new file mode 100644
index 0000000..992b886
--- /dev/null
+++ 
b/broker-core/src/test/java/org/apache/qpid/server/queue/FlowToDiskOverflowPolicyHandlerTest.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.server.queue;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.qpid.server.message.MessageReference;
+import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.model.BrokerTestHelper;
+import org.apache.qpid.server.model.OverflowPolicy;
+import org.apache.qpid.server.model.Queue;
+import org.apache.qpid.server.model.VirtualHost;
+import org.apache.qpid.server.store.StoredMessage;
+import org.apache.qpid.server.store.TransactionLogResource;
+import org.apache.qpid.test.utils.QpidTestCase;
+
+public class FlowToDiskOverflowPolicyHandlerTest extends QpidTestCase
+{
+    private Queue<?> _queue;
+
+    @Override
+    public void setUp() throws Exception
+    {
+        super.setUp();
+        BrokerTestHelper.setUp();
+
+        VirtualHost<?> virtualHost = 
BrokerTestHelper.createVirtualHost(getClass().getName());
+
+        Map<String, Object> attributes = new HashMap<>();
+        attributes.put(Queue.NAME, "testQueue");
+        attributes.put(Queue.OVERFLOW_POLICY, OverflowPolicy.FLOW_TO_DISK);
+
+        _queue = (AbstractQueue<?>) virtualHost.createChild(Queue.class, 
attributes);
+    }
+
+    public void testOverflowAfterLoweringLimit() throws Exception
+    {
+        ServerMessage<?> message = createMessage(10L);
+        _queue.enqueue(message, null, null);
+        StoredMessage<?> storedMessage = message.getStoredMessage();
+        verify(storedMessage, never()).flowToDisk();
+
+        ServerMessage<?> message2 = createMessage(10L);
+        _queue.enqueue(message2, null, null);
+        StoredMessage<?> storedMessage2 = message2.getStoredMessage();
+        verify(storedMessage2, never()).flowToDisk();
+
+        
_queue.setAttributes(Collections.singletonMap(Queue.MAXIMUM_QUEUE_DEPTH_BYTES, 
10));
+
+        verify(storedMessage2).flowToDisk();
+    }
+
+    public void testOverflowOnSecondMessage() throws Exception
+    {
+        
_queue.setAttributes(Collections.singletonMap(Queue.MAXIMUM_QUEUE_DEPTH_BYTES, 
10));
+        ServerMessage<?> message = createMessage(10L);
+        _queue.enqueue(message, null, null);
+        StoredMessage<?> storedMessage = message.getStoredMessage();
+        verify(storedMessage, never()).flowToDisk();
+
+        ServerMessage<?> message2 = createMessage(10L);
+        _queue.enqueue(message2, null, null);
+        StoredMessage<?> storedMessage2 = message2.getStoredMessage();
+        verify(storedMessage2).flowToDisk();
+    }
+
+    public void testBytesOverflow() throws Exception
+    {
+        
_queue.setAttributes(Collections.singletonMap(Queue.MAXIMUM_QUEUE_DEPTH_BYTES, 
0));
+        ServerMessage<?> message = createMessage(1L);
+        _queue.enqueue(message, null, null);
+        StoredMessage<?> storedMessage = message.getStoredMessage();
+        verify(storedMessage).flowToDisk();
+    }
+
+    public void testMessagesOverflow() throws Exception
+    {
+        
_queue.setAttributes(Collections.singletonMap(Queue.MAXIMUM_QUEUE_DEPTH_MESSAGES,
 0));
+        ServerMessage<?> message = createMessage(1L);
+        _queue.enqueue(message, null, null);
+        StoredMessage<?> storedMessage = message.getStoredMessage();
+        verify(storedMessage).flowToDisk();
+    }
+
+    public void testNoOverflow() throws Exception
+    {
+        
_queue.setAttributes(Collections.singletonMap(Queue.MAXIMUM_QUEUE_DEPTH_MESSAGES,
 10));
+        
_queue.setAttributes(Collections.singletonMap(Queue.MAXIMUM_QUEUE_DEPTH_BYTES, 
10));
+        ServerMessage<?> message = createMessage(1L);
+        _queue.enqueue(message, null, null);
+        StoredMessage<?> storedMessage = message.getStoredMessage();
+        verify(storedMessage, never()).flowToDisk();
+    }
+
+    private ServerMessage createMessage(long size)
+    {
+        ServerMessage message = mock(ServerMessage.class);
+        when(message.getSizeIncludingHeader()).thenReturn(size);
+
+        StoredMessage storedMessage = mock(StoredMessage.class);
+        when(message.getStoredMessage()).thenReturn(storedMessage);
+
+        MessageReference ref = mock(MessageReference.class);
+        when(ref.getMessage()).thenReturn(message);
+
+        when(message.newReference()).thenReturn(ref);
+        
when(message.newReference(any(TransactionLogResource.class))).thenReturn(ref);
+
+        return message;
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/700349d1/broker-core/src/test/java/org/apache/qpid/server/queue/ProducerFlowControlOverflowPolicyHandlerTest.java
----------------------------------------------------------------------
diff --git 
a/broker-core/src/test/java/org/apache/qpid/server/queue/ProducerFlowControlOverflowPolicyHandlerTest.java
 
b/broker-core/src/test/java/org/apache/qpid/server/queue/ProducerFlowControlOverflowPolicyHandlerTest.java
index 3c951a5..f02c1d9 100644
--- 
a/broker-core/src/test/java/org/apache/qpid/server/queue/ProducerFlowControlOverflowPolicyHandlerTest.java
+++ 
b/broker-core/src/test/java/org/apache/qpid/server/queue/ProducerFlowControlOverflowPolicyHandlerTest.java
@@ -128,7 +128,7 @@ public class ProducerFlowControlOverflowPolicyHandlerTest 
extends QpidTestCase
 
         when(_queue.getQueueDepthBytesIncludingHeader()).thenReturn(8L);
 
-        _producerFlowControlOverflowPolicyHandler.checkOverflow();
+        _producerFlowControlOverflowPolicyHandler.checkOverflow(null);
 
         verify(session, times(1)).unblock(_queue);
         assertFalse("Flow should not be stopped", 
_producerFlowControlOverflowPolicyHandler.isQueueFlowStopped());
@@ -153,7 +153,7 @@ public class ProducerFlowControlOverflowPolicyHandlerTest 
extends QpidTestCase
 
         when(_queue.getQueueDepthMessages()).thenReturn(8);
 
-        _producerFlowControlOverflowPolicyHandler.checkOverflow();
+        _producerFlowControlOverflowPolicyHandler.checkOverflow(null);
 
         verify(session, times(1)).unblock(_queue);
         assertFalse("Flow should not be stopped", 
_producerFlowControlOverflowPolicyHandler.isQueueFlowStopped());
@@ -171,7 +171,7 @@ public class ProducerFlowControlOverflowPolicyHandlerTest 
extends QpidTestCase
             @Override
             public Void run()
             {
-                _producerFlowControlOverflowPolicyHandler.checkOverflow();
+                _producerFlowControlOverflowPolicyHandler.checkOverflow(null);
                 return null;
             }
         });

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/700349d1/broker-core/src/test/java/org/apache/qpid/server/queue/RingOverflowPolicyHandlerTest.java
----------------------------------------------------------------------
diff --git 
a/broker-core/src/test/java/org/apache/qpid/server/queue/RingOverflowPolicyHandlerTest.java
 
b/broker-core/src/test/java/org/apache/qpid/server/queue/RingOverflowPolicyHandlerTest.java
index 1bc30ca..964886a 100644
--- 
a/broker-core/src/test/java/org/apache/qpid/server/queue/RingOverflowPolicyHandlerTest.java
+++ 
b/broker-core/src/test/java/org/apache/qpid/server/queue/RingOverflowPolicyHandlerTest.java
@@ -72,7 +72,7 @@ public class RingOverflowPolicyHandlerTest extends 
QpidTestCase
         when(_queue.getMaximumQueueDepthBytes()).thenReturn(5L);
         when(_queue.getQueueDepthMessages()).thenReturn(3, 1);
 
-        _ringOverflowPolicyHandler.checkOverflow();
+        _ringOverflowPolicyHandler.checkOverflow(null);
 
         verify(_queue).deleteEntry(lastEntry);
         LogMessage dropped = QueueMessages.DROPPED(1L, 4, 1, 5,-1);
@@ -88,7 +88,7 @@ public class RingOverflowPolicyHandlerTest extends 
QpidTestCase
         when(_queue.getMaximumQueueDepthMessages()).thenReturn(5L);
         when(_queue.getQueueDepthBytesIncludingHeader()).thenReturn(10L, 4L);
 
-        _ringOverflowPolicyHandler.checkOverflow();
+        _ringOverflowPolicyHandler.checkOverflow(null);
 
         verify((AbstractQueue<?>) _queue).deleteEntry(lastEntry);
         LogMessage dropped = QueueMessages.DROPPED(1, 4, 5, -1,5);
@@ -102,7 +102,7 @@ public class RingOverflowPolicyHandlerTest extends 
QpidTestCase
         when(_queue.getMaximumQueueDepthBytes()).thenReturn(5L);
         when(_queue.getQueueDepthMessages()).thenReturn(3);
 
-        _ringOverflowPolicyHandler.checkOverflow();
+        _ringOverflowPolicyHandler.checkOverflow(null);
 
         verify(_queue, never()).deleteEntry(any(QueueEntry.class));
         verifyNoMoreInteractions(_eventLogger);
@@ -114,7 +114,7 @@ public class RingOverflowPolicyHandlerTest extends 
QpidTestCase
         when(_queue.getMaximumQueueDepthMessages()).thenReturn(5L);
         when(_queue.getQueueDepthBytesIncludingHeader()).thenReturn(10L);
 
-        _ringOverflowPolicyHandler.checkOverflow();
+        _ringOverflowPolicyHandler.checkOverflow(null);
 
         verify(_queue, never()).deleteEntry(any(QueueEntry.class));
         verifyNoMoreInteractions(_eventLogger);

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/700349d1/systests/src/main/java/org/apache/qpid/test/utils/AmqpManagementFacade.java
----------------------------------------------------------------------
diff --git 
a/systests/src/main/java/org/apache/qpid/test/utils/AmqpManagementFacade.java 
b/systests/src/main/java/org/apache/qpid/test/utils/AmqpManagementFacade.java
index 5caa9c3..901526c 100644
--- 
a/systests/src/main/java/org/apache/qpid/test/utils/AmqpManagementFacade.java
+++ 
b/systests/src/main/java/org/apache/qpid/test/utils/AmqpManagementFacade.java
@@ -26,7 +26,9 @@ import java.util.Enumeration;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.TreeMap;
 
+import javax.jms.BytesMessage;
 import javax.jms.JMSException;
 import javax.jms.MapMessage;
 import javax.jms.Message;
@@ -36,6 +38,9 @@ import javax.jms.ObjectMessage;
 import javax.jms.Session;
 import javax.jms.TemporaryQueue;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
 public class AmqpManagementFacade
 {
     private final QpidBrokerTestCase _qpidBrokerTestCase;
@@ -97,26 +102,47 @@ public class AmqpManagementFacade
         }
     }
 
-    public void performOperationUsingAmqpManagement(final String name,
-                                                    final String operation,
-                                                    final Session session,
-                                                    final String type,
-                                                    Map<String, Object> 
arguments)
+    public Object performOperationUsingAmqpManagement(final String name,
+                                                                         final 
String operation,
+                                                                         final 
Session session,
+                                                                         final 
String type,
+                                                                         
Map<String, Object> arguments)
             throws JMSException
     {
         MessageProducer producer = 
session.createProducer(session.createQueue(_qpidBrokerTestCase.isBroker10()
                                                                                
       ? "$management"
                                                                                
       : "ADDR:$management"));
-
+        final TemporaryQueue responseQ = session.createTemporaryQueue();
+        MessageConsumer consumer = session.createConsumer(responseQ);
         MapMessage opMessage = session.createMapMessage();
         opMessage.setStringProperty("type", type);
         opMessage.setStringProperty("operation", operation);
         opMessage.setStringProperty("index", "object-path");
+        opMessage.setJMSReplyTo(responseQ);
 
         opMessage.setStringProperty("key", name);
         for (Map.Entry<String, Object> argument : arguments.entrySet())
         {
-            opMessage.setObjectProperty(argument.getKey(), 
argument.getValue());
+            Object value = argument.getValue();
+            if (value.getClass().isPrimitive() || value instanceof String)
+            {
+                opMessage.setObjectProperty(argument.getKey(), value);
+            }
+            else
+            {
+                ObjectMapper objectMapper = new ObjectMapper();
+                String jsonifiedValue = null;
+                try
+                {
+                    jsonifiedValue = objectMapper.writeValueAsString(value);
+                }
+                catch (JsonProcessingException e)
+                {
+                    throw new IllegalArgumentException(String.format(
+                            "Cannot convert the argument '%s' to JSON to meet 
JMS type restrictions", argument.getKey()));
+                }
+                opMessage.setObjectProperty(argument.getKey(), jsonifiedValue);
+            }
         }
 
         producer.send(opMessage);
@@ -124,6 +150,51 @@ public class AmqpManagementFacade
         {
             session.commit();
         }
+
+        Message response = consumer.receive(5000);
+        try
+        {
+            if (response instanceof MapMessage)
+            {
+                MapMessage bodyMap = (MapMessage) response;
+                Map<String, Object> result = new TreeMap<>();
+                Enumeration mapNames = bodyMap.getMapNames();
+                while (mapNames.hasMoreElements())
+                {
+                    String key = (String) mapNames.nextElement();
+                    result.put(key, bodyMap.getObject(key));
+                }
+                return result;
+            }
+            else if (response instanceof ObjectMessage)
+            {
+                return ((ObjectMessage) response).getObject();
+            }
+            else if (response instanceof BytesMessage)
+            {
+                BytesMessage bytesMessage = (BytesMessage) response;
+                if (bytesMessage.getBodyLength() == 0)
+                {
+                    return null;
+                }
+                else
+                {
+                    byte[] buf = new byte[(int) bytesMessage.getBodyLength()];
+                    bytesMessage.readBytes(buf);
+                    return buf;
+                }
+            }
+            throw new IllegalArgumentException("Cannot parse the results from 
a management operation.  JMS response message : " + response);
+        }
+        finally
+        {
+            if (session.getTransacted())
+            {
+                session.commit();
+            }
+            consumer.close();
+            responseQ.delete();
+        }
     }
 
     public List<Map<String, Object>> managementQueryObjects(final Session 
session, final String type) throws JMSException
@@ -141,7 +212,7 @@ public class AmqpManagementFacade
 
         producer.send(message);
 
-        Message response = consumer.receive();
+        Message response = consumer.receive(5000);
         try
         {
             if (response instanceof MapMessage)
@@ -149,7 +220,7 @@ public class AmqpManagementFacade
                 MapMessage bodyMap = (MapMessage) response;
                 List<String> attributeNames = (List<String>) 
bodyMap.getObject("attributeNames");
                 List<List<Object>> attributeValues = (List<List<Object>>) 
bodyMap.getObject("results");
-                return getResultsAsMap(attributeNames, attributeValues);
+                return getResultsAsMaps(attributeNames, attributeValues);
             }
             else if (response instanceof ObjectMessage)
             {
@@ -159,7 +230,7 @@ public class AmqpManagementFacade
                     Map<String, ?> bodyMap = (Map<String, ?>) body;
                     List<String> attributeNames = (List<String>) 
bodyMap.get("attributeNames");
                     List<List<Object>> attributeValues = (List<List<Object>>) 
bodyMap.get("results");
-                    return getResultsAsMap(attributeNames, attributeValues);
+                    return getResultsAsMaps(attributeNames, attributeValues);
                 }
             }
             throw new IllegalArgumentException("Cannot parse the results from 
a management query");
@@ -236,7 +307,7 @@ public class AmqpManagementFacade
         }
     }
 
-    private List<Map<String, Object>> getResultsAsMap(final List<String> 
attributeNames, final List<List<Object>> attributeValues)
+    private List<Map<String, Object>> getResultsAsMaps(final List<String> 
attributeNames, final List<List<Object>> attributeValues)
     {
         List<Map<String, Object>> results = new ArrayList<>();
         for (List<Object> resultObject : attributeValues)

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/700349d1/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java
----------------------------------------------------------------------
diff --git 
a/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java 
b/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java
index 7f0a317..403b39c 100755
--- a/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java
+++ b/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java
@@ -357,11 +357,10 @@ public class QpidBrokerTestCase extends QpidTestCase
         _managementFacade.deleteEntityUsingAmqpManagement(name, session, type);
     }
 
-    protected void performOperationUsingAmqpManagement(final String name, 
final String operation, final Session session, final String type, 
Map<String,Object> arguments)
+    protected Object performOperationUsingAmqpManagement(final String name, 
final String operation, final Session session, final String type, 
Map<String,Object> arguments)
             throws JMSException
     {
-
-        _managementFacade.performOperationUsingAmqpManagement(name, operation, 
session, type, arguments);
+        return _managementFacade.performOperationUsingAmqpManagement(name, 
operation, session, type, arguments);
     }
 
     protected List<Map<String, Object>> managementQueryObjects(final Session 
session, final String type) throws JMSException

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/700349d1/systests/src/test/java/org/apache/qpid/server/queue/FlowToDiskTest.java
----------------------------------------------------------------------
diff --git 
a/systests/src/test/java/org/apache/qpid/server/queue/FlowToDiskTest.java 
b/systests/src/test/java/org/apache/qpid/server/queue/FlowToDiskTest.java
new file mode 100644
index 0000000..4527307
--- /dev/null
+++ b/systests/src/test/java/org/apache/qpid/server/queue/FlowToDiskTest.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.server.queue;
+
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.apache.qpid.QpidException;
+import org.apache.qpid.server.model.OverflowPolicy;
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
+
+public class FlowToDiskTest extends QpidBrokerTestCase
+{
+    private Session _session;
+    private Queue _queue;
+
+    @Override
+    protected void setUp() throws Exception
+    {
+        super.setUp();
+        Connection connection = getConnection();
+        connection.start();
+        _session = connection.createSession(true, Session.SESSION_TRANSACTED);
+
+        _queue = createQueue();
+    }
+
+    private Queue createQueue() throws QpidException, JMSException
+    {
+        final Map<String, Object> arguments = new HashMap<String, Object>();
+        arguments.put(org.apache.qpid.server.model.Queue.OVERFLOW_POLICY, 
OverflowPolicy.FLOW_TO_DISK.name());
+        
arguments.put(org.apache.qpid.server.model.Queue.MAXIMUM_QUEUE_DEPTH_BYTES, 0L);
+        createEntityUsingAmqpManagement(getTestQueueName(), _session, 
"org.apache.qpid.Queue", arguments);
+        return getQueueFromName(_session, getTestQueueName());
+    }
+
+    public void testRoundtripWithFlowToDisk() throws Exception
+    {
+        Map<String, Object> statistics = (Map<String, Object>) 
performOperationUsingAmqpManagement("test", "getStatistics", _session, 
"org.apache.qpid.VirtualHost", Collections.singletonMap("statistics", 
Collections.singletonList("bytesEvacuatedFromMemory")));
+        Long originalBytesEvacuatedFromMemory = (Long) 
statistics.get("bytesEvacuatedFromMemory");
+
+        TextMessage message = _session.createTextMessage("testMessage");
+        MessageProducer producer = _session.createProducer(_queue);
+        producer.send(message);
+        _session.commit();
+
+        // make sure we are flowing to disk
+        Map<String, Object> statistics2 = (Map<String, Object>) 
performOperationUsingAmqpManagement("test", "getStatistics", _session, 
"org.apache.qpid.VirtualHost", Collections.singletonMap("statistics", 
Collections.singletonList("bytesEvacuatedFromMemory")));
+        Long bytesEvacuatedFromMemory = (Long) 
statistics2.get("bytesEvacuatedFromMemory");
+        assertTrue("Message was not evacuated from memory", 
bytesEvacuatedFromMemory > originalBytesEvacuatedFromMemory);
+
+        MessageConsumer consumer = _session.createConsumer(_queue);
+        Message receivedMessage = consumer.receive(getReceiveTimeout());
+        assertNotNull("Did not receive message", receivedMessage);
+        assertThat("Unexpected message type", receivedMessage, 
is(instanceOf(TextMessage.class)));
+        assertEquals("Unexpected message content", message.getText(), 
((TextMessage) receivedMessage).getText());
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/700349d1/test-profiles/CPPExcludes
----------------------------------------------------------------------
diff --git a/test-profiles/CPPExcludes b/test-profiles/CPPExcludes
index aa5b06b..9039b3e 100755
--- a/test-profiles/CPPExcludes
+++ b/test-profiles/CPPExcludes
@@ -257,3 +257,5 @@ org.apache.qpid.systests.jms_2_0.*
 
 # Exclude the AMQP 1.0 protocol test suite
 org.apache.qpid.tests.protocol.v1_0.*
+
+org.apache.qpid.server.queue.FlowToDiskTest#*

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/700349d1/test-profiles/JavaTransientExcludes
----------------------------------------------------------------------
diff --git a/test-profiles/JavaTransientExcludes 
b/test-profiles/JavaTransientExcludes
index 1ebfdb3..71b1341 100644
--- a/test-profiles/JavaTransientExcludes
+++ b/test-profiles/JavaTransientExcludes
@@ -54,3 +54,4 @@ 
org.apache.qpid.client.failover.FailoverBehaviourTest#testFlowControlFlagResetOn
 
org.apache.qpid.client.failover.FailoverBehaviourTest#testFailoverHandlerTimeoutReconnected
 org.apache.qpid.server.failover.FailoverMethodTest#testNoFailover
 
+org.apache.qpid.server.queue.FlowToDiskTest#*


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to