Repository: qpid-broker-j Updated Branches: refs/heads/master 17e6c7d6e -> b7ee49ded
QPID-7815: [Java Broker] Invoke overflow policy check on queue maximum queue depth changes Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/b7ee49de Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/b7ee49de Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/b7ee49de Branch: refs/heads/master Commit: b7ee49ded4df66ecbb7b4836c599471659358e7e Parents: 17e6c7d Author: Alex Rudyy <[email protected]> Authored: Fri Aug 11 15:57:05 2017 +0100 Committer: Alex Rudyy <[email protected]> Committed: Fri Aug 11 15:57:30 2017 +0100 ---------------------------------------------------------------------- .../queue/FlowToDiskOverflowPolicyHandler.java | 44 ++------ ...owPolicyMaximumQueueDepthChangeListener.java | 76 +++++++++++++ .../server/queue/RingOverflowPolicyHandler.java | 106 +++++++++++-------- 3 files changed, 148 insertions(+), 78 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/b7ee49de/broker-core/src/main/java/org/apache/qpid/server/queue/FlowToDiskOverflowPolicyHandler.java ---------------------------------------------------------------------- diff --git a/broker-core/src/main/java/org/apache/qpid/server/queue/FlowToDiskOverflowPolicyHandler.java b/broker-core/src/main/java/org/apache/qpid/server/queue/FlowToDiskOverflowPolicyHandler.java index 9a6b354..6115408 100644 --- a/broker-core/src/main/java/org/apache/qpid/server/queue/FlowToDiskOverflowPolicyHandler.java +++ b/broker-core/src/main/java/org/apache/qpid/server/queue/FlowToDiskOverflowPolicyHandler.java @@ -21,8 +21,6 @@ package org.apache.qpid.server.queue; import org.apache.qpid.server.message.MessageDeletedException; import org.apache.qpid.server.message.MessageReference; import org.apache.qpid.server.message.ServerMessage; -import org.apache.qpid.server.model.AbstractConfigurationChangeListener; -import org.apache.qpid.server.model.ConfiguredObject; import org.apache.qpid.server.model.OverflowPolicy; import org.apache.qpid.server.model.Queue; @@ -43,16 +41,22 @@ public class FlowToDiskOverflowPolicyHandler implements OverflowPolicyHandler } - private static class Handler extends AbstractConfigurationChangeListener + private static class Handler extends OverflowPolicyMaximumQueueDepthChangeListener { private final Queue<?> _queue; - private boolean _limitsChanged; private Handler(final Queue<?> queue) { + super(OverflowPolicy.FLOW_TO_DISK); _queue = queue; } + @Override + void onMaximumQueueDepthChange(final Queue<?> queue) + { + checkOverflow(null); + } + private void checkOverflow(final QueueEntry newlyEnqueued) { long maximumQueueDepthBytes = _queue.getMaximumQueueDepthBytes(); @@ -70,38 +74,6 @@ public class FlowToDiskOverflowPolicyHandler implements OverflowPolicyHandler } } - @Override - public void attributeSet(final ConfiguredObject<?> object, - final String attributeName, - final Object oldAttributeValue, - final Object newAttributeValue) - { - super.attributeSet(object, attributeName, oldAttributeValue, newAttributeValue); - if (Queue.MAXIMUM_QUEUE_DEPTH_BYTES.equals(attributeName) - || Queue.MAXIMUM_QUEUE_DEPTH_MESSAGES.equals(attributeName)) - { - _limitsChanged = true; - } - } - - @Override - public void bulkChangeEnd(final ConfiguredObject<?> object) - { - super.bulkChangeEnd(object); - if (_queue.getOverflowPolicy() == OverflowPolicy.FLOW_TO_DISK) - { - if (_limitsChanged) - { - _limitsChanged = false; - flowTailToDiskIfNecessary(_queue.getMaximumQueueDepthBytes(), _queue.getMaximumQueueDepthMessages()); - } - } - else - { - _queue.removeChangeListener(this); - } - } - private void flowTailToDiskIfNecessary(final long maximumQueueDepthBytes, final long maximumQueueDepthMessages) { final long queueDepthBytes = _queue.getQueueDepthBytes(); http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/b7ee49de/broker-core/src/main/java/org/apache/qpid/server/queue/OverflowPolicyMaximumQueueDepthChangeListener.java ---------------------------------------------------------------------- diff --git a/broker-core/src/main/java/org/apache/qpid/server/queue/OverflowPolicyMaximumQueueDepthChangeListener.java b/broker-core/src/main/java/org/apache/qpid/server/queue/OverflowPolicyMaximumQueueDepthChangeListener.java new file mode 100644 index 0000000..1651cef --- /dev/null +++ b/broker-core/src/main/java/org/apache/qpid/server/queue/OverflowPolicyMaximumQueueDepthChangeListener.java @@ -0,0 +1,76 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.queue; + +import org.apache.qpid.server.model.AbstractConfigurationChangeListener; +import org.apache.qpid.server.model.ConfiguredObject; +import org.apache.qpid.server.model.OverflowPolicy; +import org.apache.qpid.server.model.Queue; + +abstract class OverflowPolicyMaximumQueueDepthChangeListener extends AbstractConfigurationChangeListener +{ + private final OverflowPolicy _overflowPolicy; + private boolean _maximumQueueDepthChangeDetected; + + OverflowPolicyMaximumQueueDepthChangeListener(final OverflowPolicy overflowPolicy) + { + _overflowPolicy = overflowPolicy; + } + + @Override + public void attributeSet(final ConfiguredObject<?> object, + final String attributeName, + final Object oldAttributeValue, + final Object newAttributeValue) + { + super.attributeSet(object, attributeName, oldAttributeValue, newAttributeValue); + if (Queue.MAXIMUM_QUEUE_DEPTH_BYTES.equals(attributeName) + || Queue.MAXIMUM_QUEUE_DEPTH_MESSAGES.equals(attributeName)) + { + _maximumQueueDepthChangeDetected = true; + } + } + + @Override + public void bulkChangeEnd(final ConfiguredObject<?> object) + { + super.bulkChangeEnd(object); + if (object instanceof Queue) + { + Queue<?> queue = (Queue<?>) object; + + if (queue.getOverflowPolicy() == _overflowPolicy) + { + if (_maximumQueueDepthChangeDetected) + { + _maximumQueueDepthChangeDetected = false; + onMaximumQueueDepthChange(queue); + } + } + else + { + queue.removeChangeListener(this); + } + } + } + + abstract void onMaximumQueueDepthChange(final Queue<?> queue); +} http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/b7ee49de/broker-core/src/main/java/org/apache/qpid/server/queue/RingOverflowPolicyHandler.java ---------------------------------------------------------------------- diff --git a/broker-core/src/main/java/org/apache/qpid/server/queue/RingOverflowPolicyHandler.java b/broker-core/src/main/java/org/apache/qpid/server/queue/RingOverflowPolicyHandler.java index 99e1c2e..d1da30b 100644 --- a/broker-core/src/main/java/org/apache/qpid/server/queue/RingOverflowPolicyHandler.java +++ b/broker-core/src/main/java/org/apache/qpid/server/queue/RingOverflowPolicyHandler.java @@ -21,71 +21,93 @@ package org.apache.qpid.server.queue; import org.apache.qpid.server.logging.EventLogger; import org.apache.qpid.server.logging.messages.QueueMessages; +import org.apache.qpid.server.model.OverflowPolicy; import org.apache.qpid.server.model.Queue; public class RingOverflowPolicyHandler implements OverflowPolicyHandler { - private final Queue<?> _queue; - private final EventLogger _eventLogger; + private final Handler _handler; RingOverflowPolicyHandler(final Queue<?> queue, final EventLogger eventLogger) { - _queue = queue; - _eventLogger = eventLogger; + _handler = new Handler(queue, eventLogger); + queue.addChangeListener(_handler); } @Override public void checkOverflow(final QueueEntry newlyEnqueued) { - final long maximumQueueDepthMessages = _queue.getMaximumQueueDepthMessages(); - final long maximumQueueDepthBytes = _queue.getMaximumQueueDepthBytes(); - - boolean bytesOverflow, messagesOverflow, overflow = false; - int counter = 0; - int queueDepthMessages; - long queueDepthBytes; - do + _handler.checkOverflow(); + } + + private static class Handler extends OverflowPolicyMaximumQueueDepthChangeListener + { + private final Queue<?> _queue; + private final EventLogger _eventLogger; + + public Handler(final Queue<?> queue, final EventLogger eventLogger) + { + super(OverflowPolicy.RING); + _queue = queue; + _eventLogger = eventLogger; + } + + @Override + void onMaximumQueueDepthChange(final Queue<?> queue) { - queueDepthMessages = _queue.getQueueDepthMessages(); - queueDepthBytes = _queue.getQueueDepthBytes(); + checkOverflow(); + } - messagesOverflow = maximumQueueDepthMessages >= 0 && queueDepthMessages > maximumQueueDepthMessages; - bytesOverflow = maximumQueueDepthBytes >= 0 && queueDepthBytes > maximumQueueDepthBytes; + private void checkOverflow() + { + final long maximumQueueDepthMessages = _queue.getMaximumQueueDepthMessages(); + final long maximumQueueDepthBytes = _queue.getMaximumQueueDepthBytes(); - if (bytesOverflow || messagesOverflow) + boolean bytesOverflow, messagesOverflow, overflow = false; + int counter = 0; + int queueDepthMessages; + long queueDepthBytes; + do { - if (!overflow) - { - overflow = true; - } + queueDepthMessages = _queue.getQueueDepthMessages(); + queueDepthBytes = _queue.getQueueDepthBytes(); - QueueEntry entry = _queue.getLeastSignificantOldestEntry(); + messagesOverflow = maximumQueueDepthMessages >= 0 && queueDepthMessages > maximumQueueDepthMessages; + bytesOverflow = maximumQueueDepthBytes >= 0 && queueDepthBytes > maximumQueueDepthBytes; - if (entry != null) - { - counter++; - _queue.deleteEntry(entry); - } - else + if (bytesOverflow || messagesOverflow) { - queueDepthMessages = _queue.getQueueDepthMessages(); - queueDepthBytes = _queue.getQueueDepthBytes(); - break; + if (!overflow) + { + overflow = true; + } + + QueueEntry entry = _queue.getLeastSignificantOldestEntry(); + + if (entry != null) + { + counter++; + _queue.deleteEntry(entry); + } + else + { + queueDepthMessages = _queue.getQueueDepthMessages(); + queueDepthBytes = _queue.getQueueDepthBytes(); + break; + } } } - } - while (bytesOverflow || messagesOverflow); + while (bytesOverflow || messagesOverflow); - if (overflow) - { - _eventLogger.message(_queue.getLogSubject(), - QueueMessages.DROPPED( - counter, - queueDepthBytes, - queueDepthMessages, - maximumQueueDepthBytes, - maximumQueueDepthMessages)); + if (overflow) + { + _eventLogger.message(_queue.getLogSubject(), QueueMessages.DROPPED(counter, + queueDepthBytes, + queueDepthMessages, + maximumQueueDepthBytes, + maximumQueueDepthMessages)); + } } } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
