Author: kwall
Date: Thu Feb 12 12:00:06 2015
New Revision: 1659232

URL: http://svn.apache.org/r1659232
Log:
broswer consumer close is now pulled by IO rather than pushed by queue, fixing 
browser tests

Modified:
    
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java
    
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java
    
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
    
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java
    
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
    
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
    
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
    
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
    
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
    
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
    
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/systests/src/test/java/org/apache/qpid/test/client/QueueBrowserAutoAckTest.java

Modified: 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java?rev=1659232&r1=1659231&r2=1659232&view=diff
==============================================================================
--- 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java
 (original)
+++ 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java
 Thu Feb 12 12:00:06 2015
@@ -54,14 +54,18 @@ public abstract class AbstractConsumerTa
     }
 
     @Override
-    public void processPendingMessages()
+    public void processPending()
     {
         while(hasMessagesToSend())
         {
             sendNextMessage();
         }
+
+        processClosed();
     }
 
+    protected abstract void processClosed();
+
     @Override
     public final boolean isSuspended()
     {

Modified: 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java?rev=1659232&r1=1659231&r2=1659232&view=diff
==============================================================================
--- 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java
 (original)
+++ 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java
 Thu Feb 12 12:00:06 2015
@@ -33,7 +33,7 @@ public interface ConsumerTarget
 
     void removeStateChangeListener(StateChangeListener<ConsumerTarget, State> 
listener);
 
-    void processPendingMessages();
+    void processPending();
 
     enum State
     {

Modified: 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java?rev=1659232&r1=1659231&r2=1659232&view=diff
==============================================================================
--- 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
 (original)
+++ 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
 Thu Feb 12 12:00:06 2015
@@ -2141,7 +2141,8 @@ public abstract class AbstractQueue<X ex
                             if (consumerDone)
                             {
                                 sub.flushBatched();
-                                if (lastLoop && !sub.isSuspended())
+                                boolean noMore = getNextAvailableEntry(sub) == 
null;
+                                if (lastLoop && noMore)
                                 {
                                     sub.queueEmpty();
                                 }

Modified: 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java?rev=1659232&r1=1659231&r2=1659232&view=diff
==============================================================================
--- 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java
 (original)
+++ 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java
 Thu Feb 12 12:00:06 2015
@@ -249,7 +249,7 @@ public class MockConsumer implements Con
     }
 
     @Override
-    public void processPendingMessages()
+    public void processPending()
     {
 
     }

Modified: 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java?rev=1659232&r1=1659231&r2=1659232&view=diff
==============================================================================
--- 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
 (original)
+++ 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
 Thu Feb 12 12:00:06 2015
@@ -661,4 +661,10 @@ public class ConsumerTarget_0_10 extends
     {
         return _unacknowledgedCount.longValue();
     }
+
+    @Override
+    protected void processClosed()
+    {
+
+    }
 }

Modified: 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java?rev=1659232&r1=1659231&r2=1659232&view=diff
==============================================================================
--- 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
 (original)
+++ 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
 Thu Feb 12 12:00:06 2015
@@ -1140,7 +1140,7 @@ public class ServerSession extends Sessi
     {
         for(ConsumerTarget target : getSubscriptions())
         {
-            target.processPendingMessages();
+            target.processPending();
         }
     }
 

Modified: 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java?rev=1659232&r1=1659231&r2=1659232&view=diff
==============================================================================
--- 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
 (original)
+++ 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
 Thu Feb 12 12:00:06 2015
@@ -3613,7 +3613,7 @@ public class AMQChannel
 
         for(ConsumerTarget target : _tag2SubscriptionTargetMap.values())
         {
-            target.processPendingMessages();
+            target.processPending();
         }
     }
 }

Modified: 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java?rev=1659232&r1=1659231&r2=1659232&view=diff
==============================================================================
--- 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
 (original)
+++ 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
 Thu Feb 12 12:00:06 2015
@@ -75,6 +75,7 @@ public abstract class ConsumerTarget_0_8
     private final AtomicLong _unacknowledgedCount = new AtomicLong(0);
     private final AtomicLong _unacknowledgedBytes = new AtomicLong(0);
     private final List<ConsumerImpl> _consumers = new CopyOnWriteArrayList<>();
+    private final AtomicBoolean _needToClose = new AtomicBoolean();
 
 
     public static ConsumerTarget_0_8 createBrowserTarget(AMQChannel channel,
@@ -513,6 +514,15 @@ public abstract class ConsumerTarget_0_8
     {
         if (isAutoClose())
         {
+            _needToClose.set(true);
+        }
+    }
+
+    @Override
+    protected void processClosed()
+    {
+        if (_needToClose.get() && getState() != State.CLOSED)
+        {
             close();
             confirmAutoClose();
         }

Modified: 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java?rev=1659232&r1=1659231&r2=1659232&view=diff
==============================================================================
--- 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
 (original)
+++ 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
 Thu Feb 12 12:00:06 2015
@@ -535,4 +535,9 @@ class ConsumerTarget_1_0 extends Abstrac
         return 0;
     }
 
+    @Override
+    protected void processClosed()
+    {
+
+    }
 }

Modified: 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java?rev=1659232&r1=1659231&r2=1659232&view=diff
==============================================================================
--- 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
 (original)
+++ 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
 Thu Feb 12 12:00:06 2015
@@ -905,7 +905,7 @@ public class Session_1_0 implements Sess
         for(Consumer<?> consumer : getConsumers())
         {
 
-            ((ConsumerImpl)consumer).getTarget().processPendingMessages();
+            ((ConsumerImpl)consumer).getTarget().processPending();
         }
     }
 

Modified: 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/systests/src/test/java/org/apache/qpid/test/client/QueueBrowserAutoAckTest.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/systests/src/test/java/org/apache/qpid/test/client/QueueBrowserAutoAckTest.java?rev=1659232&r1=1659231&r2=1659232&view=diff
==============================================================================
--- 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/systests/src/test/java/org/apache/qpid/test/client/QueueBrowserAutoAckTest.java
 (original)
+++ 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/systests/src/test/java/org/apache/qpid/test/client/QueueBrowserAutoAckTest.java
 Thu Feb 12 12:00:06 2015
@@ -148,7 +148,6 @@ public class QueueBrowserAutoAckTest ext
         assertEquals("Session reports Queue expectedDepth not as expected", 
expectedDepth, queueDepth);
 
 
-
         // Browse the queue to get a second opinion
         int msgCount = 0;
         Enumeration msgs = queueBrowser.getEnumeration();
@@ -268,7 +267,7 @@ public class QueueBrowserAutoAckTest ext
         //validate all browsers get right message count.
         for (int count = 0; count < browserEnumerationCount; count++)
         {
-            assertEquals(msgCount[count], expectedMessages);
+            assertEquals("Unexpected count for browser " + count, 
expectedMessages, msgCount[count]);
         }
 
         try
@@ -317,7 +316,7 @@ public class QueueBrowserAutoAckTest ext
         //Close this new connection
         connection.close();
 
-        _logger.info("All messages recevied from queue");
+        _logger.info("All messages received from queue");
 
         //ensure no message left.
         checkQueueDepth(0);
@@ -344,7 +343,7 @@ public class QueueBrowserAutoAckTest ext
 
     /*
     * Test Messages Remain on Queue
-    * Create a queu and send messages to it. Browse them and then receive them 
all to verify they were still there
+    * Create a queue and send messages to it. Browse them and then receive 
them all to verify they were still there
     *
     */
     public void testQueueBrowserMsgsRemainOnQueue() throws Exception



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to