Author: dejanb
Date: Thu Nov 26 16:46:38 2009
New Revision: 884633

URL: http://svn.apache.org/viewvc?rev=884633&view=rev
Log:
https://issues.apache.org/activemq/browse/AMQ-2515 - Optimized Acknowledgements 
and interrupted transport

Modified:
    
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java

Modified: 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java?rev=884633&r1=884632&r2=884633&view=diff
==============================================================================
--- 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
 (original)
+++ 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
 Thu Nov 26 16:46:38 2009
@@ -134,6 +134,9 @@
     private long lastDeliveredSequenceId;
 
     private IOException failureError;
+    
+    private long optimizeAckTimestamp = System.currentTimeMillis();
+    private long optimizeAckTimeout = 300;
 
     /**
      * Create a MessageConsumer
@@ -788,7 +791,7 @@
             }
         }
     }
-
+    
     private void afterMessageIsConsumed(MessageDispatch md, boolean 
messageExpired) throws JMSException {
         if (unconsumedMessages.isClosed()) {
             return;
@@ -809,12 +812,13 @@
                         if (!deliveredMessages.isEmpty()) {
                             if (optimizeAcknowledge) {
                                 ackCounter++;
-                                if (ackCounter >= 
(info.getCurrentPrefetchSize() * .65)) {
+                                if (ackCounter >= (info.getPrefetchSize() * 
.65) || System.currentTimeMillis() >= (optimizeAckTimestamp + 
optimizeAckTimeout)) {
                                        MessageAck ack = 
makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE);
                                        if (ack != null) {
                                            deliveredMessages.clear();
                                            ackCounter = 0;
                                            session.sendAck(ack);
+                                           optimizeAckTimestamp = 
System.currentTimeMillis();
                                        }
                                 }
                             } else {
@@ -1074,14 +1078,13 @@
                             session.connection.rollbackDuplicate(this, 
old.getMessage());
                         }
                     }
-                    if (pendingAck != null && pendingAck.isDeliveredAck()) {
-                        // on resumption a pending delivered ack will be out 
of sync with
-                        // re deliveries.
-                        if (LOG.isDebugEnabled()) {
-                            LOG.debug("removing pending delivered ack on 
transport interupt: " + pendingAck);
-                        }   
-                        pendingAck = null;
+                    if (!session.isTransacted()) {
+                        // clean, so we don't have duplicates with 
optimizeAcknowledge 
+                        synchronized (deliveredMessages) {
+                            deliveredMessages.clear();
+                        }
                     }
+                    pendingAck = null;
                 }
                 if (!unconsumedMessages.isClosed()) {
                     if (this.info.isBrowser() || 
!session.connection.isDuplicate(this, md.getMessage())) {


Reply via email to