Author: rgodfrey
Date: Sun Aug 23 14:20:02 2015
New Revision: 1697182
URL: http://svn.apache.org/r1697182
Log:
QPID-6662 : Ensure buffers are disposed when messages are flowed to disk
Modified:
qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessageMetaData.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/StorableMessageMetaData.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/MultiVersionProtocolEngine.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionPlainDelegate.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionTLSDelegate.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionUndecidedDelegate.java
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaData.java
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaData_0_10.java
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/framing/SASLFrameHandler.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorLink_1_0.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/QpidByteBuffer.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/AMQFrame.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyImpl.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java
Modified:
qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java?rev=1697182&r1=1697181&r2=1697182&view=diff
==============================================================================
---
qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
(original)
+++
qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
Sun Aug 23 14:20:02 2015
@@ -35,8 +35,6 @@ import java.util.UUID;
import com.sleepycat.bind.tuple.ByteBinding;
import com.sleepycat.bind.tuple.LongBinding;
-import com.sleepycat.bind.tuple.TupleBinding;
-import com.sleepycat.bind.tuple.TupleOutput;
import com.sleepycat.je.Cursor;
import com.sleepycat.je.Database;
import com.sleepycat.je.DatabaseEntry;
@@ -74,7 +72,6 @@ import org.apache.qpid.server.store.berk
import org.apache.qpid.server.store.handler.DistributedTransactionHandler;
import org.apache.qpid.server.store.handler.MessageHandler;
import org.apache.qpid.server.store.handler.MessageInstanceHandler;
-import org.apache.qpid.util.ByteBufferUtils;
public abstract class AbstractBDBMessageStore implements MessageStore
@@ -399,11 +396,13 @@ public abstract class AbstractBDBMessage
}
}
- QpidByteBuffer getAllContent(long messageId) throws StoreException
+ Collection<QpidByteBuffer> getAllContent(long messageId) throws
StoreException
{
DatabaseEntry contentKeyEntry = new DatabaseEntry();
LongBinding.longToEntry(messageId, contentKeyEntry);
DatabaseEntry value = new DatabaseEntry();
+
+
ByteBufferBinding contentTupleBinding =
ByteBufferBinding.getInstance();
@@ -412,9 +411,22 @@ public abstract class AbstractBDBMessage
try
{
OperationStatus status = getMessageContentDb().get(null,
contentKeyEntry, value, LockMode.READ_UNCOMMITTED);
+
if (status == OperationStatus.SUCCESS)
{
- return contentTupleBinding.entryToObject(value);
+ byte[] data = value.getData();
+ int offset = value.getOffset();
+ int length = value.getSize();
+ Collection<QpidByteBuffer> buffers =
QpidByteBuffer.allocateDirectCollectionFromPool(length
+
);
+ for(QpidByteBuffer buf : buffers)
+ {
+ int bufSize = buf.remaining();
+ buf.put(data, offset, bufSize);
+ buf.flip();
+ offset+=bufSize;
+ }
+ return buffers;
}
else
{
@@ -1005,7 +1017,18 @@ public abstract class AbstractBDBMessage
public void clear()
{
- _metaData = null;
+ if(_metaData != null)
+ {
+ _metaData.clearEncodedForm();
+ _metaData = null;
+ }
+ if(_data != null)
+ {
+ for(QpidByteBuffer buf : _data)
+ {
+ buf.dispose();
+ }
+ }
_data = null;
}
@@ -1043,7 +1066,7 @@ public abstract class AbstractBDBMessage
}
@Override
- public T getMetaData()
+ public synchronized T getMetaData()
{
T metaData = _messageDataRef.getMetaData();
@@ -1063,7 +1086,7 @@ public abstract class AbstractBDBMessage
}
@Override
- public void addContent(QpidByteBuffer src)
+ public synchronized void addContent(QpidByteBuffer src)
{
src = src.slice();
Collection<QpidByteBuffer> data = _messageDataRef.getData();
@@ -1088,7 +1111,7 @@ public abstract class AbstractBDBMessage
}
@Override
- public int getContent(int offsetInMessage, ByteBuffer dst)
+ public synchronized int getContent(int offsetInMessage, final
ByteBuffer dst)
{
Collection<QpidByteBuffer> allContent = getContentAsByteBuffer();
int length = 0;
@@ -1108,7 +1131,7 @@ public abstract class AbstractBDBMessage
if(stored())
{
checkMessageStoreOpen();
- data =
Collections.singleton(AbstractBDBMessageStore.this.getAllContent(_messageId));
+ data =
AbstractBDBMessageStore.this.getAllContent(_messageId);
T metaData = _messageDataRef.getMetaData();
if (metaData == null)
{
@@ -1124,11 +1147,12 @@ public abstract class AbstractBDBMessage
{
data = Collections.emptyList();
}
- } return data;
+ }
+ return data;
}
@Override
- public Collection<QpidByteBuffer> getContent(int offsetInMessage, int
size)
+ public synchronized Collection<QpidByteBuffer> getContent(final int
offsetInMessage, final int size)
{
int pos = 0;
int added = 0;
@@ -1217,7 +1241,7 @@ public abstract class AbstractBDBMessage
}
@Override
- public void remove()
+ public synchronized void remove()
{
checkMessageStoreOpen();
Collection<QpidByteBuffer> data = _messageDataRef.getData();
@@ -1235,7 +1259,6 @@ public abstract class AbstractBDBMessage
}
}
metaData.dispose();
- _messageDataRef = null;
}
@Override
@@ -1250,7 +1273,7 @@ public abstract class AbstractBDBMessage
}
@Override
- public boolean flowToDisk()
+ public synchronized boolean flowToDisk()
{
flushToStore();
@@ -1266,6 +1289,7 @@ public abstract class AbstractBDBMessage
{
return this.getClass() + "[messageId=" + _messageId + "]";
}
+
}
Modified:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessageMetaData.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessageMetaData.java?rev=1697182&r1=1697181&r2=1697182&view=diff
==============================================================================
---
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessageMetaData.java
(original)
+++
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessageMetaData.java
Sun Aug 23 14:20:02 2015
@@ -94,6 +94,12 @@ public class InternalMessageMetaData imp
}
+ @Override
+ public void clearEncodedForm()
+ {
+
+ }
+
static InternalMessageMetaData create(boolean persistent, final
InternalMessageHeader header, int contentSize)
{
return new InternalMessageMetaData(persistent, header, contentSize);
Modified:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java?rev=1697182&r1=1697181&r2=1697182&view=diff
==============================================================================
---
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
(original)
+++
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
Sun Aug 23 14:20:02 2015
@@ -1200,7 +1200,8 @@ public abstract class AbstractJDBCMessag
{
byte[] dataAsBytes = getBlobAsBytes(rs, 1);
- QpidByteBuffer buf =
QpidByteBuffer.allocateDirect(dataAsBytes.length);
+ QpidByteBuffer buf =
QpidByteBuffer.allocateDirect(dataAsBytes.length
+ );
buf.put(dataAsBytes);
buf.flip();
return buf;
Modified:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/StorableMessageMetaData.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/StorableMessageMetaData.java?rev=1697182&r1=1697181&r2=1697182&view=diff
==============================================================================
---
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/StorableMessageMetaData.java
(original)
+++
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/StorableMessageMetaData.java
Sun Aug 23 14:20:02 2015
@@ -36,5 +36,8 @@ public interface StorableMessageMetaData
boolean isPersistent();
void dispose();
+
+ void clearEncodedForm();
+
}
Modified:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/MultiVersionProtocolEngine.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/MultiVersionProtocolEngine.java?rev=1697182&r1=1697181&r2=1697182&view=diff
==============================================================================
---
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/MultiVersionProtocolEngine.java
(original)
+++
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/MultiVersionProtocolEngine.java
Sun Aug 23 14:20:02 2015
@@ -23,7 +23,6 @@ package org.apache.qpid.server.transport
import java.net.InetSocketAddress;
import java.net.SocketAddress;
-import java.nio.ByteBuffer;
import java.security.cert.Certificate;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
Modified:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionPlainDelegate.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionPlainDelegate.java?rev=1697182&r1=1697181&r2=1697182&view=diff
==============================================================================
---
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionPlainDelegate.java
(original)
+++
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionPlainDelegate.java
Sun Aug 23 14:20:02 2015
@@ -20,7 +20,6 @@
package org.apache.qpid.server.transport;
import java.io.IOException;
-import java.nio.ByteBuffer;
import java.security.Principal;
import java.security.cert.Certificate;
import java.util.Collection;
Modified:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionTLSDelegate.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionTLSDelegate.java?rev=1697182&r1=1697181&r2=1697182&view=diff
==============================================================================
---
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionTLSDelegate.java
(original)
+++
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionTLSDelegate.java
Sun Aug 23 14:20:02 2015
@@ -20,7 +20,6 @@
package org.apache.qpid.server.transport;
import org.apache.qpid.bytebuffer.QpidByteBuffer;
-import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.port.AmqpPort;
import org.apache.qpid.server.util.QpidByteBufferUtils;
import org.apache.qpid.transport.network.security.ssl.SSLUtil;
@@ -39,7 +38,6 @@ import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.ListIterator;
-import java.util.Map;
public class NonBlockingConnectionTLSDelegate implements
NonBlockingConnectionDelegate
{
@@ -320,6 +318,15 @@ public class NonBlockingConnectionTLSDel
@Override
public void shutdownOutput()
{
+ try
+ {
+ _sslEngine.closeOutbound();
+ _sslEngine.closeInbound();
+ }
+ catch (SSLException e)
+ {
+ LOGGER.debug("Exception when closing SSLEngine", e);
+ }
}
Modified:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionUndecidedDelegate.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionUndecidedDelegate.java?rev=1697182&r1=1697181&r2=1697182&view=diff
==============================================================================
---
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionUndecidedDelegate.java
(original)
+++
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionUndecidedDelegate.java
Sun Aug 23 14:20:02 2015
@@ -23,7 +23,6 @@ import org.apache.qpid.bytebuffer.QpidBy
import org.apache.qpid.transport.network.TransportEncryption;
import java.io.IOException;
-import java.nio.ByteBuffer;
import java.security.Principal;
import java.security.cert.Certificate;
import java.util.Collection;
Modified:
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaData.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaData.java?rev=1697182&r1=1697181&r2=1697182&view=diff
==============================================================================
---
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaData.java
(original)
+++
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaData.java
Sun Aug 23 14:20:02 2015
@@ -84,6 +84,12 @@ public class TestMessageMetaData impleme
}
@Override
+ public void clearEncodedForm()
+ {
+
+ }
+
+ @Override
public int writeToBuffer(ByteBuffer dest)
{
int oldPosition = dest.position();
Modified:
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaData_0_10.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaData_0_10.java?rev=1697182&r1=1697181&r2=1697182&view=diff
==============================================================================
---
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaData_0_10.java
(original)
+++
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaData_0_10.java
Sun Aug 23 14:20:02 2015
@@ -179,6 +179,12 @@ public class MessageMetaData_0_10 implem
}
+ @Override
+ public void clearEncodedForm()
+ {
+
+ }
+
public String getRoutingKey()
{
return _deliveryProps == null ? null : _deliveryProps.getRoutingKey();
Modified:
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java?rev=1697182&r1=1697181&r2=1697182&view=diff
==============================================================================
---
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java
(original)
+++
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java
Sun Aug 23 14:20:02 2015
@@ -75,7 +75,7 @@ public class MessageMetaData implements
}
- public ContentHeaderBody getContentHeaderBody()
+ public synchronized ContentHeaderBody getContentHeaderBody()
{
return _contentHeaderBody;
}
@@ -152,12 +152,17 @@ public class MessageMetaData implements
}
@Override
- public void dispose()
+ public synchronized void dispose()
{
_contentHeaderBody.dispose();
_contentHeaderBody = null;
}
+ public synchronized void clearEncodedForm()
+ {
+ _contentHeaderBody.clearEncodedForm();
+ }
+
private static class MetaDataFactory implements
MessageMetaDataType.Factory<MessageMetaData>
{
Modified:
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/framing/SASLFrameHandler.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/framing/SASLFrameHandler.java?rev=1697182&r1=1697181&r2=1697182&view=diff
==============================================================================
---
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/framing/SASLFrameHandler.java
(original)
+++
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/framing/SASLFrameHandler.java
Sun Aug 23 14:20:02 2015
@@ -28,7 +28,6 @@ import org.apache.qpid.amqp_1_0.type.tra
import org.apache.qpid.amqp_1_0.type.transport.Error;
import org.apache.qpid.bytebuffer.QpidByteBuffer;
-import java.nio.ByteBuffer;
import java.util.Formatter;
public class SASLFrameHandler implements ProtocolHandler
Modified:
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java?rev=1697182&r1=1697181&r2=1697182&view=diff
==============================================================================
---
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
(original)
+++
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
Sun Aug 23 14:20:02 2015
@@ -22,7 +22,6 @@ package org.apache.qpid.server.protocol.
import java.nio.ByteBuffer;
import java.util.Collection;
-import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Modified:
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java?rev=1697182&r1=1697181&r2=1697182&view=diff
==============================================================================
---
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java
(original)
+++
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java
Sun Aug 23 14:20:02 2015
@@ -387,6 +387,12 @@ public class MessageMetaData_1_0 impleme
_encoded = null;
}
+ @Override
+ public void clearEncodedForm()
+ {
+
+ }
+
private static class MetaDataFactory implements
MessageMetaDataType.Factory<MessageMetaData_1_0>
{
private final AMQPDescribedTypeRegistry _typeRegistry =
AMQPDescribedTypeRegistry.newInstance();
Modified:
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorLink_1_0.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorLink_1_0.java?rev=1697182&r1=1697181&r2=1697182&view=diff
==============================================================================
---
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorLink_1_0.java
(original)
+++
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorLink_1_0.java
Sun Aug 23 14:20:02 2015
@@ -20,7 +20,6 @@
*/
package org.apache.qpid.server.protocol.v1_0;
-import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
Modified:
qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/QpidByteBuffer.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/QpidByteBuffer.java?rev=1697182&r1=1697181&r2=1697182&view=diff
==============================================================================
---
qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/QpidByteBuffer.java
(original)
+++
qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/QpidByteBuffer.java
Sun Aug 23 14:20:02 2015
@@ -28,8 +28,11 @@ import java.nio.CharBuffer;
import java.nio.channels.GatheringByteChannel;
import java.nio.channels.ReadableByteChannel;
import java.nio.charset.Charset;
+import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
import java.util.Iterator;
+import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
@@ -493,7 +496,41 @@ public final class QpidByteBuffer
}
+ public static Collection<QpidByteBuffer>
allocateDirectCollectionFromPool(int size)
+ {
+ final int maxPooledBufferSize = _maxPooledBufferSize.get();
+ if(size <= maxPooledBufferSize || maxPooledBufferSize == 0)
+ {
+ return Collections.singleton(allocateDirectFromPool(size));
+ }
+ else
+ {
+ List<QpidByteBuffer> collections = new ArrayList<>((size /
maxPooledBufferSize)+2);
+ int remaining = size;
+
+ QpidByteBuffer buf = _cachedBuffer.get();
+ if(buf != null)
+ {
+ collections.add(buf.slice());
+ remaining -= buf.remaining();
+ buf.dispose();
+ }
+ while(remaining > maxPooledBufferSize)
+ {
+ collections.add(allocateDirect(maxPooledBufferSize));
+ remaining -= maxPooledBufferSize;
+ }
+ buf = allocateDirect(maxPooledBufferSize);
+ collections.add(buf.view(0, remaining));
+ buf.position(buf.position()+remaining);
+ _cachedBuffer.set(buf.slice());
+ buf.dispose();
+ return collections;
+
+ }
+
+ }
public ByteBuffer asByteBuffer()
{
_ref.removeFromPool();
Modified:
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/AMQFrame.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/AMQFrame.java?rev=1697182&r1=1697181&r2=1697182&view=diff
==============================================================================
--- qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/AMQFrame.java
(original)
+++ qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/AMQFrame.java
Sun Aug 23 14:20:02 2015
@@ -22,11 +22,9 @@ package org.apache.qpid.framing;
import java.io.DataOutput;
import java.io.IOException;
-import java.nio.ByteBuffer;
import org.apache.qpid.bytebuffer.QpidByteBuffer;
import org.apache.qpid.transport.ByteBufferSender;
-import org.apache.qpid.util.BytesDataOutput;
public class AMQFrame extends AMQDataBlock implements EncodableAMQDataBlock
{
Modified:
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyImpl.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyImpl.java?rev=1697182&r1=1697181&r2=1697182&view=diff
==============================================================================
---
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyImpl.java
(original)
+++
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyImpl.java
Sun Aug 23 14:20:02 2015
@@ -24,7 +24,6 @@ package org.apache.qpid.framing;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
-import java.nio.ByteBuffer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -33,12 +32,9 @@ import org.apache.qpid.AMQChannelExcepti
import org.apache.qpid.AMQConnectionException;
import org.apache.qpid.QpidException;
import org.apache.qpid.bytebuffer.QpidByteBuffer;
-import org.apache.qpid.codec.MarkableDataInput;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
import org.apache.qpid.transport.ByteBufferSender;
-import org.apache.qpid.util.ByteBufferDataOutput;
-import org.apache.qpid.util.BytesDataOutput;
public abstract class AMQMethodBodyImpl implements AMQMethodBody
{
Modified:
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java?rev=1697182&r1=1697181&r2=1697182&view=diff
==============================================================================
---
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java
(original)
+++
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java
Sun Aug 23 14:20:02 2015
@@ -20,10 +20,8 @@
*/
package org.apache.qpid.framing;
-import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
-import java.nio.ByteBuffer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -31,8 +29,6 @@ import org.slf4j.LoggerFactory;
import org.apache.qpid.bytebuffer.QpidByteBuffer;
import org.apache.qpid.codec.MarkableDataInput;
import org.apache.qpid.transport.ByteBufferSender;
-import org.apache.qpid.util.ByteBufferDataOutput;
-import org.apache.qpid.util.BytesDataOutput;
public class BasicContentHeaderProperties
{
@@ -131,7 +127,7 @@ public class BasicContentHeaderPropertie
{
}
- public int getPropertyListSize()
+ public synchronized int getPropertyListSize()
{
if(useEncodedForm())
{
@@ -232,7 +228,7 @@ public class BasicContentHeaderPropertie
return _propertyFlags;
}
- public void writePropertyListPayload(DataOutput buffer) throws IOException
+ public synchronized void writePropertyListPayload(DataOutput buffer)
throws IOException
{
if(useEncodedForm())
{
@@ -352,7 +348,7 @@ public class BasicContentHeaderPropertie
{
length++;
_encoding = EncodingUtils.readAMQShortString(input);
- if(_encodedForm != null)
+ if(_encoding != null)
{
length += _encoding.length();
}
@@ -471,7 +467,7 @@ public class BasicContentHeaderPropertie
}
- public long writePropertyListPayload(final ByteBufferSender sender) throws
IOException
+ public synchronized long writePropertyListPayload(final ByteBufferSender
sender) throws IOException
{
if(useEncodedForm())
{
@@ -493,7 +489,7 @@ public class BasicContentHeaderPropertie
}
- public void populatePropertiesFromBuffer(MarkableDataInput buffer, int
propertyFlags, int size) throws AMQFrameDecodingException, IOException
+ public synchronized void populatePropertiesFromBuffer(MarkableDataInput
buffer, int propertyFlags, int size) throws AMQFrameDecodingException,
IOException
{
_propertyFlags = propertyFlags;
@@ -501,7 +497,10 @@ public class BasicContentHeaderPropertie
{
_logger.debug("Property flags: " + _propertyFlags);
}
-
+ if(_encodedForm != null)
+ {
+ _encodedForm.dispose();
+ }
_encodedForm = buffer.readAsByteBuffer(size);
final QpidByteBuffer byteBuffer = _encodedForm.slice();
@@ -605,7 +604,7 @@ public class BasicContentHeaderPropertie
return (_contentType == null) ? null : _contentType.toString();
}
- public void setContentType(AMQShortString contentType)
+ public synchronized void setContentType(AMQShortString contentType)
{
if(contentType == null)
@@ -617,7 +616,7 @@ public class BasicContentHeaderPropertie
_propertyFlags |= CONTENT_TYPE_MASK;
}
_contentType = contentType;
- _encodedForm = null;
+ nullEncodedForm();
}
public void setContentType(String contentType)
@@ -641,7 +640,7 @@ public class BasicContentHeaderPropertie
setEncoding(encoding == null ? null :
AMQShortString.valueOf(encoding));
}
- public void setEncoding(AMQShortString encoding)
+ public synchronized void setEncoding(AMQShortString encoding)
{
if(encoding == null)
{
@@ -652,7 +651,7 @@ public class BasicContentHeaderPropertie
_propertyFlags |= ENCODING_MASK;
}
_encoding = encoding;
- _encodedForm = null;
+ nullEncodedForm();
}
public FieldTable getHeaders()
@@ -665,7 +664,7 @@ public class BasicContentHeaderPropertie
return _headers;
}
- public void setHeaders(FieldTable headers)
+ public synchronized void setHeaders(FieldTable headers)
{
if(headers == null)
{
@@ -676,7 +675,7 @@ public class BasicContentHeaderPropertie
_propertyFlags |= HEADERS_MASK;
}
_headers = headers;
- _encodedForm = null;
+ nullEncodedForm();
}
public byte getDeliveryMode()
@@ -684,11 +683,11 @@ public class BasicContentHeaderPropertie
return _deliveryMode;
}
- public void setDeliveryMode(byte deliveryMode)
+ public synchronized void setDeliveryMode(byte deliveryMode)
{
_propertyFlags |= DELIVERY_MODE_MASK;
_deliveryMode = deliveryMode;
- _encodedForm = null;
+ nullEncodedForm();
}
public byte getPriority()
@@ -696,11 +695,11 @@ public class BasicContentHeaderPropertie
return _priority;
}
- public void setPriority(byte priority)
+ public synchronized void setPriority(byte priority)
{
_propertyFlags |= PRIORITY_MASK;
_priority = priority;
- _encodedForm = null;
+ nullEncodedForm();
}
public AMQShortString getCorrelationId()
@@ -718,7 +717,7 @@ public class BasicContentHeaderPropertie
setCorrelationId((correlationId == null) ? null :
AMQShortString.valueOf(correlationId));
}
- public void setCorrelationId(AMQShortString correlationId)
+ public synchronized void setCorrelationId(AMQShortString correlationId)
{
if(correlationId == null)
{
@@ -729,7 +728,7 @@ public class BasicContentHeaderPropertie
_propertyFlags |= CORRELATION_ID_MASK;
}
_correlationId = correlationId;
- _encodedForm = null;
+ nullEncodedForm();
}
public String getReplyToAsString()
@@ -747,7 +746,7 @@ public class BasicContentHeaderPropertie
setReplyTo((replyTo == null) ? null : AMQShortString.valueOf(replyTo));
}
- public void setReplyTo(AMQShortString replyTo)
+ public synchronized void setReplyTo(AMQShortString replyTo)
{
if(replyTo == null)
{
@@ -758,7 +757,7 @@ public class BasicContentHeaderPropertie
_propertyFlags |= REPLY_TO_MASK;
}
_replyTo = replyTo;
- _encodedForm = null;
+ nullEncodedForm();
}
public long getExpiration()
@@ -766,7 +765,7 @@ public class BasicContentHeaderPropertie
return _expiration;
}
- public void setExpiration(long expiration)
+ public synchronized void setExpiration(long expiration)
{
if(expiration == 0l)
{
@@ -777,7 +776,7 @@ public class BasicContentHeaderPropertie
_propertyFlags |= EXPIRATION_MASK;
}
_expiration = expiration;
- _encodedForm = null;
+ nullEncodedForm();
}
public AMQShortString getMessageId()
@@ -795,7 +794,7 @@ public class BasicContentHeaderPropertie
setMessageId(messageId == null ? null : new AMQShortString(messageId));
}
- public void setMessageId(AMQShortString messageId)
+ public synchronized void setMessageId(AMQShortString messageId)
{
if(messageId == null)
{
@@ -806,7 +805,7 @@ public class BasicContentHeaderPropertie
_propertyFlags |= MESSAGE_ID_MASK;
}
_messageId = messageId;
- _encodedForm = null;
+ nullEncodedForm();
}
public long getTimestamp()
@@ -814,11 +813,11 @@ public class BasicContentHeaderPropertie
return _timestamp;
}
- public void setTimestamp(long timestamp)
+ public synchronized void setTimestamp(long timestamp)
{
_propertyFlags |= TIMESTAMP_MASK;
_timestamp = timestamp;
- _encodedForm = null;
+ nullEncodedForm();
}
public String getTypeAsString()
@@ -836,7 +835,7 @@ public class BasicContentHeaderPropertie
setType((type == null) ? null : AMQShortString.valueOf(type));
}
- public void setType(AMQShortString type)
+ public synchronized void setType(AMQShortString type)
{
if(type == null)
{
@@ -847,7 +846,7 @@ public class BasicContentHeaderPropertie
_propertyFlags |= TYPE_MASK;
}
_type = type;
- _encodedForm = null;
+ nullEncodedForm();
}
public String getUserIdAsString()
@@ -865,7 +864,7 @@ public class BasicContentHeaderPropertie
setUserId((userId == null) ? null : AMQShortString.valueOf(userId));
}
- public void setUserId(AMQShortString userId)
+ public synchronized void setUserId(AMQShortString userId)
{
if(userId == null)
{
@@ -876,7 +875,7 @@ public class BasicContentHeaderPropertie
_propertyFlags |= USER_ID_MASK;
}
_userId = userId;
- _encodedForm = null;
+ nullEncodedForm();
}
public String getAppIdAsString()
@@ -894,7 +893,7 @@ public class BasicContentHeaderPropertie
setAppId((appId == null) ? null : AMQShortString.valueOf(appId));
}
- public void setAppId(AMQShortString appId)
+ public synchronized void setAppId(AMQShortString appId)
{
if(appId == null)
{
@@ -905,7 +904,7 @@ public class BasicContentHeaderPropertie
_propertyFlags |= APPLICATION_ID_MASK;
}
_appId = appId;
- _encodedForm = null;
+ nullEncodedForm();
}
public String getClusterIdAsString()
@@ -923,7 +922,7 @@ public class BasicContentHeaderPropertie
setClusterId((clusterId == null) ? null :
AMQShortString.valueOf(clusterId));
}
- public void setClusterId(AMQShortString clusterId)
+ public synchronized void setClusterId(AMQShortString clusterId)
{
if(clusterId == null)
{
@@ -934,7 +933,7 @@ public class BasicContentHeaderPropertie
_propertyFlags |= CLUSTER_ID_MASK;
}
_clusterId = clusterId;
- _encodedForm = null;
+ nullEncodedForm();
}
public String toString()
@@ -945,23 +944,37 @@ public class BasicContentHeaderPropertie
+ _expiration + ",JMSPriority = " + _priority + ",JMSTimestamp = "
+ _timestamp + ",JMSType = " + _type;
}
- private boolean useEncodedForm()
+ private synchronized boolean useEncodedForm()
{
return _encodedForm != null && (_headers == null ||
_headers.isClean());
}
- public void dispose()
+ public synchronized void dispose()
{
- if(_encodedForm != null)
- {
- _encodedForm.dispose();
- _encodedForm = null;
- }
+ nullEncodedForm();
if(_headers != null)
{
_headers.dispose();
_headers = null;
}
}
+
+ public synchronized void clearEncodedForm()
+ {
+ nullEncodedForm();
+ if(_headers != null)
+ {
+ _headers.clearEncodedForm();
+ }
+ }
+
+ private synchronized void nullEncodedForm()
+ {
+ if(_encodedForm != null)
+ {
+ _encodedForm.dispose();
+ _encodedForm = null;
+ }
+ }
}
Modified:
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java?rev=1697182&r1=1697181&r2=1697182&view=diff
==============================================================================
---
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java
(original)
+++
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java
Sun Aug 23 14:20:02 2015
@@ -20,11 +20,8 @@
*/
package org.apache.qpid.framing;
-import java.io.DataInput;
-import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.IOException;
-import java.nio.ByteBuffer;
import org.apache.qpid.QpidException;
import org.apache.qpid.bytebuffer.QpidByteBuffer;
@@ -32,7 +29,6 @@ import org.apache.qpid.codec.MarkableDat
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
import org.apache.qpid.transport.ByteBufferSender;
-import org.apache.qpid.util.BytesDataOutput;
public class ContentHeaderBody implements AMQBody
{
@@ -205,4 +201,9 @@ public class ContentHeaderBody implement
_properties.dispose();
_properties = null;
}
+
+ public void clearEncodedForm()
+ {
+ _properties.clearEncodedForm();
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]