QPID-7971: [Qpid Broker-J] Ensure that consumers are notified no-work during 
consumer target close


Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/d1b74e42
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/d1b74e42
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/d1b74e42

Branch: refs/heads/master
Commit: d1b74e424d2bf71004357cf7b3e968b68b1aed10
Parents: 2acc601
Author: Keith Wall <kw...@apache.org>
Authored: Tue Oct 24 10:40:21 2017 +0100
Committer: Keith Wall <kw...@apache.org>
Committed: Tue Oct 24 13:17:54 2017 +0100

----------------------------------------------------------------------
 .../server/consumer/AbstractConsumerTarget.java | 51 ++++++------------
 .../consumer/AbstractConsumerTargetTest.java    | 54 ++++++++++++++++++--
 2 files changed, 67 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/d1b74e42/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java
----------------------------------------------------------------------
diff --git 
a/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java
 
b/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java
index 2ef82cf..76d4652 100644
--- 
a/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java
+++ 
b/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java
@@ -51,14 +51,7 @@ public abstract class AbstractConsumerTarget<T extends 
AbstractConsumerTarget<T>
 {
     private static final Logger LOGGER = 
LoggerFactory.getLogger(AbstractConsumerTarget.class);
 
-    private static final LogSubject MULTI_QUEUE_LOG_SUBJECT = new LogSubject()
-    {
-        @Override
-        public String toLogString()
-        {
-            return "[(** Multi-Queue **)] ";
-        }
-    };
+    private static final LogSubject MULTI_QUEUE_LOG_SUBJECT = () -> "[(** 
Multi-Queue **)] ";
     protected final AtomicLong _unacknowledgedBytes = new AtomicLong(0);
     protected final AtomicLong _unacknowledgedCount = new AtomicLong(0);
     private final AtomicReference<State> _state = new 
AtomicReference<>(State.OPEN);
@@ -66,11 +59,11 @@ public abstract class AbstractConsumerTarget<T extends 
AbstractConsumerTarget<T>
     private final boolean _isMultiQueue;
     private final SuspendedConsumerLoggingTicker 
_suspendedConsumerLoggingTicker;
     private final List<MessageInstanceConsumer> _consumers = new 
CopyOnWriteArrayList<>();
-
-    private Iterator<MessageInstanceConsumer> _pullIterator;
-    private boolean _notifyWorkDesired;
     private final AtomicBoolean _scheduled = new AtomicBoolean();
 
+    private volatile Iterator<MessageInstanceConsumer> _pullIterator;
+    private volatile boolean _notifyWorkDesired;
+
     protected AbstractConsumerTarget(final boolean isMultiQueue,
                                      final AMQPConnection<?> amqpConnection)
     {
@@ -122,17 +115,14 @@ public abstract class AbstractConsumerTarget<T extends 
AbstractConsumerTarget<T>
     {
         if (desired != _notifyWorkDesired)
         {
-            if(_suspendedConsumerLoggingTicker != null)
+            if (desired)
             {
-                if (desired)
-                {
-                    getSession().removeTicker(_suspendedConsumerLoggingTicker);
-                }
-                else
-                {
-                    
_suspendedConsumerLoggingTicker.setStartTime(System.currentTimeMillis());
-                    getSession().addTicker(_suspendedConsumerLoggingTicker);
-                }
+                getSession().removeTicker(_suspendedConsumerLoggingTicker);
+            }
+            else
+            {
+                
_suspendedConsumerLoggingTicker.setStartTime(System.currentTimeMillis());
+                getSession().addTicker(_suspendedConsumerLoggingTicker);
             }
 
             for (MessageInstanceConsumer consumer : _consumers)
@@ -174,14 +164,7 @@ public abstract class AbstractConsumerTarget<T extends 
AbstractConsumerTarget<T>
         if(_consumers.contains(sub))
         {
             return doOnIoThreadAsync(
-                    new Runnable()
-                    {
-                        @Override
-                        public void run()
-                        {
-                            consumerRemovedInternal(sub);
-                        }
-                    });
+                    () -> consumerRemovedInternal(sub));
         }
         else
         {
@@ -355,19 +338,17 @@ public abstract class AbstractConsumerTarget<T extends 
AbstractConsumerTarget<T>
     {
         if (_state.compareAndSet(State.OPEN, State.CLOSED))
         {
+            setNotifyWorkDesired(false);
+
             List<MessageInstanceConsumer> consumers = new 
ArrayList<>(_consumers);
             _consumers.clear();
 
-            setNotifyWorkDesired(false);
-
             for (MessageInstanceConsumer consumer : consumers)
             {
                 consumer.close();
             }
-            if (_suspendedConsumerLoggingTicker != null)
-            {
-                getSession().removeTicker(_suspendedConsumerLoggingTicker);
-            }
+
+            getSession().removeTicker(_suspendedConsumerLoggingTicker);
 
             return true;
         }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/d1b74e42/broker-core/src/test/java/org/apache/qpid/server/consumer/AbstractConsumerTargetTest.java
----------------------------------------------------------------------
diff --git 
a/broker-core/src/test/java/org/apache/qpid/server/consumer/AbstractConsumerTargetTest.java
 
b/broker-core/src/test/java/org/apache/qpid/server/consumer/AbstractConsumerTargetTest.java
index e29458a..8e41a9c 100644
--- 
a/broker-core/src/test/java/org/apache/qpid/server/consumer/AbstractConsumerTargetTest.java
+++ 
b/broker-core/src/test/java/org/apache/qpid/server/consumer/AbstractConsumerTargetTest.java
@@ -22,11 +22,16 @@ package org.apache.qpid.server.consumer;
 
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.inOrder;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
 import static org.mockito.Mockito.when;
 
+import org.mockito.InOrder;
+
 import org.apache.qpid.server.message.MessageContainer;
 import org.apache.qpid.server.message.MessageInstance;
 import org.apache.qpid.server.message.MessageInstanceConsumer;
@@ -49,7 +54,8 @@ public class AbstractConsumerTargetTest extends QpidTestCase
     private TestAbstractConsumerTarget _consumerTarget;
     private Consumer _consumer;
     private MessageSource _messageSource;
-    private AMQPConnection _connection = mock(AMQPConnection.class);
+    private AMQPConnection<?> _connection = mock(AMQPConnection.class);
+    private AMQPSession<?,TestAbstractConsumerTarget> _session = 
mock(AMQPSession.class);
     private MessageInstance _messageInstance;
 
     @Override
@@ -71,6 +77,48 @@ public class AbstractConsumerTargetTest extends QpidTestCase
         _consumerTarget.consumerAdded(_consumer);
     }
 
+    public void testClose() throws Exception
+    {
+        _consumerTarget = new TestAbstractConsumerTarget();
+        assertEquals("Unexpected number of consumers", 0, 
_consumerTarget.getConsumers().size());
+
+        _consumerTarget.consumerAdded(_consumer);
+        assertEquals("Unexpected number of consumers after add", 1, 
_consumerTarget.getConsumers().size());
+
+        _consumerTarget.close();
+        assertEquals("Unexpected number of consumers after close", 0, 
_consumerTarget.getConsumers().size());
+
+        verify(_consumer, times(1)).close();
+    }
+
+    public void testNotifyWork() throws Exception
+    {
+        InOrder order = inOrder(_consumer);
+
+        _consumerTarget = new TestAbstractConsumerTarget();
+        assertEquals("Unexpected number of consumers", 0, 
_consumerTarget.getConsumers().size());
+
+        _consumerTarget.consumerAdded(_consumer);
+
+        _consumerTarget.setNotifyWorkDesired(true);
+        order.verify(_consumer, times(1)).setNotifyWorkDesired(true);
+
+        _consumerTarget.setNotifyWorkDesired(false);
+        order.verify(_consumer, times(1)).setNotifyWorkDesired(false);
+
+        _consumerTarget.setNotifyWorkDesired(true);
+        order.verify(_consumer, times(1)).setNotifyWorkDesired(true);
+
+        _consumerTarget.setNotifyWorkDesired(true);
+        // no change of state - should not be propagated to the consumer
+
+        _consumerTarget.close();
+        order.verify(_consumer, times(1)).setNotifyWorkDesired(false);
+        order.verify(_consumer, times(1)).close();
+
+        verifyNoMoreInteractions(_consumer);
+    }
+
     public void testConversionExceptionPolicyClose() throws Exception
     {
         configureBehaviour(true, 
MessageSource.MessageConversionExceptionHandlingPolicy.CLOSE);
@@ -199,13 +247,13 @@ public class AbstractConsumerTargetTest extends 
QpidTestCase
         @Override
         public void updateNotifyWorkDesired()
         {
-
+            throw new UnsupportedOperationException();
         }
 
         @Override
         public AMQPSession<?, TestAbstractConsumerTarget> getSession()
         {
-            return null;
+            return _session;
         }
 
         @Override


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org
For additional commands, e-mail: commits-h...@qpid.apache.org

Reply via email to