Author: rgodfrey
Date: Fri Nov 13 11:39:09 2015
New Revision: 1714190
URL: http://svn.apache.org/viewvc?rev=1714190&view=rev
Log:
QPID-6844 : Fix link credit assignment for AMQP 1.0
Modified:
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/transport/SendingLinkEndpoint.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
Modified:
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/transport/SendingLinkEndpoint.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/transport/SendingLinkEndpoint.java?rev=1714190&r1=1714189&r2=1714190&view=diff
==============================================================================
---
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/transport/SendingLinkEndpoint.java
(original)
+++
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/transport/SendingLinkEndpoint.java
Fri Nov 13 11:39:09 2015
@@ -81,27 +81,18 @@ public class SendingLinkEndpoint extends
return Role.SENDER;
}
- public boolean transfer(final Transfer xfr)
+ public boolean transfer(final Transfer xfr, final boolean decrementCredit)
{
SessionEndpoint s = getSession();
- int transferCount;
- transferCount = _lastDeliveryTag == null ? 1 : 1;
xfr.setMessageFormat(UnsignedInteger.ZERO);
synchronized(getLock())
{
-
- final int currentCredit = getLinkCredit().intValue() -
transferCount;
-
- if(currentCredit < 0)
- {
- return false;
- }
- else
+ if(decrementCredit)
{
- setLinkCredit(UnsignedInteger.valueOf((int)currentCredit));
+ setLinkCredit(getLinkCredit().subtract(UnsignedInteger.ONE));
}
-
setDeliveryCount(UnsignedInteger.valueOf((getDeliveryCount().intValue() +
transferCount)));
+
setDeliveryCount(UnsignedInteger.valueOf((getDeliveryCount().intValue() + 1)));
xfr.setHandle(getLocalHandle());
Modified:
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java?rev=1714190&r1=1714189&r2=1714190&view=diff
==============================================================================
---
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
(original)
+++
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
Fri Nov 13 11:39:09 2015
@@ -265,7 +265,7 @@ class ConsumerTarget_1_0 extends Abstrac
}
getSession().getAMQPConnection().registerMessageDelivered(message.getSize());
- getEndpoint().transfer(transfer);
+ getEndpoint().transfer(transfer, false);
}
else
{
@@ -307,7 +307,8 @@ class ConsumerTarget_1_0 extends Abstrac
{
suspend();
}
-
+ SendingLinkEndpoint linkEndpoint = _link.getEndpoint();
+
linkEndpoint.setLinkCredit(linkEndpoint.getLinkCredit().subtract(UnsignedInteger.ONE));
return hasCredit;
}
}
@@ -324,7 +325,11 @@ class ConsumerTarget_1_0 extends Abstrac
public void restoreCredit(final ServerMessage message)
{
- //TODO
+ synchronized (_link.getLock())
+ {
+ final SendingLinkEndpoint endpoint = _link.getEndpoint();
+
endpoint.setLinkCredit(endpoint.getLinkCredit().subtract(UnsignedInteger.ONE));
+ }
}
public void queueEmpty()
Modified:
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java?rev=1714190&r1=1714189&r2=1714190&view=diff
==============================================================================
---
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
(original)
+++
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
Fri Nov 13 11:39:09 2015
@@ -492,7 +492,7 @@ public class SendingLink_1_0 implements
xfr.setDeliveryTag(dt);
xfr.setState(accepted);
xfr.setResume(Boolean.TRUE);
- getEndpoint().transfer(xfr);
+ getEndpoint().transfer(xfr, true);
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]