This is an automated email from the ASF dual-hosted git repository.
orudyy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-broker-j.git
The following commit(s) were added to refs/heads/master by this push:
new adb2a34 QPID-8273: [Broker-J] Handle malformed messages
adb2a34 is described below
commit adb2a34306d67559ee81db155826dc67a02cc85e
Author: Alex Rudyy <[email protected]>
AuthorDate: Wed Feb 13 13:48:41 2019 +0000
QPID-8273: [Broker-J] Handle malformed messages
This closes #21
---
.../store/berkeleydb/AbstractBDBMessageStore.java | 42 ++-
.../server/logging/messages/QueueMessages.java | 62 ++++
.../logging/messages/Queue_logmessages.properties | 2 +
.../server/message/AbstractServerMessageImpl.java | 51 ++-
.../apache/qpid/server/message/ServerMessage.java | 11 +
.../java/org/apache/qpid/server/model/Queue.java | 12 +
.../qpid/server/protocol/v0_8/FieldTable.java | 51 ++-
.../apache/qpid/server/queue/AbstractQueue.java | 124 +++++++-
.../queue/FlowToDiskOverflowPolicyHandler.java | 17 +-
.../apache/qpid/server/queue/QueueEntryImpl.java | 2 +-
.../apache/qpid/server/queue/QueueStatistics.java | 17 +
.../apache/qpid/server/queue/SortedQueueImpl.java | 4 +-
.../serializer/v1/MessageStoreSerializer_v1.java | 16 +-
.../server/txn/FlowToDiskTransactionObserver.java | 27 +-
.../server/virtualhost/AbstractVirtualHost.java | 5 +-
.../qpid/server/queue/AbstractQueueTestBase.java | 1 +
.../queue/FlowToDiskOverflowPolicyHandlerTest.java | 3 +
.../qpid/server/queue/QueueEntryImplTestBase.java | 1 +
.../server/queue/QueueMessageRecoveryTest.java | 3 +-
.../server/queue/SimpleQueueEntryImplTest.java | 1 +
.../qpid/server/store/TestMessageMetaDataType.java | 12 +
.../apache/qpid/server/txn/MockServerMessage.java | 12 +
.../protocol/v0_10/AMQPConnection_0_10Impl.java | 4 +-
.../server/protocol/v0_10/ConsumerTarget_0_10.java | 5 +
.../qpid/server/protocol/v0_8/AMQMessage.java | 6 +
.../protocol/v0_8/AMQPConnection_0_8Impl.java | 8 +-
.../server/protocol/v0_8/ConsumerTarget_0_8.java | 5 +
.../qpid/server/protocol/v0_8/MessageMetaData.java | 9 +-
.../transport/BasicContentHeaderProperties.java | 6 +-
.../server/protocol/v1_0/ConsumerTarget_1_0.java | 5 +
.../store/jdbc/AbstractJDBCMessageStore.java | 44 ++-
.../server/management/amqp/ManagementNode.java | 5 +
.../qpid/tests/protocol/v0_10/Interaction.java | 7 +
.../v0_10/extensions/message/MalformedMessage.java | 343 +++++++++++++++++++++
.../qpid/tests/protocol/v0_8/BasicInteraction.java | 12 +-
.../qpid/tests/protocol/v0_8/Interaction.java | 37 +++
.../apache/qpid/tests/protocol/v0_8/BasicTest.java | 4 +-
.../v0_8/extension/basic/MalformedMessage.java | 294 ++++++++++++++++++
.../extensions/qpid/message/MalformedMessage.java | 133 ++++++++
39 files changed, 1309 insertions(+), 94 deletions(-)
diff --git
a/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
b/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
index db467293..6a57739 100644
---
a/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
+++
b/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
@@ -141,7 +141,7 @@ public abstract class AbstractBDBMessageStore implements
MessageStore
{
for (StoredBDBMessage<?> message : _messages)
{
- message.clear();
+ message.clear(true);
}
_messages.clear();
_inMemorySize.set(0);
@@ -993,20 +993,36 @@ public abstract class AbstractBDBMessageStore implements
MessageStore
_data = QpidByteBuffer.reallocateIfNecessary(_data);
}
- public long clear()
+ public long clear(boolean close)
{
long bytesCleared = 0;
- if(_metaData != null)
+ if(_data != null)
{
- bytesCleared += _metaData.getStorableSize();
- _metaData.clearEncodedForm();
- _metaData = null;
+ if(_data != null)
+ {
+ bytesCleared += _data.remaining();
+ _data.dispose();
+ _data = null;
+ }
}
- if(_data != null)
+ if (_metaData != null)
{
- bytesCleared += _data.remaining();
- _data.dispose();
- _data = null;
+ bytesCleared += _metaData.getStorableSize();
+ try
+ {
+ if (close)
+ {
+ _metaData.dispose();
+ }
+ else
+ {
+ _metaData.clearEncodedForm();
+ }
+ }
+ finally
+ {
+ _metaData = null;
+ }
}
return bytesCleared;
}
@@ -1222,7 +1238,7 @@ public abstract class AbstractBDBMessageStore implements
MessageStore
flushToStore();
if(_messageDataRef != null && !_messageDataRef.isHardRef())
{
- final long bytesCleared = _messageDataRef.clear();
+ final long bytesCleared = _messageDataRef.clear(false);
_inMemorySize.addAndGet(-bytesCleared);
_bytesEvacuatedFromMemory.addAndGet(bytesCleared);
}
@@ -1244,11 +1260,11 @@ public abstract class AbstractBDBMessageStore
implements MessageStore
}
}
- public synchronized void clear()
+ public synchronized void clear(boolean close)
{
if (_messageDataRef != null)
{
- _messageDataRef.clear();
+ _messageDataRef.clear(close);
}
}
}
diff --git
a/broker-core/src/main/java/org/apache/qpid/server/logging/messages/QueueMessages.java
b/broker-core/src/main/java/org/apache/qpid/server/logging/messages/QueueMessages.java
index 6655918..ee10dc3 100644
---
a/broker-core/src/main/java/org/apache/qpid/server/logging/messages/QueueMessages.java
+++
b/broker-core/src/main/java/org/apache/qpid/server/logging/messages/QueueMessages.java
@@ -66,6 +66,7 @@ public class QueueMessages
public static final String CREATED_LOG_HIERARCHY =
DEFAULT_LOG_HIERARCHY_PREFIX + "queue.created";
public static final String DELETED_LOG_HIERARCHY =
DEFAULT_LOG_HIERARCHY_PREFIX + "queue.deleted";
public static final String DROPPED_LOG_HIERARCHY =
DEFAULT_LOG_HIERARCHY_PREFIX + "queue.dropped";
+ public static final String MALFORMED_MESSAGE_LOG_HIERARCHY =
DEFAULT_LOG_HIERARCHY_PREFIX + "queue.malformed_message";
public static final String OPERATION_LOG_HIERARCHY =
DEFAULT_LOG_HIERARCHY_PREFIX + "queue.operation";
public static final String OVERFULL_LOG_HIERARCHY =
DEFAULT_LOG_HIERARCHY_PREFIX + "queue.overfull";
public static final String UNDERFULL_LOG_HIERARCHY =
DEFAULT_LOG_HIERARCHY_PREFIX + "queue.underfull";
@@ -76,6 +77,7 @@ public class QueueMessages
LoggerFactory.getLogger(CREATED_LOG_HIERARCHY);
LoggerFactory.getLogger(DELETED_LOG_HIERARCHY);
LoggerFactory.getLogger(DROPPED_LOG_HIERARCHY);
+ LoggerFactory.getLogger(MALFORMED_MESSAGE_LOG_HIERARCHY);
LoggerFactory.getLogger(OPERATION_LOG_HIERARCHY);
LoggerFactory.getLogger(OVERFULL_LOG_HIERARCHY);
LoggerFactory.getLogger(UNDERFULL_LOG_HIERARCHY);
@@ -328,6 +330,66 @@ public class QueueMessages
/**
* Log a Queue message of the Format:
+ * <pre>QUE-1006 : Malformed : {0} : {1}</pre>
+ * Optional values are contained in [square brackets] and are numbered
+ * sequentially in the method call.
+ *
+ */
+ public static LogMessage MALFORMED_MESSAGE(String param1, String param2)
+ {
+ String rawMessage = _messages.getString("MALFORMED_MESSAGE");
+
+ final Object[] messageArguments = {param1, param2};
+ // Create a new MessageFormat to ensure thread safety.
+ // Sharing a MessageFormat and using applyPattern is not thread safe
+ MessageFormat formatter = new MessageFormat(rawMessage,
_currentLocale);
+
+ final String message = formatter.format(messageArguments);
+
+ return new LogMessage()
+ {
+ @Override
+ public String toString()
+ {
+ return message;
+ }
+
+ @Override
+ public String getLogHierarchy()
+ {
+ return MALFORMED_MESSAGE_LOG_HIERARCHY;
+ }
+
+ @Override
+ public boolean equals(final Object o)
+ {
+ if (this == o)
+ {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass())
+ {
+ return false;
+ }
+
+ final LogMessage that = (LogMessage) o;
+
+ return getLogHierarchy().equals(that.getLogHierarchy()) &&
toString().equals(that.toString());
+
+ }
+
+ @Override
+ public int hashCode()
+ {
+ int result = toString().hashCode();
+ result = 31 * result + getLogHierarchy().hashCode();
+ return result;
+ }
+ };
+ }
+
+ /**
+ * Log a Queue message of the Format:
* <pre>QUE-1016 : Operation : {0}</pre>
* Optional values are contained in [square brackets] and are numbered
* sequentially in the method call.
diff --git
a/broker-core/src/main/java/org/apache/qpid/server/logging/messages/Queue_logmessages.properties
b/broker-core/src/main/java/org/apache/qpid/server/logging/messages/Queue_logmessages.properties
index cd4ecdb..bfcf701 100644
---
a/broker-core/src/main/java/org/apache/qpid/server/logging/messages/Queue_logmessages.properties
+++
b/broker-core/src/main/java/org/apache/qpid/server/logging/messages/Queue_logmessages.properties
@@ -26,6 +26,8 @@ DELETED = QUE-1002 : Deleted : ID: {0}
OVERFULL = QUE-1003 : Overfull : Size : {0,number} bytes, Capacity :
{1,number}, Messages : {2,number}, Message Capacity : {3,number}
UNDERFULL = QUE-1004 : Underfull : Size : {0,number} bytes, Resume Capacity :
{1,number}, Messages : {2,number}, Message Capacity : {3,number}
DROPPED = QUE-1005 : Dropped : {0,number} messages, Depth : {1,number} bytes,
{2,number} messages, Capacity : {3,number} bytes, {4,number} messages
+MALFORMED_MESSAGE = QUE-1006 : Malformed : {0} : {1}
+
# These are no longer in use
#FLOW_TO_DISK_ACTIVE = QUE-1014 : Message flow to disk active : Queue : depth
{0,number,#.##} kB, threshold {1,number,#.##} kB / Broker : direct memory used
{2,number,#.##} MB, threshold {3,number,#.##} MB
diff --git
a/broker-core/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java
b/broker-core/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java
index 3c36dce..2f72098 100644
---
a/broker-core/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java
+++
b/broker-core/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java
@@ -28,6 +28,9 @@ import java.util.UUID;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
import org.apache.qpid.server.store.StorableMessageMetaData;
import org.apache.qpid.server.store.StoredMessage;
@@ -36,7 +39,7 @@ import
org.apache.qpid.server.util.ServerScopedRuntimeException;
public abstract class AbstractServerMessageImpl<X extends
AbstractServerMessageImpl<X,T>, T extends StorableMessageMetaData> implements
ServerMessage<T>
{
-
+ private static final Logger LOGGER =
LoggerFactory.getLogger(AbstractServerMessageImpl.class);
private static final AtomicIntegerFieldUpdater<AbstractServerMessageImpl>
_refCountUpdater =
AtomicIntegerFieldUpdater.newUpdater(AbstractServerMessageImpl.class,
"_referenceCount");
@@ -49,6 +52,12 @@ public abstract class AbstractServerMessageImpl<X extends
AbstractServerMessageI
@SuppressWarnings("unused")
private volatile Collection<UUID> _resources;
+ private volatile ServerMessage.ValidationStatus _validationStatus =
ServerMessage.ValidationStatus.UNKNOWN;
+
+ private static final
AtomicReferenceFieldUpdater<AbstractServerMessageImpl,
ServerMessage.ValidationStatus>
+ _validationStatusUpdater =
AtomicReferenceFieldUpdater.newUpdater(AbstractServerMessageImpl.class,
+
ServerMessage.ValidationStatus.class,
+
"_validationStatus");
public AbstractServerMessageImpl(StoredMessage<T> handle, Object
connectionReference)
{
@@ -192,7 +201,7 @@ public abstract class AbstractServerMessageImpl<X extends
AbstractServerMessageI
}
finally
{
- if (!wasInMemory)
+ if (!wasInMemory && checkValid())
{
storedMessage.flowToDisk();
}
@@ -211,6 +220,44 @@ public abstract class AbstractServerMessageImpl<X extends
AbstractServerMessageI
return "Message[" + debugIdentity() + "]";
}
+ @Override
+ public ServerMessage.ValidationStatus getValidationStatus()
+ {
+ return _validationStatus;
+ }
+
+ @Override
+ public boolean checkValid()
+ {
+ ServerMessage.ValidationStatus status;
+ while ((status = _validationStatus) ==
ServerMessage.ValidationStatus.UNKNOWN)
+ {
+ ServerMessage.ValidationStatus newStatus;
+ try
+ {
+ validate();
+ newStatus = ServerMessage.ValidationStatus.VALID;
+ }
+ catch (RuntimeException e)
+ {
+ newStatus = ServerMessage.ValidationStatus.MALFORMED;
+ LOGGER.debug("Malformed message '{}' detected", this, e);
+ }
+
+ if (_validationStatusUpdater.compareAndSet(this, status,
newStatus))
+ {
+ status = newStatus;
+ break;
+ }
+ }
+ return status == ServerMessage.ValidationStatus.VALID;
+ }
+
+ protected void validate()
+ {
+ // noop
+ }
+
private static class Reference<X extends AbstractServerMessageImpl<X,T>, T
extends StorableMessageMetaData>
implements MessageReference<X>
{
diff --git
a/broker-core/src/main/java/org/apache/qpid/server/message/ServerMessage.java
b/broker-core/src/main/java/org/apache/qpid/server/message/ServerMessage.java
index 37df060..8523668 100644
---
a/broker-core/src/main/java/org/apache/qpid/server/message/ServerMessage.java
+++
b/broker-core/src/main/java/org/apache/qpid/server/message/ServerMessage.java
@@ -60,4 +60,15 @@ public interface ServerMessage<T extends
StorableMessageMetaData> extends Enqueu
Object getConnectionReference();
boolean isResourceAcceptable(TransactionLogResource resource);
+
+ boolean checkValid();
+
+ ValidationStatus getValidationStatus();
+
+ enum ValidationStatus
+ {
+ UNKNOWN,
+ VALID,
+ MALFORMED
+ }
}
diff --git a/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
b/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
index 9f04343..26dcdc0 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
@@ -478,6 +478,16 @@ public interface Queue<X extends Queue<X>> extends
ConfiguredObject<X>,
description = "Current age of oldest message on the
queue.")
long getOldestMessageAge();
+ @SuppressWarnings("unused")
+ @ManagedStatistic(statisticType = StatisticType.CUMULATIVE, units =
StatisticUnit.BYTES, label = "Malformed",
+ description = "Total size of enqueued malformed messages.")
+ long getTotalMalformedBytes();
+
+ @SuppressWarnings("unused")
+ @ManagedStatistic(statisticType = StatisticType.CUMULATIVE, units =
StatisticUnit.MESSAGES, label = "Malformed",
+ description = "Total number of enqueued malformed messages.")
+ long getTotalMalformedMessages();
+
@ManagedOperation(description = "move messages from this queue to
another", changesConfiguredObjectState = false)
List<Long> moveMessages(@Param(name = "destination", description = "The
queue to which the messages should be moved", mandatory = true) Queue<?>
destination,
@Param(name = "messageIds", description = "If
provided, only messages in the queue whose (internal) message-id is supplied
will be considered for moving") List<Long> messageIds,
@@ -572,6 +582,8 @@ public interface Queue<X extends Queue<X>> extends
ConfiguredObject<X>,
QueueEntryIterator queueEntryIterator();
+ boolean checkValid(QueueEntry queueEntry);
+
enum ExpiryPolicy
{
DELETE,
diff --git
a/broker-core/src/main/java/org/apache/qpid/server/protocol/v0_8/FieldTable.java
b/broker-core/src/main/java/org/apache/qpid/server/protocol/v0_8/FieldTable.java
index 1f5e8cb..7ea7958 100644
---
a/broker-core/src/main/java/org/apache/qpid/server/protocol/v0_8/FieldTable.java
+++
b/broker-core/src/main/java/org/apache/qpid/server/protocol/v0_8/FieldTable.java
@@ -134,6 +134,15 @@ public class FieldTable
{
_encodedForm.reset();
}
+
+ final long recalculateEncodedSize = recalculateEncodedSize();
+ if (_encodedSize != recalculateEncodedSize)
+ {
+ throw new IllegalStateException(String.format(
+ "Malformed field table detected: provided encoded size
'%d' does not equal calculated size '%d'",
+ _encodedSize,
+ recalculateEncodedSize));
+ }
}
}
@@ -141,8 +150,14 @@ public class FieldTable
{
if (!_decoded)
{
- decode();
- _decoded = true;
+ try
+ {
+ decode();
+ }
+ finally
+ {
+ _decoded = true;
+ }
}
}
@@ -329,6 +344,18 @@ public class FieldTable
return _encodedSize;
}
+ private synchronized long recalculateEncodedSize()
+ {
+ long size = 0L;
+ for (Map.Entry<String, AMQTypedValue> e : _properties.entrySet())
+ {
+ String key = e.getKey();
+ AMQTypedValue value = e.getValue();
+ size += EncodingUtils.encodedShortStringLength(key) + 1 +
value.getEncodingSize();
+ }
+ return size;
+ }
+
public static Map<String, Object> convertToMap(final FieldTable fieldTable)
{
final Map<String, Object> map = new HashMap<>();
@@ -358,12 +385,17 @@ public class FieldTable
public synchronized void clearEncodedForm()
{
- decodeIfNecessary();
-
- if (_encodedForm != null)
+ try
{
- _encodedForm.dispose();
- _encodedForm = null;
+ decodeIfNecessary();
+ }
+ finally
+ {
+ if (_encodedForm != null)
+ {
+ _encodedForm.dispose();
+ _encodedForm = null;
+ }
}
}
@@ -498,4 +530,9 @@ public class FieldTable
return null;
}
}
+
+ public synchronized void validate()
+ {
+ decodeIfNecessary();
+ }
}
diff --git
a/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
b/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
index 84a4700..d0f2f30 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
@@ -1187,7 +1187,7 @@ public abstract class AbstractQueue<X extends
AbstractQueue<X>>
@Override
public final void enqueue(ServerMessage message, Action<? super
MessageInstance> action, MessageEnqueueRecord enqueueRecord)
{
-
+ final QueueEntry entry;
if(_recovering.get() != RECOVERED)
{
_enqueuingWhileRecovering.incrementAndGet();
@@ -1211,12 +1211,16 @@ public abstract class AbstractQueue<X extends
AbstractQueue<X>>
{
Thread.yield();
}
- doEnqueue(message, action, enqueueRecord);
+ entry = doEnqueue(message, action, enqueueRecord);
+ }
+ else
+ {
+ entry = null;
}
}
else
{
- doEnqueue(message, action, enqueueRecord);
+ entry = doEnqueue(message, action, enqueueRecord);
}
final StoredMessage storedMessage = message.getStoredMessage();
@@ -1224,7 +1228,21 @@ public abstract class AbstractQueue<X extends
AbstractQueue<X>>
|| QpidByteBuffer.getAllocatedDirectMemorySize() >
_flowToDiskThreshold)
&& storedMessage.isInMemory())
{
- storedMessage.flowToDisk();
+ if (message.checkValid())
+ {
+ storedMessage.flowToDisk();
+ }
+ else
+ {
+ if (entry != null)
+ {
+ malformedEntry(entry);
+ }
+ else
+ {
+ LOGGER.debug("Malformed message '{}' enqueued into '{}'",
message, getName());
+ }
+ }
}
}
@@ -1267,7 +1285,7 @@ public abstract class AbstractQueue<X extends
AbstractQueue<X>>
}
}
- protected void doEnqueue(final ServerMessage message, final Action<? super
MessageInstance> action, MessageEnqueueRecord enqueueRecord)
+ protected QueueEntry doEnqueue(final ServerMessage message, final Action<?
super MessageInstance> action, MessageEnqueueRecord enqueueRecord)
{
final QueueEntry entry = getEntries().add(message, enqueueRecord);
updateExpiration(entry);
@@ -1296,7 +1314,7 @@ public abstract class AbstractQueue<X extends
AbstractQueue<X>>
}
_postEnqueueOverflowPolicyHandler.checkOverflow(entry);
}
-
+ return entry;
}
private void updateExpiration(final QueueEntry entry)
@@ -1696,13 +1714,15 @@ public abstract class AbstractQueue<X extends
AbstractQueue<X>>
{
QueueEntry node = queueListIterator.getNode();
MessageReference reference = node.newMessageReference();
- if(reference != null)
+ if(reference != null && !node.isDeleted())
{
try
{
-
- final boolean done = !node.isDeleted() &&
visitor.visit(node);
- if(done)
+ if (!reference.getMessage().checkValid())
+ {
+ malformedEntry(node);
+ }
+ else if (visitor.visit(node))
{
break;
}
@@ -2297,9 +2317,16 @@ public abstract class AbstractQueue<X extends
AbstractQueue<X>>
{
try (MessageReference messageReference =
msg.newReference())
{
- for(NotificationCheck check : perMessageChecks)
+ if (!msg.checkValid())
+ {
+ malformedEntry(node);
+ }
+ else
{
- checkForNotification(msg, listener,
currentTime, thresholdTime, check);
+ for (NotificationCheck check :
perMessageChecks)
+ {
+ checkForNotification(msg, listener,
currentTime, thresholdTime, check);
+ }
}
}
catch(MessageDeletedException e)
@@ -2337,6 +2364,68 @@ public abstract class AbstractQueue<X extends
AbstractQueue<X>>
}
}
+ private void malformedEntry(final QueueEntry node)
+ {
+ deleteEntry(node, () -> {
+ _queueStatistics.addToMalformed(node.getSizeWithHeader());
+ logMalformedMessage(node);
+ });
+ }
+
+ private void logMalformedMessage(final QueueEntry node)
+ {
+ final EventLogger eventLogger = getEventLogger();
+ final ServerMessage<?> message = node.getMessage();
+ final StringBuilder messageId = new StringBuilder();
+ messageId.append(message.getMessageNumber());
+ final String id = message.getMessageHeader().getMessageId();
+ if (id != null)
+ {
+ messageId.append('/').append(id);
+ }
+ eventLogger.message(getLogSubject(), QueueMessages.MALFORMED_MESSAGE(
messageId.toString(), "DELETE"));
+ }
+
+ @Override
+ public boolean checkValid(final QueueEntry queueEntry)
+ {
+ final ServerMessage message = queueEntry.getMessage();
+ final ServerMessage.ValidationStatus validationStatus =
message.getValidationStatus();
+ boolean isValid = false;
+ if (validationStatus == ServerMessage.ValidationStatus.UNKNOWN)
+ {
+ try (MessageReference ref = message.newReference())
+ {
+ isValid = message.checkValid();
+ }
+ catch (MessageDeletedException e)
+ {
+ // noop
+ }
+ }
+ else
+ {
+ isValid = validationStatus == ServerMessage.ValidationStatus.VALID;
+ }
+ if (!isValid)
+ {
+ malformedEntry(queueEntry);
+ }
+ return isValid;
+ }
+
+ @Override
+ public long getTotalMalformedBytes()
+ {
+ return _queueStatistics.getMalformedSize();
+ }
+
+ @Override
+ public long getTotalMalformedMessages()
+ {
+ return _queueStatistics.getMalformedCount();
+ }
+
@Override
public void reallocateMessages()
{
@@ -2353,7 +2442,14 @@ public abstract class AbstractQueue<X extends
AbstractQueue<X>>
final MessageReference messageReference =
message.newReference();
try
{
- message.getStoredMessage().reallocate();
+ if (!message.checkValid())
+ {
+ malformedEntry(node);
+ }
+ else
+ {
+ message.getStoredMessage().reallocate();
+ }
}
finally
{
@@ -3392,7 +3488,7 @@ public abstract class AbstractQueue<X extends
AbstractQueue<X>>
{
MessageConverter messageConverter =
MessageConverterRegistry.getConverter(message.getClass(),
InternalMessage.class);
- if (messageConverter != null)
+ if (messageConverter != null && message.checkValid())
{
InternalMessage convertedMessage = null;
try
diff --git
a/broker-core/src/main/java/org/apache/qpid/server/queue/FlowToDiskOverflowPolicyHandler.java
b/broker-core/src/main/java/org/apache/qpid/server/queue/FlowToDiskOverflowPolicyHandler.java
index 6115408..36fcf9e 100644
---
a/broker-core/src/main/java/org/apache/qpid/server/queue/FlowToDiskOverflowPolicyHandler.java
+++
b/broker-core/src/main/java/org/apache/qpid/server/queue/FlowToDiskOverflowPolicyHandler.java
@@ -102,7 +102,7 @@ public class FlowToDiskOverflowPolicyHandler implements
OverflowPolicyHandler
if (cumulativeDepthBytes > maximumQueueDepthBytes
|| cumulativeDepthMessages >
maximumQueueDepthMessages)
{
- flowToDisk(message);
+ flowToDisk(node);
}
}
}
@@ -120,19 +120,18 @@ public class FlowToDiskOverflowPolicyHandler implements
OverflowPolicyHandler
if ((maximumQueueDepthBytes >= 0L && queueDepthBytes >
maximumQueueDepthBytes) ||
(maximumQueueDepthMessages >= 0L && queueDepthMessages >
maximumQueueDepthMessages))
{
- ServerMessage message = newlyEnqueued.getMessage();
- if (message != null)
- {
- flowToDisk(message);
- }
+ flowToDisk(newlyEnqueued);
}
}
- private void flowToDisk(final ServerMessage message)
+ private void flowToDisk(final QueueEntry node)
{
- try (MessageReference messageReference = message.newReference())
+ try (MessageReference messageReference =
node.getMessage().newReference())
{
- message.getStoredMessage().flowToDisk();
+ if (node.getQueue().checkValid(node))
+ {
+
messageReference.getMessage().getStoredMessage().flowToDisk();
+ }
}
catch (MessageDeletedException mde)
{
diff --git
a/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
b/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
index 64818d9..431bd7c 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
@@ -595,7 +595,7 @@ public abstract class QueueEntryImpl implements QueueEntry
RoutingResult<?> result;
ServerMessage<?> message = getMessage();
- if (alternateBindingDestination != null)
+ if (alternateBindingDestination != null && message.checkValid())
{
result = alternateBindingDestination.route(message,
message.getInitialRoutingAddress(),
diff --git
a/broker-core/src/main/java/org/apache/qpid/server/queue/QueueStatistics.java
b/broker-core/src/main/java/org/apache/qpid/server/queue/QueueStatistics.java
index 49f7d83..ea0cc5b 100644
---
a/broker-core/src/main/java/org/apache/qpid/server/queue/QueueStatistics.java
+++
b/broker-core/src/main/java/org/apache/qpid/server/queue/QueueStatistics.java
@@ -54,6 +54,8 @@ final class QueueStatistics
private final AtomicInteger _expiredCount = new AtomicInteger();
private final AtomicLong _expiredSize = new AtomicLong();
+ private final AtomicInteger _malformedCount = new AtomicInteger();
+ private final AtomicLong _malformedSize = new AtomicLong();
public final int getQueueCount()
{
@@ -155,6 +157,16 @@ final class QueueStatistics
return _expiredSize.get();
}
+ public int getMalformedCount()
+ {
+ return _malformedCount.get();
+ }
+
+ public long getMalformedSize()
+ {
+ return _malformedSize.get();
+ }
+
void addToQueue(long size)
{
int count = _queueCount.incrementAndGet();
@@ -241,4 +253,9 @@ final class QueueStatistics
_expiredSize.addAndGet(size);
}
+ void addToMalformed(final long size)
+ {
+ _malformedCount.incrementAndGet();
+ _malformedSize.addAndGet(size);
+ }
}
diff --git
a/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueImpl.java
b/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueImpl.java
index 9b65d42..b8d2bbd 100644
---
a/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueImpl.java
+++
b/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueImpl.java
@@ -54,13 +54,13 @@ public class SortedQueueImpl extends
OutOfOrderQueue<SortedQueueImpl> implements
}
@Override
- protected void doEnqueue(final ServerMessage message,
+ protected QueueEntry doEnqueue(final ServerMessage message,
final Action<? super MessageInstance> action,
MessageEnqueueRecord record)
{
synchronized (_sortedQueueLock)
{
- super.doEnqueue(message, action, record);
+ return super.doEnqueue(message, action, record);
}
}
diff --git
a/broker-core/src/main/java/org/apache/qpid/server/store/serializer/v1/MessageStoreSerializer_v1.java
b/broker-core/src/main/java/org/apache/qpid/server/store/serializer/v1/MessageStoreSerializer_v1.java
index ec87654..f5bfccc 100644
---
a/broker-core/src/main/java/org/apache/qpid/server/store/serializer/v1/MessageStoreSerializer_v1.java
+++
b/broker-core/src/main/java/org/apache/qpid/server/store/serializer/v1/MessageStoreSerializer_v1.java
@@ -46,6 +46,7 @@ import org.apache.qpid.server.store.handler.MessageHandler;
import org.apache.qpid.server.store.handler.MessageInstanceHandler;
import org.apache.qpid.server.store.serializer.MessageStoreSerializer;
import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
+import org.apache.qpid.server.util.ServerScopedRuntimeException;
@PluggableService
public class MessageStoreSerializer_v1 implements MessageStoreSerializer
@@ -324,8 +325,19 @@ public class MessageStoreSerializer_v1 implements
MessageStoreSerializer
handle.addContent(buf);
}
final StoredMessage<StorableMessageMetaData> storedMessage =
handle.allContentAdded();
- messageNumberMap.put(originalMessageNumber, storedMessage);
- storedMessage.flowToDisk();
+ try
+ {
+ storedMessage.flowToDisk();
+ messageNumberMap.put(originalMessageNumber, storedMessage);
+ }
+ catch (RuntimeException e)
+ {
+ if (e instanceof ServerScopedRuntimeException)
+ {
+ throw e;
+ }
+ throw new IllegalArgumentException("Could not decode message
metadata", e);
+ }
record = deserializer.readRecord();
}
diff --git
a/broker-core/src/main/java/org/apache/qpid/server/txn/FlowToDiskTransactionObserver.java
b/broker-core/src/main/java/org/apache/qpid/server/txn/FlowToDiskTransactionObserver.java
index c00da42..ae344de 100644
---
a/broker-core/src/main/java/org/apache/qpid/server/txn/FlowToDiskTransactionObserver.java
+++
b/broker-core/src/main/java/org/apache/qpid/server/txn/FlowToDiskTransactionObserver.java
@@ -59,31 +59,26 @@ public class FlowToDiskTransactionObserver implements
TransactionObserver
{
StoredMessage<? extends StorableMessageMetaData> handle =
message.getStoredMessage();
long messageSize = handle.getContentSize() + handle.getMetadataSize();
-
- long newUncommittedSize = _uncommittedMessageSize.get() + messageSize;
+ long newUncommittedSize =
_uncommittedMessageSize.addAndGet(messageSize);
+ TransactionDetails details =
_uncommittedMessages.computeIfAbsent(transaction, key -> new
TransactionDetails());
+ details.messageEnqueued(handle);
if (newUncommittedSize > _maxUncommittedInMemorySize)
{
- handle.flowToDisk();
- if (!_reported)
+ // flow to disk only current transaction messages
+ // in order to handle malformed messages on correct channel
+ try
{
- _eventLogger.message(_logSubject,
ConnectionMessages.LARGE_TRANSACTION_WARN(newUncommittedSize,
_maxUncommittedInMemorySize));
- _reported = true;
+ details.flowToDisk();
}
-
- if (!_uncommittedMessages.isEmpty())
+ finally
{
- for (TransactionDetails transactionDetails :
_uncommittedMessages.values())
+ if (!_reported)
{
- transactionDetails.flowToDisk();
+ _eventLogger.message(_logSubject,
ConnectionMessages.LARGE_TRANSACTION_WARN(newUncommittedSize,
_maxUncommittedInMemorySize));
+ _reported = true;
}
}
}
- else
- {
- _uncommittedMessageSize.addAndGet(messageSize);
- TransactionDetails details =
_uncommittedMessages.computeIfAbsent(transaction, key -> new
TransactionDetails());
- details.messageEnqueued(handle);
- }
}
@Override
diff --git
a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
index de2b11e..c3b9ab3 100644
---
a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
+++
b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
@@ -2142,7 +2142,10 @@ public abstract class AbstractVirtualHost<X extends
AbstractVirtualHost<X>> exte
}
else
{
- storedMessage.flowToDisk();
+ if (node.getQueue().checkValid(node))
+ {
+ storedMessage.flowToDisk();
+ }
}
}
}
diff --git
a/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java
b/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java
index f811e90..82b8dc8 100644
---
a/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java
+++
b/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java
@@ -1406,6 +1406,7 @@ abstract class AbstractQueueTestBase extends UnitTestBase
ServerMessage message = mock(ServerMessage.class);
when(message.getMessageNumber()).thenReturn(id);
when(message.getMessageHeader()).thenReturn(header);
+ when(message.checkValid()).thenReturn(true);
StoredMessage storedMessage = mock(StoredMessage.class);
when(message.getStoredMessage()).thenReturn(storedMessage);
diff --git
a/broker-core/src/test/java/org/apache/qpid/server/queue/FlowToDiskOverflowPolicyHandlerTest.java
b/broker-core/src/test/java/org/apache/qpid/server/queue/FlowToDiskOverflowPolicyHandlerTest.java
index e711364..8351e53 100644
---
a/broker-core/src/test/java/org/apache/qpid/server/queue/FlowToDiskOverflowPolicyHandlerTest.java
+++
b/broker-core/src/test/java/org/apache/qpid/server/queue/FlowToDiskOverflowPolicyHandlerTest.java
@@ -129,9 +129,12 @@ public class FlowToDiskOverflowPolicyHandlerTest extends
UnitTestBase
{
ServerMessage message = mock(ServerMessage.class);
when(message.getSizeIncludingHeader()).thenReturn(size);
+ when(message.checkValid()).thenReturn(true);
+ when(message.getValidationStatus()).thenReturn(ServerMessage.ValidationStatus.VALID);
StoredMessage storedMessage = mock(StoredMessage.class);
when(message.getStoredMessage()).thenReturn(storedMessage);
+ when(storedMessage.isInMemory()).thenReturn(true);
MessageReference ref = mock(MessageReference.class);
when(ref.getMessage()).thenReturn(message);
diff --git
a/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java
b/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java
index 7a42986..b8aacfe 100644
---
a/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java
+++
b/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java
@@ -377,6 +377,7 @@ public abstract class QueueEntryImplTestBase extends
UnitTestBase
final Action<? super MessageInstance> action = mock(Action.class);
when(_queueEntry.getMessage().isResourceAcceptable(dlq)).thenReturn(true);
+ when(_queueEntry.getMessage().checkValid()).thenReturn(true);
_queueEntry.acquire();
int enqueues = _queueEntry.routeToAlternate(action, null, null);
diff --git
a/broker-core/src/test/java/org/apache/qpid/server/queue/QueueMessageRecoveryTest.java
b/broker-core/src/test/java/org/apache/qpid/server/queue/QueueMessageRecoveryTest.java
index 857fc9b..bc3589d 100644
---
a/broker-core/src/test/java/org/apache/qpid/server/queue/QueueMessageRecoveryTest.java
+++
b/broker-core/src/test/java/org/apache/qpid/server/queue/QueueMessageRecoveryTest.java
@@ -216,12 +216,13 @@ public class QueueMessageRecoveryTest extends UnitTestBase
}
@Override
- protected void doEnqueue(final ServerMessage message, final Action<? super MessageInstance> action, MessageEnqueueRecord record)
+ protected QueueEntry doEnqueue(final ServerMessage message, final Action<? super MessageInstance> action, MessageEnqueueRecord record)
{
synchronized(_messageList)
{
_messageList.add(message);
}
+ return null;
}
}
}
diff --git
a/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryImplTest.java
b/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryImplTest.java
index f67a1e4..538df8a 100644
---
a/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryImplTest.java
+++
b/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryImplTest.java
@@ -64,6 +64,7 @@ public class SimpleQueueEntryImplTest extends
QueueEntryImplTestBase
{
ServerMessage message = mock(ServerMessage.class);
when(message.getMessageNumber()).thenReturn((long)msgId);
+ when(message.checkValid()).thenReturn(true);
final MessageReference reference = mock(MessageReference.class);
when(reference.getMessage()).thenReturn(message);
when(message.newReference()).thenReturn(reference);
diff --git
a/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java
b/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java
index 231dd35..1f7d3fd 100644
---
a/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java
+++
b/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java
@@ -130,6 +130,18 @@ public class TestMessageMetaDataType implements
MessageMetaDataType<TestMessageM
}
@Override
+ public boolean checkValid()
+ {
+ return true;
+ }
+
+ @Override
+ public ValidationStatus getValidationStatus()
+ {
+ return ValidationStatus.VALID;
+ }
+
+ @Override
public long getExpiration()
{
return 0;
diff --git
a/broker-core/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java
b/broker-core/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java
index 3ccc94b..57e4bf1 100644
---
a/broker-core/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java
+++
b/broker-core/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java
@@ -149,6 +149,18 @@ class MockServerMessage implements ServerMessage
}
@Override
+ public boolean checkValid()
+ {
+ return true;
+ }
+
+ @Override
+ public ValidationStatus getValidationStatus()
+ {
+ return ValidationStatus.VALID;
+ }
+
+ @Override
public long getArrivalTime()
{
throw new UnsupportedOperationException();
diff --git
a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10Impl.java
b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10Impl.java
index ce7e7f0..1315258 100755
---
a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10Impl.java
+++
b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10Impl.java
@@ -20,6 +20,7 @@
*/
package org.apache.qpid.server.protocol.v0_10;
+import java.nio.BufferUnderflowException;
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.util.ArrayList;
@@ -142,8 +143,9 @@ public class AMQPConnection_0_10Impl extends
AbstractAMQPConnection<AMQPConnecti
_inputHandler.received(buf);
_connection.receivedComplete();
}
- catch (IllegalArgumentException | IllegalStateException e)
+ catch (IllegalArgumentException | IllegalStateException | BufferUnderflowException e)
{
+ LOGGER.warn("Unexpected exception", e);
throw new ConnectionScopedRuntimeException(e);
}
}
diff --git
a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
index 7c4cc7f..c6c312c 100644
---
a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
+++
b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
@@ -40,6 +40,7 @@ import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.plugin.MessageConverter;
import org.apache.qpid.server.protocol.MessageConverterRegistry;
+import org.apache.qpid.server.protocol.converter.MessageConversionException;
import org.apache.qpid.server.protocol.v0_10.transport.DeliveryProperties;
import org.apache.qpid.server.protocol.v0_10.transport.Header;
import org.apache.qpid.server.protocol.v0_10.transport.MessageAcceptMode;
@@ -203,6 +204,10 @@ public class ConsumerTarget_0_10 extends
AbstractConsumerTarget<ConsumerTarget_0
}
else
{
+ if (!serverMsg.checkValid())
+ {
+ throw new MessageConversionException(String.format("Cannot convert malformed message '%s'", serverMsg));
+ }
converter = (MessageConverter<? super ServerMessage, MessageTransferMessage>) MessageConverterRegistry.getConverter(serverMsg.getClass(), MessageTransferMessage.class);
msg = converter.convert(serverMsg, _session.getAddressSpace());
}
diff --git
a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQMessage.java
b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQMessage.java
index 7c663ff..62c3b53 100644
---
a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQMessage.java
+++
b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQMessage.java
@@ -120,4 +120,10 @@ public class AMQMessage extends
AbstractServerMessageImpl<AMQMessage, MessageMet
{
return AMQP_0_9_1;
}
+
+ @Override
+ protected void validate()
+ {
+ getMessageMetaData().validate();
+ }
}
diff --git
a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8Impl.java
b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8Impl.java
index 3b18feb..a4e5e68 100644
---
a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8Impl.java
+++
b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8Impl.java
@@ -26,6 +26,7 @@ import java.io.IOException;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
+import java.nio.BufferUnderflowException;
import java.security.AccessControlException;
import java.security.AccessController;
import java.security.PrivilegedAction;
@@ -67,7 +68,6 @@ import org.apache.qpid.server.security.SubjectCreator;
import org.apache.qpid.server.security.auth.SubjectAuthenticationResult;
import org.apache.qpid.server.security.auth.sasl.SaslNegotiator;
import org.apache.qpid.server.session.AMQPSession;
-import org.apache.qpid.server.store.StoreException;
import org.apache.qpid.server.transport.AbstractAMQPConnection;
import org.apache.qpid.server.transport.AggregateTicker;
import org.apache.qpid.server.transport.ByteBufferSender;
@@ -78,7 +78,6 @@ import org.apache.qpid.server.txn.LocalTransaction;
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
-import org.apache.qpid.server.util.ServerScopedRuntimeException;
import org.apache.qpid.server.virtualhost.NoopConnectionEstablishmentPolicy;
import org.apache.qpid.server.virtualhost.VirtualHostUnavailableException;
@@ -251,9 +250,10 @@ public class AMQPConnection_0_8Impl
_decoder.decodeBuffer(msg);
receivedCompleteAllChannels();
}
- catch (AMQFrameDecodingException | IOException e)
+ catch (AMQFrameDecodingException | IOException | AMQPInvalidClassException
+ | IllegalArgumentException | IllegalStateException | BufferUnderflowException e)
{
- LOGGER.error("Unexpected exception", e);
+ LOGGER.warn("Unexpected exception", e);
throw new ConnectionScopedRuntimeException(e);
}
}
diff --git
a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
index b4f06da..092de1c 100644
---
a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
+++
b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
@@ -31,6 +31,7 @@ import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.plugin.MessageConverter;
import org.apache.qpid.server.protocol.MessageConverterRegistry;
+import org.apache.qpid.server.protocol.converter.MessageConversionException;
import org.apache.qpid.server.txn.AutoCommitTransaction;
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.util.StateChangeListener;
@@ -402,6 +403,10 @@ public abstract class ConsumerTarget_0_8 extends
AbstractConsumerTarget<Consumer
}
else
{
+ if (!serverMessage.checkValid())
+ {
+ throw new MessageConversionException(String.format("Cannot convert malformed message '%s'", serverMessage));
+ }
messageConverter = MessageConverterRegistry.getConverter((Class<ServerMessage<?>>) serverMessage.getClass(), AMQMessage.class);
msg = messageConverter.convert(serverMessage, getConnection().getAddressSpace());
}
diff --git
a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java
b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java
index 7f5080b..88e879c 100644
---
a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java
+++
b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java
@@ -20,6 +20,7 @@
*/
package org.apache.qpid.server.protocol.v0_8;
+import java.nio.BufferUnderflowException;
import java.util.Collection;
import java.util.Objects;
import java.util.Set;
@@ -153,6 +154,11 @@ public class MessageMetaData implements
StorableMessageMetaData
_contentHeaderBody.reallocate();
}
+ public synchronized void validate()
+ {
+ _contentHeaderBody.getProperties().validate();
+ }
+
private static class MetaDataFactory implements MessageMetaDataType.Factory<MessageMetaData>
{
@@ -177,7 +183,8 @@ public class MessageMetaData implements
StorableMessageMetaData
return new MessageMetaData(publishBody, chb, arrivalTime);
}
- catch (AMQFrameDecodingException | AMQProtocolVersionException e)
+ catch (AMQFrameDecodingException | AMQProtocolVersionException | AMQPInvalidClassException
+ | IllegalArgumentException | IllegalStateException | BufferUnderflowException e)
{
throw new ConnectionScopedRuntimeException(e);
}
diff --git
a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/transport/BasicContentHeaderProperties.java
b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/transport/BasicContentHeaderProperties.java
index 1ae3d68..ac2a456 100644
---
a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/transport/BasicContentHeaderProperties.java
+++
b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/transport/BasicContentHeaderProperties.java
@@ -833,11 +833,15 @@ public class BasicContentHeaderProperties
synchronized void reallocate()
{
- _headers.clearEncodedForm();
if (_encodedForm != null)
{
_encodedForm = QpidByteBuffer.reallocateIfNecessary(_encodedForm);
}
+ _headers.clearEncodedForm();
}
+ public synchronized void validate()
+ {
+ _headers.validate();
+ }
}
diff --git
a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
index 12e9f12..bc87141 100644
---
a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
+++
b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
@@ -39,6 +39,7 @@ import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.plugin.MessageConverter;
import org.apache.qpid.server.protocol.MessageConverterRegistry;
+import org.apache.qpid.server.protocol.converter.MessageConversionException;
import org.apache.qpid.server.protocol.v1_0.type.Binary;
import org.apache.qpid.server.protocol.v1_0.type.DeliveryState;
import org.apache.qpid.server.protocol.v1_0.type.Outcome;
@@ -136,6 +137,10 @@ class ConsumerTarget_1_0 extends
AbstractConsumerTarget<ConsumerTarget_1_0>
}
else
{
+ if (!serverMessage.checkValid())
+ {
+ throw new MessageConversionException(String.format("Cannot convert malformed message '%s'", serverMessage));
+ }
converter =
(MessageConverter<? super ServerMessage, Message_1_0>)
MessageConverterRegistry.getConverter(serverMessage.getClass(),
Message_1_0.class);
if (converter == null)
diff --git
a/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/AbstractJDBCMessageStore.java
b/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/AbstractJDBCMessageStore.java
index e92aa73..cc0e3c0 100644
---
a/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/AbstractJDBCMessageStore.java
+++
b/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/AbstractJDBCMessageStore.java
@@ -234,7 +234,7 @@ public abstract class AbstractJDBCMessageStore implements
MessageStore
{
for (StoredJDBCMessage<?> message : _messages)
{
- message.clear();
+ message.clear(true);
}
_messages.clear();
_inMemorySize.set(0);
@@ -1349,27 +1349,43 @@ public abstract class AbstractJDBCMessageStore
implements MessageStore
public void reallocate()
{
- if(_metaData != null)
+ if (_metaData != null)
{
_metaData.reallocate();
}
_data = QpidByteBuffer.reallocateIfNecessary(_data);
}
- public long clear()
+ public long clear(boolean close)
{
long bytesCleared = 0;
- if(_metaData != null)
+ if(_data != null)
{
- bytesCleared += _metaData.getStorableSize();
- _metaData.clearEncodedForm();
- _metaData = null;
+ if(_data != null)
+ {
+ bytesCleared += _data.remaining();
+ _data.dispose();
+ _data = null;
+ }
}
- if(_data != null)
+ if (_metaData != null)
{
- bytesCleared += _data.remaining();
- _data.dispose();
- _data = null;
+ bytesCleared += _metaData.getStorableSize();
+ try
+ {
+ if (close)
+ {
+ _metaData.dispose();
+ }
+ else
+ {
+ _metaData.clearEncodedForm();
+ }
+ }
+ finally
+ {
+ _metaData = null;
+ }
}
return bytesCleared;
}
@@ -1609,7 +1625,7 @@ public abstract class AbstractJDBCMessageStore implements
MessageStore
flushToStore();
if(_messageDataRef != null && !_messageDataRef.isHardRef())
{
- final long bytesCleared = _messageDataRef.clear();
+ final long bytesCleared = _messageDataRef.clear(false);
_inMemorySize.addAndGet(-bytesCleared);
_bytesEvacuatedFromMemory.addAndGet(bytesCleared);
}
@@ -1625,11 +1641,11 @@ public abstract class AbstractJDBCMessageStore
implements MessageStore
}
}
- public synchronized void clear()
+ public synchronized void clear(boolean close)
{
if (_messageDataRef != null)
{
- _messageDataRef.clear();
+ _messageDataRef.clear(close);
}
}
diff --git
a/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java
b/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java
index 3b57e73..c878638 100644
---
a/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java
+++
b/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java
@@ -85,6 +85,7 @@ import org.apache.qpid.server.model.OperationParameter;
import org.apache.qpid.server.model.PublishingLink;
import org.apache.qpid.server.plugin.MessageConverter;
import org.apache.qpid.server.protocol.MessageConverterRegistry;
+import org.apache.qpid.server.protocol.converter.MessageConversionException;
import org.apache.qpid.server.queue.BaseQueue;
import org.apache.qpid.server.security.SecurityToken;
import org.apache.qpid.server.session.AMQPSession;
@@ -392,6 +393,10 @@ class ManagementNode implements MessageSource,
MessageDestination, BaseQueue
final Action<? super MessageInstance> action,
final MessageEnqueueRecord record)
{
+ if (!message.checkValid())
+ {
+ throw new MessageConversionException(String.format("Cannot convert malformed message '%s'", message));
+ }
@SuppressWarnings("unchecked")
MessageConverter<ServerMessage, InternalMessage> converter =
(MessageConverter<ServerMessage, InternalMessage>)
MessageConverterRegistry.getConverter((message.getClass()),
InternalMessage.class);
diff --git
a/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/Interaction.java
b/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/Interaction.java
index 8804c31..fd05901 100644
---
a/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/Interaction.java
+++
b/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/Interaction.java
@@ -82,6 +82,13 @@ public class Interaction extends
AbstractInteraction<Interaction>
return this;
}
+ public <T extends Method> Interaction sendPerformativeWithoutCopying(final T performative) throws Exception
+ {
+ performative.setChannel(_channelId);
+ sendPerformativeAndChainFuture(performative);
+ return this;
+ }
+
public ConnectionInteraction connection()
{
return _connectionInteraction;
diff --git
a/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/extensions/message/MalformedMessage.java
b/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/extensions/message/MalformedMessage.java
new file mode 100644
index 0000000..19592de
--- /dev/null
+++
b/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/extensions/message/MalformedMessage.java
@@ -0,0 +1,343 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.tests.protocol.v0_10.extensions.message;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.qpid.tests.utils.BrokerAdmin.KIND_BROKER_J;
+
+import java.net.InetSocketAddress;
+import java.nio.charset.StandardCharsets;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.UUID;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.server.protocol.v0_10.transport.Decoder;
+import org.apache.qpid.server.protocol.v0_10.transport.DeliveryProperties;
+import org.apache.qpid.server.protocol.v0_10.transport.Encoder;
+import org.apache.qpid.server.protocol.v0_10.transport.Frame;
+import org.apache.qpid.server.protocol.v0_10.transport.Header;
+import org.apache.qpid.server.protocol.v0_10.transport.MessageAcceptMode;
+import org.apache.qpid.server.protocol.v0_10.transport.MessageAcquireMode;
+import org.apache.qpid.server.protocol.v0_10.transport.MessageProperties;
+import org.apache.qpid.server.protocol.v0_10.transport.Method;
+import org.apache.qpid.server.protocol.v0_10.transport.MethodDelegate;
+import org.apache.qpid.server.protocol.v0_10.transport.Option;
+import org.apache.qpid.tests.protocol.ChannelClosedResponse;
+import org.apache.qpid.tests.protocol.v0_10.FrameTransport;
+import org.apache.qpid.tests.protocol.v0_10.Interaction;
+import org.apache.qpid.tests.utils.BrokerAdmin;
+import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
+import org.apache.qpid.tests.utils.BrokerSpecific;
+import org.apache.qpid.tests.utils.ConfigItem;
+
+@BrokerSpecific(kind = KIND_BROKER_J)
+@ConfigItem(name = "broker.flowToDiskThreshold", value = "1")
+@ConfigItem(name = "connection.maxUncommittedInMemorySize", value = "1")
+public class MalformedMessage extends BrokerAdminUsingTestBase
+{
+ private InetSocketAddress _brokerAddress;
+ private static final String CONTENT_TEXT = "Test";
+
+ @Before
+ public void setUp()
+ {
+ _brokerAddress = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP);
+ getBrokerAdmin().createQueue(BrokerAdmin.TEST_QUEUE_NAME);
+ }
+
+ @Test
+ public void malformedMessage() throws Exception
+ {
+ try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ {
+ final Interaction interaction = transport.newInteraction();
+ byte[] sessionName = "test".getBytes(UTF_8);
+
+ byte[] contentBytes = CONTENT_TEXT.getBytes(StandardCharsets.UTF_8);
+
+ DeliveryProperties deliveryProps = new DeliveryProperties();
+ MessageProperties messageProps = new MessageProperties();
+
+ deliveryProps.setRoutingKey(BrokerAdmin.TEST_QUEUE_NAME);
+ deliveryProps.setTimestamp(System.currentTimeMillis());
+ messageProps.setContentLength(contentBytes.length);
+ messageProps.setContentType("plain/text");
+ messageProps.setMessageId(UUID.randomUUID());
+
+ final Header header = new Header(deliveryProps, messageProps, null);
+
+ final TestMessageTransfer malformedTransfer = new TestMessageTransfer(BrokerAdmin.TEST_QUEUE_NAME,
+ MessageAcceptMode.EXPLICIT,
+ MessageAcquireMode.PRE_ACQUIRED,
+ header,
+ QpidByteBuffer.wrap(contentBytes))
+ {
+ @Override
+ public void write(final Encoder enc)
+ {
+ // write flags without writing anything else
+ enc.writeUint16(packingFlags);
+ }
+ };
+
+ interaction.openAnonymousConnection()
+ .channelId(1)
+ .attachSession(sessionName)
+ .sendPerformativeWithoutCopying(malformedTransfer)
+ .session()
+ .flushCompleted()
+ .flush()
+ .consumeResponse()
+ .getLatestResponse(ChannelClosedResponse.class);
+ }
+ }
+
+ private class TestMessageTransfer extends Method
+ {
+ short packingFlags;
+ private String destination;
+ private MessageAcceptMode acceptMode;
+ private MessageAcquireMode acquireMode;
+ private Header header;
+ private QpidByteBuffer _body;
+ private int _bodySize;
+
+
+ TestMessageTransfer(String destination,
+ MessageAcceptMode acceptMode,
+ MessageAcquireMode acquireMode,
+ Header header,
+ QpidByteBuffer body,
+ Option... options)
+ {
+ if (destination != null)
+ {
+ setDestination(destination);
+ }
+ if (acceptMode != null)
+ {
+ setAcceptMode(acceptMode);
+ }
+ if (acquireMode != null)
+ {
+ setAcquireMode(acquireMode);
+ }
+ setHeader(header);
+ setBody(body);
+
+ for (final Option option : options)
+ {
+ switch (option)
+ {
+ case SYNC:
+ this.setSync(true);
+ break;
+ case NONE:
+ break;
+ default:
+ throw new IllegalArgumentException("invalid option: " + option);
+ }
+ }
+ }
+
+ @Override
+ public final int getStructType()
+ {
+ return 1025;
+ }
+
+ @Override
+ public final int getSizeWidth()
+ {
+ return 0;
+ }
+
+ @Override
+ public final int getPackWidth()
+ {
+ return 2;
+ }
+
+ @Override
+ public final boolean hasPayload()
+ {
+ return true;
+ }
+
+ @Override
+ public final byte getEncodedTrack()
+ {
+ return Frame.L4;
+ }
+
+ @Override
+ public final boolean isConnectionControl()
+ {
+ return false;
+ }
+
+ @Override
+ public <C> void dispatch(C context, MethodDelegate<C> delegate)
+ {
+ delegate.handle(context, this);
+ }
+
+ @Override
+ public final Header getHeader()
+ {
+ return this.header;
+ }
+
+ @Override
+ public final void setHeader(Header header)
+ {
+ this.header = header;
+ }
+
+ @Override
+ public final QpidByteBuffer getBody()
+ {
+ return _body;
+ }
+
+ @Override
+ public final void setBody(QpidByteBuffer body)
+ {
+ if (body == null)
+ {
+ _bodySize = 0;
+ if (_body != null)
+ {
+ _body.dispose();
+ }
+ _body = null;
+ }
+ else
+ {
+ _body = body.duplicate();
+ _bodySize = _body.remaining();
+ }
+ }
+
+ @Override
+ public int getBodySize()
+ {
+ return _bodySize;
+ }
+
+ @Override
+ public void write(Encoder enc)
+ {
+ enc.writeUint16(packingFlags);
+ if ((packingFlags & 256) != 0)
+ {
+ enc.writeStr8(this.destination);
+ }
+ if ((packingFlags & 512) != 0)
+ {
+ enc.writeUint8(this.acceptMode.getValue());
+ }
+ if ((packingFlags & 1024) != 0)
+ {
+ enc.writeUint8(this.acquireMode.getValue());
+ }
+ }
+
+ @Override
+ public void read(Decoder dec)
+ {
+ packingFlags = (short) dec.readUint16();
+ if ((packingFlags & 256) != 0)
+ {
+ this.destination = dec.readStr8();
+ }
+ if ((packingFlags & 512) != 0)
+ {
+ this.acceptMode = MessageAcceptMode.get(dec.readUint8());
+ }
+ if ((packingFlags & 1024) != 0)
+ {
+ this.acquireMode = MessageAcquireMode.get(dec.readUint8());
+ }
+ }
+
+ @Override
+ public Map<String, Object> getFields()
+ {
+ Map<String, Object> result = new LinkedHashMap<>();
+
+ if ((packingFlags & 256) != 0)
+ {
+ result.put("destination", getDestination());
+ }
+ if ((packingFlags & 512) != 0)
+ {
+ result.put("acceptMode", getAcceptMode());
+ }
+ if ((packingFlags & 1024) != 0)
+ {
+ result.put("acquireMode", getAcquireMode());
+ }
+ return result;
+ }
+
+ final String getDestination()
+ {
+ return destination;
+ }
+
+ final void setDestination(String value)
+ {
+ this.destination = value;
+ packingFlags |= 256;
+ setDirty(true);
+ }
+
+ final MessageAcceptMode getAcceptMode()
+ {
+ return acceptMode;
+ }
+
+ final void setAcceptMode(MessageAcceptMode value)
+ {
+ this.acceptMode = value;
+ packingFlags |= 512;
+ setDirty(true);
+ }
+
+ final MessageAcquireMode getAcquireMode()
+ {
+ return acquireMode;
+ }
+
+ final void setAcquireMode(MessageAcquireMode value)
+ {
+ this.acquireMode = value;
+ packingFlags |= 1024;
+ setDirty(true);
+ }
+ }
+
+}
diff --git
a/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/BasicInteraction.java
b/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/BasicInteraction.java
index 718c41d..1ac239a 100644
---
a/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/BasicInteraction.java
+++
b/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/BasicInteraction.java
@@ -51,7 +51,7 @@ public class BasicInteraction
private boolean _publishMandatory;
private boolean _publishImmediate;
private byte[] _content;
- private Map<String, Object> _contentHeaderPropertiesHeaders = new HashMap<>();
+ private FieldTable _contentHeaderPropertiesHeaders;
private String _contentHeaderPropertiesContentType;
private byte _contentHeaderPropertiesDeliveryMode;
private byte _contentHeaderPropertiesPriority;
@@ -104,6 +104,12 @@ public class BasicInteraction
public BasicInteraction contentHeaderPropertiesHeaders(final Map<String, Object> messageHeaders)
{
+ _contentHeaderPropertiesHeaders = FieldTable.convertToFieldTable(messageHeaders);
+ return this;
+ }
+
+ public BasicInteraction contentHeaderPropertiesHeaders(final FieldTable messageHeaders)
+ {
_contentHeaderPropertiesHeaders = messageHeaders;
return this;
}
@@ -129,7 +135,7 @@ public class BasicInteraction
public Interaction contentHeader(int contentSize) throws Exception
{
final BasicContentHeaderProperties basicContentHeaderProperties = new BasicContentHeaderProperties();
- basicContentHeaderProperties.setHeaders(FieldTable.convertToFieldTable(_contentHeaderPropertiesHeaders));
+ basicContentHeaderProperties.setHeaders(_contentHeaderPropertiesHeaders);
basicContentHeaderProperties.setContentType(_contentHeaderPropertiesContentType);
basicContentHeaderProperties.setDeliveryMode(_contentHeaderPropertiesDeliveryMode);
basicContentHeaderProperties.setPriority(_contentHeaderPropertiesPriority);
@@ -156,7 +162,7 @@ public class BasicInteraction
_publishImmediate);
frames.add(new AMQFrame(_interaction.getChannelId(), publishFrame));
final BasicContentHeaderProperties basicContentHeaderProperties = new BasicContentHeaderProperties();
- basicContentHeaderProperties.setHeaders(FieldTable.convertToFieldTable(_contentHeaderPropertiesHeaders));
+ basicContentHeaderProperties.setHeaders(_contentHeaderPropertiesHeaders);
basicContentHeaderProperties.setContentType(_contentHeaderPropertiesContentType);
basicContentHeaderProperties.setDeliveryMode(_contentHeaderPropertiesDeliveryMode);
basicContentHeaderProperties.setPriority(_contentHeaderPropertiesPriority);
diff --git
a/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/Interaction.java
b/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/Interaction.java
index bb38fa0..0a4dbff 100644
---
a/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/Interaction.java
+++
b/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/Interaction.java
@@ -20,6 +20,10 @@
*/
package org.apache.qpid.tests.protocol.v0_8;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+
+import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
import org.apache.qpid.server.protocol.ProtocolVersion;
import org.apache.qpid.server.protocol.v0_8.transport.AMQBody;
import org.apache.qpid.server.protocol.v0_8.transport.AMQDataBlock;
@@ -27,7 +31,9 @@ import
org.apache.qpid.server.protocol.v0_8.transport.AMQFrame;
import org.apache.qpid.server.protocol.v0_8.transport.ConnectionOpenOkBody;
import org.apache.qpid.server.protocol.v0_8.transport.ConnectionStartBody;
import org.apache.qpid.server.protocol.v0_8.transport.ConnectionTuneBody;
+import org.apache.qpid.server.protocol.v0_8.transport.ContentBody;
import org.apache.qpid.tests.protocol.AbstractInteraction;
+import org.apache.qpid.tests.protocol.Response;
public class Interaction extends AbstractInteraction<Interaction>
{
@@ -143,4 +149,35 @@ public class Interaction extends
AbstractInteraction<Interaction>
{
return _exchangeInteraction;
}
+
+    /**
+     * Repeatedly consumes responses until one arrives whose body is an instance of
+     * {@code expected}. The {@code ignore} types are passed to {@code consumeResponse}
+     * together with {@code expected}, so bodies of those types are read and skipped.
+     *
+     * @param expected body type to wait for
+     * @param ignore   body types that may be received before the expected one
+     * @return the first received body that is an instance of {@code expected}
+     * @throws Exception propagated from {@code consumeResponse}
+     */
+    @SafeVarargs
+    public final <T extends AMQBody> T consume(final Class<T> expected,
+                                               final Class<? extends AMQBody>... ignore)
+            throws Exception
+    {
+        // copy the varargs array so the caller's array is never written to
+        final Class<? extends AMQBody>[] expectedResponses = Arrays.copyOf(ignore, ignore.length + 1);
+        expectedResponses[ignore.length] = expected;
+
+        while (true)
+        {
+            final Response<?> response = consumeResponse(expectedResponses).getLatestResponse();
+            final Object body = response.getBody();
+            // isInstance/cast perform a checked conversion, avoiding the unchecked (T) cast
+            if (expected.isInstance(body))
+            {
+                return expected.cast(body);
+            }
+        }
+    }
+
+    /**
+     * Reads the payload of the {@link ContentBody} obtained via
+     * {@code getLatestResponse(ContentBody.class)} and decodes it as UTF-8 text.
+     * The payload buffer is disposed before returning.
+     *
+     * @return the remaining payload bytes decoded as a UTF-8 string
+     * @throws Exception propagated from {@code getLatestResponse}
+     */
+    public String getLatestResponseContentBodyAsString() throws Exception
+    {
+        final ContentBody content = getLatestResponse(ContentBody.class);
+        final QpidByteBuffer payload = content.getPayload();
+        try
+        {
+            final byte[] contentData = new byte[payload.remaining()];
+            payload.get(contentData);
+            return new String(contentData, StandardCharsets.UTF_8);
+        }
+        finally
+        {
+            // release the buffer even when reading it fails
+            payload.dispose();
+        }
+    }
}
diff --git
a/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/BasicTest.java
b/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/BasicTest.java
index f1300fb..f497f05 100644
---
a/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/BasicTest.java
+++
b/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/BasicTest.java
@@ -285,9 +285,9 @@ public class BasicTest extends BrokerAdminUsingTestBase
assertThat(properties.getPriority(), is(equalTo(priority)));
assertThat(properties.getDeliveryMode(),
is(equalTo(deliveryMode)));
- ContentBody content =
interaction.consumeResponse(ContentBody.class).getLatestResponse(ContentBody.class);
+ interaction.consumeResponse(ContentBody.class);
- String receivedContent = getContent(content);
+ String receivedContent =
interaction.getLatestResponseContentBodyAsString();
assertThat(receivedContent, is(equalTo(messageContent)));
assertThat(getBrokerAdmin().getQueueDepthMessages(queueName),
is(equalTo(1)));
diff --git
a/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/extension/basic/MalformedMessage.java
b/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/extension/basic/MalformedMessage.java
new file mode 100644
index 0000000..ccea026
--- /dev/null
+++
b/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/extension/basic/MalformedMessage.java
@@ -0,0 +1,294 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.tests.protocol.v0_8.extension.basic;
+
+import static org.apache.qpid.tests.utils.BrokerAdmin.KIND_BROKER_J;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.notNullValue;
+import static org.hamcrest.CoreMatchers.nullValue;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+import java.net.InetSocketAddress;
+import java.nio.charset.StandardCharsets;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.server.exchange.ExchangeDefaults;
+import org.apache.qpid.server.filter.AMQPFilterTypes;
+import org.apache.qpid.server.protocol.v0_8.AMQShortString;
+import org.apache.qpid.server.protocol.v0_8.AMQType;
+import org.apache.qpid.server.protocol.v0_8.EncodingUtils;
+import org.apache.qpid.server.protocol.v0_8.FieldTable;
+import org.apache.qpid.server.protocol.v0_8.FieldTableFactory;
+import org.apache.qpid.server.protocol.v0_8.transport.BasicConsumeOkBody;
+import
org.apache.qpid.server.protocol.v0_8.transport.BasicContentHeaderProperties;
+import org.apache.qpid.server.protocol.v0_8.transport.BasicDeliverBody;
+import org.apache.qpid.server.protocol.v0_8.transport.BasicQosOkBody;
+import org.apache.qpid.server.protocol.v0_8.transport.ChannelCloseOkBody;
+import org.apache.qpid.server.protocol.v0_8.transport.ChannelFlowOkBody;
+import org.apache.qpid.server.protocol.v0_8.transport.ChannelOpenOkBody;
+import org.apache.qpid.server.protocol.v0_8.transport.ContentBody;
+import org.apache.qpid.server.protocol.v0_8.transport.ContentHeaderBody;
+import org.apache.qpid.server.protocol.v0_8.transport.QueueBindOkBody;
+import org.apache.qpid.server.protocol.v0_8.transport.TxSelectOkBody;
+import org.apache.qpid.tests.protocol.ChannelClosedResponse;
+import org.apache.qpid.tests.protocol.v0_8.FrameTransport;
+import org.apache.qpid.tests.protocol.v0_8.Interaction;
+import org.apache.qpid.tests.utils.BrokerAdmin;
+import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
+import org.apache.qpid.tests.utils.BrokerSpecific;
+import org.apache.qpid.tests.utils.ConfigItem;
+
+@BrokerSpecific(kind = KIND_BROKER_J)
+@ConfigItem(name = "broker.flowToDiskThreshold", value = "1")
+@ConfigItem(name = "connection.maxUncommittedInMemorySize", value = "1")
+public class MalformedMessage extends BrokerAdminUsingTestBase
+{
+ private InetSocketAddress _brokerAddress;
+ private static final String CONTENT_TEXT = "Test";
+
+    /** Looks up the anonymous AMQP port address and creates the test queue before each test. */
+    @Before
+    public void setUp()
+    {
+        final BrokerAdmin brokerAdmin = getBrokerAdmin();
+        _brokerAddress = brokerAdmin.getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP);
+        brokerAdmin.createQueue(BrokerAdmin.TEST_QUEUE_NAME);
+    }
+
+    /**
+     * Publishes a message whose header table carries a badly encoded long string
+     * and asserts that the test queue holds no messages afterwards.
+     */
+    @Test
+    public void malformedHeaderValue() throws Exception
+    {
+        publishMalformedMessage(createHeadersWithMalformedLongString(),
+                                CONTENT_TEXT.getBytes(StandardCharsets.UTF_8));
+
+        final int queueDepth = getBrokerAdmin().getQueueDepthMessages(BrokerAdmin.TEST_QUEUE_NAME);
+        assertThat(queueDepth, is(equalTo(0)));
+    }
+
+    /**
+     * Publishes a message whose header table is built by {@code createMalformedHeaders()}
+     * and asserts that the test queue holds no messages afterwards.
+     */
+    @Test
+    public void malformedHeader() throws Exception
+    {
+        publishMalformedMessage(createMalformedHeaders(),
+                                CONTENT_TEXT.getBytes(StandardCharsets.UTF_8));
+
+        final int queueDepth = getBrokerAdmin().getQueueDepthMessages(BrokerAdmin.TEST_QUEUE_NAME);
+        assertThat(queueDepth, is(equalTo(0)));
+    }
+
+    /**
+     * Binds the test queue to the topic exchange with the JMS selector {@code "prop = 1"},
+     * then publishes a message whose header table contains the key "prop" with no value.
+     * Expects the latest response to be a {@link ChannelClosedResponse} and the queue
+     * to hold no messages.
+     */
+    @Test
+    public void publishMalformedMessageToQueueBoundWithSelector() throws Exception
+    {
+        final FieldTable malformedHeader = createMalformedHeadersWithMissingValue("prop");
+        final byte[] contentBytes = CONTENT_TEXT.getBytes(StandardCharsets.UTF_8);
+
+        try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+        {
+            final Interaction interaction = transport.newInteraction();
+            interaction.openAnonymousConnection()
+                       .channel().open().consumeResponse(ChannelOpenOkBody.class)
+                       .queue()
+                       .bindName(ExchangeDefaults.TOPIC_EXCHANGE_NAME)
+                       .bindRoutingKey(BrokerAdmin.TEST_QUEUE_NAME)
+                       .bindQueueName(BrokerAdmin.TEST_QUEUE_NAME)
+                       .bindArguments(Collections.singletonMap(AMQPFilterTypes.JMS_SELECTOR.getValue(),
+                                                               "prop = 1"))
+                       .bind()
+                       .consumeResponse(QueueBindOkBody.class)
+                       .basic().publishExchange(ExchangeDefaults.TOPIC_EXCHANGE_NAME)
+                       .publishRoutingKey(BrokerAdmin.TEST_QUEUE_NAME)
+                       .contentHeaderPropertiesHeaders(malformedHeader)
+                       .content(contentBytes)
+                       .publishMessage()
+                       .consumeResponse()
+                       .getLatestResponse(ChannelClosedResponse.class);
+
+            assertThat(getBrokerAdmin().getQueueDepthMessages(BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(0)));
+        }
+    }
+
+    /**
+     * On a channel put into transactional mode via {@code tx().select()}, publishes a message
+     * whose header table contains the key "prop" with no value to the "" exchange and commits.
+     * Expects the latest response to be a {@link ChannelClosedResponse} and the queue to hold
+     * no messages. The class-level config sets connection.maxUncommittedInMemorySize to 1.
+     */
+    @Test
+    public void publishMalformedMessageInTransactionExceedingMaxUncommittedLimit() throws Exception
+    {
+        final FieldTable malformedHeader = createMalformedHeadersWithMissingValue("prop");
+        final byte[] contentBytes = CONTENT_TEXT.getBytes(StandardCharsets.UTF_8);
+
+        try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+        {
+            final Interaction interaction = transport.newInteraction();
+            interaction.openAnonymousConnection()
+                       .channel().open().consumeResponse(ChannelOpenOkBody.class)
+                       .tx().select()
+                       .consumeResponse(TxSelectOkBody.class)
+                       .basic().publishExchange("")
+                       .publishRoutingKey(BrokerAdmin.TEST_QUEUE_NAME)
+                       .contentHeaderPropertiesHeaders(malformedHeader)
+                       .content(contentBytes)
+                       .publishMessage()
+                       .tx().commit()
+                       .consumeResponse()
+                       .getLatestResponse(ChannelClosedResponse.class);
+
+            assertThat(getBrokerAdmin().getQueueDepthMessages(BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(0)));
+        }
+    }
+
+ /**
+ * Attaches a consumer with prefetch count 1 to the test queue, then publishes a message
+ * with a malformed long-string header followed by a well-formed message. Asserts that the
+ * delivery received by the consumer carries the second message's body size, an empty
+ * header map and the second message's content.
+ */
+ @Test
+ public void consumeMalformedMessage() throws Exception
+ {
+ final FieldTable malformedHeader =
createHeadersWithMalformedLongString();
+ final byte[] contentBytes =
CONTENT_TEXT.getBytes(StandardCharsets.UTF_8);
+
+ final String content2 = "message2";
+ final byte[] content2Bytes = content2.getBytes(StandardCharsets.UTF_8);
+
+ try (FrameTransport transport = new
FrameTransport(_brokerAddress).connect())
+ {
+ final Interaction interaction = transport.newInteraction();
+ String consumerTag = "A";
+ interaction.openAnonymousConnection()
+ .channel().open()
+ .consumeResponse(ChannelOpenOkBody.class)
+ .basic().qosPrefetchCount(1)
+ .qos()
+ .consumeResponse(BasicQosOkBody.class)
+ .basic().consumeConsumerTag(consumerTag)
+ .consumeQueue(BrokerAdmin.TEST_QUEUE_NAME)
+ .consume()
+ .consumeResponse(BasicConsumeOkBody.class)
+ .channel().flow(true)
+ .consumeResponse(ChannelFlowOkBody.class)
+
+ // first message: malformed headers
+ .basic().publishExchange("")
+ .publishRoutingKey(BrokerAdmin.TEST_QUEUE_NAME)
+ .contentHeaderPropertiesHeaders(malformedHeader)
+ .content(contentBytes)
+ .publishMessage()
+
+ // second message: empty headers, content "message2"
+ .basic().publishExchange("")
+ .publishRoutingKey(BrokerAdmin.TEST_QUEUE_NAME)
+ .contentHeaderPropertiesContentType("text/plain")
+ .contentHeaderPropertiesHeaders(Collections.emptyMap())
+ .content(content2Bytes)
+ .publishMessage();
+
+ // the assertions below match the delivery against the second message only
+ BasicDeliverBody delivery =
interaction.consumeResponse(BasicDeliverBody.class)
+
.getLatestResponse(BasicDeliverBody.class);
+ assertThat(delivery.getConsumerTag(),
is(equalTo(AMQShortString.valueOf(consumerTag))));
+ assertThat(delivery.getConsumerTag(), is(notNullValue()));
+ assertThat(delivery.getRedelivered(), is(equalTo(false)));
+ assertThat(delivery.getExchange(), is(nullValue()));
+ assertThat(delivery.getRoutingKey(),
is(equalTo(AMQShortString.valueOf(BrokerAdmin.TEST_QUEUE_NAME))));
+
+ ContentHeaderBody header =
+
interaction.consumeResponse(ContentHeaderBody.class).getLatestResponse(ContentHeaderBody.class);
+
+ assertThat(header.getBodySize(), is(equalTo((long)
content2Bytes.length)));
+ BasicContentHeaderProperties properties = header.getProperties();
+ Map<String, Object> receivedHeaders = new
HashMap<>(FieldTable.convertToMap(properties.getHeaders()));
+ assertThat(receivedHeaders.isEmpty(), is(equalTo(true)));
+
+ String receivedContent =
+
interaction.consumeResponse(ContentBody.class).getLatestResponseContentBodyAsString();
+
+ assertThat(receivedContent, is(equalTo(content2)));
+
+ // ChannelFlowOkBody is also accepted while waiting for the close confirmation
+ interaction.channel().close()
+ .consumeResponse(ChannelCloseOkBody.class,
ChannelFlowOkBody.class);
+ }
+ }
+
+    /**
+     * Opens an anonymous connection and a channel, publishes one message with the given
+     * headers and content to the "" exchange using the test queue name as routing key,
+     * then closes the channel and waits for the close confirmation.
+     *
+     * @param malformedHeader header table to attach to the message
+     * @param contentBytes    message content
+     * @throws Exception propagated from the transport or interaction
+     */
+    private void publishMalformedMessage(final FieldTable malformedHeader, final byte[] contentBytes)
+            throws Exception
+    {
+        try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+        {
+            transport.newInteraction()
+                     .openAnonymousConnection()
+                     .channel().open().consumeResponse(ChannelOpenOkBody.class)
+                     .basic().publishExchange("")
+                     .publishRoutingKey(BrokerAdmin.TEST_QUEUE_NAME)
+                     .contentHeaderPropertiesHeaders(malformedHeader)
+                     .content(contentBytes)
+                     .publishMessage()
+                     .channel().close()
+                     .consumeResponse(ChannelCloseOkBody.class);
+        }
+    }
+
+    /**
+     * Creates a field table backed by a one-byte buffer containing the single byte 0xFF (-1).
+     *
+     * @return a field table over that buffer, used by the tests as a malformed header table
+     */
+    private static FieldTable createMalformedHeaders()
+    {
+        final QpidByteBuffer singleByte = QpidByteBuffer.allocate(1);
+        singleByte.put((byte) 0xFF); // same value as (byte) -1
+        singleByte.flip();
+        return FieldTableFactory.createFieldTable(singleByte);
+    }
+
+    /**
+     * Creates a field table with one entry, key "test", whose value is written as a long
+     * string in which every character is truncated to its low byte instead of being
+     * UTF-8 encoded.
+     *
+     * @return a field table over the hand-built buffer
+     */
+    private FieldTable createHeadersWithMalformedLongString()
+    {
+        final String key = "test";
+
+        // three Korean characters, each occupying 3 bytes in UTF-8
+        final byte[] utf8Bytes = {(byte) 0xED, (byte) 0x95, (byte) 0x9C,
+                                  (byte) 0xEA, (byte) 0xB5, (byte) 0xAD,
+                                  (byte) 0xEC, (byte) 0x96, (byte) 0xB4};
+        final String value = new String(utf8Bytes, StandardCharsets.UTF_8);
+
+        // key + type identifier + length prefix + one byte per character
+        final QpidByteBuffer buf = QpidByteBuffer.allocate(EncodingUtils.encodedShortStringLength(key)
+                                                           + Byte.BYTES + Integer.BYTES + value.length());
+
+        EncodingUtils.writeShortStringBytes(buf, key);
+
+        buf.put(AMQType.LONG_STRING.identifier());
+        buf.putUnsignedInt(value.length());
+        for (int i = 0; i < value.length(); i++)
+        {
+            // deliberately keep only the low byte of each character
+            buf.put((byte) value.charAt(i));
+        }
+
+        buf.flip();
+        return FieldTableFactory.createFieldTable(buf);
+    }
+
+    /**
+     * Creates a field table over a buffer that contains only the encoded key,
+     * with no type identifier or value following it.
+     *
+     * @param key header name to write
+     * @return a field table over the key-only buffer
+     */
+    private FieldTable createMalformedHeadersWithMissingValue(String key)
+    {
+        final int keyLength = EncodingUtils.encodedShortStringLength(key);
+        final QpidByteBuffer keyOnly = QpidByteBuffer.allocate(keyLength);
+        EncodingUtils.writeShortStringBytes(keyOnly, key);
+        keyOnly.flip();
+        return FieldTableFactory.createFieldTable(keyOnly);
+    }
+
+}
diff --git
a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/qpid/message/MalformedMessage.java
b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/qpid/message/MalformedMessage.java
new file mode 100644
index 0000000..296edf6
--- /dev/null
+++
b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/qpid/message/MalformedMessage.java
@@ -0,0 +1,133 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.tests.protocol.v1_0.extensions.qpid.message;
+
+
+import static org.apache.qpid.tests.utils.BrokerAdmin.KIND_BROKER_J;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.greaterThan;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.notNullValue;
+
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.hamcrest.Matchers;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.AmqpValue;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.AmqpValueSection;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.Properties;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.PropertiesSection;
+import org.apache.qpid.server.protocol.v1_0.type.transport.AmqpError;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Attach;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Begin;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Detach;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Flow;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Open;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
+import org.apache.qpid.tests.protocol.v1_0.FrameTransport;
+import org.apache.qpid.tests.protocol.v1_0.Interaction;
+import org.apache.qpid.tests.utils.BrokerAdmin;
+import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
+import org.apache.qpid.tests.utils.BrokerSpecific;
+import org.apache.qpid.tests.utils.ConfigItem;
+
+@BrokerSpecific(kind = KIND_BROKER_J)
+@ConfigItem(name = "broker.flowToDiskThreshold", value = "1")
+@ConfigItem(name = "connection.maxUncommittedInMemorySize", value = "1")
+public class MalformedMessage extends BrokerAdminUsingTestBase
+{
+ private InetSocketAddress _brokerAddress;
+ private static final String CONTENT_TEXT = "Test";
+
+    /** Looks up the anonymous AMQP port address and creates the test queue before each test. */
+    @Before
+    public void setUp()
+    {
+        final BrokerAdmin brokerAdmin = getBrokerAdmin();
+        _brokerAddress = brokerAdmin.getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP);
+        brokerAdmin.createQueue(BrokerAdmin.TEST_QUEUE_NAME);
+    }
+
+    /**
+     * Attaches a sending link to the test queue, transfers a settled message whose payload
+     * is produced by {@code generateMalformed()}, and asserts that the broker answers with
+     * a closing {@link Detach} carrying the {@code DECODE_ERROR} condition.
+     */
+    @Test
+    public void malformedMessage() throws Exception
+    {
+        try (final FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+        {
+            final Interaction interaction = transport.newInteraction();
+            interaction.negotiateProtocol()
+                       .consumeResponse()
+                       .open()
+                       .consumeResponse(Open.class)
+                       .begin()
+                       .consumeResponse(Begin.class)
+                       .attachRole(Role.SENDER)
+                       .attachTargetAddress(BrokerAdmin.TEST_QUEUE_NAME)
+                       .attach()
+                       .consumeResponse(Attach.class)
+                       .consumeResponse(Flow.class);
+
+            final Flow flow = interaction.getLatestResponse(Flow.class);
+            assertThat(flow.getLinkCredit().intValue(), Matchers.is(greaterThan(1)));
+
+            final QpidByteBuffer payload = generateMalformed();
+            // transferSettled(true) only needs to be set once before the transfer
+            interaction.transferSettled(true)
+                       .transferPayload(payload)
+                       .transfer();
+
+            final Detach responseDetach = interaction.consumeResponse().getLatestResponse(Detach.class);
+            assertThat(responseDetach.getClosed(), is(true));
+            assertThat(responseDetach.getError(), is(notNullValue()));
+            assertThat(responseDetach.getError().getCondition(), is(equalTo(AmqpError.DECODE_ERROR)));
+
+            interaction.doCloseConnection();
+        }
+    }
+
+ /**
+ * Builds a message payload from an encoded properties section (with "to" set to the
+ * test queue name) followed by an encoded AMQP value section holding CONTENT_TEXT
+ * from which the last byte is left out.
+ *
+ * @return a single buffer concatenating both parts
+ */
+ private QpidByteBuffer generateMalformed()
+ {
+ final List<QpidByteBuffer> payload = new ArrayList<>();
+
+ final Properties properties = new Properties();
+ properties.setTo(BrokerAdmin.TEST_QUEUE_NAME);
+ PropertiesSection propertiesSection =
properties.createEncodingRetainingSection();
+ final QpidByteBuffer props = propertiesSection.getEncodedForm();
+ payload.add(props);
+ propertiesSection.dispose();
+
+ final AmqpValue amqpValue = new AmqpValue(CONTENT_TEXT);
+ final AmqpValueSection dataSection =
amqpValue.createEncodingRetainingSection();
+
+ final QpidByteBuffer encodedData = dataSection.getEncodedForm();
+ // keep everything except the final byte so the value section is truncated
+ payload.add(encodedData.view(0, encodedData.remaining() - 1));
+ encodedData.dispose();
+ dataSection.dispose();
+
+ final QpidByteBuffer combined = QpidByteBuffer.concatenate(payload);
+ // the parts are disposed once the combined buffer has been created from them
+ payload.forEach(QpidByteBuffer::dispose);
+ return combined;
+ }
+
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]