Author: rgodfrey
Date: Sat Oct 20 13:43:04 2012
New Revision: 1400447
URL: http://svn.apache.org/viewvc?rev=1400447&view=rev
Log:
QPID-4383 : Fix receipt of large messages, client ack
Modified:
qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/Delivery.java
qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ReceivingLinkEndpoint.java
Modified:
qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/Delivery.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/Delivery.java?rev=1400447&r1=1400446&r2=1400447&view=diff
==============================================================================
--- qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/Delivery.java (original)
+++ qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/Delivery.java Sat Oct 20 13:43:04 2012
@@ -71,6 +71,10 @@ public class Delivery
{
setComplete(true);
}
+ if(Boolean.TRUE.equals(transfer.getSettled()))
+ {
+ setSettled(true);
+ }
}
public List<Transfer> getTransfers()
Modified:
qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ReceivingLinkEndpoint.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ReceivingLinkEndpoint.java?rev=1400447&r1=1400446&r2=1400447&view=diff
==============================================================================
--- qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ReceivingLinkEndpoint.java (original)
+++ qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ReceivingLinkEndpoint.java Sat Oct 20 13:43:04 2012
@@ -113,33 +113,37 @@ public class ReceivingLinkEndpoint exten
synchronized (getLock())
{
TransientState transientState;
- boolean existingState = _unsettledMap.containsKey(transfer.getDeliveryTag());
- _unsettledMap.put(transfer.getDeliveryTag(), transfer.getState());
+ final Binary deliveryTag = delivery.getDeliveryTag();
+ boolean existingState = _unsettledMap.containsKey(deliveryTag);
+ if(!existingState || transfer.getState() != null)
+ {
+ _unsettledMap.put(deliveryTag, transfer.getState());
+ }
if(!existingState)
{
transientState = new TransientState(transfer.getDeliveryId());
- if(Boolean.TRUE.equals(transfer.getSettled()))
+ if(delivery.isSettled())
{
transientState.setSettled(true);
}
- _unsettledIds.put(transfer.getDeliveryTag(), transientState);
+ _unsettledIds.put(deliveryTag, transientState);
setLinkCredit(getLinkCredit().subtract(UnsignedInteger.ONE));
setDeliveryCount(getDeliveryCount().add(UnsignedInteger.ONE));
}
else
{
- transientState = _unsettledIds.get(transfer.getDeliveryTag());
+ transientState = _unsettledIds.get(deliveryTag);
transientState.incrementCredit();
- if(Boolean.TRUE.equals(transfer.getSettled()))
+ if(delivery.isSettled())
{
transientState.setSettled(true);
}
}
- if(transientState.isSettled())
+ if(transientState.isSettled() && delivery.isComplete())
{
- _unsettledMap.remove(transfer.getDeliveryTag());
+ _unsettledMap.remove(deliveryTag);
}
getLinkEventListener().messageTransfer(transfer);
@@ -371,7 +375,7 @@ public class ReceivingLinkEndpoint exten
tag = iter.next();
tagsToUpdate.add(tag);
- deliveryId = _unsettledIds.get(firstTag).getDeliveryId();
+ deliveryId = _unsettledIds.get(tag).getDeliveryId();
if(deliveryId.equals(last.add(UnsignedInteger.ONE)))
{
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]