Author: rgodfrey
Date: Wed Jun 27 19:00:01 2012
New Revision: 1354668
URL: http://svn.apache.org/viewvc?rev=1354668&view=rev
Log:
NO-JIRA : [Proton-j] initial drain implementation
Modified:
qpid/proton/trunk/proton-j/jproton.py
qpid/proton/trunk/proton-j/src/org/apache/qpid/proton/engine/Receiver.java
qpid/proton/trunk/proton-j/src/org/apache/qpid/proton/engine/Sender.java
qpid/proton/trunk/proton-j/src/org/apache/qpid/proton/engine/impl/FrameParser.java
qpid/proton/trunk/proton-j/src/org/apache/qpid/proton/engine/impl/LinkImpl.java
qpid/proton/trunk/proton-j/src/org/apache/qpid/proton/engine/impl/ReceiverImpl.java
qpid/proton/trunk/proton-j/src/org/apache/qpid/proton/engine/impl/SenderImpl.java
qpid/proton/trunk/proton-j/src/org/apache/qpid/proton/engine/impl/TransportImpl.java
qpid/proton/trunk/proton-j/src/org/apache/qpid/proton/engine/impl/TransportLink.java
qpid/proton/trunk/proton-j/src/org/apache/qpid/proton/engine/impl/TransportReceiver.java
qpid/proton/trunk/proton-j/src/org/apache/qpid/proton/engine/impl/TransportSender.java
qpid/proton/trunk/proton-j/src/org/apache/qpid/proton/type/transport/Attach.java
Modified: qpid/proton/trunk/proton-j/jproton.py
URL:
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/jproton.py?rev=1354668&r1=1354667&r2=1354668&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/jproton.py (original)
+++ qpid/proton/trunk/proton-j/jproton.py Wed Jun 27 19:00:01 2012
@@ -382,3 +382,9 @@ def pn_queued(l):
def pn_unsettled(l):
return l.getUnsettled()
+
+def pn_drain(l, c):
+ l.drain(c)
+
+def pn_drained(l):
+ l.drained()
Modified:
qpid/proton/trunk/proton-j/src/org/apache/qpid/proton/engine/Receiver.java
URL:
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/src/org/apache/qpid/proton/engine/Receiver.java?rev=1354668&r1=1354667&r2=1354668&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/src/org/apache/qpid/proton/engine/Receiver.java (original)
+++ qpid/proton/trunk/proton-j/src/org/apache/qpid/proton/engine/Receiver.java Wed Jun 27 19:00:01 2012
@@ -45,4 +45,5 @@ public interface Receiver extends Link
*/
public int recv(byte[] bytes, int offset, int size);
+ public void drain(int credit);
}
Modified:
qpid/proton/trunk/proton-j/src/org/apache/qpid/proton/engine/Sender.java
URL:
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/src/org/apache/qpid/proton/engine/Sender.java?rev=1354668&r1=1354667&r2=1354668&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/src/org/apache/qpid/proton/engine/Sender.java (original)
+++ qpid/proton/trunk/proton-j/src/org/apache/qpid/proton/engine/Sender.java Wed Jun 27 19:00:01 2012
@@ -51,4 +51,6 @@ public interface Sender extends Link
*/
public void abort();
+
+ public void drained();
}
Modified:
qpid/proton/trunk/proton-j/src/org/apache/qpid/proton/engine/impl/FrameParser.java
URL:
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/src/org/apache/qpid/proton/engine/impl/FrameParser.java?rev=1354668&r1=1354667&r2=1354668&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/src/org/apache/qpid/proton/engine/impl/FrameParser.java (original)
+++ qpid/proton/trunk/proton-j/src/org/apache/qpid/proton/engine/impl/FrameParser.java Wed Jun 27 19:00:01 2012
@@ -45,6 +45,9 @@ class FrameParser
private Logger _traceLogger = Logger.getLogger("proton.trace");
private Logger _rawLogger = Logger.getLogger("proton.raw");
+// private static int ID;
+// private int _id = ID++;
+//
static
{
HEADER[0] = (byte) 'A';
@@ -265,7 +268,7 @@ class FrameParser
_traceLogger.log(Level.FINE, "IN: CH["+channel+"] : " + frameBody + (payload == null ? "" : "[" + payload + "]"));
}
-// System.out.println("IN: CH["+channel+"] : " + frameBody + (payload == null ? "" : "[" + payload + "]"));
+// System.out.println("IN["+_id+"]: CH["+channel+"] : " + frameBody + (payload == null ? "" : "[" + payload + "]"));
frameBody.invoke(_frameBodyHandler, payload,channel);
}
Modified:
qpid/proton/trunk/proton-j/src/org/apache/qpid/proton/engine/impl/LinkImpl.java
URL:
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/src/org/apache/qpid/proton/engine/impl/LinkImpl.java?rev=1354668&r1=1354667&r2=1354668&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/src/org/apache/qpid/proton/engine/impl/LinkImpl.java (original)
+++ qpid/proton/trunk/proton-j/src/org/apache/qpid/proton/engine/impl/LinkImpl.java Wed Jun 27 19:00:01 2012
@@ -42,6 +42,7 @@ public abstract class LinkImpl extends E
private int _unsettled;
private LinkNode<LinkImpl> _node;
+ private boolean _drain;
public LinkImpl(SessionImpl session, String name)
@@ -272,4 +273,14 @@ public abstract class LinkImpl extends E
{
_unsettled--;
}
+
+ void setDrain(boolean drain)
+ {
+ _drain = drain;
+ }
+
+ boolean getDrain()
+ {
+ return _drain;
+ }
}
Modified:
qpid/proton/trunk/proton-j/src/org/apache/qpid/proton/engine/impl/ReceiverImpl.java
URL:
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/src/org/apache/qpid/proton/engine/impl/ReceiverImpl.java?rev=1354668&r1=1354667&r2=1354668&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/src/org/apache/qpid/proton/engine/impl/ReceiverImpl.java (original)
+++ qpid/proton/trunk/proton-j/src/org/apache/qpid/proton/engine/impl/ReceiverImpl.java Wed Jun 27 19:00:01 2012
@@ -26,6 +26,8 @@ import org.apache.qpid.proton.engine.Seq
public class ReceiverImpl extends LinkImpl implements Receiver
{
+
+
@Override
public DeliveryImpl delivery(byte[] tag, int offset, int length)
{
@@ -62,6 +64,7 @@ public class ReceiverImpl extends LinkIm
{
modified();
addCredit(credits);
+ setDrain(false);
_unsentCredits += credits;
}
@@ -113,4 +116,9 @@ public class ReceiverImpl extends LinkIm
return (delivery == current());
}
+ public void drain(int credit)
+ {
+ flow(credit);
+ setDrain(true);
+ }
}
Modified:
qpid/proton/trunk/proton-j/src/org/apache/qpid/proton/engine/impl/SenderImpl.java
URL:
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/src/org/apache/qpid/proton/engine/impl/SenderImpl.java?rev=1354668&r1=1354667&r2=1354668&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/src/org/apache/qpid/proton/engine/impl/SenderImpl.java (original)
+++ qpid/proton/trunk/proton-j/src/org/apache/qpid/proton/engine/impl/SenderImpl.java Wed Jun 27 19:00:01 2012
@@ -28,6 +28,7 @@ public class SenderImpl extends LinkImp
{
private int _offered;
private TransportSender _transportLink;
+ private boolean _drained;
public SenderImpl(SessionImpl session, String name)
{
@@ -118,4 +119,20 @@ public class SenderImpl extends LinkImp
advance();
}*/
}
+
+ public void drained()
+ {
+ _drained = true;
+ modified();
+ }
+
+ boolean clearDrained()
+ {
+ final boolean drained = _drained;
+ if(drained)
+ {
+ _drained = false;
+ }
+ return drained;
+ }
}
Modified:
qpid/proton/trunk/proton-j/src/org/apache/qpid/proton/engine/impl/TransportImpl.java
URL:
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/src/org/apache/qpid/proton/engine/impl/TransportImpl.java?rev=1354668&r1=1354667&r2=1354668&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/src/org/apache/qpid/proton/engine/impl/TransportImpl.java (original)
+++ qpid/proton/trunk/proton-j/src/org/apache/qpid/proton/engine/impl/TransportImpl.java Wed Jun 27 19:00:01 2012
@@ -92,6 +92,7 @@ public class TransportImpl extends Endpo
private final ByteBuffer _overflowBuffer = ByteBuffer.wrap(new byte[_maxFrameSize]);
private static final byte AMQP_FRAME_TYPE = 0;
+
{
AMQPDefinedTypes.registerAllTypes(_decoder);
_overflowBuffer.flip();
@@ -216,7 +217,44 @@ public class TransportImpl extends Endpo
private int processSenderFlow(WritableBuffer buffer)
{
- return 0; //TODO - Implement
+ EndpointImpl endpoint = _connectionEndpoint.getTransportHead();
+ int written = 0;
+ while(endpoint != null && buffer.remaining() >= _maxFrameSize)
+ {
+
+ if(endpoint instanceof SenderImpl)
+ {
+ SenderImpl sender = (SenderImpl) endpoint;
+ if(sender.getDrain() && sender.clearDrained())
+ {
+ TransportSender transportLink = sender.getTransportLink();
+ TransportSession transportSession = sender.getSession().getTransportSession();
+ int credits = sender.getCredit();
+ sender.setCredit(0);
+ if(credits != 0)
+ {
+ transportLink.setDeliveryCount(transportLink.getDeliveryCount().add(UnsignedInteger.valueOf(credits)));
+ transportLink.setLinkCredit(UnsignedInteger.ZERO);
+
+ Flow flow = new Flow();
+ flow.setHandle(transportLink.getLocalHandle());
+ flow.setNextIncomingId(transportSession.getNextIncomingId());
+ flow.setIncomingWindow(transportSession.getIncomingWindowSize());
+ flow.setOutgoingWindow(transportSession.getOutgoingWindowSize());
+ flow.setDeliveryCount(transportLink.getDeliveryCount());
+ flow.setLinkCredit(transportLink.getLinkCredit());
+ flow.setDrain(sender.getDrain());
+ flow.setNextOutgoingId(transportSession.getNextOutgoingId());
+ int frameBytes = writeFrame(buffer, transportSession.getLocalChannel(), flow, null);
+ written += frameBytes;
+ }
+ }
+
+ }
+
+ endpoint = endpoint.getNext();
+ }
+ return written; //TODO - Implement
}
private int processSenderDisposition(WritableBuffer buffer)
@@ -398,6 +436,7 @@ public class TransportImpl extends Endpo
flow.setOutgoingWindow(transportSession.getOutgoingWindowSize());
flow.setDeliveryCount(transportLink.getDeliveryCount());
flow.setLinkCredit(transportLink.getLinkCredit());
+ flow.setDrain(receiver.getDrain());
flow.setNextOutgoingId(transportSession.getNextOutgoingId());
int frameBytes = writeFrame(buffer, transportSession.getLocalChannel(), flow, null);
written += frameBytes;
Modified:
qpid/proton/trunk/proton-j/src/org/apache/qpid/proton/engine/impl/TransportLink.java
URL:
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/src/org/apache/qpid/proton/engine/impl/TransportLink.java?rev=1354668&r1=1354667&r2=1354668&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/src/org/apache/qpid/proton/engine/impl/TransportLink.java (original)
+++ qpid/proton/trunk/proton-j/src/org/apache/qpid/proton/engine/impl/TransportLink.java Wed Jun 27 19:00:01 2012
@@ -113,6 +113,7 @@ class TransportLink<T extends LinkImpl>
_remoteDeliveryCount = flow.getDeliveryCount();
_remoteLinkCredit = flow.getLinkCredit();
+
}
void setLinkCredit(UnsignedInteger linkCredit)
@@ -129,4 +130,20 @@ class TransportLink<T extends LinkImpl>
{
getLink().getSession().getTransportSession().settled(transportDelivery);
}
+
+
+ UnsignedInteger getRemoteDeliveryCount()
+ {
+ return _remoteDeliveryCount;
+ }
+
+ UnsignedInteger getRemoteLinkCredit()
+ {
+ return _remoteLinkCredit;
+ }
+
+ public void setRemoteLinkCredit(UnsignedInteger remoteLinkCredit)
+ {
+ _remoteLinkCredit = remoteLinkCredit;
+ }
}
Modified:
qpid/proton/trunk/proton-j/src/org/apache/qpid/proton/engine/impl/TransportReceiver.java
URL:
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/src/org/apache/qpid/proton/engine/impl/TransportReceiver.java?rev=1354668&r1=1354667&r2=1354668&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/src/org/apache/qpid/proton/engine/impl/TransportReceiver.java (original)
+++ qpid/proton/trunk/proton-j/src/org/apache/qpid/proton/engine/impl/TransportReceiver.java Wed Jun 27 19:00:01 2012
@@ -42,5 +42,13 @@ class TransportReceiver extends Transpor
void handleFlow(Flow flow)
{
super.handleFlow(flow);
+ if(getRemoteDeliveryCount().compareTo(getDeliveryCount())>=0)
+ {
+ getLink().setCredit(getRemoteLinkCredit().intValue());
+ setLinkCredit(getRemoteLinkCredit());
+ setDeliveryCount(getRemoteDeliveryCount());
+ }
+
+
}
}
Modified:
qpid/proton/trunk/proton-j/src/org/apache/qpid/proton/engine/impl/TransportSender.java
URL:
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/src/org/apache/qpid/proton/engine/impl/TransportSender.java?rev=1354668&r1=1354667&r2=1354668&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/src/org/apache/qpid/proton/engine/impl/TransportSender.java (original)
+++ qpid/proton/trunk/proton-j/src/org/apache/qpid/proton/engine/impl/TransportSender.java Wed Jun 27 19:00:01 2012
@@ -39,6 +39,7 @@ class TransportSender extends TransportL
{
super.handleFlow(flow);
_drain = flow.getDrain();
+ getLink().setDrain(flow.getDrain());
UnsignedInteger transferLimit = flow.getLinkCredit().add(getDeliveryCount());
UnsignedInteger linkCredit = transferLimit.subtract(getDeliveryCount());
getLink().setCredit(linkCredit.intValue());
@@ -48,5 +49,4 @@ class TransportSender extends TransportL
setLinkCredit(linkCredit);
}
-
}
Modified:
qpid/proton/trunk/proton-j/src/org/apache/qpid/proton/type/transport/Attach.java
URL:
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/src/org/apache/qpid/proton/type/transport/Attach.java?rev=1354668&r1=1354667&r2=1354668&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/src/org/apache/qpid/proton/type/transport/Attach.java (original)
+++ qpid/proton/trunk/proton-j/src/org/apache/qpid/proton/type/transport/Attach.java Wed Jun 27 19:00:01 2012
@@ -44,12 +44,12 @@ public class Attach
{
private static final Object[] DESCRIPTORS =
{
- UnsignedLong.valueOf(0x0000000000000012L), Symbol.valueOf("amqp:attach:list"),
+ UnsignedLong.valueOf(0x0000000000000012L), Symbol.valueOf("amqp:attach:list"),
};
private static final UnsignedLong DESCRIPTOR = UnsignedLong.valueOf(0x0000000000000012L);
private final AttachWrapper _wrapper = new AttachWrapper();
-
+
private String _name;
private UnsignedInteger _handle;
private boolean _role;
@@ -214,7 +214,7 @@ public class Attach
{
_properties = properties;
}
-
+
public Object getDescriptor()
{
return DESCRIPTOR;
@@ -224,7 +224,7 @@ public class Attach
{
return _wrapper;
}
-
+
public Object get(final int index)
{
@@ -257,7 +257,7 @@ public class Attach
case 12:
return _desiredCapabilities;
case 13:
- return _properties;
+ return _properties;
}
throw new IllegalStateException("Unknown index " + index);
@@ -266,29 +266,29 @@ public class Attach
public int size()
{
- return _properties != null
- ? 14
- : _desiredCapabilities != null
- ? 13
- : _offeredCapabilities != null
- ? 12
- : _maxMessageSize != null
- ? 11
- : _initialDeliveryCount != null
- ? 10
+ return _properties != null
+ ? 14
+ : _desiredCapabilities != null
+ ? 13
+ : _offeredCapabilities != null
+ ? 12
+ : _maxMessageSize != null
+ ? 11
+ : _initialDeliveryCount != null
+ ? 10
: (_incompleteUnsettled != false)
- ? 9
- : _unsettled != null
- ? 8
- : _target != null
- ? 7
- : _source != null
- ? 6
+ ? 9
+ : _unsettled != null
+ ? 8
+ : _target != null
+ ? 7
+ : _source != null
+ ? 6
: (_rcvSettleMode != null && !_rcvSettleMode.equals(ReceiverSettleMode.FIRST))
- ? 5
+ ? 5
: (_sndSettleMode != null && !_sndSettleMode.equals(SenderSettleMode.MIXED))
- ? 4
- : 3;
+ ? 4
+ : 3;
}
@@ -405,7 +405,7 @@ public class Attach
return "Attach{" +
"name='" + _name + '\'' +
", handle=" + _handle +
- ", role=" + _role +
+ ", role=" + (_role ? "RECEIVER" : "SENDER") +
", sndSettleMode=" + _sndSettleMode +
", rcvSettleMode=" + _rcvSettleMode +
", source=" + _source +
@@ -420,4 +420,3 @@ public class Attach
'}';
}
}
-
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]