Author: rgodfrey
Date: Fri Nov 13 11:39:09 2015
New Revision: 1714190

URL: http://svn.apache.org/viewvc?rev=1714190&view=rev
Log:
QPID-6844 : Fix link credit assignment for AMQP 1.0

Modified:
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/transport/SendingLinkEndpoint.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/transport/SendingLinkEndpoint.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/transport/SendingLinkEndpoint.java?rev=1714190&r1=1714189&r2=1714190&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/transport/SendingLinkEndpoint.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/transport/SendingLinkEndpoint.java Fri Nov 13 11:39:09 2015
@@ -81,27 +81,18 @@ public class SendingLinkEndpoint extends
         return Role.SENDER;
     }
 
-    public boolean transfer(final Transfer xfr)
+    public boolean transfer(final Transfer xfr, final boolean decrementCredit)
     {
         SessionEndpoint s = getSession();
-        int transferCount;
-        transferCount = _lastDeliveryTag == null ? 1 : 1;
         xfr.setMessageFormat(UnsignedInteger.ZERO);
         synchronized(getLock())
         {
-
-            final int currentCredit = getLinkCredit().intValue() - transferCount;
-
-            if(currentCredit < 0)
-            {
-                return false;
-            }
-            else
+            if(decrementCredit)
             {
-                setLinkCredit(UnsignedInteger.valueOf((int)currentCredit));
+                setLinkCredit(getLinkCredit().subtract(UnsignedInteger.ONE));
             }
 
-            setDeliveryCount(UnsignedInteger.valueOf((getDeliveryCount().intValue() + transferCount)));
+            setDeliveryCount(UnsignedInteger.valueOf((getDeliveryCount().intValue() + 1)));
 
             xfr.setHandle(getLocalHandle());
 

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java?rev=1714190&r1=1714189&r2=1714190&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java Fri Nov 13 11:39:09 2015
@@ -265,7 +265,7 @@ class ConsumerTarget_1_0 extends Abstrac
 
                     }
                     getSession().getAMQPConnection().registerMessageDelivered(message.getSize());
-                    getEndpoint().transfer(transfer);
+                    getEndpoint().transfer(transfer, false);
                 }
                 else
                 {
@@ -307,7 +307,8 @@ class ConsumerTarget_1_0 extends Abstrac
             {
                 suspend();
             }
-
+            SendingLinkEndpoint linkEndpoint = _link.getEndpoint();
+            linkEndpoint.setLinkCredit(linkEndpoint.getLinkCredit().subtract(UnsignedInteger.ONE));
             return hasCredit;
         }
     }
@@ -324,7 +325,11 @@ class ConsumerTarget_1_0 extends Abstrac
 
     public void restoreCredit(final ServerMessage message)
     {
-        //TODO
+        synchronized (_link.getLock())
+        {
+            final SendingLinkEndpoint endpoint = _link.getEndpoint();
+            endpoint.setLinkCredit(endpoint.getLinkCredit().subtract(UnsignedInteger.ONE));
+        }
     }
 
     public void queueEmpty()

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java?rev=1714190&r1=1714189&r2=1714190&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java Fri Nov 13 11:39:09 2015
@@ -492,7 +492,7 @@ public class SendingLink_1_0 implements
                 xfr.setDeliveryTag(dt);
                 xfr.setState(accepted);
                 xfr.setResume(Boolean.TRUE);
-                getEndpoint().transfer(xfr);
+                getEndpoint().transfer(xfr, true);
             }
 
         }



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to