Author: rajdavies
Date: Fri May 16 02:21:44 2008
New Revision: 656980
URL: http://svn.apache.org/viewvc?rev=656980&view=rev
Log:
Fix for https://issues.apache.org/activemq/browse/AMQ-1732
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java?rev=656980&r1=656979&r2=656980&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java Fri May 16 02:21:44 2008
@@ -477,7 +477,7 @@
m = ActiveMQMessageTransformation.transformMessage(transformedMessage, session.connection);
}
}
- if (session.isClientAcknowledge()) {
+ if (session.isClientAcknowledge() || session.isIndividualAcknowledge()) {
m.setAcknowledgeCallback(new Callback() {
public void execute() throws Exception {
session.checkClosed();
@@ -767,7 +767,14 @@
ackLater(md, MessageAck.STANDARD_ACK_TYPE);
} else if (session.isClientAcknowledge()) {
ackLater(md, MessageAck.DELIVERED_ACK_TYPE);
- } else {
+ } else if (session.isIndividualAcknowledge()){
+ MessageAck ack = new MessageAck(md,MessageAck.STANDARD_ACK_TYPE,deliveredMessages.size());
+ session.asyncSendPacket(ack);
+ synchronized(deliveredMessages){
+ deliveredMessages.remove(md);
+ }
+ }
+ else {
throw new IllegalStateException("Invalid session state.");
}
}
@@ -968,7 +975,7 @@
}
afterMessageIsConsumed(md, expired);
} catch (RuntimeException e) {
- if (session.isDupsOkAcknowledge() || session.isAutoAcknowledge()) {
+ if (session.isDupsOkAcknowledge() || session.isAutoAcknowledge() || session.isIndividualAcknowledge()) {
// Redeliver the message
} else {
// Transacted or Client ack: Deliver the
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java?rev=656980&r1=656979&r2=656980&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java Fri May 16 02:21:44 2008
@@ -132,6 +132,8 @@
* @see javax.jms.XASession
*/
public class ActiveMQSession implements Session, QueueSession, TopicSession, StatsCapable, ActiveMQDispatcher {
+
+ public static final int INDIVIDUAL_ACKNOWLEDGE=4;
public static interface DeliveryListener {
void beforeDelivery(ActiveMQSession session, Message msg);
@@ -710,7 +712,7 @@
continue;
}
- if (isClientAcknowledge()) {
+ if (isClientAcknowledge()||isIndividualAcknowledge()) {
message.setAcknowledgeCallback(new Callback() {
public void execute() throws Exception {
}
@@ -1705,6 +1707,10 @@
public boolean isDupsOkAcknowledge() {
return acknowledgementMode == Session.DUPS_OK_ACKNOWLEDGE;
}
+
+ public boolean isIndividualAcknowledge(){
+ return acknowledgementMode == ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE;
+ }
/**
* Returns the message delivery listener.