Author: rhs
Date: Thu Jul 17 19:52:42 2014
New Revision: 1611458
URL: http://svn.apache.org/r1611458
Log:
generate flow events when queued deliveries are sent; limit the size of the
outbound frame buffer
Modified:
qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/FrameWriter.java
qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java
Modified:
qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/FrameWriter.java
URL:
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/FrameWriter.java?rev=1611458&r1=1611457&r2=1611458&view=diff
==============================================================================
---
qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/FrameWriter.java
(original)
+++
qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/FrameWriter.java
Thu Jul 17 19:52:42 2014
@@ -192,6 +192,11 @@ class FrameWriter
writeFrame(0, frameBody, null, null);
}
+ boolean isFull() {
+ // XXX: this should probably be tunable
+ return _bbuf.position() > 64*1024;
+ }
+
int readBytes(ByteBuffer dst)
{
ByteBuffer src = _bbuf.duplicate();
Modified:
qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java
URL:
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java?rev=1611458&r1=1611457&r2=1611458&view=diff
==============================================================================
---
qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java
(original)
+++
qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java
Thu Jul 17 19:52:42 2014
@@ -492,7 +492,7 @@ public class TransportImpl extends Endpo
if(!delivery.isDone() &&
(delivery.getDataLength() > 0 || delivery != snd.current()) &&
tpSession.hasOutgoingCredit() && tpLink.hasCredit() &&
- tpLink.getLocalHandle() != null)
+ tpLink.getLocalHandle() != null && !_frameWriter.isFull())
{
UnsignedInteger deliveryId = tpSession.getOutgoingDeliveryId();
TransportDelivery tpDelivery = new TransportDelivery(deliveryId,
delivery, tpLink);
@@ -551,6 +551,8 @@ public class TransportImpl extends Endpo
delivery.setDataLength(payload.remaining());
session.incrementOutgoingBytes(-delta);
}
+
+ getConnectionImpl().put(Event.Type.LINK_FLOW, snd);
}
if(wasDone && delivery.getLocalState() != null)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]