Author: robbie
Date: Fri Jun 27 10:07:47 2014
New Revision: 1606010

URL: http://svn.apache.org/r1606010
Log:
PROTON-597: update TransportOutputAdaptor and FrameParser to release buffers 
after use, reducing memory consumption when using large frame sizes

Change from Marcel Meulemans

Modified:
    
qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/FrameParser.java
    
qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java
    
qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportOutputAdaptor.java

Modified: 
qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/FrameParser.java
URL: 
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/FrameParser.java?rev=1606010&r1=1606009&r2=1606010&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/FrameParser.java (original)
+++ qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/FrameParser.java Fri Jun 27 10:07:47 2014
@@ -40,6 +40,8 @@ class FrameParser implements TransportIn
 {
     private static final Logger TRACE_LOGGER = Logger.getLogger("proton.trace");
 
+    private static final ByteBuffer _emptyInputBuffer = newWriteableBuffer(0);
+
     private enum State
     {
         HEADER0,
@@ -62,8 +64,9 @@ class FrameParser implements TransportIn
 
     private final FrameHandler _frameHandler;
     private final ByteBufferDecoder _decoder;
+    private final int _maxFrameSize;
 
-    private final ByteBuffer _inputBuffer;
+    private ByteBuffer _inputBuffer = null;
     private boolean _tail_closed = false;
 
     private State _state = State.HEADER0;
@@ -87,11 +90,7 @@ class FrameParser implements TransportIn
     {
         _frameHandler = frameHandler;
         _decoder = decoder;
-        if (maxFrameSize > 0) {
-            _inputBuffer = newWriteableBuffer(maxFrameSize);
-        } else {
-            _inputBuffer = newWriteableBuffer(4*1024);
-        }
+        _maxFrameSize = maxFrameSize > 0 ? maxFrameSize : 4*1024;
     }
 
     private void input(ByteBuffer in) throws TransportException
@@ -372,6 +371,7 @@ class FrameParser implements TransportIn
 
                             _decoder.setByteBuffer(in);
                             Object val = _decoder.readObject();
+                            _decoder.setByteBuffer(null);
 
                             Binary payload;
 
@@ -475,7 +475,11 @@ class FrameParser implements TransportIn
         if (_tail_closed) {
             return Transport.END_OF_STREAM;
         } else {
-            return _inputBuffer.remaining();
+            if (_inputBuffer != null) {
+                return _inputBuffer.remaining();
+            } else {
+                return _maxFrameSize;
+            }
         }
     }
 
@@ -489,21 +493,37 @@ class FrameParser implements TransportIn
                 throw new TransportException("tail closed");
             }
         }
+
+        if (_inputBuffer == null) {
+            _inputBuffer = newWriteableBuffer(_maxFrameSize);
+        }
+
         return _inputBuffer;
     }
 
     @Override
     public void process() throws TransportException
     {
-        _inputBuffer.flip();
-
-        try
+        if (_inputBuffer != null)
         {
-            input(_inputBuffer);
+            _inputBuffer.flip();
+
+            try
+            {
+                input(_inputBuffer);
+            }
+            finally
+            {
+                if (_inputBuffer.hasRemaining()) {
+                    _inputBuffer.compact();
+                } else {
+                    _inputBuffer = null;
+                }
+            }
         }
-        finally
+        else
         {
-            _inputBuffer.compact();
+            input(_emptyInputBuffer);
         }
     }
 

Modified: 
qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java
URL: 
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java?rev=1606010&r1=1606009&r2=1606010&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java (original)
+++ qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java Fri Jun 27 10:07:47 2014
@@ -101,8 +101,6 @@ public class TransportImpl extends Endpo
     private SslImpl _ssl;
     private final Ref<ProtocolTracer> _protocolTracer = new Ref(null);
 
-    private ByteBuffer _lastInputBuffer;
-
     private TransportResult _lastTransportResult = TransportResultFactory.ok();
 
     private boolean _init;

Modified: 
qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportOutputAdaptor.java
URL: 
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportOutputAdaptor.java?rev=1606010&r1=1606009&r2=1606010&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportOutputAdaptor.java (original)
+++ qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportOutputAdaptor.java Fri Jun 27 10:07:47 2014
@@ -26,23 +26,22 @@ import org.apache.qpid.proton.engine.Tra
 
 class TransportOutputAdaptor implements TransportOutput
 {
-    private TransportOutputWriter _transportOutputWriter;
+    private static final ByteBuffer _emptyHead = newReadableBuffer(0).asReadOnlyBuffer();
 
-    private final ByteBuffer _outputBuffer;
-    private final ByteBuffer _head;
+    private final TransportOutputWriter _transportOutputWriter;
+    private final int _maxFrameSize;
+    private final ByteBuffer _scratchBuffer;
+
+    private ByteBuffer _outputBuffer = null;
+    private ByteBuffer _head = null;
     private boolean _output_done = false;
     private boolean _head_closed = false;
 
     TransportOutputAdaptor(TransportOutputWriter transportOutputWriter, int maxFrameSize)
     {
         _transportOutputWriter = transportOutputWriter;
-        if (maxFrameSize > 0) {
-            _outputBuffer = newWriteableBuffer(maxFrameSize);
-        } else {
-            _outputBuffer = newWriteableBuffer(4*1024);
-        }
-        _head = _outputBuffer.asReadOnlyBuffer();
-        _head.limit(0);
+        _maxFrameSize = maxFrameSize > 0 ? maxFrameSize : 4*1024;
+        _scratchBuffer = newWriteableBuffer(Math.min(512, _maxFrameSize));
     }
 
     @Override
@@ -52,11 +51,18 @@ class TransportOutputAdaptor implements 
             return Transport.END_OF_STREAM;
         }
 
-        _output_done = _transportOutputWriter.writeInto(_outputBuffer);
-        _head.limit(_outputBuffer.position());
+        try_fill_buffer();
 
-        if (_output_done && _outputBuffer.position() == 0) {
-            return Transport.END_OF_STREAM;
+        if (_outputBuffer != null && _outputBuffer.position() == 0) {
+            release_buffers();
+        }
+
+        if (_outputBuffer == null) {
+            if (_output_done) {
+                return Transport.END_OF_STREAM;
+            } else {
+                return 0;
+            }
         } else {
             return _outputBuffer.position();
         }
@@ -66,17 +72,22 @@ class TransportOutputAdaptor implements 
     public ByteBuffer head()
     {
         pending();
-        return _head;
+        return _head != null ? _head : _emptyHead;
     }
 
     @Override
     public void pop(int bytes)
     {
-        _outputBuffer.flip();
-        _outputBuffer.position(bytes);
-        _outputBuffer.compact();
-        _head.position(0);
-        _head.limit(_outputBuffer.position());
+        if (_outputBuffer != null) {
+            _outputBuffer.flip();
+            _outputBuffer.position(bytes);
+            _outputBuffer.compact();
+            _head.position(0);
+            _head.limit(_outputBuffer.position());
+            if (_outputBuffer.position() == 0) {
+                release_buffers();
+            }
+        }
     }
 
     @Override
@@ -84,6 +95,45 @@ class TransportOutputAdaptor implements 
     {
         _head_closed = true;
         _transportOutputWriter.closed();
+        release_buffers();
+    }
+
+    private void init_buffers() {
+        _outputBuffer = newWriteableBuffer(_maxFrameSize);
+        _head = _outputBuffer.asReadOnlyBuffer();
+        _head.limit(0);
+    }
+
+    private void release_buffers() {
+        _head = null;
+        _outputBuffer = null;
+    }
+
+    private void try_fill_buffer() {
+        boolean done = false;
+        while (!done) {
+            reset_scratch_buffer();
+            _output_done |= _transportOutputWriter.writeInto(_scratchBuffer);
+            done = _scratchBuffer.position() < _scratchBuffer.capacity();
+            if (_scratchBuffer.position() > 0) {
+                copy_scratch_to_output();
+            }
+        }
     }
 
+    private void reset_scratch_buffer() {
+        _scratchBuffer.clear();
+        if (_outputBuffer != null) {
+            _scratchBuffer.limit(Math.min(_scratchBuffer.capacity(), _outputBuffer.capacity() - _outputBuffer.position()));
+        }
+    }
+
+    private void copy_scratch_to_output() {
+        if (_outputBuffer == null) {
+            init_buffers();
+        }
+        _scratchBuffer.flip();
+        _outputBuffer.put(_scratchBuffer);
+        _head.limit(_outputBuffer.position());
+    }
 }



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to