Repository: qpid-broker-j Updated Branches: refs/heads/master 9e49b5d85 -> 700349d1a
QPID-7776: [Java Broker] Add FlowToDisk Queue overflow policy. Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/700349d1 Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/700349d1 Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/700349d1 Branch: refs/heads/master Commit: 700349d1a929ea22e3ce1626ab397e36d8677d11 Parents: 9e49b5d Author: Keith Wall <[email protected]> Authored: Wed May 17 15:00:35 2017 +0100 Committer: Keith Wall <[email protected]> Committed: Wed May 17 16:48:40 2017 +0100 ---------------------------------------------------------------------- .../qpid/server/model/OverflowPolicy.java | 3 +- .../org/apache/qpid/server/model/Queue.java | 5 +- .../apache/qpid/server/queue/AbstractQueue.java | 7 +- .../queue/FlowToDiskOverflowPolicyHandler.java | 172 +++++++++++++++++++ .../FlowToDiskOverflowPolicyHandlerFactory.java | 42 +++++ .../server/queue/NoneOverflowPolicyHandler.java | 2 +- .../server/queue/OverflowPolicyHandler.java | 2 +- ...roducerFlowControlOverflowPolicyHandler.java | 6 +- .../server/queue/RingOverflowPolicyHandler.java | 2 +- .../FlowToDiskOverflowPolicyHandlerTest.java | 137 +++++++++++++++ ...cerFlowControlOverflowPolicyHandlerTest.java | 6 +- .../queue/RingOverflowPolicyHandlerTest.java | 8 +- .../qpid/test/utils/AmqpManagementFacade.java | 93 ++++++++-- .../qpid/test/utils/QpidBrokerTestCase.java | 5 +- .../qpid/server/queue/FlowToDiskTest.java | 90 ++++++++++ test-profiles/CPPExcludes | 2 + test-profiles/JavaTransientExcludes | 1 + 17 files changed, 550 insertions(+), 33 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/700349d1/broker-core/src/main/java/org/apache/qpid/server/model/OverflowPolicy.java ---------------------------------------------------------------------- diff --git a/broker-core/src/main/java/org/apache/qpid/server/model/OverflowPolicy.java b/broker-core/src/main/java/org/apache/qpid/server/model/OverflowPolicy.java index 49369de..57f334d 100644 --- a/broker-core/src/main/java/org/apache/qpid/server/model/OverflowPolicy.java +++ b/broker-core/src/main/java/org/apache/qpid/server/model/OverflowPolicy.java @@ -24,5 +24,6 @@ public enum OverflowPolicy { NONE, RING, - PRODUCER_FLOW_CONTROL + PRODUCER_FLOW_CONTROL, + FLOW_TO_DISK } http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/700349d1/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java ---------------------------------------------------------------------- diff --git a/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java b/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java index 7e154ed..5d33119 100644 --- a/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java +++ b/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java @@ -266,12 +266,15 @@ public interface Queue<X extends Queue<X>> extends ConfiguredObject<X>, @ManagedAttribute(defaultValue = "${queue.defaultOverflowPolicy}", description = "Queue overflow policy." - + " Current options are ProducerFlowControl, Ring and None." + + " Current options are ProducerFlowControl, Ring, FlowToDisk, and None." + " ProducerFlowControl overflow policy - when queue message number or size of messages" + " in queue exceeds maximum, the producing sessions are blocked until queue depth falls" + " below the resume threshold." + " Ring overflow policy - when queue message number or size of messages in queue exceeds" + " maximum, oldest messages are discarded." + + " FlowToDisk overflow policy - when queue message number or size of messages" + + " in queue exceeds maximum, new incoming messages are written to disk and immediately" + + " evicted from memory." + " None overflow policy - queue capacity is unbounded, the attributes defining the limits for" + " maximum message number and maximum number of bytes are not applied.", mandatory = true) http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/700349d1/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java ---------------------------------------------------------------------- diff --git a/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java b/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java index 31b77e7..640bc13 100644 --- a/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java +++ b/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java @@ -51,7 +51,6 @@ import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; import java.util.zip.GZIPInputStream; import java.util.zip.GZIPOutputStream; @@ -1165,7 +1164,7 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>> { action.performAction(entry); } - _overflowPolicyHandler.checkOverflow(); + _overflowPolicyHandler.checkOverflow(entry); } } @@ -1766,7 +1765,7 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>> @Override public void checkCapacity() { - _overflowPolicyHandler.checkOverflow(); + _overflowPolicyHandler.checkOverflow(null); } void notifyConsumers(QueueEntry entry) @@ -2950,7 +2949,7 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>> if (oldOverflowPolicy != newOverflowPolicy) { _overflowPolicyHandler = createOverflowPolicyHandler(newOverflowPolicy); - _overflowPolicyHandler.checkOverflow(); + _overflowPolicyHandler.checkOverflow(null); } } http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/700349d1/broker-core/src/main/java/org/apache/qpid/server/queue/FlowToDiskOverflowPolicyHandler.java ---------------------------------------------------------------------- diff --git a/broker-core/src/main/java/org/apache/qpid/server/queue/FlowToDiskOverflowPolicyHandler.java b/broker-core/src/main/java/org/apache/qpid/server/queue/FlowToDiskOverflowPolicyHandler.java new file mode 100644 index 0000000..59b2166 --- /dev/null +++ b/broker-core/src/main/java/org/apache/qpid/server/queue/FlowToDiskOverflowPolicyHandler.java @@ -0,0 +1,172 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.server.queue; + +import org.apache.qpid.server.message.MessageDeletedException; +import org.apache.qpid.server.message.MessageReference; +import org.apache.qpid.server.message.ServerMessage; +import org.apache.qpid.server.model.AbstractConfigurationChangeListener; +import org.apache.qpid.server.model.ConfiguredObject; +import org.apache.qpid.server.model.OverflowPolicy; +import org.apache.qpid.server.model.Queue; + +public class FlowToDiskOverflowPolicyHandler implements OverflowPolicyHandler +{ + private final Handler _handler; + + FlowToDiskOverflowPolicyHandler(final Queue<?> queue) + { + _handler = new Handler(queue); + queue.addChangeListener(_handler); + } + + @Override + public void checkOverflow(final QueueEntry newlyEnqueued) + { + _handler.checkOverflow(newlyEnqueued); + + } + + private static class Handler extends AbstractConfigurationChangeListener implements OverflowPolicyHandler + { + private final Queue<?> _queue; + private boolean _limitsChanged; + + private Handler(final Queue<?> queue) + { + _queue = queue; + } + + @Override + public void checkOverflow(final QueueEntry newlyEnqueued) + { + long maximumQueueDepthBytes = _queue.getMaximumQueueDepthBytes(); + long maximumQueueDepthMessages = _queue.getMaximumQueueDepthMessages(); + if (maximumQueueDepthBytes >= 0L || maximumQueueDepthMessages >= 0L) + { + if (newlyEnqueued == null) + { + flowTailToDiskIfNecessary(maximumQueueDepthBytes, maximumQueueDepthMessages); + } + else + { + flowNewEntryToDiskIfNecessary(newlyEnqueued, maximumQueueDepthBytes, maximumQueueDepthMessages); + } + } + } + + @Override + public void attributeSet(final ConfiguredObject<?> object, + final String attributeName, + final Object oldAttributeValue, + final Object newAttributeValue) + { + super.attributeSet(object, attributeName, oldAttributeValue, newAttributeValue); + if (Queue.MAXIMUM_QUEUE_DEPTH_BYTES.equals(attributeName) + || Queue.MAXIMUM_QUEUE_DEPTH_MESSAGES.equals(attributeName)) + { + _limitsChanged = true; + } + } + + @Override + public void bulkChangeEnd(final ConfiguredObject<?> object) + { + super.bulkChangeEnd(object); + if (_queue.getOverflowPolicy() == OverflowPolicy.FLOW_TO_DISK) + { + if (_limitsChanged) + { + _limitsChanged = false; + flowTailToDiskIfNecessary(_queue.getMaximumQueueDepthBytes(), _queue.getMaximumQueueDepthMessages()); + } + } + else + { + _queue.removeChangeListener(this); + } + } + + private void flowTailToDiskIfNecessary(final long maximumQueueDepthBytes, final long maximumQueueDepthMessages) + { + final long queueDepthBytes = _queue.getQueueDepthBytesIncludingHeader(); + final long queueDepthMessages = _queue.getQueueDepthMessages(); + + if ((maximumQueueDepthBytes >= 0L && queueDepthBytes > maximumQueueDepthBytes) || + (maximumQueueDepthMessages >= 0L && queueDepthMessages > maximumQueueDepthMessages)) + { + + long cumulativeDepthBytes = 0; + long cumulativeDepthMessages = 0; + + QueueEntryIterator queueEntryIterator = _queue.queueEntryIterator(); + while (queueEntryIterator.advance()) + { + QueueEntry node = queueEntryIterator.getNode(); + + if (node != null && !node.isDeleted()) + { + ServerMessage message = node.getMessage(); + if (message != null) + { + cumulativeDepthMessages++; + cumulativeDepthBytes += message.getSizeIncludingHeader(); + + if (cumulativeDepthBytes > maximumQueueDepthBytes + || cumulativeDepthMessages > maximumQueueDepthMessages) + { + flowToDisk(message); + } + } + } + } + } + } + + private void flowNewEntryToDiskIfNecessary(final QueueEntry newlyEnqueued, + final long maximumQueueDepthBytes, + final long maximumQueueDepthMessages) + { + final long queueDepthBytes = _queue.getQueueDepthBytesIncludingHeader(); + final long queueDepthMessages = _queue.getQueueDepthMessages(); + + if ((maximumQueueDepthBytes >= 0L && queueDepthBytes > maximumQueueDepthBytes) || + (maximumQueueDepthMessages >= 0L && queueDepthMessages > maximumQueueDepthMessages)) + { + ServerMessage message = newlyEnqueued.getMessage(); + if (message != null) + { + flowToDisk(message); + } + } + } + + private void flowToDisk(final ServerMessage message) + { + try (MessageReference messageReference = message.newReference()) + { + message.getStoredMessage().flowToDisk(); + } + catch (MessageDeletedException mde) + { + // pass + } + } + } +} http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/700349d1/broker-core/src/main/java/org/apache/qpid/server/queue/FlowToDiskOverflowPolicyHandlerFactory.java ---------------------------------------------------------------------- diff --git a/broker-core/src/main/java/org/apache/qpid/server/queue/FlowToDiskOverflowPolicyHandlerFactory.java b/broker-core/src/main/java/org/apache/qpid/server/queue/FlowToDiskOverflowPolicyHandlerFactory.java new file mode 100644 index 0000000..abcf98d --- /dev/null +++ b/broker-core/src/main/java/org/apache/qpid/server/queue/FlowToDiskOverflowPolicyHandlerFactory.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.qpid.server.queue; + +import org.apache.qpid.server.logging.EventLogger; +import org.apache.qpid.server.model.OverflowPolicy; +import org.apache.qpid.server.model.Queue; +import org.apache.qpid.server.plugin.PluggableService; + +@SuppressWarnings("unused") +@PluggableService +public class FlowToDiskOverflowPolicyHandlerFactory implements OverflowPolicyHandlerFactory +{ + @Override + public String getType() + { + return OverflowPolicy.FLOW_TO_DISK.name(); + } + + @Override + public OverflowPolicyHandler create(final Queue<?> queue, final EventLogger eventLogger) + { + return new FlowToDiskOverflowPolicyHandler(queue); + } +} http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/700349d1/broker-core/src/main/java/org/apache/qpid/server/queue/NoneOverflowPolicyHandler.java ---------------------------------------------------------------------- diff --git a/broker-core/src/main/java/org/apache/qpid/server/queue/NoneOverflowPolicyHandler.java b/broker-core/src/main/java/org/apache/qpid/server/queue/NoneOverflowPolicyHandler.java index 41aa0ac..61314b4 100644 --- a/broker-core/src/main/java/org/apache/qpid/server/queue/NoneOverflowPolicyHandler.java +++ b/broker-core/src/main/java/org/apache/qpid/server/queue/NoneOverflowPolicyHandler.java @@ -22,7 +22,7 @@ package org.apache.qpid.server.queue; public class NoneOverflowPolicyHandler implements OverflowPolicyHandler { @Override - public void checkOverflow() + public void checkOverflow(final QueueEntry newlyEnqueued) { // noop } http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/700349d1/broker-core/src/main/java/org/apache/qpid/server/queue/OverflowPolicyHandler.java ---------------------------------------------------------------------- diff --git a/broker-core/src/main/java/org/apache/qpid/server/queue/OverflowPolicyHandler.java b/broker-core/src/main/java/org/apache/qpid/server/queue/OverflowPolicyHandler.java index 5d8e9ae..8c221b3 100644 --- a/broker-core/src/main/java/org/apache/qpid/server/queue/OverflowPolicyHandler.java +++ b/broker-core/src/main/java/org/apache/qpid/server/queue/OverflowPolicyHandler.java @@ -23,5 +23,5 @@ package org.apache.qpid.server.queue; public interface OverflowPolicyHandler { - void checkOverflow(); + void checkOverflow(final QueueEntry newlyEnqueued); } http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/700349d1/broker-core/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlOverflowPolicyHandler.java ---------------------------------------------------------------------- diff --git a/broker-core/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlOverflowPolicyHandler.java b/broker-core/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlOverflowPolicyHandler.java index 89b678f..1e58087 100644 --- a/broker-core/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlOverflowPolicyHandler.java +++ b/broker-core/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlOverflowPolicyHandler.java @@ -52,9 +52,9 @@ public class ProducerFlowControlOverflowPolicyHandler implements OverflowPolicyH } @Override - public void checkOverflow() + public void checkOverflow(final QueueEntry newlyEnqueued) { - _handler.checkOverflow(); + _handler.checkOverflow(newlyEnqueued); } private static class Handler extends AbstractConfigurationChangeListener implements OverflowPolicyHandler @@ -79,7 +79,7 @@ public class ProducerFlowControlOverflowPolicyHandler implements OverflowPolicyH } @Override - public void checkOverflow() + public void checkOverflow(final QueueEntry newlyEnqueued) { long maximumQueueDepthBytes = _queue.getMaximumQueueDepthBytes(); long maximumQueueDepthMessages = _queue.getMaximumQueueDepthMessages(); http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/700349d1/broker-core/src/main/java/org/apache/qpid/server/queue/RingOverflowPolicyHandler.java ---------------------------------------------------------------------- diff --git a/broker-core/src/main/java/org/apache/qpid/server/queue/RingOverflowPolicyHandler.java b/broker-core/src/main/java/org/apache/qpid/server/queue/RingOverflowPolicyHandler.java index 9bbc523..a810c6b 100644 --- a/broker-core/src/main/java/org/apache/qpid/server/queue/RingOverflowPolicyHandler.java +++ b/broker-core/src/main/java/org/apache/qpid/server/queue/RingOverflowPolicyHandler.java @@ -36,7 +36,7 @@ public class RingOverflowPolicyHandler implements OverflowPolicyHandler } @Override - public void checkOverflow() + public void checkOverflow(final QueueEntry newlyEnqueued) { final long maximumQueueDepthMessages = _queue.getMaximumQueueDepthMessages(); final long maximumQueueDepthBytes = _queue.getMaximumQueueDepthBytes(); http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/700349d1/broker-core/src/test/java/org/apache/qpid/server/queue/FlowToDiskOverflowPolicyHandlerTest.java ---------------------------------------------------------------------- diff --git a/broker-core/src/test/java/org/apache/qpid/server/queue/FlowToDiskOverflowPolicyHandlerTest.java b/broker-core/src/test/java/org/apache/qpid/server/queue/FlowToDiskOverflowPolicyHandlerTest.java new file mode 100644 index 0000000..992b886 --- /dev/null +++ b/broker-core/src/test/java/org/apache/qpid/server/queue/FlowToDiskOverflowPolicyHandlerTest.java @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.server.queue; + +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +import org.apache.qpid.server.message.MessageReference; +import org.apache.qpid.server.message.ServerMessage; +import org.apache.qpid.server.model.BrokerTestHelper; +import org.apache.qpid.server.model.OverflowPolicy; +import org.apache.qpid.server.model.Queue; +import org.apache.qpid.server.model.VirtualHost; +import org.apache.qpid.server.store.StoredMessage; +import org.apache.qpid.server.store.TransactionLogResource; +import org.apache.qpid.test.utils.QpidTestCase; + +public class FlowToDiskOverflowPolicyHandlerTest extends QpidTestCase +{ + private Queue<?> _queue; + + @Override + public void setUp() throws Exception + { + super.setUp(); + BrokerTestHelper.setUp(); + + VirtualHost<?> virtualHost = BrokerTestHelper.createVirtualHost(getClass().getName()); + + Map<String, Object> attributes = new HashMap<>(); + attributes.put(Queue.NAME, "testQueue"); + attributes.put(Queue.OVERFLOW_POLICY, OverflowPolicy.FLOW_TO_DISK); + + _queue = (AbstractQueue<?>) virtualHost.createChild(Queue.class, attributes); + } + + public void testOverflowAfterLoweringLimit() throws Exception + { + ServerMessage<?> message = createMessage(10L); + _queue.enqueue(message, null, null); + StoredMessage<?> storedMessage = message.getStoredMessage(); + verify(storedMessage, never()).flowToDisk(); + + ServerMessage<?> message2 = createMessage(10L); + _queue.enqueue(message2, null, null); + StoredMessage<?> storedMessage2 = message2.getStoredMessage(); + verify(storedMessage2, never()).flowToDisk(); + + _queue.setAttributes(Collections.singletonMap(Queue.MAXIMUM_QUEUE_DEPTH_BYTES, 10)); + + verify(storedMessage2).flowToDisk(); + } + + public void testOverflowOnSecondMessage() throws Exception + { + _queue.setAttributes(Collections.singletonMap(Queue.MAXIMUM_QUEUE_DEPTH_BYTES, 10)); + ServerMessage<?> message = createMessage(10L); + _queue.enqueue(message, null, null); + StoredMessage<?> storedMessage = message.getStoredMessage(); + verify(storedMessage, never()).flowToDisk(); + + ServerMessage<?> message2 = createMessage(10L); + _queue.enqueue(message2, null, null); + StoredMessage<?> storedMessage2 = message2.getStoredMessage(); + verify(storedMessage2).flowToDisk(); + } + + public void testBytesOverflow() throws Exception + { + _queue.setAttributes(Collections.singletonMap(Queue.MAXIMUM_QUEUE_DEPTH_BYTES, 0)); + ServerMessage<?> message = createMessage(1L); + _queue.enqueue(message, null, null); + StoredMessage<?> storedMessage = message.getStoredMessage(); + verify(storedMessage).flowToDisk(); + } + + public void testMessagesOverflow() throws Exception + { + _queue.setAttributes(Collections.singletonMap(Queue.MAXIMUM_QUEUE_DEPTH_MESSAGES, 0)); + ServerMessage<?> message = createMessage(1L); + _queue.enqueue(message, null, null); + StoredMessage<?> storedMessage = message.getStoredMessage(); + verify(storedMessage).flowToDisk(); + } + + public void testNoOverflow() throws Exception + { + _queue.setAttributes(Collections.singletonMap(Queue.MAXIMUM_QUEUE_DEPTH_MESSAGES, 10)); + _queue.setAttributes(Collections.singletonMap(Queue.MAXIMUM_QUEUE_DEPTH_BYTES, 10)); + ServerMessage<?> message = createMessage(1L); + _queue.enqueue(message, null, null); + StoredMessage<?> storedMessage = message.getStoredMessage(); + verify(storedMessage, never()).flowToDisk(); + } + + private ServerMessage createMessage(long size) + { + ServerMessage message = mock(ServerMessage.class); + when(message.getSizeIncludingHeader()).thenReturn(size); + + StoredMessage storedMessage = mock(StoredMessage.class); + when(message.getStoredMessage()).thenReturn(storedMessage); + + MessageReference ref = mock(MessageReference.class); + when(ref.getMessage()).thenReturn(message); + + when(message.newReference()).thenReturn(ref); + when(message.newReference(any(TransactionLogResource.class))).thenReturn(ref); + + return message; + } +} http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/700349d1/broker-core/src/test/java/org/apache/qpid/server/queue/ProducerFlowControlOverflowPolicyHandlerTest.java ---------------------------------------------------------------------- diff --git a/broker-core/src/test/java/org/apache/qpid/server/queue/ProducerFlowControlOverflowPolicyHandlerTest.java b/broker-core/src/test/java/org/apache/qpid/server/queue/ProducerFlowControlOverflowPolicyHandlerTest.java index 3c951a5..f02c1d9 100644 --- a/broker-core/src/test/java/org/apache/qpid/server/queue/ProducerFlowControlOverflowPolicyHandlerTest.java +++ b/broker-core/src/test/java/org/apache/qpid/server/queue/ProducerFlowControlOverflowPolicyHandlerTest.java @@ -128,7 +128,7 @@ public class ProducerFlowControlOverflowPolicyHandlerTest extends QpidTestCase when(_queue.getQueueDepthBytesIncludingHeader()).thenReturn(8L); - _producerFlowControlOverflowPolicyHandler.checkOverflow(); + _producerFlowControlOverflowPolicyHandler.checkOverflow(null); verify(session, times(1)).unblock(_queue); assertFalse("Flow should not be stopped", _producerFlowControlOverflowPolicyHandler.isQueueFlowStopped()); @@ -153,7 +153,7 @@ public class ProducerFlowControlOverflowPolicyHandlerTest extends QpidTestCase when(_queue.getQueueDepthMessages()).thenReturn(8); - _producerFlowControlOverflowPolicyHandler.checkOverflow(); + _producerFlowControlOverflowPolicyHandler.checkOverflow(null); verify(session, times(1)).unblock(_queue); assertFalse("Flow should not be stopped", _producerFlowControlOverflowPolicyHandler.isQueueFlowStopped()); @@ -171,7 +171,7 @@ public class ProducerFlowControlOverflowPolicyHandlerTest extends QpidTestCase @Override public Void run() { - _producerFlowControlOverflowPolicyHandler.checkOverflow(); + _producerFlowControlOverflowPolicyHandler.checkOverflow(null); return null; } }); http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/700349d1/broker-core/src/test/java/org/apache/qpid/server/queue/RingOverflowPolicyHandlerTest.java ---------------------------------------------------------------------- diff --git a/broker-core/src/test/java/org/apache/qpid/server/queue/RingOverflowPolicyHandlerTest.java b/broker-core/src/test/java/org/apache/qpid/server/queue/RingOverflowPolicyHandlerTest.java index 1bc30ca..964886a 100644 --- a/broker-core/src/test/java/org/apache/qpid/server/queue/RingOverflowPolicyHandlerTest.java +++ b/broker-core/src/test/java/org/apache/qpid/server/queue/RingOverflowPolicyHandlerTest.java @@ -72,7 +72,7 @@ public class RingOverflowPolicyHandlerTest extends QpidTestCase when(_queue.getMaximumQueueDepthBytes()).thenReturn(5L); when(_queue.getQueueDepthMessages()).thenReturn(3, 1); - _ringOverflowPolicyHandler.checkOverflow(); + _ringOverflowPolicyHandler.checkOverflow(null); verify(_queue).deleteEntry(lastEntry); LogMessage dropped = QueueMessages.DROPPED(1L, 4, 1, 5,-1); @@ -88,7 +88,7 @@ public class RingOverflowPolicyHandlerTest extends QpidTestCase when(_queue.getMaximumQueueDepthMessages()).thenReturn(5L); when(_queue.getQueueDepthBytesIncludingHeader()).thenReturn(10L, 4L); - _ringOverflowPolicyHandler.checkOverflow(); + _ringOverflowPolicyHandler.checkOverflow(null); verify((AbstractQueue<?>) _queue).deleteEntry(lastEntry); LogMessage dropped = QueueMessages.DROPPED(1, 4, 5, -1,5); @@ -102,7 +102,7 @@ public class RingOverflowPolicyHandlerTest extends QpidTestCase when(_queue.getMaximumQueueDepthBytes()).thenReturn(5L); when(_queue.getQueueDepthMessages()).thenReturn(3); - _ringOverflowPolicyHandler.checkOverflow(); + _ringOverflowPolicyHandler.checkOverflow(null); verify(_queue, never()).deleteEntry(any(QueueEntry.class)); verifyNoMoreInteractions(_eventLogger); @@ -114,7 +114,7 @@ public class RingOverflowPolicyHandlerTest extends QpidTestCase when(_queue.getMaximumQueueDepthMessages()).thenReturn(5L); when(_queue.getQueueDepthBytesIncludingHeader()).thenReturn(10L); - _ringOverflowPolicyHandler.checkOverflow(); + _ringOverflowPolicyHandler.checkOverflow(null); verify(_queue, never()).deleteEntry(any(QueueEntry.class)); verifyNoMoreInteractions(_eventLogger); http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/700349d1/systests/src/main/java/org/apache/qpid/test/utils/AmqpManagementFacade.java ---------------------------------------------------------------------- diff --git a/systests/src/main/java/org/apache/qpid/test/utils/AmqpManagementFacade.java b/systests/src/main/java/org/apache/qpid/test/utils/AmqpManagementFacade.java index 5caa9c3..901526c 100644 --- a/systests/src/main/java/org/apache/qpid/test/utils/AmqpManagementFacade.java +++ b/systests/src/main/java/org/apache/qpid/test/utils/AmqpManagementFacade.java @@ -26,7 +26,9 @@ import java.util.Enumeration; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.TreeMap; +import javax.jms.BytesMessage; import javax.jms.JMSException; import javax.jms.MapMessage; import javax.jms.Message; @@ -36,6 +38,9 @@ import javax.jms.ObjectMessage; import javax.jms.Session; import javax.jms.TemporaryQueue; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; + public class AmqpManagementFacade { private final QpidBrokerTestCase _qpidBrokerTestCase; @@ -97,26 +102,47 @@ public class AmqpManagementFacade } } - public void performOperationUsingAmqpManagement(final String name, - final String operation, - final Session session, - final String type, - Map<String, Object> arguments) + public Object performOperationUsingAmqpManagement(final String name, + final String operation, + final Session session, + final String type, + Map<String, Object> arguments) throws JMSException { MessageProducer producer = session.createProducer(session.createQueue(_qpidBrokerTestCase.isBroker10() ? "$management" : "ADDR:$management")); - + final TemporaryQueue responseQ = session.createTemporaryQueue(); + MessageConsumer consumer = session.createConsumer(responseQ); MapMessage opMessage = session.createMapMessage(); opMessage.setStringProperty("type", type); opMessage.setStringProperty("operation", operation); opMessage.setStringProperty("index", "object-path"); + opMessage.setJMSReplyTo(responseQ); opMessage.setStringProperty("key", name); for (Map.Entry<String, Object> argument : arguments.entrySet()) { - opMessage.setObjectProperty(argument.getKey(), argument.getValue()); + Object value = argument.getValue(); + if (value.getClass().isPrimitive() || value instanceof String) + { + opMessage.setObjectProperty(argument.getKey(), value); + } + else + { + ObjectMapper objectMapper = new ObjectMapper(); + String jsonifiedValue = null; + try + { + jsonifiedValue = objectMapper.writeValueAsString(value); + } + catch (JsonProcessingException e) + { + throw new IllegalArgumentException(String.format( + "Cannot convert the argument '%s' to JSON to meet JMS type restrictions", argument.getKey())); + } + opMessage.setObjectProperty(argument.getKey(), jsonifiedValue); + } } producer.send(opMessage); @@ -124,6 +150,51 @@ public class AmqpManagementFacade { session.commit(); } + + Message response = consumer.receive(5000); + try + { + if (response instanceof MapMessage) + { + MapMessage bodyMap = (MapMessage) response; + Map<String, Object> result = new TreeMap<>(); + Enumeration mapNames = bodyMap.getMapNames(); + while (mapNames.hasMoreElements()) + { + String key = (String) mapNames.nextElement(); + result.put(key, bodyMap.getObject(key)); + } + return result; + } + else if (response instanceof ObjectMessage) + { + return ((ObjectMessage) response).getObject(); + } + else if (response instanceof BytesMessage) + { + BytesMessage bytesMessage = (BytesMessage) response; + if (bytesMessage.getBodyLength() == 0) + { + return null; + } + else + { + byte[] buf = new byte[(int) bytesMessage.getBodyLength()]; + bytesMessage.readBytes(buf); + return buf; + } + } + throw new IllegalArgumentException("Cannot parse the results from a management operation. JMS response message : " + response); + } + finally + { + if (session.getTransacted()) + { + session.commit(); + } + consumer.close(); + responseQ.delete(); + } } public List<Map<String, Object>> managementQueryObjects(final Session session, final String type) throws JMSException @@ -141,7 +212,7 @@ public class AmqpManagementFacade producer.send(message); - Message response = consumer.receive(); + Message response = consumer.receive(5000); try { if (response instanceof MapMessage) @@ -149,7 +220,7 @@ public class AmqpManagementFacade MapMessage bodyMap = (MapMessage) response; List<String> attributeNames = (List<String>) bodyMap.getObject("attributeNames"); List<List<Object>> attributeValues = (List<List<Object>>) bodyMap.getObject("results"); - return getResultsAsMap(attributeNames, attributeValues); + return getResultsAsMaps(attributeNames, attributeValues); } else if (response instanceof ObjectMessage) { @@ -159,7 +230,7 @@ public class AmqpManagementFacade Map<String, ?> bodyMap = (Map<String, ?>) body; List<String> attributeNames = (List<String>) bodyMap.get("attributeNames"); List<List<Object>> attributeValues = (List<List<Object>>) bodyMap.get("results"); - return getResultsAsMap(attributeNames, attributeValues); + return getResultsAsMaps(attributeNames, attributeValues); } } throw new IllegalArgumentException("Cannot parse the results from a management query"); @@ -236,7 +307,7 @@ public class AmqpManagementFacade } } - private List<Map<String, Object>> getResultsAsMap(final List<String> attributeNames, final List<List<Object>> attributeValues) + private List<Map<String, Object>> getResultsAsMaps(final List<String> attributeNames, final List<List<Object>> attributeValues) { List<Map<String, Object>> results = new ArrayList<>(); for (List<Object> resultObject : attributeValues) http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/700349d1/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java ---------------------------------------------------------------------- diff --git a/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java b/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java index 7f0a317..403b39c 100755 --- a/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java +++ b/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java @@ -357,11 +357,10 @@ public class QpidBrokerTestCase extends QpidTestCase _managementFacade.deleteEntityUsingAmqpManagement(name, session, type); } - protected void performOperationUsingAmqpManagement(final String name, final String operation, final Session session, final String type, Map<String,Object> arguments) + protected Object performOperationUsingAmqpManagement(final String name, final String operation, final Session session, final String type, Map<String,Object> arguments) throws JMSException { - - _managementFacade.performOperationUsingAmqpManagement(name, operation, session, type, arguments); + return _managementFacade.performOperationUsingAmqpManagement(name, operation, session, type, arguments); } protected List<Map<String, Object>> managementQueryObjects(final Session session, final String type) throws JMSException http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/700349d1/systests/src/test/java/org/apache/qpid/server/queue/FlowToDiskTest.java ---------------------------------------------------------------------- diff --git a/systests/src/test/java/org/apache/qpid/server/queue/FlowToDiskTest.java b/systests/src/test/java/org/apache/qpid/server/queue/FlowToDiskTest.java new file mode 100644 index 0000000..4527307 --- /dev/null +++ b/systests/src/test/java/org/apache/qpid/server/queue/FlowToDiskTest.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.server.queue; + +import static org.hamcrest.CoreMatchers.instanceOf; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +import javax.jms.Connection; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; +import javax.jms.TextMessage; + +import org.apache.qpid.QpidException; +import org.apache.qpid.server.model.OverflowPolicy; +import org.apache.qpid.test.utils.QpidBrokerTestCase; + +public class FlowToDiskTest extends QpidBrokerTestCase +{ + private Session _session; + private Queue _queue; + + @Override + protected void setUp() throws Exception + { + super.setUp(); + Connection connection = getConnection(); + connection.start(); + _session = connection.createSession(true, Session.SESSION_TRANSACTED); + + _queue = createQueue(); + } + + private Queue createQueue() throws QpidException, JMSException + { + final Map<String, Object> arguments = new HashMap<String, Object>(); + arguments.put(org.apache.qpid.server.model.Queue.OVERFLOW_POLICY, OverflowPolicy.FLOW_TO_DISK.name()); + arguments.put(org.apache.qpid.server.model.Queue.MAXIMUM_QUEUE_DEPTH_BYTES, 0L); + createEntityUsingAmqpManagement(getTestQueueName(), _session, "org.apache.qpid.Queue", arguments); + return getQueueFromName(_session, getTestQueueName()); + } + + public void testRoundtripWithFlowToDisk() throws Exception + { + Map<String, Object> statistics = (Map<String, Object>) performOperationUsingAmqpManagement("test", "getStatistics", _session, "org.apache.qpid.VirtualHost", Collections.singletonMap("statistics", Collections.singletonList("bytesEvacuatedFromMemory"))); + Long originalBytesEvacuatedFromMemory = (Long) statistics.get("bytesEvacuatedFromMemory"); + + TextMessage message = _session.createTextMessage("testMessage"); + MessageProducer producer = _session.createProducer(_queue); + producer.send(message); + _session.commit(); + + // make sure we are flowing to disk + Map<String, Object> statistics2 = (Map<String, Object>) performOperationUsingAmqpManagement("test", "getStatistics", _session, "org.apache.qpid.VirtualHost", Collections.singletonMap("statistics", Collections.singletonList("bytesEvacuatedFromMemory"))); + Long bytesEvacuatedFromMemory = (Long) statistics2.get("bytesEvacuatedFromMemory"); + assertTrue("Message was not evacuated from memory", bytesEvacuatedFromMemory > originalBytesEvacuatedFromMemory); + + MessageConsumer consumer = _session.createConsumer(_queue); + Message receivedMessage = consumer.receive(getReceiveTimeout()); + assertNotNull("Did not receive message", receivedMessage); + assertThat("Unexpected message type", receivedMessage, is(instanceOf(TextMessage.class))); + assertEquals("Unexpected message content", message.getText(), ((TextMessage) receivedMessage).getText()); + } +} http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/700349d1/test-profiles/CPPExcludes ---------------------------------------------------------------------- diff --git a/test-profiles/CPPExcludes b/test-profiles/CPPExcludes index aa5b06b..9039b3e 100755 --- a/test-profiles/CPPExcludes +++ b/test-profiles/CPPExcludes @@ -257,3 +257,5 @@ org.apache.qpid.systests.jms_2_0.* # Exclude the AMQP 1.0 protocol test suite org.apache.qpid.tests.protocol.v1_0.* + +org.apache.qpid.server.queue.FlowToDiskTest#* http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/700349d1/test-profiles/JavaTransientExcludes ---------------------------------------------------------------------- diff --git a/test-profiles/JavaTransientExcludes b/test-profiles/JavaTransientExcludes index 1ebfdb3..71b1341 100644 --- a/test-profiles/JavaTransientExcludes +++ b/test-profiles/JavaTransientExcludes @@ -54,3 +54,4 @@ org.apache.qpid.client.failover.FailoverBehaviourTest#testFlowControlFlagResetOn org.apache.qpid.client.failover.FailoverBehaviourTest#testFailoverHandlerTimeoutReconnected org.apache.qpid.server.failover.FailoverMethodTest#testNoFailover +org.apache.qpid.server.queue.FlowToDiskTest#* --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
