Author: kwall
Date: Mon Dec 21 15:46:45 2015
New Revision: 1721198
URL: http://svn.apache.org/viewvc?rev=1721198&view=rev
Log:
QPID-6951: Revert change to BasicMessageConsumer so that prefetched messages
are not released if a session is already closing.
* Added test to ensure that previously received messages are committed even if
the consumer is closed.
Modified:
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
qpid/java/trunk/systests/src/test/java/org/apache/qpid/test/unit/close/MessageConsumerCloseTest.java
Modified:
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java?rev=1721198&r1=1721197&r2=1721198&view=diff
==============================================================================
---
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
(original)
+++
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
Mon Dec 21 15:46:45 2015
@@ -652,7 +652,7 @@ public abstract class BasicMessageConsum
}
- if(!isBrowseOnly())
+ if(!(isBrowseOnly() || getSession().isClosing()))
{
releasePendingMessages();
}
Modified:
qpid/java/trunk/systests/src/test/java/org/apache/qpid/test/unit/close/MessageConsumerCloseTest.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/systests/src/test/java/org/apache/qpid/test/unit/close/MessageConsumerCloseTest.java?rev=1721198&r1=1721197&r2=1721198&view=diff
==============================================================================
---
qpid/java/trunk/systests/src/test/java/org/apache/qpid/test/unit/close/MessageConsumerCloseTest.java
(original)
+++
qpid/java/trunk/systests/src/test/java/org/apache/qpid/test/unit/close/MessageConsumerCloseTest.java
Mon Dec 21 15:46:45 2015
@@ -37,6 +37,38 @@ public class MessageConsumerCloseTest e
{
private volatile Exception _exception;
+ /**
+ * JMS Session says "The content of a transaction's input and output units
is simply those messages that have
+ * been produced and consumed within the session's current transaction.".
Closing a consumer must not therefore
+ * prevent previously received messages from being committed.
+ */
+ public void testConsumerCloseAndSessionCommit() throws Exception
+ {
+ Connection connection = getConnection();
+ connection.start();
+ final Session session = connection.createSession(true,
Session.SESSION_TRANSACTED);
+ Destination destination = getTestQueue();
+ MessageConsumer consumer1 = session.createConsumer(destination);
+ sendMessage(session, destination, 2);
+
+
+ Message message = consumer1.receive(RECEIVE_TIMEOUT);
+ assertNotNull("First message is not received", message);
+ assertEquals("First message unexpected has unexpected property", 0,
message.getIntProperty(INDEX));
+ consumer1.close();
+
+ session.commit();
+
+ MessageConsumer consumer2 = session.createConsumer(destination);
+ message = consumer2.receive(RECEIVE_TIMEOUT);
+ assertNotNull("Second message is not received", message);
+ assertEquals("Second message unexpected has unexpected property", 1,
message.getIntProperty(INDEX));
+
+ message = consumer2.receive(100l);
+ assertNull("Unexpected third message", message);
+ }
+
+
public void testConsumerCloseAndSessionRollback() throws Exception
{
Connection connection = getConnection();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]