Author: robbie
Date: Fri Jun 27 10:07:47 2014
New Revision: 1606010
URL: http://svn.apache.org/r1606010
Log:
PROTON-597: update TransportOutputAdaptor and FrameParser to release buffers
after use, reducing memory consumption when using large frame sizes
Change from Marcel Meulemans
Modified:
qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/FrameParser.java
qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java
qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportOutputAdaptor.java
Modified:
qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/FrameParser.java
URL:
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/FrameParser.java?rev=1606010&r1=1606009&r2=1606010&view=diff
==============================================================================
---
qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/FrameParser.java
(original)
+++
qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/FrameParser.java
Fri Jun 27 10:07:47 2014
@@ -40,6 +40,8 @@ class FrameParser implements TransportIn
{
private static final Logger TRACE_LOGGER =
Logger.getLogger("proton.trace");
+ private static final ByteBuffer _emptyInputBuffer = newWriteableBuffer(0);
+
private enum State
{
HEADER0,
@@ -62,8 +64,9 @@ class FrameParser implements TransportIn
private final FrameHandler _frameHandler;
private final ByteBufferDecoder _decoder;
+ private final int _maxFrameSize;
- private final ByteBuffer _inputBuffer;
+ private ByteBuffer _inputBuffer = null;
private boolean _tail_closed = false;
private State _state = State.HEADER0;
@@ -87,11 +90,7 @@ class FrameParser implements TransportIn
{
_frameHandler = frameHandler;
_decoder = decoder;
- if (maxFrameSize > 0) {
- _inputBuffer = newWriteableBuffer(maxFrameSize);
- } else {
- _inputBuffer = newWriteableBuffer(4*1024);
- }
+ _maxFrameSize = maxFrameSize > 0 ? maxFrameSize : 4*1024;
}
private void input(ByteBuffer in) throws TransportException
@@ -372,6 +371,7 @@ class FrameParser implements TransportIn
_decoder.setByteBuffer(in);
Object val = _decoder.readObject();
+ _decoder.setByteBuffer(null);
Binary payload;
@@ -475,7 +475,11 @@ class FrameParser implements TransportIn
if (_tail_closed) {
return Transport.END_OF_STREAM;
} else {
- return _inputBuffer.remaining();
+ if (_inputBuffer != null) {
+ return _inputBuffer.remaining();
+ } else {
+ return _maxFrameSize;
+ }
}
}
@@ -489,21 +493,37 @@ class FrameParser implements TransportIn
throw new TransportException("tail closed");
}
}
+
+ if (_inputBuffer == null) {
+ _inputBuffer = newWriteableBuffer(_maxFrameSize);
+ }
+
return _inputBuffer;
}
@Override
public void process() throws TransportException
{
- _inputBuffer.flip();
-
- try
+ if (_inputBuffer != null)
{
- input(_inputBuffer);
+ _inputBuffer.flip();
+
+ try
+ {
+ input(_inputBuffer);
+ }
+ finally
+ {
+ if (_inputBuffer.hasRemaining()) {
+ _inputBuffer.compact();
+ } else {
+ _inputBuffer = null;
+ }
+ }
}
- finally
+ else
{
- _inputBuffer.compact();
+ input(_emptyInputBuffer);
}
}
Modified:
qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java
URL:
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java?rev=1606010&r1=1606009&r2=1606010&view=diff
==============================================================================
---
qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java
(original)
+++
qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java
Fri Jun 27 10:07:47 2014
@@ -101,8 +101,6 @@ public class TransportImpl extends Endpo
private SslImpl _ssl;
private final Ref<ProtocolTracer> _protocolTracer = new Ref(null);
- private ByteBuffer _lastInputBuffer;
-
private TransportResult _lastTransportResult = TransportResultFactory.ok();
private boolean _init;
Modified:
qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportOutputAdaptor.java
URL:
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportOutputAdaptor.java?rev=1606010&r1=1606009&r2=1606010&view=diff
==============================================================================
---
qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportOutputAdaptor.java
(original)
+++
qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportOutputAdaptor.java
Fri Jun 27 10:07:47 2014
@@ -26,23 +26,22 @@ import org.apache.qpid.proton.engine.Tra
class TransportOutputAdaptor implements TransportOutput
{
- private TransportOutputWriter _transportOutputWriter;
+ private static final ByteBuffer _emptyHead =
newReadableBuffer(0).asReadOnlyBuffer();
- private final ByteBuffer _outputBuffer;
- private final ByteBuffer _head;
+ private final TransportOutputWriter _transportOutputWriter;
+ private final int _maxFrameSize;
+ private final ByteBuffer _scratchBuffer;
+
+ private ByteBuffer _outputBuffer = null;
+ private ByteBuffer _head = null;
private boolean _output_done = false;
private boolean _head_closed = false;
TransportOutputAdaptor(TransportOutputWriter transportOutputWriter, int
maxFrameSize)
{
_transportOutputWriter = transportOutputWriter;
- if (maxFrameSize > 0) {
- _outputBuffer = newWriteableBuffer(maxFrameSize);
- } else {
- _outputBuffer = newWriteableBuffer(4*1024);
- }
- _head = _outputBuffer.asReadOnlyBuffer();
- _head.limit(0);
+ _maxFrameSize = maxFrameSize > 0 ? maxFrameSize : 4*1024;
+ _scratchBuffer = newWriteableBuffer(Math.min(512, _maxFrameSize));
}
@Override
@@ -52,11 +51,18 @@ class TransportOutputAdaptor implements
return Transport.END_OF_STREAM;
}
- _output_done = _transportOutputWriter.writeInto(_outputBuffer);
- _head.limit(_outputBuffer.position());
+ try_fill_buffer();
- if (_output_done && _outputBuffer.position() == 0) {
- return Transport.END_OF_STREAM;
+ if (_outputBuffer != null && _outputBuffer.position() == 0) {
+ release_buffers();
+ }
+
+ if (_outputBuffer == null) {
+ if (_output_done) {
+ return Transport.END_OF_STREAM;
+ } else {
+ return 0;
+ }
} else {
return _outputBuffer.position();
}
@@ -66,17 +72,22 @@ class TransportOutputAdaptor implements
public ByteBuffer head()
{
pending();
- return _head;
+ return _head != null ? _head : _emptyHead;
}
@Override
public void pop(int bytes)
{
- _outputBuffer.flip();
- _outputBuffer.position(bytes);
- _outputBuffer.compact();
- _head.position(0);
- _head.limit(_outputBuffer.position());
+ if (_outputBuffer != null) {
+ _outputBuffer.flip();
+ _outputBuffer.position(bytes);
+ _outputBuffer.compact();
+ _head.position(0);
+ _head.limit(_outputBuffer.position());
+ if (_outputBuffer.position() == 0) {
+ release_buffers();
+ }
+ }
}
@Override
@@ -84,6 +95,45 @@ class TransportOutputAdaptor implements
{
_head_closed = true;
_transportOutputWriter.closed();
+ release_buffers();
+ }
+
+ private void init_buffers() {
+ _outputBuffer = newWriteableBuffer(_maxFrameSize);
+ _head = _outputBuffer.asReadOnlyBuffer();
+ _head.limit(0);
+ }
+
+ private void release_buffers() {
+ _head = null;
+ _outputBuffer = null;
+ }
+
+ private void try_fill_buffer() {
+ boolean done = false;
+ while (!done) {
+ reset_scratch_buffer();
+ _output_done |= _transportOutputWriter.writeInto(_scratchBuffer);
+ done = _scratchBuffer.position() < _scratchBuffer.capacity();
+ if (_scratchBuffer.position() > 0) {
+ copy_scratch_to_output();
+ }
+ }
}
+ private void reset_scratch_buffer() {
+ _scratchBuffer.clear();
+ if (_outputBuffer != null) {
+ _scratchBuffer.limit(Math.min(_scratchBuffer.capacity(),
_outputBuffer.capacity() - _outputBuffer.position()));
+ }
+ }
+
+ private void copy_scratch_to_output() {
+ if (_outputBuffer == null) {
+ init_buffers();
+ }
+ _scratchBuffer.flip();
+ _outputBuffer.put(_scratchBuffer);
+ _head.limit(_outputBuffer.position());
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]