Author: rgodfrey
Date: Wed Jun 27 23:13:38 2012
New Revision: 1354771
URL: http://svn.apache.org/viewvc?rev=1354771&view=rev
Log:
NO-JIRA : [Proton-j] fixes to flow control
Modified:
qpid/proton/trunk/proton-j/src/org/apache/qpid/proton/engine/impl/TransportImpl.java
qpid/proton/trunk/proton-j/src/org/apache/qpid/proton/engine/impl/TransportLink.java
qpid/proton/trunk/proton-j/src/org/apache/qpid/proton/engine/impl/TransportSender.java
qpid/proton/trunk/proton-j/src/org/apache/qpid/proton/engine/impl/TransportSession.java
Modified:
qpid/proton/trunk/proton-j/src/org/apache/qpid/proton/engine/impl/TransportImpl.java
URL:
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/src/org/apache/qpid/proton/engine/impl/TransportImpl.java?rev=1354771&r1=1354770&r2=1354771&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/src/org/apache/qpid/proton/engine/impl/TransportImpl.java (original)
+++ qpid/proton/trunk/proton-j/src/org/apache/qpid/proton/engine/impl/TransportImpl.java Wed Jun 27 23:13:38 2012
@@ -352,6 +352,7 @@ public class TransportImpl extends Endpo
if(delivery.getLink().current() != delivery)
{
transportLink.setDeliveryCount(transportLink.getDeliveryCount().add(UnsignedInteger.ONE));
+
transportLink.setLinkCredit(transportLink.getLinkCredit().subtract(UnsignedInteger.ONE));
}
delivery.getLink().decrementQueued();
Modified:
qpid/proton/trunk/proton-j/src/org/apache/qpid/proton/engine/impl/TransportLink.java
URL:
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/src/org/apache/qpid/proton/engine/impl/TransportLink.java?rev=1354771&r1=1354770&r2=1354771&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/src/org/apache/qpid/proton/engine/impl/TransportLink.java (original)
+++ qpid/proton/trunk/proton-j/src/org/apache/qpid/proton/engine/impl/TransportLink.java Wed Jun 27 23:13:38 2012
@@ -146,4 +146,14 @@ class TransportLink<T extends LinkImpl>
{
_remoteLinkCredit = remoteLinkCredit;
}
+
+ void decrementLinkCredit()
+ {
+ _linkCredit = _linkCredit.subtract(UnsignedInteger.ONE);
+ }
+
+ void incrementDeliveryCount()
+ {
+ _deliveryCount = _deliveryCount.add(UnsignedInteger.ONE);
+ }
}
Modified:
qpid/proton/trunk/proton-j/src/org/apache/qpid/proton/engine/impl/TransportSender.java
URL:
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/src/org/apache/qpid/proton/engine/impl/TransportSender.java?rev=1354771&r1=1354770&r2=1354771&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/src/org/apache/qpid/proton/engine/impl/TransportSender.java (original)
+++ qpid/proton/trunk/proton-j/src/org/apache/qpid/proton/engine/impl/TransportSender.java Wed Jun 27 23:13:38 2012
@@ -40,9 +40,13 @@ class TransportSender extends TransportL
super.handleFlow(flow);
_drain = flow.getDrain();
getLink().setDrain(flow.getDrain());
- UnsignedInteger transferLimit = flow.getLinkCredit().add(getDeliveryCount());
+ int oldCredit = getLink().getCredit();
+ UnsignedInteger oldLimit = getLinkCredit().add(getDeliveryCount());
+ UnsignedInteger transferLimit = flow.getLinkCredit().add(flow.getDeliveryCount());
UnsignedInteger linkCredit = transferLimit.subtract(getDeliveryCount());
- getLink().setCredit(linkCredit.intValue());
+
+ setLinkCredit(linkCredit);
+ getLink().setCredit(transferLimit.subtract(oldLimit).intValue() + oldCredit);
DeliveryImpl current = getLink().current();
getLink().getConnectionImpl().workUpdate(current);
Modified:
qpid/proton/trunk/proton-j/src/org/apache/qpid/proton/engine/impl/TransportSession.java
URL:
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/src/org/apache/qpid/proton/engine/impl/TransportSession.java?rev=1354771&r1=1354770&r2=1354771&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/src/org/apache/qpid/proton/engine/impl/TransportSession.java (original)
+++ qpid/proton/trunk/proton-j/src/org/apache/qpid/proton/engine/impl/TransportSession.java Wed Jun 27 23:13:38 2012
@@ -244,6 +244,8 @@ class TransportSession
{
delivery.setComplete();
_incomingWindowSize = _incomingWindowSize.subtract(UnsignedInteger.ONE);
+ delivery.getLink().getTransportLink().decrementLinkCredit();
+ delivery.getLink().getTransportLink().incrementDeliveryCount();
}
if(Boolean.TRUE == transfer.getSettled())
{
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]