Author: rgodfrey
Date: Wed Jun 27 19:00:01 2012
New Revision: 1354668

URL: http://svn.apache.org/viewvc?rev=1354668&view=rev
Log:
NO-JIRA : [Proton-j] initial drain implementation

Modified:
    qpid/proton/trunk/proton-j/jproton.py
    qpid/proton/trunk/proton-j/src/org/apache/qpid/proton/engine/Receiver.java
    qpid/proton/trunk/proton-j/src/org/apache/qpid/proton/engine/Sender.java
    qpid/proton/trunk/proton-j/src/org/apache/qpid/proton/engine/impl/FrameParser.java
    qpid/proton/trunk/proton-j/src/org/apache/qpid/proton/engine/impl/LinkImpl.java
    qpid/proton/trunk/proton-j/src/org/apache/qpid/proton/engine/impl/ReceiverImpl.java
    qpid/proton/trunk/proton-j/src/org/apache/qpid/proton/engine/impl/SenderImpl.java
    qpid/proton/trunk/proton-j/src/org/apache/qpid/proton/engine/impl/TransportImpl.java
    qpid/proton/trunk/proton-j/src/org/apache/qpid/proton/engine/impl/TransportLink.java
    qpid/proton/trunk/proton-j/src/org/apache/qpid/proton/engine/impl/TransportReceiver.java
    qpid/proton/trunk/proton-j/src/org/apache/qpid/proton/engine/impl/TransportSender.java
    qpid/proton/trunk/proton-j/src/org/apache/qpid/proton/type/transport/Attach.java

Modified: qpid/proton/trunk/proton-j/jproton.py
URL: 
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/jproton.py?rev=1354668&r1=1354667&r2=1354668&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/jproton.py (original)
+++ qpid/proton/trunk/proton-j/jproton.py Wed Jun 27 19:00:01 2012
@@ -382,3 +382,9 @@ def pn_queued(l):
 
 def pn_unsettled(l):
   return l.getUnsettled()
+
+def pn_drain(l, c):
+  l.drain(c)
+
+def pn_drained(l):
+  l.drained()

Modified: 
qpid/proton/trunk/proton-j/src/org/apache/qpid/proton/engine/Receiver.java
URL: 
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/src/org/apache/qpid/proton/engine/Receiver.java?rev=1354668&r1=1354667&r2=1354668&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/src/org/apache/qpid/proton/engine/Receiver.java 
(original)
+++ qpid/proton/trunk/proton-j/src/org/apache/qpid/proton/engine/Receiver.java 
Wed Jun 27 19:00:01 2012
@@ -45,4 +45,5 @@ public interface Receiver extends Link
      */
     public int recv(byte[] bytes, int offset, int size);
 
+    public void drain(int credit);
 }

Modified: 
qpid/proton/trunk/proton-j/src/org/apache/qpid/proton/engine/Sender.java
URL: 
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/src/org/apache/qpid/proton/engine/Sender.java?rev=1354668&r1=1354667&r2=1354668&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/src/org/apache/qpid/proton/engine/Sender.java 
(original)
+++ qpid/proton/trunk/proton-j/src/org/apache/qpid/proton/engine/Sender.java 
Wed Jun 27 19:00:01 2012
@@ -51,4 +51,6 @@ public interface Sender extends Link
      */
     public void abort();
 
+
+    public void drained();
 }

Modified: 
qpid/proton/trunk/proton-j/src/org/apache/qpid/proton/engine/impl/FrameParser.java
URL: 
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/src/org/apache/qpid/proton/engine/impl/FrameParser.java?rev=1354668&r1=1354667&r2=1354668&view=diff
==============================================================================
--- 
qpid/proton/trunk/proton-j/src/org/apache/qpid/proton/engine/impl/FrameParser.java
 (original)
+++ 
qpid/proton/trunk/proton-j/src/org/apache/qpid/proton/engine/impl/FrameParser.java
 Wed Jun 27 19:00:01 2012
@@ -45,6 +45,9 @@ class FrameParser
     private Logger _traceLogger = Logger.getLogger("proton.trace");
     private Logger _rawLogger = Logger.getLogger("proton.raw");
 
+//    private static int ID;
+//    private int _id = ID++;
+//
     static
     {
         HEADER[0] = (byte) 'A';
@@ -265,7 +268,7 @@ class FrameParser
                                 _traceLogger.log(Level.FINE, "IN: CH["+channel+"] : " + frameBody + (payload == null ? "" : "[" + payload + "]"));
                             }
 
-//                            System.out.println("IN: CH["+channel+"] : " + frameBody + (payload == null ? "" : "[" + payload + "]"));
+//                            System.out.println("IN["+_id+"]: CH["+channel+"] : " + frameBody + (payload == null ? "" : "[" + payload + "]"));
                             frameBody.invoke(_frameBodyHandler, payload,channel);
 
                         }

Modified: 
qpid/proton/trunk/proton-j/src/org/apache/qpid/proton/engine/impl/LinkImpl.java
URL: 
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/src/org/apache/qpid/proton/engine/impl/LinkImpl.java?rev=1354668&r1=1354667&r2=1354668&view=diff
==============================================================================
--- 
qpid/proton/trunk/proton-j/src/org/apache/qpid/proton/engine/impl/LinkImpl.java 
(original)
+++ 
qpid/proton/trunk/proton-j/src/org/apache/qpid/proton/engine/impl/LinkImpl.java 
Wed Jun 27 19:00:01 2012
@@ -42,6 +42,7 @@ public abstract class LinkImpl extends E
     private int _unsettled;
 
     private LinkNode<LinkImpl> _node;
+    private boolean _drain;
 
 
     public LinkImpl(SessionImpl session, String name)
@@ -272,4 +273,14 @@ public abstract class LinkImpl extends E
     {
         _unsettled--;
     }
+
+    void setDrain(boolean drain)
+    {
+        _drain = drain;
+    }
+
+    boolean getDrain()
+    {
+        return _drain;
+    }
 }

Modified: 
qpid/proton/trunk/proton-j/src/org/apache/qpid/proton/engine/impl/ReceiverImpl.java
URL: 
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/src/org/apache/qpid/proton/engine/impl/ReceiverImpl.java?rev=1354668&r1=1354667&r2=1354668&view=diff
==============================================================================
--- 
qpid/proton/trunk/proton-j/src/org/apache/qpid/proton/engine/impl/ReceiverImpl.java
 (original)
+++ 
qpid/proton/trunk/proton-j/src/org/apache/qpid/proton/engine/impl/ReceiverImpl.java
 Wed Jun 27 19:00:01 2012
@@ -26,6 +26,8 @@ import org.apache.qpid.proton.engine.Seq
 
 public class ReceiverImpl extends LinkImpl implements Receiver
 {
+
+
     @Override
     public DeliveryImpl delivery(byte[] tag, int offset, int length)
     {
@@ -62,6 +64,7 @@ public class ReceiverImpl extends LinkIm
     {
         modified();
         addCredit(credits);
+        setDrain(false);
         _unsentCredits += credits;
     }
 
@@ -113,4 +116,9 @@ public class ReceiverImpl extends LinkIm
         return (delivery == current());
     }
 
+    public void drain(int credit)
+    {
+        flow(credit);
+        setDrain(true);
+    }
 }

Modified: 
qpid/proton/trunk/proton-j/src/org/apache/qpid/proton/engine/impl/SenderImpl.java
URL: 
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/src/org/apache/qpid/proton/engine/impl/SenderImpl.java?rev=1354668&r1=1354667&r2=1354668&view=diff
==============================================================================
--- 
qpid/proton/trunk/proton-j/src/org/apache/qpid/proton/engine/impl/SenderImpl.java
 (original)
+++ 
qpid/proton/trunk/proton-j/src/org/apache/qpid/proton/engine/impl/SenderImpl.java
 Wed Jun 27 19:00:01 2012
@@ -28,6 +28,7 @@ public class SenderImpl  extends LinkImp
 {
     private int _offered;
     private TransportSender _transportLink;
+    private boolean _drained;
 
     public SenderImpl(SessionImpl session, String name)
     {
@@ -118,4 +119,20 @@ public class SenderImpl  extends LinkImp
             advance();
         }*/
     }
+
+    public void drained()
+    {
+        _drained = true;
+        modified();
+    }
+
+    boolean clearDrained()
+    {
+        final boolean drained = _drained;
+        if(drained)
+        {
+            _drained = false;
+        }
+        return drained;
+    }
 }

Modified: 
qpid/proton/trunk/proton-j/src/org/apache/qpid/proton/engine/impl/TransportImpl.java
URL: 
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/src/org/apache/qpid/proton/engine/impl/TransportImpl.java?rev=1354668&r1=1354667&r2=1354668&view=diff
==============================================================================
--- 
qpid/proton/trunk/proton-j/src/org/apache/qpid/proton/engine/impl/TransportImpl.java
 (original)
+++ 
qpid/proton/trunk/proton-j/src/org/apache/qpid/proton/engine/impl/TransportImpl.java
 Wed Jun 27 19:00:01 2012
@@ -92,6 +92,7 @@ public class TransportImpl extends Endpo
     private final ByteBuffer _overflowBuffer = ByteBuffer.wrap(new 
byte[_maxFrameSize]);
     private static final byte AMQP_FRAME_TYPE = 0;
 
+
     {
         AMQPDefinedTypes.registerAllTypes(_decoder);
         _overflowBuffer.flip();
@@ -216,7 +217,44 @@ public class TransportImpl extends Endpo
 
     private int processSenderFlow(WritableBuffer buffer)
     {
-        return 0;  //TODO - Implement
+        EndpointImpl endpoint = _connectionEndpoint.getTransportHead();
+        int written = 0;
+        while(endpoint != null && buffer.remaining() >= _maxFrameSize)
+        {
+
+            if(endpoint instanceof SenderImpl)
+            {
+                SenderImpl sender = (SenderImpl) endpoint;
+                if(sender.getDrain() && sender.clearDrained())
+                {
+                    TransportSender transportLink = sender.getTransportLink();
+                    TransportSession transportSession = 
sender.getSession().getTransportSession();
+                    int credits = sender.getCredit();
+                    sender.setCredit(0);
+                    if(credits != 0)
+                    {
+                        
transportLink.setDeliveryCount(transportLink.getDeliveryCount().add(UnsignedInteger.valueOf(credits)));
+                        transportLink.setLinkCredit(UnsignedInteger.ZERO);
+
+                        Flow flow = new Flow();
+                        flow.setHandle(transportLink.getLocalHandle());
+                        
flow.setNextIncomingId(transportSession.getNextIncomingId());
+                        
flow.setIncomingWindow(transportSession.getIncomingWindowSize());
+                        
flow.setOutgoingWindow(transportSession.getOutgoingWindowSize());
+                        
flow.setDeliveryCount(transportLink.getDeliveryCount());
+                        flow.setLinkCredit(transportLink.getLinkCredit());
+                        flow.setDrain(sender.getDrain());
+                        
flow.setNextOutgoingId(transportSession.getNextOutgoingId());
+                        int frameBytes = writeFrame(buffer, 
transportSession.getLocalChannel(), flow, null);
+                        written += frameBytes;
+                    }
+                }
+
+            }
+
+            endpoint = endpoint.getNext();
+        }
+        return written;  //TODO - Implement
     }
 
     private int processSenderDisposition(WritableBuffer buffer)
@@ -398,6 +436,7 @@ public class TransportImpl extends Endpo
                         
flow.setOutgoingWindow(transportSession.getOutgoingWindowSize());
                         
flow.setDeliveryCount(transportLink.getDeliveryCount());
                         flow.setLinkCredit(transportLink.getLinkCredit());
+                        flow.setDrain(receiver.getDrain());
                         
flow.setNextOutgoingId(transportSession.getNextOutgoingId());
                         int frameBytes = writeFrame(buffer, 
transportSession.getLocalChannel(), flow, null);
                         written += frameBytes;

Modified: 
qpid/proton/trunk/proton-j/src/org/apache/qpid/proton/engine/impl/TransportLink.java
URL: 
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/src/org/apache/qpid/proton/engine/impl/TransportLink.java?rev=1354668&r1=1354667&r2=1354668&view=diff
==============================================================================
--- 
qpid/proton/trunk/proton-j/src/org/apache/qpid/proton/engine/impl/TransportLink.java
 (original)
+++ 
qpid/proton/trunk/proton-j/src/org/apache/qpid/proton/engine/impl/TransportLink.java
 Wed Jun 27 19:00:01 2012
@@ -113,6 +113,7 @@ class TransportLink<T extends LinkImpl>
         _remoteDeliveryCount = flow.getDeliveryCount();
         _remoteLinkCredit = flow.getLinkCredit();
 
+
     }
 
     void setLinkCredit(UnsignedInteger linkCredit)
@@ -129,4 +130,20 @@ class TransportLink<T extends LinkImpl>
     {
         
getLink().getSession().getTransportSession().settled(transportDelivery);
     }
+
+
+    UnsignedInteger getRemoteDeliveryCount()
+    {
+        return _remoteDeliveryCount;
+    }
+
+    UnsignedInteger getRemoteLinkCredit()
+    {
+        return _remoteLinkCredit;
+    }
+
+    public void setRemoteLinkCredit(UnsignedInteger remoteLinkCredit)
+    {
+        _remoteLinkCredit = remoteLinkCredit;
+    }
 }

Modified: 
qpid/proton/trunk/proton-j/src/org/apache/qpid/proton/engine/impl/TransportReceiver.java
URL: 
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/src/org/apache/qpid/proton/engine/impl/TransportReceiver.java?rev=1354668&r1=1354667&r2=1354668&view=diff
==============================================================================
--- 
qpid/proton/trunk/proton-j/src/org/apache/qpid/proton/engine/impl/TransportReceiver.java
 (original)
+++ 
qpid/proton/trunk/proton-j/src/org/apache/qpid/proton/engine/impl/TransportReceiver.java
 Wed Jun 27 19:00:01 2012
@@ -42,5 +42,13 @@ class TransportReceiver extends Transpor
     void handleFlow(Flow flow)
     {
         super.handleFlow(flow);
+        if(getRemoteDeliveryCount().compareTo(getDeliveryCount())>=0)
+        {
+            getLink().setCredit(getRemoteLinkCredit().intValue());
+            setLinkCredit(getRemoteLinkCredit());
+            setDeliveryCount(getRemoteDeliveryCount());
+        }
+
+
     }
 }

Modified: 
qpid/proton/trunk/proton-j/src/org/apache/qpid/proton/engine/impl/TransportSender.java
URL: 
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/src/org/apache/qpid/proton/engine/impl/TransportSender.java?rev=1354668&r1=1354667&r2=1354668&view=diff
==============================================================================
--- 
qpid/proton/trunk/proton-j/src/org/apache/qpid/proton/engine/impl/TransportSender.java
 (original)
+++ 
qpid/proton/trunk/proton-j/src/org/apache/qpid/proton/engine/impl/TransportSender.java
 Wed Jun 27 19:00:01 2012
@@ -39,6 +39,7 @@ class TransportSender extends TransportL
     {
         super.handleFlow(flow);
         _drain = flow.getDrain();
+        getLink().setDrain(flow.getDrain());
         UnsignedInteger transferLimit = 
flow.getLinkCredit().add(getDeliveryCount());
         UnsignedInteger linkCredit = 
transferLimit.subtract(getDeliveryCount());
         getLink().setCredit(linkCredit.intValue());
@@ -48,5 +49,4 @@ class TransportSender extends TransportL
         setLinkCredit(linkCredit);
     }
 
-
 }

Modified: 
qpid/proton/trunk/proton-j/src/org/apache/qpid/proton/type/transport/Attach.java
URL: 
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/src/org/apache/qpid/proton/type/transport/Attach.java?rev=1354668&r1=1354667&r2=1354668&view=diff
==============================================================================
--- 
qpid/proton/trunk/proton-j/src/org/apache/qpid/proton/type/transport/Attach.java
 (original)
+++ 
qpid/proton/trunk/proton-j/src/org/apache/qpid/proton/type/transport/Attach.java
 Wed Jun 27 19:00:01 2012
@@ -44,12 +44,12 @@ public class Attach
 {
     private static final Object[] DESCRIPTORS =
     {
-        UnsignedLong.valueOf(0x0000000000000012L), 
Symbol.valueOf("amqp:attach:list"), 
+        UnsignedLong.valueOf(0x0000000000000012L), 
Symbol.valueOf("amqp:attach:list"),
     };
 
     private static final UnsignedLong DESCRIPTOR = 
UnsignedLong.valueOf(0x0000000000000012L);
     private final AttachWrapper _wrapper = new AttachWrapper();
-    
+
     private String _name;
     private UnsignedInteger _handle;
     private boolean _role;
@@ -214,7 +214,7 @@ public class Attach
     {
         _properties = properties;
     }
-    
+
     public Object getDescriptor()
     {
         return DESCRIPTOR;
@@ -224,7 +224,7 @@ public class Attach
     {
         return _wrapper;
     }
-    
+
     public Object get(final int index)
     {
 
@@ -257,7 +257,7 @@ public class Attach
             case 12:
                 return _desiredCapabilities;
             case 13:
-                return _properties;            
+                return _properties;
         }
 
         throw new IllegalStateException("Unknown index " + index);
@@ -266,29 +266,29 @@ public class Attach
 
     public int size()
     {
-        return _properties != null 
-                  ? 14 
-                  : _desiredCapabilities != null 
-                  ? 13 
-                  : _offeredCapabilities != null 
-                  ? 12 
-                  : _maxMessageSize != null 
-                  ? 11 
-                  : _initialDeliveryCount != null 
-                  ? 10 
+        return _properties != null
+                  ? 14
+                  : _desiredCapabilities != null
+                  ? 13
+                  : _offeredCapabilities != null
+                  ? 12
+                  : _maxMessageSize != null
+                  ? 11
+                  : _initialDeliveryCount != null
+                  ? 10
                   : (_incompleteUnsettled != false)
-                  ? 9 
-                  : _unsettled != null 
-                  ? 8 
-                  : _target != null 
-                  ? 7 
-                  : _source != null 
-                  ? 6 
+                  ? 9
+                  : _unsettled != null
+                  ? 8
+                  : _target != null
+                  ? 7
+                  : _source != null
+                  ? 6
                   : (_rcvSettleMode != null && 
!_rcvSettleMode.equals(ReceiverSettleMode.FIRST))
-                  ? 5 
+                  ? 5
                   : (_sndSettleMode != null && 
!_sndSettleMode.equals(SenderSettleMode.MIXED))
-                  ? 4 
-                  : 3;        
+                  ? 4
+                  : 3;
 
     }
 
@@ -405,7 +405,7 @@ public class Attach
         return "Attach{" +
                "name='" + _name + '\'' +
                ", handle=" + _handle +
-               ", role=" + _role +
+               ", role=" + (_role ? "RECEIVER" : "SENDER") +
                ", sndSettleMode=" + _sndSettleMode +
                ", rcvSettleMode=" + _rcvSettleMode +
                ", source=" + _source +
@@ -420,4 +420,3 @@ public class Attach
                '}';
     }
 }
-  
\ No newline at end of file



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to