Author: rgodfrey
Date: Fri Oct 26 08:44:43 2012
New Revision: 1402430
URL: http://svn.apache.org/viewvc?rev=1402430&view=rev
Log:
QPID-4395 : Fix client ack in onMessage in JMS AMQP 1.0 client
Modified:
qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java
qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java
Modified:
qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java?rev=1402430&r1=1402429&r2=1402430&view=diff
==============================================================================
---
qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java
(original)
+++
qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java
Fri Oct 26 08:44:43 2012
@@ -316,9 +316,9 @@ public class MessageConsumerImpl impleme
_lastUnackedMessage = deliveryTag;
}
- void preReceiveAction(final org.apache.qpid.amqp_1_0.client.Message msg)
throws IllegalStateException
+ void preReceiveAction(final org.apache.qpid.amqp_1_0.client.Message msg)
{
- final int acknowledgeMode = _session.getAcknowledgeMode();
+ int acknowledgeMode = _session.getAckModeEnum().ordinal();
if(acknowledgeMode == Session.AUTO_ACKNOWLEDGE
|| acknowledgeMode == Session.DUPS_OK_ACKNOWLEDGE
Modified:
qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java?rev=1402430&r1=1402429&r2=1402430&view=diff
==============================================================================
---
qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java
(original)
+++
qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java
Fri Oct 26 08:44:43 2012
@@ -804,6 +804,10 @@ public class SessionImpl implements Sess
if(message != null)
{
+ if(_acknowledgeMode ==
AcknowledgeMode.CLIENT_ACKNOWLEDGE)
+ {
+
consumer.setLastUnackedMessage(msg.getDeliveryTag());
+ }
_currentConsumer = consumer;
_currentMessage = msg;
try
@@ -816,11 +820,11 @@ public class SessionImpl implements Sess
_currentMessage = null;
}
- if((_recoveredMessage == null) &&
(_acknowledgeMode == AcknowledgeMode.AUTO_ACKNOWLEDGE
- || _acknowledgeMode ==
AcknowledgeMode.DUPS_OK_ACKNOWLEDGE))
+ if(_recoveredMessage == null)
{
- consumer.acknowledge(msg);
+ consumer.preReceiveAction(msg);
}
+
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]