Author: kwall
Date: Thu Feb 12 12:00:06 2015
New Revision: 1659232
URL: http://svn.apache.org/r1659232
Log:
browser consumer close is now pulled by IO rather than pushed by queue, fixing
browser tests
Modified:
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/systests/src/test/java/org/apache/qpid/test/client/QueueBrowserAutoAckTest.java
Modified:
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java
URL:
http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java?rev=1659232&r1=1659231&r2=1659232&view=diff
==============================================================================
---
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java
(original)
+++
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java
Thu Feb 12 12:00:06 2015
@@ -54,14 +54,18 @@ public abstract class AbstractConsumerTa
}
@Override
- public void processPendingMessages()
+ public void processPending()
{
while(hasMessagesToSend())
{
sendNextMessage();
}
+
+ processClosed();
}
+ protected abstract void processClosed();
+
@Override
public final boolean isSuspended()
{
Modified:
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java
URL:
http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java?rev=1659232&r1=1659231&r2=1659232&view=diff
==============================================================================
---
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java
(original)
+++
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java
Thu Feb 12 12:00:06 2015
@@ -33,7 +33,7 @@ public interface ConsumerTarget
void removeStateChangeListener(StateChangeListener<ConsumerTarget, State>
listener);
- void processPendingMessages();
+ void processPending();
enum State
{
Modified:
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
URL:
http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java?rev=1659232&r1=1659231&r2=1659232&view=diff
==============================================================================
---
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
(original)
+++
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
Thu Feb 12 12:00:06 2015
@@ -2141,7 +2141,8 @@ public abstract class AbstractQueue<X ex
if (consumerDone)
{
sub.flushBatched();
- if (lastLoop && !sub.isSuspended())
+ boolean noMore = getNextAvailableEntry(sub) ==
null;
+ if (lastLoop && noMore)
{
sub.queueEmpty();
}
Modified:
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java
URL:
http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java?rev=1659232&r1=1659231&r2=1659232&view=diff
==============================================================================
---
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java
(original)
+++
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java
Thu Feb 12 12:00:06 2015
@@ -249,7 +249,7 @@ public class MockConsumer implements Con
}
@Override
- public void processPendingMessages()
+ public void processPending()
{
}
Modified:
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
URL:
http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java?rev=1659232&r1=1659231&r2=1659232&view=diff
==============================================================================
---
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
(original)
+++
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
Thu Feb 12 12:00:06 2015
@@ -661,4 +661,10 @@ public class ConsumerTarget_0_10 extends
{
return _unacknowledgedCount.longValue();
}
+
+ @Override
+ protected void processClosed()
+ {
+
+ }
}
Modified:
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
URL:
http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java?rev=1659232&r1=1659231&r2=1659232&view=diff
==============================================================================
---
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
(original)
+++
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
Thu Feb 12 12:00:06 2015
@@ -1140,7 +1140,7 @@ public class ServerSession extends Sessi
{
for(ConsumerTarget target : getSubscriptions())
{
- target.processPendingMessages();
+ target.processPending();
}
}
Modified:
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
URL:
http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java?rev=1659232&r1=1659231&r2=1659232&view=diff
==============================================================================
---
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
(original)
+++
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
Thu Feb 12 12:00:06 2015
@@ -3613,7 +3613,7 @@ public class AMQChannel
for(ConsumerTarget target : _tag2SubscriptionTargetMap.values())
{
- target.processPendingMessages();
+ target.processPending();
}
}
}
Modified:
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
URL:
http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java?rev=1659232&r1=1659231&r2=1659232&view=diff
==============================================================================
---
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
(original)
+++
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
Thu Feb 12 12:00:06 2015
@@ -75,6 +75,7 @@ public abstract class ConsumerTarget_0_8
private final AtomicLong _unacknowledgedCount = new AtomicLong(0);
private final AtomicLong _unacknowledgedBytes = new AtomicLong(0);
private final List<ConsumerImpl> _consumers = new CopyOnWriteArrayList<>();
+ private final AtomicBoolean _needToClose = new AtomicBoolean();
public static ConsumerTarget_0_8 createBrowserTarget(AMQChannel channel,
@@ -513,6 +514,15 @@ public abstract class ConsumerTarget_0_8
{
if (isAutoClose())
{
+ _needToClose.set(true);
+ }
+ }
+
+ @Override
+ protected void processClosed()
+ {
+ if (_needToClose.get() && getState() != State.CLOSED)
+ {
close();
confirmAutoClose();
}
Modified:
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
URL:
http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java?rev=1659232&r1=1659231&r2=1659232&view=diff
==============================================================================
---
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
(original)
+++
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
Thu Feb 12 12:00:06 2015
@@ -535,4 +535,9 @@ class ConsumerTarget_1_0 extends Abstrac
return 0;
}
+ @Override
+ protected void processClosed()
+ {
+
+ }
}
Modified:
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
URL:
http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java?rev=1659232&r1=1659231&r2=1659232&view=diff
==============================================================================
---
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
(original)
+++
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
Thu Feb 12 12:00:06 2015
@@ -905,7 +905,7 @@ public class Session_1_0 implements Sess
for(Consumer<?> consumer : getConsumers())
{
- ((ConsumerImpl)consumer).getTarget().processPendingMessages();
+ ((ConsumerImpl)consumer).getTarget().processPending();
}
}
Modified:
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/systests/src/test/java/org/apache/qpid/test/client/QueueBrowserAutoAckTest.java
URL:
http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/systests/src/test/java/org/apache/qpid/test/client/QueueBrowserAutoAckTest.java?rev=1659232&r1=1659231&r2=1659232&view=diff
==============================================================================
---
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/systests/src/test/java/org/apache/qpid/test/client/QueueBrowserAutoAckTest.java
(original)
+++
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/systests/src/test/java/org/apache/qpid/test/client/QueueBrowserAutoAckTest.java
Thu Feb 12 12:00:06 2015
@@ -148,7 +148,6 @@ public class QueueBrowserAutoAckTest ext
assertEquals("Session reports Queue expectedDepth not as expected",
expectedDepth, queueDepth);
-
// Browse the queue to get a second opinion
int msgCount = 0;
Enumeration msgs = queueBrowser.getEnumeration();
@@ -268,7 +267,7 @@ public class QueueBrowserAutoAckTest ext
//validate all browsers get right message count.
for (int count = 0; count < browserEnumerationCount; count++)
{
- assertEquals(msgCount[count], expectedMessages);
+ assertEquals("Unexpected count for browser " + count,
expectedMessages, msgCount[count]);
}
try
@@ -317,7 +316,7 @@ public class QueueBrowserAutoAckTest ext
//Close this new connection
connection.close();
- _logger.info("All messages recevied from queue");
+ _logger.info("All messages received from queue");
//ensure no message left.
checkQueueDepth(0);
@@ -344,7 +343,7 @@ public class QueueBrowserAutoAckTest ext
/*
* Test Messages Remain on Queue
- * Create a queu and send messages to it. Browse them and then receive them
all to verify they were still there
+ * Create a queue and send messages to it. Browse them and then receive
them all to verify they were still there
*
*/
public void testQueueBrowserMsgsRemainOnQueue() throws Exception
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]