Author: rajdavies
Date: Fri May 16 02:21:44 2008
New Revision: 656980

URL: http://svn.apache.org/viewvc?rev=656980&view=rev
Log:
Fix for https://issues.apache.org/activemq/browse/AMQ-1732

Modified:
    
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
    
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java

Modified: 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java?rev=656980&r1=656979&r2=656980&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java Fri May 16 02:21:44 2008
@@ -477,7 +477,7 @@
                 m = ActiveMQMessageTransformation.transformMessage(transformedMessage, session.connection);
             }
         }
-        if (session.isClientAcknowledge()) {
+        if (session.isClientAcknowledge() || session.isIndividualAcknowledge()) {
             m.setAcknowledgeCallback(new Callback() {
                 public void execute() throws Exception {
                     session.checkClosed();
@@ -767,7 +767,14 @@
                 ackLater(md, MessageAck.STANDARD_ACK_TYPE);
             } else if (session.isClientAcknowledge()) {
                 ackLater(md, MessageAck.DELIVERED_ACK_TYPE);
-            } else {
+            } else if (session.isIndividualAcknowledge()){
+               MessageAck ack = new MessageAck(md,MessageAck.STANDARD_ACK_TYPE,deliveredMessages.size());
+                session.asyncSendPacket(ack);
+                synchronized(deliveredMessages){
+                       deliveredMessages.remove(md);
+                }
+            }
+            else {
                 throw new IllegalStateException("Invalid session state.");
             }
         }
@@ -968,7 +975,7 @@
                                 }
                                 afterMessageIsConsumed(md, expired);
                             } catch (RuntimeException e) {
-                                if (session.isDupsOkAcknowledge() || session.isAutoAcknowledge()) {
+                                if (session.isDupsOkAcknowledge() || session.isAutoAcknowledge() || session.isIndividualAcknowledge()) {
                                     // Redeliver the message
                                 } else {
                                     // Transacted or Client ack: Deliver the

Modified: 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java?rev=656980&r1=656979&r2=656980&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java Fri May 16 02:21:44 2008
@@ -132,6 +132,8 @@
  * @see javax.jms.XASession
  */
 public class ActiveMQSession implements Session, QueueSession, TopicSession, StatsCapable, ActiveMQDispatcher {
+       
+       public static final int INDIVIDUAL_ACKNOWLEDGE=4;
 
     public static interface DeliveryListener {
         void beforeDelivery(ActiveMQSession session, Message msg);
@@ -710,7 +712,7 @@
                 continue;
             }
 
-            if (isClientAcknowledge()) {
+            if (isClientAcknowledge()||isIndividualAcknowledge()) {
                 message.setAcknowledgeCallback(new Callback() {
                     public void execute() throws Exception {
                     }
@@ -1705,6 +1707,10 @@
     public boolean isDupsOkAcknowledge() {
         return acknowledgementMode == Session.DUPS_OK_ACKNOWLEDGE;
     }
+    
+    public boolean isIndividualAcknowledge(){
+       return acknowledgementMode == ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE;
+    }
 
     /**
      * Returns the message delivery listener.


Reply via email to