Author: orudyy
Date: Mon Dec 21 13:33:03 2015
New Revision: 1721151
URL: http://svn.apache.org/viewvc?rev=1721151&view=rev
Log:
QPID-6951: Release consumer prefetched messages on consumer close regardless
whether session is closed or not. Rename/remove consumer methods to have
sensible method names indicating what exactly method is really doing. Add
system test
Modified:
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession.java
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
qpid/java/trunk/systests/src/test/java/org/apache/qpid/test/unit/close/MessageConsumerCloseTest.java
Modified:
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession.java?rev=1721151&r1=1721150&r2=1721151&view=diff
==============================================================================
--- qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession.java
(original)
+++ qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession.java
Mon Dec 21 13:33:03 2015
@@ -27,7 +27,6 @@ import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -939,7 +938,7 @@ public abstract class AMQSession<C exten
private void rejectPending(C consumer)
{
// Reject messages on pre-receive queue
- consumer.rollbackPendingMessages();
+ consumer.releasePendingMessages();
// Reject messages on pre-dispatch queue
rejectMessagesForConsumerTag(consumer.getConsumerTag());
@@ -3373,7 +3372,7 @@ public abstract class AMQSession<C exten
{
if (!consumer.isBrowseOnly())
{
- consumer.rollback();
+ consumer.releasePendingMessages();
}
else
{
Modified:
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java?rev=1721151&r1=1721150&r2=1721151&view=diff
==============================================================================
---
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
(original)
+++
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
Mon Dec 21 13:33:03 2015
@@ -652,9 +652,9 @@ public abstract class BasicMessageConsum
}
- if(!(isBrowseOnly() || getSession().isClosing()))
+ if(!isBrowseOnly())
{
- rollback();
+ releasePendingMessages();
}
}
}
@@ -887,12 +887,7 @@ public abstract class BasicMessageConsum
return _browseOnly;
}
- public void rollback()
- {
- rollbackPendingMessages();
- }
-
- public void rollbackPendingMessages()
+ void releasePendingMessages()
{
if (_synchronousQueue.size() > 0)
{
@@ -942,7 +937,7 @@ public abstract class BasicMessageConsum
if (_synchronousQueue.size() != 0)
{
_logger.warn("Queue was not empty after rejecting all messages
Remaining:" + _synchronousQueue.size());
- rollback();
+ releasePendingMessages();
}
clearReceiveQueue();
Modified:
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java?rev=1721151&r1=1721150&r2=1721151&view=diff
==============================================================================
---
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
(original)
+++
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
Mon Dec 21 13:33:03 2015
@@ -454,7 +454,7 @@ public class BasicMessageConsumer_0_10 e
return receiveNoWait();
}
- @Override public void rollbackPendingMessages()
+ @Override void releasePendingMessages()
{
if (getSynchronousQueue().size() > 0)
{
Modified:
qpid/java/trunk/systests/src/test/java/org/apache/qpid/test/unit/close/MessageConsumerCloseTest.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/systests/src/test/java/org/apache/qpid/test/unit/close/MessageConsumerCloseTest.java?rev=1721151&r1=1721150&r2=1721151&view=diff
==============================================================================
---
qpid/java/trunk/systests/src/test/java/org/apache/qpid/test/unit/close/MessageConsumerCloseTest.java
(original)
+++
qpid/java/trunk/systests/src/test/java/org/apache/qpid/test/unit/close/MessageConsumerCloseTest.java
Mon Dec 21 13:33:03 2015
@@ -108,4 +108,39 @@ public class MessageConsumerCloseTest e
assertEquals("Message three has unexpected content", 2,
msg3.getIntProperty(INDEX));
session.commit();
}
+
+ public void
testMessagesReceivedBeforeConsumerCloseAreRedeliveredAfterRollback() throws
Exception
+ {
+ Connection connection = getConnection();
+ final Session session = connection.createSession(true,
Session.SESSION_TRANSACTED);
+
+ Destination destination = getTestQueue();
+ MessageConsumer consumer = session.createConsumer(destination);
+
+ int messageNumber = 4;
+ connection.start();
+ sendMessage(session, destination, messageNumber);
+
+ for(int i = 0; i < messageNumber/2 ; i++)
+ {
+ Message message = consumer.receive(RECEIVE_TIMEOUT);
+ assertNotNull("Message [" + i +"] was null", message);
+ assertEquals("Message [" + i +"] has unexpected content", i,
message.getIntProperty(INDEX));
+ }
+
+ consumer.close();
+
+ session.rollback();
+
+ MessageConsumer consumer2 = session.createConsumer(destination);
+
+ for(int i = 0; i < messageNumber ; i++)
+ {
+ Message message = consumer2.receive(RECEIVE_TIMEOUT);
+ assertNotNull("Message [" + i +"] was null", message);
+ assertEquals("Message [" + i +"] has unexpected content", i,
message.getIntProperty(INDEX));
+ }
+
+ session.commit();
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]