QPID-7971: [Qpid Broker-J] Ensure that consumers are notified that no work is desired during consumer target close
Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/d1b74e42 Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/d1b74e42 Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/d1b74e42 Branch: refs/heads/master Commit: d1b74e424d2bf71004357cf7b3e968b68b1aed10 Parents: 2acc601 Author: Keith Wall <kw...@apache.org> Authored: Tue Oct 24 10:40:21 2017 +0100 Committer: Keith Wall <kw...@apache.org> Committed: Tue Oct 24 13:17:54 2017 +0100 ---------------------------------------------------------------------- .../server/consumer/AbstractConsumerTarget.java | 51 ++++++------------ .../consumer/AbstractConsumerTargetTest.java | 54 ++++++++++++++++++-- 2 files changed, 67 insertions(+), 38 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/d1b74e42/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java ---------------------------------------------------------------------- diff --git a/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java b/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java index 2ef82cf..76d4652 100644 --- a/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java +++ b/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java @@ -51,14 +51,7 @@ public abstract class AbstractConsumerTarget<T extends AbstractConsumerTarget<T> { private static final Logger LOGGER = LoggerFactory.getLogger(AbstractConsumerTarget.class); - private static final LogSubject MULTI_QUEUE_LOG_SUBJECT = new LogSubject() - { - @Override - public String toLogString() - { - return "[(** Multi-Queue **)] "; - } - }; + private static final LogSubject MULTI_QUEUE_LOG_SUBJECT = () -> "[(** Multi-Queue **)] "; protected final AtomicLong 
_unacknowledgedBytes = new AtomicLong(0); protected final AtomicLong _unacknowledgedCount = new AtomicLong(0); private final AtomicReference<State> _state = new AtomicReference<>(State.OPEN); @@ -66,11 +59,11 @@ public abstract class AbstractConsumerTarget<T extends AbstractConsumerTarget<T> private final boolean _isMultiQueue; private final SuspendedConsumerLoggingTicker _suspendedConsumerLoggingTicker; private final List<MessageInstanceConsumer> _consumers = new CopyOnWriteArrayList<>(); - - private Iterator<MessageInstanceConsumer> _pullIterator; - private boolean _notifyWorkDesired; private final AtomicBoolean _scheduled = new AtomicBoolean(); + private volatile Iterator<MessageInstanceConsumer> _pullIterator; + private volatile boolean _notifyWorkDesired; + protected AbstractConsumerTarget(final boolean isMultiQueue, final AMQPConnection<?> amqpConnection) { @@ -122,17 +115,14 @@ public abstract class AbstractConsumerTarget<T extends AbstractConsumerTarget<T> { if (desired != _notifyWorkDesired) { - if(_suspendedConsumerLoggingTicker != null) + if (desired) { - if (desired) - { - getSession().removeTicker(_suspendedConsumerLoggingTicker); - } - else - { - _suspendedConsumerLoggingTicker.setStartTime(System.currentTimeMillis()); - getSession().addTicker(_suspendedConsumerLoggingTicker); - } + getSession().removeTicker(_suspendedConsumerLoggingTicker); + } + else + { + _suspendedConsumerLoggingTicker.setStartTime(System.currentTimeMillis()); + getSession().addTicker(_suspendedConsumerLoggingTicker); } for (MessageInstanceConsumer consumer : _consumers) @@ -174,14 +164,7 @@ public abstract class AbstractConsumerTarget<T extends AbstractConsumerTarget<T> if(_consumers.contains(sub)) { return doOnIoThreadAsync( - new Runnable() - { - @Override - public void run() - { - consumerRemovedInternal(sub); - } - }); + () -> consumerRemovedInternal(sub)); } else { @@ -355,19 +338,17 @@ public abstract class AbstractConsumerTarget<T extends AbstractConsumerTarget<T> { if 
(_state.compareAndSet(State.OPEN, State.CLOSED)) { + setNotifyWorkDesired(false); + List<MessageInstanceConsumer> consumers = new ArrayList<>(_consumers); _consumers.clear(); - setNotifyWorkDesired(false); - for (MessageInstanceConsumer consumer : consumers) { consumer.close(); } - if (_suspendedConsumerLoggingTicker != null) - { - getSession().removeTicker(_suspendedConsumerLoggingTicker); - } + + getSession().removeTicker(_suspendedConsumerLoggingTicker); return true; } http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/d1b74e42/broker-core/src/test/java/org/apache/qpid/server/consumer/AbstractConsumerTargetTest.java ---------------------------------------------------------------------- diff --git a/broker-core/src/test/java/org/apache/qpid/server/consumer/AbstractConsumerTargetTest.java b/broker-core/src/test/java/org/apache/qpid/server/consumer/AbstractConsumerTargetTest.java index e29458a..8e41a9c 100644 --- a/broker-core/src/test/java/org/apache/qpid/server/consumer/AbstractConsumerTargetTest.java +++ b/broker-core/src/test/java/org/apache/qpid/server/consumer/AbstractConsumerTargetTest.java @@ -22,11 +22,16 @@ package org.apache.qpid.server.consumer; import static org.mockito.Matchers.any; import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.inOrder; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; +import org.mockito.InOrder; + import org.apache.qpid.server.message.MessageContainer; import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.message.MessageInstanceConsumer; @@ -49,7 +54,8 @@ public class AbstractConsumerTargetTest extends QpidTestCase private TestAbstractConsumerTarget _consumerTarget; private Consumer _consumer; private MessageSource _messageSource; - private AMQPConnection 
_connection = mock(AMQPConnection.class); + private AMQPConnection<?> _connection = mock(AMQPConnection.class); + private AMQPSession<?,TestAbstractConsumerTarget> _session = mock(AMQPSession.class); private MessageInstance _messageInstance; @Override @@ -71,6 +77,48 @@ public class AbstractConsumerTargetTest extends QpidTestCase _consumerTarget.consumerAdded(_consumer); } + public void testClose() throws Exception + { + _consumerTarget = new TestAbstractConsumerTarget(); + assertEquals("Unexpected number of consumers", 0, _consumerTarget.getConsumers().size()); + + _consumerTarget.consumerAdded(_consumer); + assertEquals("Unexpected number of consumers after add", 1, _consumerTarget.getConsumers().size()); + + _consumerTarget.close(); + assertEquals("Unexpected number of consumers after close", 0, _consumerTarget.getConsumers().size()); + + verify(_consumer, times(1)).close(); + } + + public void testNotifyWork() throws Exception + { + InOrder order = inOrder(_consumer); + + _consumerTarget = new TestAbstractConsumerTarget(); + assertEquals("Unexpected number of consumers", 0, _consumerTarget.getConsumers().size()); + + _consumerTarget.consumerAdded(_consumer); + + _consumerTarget.setNotifyWorkDesired(true); + order.verify(_consumer, times(1)).setNotifyWorkDesired(true); + + _consumerTarget.setNotifyWorkDesired(false); + order.verify(_consumer, times(1)).setNotifyWorkDesired(false); + + _consumerTarget.setNotifyWorkDesired(true); + order.verify(_consumer, times(1)).setNotifyWorkDesired(true); + + _consumerTarget.setNotifyWorkDesired(true); + // no change of state - should not be propagated to the consumer + + _consumerTarget.close(); + order.verify(_consumer, times(1)).setNotifyWorkDesired(false); + order.verify(_consumer, times(1)).close(); + + verifyNoMoreInteractions(_consumer); + } + public void testConversionExceptionPolicyClose() throws Exception { configureBehaviour(true, MessageSource.MessageConversionExceptionHandlingPolicy.CLOSE); @@ -199,13 +247,13 @@ 
public class AbstractConsumerTargetTest extends QpidTestCase @Override public void updateNotifyWorkDesired() { - + throw new UnsupportedOperationException(); } @Override public AMQPSession<?, TestAbstractConsumerTarget> getSession() { - return null; + return _session; } @Override --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org