Author: kwall
Date: Tue Jul 28 15:48:30 2015
New Revision: 1693123
URL: http://svn.apache.org/r1693123
Log:
QPID-6662: [Java Broker] Use direct byte buffers between transport / store and
minimise copying
Added:
qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/ByteBufferBinding.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ByteBufferDataInput.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ByteBufferListDataInput.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/util/ByteBufferDataOutput.java
Removed:
qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/ContentBinding.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/ByteBufferWriter.java
Modified:
qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/FieldTableEncoding.java
qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom4To5.java
qpid/java/trunk/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgraderTest.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/MultiVersionProtocolEngine.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionTLSDelegate.java
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10.java
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerAssembler.java
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerDisassembler.java
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerEncoder.java
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolOutputConverterImpl.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/SymbolTypeConstructor.java
qpid/java/trunk/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/message/Encrypted091MessageFactory.java
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_8.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/codec/MarkableDataInput.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/AMQFrame.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyImpl.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/AMQShortString.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/AMQType.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/AMQTypedValue.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ByteArrayDataInput.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ChannelMethodProcessor.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ContentBody.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/ContentHeaderPropertiesFactory.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/EncodingUtils.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/FieldTable.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/FieldTableFactory.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/FrameCreatingMethodProcessor.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/Connection.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/ProtocolHeader.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/Assembler.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/InputHandler.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/util/Functions.java
qpid/java/trunk/common/src/test/java/org/apache/qpid/codec/AMQDecoderTest.java
qpid/java/trunk/common/src/test/java/org/apache/qpid/framing/BasicContentHeaderPropertiesTest.java
qpid/java/trunk/common/src/test/java/org/apache/qpid/framing/FieldTableTest.java
qpid/java/trunk/common/src/test/java/org/apache/qpid/transport/network/ConnectionBinding.java
qpid/java/trunk/systests/src/test/java/org/apache/qpid/test/unit/basic/FieldTableMessageTest.java
Modified:
qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java?rev=1693123&r1=1693122&r2=1693123&view=diff
==============================================================================
---
qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
(original)
+++
qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
Tue Jul 28 15:48:30 2015
@@ -60,7 +60,7 @@ import org.apache.qpid.server.store.Tran
import org.apache.qpid.server.store.Xid;
import org.apache.qpid.server.store.berkeleydb.entry.PreparedTransaction;
import org.apache.qpid.server.store.berkeleydb.entry.QueueEntryKey;
-import org.apache.qpid.server.store.berkeleydb.tuple.ContentBinding;
+import org.apache.qpid.server.store.berkeleydb.tuple.ByteBufferBinding;
import org.apache.qpid.server.store.berkeleydb.tuple.MessageMetaDataBinding;
import
org.apache.qpid.server.store.berkeleydb.tuple.PreparedTransactionBinding;
import org.apache.qpid.server.store.berkeleydb.tuple.QueueEntryBinding;
@@ -351,7 +351,7 @@ public abstract class AbstractBDBMessage
DatabaseEntry contentKeyEntry = new DatabaseEntry();
LongBinding.longToEntry(messageId, contentKeyEntry);
DatabaseEntry value = new DatabaseEntry();
- ContentBinding contentTupleBinding = ContentBinding.getInstance();
+ ByteBufferBinding contentTupleBinding =
ByteBufferBinding.getInstance();
getLogger().debug("Message Id: {} Getting content body from offset:
{}", messageId, offset);
@@ -364,8 +364,8 @@ public abstract class AbstractBDBMessage
OperationStatus status = getMessageContentDb().get(null,
contentKeyEntry, value, LockMode.READ_UNCOMMITTED);
if (status == OperationStatus.SUCCESS)
{
- byte[] dataAsBytes = contentTupleBinding.entryToObject(value);
- int size = dataAsBytes.length;
+ ByteBuffer dataAsBytes =
contentTupleBinding.entryToObject(value);
+ int size = dataAsBytes.remaining();
if (offset > size)
{
throw new RuntimeException("Offset " + offset + " is
greater than message size " + size
@@ -378,8 +378,11 @@ public abstract class AbstractBDBMessage
{
written = dst.remaining();
}
+ dataAsBytes.position(dataAsBytes.position()+offset);
- dst.put(dataAsBytes, offset, written);
+ dataAsBytes = dataAsBytes.slice();
+ dataAsBytes.limit(written);
+ dst.put(dataAsBytes);
}
return written;
}
@@ -392,20 +395,18 @@ public abstract class AbstractBDBMessage
}
}
- byte[] getAllContent(long messageId) throws StoreException
+ ByteBuffer getAllContent(long messageId) throws StoreException
{
DatabaseEntry contentKeyEntry = new DatabaseEntry();
LongBinding.longToEntry(messageId, contentKeyEntry);
DatabaseEntry value = new DatabaseEntry();
- ContentBinding contentTupleBinding = ContentBinding.getInstance();
+ ByteBufferBinding contentTupleBinding =
ByteBufferBinding.getInstance();
getLogger().debug("Message Id: {} Getting content body", messageId);
try
{
-
- int written = 0;
OperationStatus status = getMessageContentDb().get(null,
contentKeyEntry, value, LockMode.READ_UNCOMMITTED);
if (status == OperationStatus.SUCCESS)
{
@@ -511,8 +512,8 @@ public abstract class AbstractBDBMessage
DatabaseEntry key = new DatabaseEntry();
LongBinding.longToEntry(messageId, key);
DatabaseEntry value = new DatabaseEntry();
- ContentBinding messageBinding = ContentBinding.getInstance();
- messageBinding.objectToEntry(contentBody.array(), value);
+ ByteBufferBinding messageBinding = ByteBufferBinding.getInstance();
+ messageBinding.objectToEntry(contentBody, value);
try
{
OperationStatus status = getMessageContentDb().put(tx, key, value);
@@ -884,15 +885,15 @@ public abstract class AbstractBDBMessage
static interface MessageDataRef<T extends StorableMessageMetaData>
{
T getMetaData();
- byte[] getData();
- void setData(byte[] data);
+ ByteBuffer getData();
+ void setData(ByteBuffer data);
boolean isHardRef();
}
private static final class MessageDataHardRef<T extends
StorableMessageMetaData> implements MessageDataRef<T>
{
private final T _metaData;
- private byte[] _data;
+ private volatile ByteBuffer _data;
private MessageDataHardRef(final T metaData)
{
@@ -906,13 +907,13 @@ public abstract class AbstractBDBMessage
}
@Override
- public byte[] getData()
+ public ByteBuffer getData()
{
return _data;
}
@Override
- public void setData(final byte[] data)
+ public void setData(final ByteBuffer data)
{
_data = data;
}
@@ -927,9 +928,9 @@ public abstract class AbstractBDBMessage
private static final class MessageData<T extends StorableMessageMetaData>
{
private T _metaData;
- private SoftReference<byte[]> _data;
+ private SoftReference<ByteBuffer> _data;
- private MessageData(final T metaData, final byte[] data)
+ private MessageData(final T metaData, final ByteBuffer data)
{
_metaData = metaData;
@@ -944,12 +945,12 @@ public abstract class AbstractBDBMessage
return _metaData;
}
- public byte[] getData()
+ public ByteBuffer getData()
{
return _data == null ? null : _data.get();
}
- public void setData(final byte[] data)
+ public void setData(final ByteBuffer data)
{
_data = new SoftReference<>(data);
}
@@ -959,7 +960,7 @@ public abstract class AbstractBDBMessage
private static final class MessageDataSoftRef<T extends
StorableMessageMetaData> extends SoftReference<MessageData<T>> implements
MessageDataRef<T>
{
- public MessageDataSoftRef(final T metadata, byte[] data)
+ public MessageDataSoftRef(final T metadata, ByteBuffer data)
{
super(new MessageData<T>(metadata, data));
}
@@ -972,7 +973,7 @@ public abstract class AbstractBDBMessage
}
@Override
- public byte[] getData()
+ public ByteBuffer getData()
{
MessageData<T> ref = get();
@@ -980,7 +981,7 @@ public abstract class AbstractBDBMessage
}
@Override
- public void setData(final byte[] data)
+ public void setData(final ByteBuffer data)
{
MessageData<T> ref = get();
if(ref != null)
@@ -1046,23 +1047,19 @@ public abstract class AbstractBDBMessage
public void addContent(ByteBuffer src)
{
src = src.slice();
- byte[] data = _messageDataRef.getData();
+ ByteBuffer data = _messageDataRef.getData();
if(data == null)
{
- data = new byte[src.remaining()];
- src.duplicate().get(data);
- _messageDataRef.setData(data);
+ _messageDataRef.setData(src);
}
else
{
- byte[] oldData = data;
- data = new byte[oldData.length + src.remaining()];
-
-
- System.arraycopy(oldData, 0, data, 0, oldData.length);
- src.duplicate().get(data, oldData.length, src.remaining());
-
- _messageDataRef.setData(data);
+ int size = data.remaining() + src.remaining();
+ ByteBuffer buf = data.isDirect() ?
ByteBuffer.allocateDirect(size) : ByteBuffer.allocate(size);
+ buf.put(data.duplicate());
+ buf.put(src.duplicate());
+ buf.flip();
+ _messageDataRef.setData(buf);
}
}
@@ -1076,39 +1073,17 @@ public abstract class AbstractBDBMessage
@Override
public int getContent(int offsetInMessage, ByteBuffer dst)
{
- byte[] data = _messageDataRef.getData();
- if(data == null)
- {
- if(stored())
- {
- checkMessageStoreOpen();
- data =
AbstractBDBMessageStore.this.getAllContent(_messageId);
- T metaData = _messageDataRef.getMetaData();
- if (metaData == null)
- {
- metaData = (T) getMessageMetaData(_messageId);
- _messageDataRef = new MessageDataSoftRef<>(metaData,
data);
- }
- else
- {
- _messageDataRef.setData(data);
- }
- }
- else
- {
- data = new byte[0];
- }
- }
-
- int length = Math.min(dst.remaining(), data.length -
offsetInMessage);
- dst.put(data, offsetInMessage, length);
+ ByteBuffer data = getContentAsByteBuffer();
+ data = data.slice();
+ int length = Math.min(dst.remaining(), data.remaining());
+ data.limit(length);
+ dst.put(data);
return length;
}
- @Override
- public ByteBuffer getContent(int offsetInMessage, int size)
+ private ByteBuffer getContentAsByteBuffer()
{
- byte[] data = _messageDataRef.getData();
+ ByteBuffer data = _messageDataRef.getData();
if(data == null)
{
if(stored())
@@ -1119,7 +1094,7 @@ public abstract class AbstractBDBMessage
if (metaData == null)
{
metaData = (T) getMessageMetaData(_messageId);
- _messageDataRef = new MessageDataSoftRef<>(metaData,
data);
+ _messageDataRef = new MessageDataSoftRef<T>(metaData,
data);
}
else
{
@@ -1128,23 +1103,20 @@ public abstract class AbstractBDBMessage
}
else
{
- return null;
+ data = ByteBuffer.wrap(new byte[0]);
}
- }
- try
- {
- return ByteBuffer.wrap(data, offsetInMessage, Math.min(size,
data.length - offsetInMessage));
- }
- catch (IndexOutOfBoundsException e)
- {
- IndexOutOfBoundsException indexOutOfBoundsException =
- new IndexOutOfBoundsException("Error wrapping data
(data.length: " + data.length
- + " offsetInMessage: " +
offsetInMessage
- + " size: " + size);
- indexOutOfBoundsException.initCause(e);
- throw indexOutOfBoundsException;
- }
+ } return data;
+ }
+ @Override
+ public ByteBuffer getContent(int offsetInMessage, int size)
+ {
+ ByteBuffer data = getContentAsByteBuffer();
+ data = data.duplicate();
+ data.position(offsetInMessage);
+ data = data.slice();
+ data.limit(size);
+ return data;
}
synchronized Runnable store(Transaction txn)
@@ -1156,7 +1128,7 @@ public abstract class AbstractBDBMessage
AbstractBDBMessageStore.this.addContent(txn, _messageId, 0,
_messageDataRef.getData() == null
?
ByteBuffer.allocate(0)
- :
ByteBuffer.wrap(_messageDataRef.getData()));
+ :
_messageDataRef.getData());
MessageDataRef<T> hardRef = _messageDataRef;
Modified:
qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/FieldTableEncoding.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/FieldTableEncoding.java?rev=1693123&r1=1693122&r2=1693123&view=diff
==============================================================================
---
qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/FieldTableEncoding.java
(original)
+++
qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/FieldTableEncoding.java
Tue Jul 28 15:48:30 2015
@@ -25,11 +25,9 @@ import com.sleepycat.bind.tuple.TupleOut
import com.sleepycat.je.DatabaseException;
import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.server.store.StoreException;
+import org.apache.qpid.server.store.berkeleydb.tuple.ByteBufferBinding;
-import java.io.ByteArrayInputStream;
-import java.io.DataInputStream;
-import java.io.IOException;
+import java.nio.ByteBuffer;
public class FieldTableEncoding
{
@@ -39,6 +37,7 @@ public class FieldTableEncoding
public static FieldTable readFieldTable(TupleInput tupleInput) throws
DatabaseException
{
+
long length = tupleInput.readLong();
if (length <= 0)
{
@@ -47,17 +46,9 @@ public class FieldTableEncoding
else
{
- byte[] data = new byte[(int)length];
- tupleInput.readFast(data);
+ ByteBuffer buf =
ByteBufferBinding.getInstance().readByteBuffer(tupleInput, (int) length);
- try
- {
- return new FieldTable(new DataInputStream(new
ByteArrayInputStream(data)),length);
- }
- catch (IOException e)
- {
- throw new StoreException(e);
- }
+ return new FieldTable(buf);
}
Added:
qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/ByteBufferBinding.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/ByteBufferBinding.java?rev=1693123&view=auto
==============================================================================
---
qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/ByteBufferBinding.java
(added)
+++
qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/ByteBufferBinding.java
Tue Jul 28 15:48:30 2015
@@ -0,0 +1,95 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb.tuple;
+
+import java.nio.ByteBuffer;
+
+import com.sleepycat.bind.tuple.TupleBinding;
+import com.sleepycat.bind.tuple.TupleInput;
+import com.sleepycat.bind.tuple.TupleOutput;
+
+public class ByteBufferBinding extends TupleBinding<ByteBuffer>
+{
+ private static final int COPY_BUFFER_SIZE = 8192;
+
+ private static final ThreadLocal<byte[]> COPY_BUFFER = new
ThreadLocal<byte[]>()
+ {
+
+ @Override
+ protected
byte[] initialValue()
+ {
+ return new
byte[COPY_BUFFER_SIZE];
+ }
+ };
+
+ private static final ByteBufferBinding INSTANCE = new ByteBufferBinding();
+
+ public static ByteBufferBinding getInstance()
+ {
+ return INSTANCE;
+ }
+
+ /** private constructor forces getInstance instead */
+ private ByteBufferBinding() { }
+
+ @Override
+ public ByteBuffer entryToObject(final TupleInput input)
+ {
+ int available = input.available();
+ ByteBuffer buf = ByteBuffer.allocateDirect(available);
+ byte[] copyBuf = COPY_BUFFER.get();
+ while(available > 0)
+ {
+ int read = input.read(copyBuf);
+ buf.put(copyBuf,0,read);
+ available = input.available();
+ }
+ buf.flip();
+ return buf;
+ }
+
+ @Override
+ public void objectToEntry(ByteBuffer data, final TupleOutput output)
+ {
+ data = data.duplicate();
+ byte[] copyBuf = COPY_BUFFER.get();
+ while(data.hasRemaining())
+ {
+ int length = Math.min(COPY_BUFFER_SIZE, data.remaining());
+ data.get(copyBuf,0,length);
+ output.write(copyBuf,0,length);
+ }
+ }
+
+ public ByteBuffer readByteBuffer(final TupleInput input, int length)
+ {
+ ByteBuffer buf = ByteBuffer.allocateDirect(length);
+ byte[] copyBuf = COPY_BUFFER.get();
+ while(length > 0)
+ {
+ int read = input.read(copyBuf, 0, Math.min(COPY_BUFFER_SIZE,
length));
+ buf.put(copyBuf,0,read);
+ length -= read;
+ }
+ buf.flip();
+ return buf;
+ }
+}
Modified:
qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom4To5.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom4To5.java?rev=1693123&r1=1693122&r2=1693123&view=diff
==============================================================================
---
qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom4To5.java
(original)
+++
qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom4To5.java
Tue Jul 28 15:48:30 2015
@@ -48,6 +48,7 @@ import org.apache.qpid.exchange.Exchange
import org.apache.qpid.framing.AMQFrameDecodingException;
import org.apache.qpid.framing.AMQProtocolVersionException;
import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.ByteArrayDataInput;
import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.framing.MessagePublishInfo;
@@ -677,7 +678,7 @@ public class UpgradeFrom4To5 extends Abs
try
{
- return ContentHeaderBody.createFromBuffer(new
DataInputStream(new ByteArrayInputStream(underlying)),
+ return ContentHeaderBody.createFromBuffer(new
ByteArrayDataInput(underlying),
bodySize);
}
catch (IOException e)
Modified:
qpid/java/trunk/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgraderTest.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgraderTest.java?rev=1693123&r1=1693122&r2=1693123&view=diff
==============================================================================
---
qpid/java/trunk/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgraderTest.java
(original)
+++
qpid/java/trunk/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgraderTest.java
Tue Jul 28 15:48:30 2015
@@ -21,11 +21,12 @@
package org.apache.qpid.server.store.berkeleydb.upgrade;
import java.io.File;
+import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import org.apache.qpid.server.store.berkeleydb.BDBConfigurationStore;
-import org.apache.qpid.server.store.berkeleydb.tuple.ContentBinding;
+import org.apache.qpid.server.store.berkeleydb.tuple.ByteBufferBinding;
import com.sleepycat.bind.tuple.IntegerBinding;
import com.sleepycat.bind.tuple.LongBinding;
@@ -127,7 +128,7 @@ public class UpgraderTest extends Abstra
private void assertContent()
{
- final ContentBinding contentBinding = ContentBinding.getInstance();
+ final ByteBufferBinding contentBinding =
ByteBufferBinding.getInstance();
CursorOperation contentCursorOperation = new CursorOperation()
{
@@ -137,8 +138,9 @@ public class UpgraderTest extends Abstra
{
long id = LongBinding.entryToLong(key);
assertTrue("Unexpected id", id > 0);
- byte[] content = contentBinding.entryToObject(value);
+ ByteBuffer content = contentBinding.entryToObject(value);
assertNotNull("Unexpected content", content);
+ assertTrue("Expected content", content.hasRemaining());
}
};
new DatabaseTemplate(_environment, "MESSAGE_CONTENT",
null).run(contentCursorOperation);
Modified:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java?rev=1693123&r1=1693122&r2=1693123&view=diff
==============================================================================
---
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
(original)
+++
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
Tue Jul 28 15:48:30 2015
@@ -1181,7 +1181,7 @@ public abstract class AbstractJDBCMessag
}
- private byte[] getAllContent(long messageId)
+ private ByteBuffer getAllContent(long messageId)
{
Connection conn = null;
PreparedStatement stmt = null;
@@ -1200,7 +1200,10 @@ public abstract class AbstractJDBCMessag
{
byte[] dataAsBytes = getBlobAsBytes(rs, 1);
- return dataAsBytes;
+ ByteBuffer buf = ByteBuffer.allocateDirect(dataAsBytes.length);
+ buf.put(dataAsBytes);
+ buf.flip();
+ return buf;
}
throw new StoreException("No such message, id: " + messageId);
@@ -1426,15 +1429,15 @@ public abstract class AbstractJDBCMessag
static interface MessageDataRef<T extends StorableMessageMetaData>
{
T getMetaData();
- byte[] getData();
- void setData(byte[] data);
+ ByteBuffer getData();
+ void setData(ByteBuffer data);
boolean isHardRef();
}
private static final class MessageDataHardRef<T extends
StorableMessageMetaData> implements MessageDataRef<T>
{
private final T _metaData;
- private byte[] _data;
+ private ByteBuffer _data;
private MessageDataHardRef(final T metaData)
{
@@ -1448,13 +1451,13 @@ public abstract class AbstractJDBCMessag
}
@Override
- public byte[] getData()
+ public ByteBuffer getData()
{
return _data;
}
@Override
- public void setData(final byte[] data)
+ public void setData(final ByteBuffer data)
{
_data = data;
}
@@ -1469,9 +1472,9 @@ public abstract class AbstractJDBCMessag
private static final class MessageData<T extends StorableMessageMetaData>
{
private T _metaData;
- private SoftReference<byte[]> _data;
+ private SoftReference<ByteBuffer> _data;
- private MessageData(final T metaData, final byte[] data)
+ private MessageData(final T metaData, final ByteBuffer data)
{
_metaData = metaData;
@@ -1486,12 +1489,12 @@ public abstract class AbstractJDBCMessag
return _metaData;
}
- public byte[] getData()
+ public ByteBuffer getData()
{
return _data == null ? null : _data.get();
}
- public void setData(final byte[] data)
+ public void setData(final ByteBuffer data)
{
_data = new SoftReference<>(data);
}
@@ -1501,7 +1504,7 @@ public abstract class AbstractJDBCMessag
private static final class MessageDataSoftRef<T extends
StorableMessageMetaData> extends SoftReference<MessageData<T>> implements
MessageDataRef<T>
{
- public MessageDataSoftRef(final T metadata, byte[] data)
+ public MessageDataSoftRef(final T metadata, ByteBuffer data)
{
super(new MessageData<T>(metadata, data));
}
@@ -1514,7 +1517,7 @@ public abstract class AbstractJDBCMessag
}
@Override
- public byte[] getData()
+ public ByteBuffer getData()
{
MessageData<T> ref = get();
@@ -1522,7 +1525,7 @@ public abstract class AbstractJDBCMessag
}
@Override
- public void setData(final byte[] data)
+ public void setData(final ByteBuffer data)
{
MessageData<T> ref = get();
if(ref != null)
@@ -1598,25 +1601,20 @@ public abstract class AbstractJDBCMessag
public void addContent(ByteBuffer src)
{
src = src.slice();
- byte[] data = _messageDataRef.getData();
-
+ ByteBuffer data = _messageDataRef.getData();
if(data == null)
{
- data = new byte[src.remaining()];
- src.duplicate().get(data);
- _messageDataRef.setData(data);
+ _messageDataRef.setData(src);
}
else
{
- byte[] oldData = data;
- data = new byte[oldData.length + src.remaining()];
-
- System.arraycopy(oldData,0,data,0,oldData.length);
- src.duplicate().get(data, oldData.length, src.remaining());
-
- _messageDataRef.setData(data);
+ int size = data.remaining() + src.remaining();
+ ByteBuffer buf = data.isDirect() ?
ByteBuffer.allocateDirect(size) : ByteBuffer.allocate(size);
+ buf.put(data.duplicate());
+ buf.put(src.duplicate());
+ buf.flip();
+ _messageDataRef.setData(buf);
}
-
}
@Override
@@ -1628,8 +1626,17 @@ public abstract class AbstractJDBCMessag
@Override
public int getContent(int offsetInMessage, ByteBuffer dst)
{
- byte[] data = _messageDataRef.getData();
+ ByteBuffer data = getContentAsByteBuffer();
+ data = data.slice();
+ int length = Math.min(dst.remaining(), data.remaining());
+ data.limit(length);
+ dst.put(data);
+ return length;
+ }
+ private ByteBuffer getContentAsByteBuffer()
+ {
+ ByteBuffer data = _messageDataRef.getData();
if(data == null)
{
if(stored())
@@ -1656,57 +1663,23 @@ public abstract class AbstractJDBCMessag
}
else
{
- data = new byte[0];
+ data = ByteBuffer.wrap(new byte[0]);
}
- }
-
- int length = Math.min(dst.remaining(), data.length -
offsetInMessage);
- dst.put(data, offsetInMessage, length);
- return length;
-
+ } return data;
}
-
@Override
public ByteBuffer getContent(int offsetInMessage, int size)
{
- byte[] data = _messageDataRef.getData();
-
- if(data == null)
- {
-
- if(stored())
- {
- checkMessageStoreOpen();
-
- data =
AbstractJDBCMessageStore.this.getAllContent(_messageId);
- T metaData = _messageDataRef.getMetaData();
- if (metaData == null)
- {
- try
- {
- metaData = (T)
AbstractJDBCMessageStore.this.getMetaData(_messageId);
- _messageDataRef = new
MessageDataSoftRef<>(metaData, data);
- }
- catch (SQLException e)
- {
- throw new StoreException("Failed to get content
for message id: " + _messageId, e);
- }
- }
- else
- {
- _messageDataRef.setData(data);
- }
- }
- else
- {
- return null;
- }
- }
- return
ByteBuffer.wrap(data,offsetInMessage,Math.min(size,data.length-offsetInMessage));
-
+ ByteBuffer data = getContentAsByteBuffer();
+ data = data.duplicate();
+ data.position(offsetInMessage);
+ data = data.slice();
+ data.limit(size);
+ return data;
}
+
@Override
public void remove()
{
@@ -1746,8 +1719,8 @@ public abstract class AbstractJDBCMessag
storeMetaData(conn, _messageId, _messageDataRef.getMetaData());
AbstractJDBCMessageStore.this.addContent(conn, _messageId,
_messageDataRef.getData() == null
- ?
ByteBuffer.allocate(0)
- :
ByteBuffer.wrap(_messageDataRef.getData()));
+ ?
ByteBuffer.allocateDirect(0)
+ :
_messageDataRef.getData());
getLogger().debug("Storing message {} to store", _messageId);
Modified:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java?rev=1693123&r1=1693122&r2=1693123&view=diff
==============================================================================
---
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java
(original)
+++
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java
Tue Jul 28 15:48:30 2015
@@ -61,7 +61,7 @@ public class StoredMemoryMessage<T exten
: contentSize;
ByteBuffer oldContent = _content;
oldContent.flip();
- _content = ByteBuffer.allocate(size);
+ _content = ByteBuffer.allocateDirect(size);
_content.put(oldContent);
_content.put(src.duplicate());
}
Modified:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/MultiVersionProtocolEngine.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/MultiVersionProtocolEngine.java?rev=1693123&r1=1693122&r2=1693123&view=diff
==============================================================================
---
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/MultiVersionProtocolEngine.java
(original)
+++
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/MultiVersionProtocolEngine.java
Tue Jul 28 15:48:30 2015
@@ -471,7 +471,10 @@ public class MultiVersionProtocolEngine
{
_logger.debug("Unsupported protocol version requested,
replying with: " + supportedReplyVersion);
}
- _sender.send(ByteBuffer.wrap(supportedReplyBytes));
+ final ByteBuffer supportedReplyBuf =
ByteBuffer.allocateDirect(supportedReplyBytes.length);
+ supportedReplyBuf.put(supportedReplyBytes);
+ supportedReplyBuf.flip();
+ _sender.send(supportedReplyBuf);
_sender.flush();
_delegate = new ClosedDelegateProtocolEngine();
Modified:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java?rev=1693123&r1=1693122&r2=1693123&view=diff
==============================================================================
---
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java
(original)
+++
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java
Tue Jul 28 15:48:30 2015
@@ -83,7 +83,7 @@ public class NonBlockingConnection imple
_receiveBufferSize = receiveBufferSize;
- _netInputBuffer = ByteBuffer.allocate(receiveBufferSize);
+ _netInputBuffer = ByteBuffer.allocateDirect(receiveBufferSize);
_remoteSocketAddress =
_socketChannel.socket().getRemoteSocketAddress().toString();
_port = port;
@@ -359,7 +359,7 @@ public class NonBlockingConnection imple
else
{
// compact into new buffer
- _netInputBuffer = ByteBuffer.allocate(_receiveBufferSize);
+ _netInputBuffer = ByteBuffer.allocateDirect(_receiveBufferSize);
_netInputBuffer.put(duplicate);
}
return readData;
Modified:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionTLSDelegate.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionTLSDelegate.java?rev=1693123&r1=1693122&r2=1693123&view=diff
==============================================================================
---
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionTLSDelegate.java
(original)
+++
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionTLSDelegate.java
Tue Jul 28 15:48:30 2015
@@ -105,7 +105,7 @@ public class NonBlockingConnectionTLSDel
do
{
ByteBuffer appInputBuffer =
-
ByteBuffer.allocate(_sslEngine.getSession().getApplicationBufferSize() + 50);
+
ByteBuffer.allocateDirect(_sslEngine.getSession().getApplicationBufferSize() +
50);
_status = _sslEngine.unwrap(wrappedDataBuffer, appInputBuffer);
if (_status.getStatus() == SSLEngineResult.Status.CLOSED)
{
@@ -135,7 +135,7 @@ public class NonBlockingConnectionTLSDel
{
if(_sslEngine.getHandshakeStatus() !=
SSLEngineResult.HandshakeStatus.NEED_UNWRAP)
{
- final ByteBuffer netBuffer =
ByteBuffer.allocate(_sslEngine.getSession().getPacketBufferSize());
+ final ByteBuffer netBuffer =
ByteBuffer.allocateDirect(_sslEngine.getSession().getPacketBufferSize());
_status = _sslEngine.wrap(bufferArray, netBuffer);
runSSLEngineTasks(_status);
Modified:
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10.java?rev=1693123&r1=1693122&r2=1693123&view=diff
==============================================================================
---
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10.java
(original)
+++
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10.java
Tue Jul 28 15:48:30 2015
@@ -103,7 +103,7 @@ public class AMQPConnection_0_10 extends
_connection.setRemoteAddress(network.getRemoteAddress());
_connection.setLocalAddress(network.getLocalAddress());
- _inputHandler = new InputHandler(new ServerAssembler(_connection));
+ _inputHandler = new InputHandler(new ServerAssembler(_connection),
true);
_network = network;
Subject.doAs(getSubject(), new PrivilegedAction<Object>()
Modified:
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerAssembler.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerAssembler.java?rev=1693123&r1=1693122&r2=1693123&view=diff
==============================================================================
---
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerAssembler.java
(original)
+++
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerAssembler.java
Tue Jul 28 15:48:30 2015
@@ -21,6 +21,8 @@
package org.apache.qpid.server.protocol.v0_10;
+import java.nio.ByteBuffer;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -53,5 +55,11 @@ public class ServerAssembler extends Ass
}
}
+ @Override
+ protected ByteBuffer allocateByteBuffer(int size)
+ {
+ return ByteBuffer.allocateDirect(size);
+ }
+
}
Modified:
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerDisassembler.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerDisassembler.java?rev=1693123&r1=1693122&r2=1693123&view=diff
==============================================================================
---
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerDisassembler.java
(original)
+++
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerDisassembler.java
Tue Jul 28 15:48:30 2015
@@ -29,6 +29,9 @@ import static org.apache.qpid.transport.
import java.nio.ByteBuffer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import org.apache.qpid.transport.ByteBufferSender;
import org.apache.qpid.transport.FrameSizeObserver;
import org.apache.qpid.transport.Header;
@@ -48,6 +51,7 @@ import org.apache.qpid.transport.network
*/
public final class ServerDisassembler implements ProtocolEventSender,
ProtocolDelegate<Void>, FrameSizeObserver
{
+ private static final Logger LOGGER =
LoggerFactory.getLogger(ServerDisassembler.class);
private final ByteBufferSender _sender;
private int _maxPayload;
private final Object _sendLock = new Object();
@@ -89,7 +93,7 @@ public final class ServerDisassembler im
private void frame(byte flags, byte type, byte track, int channel, int
size, ByteBuffer buf)
{
- ByteBuffer data = ByteBuffer.wrap(new byte[HEADER_SIZE]);
+ ByteBuffer data = ByteBuffer.allocateDirect(HEADER_SIZE);
data.put(0, flags);
data.put(1, type);
@@ -141,7 +145,7 @@ public final class ServerDisassembler im
public void init(Void v, ProtocolHeader header)
{
- _sender.send(header.toByteBuffer());
+ _sender.send(header.toByteBuffer(true));
_sender.flush();
}
Modified:
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerEncoder.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerEncoder.java?rev=1693123&r1=1693122&r2=1693123&view=diff
==============================================================================
---
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerEncoder.java
(original)
+++
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerEncoder.java
Tue Jul 28 15:48:30 2015
@@ -44,7 +44,7 @@ public final class ServerEncoder extends
{
_initialCapacity = capacity;
_threshold = capacity/16;
- _out = ByteBuffer.allocate(capacity);
+ _out = ByteBuffer.allocateDirect(capacity);
_segment = 0;
}
@@ -55,7 +55,7 @@ public final class ServerEncoder extends
_out = _out.slice();
if(_out.remaining() < _threshold)
{
- _out = ByteBuffer.allocate(_initialCapacity);
+ _out = ByteBuffer.allocateDirect(_initialCapacity);
}
_segment = 0;
}
@@ -84,7 +84,7 @@ public final class ServerEncoder extends
{
ByteBuffer old = _out;
int capacity = old.capacity();
- _out = ByteBuffer.allocate(Math.max(Math.max(capacity + size,
2*capacity), _initialCapacity));
+ _out = ByteBuffer.allocateDirect(Math.max(Math.max(capacity + size,
2*capacity), _initialCapacity));
old.flip();
_out.put(old);
}
Modified:
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java?rev=1693123&r1=1693122&r2=1693123&view=diff
==============================================================================
---
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
(original)
+++
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
Tue Jul 28 15:48:30 2015
@@ -451,7 +451,7 @@ public class AMQChannel
for(int i = 0 ; i < bodyCount ; i++)
{
ContentBody contentChunk =
_currentMessage.getContentChunk(i);
-
handle.addContent(ByteBuffer.wrap(contentChunk.getPayload()));
+
handle.addContent(contentChunk.getPayload().duplicate());
bodyLengthReceived += contentChunk.getSize();
}
}
@@ -2507,7 +2507,7 @@ public class AMQChannel
}
@Override
- public void receiveMessageContent(final byte[] data)
+ public void receiveMessageContent(final ByteBuffer data)
{
if(_logger.isDebugEnabled())
{
Modified:
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java?rev=1693123&r1=1693122&r2=1693123&view=diff
==============================================================================
---
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java
(original)
+++
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java
Tue Jul 28 15:48:30 2015
@@ -31,6 +31,7 @@ import org.apache.qpid.framing.AMQFrameD
import org.apache.qpid.framing.AMQProtocolVersionException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.framing.ByteBufferDataInput;
import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.EncodingUtils;
import org.apache.qpid.framing.FieldTable;
@@ -158,15 +159,14 @@ public class MessageMetaData implements
{
try
{
- ByteBufferInputStream bbis = new ByteBufferInputStream(buf);
- DataInputStream dais = new DataInputStream(bbis);
- int size = EncodingUtils.readInteger(dais);
- ContentHeaderBody chb =
ContentHeaderBody.createFromBuffer(dais, size);
- final AMQShortString exchange =
EncodingUtils.readAMQShortString(dais);
- final AMQShortString routingKey =
EncodingUtils.readAMQShortString(dais);
+ ByteBufferDataInput dataInput = new ByteBufferDataInput(buf);
+ int size = EncodingUtils.readInteger(dataInput);
+ ContentHeaderBody chb =
ContentHeaderBody.createFromBuffer(dataInput, size);
+ final AMQShortString exchange =
EncodingUtils.readAMQShortString(dataInput);
+ final AMQShortString routingKey =
EncodingUtils.readAMQShortString(dataInput);
- final byte flags = EncodingUtils.readByte(dais);
- long arrivalTime = EncodingUtils.readLong(dais);
+ final byte flags = EncodingUtils.readByte(dataInput);
+ long arrivalTime = EncodingUtils.readLong(dataInput);
MessagePublishInfo publishBody =
new MessagePublishInfo(exchange,
Modified:
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolOutputConverterImpl.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolOutputConverterImpl.java?rev=1693123&r1=1693122&r2=1693123&view=diff
==============================================================================
---
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolOutputConverterImpl.java
(original)
+++
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolOutputConverterImpl.java
Tue Jul 28 15:48:30 2015
@@ -48,7 +48,6 @@ import org.apache.qpid.util.GZIPUtils;
public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
{
private static final int BASIC_CLASS_ID = 60;
-
private final AMQPConnection_0_8 _connection;
private static final AMQShortString GZIP_ENCODING =
AMQShortString.valueOf(GZIPUtils.GZIP_CONTENT_ENCODING);
Modified:
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/SymbolTypeConstructor.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/SymbolTypeConstructor.java?rev=1693123&r1=1693122&r2=1693123&view=diff
==============================================================================
---
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/SymbolTypeConstructor.java
(original)
+++
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/codec/SymbolTypeConstructor.java
Tue Jul 28 15:48:30 2015
@@ -61,7 +61,17 @@ public class SymbolTypeConstructor exten
size = in.getInt();
}
- BinaryString binaryStr = new BinaryString(in.array(),
in.arrayOffset()+in.position(), size);
+ BinaryString binaryStr;
+ if (in.hasArray())
+ {
+ binaryStr = new BinaryString(in.array(),
in.arrayOffset()+in.position(), size);
+ }
+ else
+ {
+ byte[] b = new byte[in.remaining()];
+ in.duplicate().get(b);
+ binaryStr = new BinaryString(b, 0, b.length);
+ }
Symbol symbolVal = SYMBOL_MAP.get(binaryStr);
if(symbolVal == null)
Modified:
qpid/java/trunk/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java?rev=1693123&r1=1693122&r2=1693123&view=diff
==============================================================================
---
qpid/java/trunk/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java
(original)
+++
qpid/java/trunk/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java
Tue Jul 28 15:48:30 2015
@@ -282,7 +282,16 @@ class WebSocketProvider implements Accep
{
try
{
-
_connection.sendMessage(msg.array(),msg.arrayOffset()+msg.position(),msg.remaining());
+ if (msg.hasArray())
+ {
+ _connection.sendMessage(msg.array(), msg.arrayOffset() +
msg.position(), msg.remaining());
+ }
+ else
+ {
+ byte[] copy = new byte[msg.remaining()];
+ msg.duplicate().get(copy);
+ _connection.sendMessage(copy, 0, copy.length);
+ }
}
catch (IOException e)
{
Modified:
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java?rev=1693123&r1=1693122&r2=1693123&view=diff
==============================================================================
---
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java
(original)
+++
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java
Tue Jul 28 15:48:30 2015
@@ -392,9 +392,7 @@ public class BasicMessageProducer_0_8 ex
if (frames.length == (offset + 1))
{
- byte[] data = new byte[payload.remaining()];
- payload.get(data);
- frames[offset] = ContentBody.createAMQFrame(channelId, new
ContentBody(data));
+ frames[offset] = ContentBody.createAMQFrame(channelId, new
ContentBody(payload.slice()));
}
else
{
@@ -406,10 +404,8 @@ public class BasicMessageProducer_0_8 ex
payload.position((int) framePayloadMax * (i - offset));
int length = (remaining >= framePayloadMax) ? (int)
framePayloadMax : (int) remaining;
payload.limit(payload.position() + length);
- byte[] data = new byte[payload.remaining()];
- payload.get(data);
- frames[i] = ContentBody.createAMQFrame(channelId, new
ContentBody(data));
+ frames[i] = ContentBody.createAMQFrame(channelId, new
ContentBody(payload.slice()));
remaining -= length;
}
Modified:
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java?rev=1693123&r1=1693122&r2=1693123&view=diff
==============================================================================
---
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java
(original)
+++
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java
Tue Jul 28 15:48:30 2015
@@ -76,7 +76,7 @@ public abstract class AbstractJMSMessage
_logger.debug("Non-fragmented message body (bodySize=" +
contentHeader.getBodySize() + ")");
}
- data = ByteBuffer.wrap(((ContentBody)
bodies.get(0)).getPayload());
+ data = ((ContentBody) bodies.get(0)).getPayload().duplicate();
}
else if (bodies != null)
{
@@ -91,7 +91,7 @@ public abstract class AbstractJMSMessage
while (it.hasNext())
{
ContentBody cb = (ContentBody) it.next();
- final ByteBuffer payload =
ByteBuffer.wrap(cb.getPayload());
+ final ByteBuffer payload = cb.getPayload().duplicate();
if (payload.isDirect() || payload.isReadOnly())
{
data.put(payload);
@@ -193,13 +193,11 @@ public abstract class AbstractJMSMessage
private class BodyInputStream extends InputStream
{
private final Iterator<ContentBody> _bodiesIter;
- private byte[] _currentBuffer;
- private int _currentPos;
+ private ByteBuffer _currentBuffer;
public BodyInputStream(final List<ContentBody> bodies)
{
_bodiesIter = bodies.iterator();
- _currentBuffer = _bodiesIter.next().getPayload();
- _currentPos = 0;
+ _currentBuffer = _bodiesIter.next().getPayload().duplicate();
}
@Override
@@ -220,7 +218,7 @@ public abstract class AbstractJMSMessage
@Override
public int read(final byte[] dst, final int off, final int len)
{
- while(_currentPos == _currentBuffer.length)
+ while(!_currentBuffer.hasRemaining())
{
if(!_bodiesIter.hasNext())
{
@@ -228,13 +226,11 @@ public abstract class AbstractJMSMessage
}
else
{
- _currentBuffer = _bodiesIter.next().getPayload();
- _currentPos = 0;
+ _currentBuffer =
_bodiesIter.next().getPayload().duplicate();
}
}
- int size = Math.min(len, _currentBuffer.length - _currentPos);
- System.arraycopy(_currentBuffer,_currentPos, dst,off,size);
- _currentPos+=size;
+ int size = Math.min(len, _currentBuffer.remaining());
+ _currentBuffer.get(dst,off,size);
return size;
}
Modified:
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/message/Encrypted091MessageFactory.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/message/Encrypted091MessageFactory.java?rev=1693123&r1=1693122&r2=1693123&view=diff
==============================================================================
---
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/message/Encrypted091MessageFactory.java
(original)
+++
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/message/Encrypted091MessageFactory.java
Tue Jul 28 15:48:30 2015
@@ -42,6 +42,7 @@ import org.slf4j.LoggerFactory;
import org.apache.qpid.QpidException;
import org.apache.qpid.client.AMQSession;
import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.framing.ByteArrayDataInput;
public class Encrypted091MessageFactory extends AbstractJMSMessageFactory
{
@@ -143,11 +144,10 @@ public class Encrypted091MessageFactory
BasicContentHeaderProperties properties = new
BasicContentHeaderProperties();
int payloadOffset;
- try (ByteArrayInputStream bis = new
ByteArrayInputStream(unencryptedBytes);
- DataInputStream dis = new DataInputStream(bis))
- {
- payloadOffset = properties.read(dis);
- }
+ ByteArrayDataInput dataInput = new
ByteArrayDataInput(unencryptedBytes);
+
+ payloadOffset = properties.read(dataInput);
+
final ByteBuffer unencryptedData =
ByteBuffer.wrap(unencryptedBytes, payloadOffset,
unencryptedBytes.length - payloadOffset);
Modified:
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_8.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_8.java?rev=1693123&r1=1693122&r2=1693123&view=diff
==============================================================================
---
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_8.java
(original)
+++
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_8.java
Tue Jul 28 15:48:30 2015
@@ -89,7 +89,7 @@ public class UnprocessedMessage_0_8 exte
if (body.getPayload() != null)
{
- final long payloadSize = body.getPayload().length;
+ final long payloadSize = body.getPayload().remaining();
if (_bodies == null)
{
Modified:
qpid/java/trunk/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java?rev=1693123&r1=1693122&r2=1693123&view=diff
==============================================================================
--- qpid/java/trunk/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java
(original)
+++ qpid/java/trunk/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java
Tue Jul 28 15:48:30 2015
@@ -63,6 +63,8 @@ public abstract class AMQDecoder<T exten
private List<ByteArrayInputStream> _remainingBufs = new
ArrayList<ByteArrayInputStream>();
+ private List<ByteBuffer> _incompleteBuffers = new ArrayList<ByteBuffer>();
+
/**
* Creates a new AMQP decoder.
*
@@ -98,144 +100,13 @@ public abstract class AMQDecoder<T exten
return _methodProcessor;
}
- private class RemainingByteArrayInputStream extends InputStream
- {
- private int _currentListPos;
- private int _markPos;
-
-
- @Override
- public int read() throws IOException
- {
- ByteArrayInputStream currentStream =
_remainingBufs.get(_currentListPos);
- if(currentStream.available() > 0)
- {
- return currentStream.read();
- }
- else if((_currentListPos == _remainingBufs.size())
- || (++_currentListPos == _remainingBufs.size()))
- {
- return -1;
- }
- else
- {
-
- ByteArrayInputStream stream =
_remainingBufs.get(_currentListPos);
- stream.mark(0);
- return stream.read();
- }
- }
-
- @Override
- public int read(final byte[] b, final int off, final int len) throws
IOException
- {
-
- if(_currentListPos == _remainingBufs.size())
- {
- return -1;
- }
- else
- {
- ByteArrayInputStream currentStream =
_remainingBufs.get(_currentListPos);
- final int available = currentStream.available();
- int read = currentStream.read(b, off, len > available ?
available : len);
- if(read < len)
- {
- if(_currentListPos++ != _remainingBufs.size())
- {
- _remainingBufs.get(_currentListPos).mark(0);
- }
- int correctRead = read == -1 ? 0 : read;
- int subRead = read(b, off+correctRead, len-correctRead);
- if(subRead == -1)
- {
- return read;
- }
- else
- {
- return correctRead+subRead;
- }
- }
- else
- {
- return len;
- }
- }
- }
-
- @Override
- public int available() throws IOException
- {
- int total = 0;
- for(int i = _currentListPos; i < _remainingBufs.size(); i++)
- {
- total += _remainingBufs.get(i).available();
- }
- return total;
- }
-
- @Override
- public void mark(final int readlimit)
- {
- _markPos = _currentListPos;
- final ByteArrayInputStream stream =
_remainingBufs.get(_currentListPos);
- if(stream != null)
- {
- stream.mark(readlimit);
- }
- }
-
- @Override
- public void reset() throws IOException
- {
- _currentListPos = _markPos;
- final int size = _remainingBufs.size();
- if(_currentListPos < size)
- {
- _remainingBufs.get(_currentListPos).reset();
- }
- for(int i = _currentListPos+1; i<size; i++)
- {
- _remainingBufs.get(i).reset();
- }
- }
- }
-
- private static class SimpleDataInputStream extends DataInputStream
implements MarkableDataInput
- {
- public SimpleDataInputStream(InputStream in)
- {
- super(in);
- }
-
- public AMQShortString readAMQShortString() throws IOException
- {
- return EncodingUtils.readAMQShortString(this);
- }
-
- }
-
public void decodeBuffer(ByteBuffer buf) throws AMQFrameDecodingException,
AMQProtocolVersionException, IOException
{
- MarkableDataInput msg;
-
-
- // get prior remaining data from accumulator
- ByteArrayInputStream bais;
- DataInput di;
- if(!_remainingBufs.isEmpty())
- {
- bais = new
ByteArrayInputStream(buf.array(),buf.arrayOffset()+buf.position(),
buf.remaining());
- _remainingBufs.add(bais);
- msg = new SimpleDataInputStream(new
RemainingByteArrayInputStream());
- }
- else
- {
- bais = null;
- msg = new
ByteArrayDataInput(buf.array(),buf.arrayOffset()+buf.position(),
buf.remaining());
- }
+ buf = buf.slice();
+ _incompleteBuffers.add(buf);
+ ByteBufferListDataInput msg = new
ByteBufferListDataInput(_incompleteBuffers);
// If this is the first read then we may be getting a protocol
initiation back if we tried to negotiate
// an unsupported version
@@ -268,60 +139,28 @@ public abstract class AMQDecoder<T exten
}
}
+ }
- if(!enoughData)
+ ListIterator<ByteBuffer> iter = _incompleteBuffers.listIterator();
+ while(iter.hasNext())
+ {
+ ByteBuffer next = iter.next();
+ if(next.hasRemaining())
{
- if(!_remainingBufs.isEmpty())
+ if(next.position() != 0)
{
- _remainingBufs.remove(_remainingBufs.size()-1);
- ListIterator<ByteArrayInputStream> iterator =
_remainingBufs.listIterator();
- while(iterator.hasNext() && iterator.next().available() ==
0)
- {
- iterator.remove();
- }
- }
-
- if(bais == null)
- {
- if(msg.available()!=0)
- {
- byte[] remaining = new byte[msg.available()];
- msg.read(remaining);
- _remainingBufs.add(new
ByteArrayInputStream(remaining));
- }
- }
- else
- {
- if(bais.available()!=0)
- {
- byte[] remaining = new byte[bais.available()];
- bais.read(remaining);
- _remainingBufs.add(new
ByteArrayInputStream(remaining));
- }
- }
-
- if(_remainingBufs.size() > MAX_BUFFERS_LIMIT)
- {
- int totalSize = 0;
- for(ByteArrayInputStream stream : _remainingBufs)
- {
- totalSize += stream.available();
- }
-
- byte[] completeBuffer = new byte[totalSize];
- int pos = 0;
- for(ByteArrayInputStream stream : _remainingBufs)
- {
- pos += stream.read(completeBuffer, pos,
stream.available());
- }
-
- _remainingBufs.clear();
- _remainingBufs.add(new
ByteArrayInputStream(completeBuffer));
+ iter.set(next.slice());
}
+ break;
+ }
+ else
+ {
+ iter.remove();
}
}
}
+
private boolean decodable(final MarkableDataInput in) throws
AMQFrameDecodingException, IOException
{
final int remainingAfterAttributes = in.available() - (1 + 2 + 4 + 1);
Modified:
qpid/java/trunk/common/src/main/java/org/apache/qpid/codec/MarkableDataInput.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/codec/MarkableDataInput.java?rev=1693123&r1=1693122&r2=1693123&view=diff
==============================================================================
---
qpid/java/trunk/common/src/main/java/org/apache/qpid/codec/MarkableDataInput.java
(original)
+++
qpid/java/trunk/common/src/main/java/org/apache/qpid/codec/MarkableDataInput.java
Tue Jul 28 15:48:30 2015
@@ -24,6 +24,7 @@ import org.apache.qpid.framing.AMQShortS
import java.io.DataInput;
import java.io.IOException;
+import java.nio.ByteBuffer;
public interface MarkableDataInput extends DataInput
{
@@ -36,6 +37,8 @@ public interface MarkableDataInput exten
int read(byte[] b) throws IOException;
+ ByteBuffer readAsByteBuffer(int len) throws IOException;
+
public AMQShortString readAMQShortString() throws IOException;
}
Modified:
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/AMQFrame.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/AMQFrame.java?rev=1693123&r1=1693122&r2=1693123&view=diff
==============================================================================
--- qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/AMQFrame.java
(original)
+++ qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/AMQFrame.java
Tue Jul 28 15:48:30 2015
@@ -62,21 +62,27 @@ public class AMQFrame extends AMQDataBlo
}
private static final byte[] FRAME_END_BYTE_ARRAY = new byte[] {
FRAME_END_BYTE };
+ private static final ByteBuffer FRAME_END_BYTE_BUFFER =
ByteBuffer.allocateDirect(1);
+ static
+ {
+ FRAME_END_BYTE_BUFFER.put(FRAME_END_BYTE);
+ FRAME_END_BYTE_BUFFER.flip();
+ }
@Override
public long writePayload(final ByteBufferSender sender) throws IOException
{
- byte[] frameHeader = new byte[7];
- BytesDataOutput buffer = new BytesDataOutput(frameHeader);
+ ByteBuffer frameHeader = ByteBuffer.allocateDirect(7);
- buffer.writeByte(_bodyFrame.getFrameType());
- EncodingUtils.writeUnsignedShort(buffer, _channel);
- EncodingUtils.writeUnsignedInteger(buffer, _bodyFrame.getSize());
- sender.send(ByteBuffer.wrap(frameHeader));
+ frameHeader.put(_bodyFrame.getFrameType());
+ EncodingUtils.writeUnsignedShort(frameHeader, _channel);
+ EncodingUtils.writeUnsignedInteger(frameHeader, _bodyFrame.getSize());
+ frameHeader.flip();
+ sender.send(frameHeader);
long size = 8 + _bodyFrame.writePayload(sender);
- sender.send(ByteBuffer.wrap(FRAME_END_BYTE_ARRAY));
+ sender.send(FRAME_END_BYTE_BUFFER.duplicate());
return size;
}
Modified:
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyImpl.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyImpl.java?rev=1693123&r1=1693122&r2=1693123&view=diff
==============================================================================
---
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyImpl.java
(original)
+++
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyImpl.java
Tue Jul 28 15:48:30 2015
@@ -26,16 +26,21 @@ import java.io.DataOutput;
import java.io.IOException;
import java.nio.ByteBuffer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import org.apache.qpid.AMQChannelException;
import org.apache.qpid.AMQConnectionException;
import org.apache.qpid.QpidException;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
import org.apache.qpid.transport.ByteBufferSender;
+import org.apache.qpid.util.ByteBufferDataOutput;
import org.apache.qpid.util.BytesDataOutput;
public abstract class AMQMethodBodyImpl implements AMQMethodBody
{
+ private static final Logger LOGGER =
LoggerFactory.getLogger(AMQMethodBodyImpl.class);
public static final byte TYPE = 1;
public AMQMethodBodyImpl()
@@ -111,11 +116,13 @@ public abstract class AMQMethodBodyImpl
@Override
public long writePayload(final ByteBufferSender sender) throws IOException
{
+
final int size = getSize();
- byte[] bytes = new byte[size];
- BytesDataOutput buffer = new BytesDataOutput(bytes);
- writePayload(buffer);
- sender.send(ByteBuffer.wrap(bytes));
+ ByteBuffer buf = ByteBuffer.allocateDirect(size);
+ ByteBufferDataOutput dataOutput = new ByteBufferDataOutput(buf);
+ writePayload(dataOutput);
+ buf.flip();
+ sender.send(buf);
return size;
}
Modified:
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/AMQShortString.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/AMQShortString.java?rev=1693123&r1=1693122&r2=1693123&view=diff
==============================================================================
---
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/AMQShortString.java
(original)
+++
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/AMQShortString.java
Tue Jul 28 15:48:30 2015
@@ -24,6 +24,7 @@ package org.apache.qpid.framing;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
+import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.concurrent.ConcurrentHashMap;
@@ -115,6 +116,33 @@ public final class AMQShortString implem
}
+ public static AMQShortString readAMQShortString(ByteBuffer buffer)
+ {
+ int length = ((int) buffer.get()) & 0xff;
+ if(length == 0)
+ {
+ return null;
+ }
+ else
+ {
+ if (length > MAX_LENGTH)
+ {
+ throw new IllegalArgumentException("Cannot create
AMQShortString with number of octets over 255!");
+ }
+ if(length > buffer.remaining())
+ {
+ throw new IllegalArgumentException("Cannot create
AMQShortString with length "
+ + length + " from a
ByteBuffer with only "
+ + buffer.remaining()
+ + " bytes.");
+
+ }
+ byte[] data = new byte[length];
+ buffer.get(data);
+ return new AMQShortString(data, 0, length);
+ }
+ }
+
public AMQShortString(byte[] data, final int offset, final int length)
{
if (length > MAX_LENGTH)
Modified:
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/AMQType.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/AMQType.java?rev=1693123&r1=1693122&r2=1693123&view=diff
==============================================================================
--- qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/AMQType.java
(original)
+++ qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/AMQType.java
Tue Jul 28 15:48:30 2015
@@ -20,12 +20,13 @@
*/
package org.apache.qpid.framing;
-import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.math.BigDecimal;
import java.util.Collection;
+import org.apache.qpid.codec.MarkableDataInput;
+
/**
* AMQType is a type that represents the different possible AMQP field table
types. It provides operations for each
* of the types to perform tasks such as calculating the size of an instance
of the type, converting types between AMQP
@@ -57,7 +58,7 @@ public enum AMQType
EncodingUtils.writeLongStringBytes(buffer, (String) value);
}
- public Object readValueFromBuffer(DataInput buffer) throws IOException
+ public Object readValueFromBuffer(MarkableDataInput buffer) throws
IOException
{
return EncodingUtils.readLongString(buffer);
}
@@ -103,7 +104,7 @@ public enum AMQType
EncodingUtils.writeUnsignedInteger(buffer, (Long) value);
}
- public Object readValueFromBuffer(DataInput buffer) throws IOException
+ public Object readValueFromBuffer(MarkableDataInput buffer) throws
IOException
{
return EncodingUtils.readUnsignedInteger(buffer);
}
@@ -142,7 +143,7 @@ public enum AMQType
EncodingUtils.writeInteger(buffer, unscaled);
}
- public Object readValueFromBuffer(DataInput buffer) throws IOException
+ public Object readValueFromBuffer(MarkableDataInput buffer) throws
IOException
{
byte places = EncodingUtils.readByte(buffer);
@@ -179,7 +180,7 @@ public enum AMQType
EncodingUtils.writeLong(buffer, (Long) value);
}
- public Object readValueFromBuffer(DataInput buffer) throws IOException
+ public Object readValueFromBuffer(MarkableDataInput buffer) throws
IOException
{
return EncodingUtils.readLong(buffer);
}
@@ -259,7 +260,7 @@ public enum AMQType
*
* @return An instance of the type.
*/
- public Object readValueFromBuffer(DataInput buffer) throws IOException
+ public Object readValueFromBuffer(MarkableDataInput buffer) throws
IOException
{
try
{
@@ -322,7 +323,7 @@ public enum AMQType
*
* @return An instance of the type.
*/
- public Object readValueFromBuffer(DataInput buffer) throws
IOException
+ public Object readValueFromBuffer(MarkableDataInput buffer)
throws IOException
{
// Read size of field table then all name/value pairs.
return FieldArray.readFromBuffer(buffer);
@@ -352,7 +353,7 @@ public enum AMQType
public void writeValueImpl(Object value, DataOutput buffer)
{ }
- public Object readValueFromBuffer(DataInput buffer)
+ public Object readValueFromBuffer(MarkableDataInput buffer)
{
return null;
}
@@ -383,7 +384,7 @@ public enum AMQType
EncodingUtils.writeLongstr(buffer, (byte[]) value);
}
- public Object readValueFromBuffer(DataInput buffer) throws IOException
+ public Object readValueFromBuffer(MarkableDataInput buffer) throws
IOException
{
return EncodingUtils.readLongstr(buffer);
}
@@ -413,7 +414,7 @@ public enum AMQType
EncodingUtils.writeLongStringBytes(buffer, (String) value);
}
- public Object readValueFromBuffer(DataInput buffer) throws IOException
+ public Object readValueFromBuffer(MarkableDataInput buffer) throws
IOException
{
return EncodingUtils.readLongString(buffer);
}
@@ -444,7 +445,7 @@ public enum AMQType
EncodingUtils.writeLongStringBytes(buffer, (String) value);
}
- public Object readValueFromBuffer(DataInput buffer) throws IOException
+ public Object readValueFromBuffer(MarkableDataInput buffer) throws
IOException
{
return EncodingUtils.readLongString(buffer);
}
@@ -479,7 +480,7 @@ public enum AMQType
EncodingUtils.writeBoolean(buffer, (Boolean) value);
}
- public Object readValueFromBuffer(DataInput buffer) throws IOException
+ public Object readValueFromBuffer(MarkableDataInput buffer) throws
IOException
{
return EncodingUtils.readBoolean(buffer);
}
@@ -514,7 +515,7 @@ public enum AMQType
EncodingUtils.writeChar(buffer, (Character) value);
}
- public Object readValueFromBuffer(DataInput buffer) throws IOException
+ public Object readValueFromBuffer(MarkableDataInput buffer) throws
IOException
{
return EncodingUtils.readChar(buffer);
}
@@ -549,7 +550,7 @@ public enum AMQType
EncodingUtils.writeByte(buffer, (Byte) value);
}
- public Object readValueFromBuffer(DataInput buffer) throws IOException
+ public Object readValueFromBuffer(MarkableDataInput buffer) throws
IOException
{
return EncodingUtils.readByte(buffer);
}
@@ -588,7 +589,7 @@ public enum AMQType
EncodingUtils.writeShort(buffer, (Short) value);
}
- public Object readValueFromBuffer(DataInput buffer) throws IOException
+ public Object readValueFromBuffer(MarkableDataInput buffer) throws
IOException
{
return EncodingUtils.readShort(buffer);
}
@@ -630,7 +631,7 @@ public enum AMQType
EncodingUtils.writeInteger(buffer, (Integer) value);
}
- public Object readValueFromBuffer(DataInput buffer) throws IOException
+ public Object readValueFromBuffer(MarkableDataInput buffer) throws
IOException
{
return EncodingUtils.readInteger(buffer);
}
@@ -677,7 +678,7 @@ public enum AMQType
EncodingUtils.writeLong(buffer, (Long) value);
}
- public Object readValueFromBuffer(DataInput buffer) throws IOException
+ public Object readValueFromBuffer(MarkableDataInput buffer) throws
IOException
{
return EncodingUtils.readLong(buffer);
}
@@ -712,7 +713,7 @@ public enum AMQType
EncodingUtils.writeFloat(buffer, (Float) value);
}
- public Object readValueFromBuffer(DataInput buffer) throws IOException
+ public Object readValueFromBuffer(MarkableDataInput buffer) throws
IOException
{
return EncodingUtils.readFloat(buffer);
}
@@ -751,7 +752,7 @@ public enum AMQType
EncodingUtils.writeDouble(buffer, (Double) value);
}
- public Object readValueFromBuffer(DataInput buffer) throws IOException
+ public Object readValueFromBuffer(MarkableDataInput buffer) throws
IOException
{
return EncodingUtils.readDouble(buffer);
}
@@ -840,5 +841,5 @@ public enum AMQType
*
* @return An instance of the type.
*/
- abstract Object readValueFromBuffer(DataInput buffer) throws IOException;
+ abstract Object readValueFromBuffer(MarkableDataInput buffer) throws
IOException;
}
Modified:
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/AMQTypedValue.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/AMQTypedValue.java?rev=1693123&r1=1693122&r2=1693123&view=diff
==============================================================================
---
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/AMQTypedValue.java
(original)
+++
qpid/java/trunk/common/src/main/java/org/apache/qpid/framing/AMQTypedValue.java
Tue Jul 28 15:48:30 2015
@@ -28,6 +28,8 @@ import java.util.Collection;
import java.util.Date;
import java.util.Map;
+import org.apache.qpid.codec.MarkableDataInput;
+
/**
* AMQTypedValue combines together a native Java Object value, and an {@link
AMQType}, as a fully typed AMQP parameter
* value. It provides the ability to read and write fully typed parameters to
and from byte buffers. It also provides
@@ -65,7 +67,7 @@ public abstract class AMQTypedValue
_value = type.toNativeValue(value);
}
- private GenericTypedValue(AMQType type, DataInput buffer) throws
IOException
+ private GenericTypedValue(AMQType type, MarkableDataInput buffer)
throws IOException
{
_type = type;
_value = type.readValueFromBuffer(buffer);
@@ -198,7 +200,7 @@ public abstract class AMQTypedValue
}
- public static AMQTypedValue readFromBuffer(DataInput buffer) throws
IOException
+ public static AMQTypedValue readFromBuffer(MarkableDataInput buffer)
throws IOException
{
AMQType type = AMQTypeMap.getType(buffer.readByte());
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]