Author: rajdavies
Date: Fri May 16 11:29:33 2008
New Revision: 657155

URL: http://svn.apache.org/viewvc?rev=657155&view=rev
Log:
tidy up for https://issues.apache.org/activemq/browse/AMQ-1732

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java?rev=657155&r1=657154&r2=657155&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java Fri May 16 11:29:33 2008
@@ -477,13 +477,20 @@
                 m = 
ActiveMQMessageTransformation.transformMessage(transformedMessage, 
session.connection);
             }
         }
-        if (session.isClientAcknowledge() || 
session.isIndividualAcknowledge()) {
+        if (session.isClientAcknowledge()) {
             m.setAcknowledgeCallback(new Callback() {
                 public void execute() throws Exception {
                     session.checkClosed();
                     session.acknowledge();
                 }
             });
+        }else if (session.isIndividualAcknowledge()) {
+            m.setAcknowledgeCallback(new Callback() {
+                public void execute() throws Exception {
+                    session.checkClosed();
+                    acknowledge(md);
+                }
+            });
         }
         return m;
     }
@@ -765,15 +772,9 @@
                 }
             } else if (session.isDupsOkAcknowledge()) {
                 ackLater(md, MessageAck.STANDARD_ACK_TYPE);
-            } else if (session.isClientAcknowledge()) {
+            } else if 
(session.isClientAcknowledge()||session.isIndividualAcknowledge()) {
                 ackLater(md, MessageAck.DELIVERED_ACK_TYPE);
-            } else if (session.isIndividualAcknowledge()){
-               MessageAck ack = new 
MessageAck(md,MessageAck.STANDARD_ACK_TYPE,deliveredMessages.size());
-                session.asyncSendPacket(ack);
-                synchronized(deliveredMessages){
-                       deliveredMessages.remove(md);
-                }
-            }
+            } 
             else {
                 throw new IllegalStateException("Invalid session state.");
             }
@@ -855,6 +856,14 @@
             }
         }
     }
+    
+    void acknowledge(MessageDispatch md) throws JMSException {
+        MessageAck ack = new 
MessageAck(md,MessageAck.STANDARD_ACK_TYPE,deliveredMessages.size());
+        session.asyncSendPacket(ack);
+        synchronized(deliveredMessages){
+            deliveredMessages.remove(md);
+        }
+    }
 
     public void commit() throws JMSException {
         synchronized (deliveredMessages) {


Reply via email to