Author: rgodfrey
Date: Sat Oct 20 13:43:04 2012
New Revision: 1400447

URL: http://svn.apache.org/viewvc?rev=1400447&view=rev
Log:
QPID-4383 : Fix receipt of large messages, client ack

Modified:
    qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/Delivery.java
    qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ReceivingLinkEndpoint.java

Modified: qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/Delivery.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/Delivery.java?rev=1400447&r1=1400446&r2=1400447&view=diff
==============================================================================
--- qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/Delivery.java (original)
+++ qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/Delivery.java Sat Oct 20 13:43:04 2012
@@ -71,6 +71,10 @@ public class Delivery
         {
             setComplete(true);
         }
+        if(Boolean.TRUE.equals(transfer.getSettled()))
+        {
+            setSettled(true);
+        }
     }
 
     public List<Transfer> getTransfers()

Modified: qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ReceivingLinkEndpoint.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ReceivingLinkEndpoint.java?rev=1400447&r1=1400446&r2=1400447&view=diff
==============================================================================
--- qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ReceivingLinkEndpoint.java (original)
+++ qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ReceivingLinkEndpoint.java Sat Oct 20 13:43:04 2012
@@ -113,33 +113,37 @@ public class ReceivingLinkEndpoint exten
         synchronized (getLock())
         {
             TransientState transientState;
-            boolean existingState = _unsettledMap.containsKey(transfer.getDeliveryTag());
-            _unsettledMap.put(transfer.getDeliveryTag(), transfer.getState());
+            final Binary deliveryTag = delivery.getDeliveryTag();
+            boolean existingState = _unsettledMap.containsKey(deliveryTag);
+            if(!existingState || transfer.getState() != null)
+            {
+                _unsettledMap.put(deliveryTag, transfer.getState());
+            }
             if(!existingState)
             {
                 transientState = new TransientState(transfer.getDeliveryId());
-                if(Boolean.TRUE.equals(transfer.getSettled()))
+                if(delivery.isSettled())
                 {
                     transientState.setSettled(true);
                 }
-                _unsettledIds.put(transfer.getDeliveryTag(), transientState);
+                _unsettledIds.put(deliveryTag, transientState);
                 setLinkCredit(getLinkCredit().subtract(UnsignedInteger.ONE));
                 setDeliveryCount(getDeliveryCount().add(UnsignedInteger.ONE));
 
             }
             else
             {
-                transientState = _unsettledIds.get(transfer.getDeliveryTag());
+                transientState = _unsettledIds.get(deliveryTag);
                 transientState.incrementCredit();
-                if(Boolean.TRUE.equals(transfer.getSettled()))
+                if(delivery.isSettled())
                 {
                     transientState.setSettled(true);
                 }
             }
 
-            if(transientState.isSettled())
+            if(transientState.isSettled() && delivery.isComplete())
             {
-                _unsettledMap.remove(transfer.getDeliveryTag());
+                _unsettledMap.remove(deliveryTag);
             }
             getLinkEventListener().messageTransfer(transfer);
 
@@ -371,7 +375,7 @@ public class ReceivingLinkEndpoint exten
                     tag = iter.next();
                     tagsToUpdate.add(tag);
 
-                    deliveryId = _unsettledIds.get(firstTag).getDeliveryId();
+                    deliveryId = _unsettledIds.get(tag).getDeliveryId();
 
                     if(deliveryId.equals(last.add(UnsignedInteger.ONE)))
                     {



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to