Author: rgodfrey
Date: Fri Oct 5 12:17:39 2012
New Revision: 1394477
URL: http://svn.apache.org/viewvc?rev=1394477&view=rev
Log:
PROTON-59 : max frame size not respected, large message support in proton-j
Modified:
qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/codec/CompositeWritableBuffer.java
qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/DeliveryImpl.java
qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java
qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportSession.java
Modified:
qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/codec/CompositeWritableBuffer.java
URL:
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/codec/CompositeWritableBuffer.java?rev=1394477&r1=1394476&r2=1394477&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/codec/CompositeWritableBuffer.java (original)
+++ qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/codec/CompositeWritableBuffer.java Fri Oct 5 12:17:39 2012
@@ -153,7 +153,7 @@ public class CompositeWritableBuffer imp
else
{
int relativePosition = currentPosition-position;
- if(relativePosition >= _second.position())
+ if(relativePosition <= _second.position())
{
_second.position(_second.position()-relativePosition);
}
Modified:
qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/DeliveryImpl.java
URL:
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/DeliveryImpl.java?rev=1394477&r1=1394476&r2=1394477&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/DeliveryImpl.java (original)
+++ qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/DeliveryImpl.java Fri Oct 5 12:17:39 2012
@@ -55,6 +55,7 @@ public class DeliveryImpl implements Del
private boolean _complete;
private boolean _updated;
private boolean _done;
+ private int _offset;
public DeliveryImpl(final byte[] tag, final LinkImpl link, DeliveryImpl previous)
{
@@ -168,7 +169,8 @@ public class DeliveryImpl implements Del
//TODO - should only be if no bytes left
consumed = Math.min(size, _dataSize);
- System.arraycopy(_data, 0, bytes, offset, consumed);
+ System.arraycopy(_data, _offset, bytes, offset, consumed);
+ _offset += consumed;
_dataSize -= consumed;
}
else
@@ -298,10 +300,6 @@ public class DeliveryImpl implements Del
void setTransportWorkNext(DeliveryImpl transportWorkNext)
{
- if(transportWorkNext == this)
- {
- (new Exception("Aaaargh")).printStackTrace();
- }
_transportWorkNext = transportWorkNext;
}
@@ -340,11 +338,11 @@ public class DeliveryImpl implements Del
{
byte[] oldData = _data;
_data = new byte[oldData.length + _dataSize];
- System.arraycopy(oldData,0,_data,0,_dataSize);
+ System.arraycopy(oldData,_offset,_data,0,_dataSize);
+ _offset = 0;
}
- System.arraycopy(bytes,offset,_data,_dataSize,length);
+ System.arraycopy(bytes,offset,_data,_dataSize+_offset,length);
_dataSize+=length;
-// addToWorkList();
addToTransportWorkList();
return length; //TODO - Implement.
}
@@ -356,7 +354,7 @@ public class DeliveryImpl implements Del
int getDataOffset()
{
- return 0; //TODO - Implement.
+ return _offset;
}
int getDataLength()
@@ -376,7 +374,7 @@ public class DeliveryImpl implements Del
public void setDataOffset(int arrayOffset)
{
- // TODO - implement
+ _offset = arrayOffset;
}
public boolean isWritable()
Modified:
qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java
URL:
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java?rev=1394477&r1=1394476&r2=1394477&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java (original)
+++ qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java Fri Oct 5 12:17:39 2012
@@ -17,18 +17,21 @@
package org.apache.qpid.proton.engine.impl;
+import java.util.EnumSet;
import org.apache.qpid.proton.codec.CompositeWritableBuffer;
import org.apache.qpid.proton.codec.DecoderImpl;
import org.apache.qpid.proton.codec.EncoderImpl;
import org.apache.qpid.proton.codec.WritableBuffer;
import org.apache.qpid.proton.engine.Accepted;
import org.apache.qpid.proton.engine.Connection;
+import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.DeliveryState;
import org.apache.qpid.proton.engine.EndpointState;
-import org.apache.qpid.proton.engine.ProtonException;
import org.apache.qpid.proton.engine.FrameTransport;
import org.apache.qpid.proton.engine.SaslClient;
import org.apache.qpid.proton.engine.SaslServer;
+import org.apache.qpid.proton.engine.Sender;
+import org.apache.qpid.proton.engine.Session;
import org.apache.qpid.proton.engine.Transport;
import org.apache.qpid.proton.engine.TransportException;
import org.apache.qpid.proton.engine.TransportInput;
@@ -274,7 +277,7 @@ public class TransportImpl extends Endpo
detach.setHandle(localHandle);
- int frameBytes = writeFrame(buffer, transportSession.getLocalChannel(), detach, null);
+ int frameBytes = writeFrame(buffer, transportSession.getLocalChannel(), detach, null, null);
written += frameBytes;
endpoint.clearModified();
}
@@ -317,7 +320,7 @@ public class TransportImpl extends Endpo
flow.setLinkCredit(transportLink.getLinkCredit());
flow.setDrain(sender.getDrain());
flow.setNextOutgoingId(transportSession.getNextOutgoingId());
- int frameBytes = writeFrame(buffer, transportSession.getLocalChannel(), flow, null);
+ int frameBytes = writeFrame(buffer, transportSession.getLocalChannel(), flow, null, null);
written += frameBytes;
endpoint.clearModified();
}
@@ -361,7 +364,7 @@ public class TransportImpl extends Endpo
}
int frameBytes = writeFrame(buffer,
delivery.getLink().getSession()
.getTransportSession().getLocalChannel(),
- disposition, null);
+ disposition, null, null);
written += frameBytes;
delivery = delivery.clearTransportWork();
}
@@ -399,10 +402,11 @@ public class TransportImpl extends Endpo
delivery.setTransportDelivery(transportDelivery);
sender.getSession().getTransportSession().addUnsettledOutgoing(deliveryId,
delivery);
- Transfer transfer = new Transfer();
+ final Transfer transfer = new Transfer();
transfer.setDeliveryId(deliveryId);
transfer.setDeliveryTag(new Binary(delivery.getTag()));
transfer.setHandle(transportLink.getLocalHandle());
+
if(delivery.isSettled())
{
transfer.setSettled(Boolean.TRUE);
@@ -411,6 +415,7 @@ public class TransportImpl extends Endpo
{
transfer.setMore(true);
}
+
transfer.setMessageFormat(UnsignedInteger.ZERO);
// TODO - large frames
@@ -418,23 +423,34 @@ public class TransportImpl extends Endpo
int frameBytes = writeFrame(buffer,
sender.getSession().getTransportSession().getLocalChannel(),
- transfer, payload);
+ transfer, payload, new PartialTransfer(transfer));
sender.getSession().getTransportSession().incrementOutgoingId();
written += frameBytes;
- // TODO partial consumption
- delivery.setData(null);
- delivery.setDataLength(0);
- delivery.setDone();
+ if(payload == null || !payload.hasRemaining())
+ {
+ delivery.setData(null);
+ delivery.setDataLength(0);
+ delivery.setDone();
+
+ if(delivery.getLink().current() != delivery)
+ {
+ transportLink.setDeliveryCount(transportLink.getDeliveryCount().add(UnsignedInteger.ONE));
+ transportLink.setLinkCredit(transportLink.getLinkCredit().subtract(UnsignedInteger.ONE));
+ }
- if(delivery.getLink().current() != delivery)
+ delivery = delivery.clearTransportWork();
+
+ }
+ else
{
- transportLink.setDeliveryCount(transportLink.getDeliveryCount().add(UnsignedInteger.ONE));
- transportLink.setLinkCredit(transportLink.getLinkCredit().subtract(UnsignedInteger.ONE));
+ delivery.setDataOffset(delivery.getDataOffset()+delivery.getDataLength()-payload.remaining());
+ delivery.setDataLength(payload.remaining());
}
- delivery = delivery.clearTransportWork();
+
+
}
@@ -476,7 +492,7 @@ public class TransportImpl extends Endpo
}
int frameBytes = writeFrame(buffer,
delivery.getLink().getSession()
.getTransportSession().getLocalChannel(),
- disposition, null);
+ disposition, null, null);
written += frameBytes;
if(delivery.isSettled())
{
@@ -526,7 +542,7 @@ public class TransportImpl extends Endpo
flow.setLinkCredit(transportLink.getLinkCredit());
flow.setDrain(receiver.getDrain());
flow.setNextOutgoingId(transportSession.getNextOutgoingId());
- int frameBytes = writeFrame(buffer, transportSession.getLocalChannel(), flow, null);
+ int frameBytes = writeFrame(buffer, transportSession.getLocalChannel(), flow, null, null);
written += frameBytes;
if(receiver.getLocalState() ==
EndpointState.ACTIVE)
{
@@ -558,7 +574,7 @@ public class TransportImpl extends Endpo
flow.setOutgoingWindow(transportSession.getOutgoingWindowSize());
flow.setNextOutgoingId(transportSession.getNextOutgoingId());
flow.setNextIncomingId(transportSession.getNextIncomingId());
- int frameBytes = writeFrame(buffer, transportSession.getLocalChannel(), flow, null);
+ int frameBytes = writeFrame(buffer, transportSession.getLocalChannel(), flow, null, null);
written += frameBytes;
}
}
@@ -625,7 +641,7 @@ public class TransportImpl extends Endpo
attach.setInitialDeliveryCount(UnsignedInteger.ZERO);
}
- int frameBytes = writeFrame(buffer, transportSession.getLocalChannel(), attach, null);
+ int frameBytes = writeFrame(buffer, transportSession.getLocalChannel(), attach, null, null);
written += frameBytes;
transportLink.sentAttach();
if(link.getLocalState() == EndpointState.ACTIVE &&
(link instanceof SenderImpl || !link.hasCredit()))
@@ -678,7 +694,7 @@ public class TransportImpl extends Endpo
_isOpenSent = true;
- return writeFrame(buffer, 0, open, null);
+ return writeFrame(buffer, 0, open, null, null);
}
return 0;
@@ -715,7 +731,7 @@ public class TransportImpl extends Endpo
begin.setOutgoingWindow(transportSession.getOutgoingWindowSize());
begin.setNextOutgoingId(transportSession.getNextOutgoingId());
- written += writeFrame(buffer, channelId, begin, null);
+ written += writeFrame(buffer, channelId, begin, null, null);
transportSession.sentBegin();
if(session.getLocalState() == EndpointState.ACTIVE)
{
@@ -780,7 +796,7 @@ public class TransportImpl extends Endpo
End end = new End();
- int frameBytes = writeFrame(buffer, channel, end, null);
+ int frameBytes = writeFrame(buffer, channel, end, null, null);
written += frameBytes;
endpoint.clearModified();
@@ -829,21 +845,39 @@ public class TransportImpl extends Endpo
_isCloseSent = true;
- return writeFrame(buffer, 0, close, null);
+ return writeFrame(buffer, 0, close, null, null);
}
}
return 0;
}
- private int writeFrame(WritableBuffer buffer, int channel, DescribedType frameBody, ByteBuffer payload)
+ private int writeFrame(WritableBuffer buffer,
+ int channel,
+ DescribedType frameBody,
+ ByteBuffer payload,
+ Runnable onPayloadTooLarge)
{
int oldPosition = buffer.position();
buffer.position(buffer.position()+8);
_encoder.setByteBuffer(buffer);
- _encoder.writeDescribedType(frameBody);
- int payloadSize = Math.min(payload == null ? 0 : payload.remaining(), _maxFrameSize);
+ if(payload == null || payload.remaining() < _maxFrameSize)
+ {
+ _encoder.writeDescribedType(frameBody);
+ }
+
+ if(payload != null && (payload.remaining() + buffer.position() - oldPosition) > _maxFrameSize)
+ {
+ if(onPayloadTooLarge != null)
+ {
+ onPayloadTooLarge.run();
+ }
+ buffer.position(oldPosition+8);
+ _encoder.writeDescribedType(frameBody);
+ }
+
+ int payloadSize = Math.min(payload == null ? 0 : payload.remaining(), _maxFrameSize - (buffer.position() - oldPosition));
if(payloadSize > 0)
{
int oldLimit = payload.limit();
@@ -1096,4 +1130,20 @@ public class TransportImpl extends Endpo
}
}
-}
+ private static class PartialTransfer implements Runnable
+ {
+ private final Transfer _transfer;
+
+ public PartialTransfer(Transfer transfer)
+ {
+ _transfer = transfer;
+ }
+
+ @Override
+ public void run()
+ {
+ _transfer.setMore(true);
+ }
+ }
+
+ }
Modified:
qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportSession.java
URL:
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportSession.java?rev=1394477&r1=1394476&r2=1394477&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportSession.java (original)
+++ qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportSession.java Fri Oct 5 12:17:39 2012
@@ -235,9 +235,21 @@ class TransportSession
// TODO - should this be a copy?
if(payload != null)
{
- delivery.setData(payload.getArray());
- delivery.setDataLength(payload.getLength());
- delivery.setDataOffset(payload.getArrayOffset());
+ if(delivery.getDataLength() == 0)
+ {
+ delivery.setData(payload.getArray());
+ delivery.setDataLength(payload.getLength());
+ delivery.setDataOffset(payload.getArrayOffset());
+ }
+ else
+ {
+ byte[] data = new byte[delivery.getDataLength() + payload.getLength()];
+ System.arraycopy(delivery.getData(), delivery.getDataOffset(), data, 0, delivery.getDataLength());
+ System.arraycopy(payload.getArray(), payload.getArrayOffset(), data, delivery.getDataLength(), payload.getLength());
+ delivery.setData(data);
+ delivery.setDataOffset(0);
+ delivery.setDataLength(data.length);
+ }
}
delivery.addIOWork();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]