Author: rhs
Date: Thu Jul 17 19:52:42 2014
New Revision: 1611458

URL: http://svn.apache.org/r1611458
Log:
generate flow events when queued deliveries are sent; limit the size of the outbound frame buffer

Modified:
    qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/FrameWriter.java
    qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java

Modified: qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/FrameWriter.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/FrameWriter.java?rev=1611458&r1=1611457&r2=1611458&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/FrameWriter.java (original)
+++ qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/FrameWriter.java Thu Jul 17 19:52:42 2014
@@ -192,6 +192,11 @@ class FrameWriter
         writeFrame(0, frameBody, null, null);
     }
 
+    boolean isFull() {
+        // XXX: this should probably be tunable
+        return _bbuf.position() > 64*1024;
+    }
+
     int readBytes(ByteBuffer dst)
     {
         ByteBuffer src = _bbuf.duplicate();

Modified: qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java?rev=1611458&r1=1611457&r2=1611458&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java (original)
+++ qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java Thu Jul 17 19:52:42 2014
@@ -492,7 +492,7 @@ public class TransportImpl extends Endpo
         if(!delivery.isDone() &&
            (delivery.getDataLength() > 0 || delivery != snd.current()) &&
            tpSession.hasOutgoingCredit() && tpLink.hasCredit() &&
-           tpLink.getLocalHandle() != null)
+           tpLink.getLocalHandle() != null && !_frameWriter.isFull())
         {
             UnsignedInteger deliveryId = tpSession.getOutgoingDeliveryId();
             TransportDelivery tpDelivery = new TransportDelivery(deliveryId, delivery, tpLink);
@@ -551,6 +551,8 @@ public class TransportImpl extends Endpo
                 delivery.setDataLength(payload.remaining());
                 session.incrementOutgoingBytes(-delta);
             }
+
+            getConnectionImpl().put(Event.Type.LINK_FLOW, snd);
         }
 
         if(wasDone && delivery.getLocalState() != null)



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to