Author: rgodfrey
Date: Sun Aug 23 14:20:02 2015
New Revision: 1697182

URL: http://svn.apache.org/r1697182
Log:
QPID-6662 : Ensure buffers are disposed when messages are flowed to disk

Modified:
    qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessageMetaData.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/StorableMessageMetaData.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/MultiVersionProtocolEngine.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionPlainDelegate.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionTLSDelegate.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionUndecidedDelegate.java
    qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaData.java
    qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaData_0_10.java
    qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/framing/SASLFrameHandler.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorLink_1_0.java
    qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/QpidByteBuffer.java
    qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/AMQFrame.java
    qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyImpl.java
    qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java
    qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java

Modified: qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java?rev=1697182&r1=1697181&r2=1697182&view=diff
==============================================================================
--- qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java (original)
+++ qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java Sun Aug 23 14:20:02 2015
@@ -35,8 +35,6 @@ import java.util.UUID;
 
 import com.sleepycat.bind.tuple.ByteBinding;
 import com.sleepycat.bind.tuple.LongBinding;
-import com.sleepycat.bind.tuple.TupleBinding;
-import com.sleepycat.bind.tuple.TupleOutput;
 import com.sleepycat.je.Cursor;
 import com.sleepycat.je.Database;
 import com.sleepycat.je.DatabaseEntry;
@@ -74,7 +72,6 @@ import org.apache.qpid.server.store.berk
 import org.apache.qpid.server.store.handler.DistributedTransactionHandler;
 import org.apache.qpid.server.store.handler.MessageHandler;
 import org.apache.qpid.server.store.handler.MessageInstanceHandler;
-import org.apache.qpid.util.ByteBufferUtils;
 
 
 public abstract class AbstractBDBMessageStore implements MessageStore
@@ -399,11 +396,13 @@ public abstract class AbstractBDBMessage
         }
     }
 
-    QpidByteBuffer getAllContent(long messageId) throws StoreException
+    Collection<QpidByteBuffer> getAllContent(long messageId) throws StoreException
     {
         DatabaseEntry contentKeyEntry = new DatabaseEntry();
         LongBinding.longToEntry(messageId, contentKeyEntry);
         DatabaseEntry value = new DatabaseEntry();
+
+
         ByteBufferBinding contentTupleBinding = ByteBufferBinding.getInstance();
 
 
@@ -412,9 +411,22 @@ public abstract class AbstractBDBMessage
         try
         {
             OperationStatus status = getMessageContentDb().get(null, contentKeyEntry, value, LockMode.READ_UNCOMMITTED);
+
             if (status == OperationStatus.SUCCESS)
             {
-                return contentTupleBinding.entryToObject(value);
+                byte[] data = value.getData();
+                int offset = value.getOffset();
+                int length = value.getSize();
+                Collection<QpidByteBuffer> buffers = QpidByteBuffer.allocateDirectCollectionFromPool(length
+                                                                                                    );
+                for(QpidByteBuffer buf : buffers)
+                {
+                    int bufSize = buf.remaining();
+                    buf.put(data, offset, bufSize);
+                    buf.flip();
+                    offset+=bufSize;
+                }
+                return buffers;
             }
             else
             {
@@ -1005,7 +1017,18 @@ public abstract class AbstractBDBMessage
 
         public void clear()
         {
-            _metaData = null;
+            if(_metaData != null)
+            {
+                _metaData.clearEncodedForm();
+                _metaData = null;
+            }
+            if(_data != null)
+            {
+                for(QpidByteBuffer buf : _data)
+                {
+                    buf.dispose();
+                }
+            }
             _data = null;
         }
 
@@ -1043,7 +1066,7 @@ public abstract class AbstractBDBMessage
         }
 
         @Override
-        public T getMetaData()
+        public synchronized T getMetaData()
         {
             T metaData = _messageDataRef.getMetaData();
 
@@ -1063,7 +1086,7 @@ public abstract class AbstractBDBMessage
         }
 
         @Override
-        public void addContent(QpidByteBuffer src)
+        public synchronized void addContent(QpidByteBuffer src)
         {
             src = src.slice();
             Collection<QpidByteBuffer> data = _messageDataRef.getData();
@@ -1088,7 +1111,7 @@ public abstract class AbstractBDBMessage
         }
 
         @Override
-        public int getContent(int offsetInMessage, ByteBuffer dst)
+        public synchronized int getContent(int offsetInMessage, final ByteBuffer dst)
         {
             Collection<QpidByteBuffer> allContent = getContentAsByteBuffer();
             int length = 0;
@@ -1108,7 +1131,7 @@ public abstract class AbstractBDBMessage
                 if(stored())
                 {
                     checkMessageStoreOpen();
-                    data = Collections.singleton(AbstractBDBMessageStore.this.getAllContent(_messageId));
+                    data = AbstractBDBMessageStore.this.getAllContent(_messageId);
                     T metaData = _messageDataRef.getMetaData();
                     if (metaData == null)
                     {
@@ -1124,11 +1147,12 @@ public abstract class AbstractBDBMessage
                 {
                     data = Collections.emptyList();
                 }
-            } return data;
+            }
+            return data;
         }
 
         @Override
-        public Collection<QpidByteBuffer> getContent(int offsetInMessage, int size)
+        public synchronized Collection<QpidByteBuffer> getContent(final int offsetInMessage, final int size)
         {
             int pos = 0;
             int added = 0;
@@ -1217,7 +1241,7 @@ public abstract class AbstractBDBMessage
         }
 
         @Override
-        public void remove()
+        public synchronized void remove()
         {
             checkMessageStoreOpen();
             Collection<QpidByteBuffer> data = _messageDataRef.getData();
@@ -1235,7 +1259,6 @@ public abstract class AbstractBDBMessage
                 }
             }
             metaData.dispose();
-            _messageDataRef = null;
         }
 
         @Override
@@ -1250,7 +1273,7 @@ public abstract class AbstractBDBMessage
         }
 
         @Override
-        public boolean flowToDisk()
+        public synchronized boolean flowToDisk()
         {
 
             flushToStore();
@@ -1266,6 +1289,7 @@ public abstract class AbstractBDBMessage
         {
             return this.getClass() + "[messageId=" + _messageId + "]";
         }
+
     }
 
 

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessageMetaData.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessageMetaData.java?rev=1697182&r1=1697181&r2=1697182&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessageMetaData.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessageMetaData.java Sun Aug 23 14:20:02 2015
@@ -94,6 +94,12 @@ public class InternalMessageMetaData imp
 
     }
 
+    @Override
+    public void clearEncodedForm()
+    {
+
+    }
+
     static InternalMessageMetaData create(boolean persistent, final InternalMessageHeader header, int contentSize)
     {
         return new InternalMessageMetaData(persistent, header, contentSize);

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java?rev=1697182&r1=1697181&r2=1697182&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java Sun Aug 23 14:20:02 2015
@@ -1200,7 +1200,8 @@ public abstract class AbstractJDBCMessag
             {
 
                 byte[] dataAsBytes = getBlobAsBytes(rs, 1);
-                QpidByteBuffer buf = QpidByteBuffer.allocateDirect(dataAsBytes.length);
+                QpidByteBuffer buf = QpidByteBuffer.allocateDirect(dataAsBytes.length
+                                                                  );
                 buf.put(dataAsBytes);
                 buf.flip();
                 return buf;

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/StorableMessageMetaData.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/StorableMessageMetaData.java?rev=1697182&r1=1697181&r2=1697182&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/StorableMessageMetaData.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/StorableMessageMetaData.java Sun Aug 23 14:20:02 2015
@@ -36,5 +36,8 @@ public interface StorableMessageMetaData
     boolean isPersistent();
 
     void dispose();
+
+    void clearEncodedForm();
+
 }
 

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/MultiVersionProtocolEngine.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/MultiVersionProtocolEngine.java?rev=1697182&r1=1697181&r2=1697182&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/MultiVersionProtocolEngine.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/MultiVersionProtocolEngine.java Sun Aug 23 14:20:02 2015
@@ -23,7 +23,6 @@ package org.apache.qpid.server.transport
 
 import java.net.InetSocketAddress;
 import java.net.SocketAddress;
-import java.nio.ByteBuffer;
 import java.security.cert.Certificate;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionPlainDelegate.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionPlainDelegate.java?rev=1697182&r1=1697181&r2=1697182&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionPlainDelegate.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionPlainDelegate.java Sun Aug 23 14:20:02 2015
@@ -20,7 +20,6 @@
 package org.apache.qpid.server.transport;
 
 import java.io.IOException;
-import java.nio.ByteBuffer;
 import java.security.Principal;
 import java.security.cert.Certificate;
 import java.util.Collection;

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionTLSDelegate.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionTLSDelegate.java?rev=1697182&r1=1697181&r2=1697182&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionTLSDelegate.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionTLSDelegate.java Sun Aug 23 14:20:02 2015
@@ -20,7 +20,6 @@
 package org.apache.qpid.server.transport;
 
 import org.apache.qpid.bytebuffer.QpidByteBuffer;
-import org.apache.qpid.server.model.Broker;
 import org.apache.qpid.server.model.port.AmqpPort;
 import org.apache.qpid.server.util.QpidByteBufferUtils;
 import org.apache.qpid.transport.network.security.ssl.SSLUtil;
@@ -39,7 +38,6 @@ import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
 import java.util.ListIterator;
-import java.util.Map;
 
 public class NonBlockingConnectionTLSDelegate implements NonBlockingConnectionDelegate
 {
@@ -320,6 +318,15 @@ public class NonBlockingConnectionTLSDel
     @Override
     public void shutdownOutput()
     {
+        try
+        {
+            _sslEngine.closeOutbound();
+            _sslEngine.closeInbound();
+        }
+        catch (SSLException e)
+        {
+            LOGGER.debug("Exception when closing SSLEngine", e);
+        }
 
     }
 

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionUndecidedDelegate.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionUndecidedDelegate.java?rev=1697182&r1=1697181&r2=1697182&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionUndecidedDelegate.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionUndecidedDelegate.java Sun Aug 23 14:20:02 2015
@@ -23,7 +23,6 @@ import org.apache.qpid.bytebuffer.QpidBy
 import org.apache.qpid.transport.network.TransportEncryption;
 
 import java.io.IOException;
-import java.nio.ByteBuffer;
 import java.security.Principal;
 import java.security.cert.Certificate;
 import java.util.Collection;

Modified: qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaData.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaData.java?rev=1697182&r1=1697181&r2=1697182&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaData.java (original)
+++ qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaData.java Sun Aug 23 14:20:02 2015
@@ -84,6 +84,12 @@ public class TestMessageMetaData impleme
     }
 
     @Override
+    public void clearEncodedForm()
+    {
+
+    }
+
+    @Override
     public int writeToBuffer(ByteBuffer dest)
     {
         int oldPosition = dest.position();

Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaData_0_10.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaData_0_10.java?rev=1697182&r1=1697181&r2=1697182&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaData_0_10.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaData_0_10.java Sun Aug 23 14:20:02 2015
@@ -179,6 +179,12 @@ public class MessageMetaData_0_10 implem
 
     }
 
+    @Override
+    public void clearEncodedForm()
+    {
+
+    }
+
     public String getRoutingKey()
     {
         return _deliveryProps == null ? null : _deliveryProps.getRoutingKey();

Modified: qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java?rev=1697182&r1=1697181&r2=1697182&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java Sun Aug 23 14:20:02 2015
@@ -75,7 +75,7 @@ public class MessageMetaData implements
     }
 
 
-    public ContentHeaderBody getContentHeaderBody()
+    public synchronized ContentHeaderBody getContentHeaderBody()
     {
         return _contentHeaderBody;
     }
@@ -152,12 +152,17 @@ public class MessageMetaData implements
     }
 
     @Override
-    public void dispose()
+    public synchronized void dispose()
     {
         _contentHeaderBody.dispose();
         _contentHeaderBody = null;
     }
 
+    public synchronized void clearEncodedForm()
+    {
+        _contentHeaderBody.clearEncodedForm();
+    }
+
     private static class MetaDataFactory implements MessageMetaDataType.Factory<MessageMetaData>
     {
 

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/framing/SASLFrameHandler.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/framing/SASLFrameHandler.java?rev=1697182&r1=1697181&r2=1697182&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/framing/SASLFrameHandler.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/framing/SASLFrameHandler.java Sun Aug 23 14:20:02 2015
@@ -28,7 +28,6 @@ import org.apache.qpid.amqp_1_0.type.tra
 import org.apache.qpid.amqp_1_0.type.transport.Error;
 import org.apache.qpid.bytebuffer.QpidByteBuffer;
 
-import java.nio.ByteBuffer;
 import java.util.Formatter;
 
 public class SASLFrameHandler implements ProtocolHandler

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java?rev=1697182&r1=1697181&r2=1697182&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java Sun Aug 23 14:20:02 2015
@@ -22,7 +22,6 @@ package org.apache.qpid.server.protocol.
 
 import java.nio.ByteBuffer;
 import java.util.Collection;
-import java.util.List;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java?rev=1697182&r1=1697181&r2=1697182&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java Sun Aug 23 14:20:02 2015
@@ -387,6 +387,12 @@ public class MessageMetaData_1_0 impleme
         _encoded = null;
     }
 
+    @Override
+    public void clearEncodedForm()
+    {
+
+    }
+
     private static class MetaDataFactory implements MessageMetaDataType.Factory<MessageMetaData_1_0>
     {
         private final AMQPDescribedTypeRegistry _typeRegistry = AMQPDescribedTypeRegistry.newInstance();

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorLink_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorLink_1_0.java?rev=1697182&r1=1697181&r2=1697182&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorLink_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorLink_1_0.java Sun Aug 23 14:20:02 2015
@@ -20,7 +20,6 @@
  */
 package org.apache.qpid.server.protocol.v1_0;
 
-import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.LinkedHashMap;

Modified: qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/QpidByteBuffer.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/QpidByteBuffer.java?rev=1697182&r1=1697181&r2=1697182&view=diff
==============================================================================
--- qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/QpidByteBuffer.java (original)
+++ qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/QpidByteBuffer.java Sun Aug 23 14:20:02 2015
@@ -28,8 +28,11 @@ import java.nio.CharBuffer;
 import java.nio.channels.GatheringByteChannel;
 import java.nio.channels.ReadableByteChannel;
 import java.nio.charset.Charset;
+import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.Iterator;
+import java.util.List;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -493,7 +496,41 @@ public final class QpidByteBuffer
 
     }
 
+    public static Collection<QpidByteBuffer> allocateDirectCollectionFromPool(int size)
+    {
+        final int maxPooledBufferSize = _maxPooledBufferSize.get();
+        if(size <= maxPooledBufferSize || maxPooledBufferSize == 0)
+        {
+            return Collections.singleton(allocateDirectFromPool(size));
+        }
+        else
+        {
+            List<QpidByteBuffer> collections = new ArrayList<>((size / maxPooledBufferSize)+2);
+            int remaining = size;
+
+            QpidByteBuffer buf = _cachedBuffer.get();
+            if(buf != null)
+            {
+                collections.add(buf.slice());
+                remaining -= buf.remaining();
+                buf.dispose();
+            }
+            while(remaining > maxPooledBufferSize)
+            {
+                collections.add(allocateDirect(maxPooledBufferSize));
+                remaining -= maxPooledBufferSize;
+            }
+            buf = allocateDirect(maxPooledBufferSize);
+            collections.add(buf.view(0, remaining));
+            buf.position(buf.position()+remaining);
 
+            _cachedBuffer.set(buf.slice());
+            buf.dispose();
+            return collections;
+
+        }
+
+    }
     public ByteBuffer asByteBuffer()
     {
         _ref.removeFromPool();

Modified: qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/AMQFrame.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/AMQFrame.java?rev=1697182&r1=1697181&r2=1697182&view=diff
==============================================================================
--- qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/AMQFrame.java (original)
+++ qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/AMQFrame.java Sun Aug 23 14:20:02 2015
@@ -22,11 +22,9 @@ package org.apache.qpid.framing;
 
 import java.io.DataOutput;
 import java.io.IOException;
-import java.nio.ByteBuffer;
 
 import org.apache.qpid.bytebuffer.QpidByteBuffer;
 import org.apache.qpid.transport.ByteBufferSender;
-import org.apache.qpid.util.BytesDataOutput;
 
 public class AMQFrame extends AMQDataBlock implements EncodableAMQDataBlock
 {

Modified: qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyImpl.java?rev=1697182&r1=1697181&r2=1697182&view=diff
==============================================================================
--- qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyImpl.java (original)
+++ qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyImpl.java Sun Aug 23 14:20:02 2015
@@ -24,7 +24,6 @@ package org.apache.qpid.framing;
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
-import java.nio.ByteBuffer;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -33,12 +32,9 @@ import org.apache.qpid.AMQChannelExcepti
 import org.apache.qpid.AMQConnectionException;
 import org.apache.qpid.QpidException;
 import org.apache.qpid.bytebuffer.QpidByteBuffer;
-import org.apache.qpid.codec.MarkableDataInput;
 import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
 import org.apache.qpid.transport.ByteBufferSender;
-import org.apache.qpid.util.ByteBufferDataOutput;
-import org.apache.qpid.util.BytesDataOutput;
 
 public abstract class AMQMethodBodyImpl implements AMQMethodBody
 {

Modified: qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java?rev=1697182&r1=1697181&r2=1697182&view=diff
==============================================================================
--- qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java (original)
+++ qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java Sun Aug 23 14:20:02 2015
@@ -20,10 +20,8 @@
  */
 package org.apache.qpid.framing;
 
-import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
-import java.nio.ByteBuffer;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -31,8 +29,6 @@ import org.slf4j.LoggerFactory;
 import org.apache.qpid.bytebuffer.QpidByteBuffer;
 import org.apache.qpid.codec.MarkableDataInput;
 import org.apache.qpid.transport.ByteBufferSender;
-import org.apache.qpid.util.ByteBufferDataOutput;
-import org.apache.qpid.util.BytesDataOutput;
 
 public class BasicContentHeaderProperties
 {
@@ -131,7 +127,7 @@ public class BasicContentHeaderPropertie
     { 
     }
 
-    public int getPropertyListSize()
+    public synchronized int getPropertyListSize()
     {
         if(useEncodedForm())
         {
@@ -232,7 +228,7 @@ public class BasicContentHeaderPropertie
         return _propertyFlags;
     }
 
-    public void writePropertyListPayload(DataOutput buffer) throws IOException
+    public synchronized void writePropertyListPayload(DataOutput buffer) throws IOException
     {
         if(useEncodedForm())
         {
@@ -352,7 +348,7 @@ public class BasicContentHeaderPropertie
         {
             length++;
             _encoding = EncodingUtils.readAMQShortString(input);
-            if(_encodedForm != null)
+            if(_encoding != null)
             {
                 length += _encoding.length();
             }
@@ -471,7 +467,7 @@ public class BasicContentHeaderPropertie
     }
 
 
-    public long writePropertyListPayload(final ByteBufferSender sender) throws IOException
+    public synchronized long writePropertyListPayload(final ByteBufferSender sender) throws IOException
     {
         if(useEncodedForm())
         {
@@ -493,7 +489,7 @@ public class BasicContentHeaderPropertie
 
     }
 
-    public void populatePropertiesFromBuffer(MarkableDataInput buffer, int propertyFlags, int size) throws AMQFrameDecodingException, IOException
+    public synchronized void populatePropertiesFromBuffer(MarkableDataInput buffer, int propertyFlags, int size) throws AMQFrameDecodingException, IOException
     {
         _propertyFlags = propertyFlags;
 
@@ -501,7 +497,10 @@ public class BasicContentHeaderPropertie
         {
             _logger.debug("Property flags: " + _propertyFlags);
         }
-
+        if(_encodedForm != null)
+        {
+            _encodedForm.dispose();
+        }
         _encodedForm = buffer.readAsByteBuffer(size);
 
         final QpidByteBuffer byteBuffer = _encodedForm.slice();
@@ -605,7 +604,7 @@ public class BasicContentHeaderPropertie
         return (_contentType == null) ? null : _contentType.toString();
     }
 
-    public void setContentType(AMQShortString contentType)
+    public synchronized void setContentType(AMQShortString contentType)
     {
 
         if(contentType == null)
@@ -617,7 +616,7 @@ public class BasicContentHeaderPropertie
             _propertyFlags |= CONTENT_TYPE_MASK;
         }
         _contentType = contentType;
-        _encodedForm = null;
+        nullEncodedForm();
     }
 
     public void setContentType(String contentType)
@@ -641,7 +640,7 @@ public class BasicContentHeaderPropertie
         setEncoding(encoding == null ? null : AMQShortString.valueOf(encoding));
     }
 
-    public void setEncoding(AMQShortString encoding)
+    public synchronized void setEncoding(AMQShortString encoding)
     {
         if(encoding == null)
         {
@@ -652,7 +651,7 @@ public class BasicContentHeaderPropertie
             _propertyFlags |= ENCODING_MASK;
         }
         _encoding = encoding;
-        _encodedForm = null;
+        nullEncodedForm();
     }
 
     public FieldTable getHeaders()
@@ -665,7 +664,7 @@ public class BasicContentHeaderPropertie
         return _headers;
     }
 
-    public void setHeaders(FieldTable headers)
+    public synchronized void setHeaders(FieldTable headers)
     {
         if(headers == null)
         {
@@ -676,7 +675,7 @@ public class BasicContentHeaderPropertie
             _propertyFlags |= HEADERS_MASK;
         }
         _headers = headers;
-        _encodedForm = null;
+        nullEncodedForm();
     }
 
     public byte getDeliveryMode()
@@ -684,11 +683,11 @@ public class BasicContentHeaderPropertie
         return _deliveryMode;
     }
 
-    public void setDeliveryMode(byte deliveryMode)
+    public synchronized void setDeliveryMode(byte deliveryMode)
     {
         _propertyFlags |= DELIVERY_MODE_MASK;
         _deliveryMode = deliveryMode;
-        _encodedForm = null;
+        nullEncodedForm();
     }
 
     public byte getPriority()
@@ -696,11 +695,11 @@ public class BasicContentHeaderPropertie
         return _priority;
     }
 
-    public void setPriority(byte priority)
+    public synchronized void setPriority(byte priority)
     {
         _propertyFlags |= PRIORITY_MASK;
         _priority = priority;
-        _encodedForm = null;
+        nullEncodedForm();
     }
 
     public AMQShortString getCorrelationId()
@@ -718,7 +717,7 @@ public class BasicContentHeaderPropertie
         setCorrelationId((correlationId == null) ? null : AMQShortString.valueOf(correlationId));
     }
 
-    public void setCorrelationId(AMQShortString correlationId)
+    public synchronized void setCorrelationId(AMQShortString correlationId)
     {
         if(correlationId == null)
         {
@@ -729,7 +728,7 @@ public class BasicContentHeaderPropertie
             _propertyFlags |= CORRELATION_ID_MASK;
         }
         _correlationId = correlationId;
-        _encodedForm = null;
+        nullEncodedForm();
     }
 
     public String getReplyToAsString()
@@ -747,7 +746,7 @@ public class BasicContentHeaderPropertie
         setReplyTo((replyTo == null) ? null : AMQShortString.valueOf(replyTo));
     }
 
-    public void setReplyTo(AMQShortString replyTo)
+    public synchronized void setReplyTo(AMQShortString replyTo)
     {
         if(replyTo == null)
         {
@@ -758,7 +757,7 @@ public class BasicContentHeaderPropertie
             _propertyFlags |= REPLY_TO_MASK;
         }
         _replyTo = replyTo;
-        _encodedForm = null;
+        nullEncodedForm();
     }
 
     public long getExpiration()
@@ -766,7 +765,7 @@ public class BasicContentHeaderPropertie
         return _expiration;
     }
 
-    public void setExpiration(long expiration)
+    public synchronized void setExpiration(long expiration)
     {
         if(expiration == 0l)
         {
@@ -777,7 +776,7 @@ public class BasicContentHeaderPropertie
             _propertyFlags |= EXPIRATION_MASK;
         }
         _expiration = expiration;
-        _encodedForm = null;
+        nullEncodedForm();
     }
 
     public AMQShortString getMessageId()
@@ -795,7 +794,7 @@ public class BasicContentHeaderPropertie
         setMessageId(messageId == null ? null : new AMQShortString(messageId));
     }
 
-    public void setMessageId(AMQShortString messageId)
+    public synchronized void setMessageId(AMQShortString messageId)
     {
         if(messageId == null)
         {
@@ -806,7 +805,7 @@ public class BasicContentHeaderPropertie
             _propertyFlags |= MESSAGE_ID_MASK;
         }
         _messageId = messageId;
-        _encodedForm = null;
+        nullEncodedForm();
     }
 
     public long getTimestamp()
@@ -814,11 +813,11 @@ public class BasicContentHeaderPropertie
         return _timestamp;
     }
 
-    public void setTimestamp(long timestamp)
+    public synchronized void setTimestamp(long timestamp)
     {
         _propertyFlags |= TIMESTAMP_MASK;
         _timestamp = timestamp;
-        _encodedForm = null;
+        nullEncodedForm();
     }
 
     public String getTypeAsString()
@@ -836,7 +835,7 @@ public class BasicContentHeaderPropertie
         setType((type == null) ? null : AMQShortString.valueOf(type));
     }
 
-    public void setType(AMQShortString type)
+    public synchronized void setType(AMQShortString type)
     {
         if(type == null)
         {
@@ -847,7 +846,7 @@ public class BasicContentHeaderPropertie
             _propertyFlags |= TYPE_MASK;
         }
         _type = type;
-        _encodedForm = null;
+        nullEncodedForm();
     }
 
     public String getUserIdAsString()
@@ -865,7 +864,7 @@ public class BasicContentHeaderPropertie
         setUserId((userId == null) ? null : AMQShortString.valueOf(userId));
     }
 
-    public void setUserId(AMQShortString userId)
+    public synchronized void setUserId(AMQShortString userId)
     {
         if(userId == null)
         {
@@ -876,7 +875,7 @@ public class BasicContentHeaderPropertie
             _propertyFlags |= USER_ID_MASK;
         }
         _userId = userId;
-        _encodedForm = null;
+        nullEncodedForm();
     }
 
     public String getAppIdAsString()
@@ -894,7 +893,7 @@ public class BasicContentHeaderPropertie
         setAppId((appId == null) ? null : AMQShortString.valueOf(appId));
     }
 
-    public void setAppId(AMQShortString appId)
+    public synchronized void setAppId(AMQShortString appId)
     {
         if(appId == null)
         {
@@ -905,7 +904,7 @@ public class BasicContentHeaderPropertie
             _propertyFlags |= APPLICATION_ID_MASK;
         }
         _appId = appId;
-        _encodedForm = null;
+        nullEncodedForm();
     }
 
     public String getClusterIdAsString()
@@ -923,7 +922,7 @@ public class BasicContentHeaderPropertie
         setClusterId((clusterId == null) ? null : AMQShortString.valueOf(clusterId));
     }
 
-    public void setClusterId(AMQShortString clusterId)
+    public synchronized void setClusterId(AMQShortString clusterId)
     {
         if(clusterId == null)
         {
@@ -934,7 +933,7 @@ public class BasicContentHeaderPropertie
             _propertyFlags |= CLUSTER_ID_MASK;
         }
         _clusterId = clusterId;
-        _encodedForm = null;
+        nullEncodedForm();
     }
 
     public String toString()
@@ -945,23 +944,37 @@ public class BasicContentHeaderPropertie
             + _expiration + ",JMSPriority = " + _priority + ",JMSTimestamp = " + _timestamp + ",JMSType = " + _type;
     }
 
-    private boolean useEncodedForm()
+    private synchronized boolean useEncodedForm()
     {
         return _encodedForm != null && (_headers == null || _headers.isClean());
     }
 
 
-    public void dispose()
+    public synchronized void dispose()
     {
-        if(_encodedForm != null)
-        {
-            _encodedForm.dispose();
-            _encodedForm = null;
-        }
+        nullEncodedForm();
         if(_headers != null)
         {
             _headers.dispose();
             _headers = null;
         }
     }
+
+    public synchronized void clearEncodedForm()
+    {
+        nullEncodedForm();
+        if(_headers != null)
+        {
+            _headers.clearEncodedForm();
+        }
+    }
+
+    private synchronized void nullEncodedForm()
+    {
+        if(_encodedForm != null)
+        {
+            _encodedForm.dispose();
+            _encodedForm = null;
+        }
+    }
 }

Modified: qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java?rev=1697182&r1=1697181&r2=1697182&view=diff
==============================================================================
--- qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java (original)
+++ qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java Sun Aug 23 14:20:02 2015
@@ -20,11 +20,8 @@
  */
 package org.apache.qpid.framing;
 
-import java.io.DataInput;
-import java.io.DataInputStream;
 import java.io.DataOutput;
 import java.io.IOException;
-import java.nio.ByteBuffer;
 
 import org.apache.qpid.QpidException;
 import org.apache.qpid.bytebuffer.QpidByteBuffer;
@@ -32,7 +29,6 @@ import org.apache.qpid.codec.MarkableDat
 import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
 import org.apache.qpid.transport.ByteBufferSender;
-import org.apache.qpid.util.BytesDataOutput;
 
 public class ContentHeaderBody implements AMQBody
 {
@@ -205,4 +201,9 @@ public class ContentHeaderBody implement
         _properties.dispose();
         _properties = null;
     }
+
+    public void clearEncodedForm()
+    {
+        _properties.clearEncodedForm();
+    }
 }



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to