Repository: qpid-broker-j Updated Branches: refs/heads/master a946173df -> 42bebb9ff
QPID-7815: [Java Broker] Move overflow policy handlers creation from afterSet into changeAttributes Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/42bebb9f Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/42bebb9f Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/42bebb9f Branch: refs/heads/master Commit: 42bebb9ff6e92f81eaffbd8d8a5350f3e6c10b1d Parents: a946173 Author: Alex Rudyy <oru...@apache.org> Authored: Fri Aug 11 14:21:58 2017 +0100 Committer: Alex Rudyy <oru...@apache.org> Committed: Fri Aug 11 14:21:58 2017 +0100 ---------------------------------------------------------------------- .../apache/qpid/server/queue/AbstractQueue.java | 38 +++++++++++++------- .../queue/FlowToDiskOverflowPolicyHandler.java | 5 ++- ...roducerFlowControlOverflowPolicyHandler.java | 5 ++- 3 files changed, 30 insertions(+), 18 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/42bebb9f/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java ---------------------------------------------------------------------- diff --git a/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java b/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java index c6accc2..0e72802 100644 --- a/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java +++ b/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java @@ -241,7 +241,7 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>> @ManagedAttributeField private volatile boolean _holdOnPublishEnabled; - @ManagedAttributeField(afterSet = "postSetOverflowPolicy") + @ManagedAttributeField() private OverflowPolicy _overflowPolicy; @ManagedAttributeField private long _maximumQueueDepthMessages; @@ -573,17 +573,19 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>> } } - if (_rejectPolicyHandler != null) + OverflowPolicy overflowPolicy = getOverflowPolicy(); + _postEnqueueOverflowPolicyHandler = createPostEnqueueOverflowPolicyHandler(overflowPolicy); + if (overflowPolicy == OverflowPolicy.REJECT) { + _rejectPolicyHandler = new RejectPolicyHandler(this); _rejectPolicyHandler.onQueueOpen(); } updateAlertChecks(); } - private void createOverflowPolicyHandler(final OverflowPolicy overflowPolicy) + private OverflowPolicyHandler createPostEnqueueOverflowPolicyHandler(final OverflowPolicy overflowPolicy) { - RejectPolicyHandler rejectPolicyHandler = null; OverflowPolicyHandler overflowPolicyHandler; switch (overflowPolicy) { @@ -601,15 +603,13 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>> break; case REJECT: overflowPolicyHandler = new NoneOverflowPolicyHandler(); - rejectPolicyHandler = new RejectPolicyHandler(this); break; default: throw new IllegalStateException(String.format("Overflow policy '%s' is not implemented", overflowPolicy.name())); } - _rejectPolicyHandler = rejectPolicyHandler; - _postEnqueueOverflowPolicyHandler = overflowPolicyHandler; + return overflowPolicyHandler; } protected LogMessage getCreatedLogMessage() @@ -3091,15 +3091,29 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>> } return super.changeAttribute(name, desired); - } - @SuppressWarnings("ignore") - private void postSetOverflowPolicy() + @Override + protected void changeAttributes(final Map<String, Object> attributes) { - createOverflowPolicyHandler(getOverflowPolicy()); - if (getState() == State.ACTIVE) + OverflowPolicy existingPolicy = getOverflowPolicy(); + super.changeAttributes(attributes); + + // Overflow policies depend on queue depth attributes. + // Thus, we need to create and invoke overflow policy handler + // after all required attributes are changed. + if (attributes.containsKey(OVERFLOW_POLICY) && existingPolicy != _overflowPolicy) { + if (existingPolicy == OverflowPolicy.REJECT) + { + _rejectPolicyHandler = null; + } + _postEnqueueOverflowPolicyHandler = createPostEnqueueOverflowPolicyHandler(_overflowPolicy); + if (_overflowPolicy == OverflowPolicy.REJECT) + { + _rejectPolicyHandler = new RejectPolicyHandler(this); + } + _postEnqueueOverflowPolicyHandler.checkOverflow(null); } } http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/42bebb9f/broker-core/src/main/java/org/apache/qpid/server/queue/FlowToDiskOverflowPolicyHandler.java ---------------------------------------------------------------------- diff --git a/broker-core/src/main/java/org/apache/qpid/server/queue/FlowToDiskOverflowPolicyHandler.java b/broker-core/src/main/java/org/apache/qpid/server/queue/FlowToDiskOverflowPolicyHandler.java index 63030cd..9a6b354 100644 --- a/broker-core/src/main/java/org/apache/qpid/server/queue/FlowToDiskOverflowPolicyHandler.java +++ b/broker-core/src/main/java/org/apache/qpid/server/queue/FlowToDiskOverflowPolicyHandler.java @@ -43,7 +43,7 @@ public class FlowToDiskOverflowPolicyHandler implements OverflowPolicyHandler } - private static class Handler extends AbstractConfigurationChangeListener implements OverflowPolicyHandler + private static class Handler extends AbstractConfigurationChangeListener { private final Queue<?> _queue; private boolean _limitsChanged; @@ -53,8 +53,7 @@ public class FlowToDiskOverflowPolicyHandler implements OverflowPolicyHandler _queue = queue; } - @Override - public void checkOverflow(final QueueEntry newlyEnqueued) + private void checkOverflow(final QueueEntry newlyEnqueued) { long maximumQueueDepthBytes = _queue.getMaximumQueueDepthBytes(); long maximumQueueDepthMessages = _queue.getMaximumQueueDepthMessages(); http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/42bebb9f/broker-core/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlOverflowPolicyHandler.java ---------------------------------------------------------------------- diff --git a/broker-core/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlOverflowPolicyHandler.java b/broker-core/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlOverflowPolicyHandler.java index c2f9800..b2559b5 100644 --- a/broker-core/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlOverflowPolicyHandler.java +++ b/broker-core/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlOverflowPolicyHandler.java @@ -57,7 +57,7 @@ public class ProducerFlowControlOverflowPolicyHandler implements OverflowPolicyH _handler.checkOverflow(newlyEnqueued); } - private static class Handler extends AbstractConfigurationChangeListener implements OverflowPolicyHandler + private static class Handler extends AbstractConfigurationChangeListener { private final Queue<?> _queue; private final EventLogger _eventLogger; @@ -78,8 +78,7 @@ public class ProducerFlowControlOverflowPolicyHandler implements OverflowPolicyH } } - @Override - public void checkOverflow(final QueueEntry newlyEnqueued) + private void checkOverflow(final QueueEntry newlyEnqueued) { long maximumQueueDepthBytes = _queue.getMaximumQueueDepthBytes(); long maximumQueueDepthMessages = _queue.getMaximumQueueDepthMessages(); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org