Author: gtully
Date: Thu Oct 16 09:48:52 2008
New Revision: 705281

URL: http://svn.apache.org/viewvc?rev=705281&view=rev
Log:
AMQ-1976, individual messages passing through a network bridge need an 
individual ack; for an individual ack, it is ok to have more than one message 
dispatched

Modified:
    
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
    
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
    
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/QueueMemoryFullMultiBrokersTest.java

Modified: 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java?rev=705281&r1=705280&r2=705281&view=diff
==============================================================================
--- 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
 (original)
+++ 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
 Thu Oct 16 09:48:52 2008
@@ -411,40 +411,44 @@
      * @param lastAckedMsg
      * @throws JMSException if it does not match
      */
-       protected void assertAckMatchesDispatched(MessageAck ack)
-                       throws JMSException {
+       protected void assertAckMatchesDispatched(MessageAck ack) throws 
JMSException {
         MessageId firstAckedMsg = ack.getFirstMessageId();
-               MessageId lastAckedMsg = ack.getLastMessageId();
+        MessageId lastAckedMsg = ack.getLastMessageId();
+        int checkCount = 0;
+        boolean checkFoundStart = false;
+        boolean checkFoundEnd = false;
+        for (MessageReference node : dispatched) {
+
+            if (firstAckedMsg == null) {
+                checkFoundStart = true;
+            } else if (!checkFoundStart && 
firstAckedMsg.equals(node.getMessageId())) {
+                checkFoundStart = true;
+            }
+
+            if (checkFoundStart) {
+                checkCount++;
+            }
 
-               int checkCount = 0;
-               boolean checkFoundStart = false;
-               boolean checkFoundEnd = false;
-               for (MessageReference node : dispatched) {
-                       
-                       if( firstAckedMsg == null ) {
-                               checkFoundStart=true;
-                       } else if (!checkFoundStart && 
firstAckedMsg.equals(node.getMessageId())) {
-                               checkFoundStart = true;
-                       }
-
-                       if (checkFoundStart) {
-                               checkCount++;
-                       }
-
-                       if (lastAckedMsg != null && 
lastAckedMsg.equals(node.getMessageId())) {
-                               checkFoundEnd = true;
-                               break;
-                       }
-               }
-               if (!checkFoundStart && firstAckedMsg != null)
-                       throw new JMSException("Unmatched acknowledege: Could 
not find Message-ID "+firstAckedMsg+" in dispatched-list (start of ack)");
-               if (!checkFoundEnd && lastAckedMsg != null)
-                       throw new JMSException("Unmatched acknowledege: Could 
not find Message-ID "+lastAckedMsg+" in dispatched-list (end of ack)");
-               if (ack.getMessageCount() != checkCount) {
-                       throw new JMSException("Unmatched acknowledege: 
Expected message count ("+ack.getMessageCount()+
-                                       ") differs from count in 
dispatched-list ("+checkCount+")");
-               }
-       }
+            if (lastAckedMsg != null && 
lastAckedMsg.equals(node.getMessageId())) {
+                checkFoundEnd = true;
+                break;
+            }
+        }
+        if (!checkFoundStart && firstAckedMsg != null)
+            throw new JMSException("Unmatched acknowledege: " + ack
+                    + "; Could not find Message-ID " + firstAckedMsg
+                    + " in dispatched-list (start of ack)");
+        if (!checkFoundEnd && lastAckedMsg != null)
+            throw new JMSException("Unmatched acknowledege: " + ack
+                    + "; Could not find Message-ID " + lastAckedMsg
+                    + " in dispatched-list (end of ack)");
+        if (ack.getMessageCount() != checkCount && ack.isStandardAck()) {
+            throw new JMSException("Unmatched acknowledege: " + ack
+                    + "; Expected message count (" + ack.getMessageCount()
+                    + ") differs from count in dispatched-list (" + checkCount
+                    + ")");
+        }
+    }
 
     /**
      * @param context

Modified: 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java?rev=705281&r1=705280&r2=705281&view=diff
==============================================================================
--- 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
 (original)
+++ 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
 Thu Oct 16 09:48:52 2008
@@ -647,7 +647,7 @@
                             else{
                               LOG.info("Message not forwarded on to remote, 
because message came from remote");                               
                             }
-                            localBroker.oneway(new MessageAck(md, 
MessageAck.STANDARD_ACK_TYPE, 1));
+                            localBroker.oneway(new MessageAck(md, 
MessageAck.INDIVIDUAL_ACK_TYPE, 1));
                             dequeueCounter.incrementAndGet();                  
        
 
                         } else {
@@ -664,7 +664,7 @@
                                             ExceptionResponse er = 
(ExceptionResponse)response;
                                             
serviceLocalException(er.getException());
                                         } else {
-                                            localBroker.oneway(new 
MessageAck(md, MessageAck.STANDARD_ACK_TYPE, 1));
+                                            localBroker.oneway(new 
MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1));
                                             dequeueCounter.incrementAndGet();
                                         }
                                     } catch (IOException e) {

Modified: 
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/QueueMemoryFullMultiBrokersTest.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/QueueMemoryFullMultiBrokersTest.java?rev=705281&r1=705280&r2=705281&view=diff
==============================================================================
--- 
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/QueueMemoryFullMultiBrokersTest.java
 (original)
+++ 
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/QueueMemoryFullMultiBrokersTest.java
 Thu Oct 16 09:48:52 2008
@@ -39,7 +39,6 @@
     public static final int MESSAGE_COUNT = 2000;
    
     public void testQueueNetworkWithConsumerFull() throws Exception {
-        if (true) return;
         
         bridgeAllBrokers();
         startAllBrokers();
@@ -67,8 +66,6 @@
         assertTrue("All messages are consumed and acked from source:" + 
internalQueue, internalQueue.getMessages().isEmpty());
         assertEquals("messages source:" + internalQueue, 0, 
internalQueue.getDestinationStatistics().getMessages().getCount());
         assertEquals("inflight source:" + internalQueue, 0, 
internalQueue.getDestinationStatistics().getInflight().getCount());
-        
-
     }
 
     public void setUp() throws Exception {
@@ -82,7 +79,6 @@
         }
         BrokerService broker2 = brokers.get("Broker2").broker;
         applyMemoryLimitPolicy(broker2);
-        
     }
 
     private void applyMemoryLimitPolicy(BrokerService broker) {


Reply via email to