This is an automated email from the ASF dual-hosted git repository.

orudyy pushed a commit to branch 7.0.x
in repository https://gitbox.apache.org/repos/asf/qpid-broker-j.git


The following commit(s) were added to refs/heads/7.0.x by this push:
     new 4524b04  QPID-8273: [Broker-J] Handle malformed messages
4524b04 is described below

commit 4524b0432afef223f6122349a0832932188c4e35
Author: Alex Rudyy <[email protected]>
AuthorDate: Wed Feb 13 13:48:41 2019 +0000

    QPID-8273: [Broker-J] Handle malformed messages
    
    This closes #21
    
    (cherry picked from commit adb2a34306d67559ee81db155826dc67a02cc85e)
---
 .../store/berkeleydb/AbstractBDBMessageStore.java  |  42 +++++--
 .../server/logging/messages/QueueMessages.java     |  62 ++++++++++
 .../logging/messages/Queue_logmessages.properties  |   2 +
 .../server/message/AbstractServerMessageImpl.java  |  51 +++++++-
 .../apache/qpid/server/message/ServerMessage.java  |  11 ++
 .../java/org/apache/qpid/server/model/Queue.java   |  13 ++
 .../qpid/server/protocol/v0_8/FieldTable.java      |  43 +++++--
 .../apache/qpid/server/queue/AbstractQueue.java    | 124 ++++++++++++++++---
 .../queue/FlowToDiskOverflowPolicyHandler.java     |  17 ++-
 .../apache/qpid/server/queue/QueueEntryImpl.java   |   2 +-
 .../apache/qpid/server/queue/QueueStatistics.java  |  17 +++
 .../apache/qpid/server/queue/SortedQueueImpl.java  |   4 +-
 .../serializer/v1/MessageStoreSerializer_v1.java   |  16 ++-
 .../server/txn/FlowToDiskTransactionObserver.java  |  27 ++---
 .../server/virtualhost/AbstractVirtualHost.java    |   5 +-
 .../qpid/server/queue/AbstractQueueTestBase.java   |   1 +
 .../queue/FlowToDiskOverflowPolicyHandlerTest.java |   3 +
 .../qpid/server/queue/QueueEntryImplTestBase.java  |   1 +
 .../server/queue/QueueMessageRecoveryTest.java     |   3 +-
 .../server/queue/SimpleQueueEntryImplTest.java     |   1 +
 .../qpid/server/store/TestMessageMetaDataType.java |  12 ++
 .../apache/qpid/server/txn/MockServerMessage.java  |  12 ++
 .../protocol/v0_10/AMQPConnection_0_10Impl.java    |   3 +-
 .../server/protocol/v0_10/ConsumerTarget_0_10.java |   5 +
 .../qpid/server/protocol/v0_8/AMQMessage.java      |   6 +
 .../protocol/v0_8/AMQPConnection_0_8Impl.java      |   4 +-
 .../server/protocol/v0_8/ConsumerTarget_0_8.java   |   5 +
 .../qpid/server/protocol/v0_8/MessageMetaData.java |   9 +-
 .../transport/BasicContentHeaderProperties.java    |   6 +
 .../server/protocol/v1_0/ConsumerTarget_1_0.java   |   5 +
 .../store/jdbc/AbstractJDBCMessageStore.java       |  44 ++++---
 .../server/management/amqp/ManagementNode.java     |   5 +
 .../extensions/qpid/message/MalformedMessage.java  | 133 +++++++++++++++++++++
 33 files changed, 607 insertions(+), 87 deletions(-)

diff --git a/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java b/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
index 10906ce..ff9e204 100644
--- a/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
+++ b/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
@@ -197,7 +197,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore
     {
         for (StoredBDBMessage<?> message : _messages)
         {
-            message.clear();
+            message.clear(true);
         }
         _messages.clear();
         _inMemorySize.set(0);
@@ -907,20 +907,36 @@ public abstract class AbstractBDBMessageStore implements MessageStore
             _data = QpidByteBuffer.reallocateIfNecessary(_data);
         }
 
-        public long clear()
+        public long clear(boolean close)
         {
             long bytesCleared = 0;
-            if(_metaData != null)
+            if(_data != null)
             {
-                bytesCleared += _metaData.getStorableSize();
-                _metaData.clearEncodedForm();
-                _metaData = null;
+                if(_data != null)
+                {
+                    bytesCleared += _data.remaining();
+                    _data.dispose();
+                    _data = null;
+                }
             }
-            if(_data != null)
+            if (_metaData != null)
             {
-                bytesCleared += _data.remaining();
-                _data.dispose();
-                _data = null;
+                bytesCleared += _metaData.getStorableSize();
+                try
+                {
+                    if (close)
+                    {
+                        _metaData.dispose();
+                    }
+                    else
+                    {
+                        _metaData.clearEncodedForm();
+                    }
+                }
+                finally
+                {
+                    _metaData = null;
+                }
             }
             return bytesCleared;
         }
@@ -1136,7 +1152,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore
             flushToStore();
             if(_messageDataRef != null && !_messageDataRef.isHardRef())
             {
-                final long bytesCleared = _messageDataRef.clear();
+                final long bytesCleared = _messageDataRef.clear(false);
                 _inMemorySize.addAndGet(-bytesCleared);
                 _bytesEvacuatedFromMemory.addAndGet(bytesCleared);
             }
@@ -1158,11 +1174,11 @@ public abstract class AbstractBDBMessageStore implements MessageStore
             }
         }
 
-        public synchronized void clear()
+        public synchronized void clear(boolean close)
         {
             if (_messageDataRef != null)
             {
-                _messageDataRef.clear();
+                _messageDataRef.clear(close);
             }
         }
     }
diff --git a/broker-core/src/main/java/org/apache/qpid/server/logging/messages/QueueMessages.java b/broker-core/src/main/java/org/apache/qpid/server/logging/messages/QueueMessages.java
index 6655918..ee10dc3 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/logging/messages/QueueMessages.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/logging/messages/QueueMessages.java
@@ -66,6 +66,7 @@ public class QueueMessages
     public static final String CREATED_LOG_HIERARCHY = 
DEFAULT_LOG_HIERARCHY_PREFIX + "queue.created";
     public static final String DELETED_LOG_HIERARCHY = 
DEFAULT_LOG_HIERARCHY_PREFIX + "queue.deleted";
     public static final String DROPPED_LOG_HIERARCHY = 
DEFAULT_LOG_HIERARCHY_PREFIX + "queue.dropped";
+    public static final String MALFORMED_MESSAGE_LOG_HIERARCHY = 
DEFAULT_LOG_HIERARCHY_PREFIX + "queue.malformed_message";
     public static final String OPERATION_LOG_HIERARCHY = 
DEFAULT_LOG_HIERARCHY_PREFIX + "queue.operation";
     public static final String OVERFULL_LOG_HIERARCHY = 
DEFAULT_LOG_HIERARCHY_PREFIX + "queue.overfull";
     public static final String UNDERFULL_LOG_HIERARCHY = 
DEFAULT_LOG_HIERARCHY_PREFIX + "queue.underfull";
@@ -76,6 +77,7 @@ public class QueueMessages
         LoggerFactory.getLogger(CREATED_LOG_HIERARCHY);
         LoggerFactory.getLogger(DELETED_LOG_HIERARCHY);
         LoggerFactory.getLogger(DROPPED_LOG_HIERARCHY);
+        LoggerFactory.getLogger(MALFORMED_MESSAGE_LOG_HIERARCHY);
         LoggerFactory.getLogger(OPERATION_LOG_HIERARCHY);
         LoggerFactory.getLogger(OVERFULL_LOG_HIERARCHY);
         LoggerFactory.getLogger(UNDERFULL_LOG_HIERARCHY);
@@ -328,6 +330,66 @@ public class QueueMessages
 
     /**
      * Log a Queue message of the Format:
+     * <pre>QUE-1006 : Malformed : {0} : {1}</pre>
+     * Optional values are contained in [square brackets] and are numbered
+     * sequentially in the method call.
+     *
+     */
+    public static LogMessage MALFORMED_MESSAGE(String param1, String param2)
+    {
+        String rawMessage = _messages.getString("MALFORMED_MESSAGE");
+
+        final Object[] messageArguments = {param1, param2};
+        // Create a new MessageFormat to ensure thread safety.
+        // Sharing a MessageFormat and using applyPattern is not thread safe
+        MessageFormat formatter = new MessageFormat(rawMessage, 
_currentLocale);
+
+        final String message = formatter.format(messageArguments);
+
+        return new LogMessage()
+        {
+            @Override
+            public String toString()
+            {
+                return message;
+            }
+
+            @Override
+            public String getLogHierarchy()
+            {
+                return MALFORMED_MESSAGE_LOG_HIERARCHY;
+            }
+
+            @Override
+            public boolean equals(final Object o)
+            {
+                if (this == o)
+                {
+                    return true;
+                }
+                if (o == null || getClass() != o.getClass())
+                {
+                    return false;
+                }
+
+                final LogMessage that = (LogMessage) o;
+
+                return getLogHierarchy().equals(that.getLogHierarchy()) && 
toString().equals(that.toString());
+
+            }
+
+            @Override
+            public int hashCode()
+            {
+                int result = toString().hashCode();
+                result = 31 * result + getLogHierarchy().hashCode();
+                return result;
+            }
+        };
+    }
+
+    /**
+     * Log a Queue message of the Format:
      * <pre>QUE-1016 : Operation : {0}</pre>
      * Optional values are contained in [square brackets] and are numbered
      * sequentially in the method call.
diff --git a/broker-core/src/main/java/org/apache/qpid/server/logging/messages/Queue_logmessages.properties b/broker-core/src/main/java/org/apache/qpid/server/logging/messages/Queue_logmessages.properties
index cd4ecdb..bfcf701 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/logging/messages/Queue_logmessages.properties
+++ b/broker-core/src/main/java/org/apache/qpid/server/logging/messages/Queue_logmessages.properties
@@ -26,6 +26,8 @@ DELETED = QUE-1002 : Deleted : ID: {0}
 OVERFULL = QUE-1003 : Overfull : Size : {0,number} bytes, Capacity : 
{1,number}, Messages : {2,number}, Message Capacity : {3,number}
 UNDERFULL = QUE-1004 : Underfull : Size : {0,number} bytes, Resume Capacity : 
{1,number}, Messages : {2,number}, Message Capacity : {3,number}
 DROPPED = QUE-1005 : Dropped : {0,number} messages, Depth : {1,number} bytes, 
{2,number} messages, Capacity : {3,number} bytes, {4,number} messages
+MALFORMED_MESSAGE = QUE-1006 : Malformed : {0} : {1}
+
 
 # These are no longer in use
 #FLOW_TO_DISK_ACTIVE = QUE-1014 : Message flow to disk active : Queue : depth 
{0,number,#.##} kB, threshold {1,number,#.##} kB / Broker : direct memory used 
{2,number,#.##} MB, threshold {3,number,#.##} MB
diff --git a/broker-core/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java b/broker-core/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java
index 3c36dce..2f72098 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java
@@ -28,6 +28,9 @@ import java.util.UUID;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
 import org.apache.qpid.server.store.StorableMessageMetaData;
 import org.apache.qpid.server.store.StoredMessage;
@@ -36,7 +39,7 @@ import org.apache.qpid.server.util.ServerScopedRuntimeException;
 
 public abstract class AbstractServerMessageImpl<X extends 
AbstractServerMessageImpl<X,T>, T extends StorableMessageMetaData> implements 
ServerMessage<T>
 {
-
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(AbstractServerMessageImpl.class);
     private static final AtomicIntegerFieldUpdater<AbstractServerMessageImpl> 
_refCountUpdater =
             
AtomicIntegerFieldUpdater.newUpdater(AbstractServerMessageImpl.class, 
"_referenceCount");
 
@@ -49,6 +52,12 @@ public abstract class AbstractServerMessageImpl<X extends AbstractServerMessageI
     @SuppressWarnings("unused")
     private volatile Collection<UUID> _resources;
 
+    private volatile ServerMessage.ValidationStatus _validationStatus = 
ServerMessage.ValidationStatus.UNKNOWN;
+
+    private static final 
AtomicReferenceFieldUpdater<AbstractServerMessageImpl, 
ServerMessage.ValidationStatus>
+            _validationStatusUpdater = 
AtomicReferenceFieldUpdater.newUpdater(AbstractServerMessageImpl.class,
+                                                                              
ServerMessage.ValidationStatus.class,
+                                                                              
"_validationStatus");
 
     public AbstractServerMessageImpl(StoredMessage<T> handle, Object 
connectionReference)
     {
@@ -192,7 +201,7 @@ public abstract class AbstractServerMessageImpl<X extends AbstractServerMessageI
         }
         finally
         {
-            if (!wasInMemory)
+            if (!wasInMemory && checkValid())
             {
                 storedMessage.flowToDisk();
             }
@@ -211,6 +220,44 @@ public abstract class AbstractServerMessageImpl<X extends AbstractServerMessageI
         return "Message[" + debugIdentity() + "]";
     }
 
+    @Override
+    public ServerMessage.ValidationStatus getValidationStatus()
+    {
+        return _validationStatus;
+    }
+
+    @Override
+    public boolean checkValid()
+    {
+        ServerMessage.ValidationStatus status;
+        while ((status = _validationStatus) == 
ServerMessage.ValidationStatus.UNKNOWN)
+        {
+            ServerMessage.ValidationStatus newStatus;
+            try
+            {
+                validate();
+                newStatus = ServerMessage.ValidationStatus.VALID;
+            }
+            catch (RuntimeException e)
+            {
+                newStatus = ServerMessage.ValidationStatus.MALFORMED;
+                LOGGER.debug("Malformed message '{}' detected", this, e);
+            }
+
+            if (_validationStatusUpdater.compareAndSet(this, status, 
newStatus))
+            {
+                status = newStatus;
+                break;
+            }
+        }
+        return status == ServerMessage.ValidationStatus.VALID;
+    }
+
+    protected void validate()
+    {
+        // noop
+    }
+
     private static class Reference<X extends AbstractServerMessageImpl<X,T>, T 
extends StorableMessageMetaData>
             implements MessageReference<X>
     {
diff --git a/broker-core/src/main/java/org/apache/qpid/server/message/ServerMessage.java b/broker-core/src/main/java/org/apache/qpid/server/message/ServerMessage.java
index 37df060..8523668 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/message/ServerMessage.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/message/ServerMessage.java
@@ -60,4 +60,15 @@ public interface ServerMessage<T extends StorableMessageMetaData> extends Enqueu
     Object getConnectionReference();
 
     boolean isResourceAcceptable(TransactionLogResource resource);
+
+    boolean checkValid();
+
+    ValidationStatus getValidationStatus();
+
+    enum ValidationStatus
+    {
+        UNKNOWN,
+        VALID,
+        MALFORMED
+    }
 }
diff --git a/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java b/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
index dcf4ab4..1161a95 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
@@ -462,6 +462,16 @@ public interface Queue<X extends Queue<X>> extends ConfiguredObject<X>,
                       description = "Current age of oldest message on the 
queue.")
     long getOldestMessageAge();
 
+    @SuppressWarnings("unused")
+    @ManagedStatistic(statisticType = StatisticType.CUMULATIVE, units = 
StatisticUnit.BYTES, label = "Malformed",
+            description = "Total size of enqueued malformed messages.")
+    long getTotalMalformedBytes();
+
+    @SuppressWarnings("unused")
+    @ManagedStatistic(statisticType = StatisticType.CUMULATIVE, units = 
StatisticUnit.MESSAGES, label = "Malformed",
+            description = "Total number of enqueued malformed messages.")
+    long getTotalMalformedMessages();
+
     @ManagedOperation(description = "move messages from this queue to 
another", changesConfiguredObjectState = false)
     List<Long> moveMessages(@Param(name = "destination", description = "The 
queue to which the messages should be moved", mandatory = true) Queue<?> 
destination,
                             @Param(name = "messageIds", description = "If 
provided, only messages in the queue whose (internal) message-id is supplied 
will be considered for moving") List<Long> messageIds,
@@ -555,4 +565,7 @@ public interface Queue<X extends Queue<X>> extends ConfiguredObject<X>,
     QueueEntry getLeastSignificantOldestEntry();
 
     QueueEntryIterator queueEntryIterator();
+
+    boolean checkValid(QueueEntry queueEntry);
+
 }
diff --git a/broker-core/src/main/java/org/apache/qpid/server/protocol/v0_8/FieldTable.java b/broker-core/src/main/java/org/apache/qpid/server/protocol/v0_8/FieldTable.java
index 4f0e6df..9608fc4 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/protocol/v0_8/FieldTable.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/protocol/v0_8/FieldTable.java
@@ -149,6 +149,15 @@ public class FieldTable
             {
                 _encodedForm.reset();
             }
+
+            final long recalculateEncodedSize = calculateEncodedSize();
+            if (_encodedSize != recalculateEncodedSize)
+            {
+                throw new IllegalStateException(String.format(
+                        "Malformed field table detected: provided encoded size 
'%d' does not equal calculated size '%d'",
+                        _encodedSize,
+                        recalculateEncodedSize));
+            }
         }
     }
 
@@ -921,6 +930,13 @@ public class FieldTable
     private void recalculateEncodedSize()
     {
 
+        int encodedSize = calculateEncodedSize();
+
+        _encodedSize = encodedSize;
+    }
+
+    private int calculateEncodedSize()
+    {
         int encodedSize = 0;
         if (_properties != null)
         {
@@ -932,8 +948,7 @@ public class FieldTable
 
             }
         }
-
-        _encodedSize = encodedSize;
+        return encodedSize;
     }
 
     public synchronized void addAll(FieldTable fieldTable)
@@ -988,18 +1003,23 @@ public class FieldTable
     {
         synchronized (this)
         {
-            if (_properties == null)
+            try
             {
-                if (_encodedForm != null)
+                if (_properties == null)
                 {
-                    populateFromBuffer();
+                    if (_encodedForm != null)
+                    {
+                        populateFromBuffer();
+                    }
                 }
             }
-
-            if (_encodedForm != null)
+            finally
             {
-                _encodedForm.dispose();
-                _encodedForm = null;
+                if (_encodedForm != null)
+                {
+                    _encodedForm.dispose();
+                    _encodedForm = null;
+                }
             }
         }
     }
@@ -1252,4 +1272,9 @@ public class FieldTable
     }
 
 
+    public synchronized void validate()
+    {
+       clearEncodedForm();
+    }
+
 }
diff --git a/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java b/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
index 84007cf..0275f6c 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
@@ -1170,7 +1170,7 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
     @Override
     public final void enqueue(ServerMessage message, Action<? super 
MessageInstance> action, MessageEnqueueRecord enqueueRecord)
     {
-
+        final QueueEntry entry;
         if(_recovering.get() != RECOVERED)
         {
             _enqueuingWhileRecovering.incrementAndGet();
@@ -1194,12 +1194,16 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
                 {
                     Thread.yield();
                 }
-                doEnqueue(message, action, enqueueRecord);
+                entry = doEnqueue(message, action, enqueueRecord);
+            }
+            else
+            {
+                entry = null;
             }
         }
         else
         {
-            doEnqueue(message, action, enqueueRecord);
+            entry = doEnqueue(message, action, enqueueRecord);
         }
 
         final StoredMessage storedMessage = message.getStoredMessage();
@@ -1207,7 +1211,21 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
              || QpidByteBuffer.getAllocatedDirectMemorySize() > 
_flowToDiskThreshold)
             && storedMessage.isInMemory())
         {
-            storedMessage.flowToDisk();
+            if (message.checkValid())
+            {
+                storedMessage.flowToDisk();
+            }
+            else
+            {
+                if (entry != null)
+                {
+                    malformedEntry(entry);
+                }
+                else
+                {
+                    LOGGER.debug("Malformed message '{}' enqueued into '{}'", 
message, getName());
+                }
+            }
         }
     }
 
@@ -1250,7 +1268,7 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
         }
     }
 
-    protected void doEnqueue(final ServerMessage message, final Action<? super 
MessageInstance> action, MessageEnqueueRecord enqueueRecord)
+    protected QueueEntry doEnqueue(final ServerMessage message, final Action<? 
super MessageInstance> action, MessageEnqueueRecord enqueueRecord)
     {
         final QueueEntry entry = getEntries().add(message, enqueueRecord);
         updateExpiration(entry);
@@ -1279,7 +1297,7 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
             }
             _postEnqueueOverflowPolicyHandler.checkOverflow(entry);
         }
-
+        return entry;
     }
 
     private void updateExpiration(final QueueEntry entry)
@@ -1658,13 +1676,15 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
         {
             QueueEntry node = queueListIterator.getNode();
             MessageReference reference = node.newMessageReference();
-            if(reference != null)
+            if(reference != null && !node.isDeleted())
             {
                 try
                 {
-
-                    final boolean done = !node.isDeleted() && 
visitor.visit(node);
-                    if(done)
+                    if (!reference.getMessage().checkValid())
+                    {
+                        malformedEntry(node);
+                    }
+                    else if (visitor.visit(node))
                     {
                         break;
                     }
@@ -2176,9 +2196,16 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
                     {
                         try (MessageReference messageReference = 
msg.newReference())
                         {
-                            for(NotificationCheck check : perMessageChecks)
+                            if (!msg.checkValid())
+                            {
+                                malformedEntry(node);
+                            }
+                            else
                             {
-                                checkForNotification(msg, listener, 
currentTime, thresholdTime, check);
+                                for (NotificationCheck check : 
perMessageChecks)
+                                {
+                                    checkForNotification(msg, listener, 
currentTime, thresholdTime, check);
+                                }
                             }
                         }
                         catch(MessageDeletedException e)
@@ -2197,6 +2224,68 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
 
     }
 
+    private void malformedEntry(final QueueEntry node)
+    {
+        deleteEntry(node, () -> {
+            _queueStatistics.addToMalformed(node.getSizeWithHeader());
+            logMalformedMessage(node);
+        });
+    }
+
+    private void logMalformedMessage(final QueueEntry node)
+    {
+        final EventLogger eventLogger = getEventLogger();
+        final ServerMessage<?> message = node.getMessage();
+        final StringBuilder messageId = new StringBuilder();
+        messageId.append(message.getMessageNumber());
+        final String id = message.getMessageHeader().getMessageId();
+        if (id != null)
+        {
+            messageId.append('/').append(id);
+        }
+        eventLogger.message(getLogSubject(), QueueMessages.MALFORMED_MESSAGE( 
messageId.toString(), "DELETE"));
+    }
+
+    @Override
+    public boolean checkValid(final QueueEntry queueEntry)
+    {
+        final ServerMessage message = queueEntry.getMessage();
+        final ServerMessage.ValidationStatus validationStatus = 
message.getValidationStatus();
+        boolean isValid = false;
+        if (validationStatus == ServerMessage.ValidationStatus.UNKNOWN)
+        {
+            try (MessageReference ref = message.newReference())
+            {
+                isValid = message.checkValid();
+            }
+            catch (MessageDeletedException e)
+            {
+                // noop
+            }
+        }
+        else
+        {
+            isValid = validationStatus == ServerMessage.ValidationStatus.VALID;
+        }
+        if (!isValid)
+        {
+            malformedEntry(queueEntry);
+        }
+        return isValid;
+    }
+
+    @Override
+    public long getTotalMalformedBytes()
+    {
+        return _queueStatistics.getMalformedSize();
+    }
+
+    @Override
+    public long getTotalMalformedMessages()
+    {
+        return _queueStatistics.getMalformedCount();
+    }
+
     @Override
     public void reallocateMessages()
     {
@@ -2213,7 +2302,14 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
                     final MessageReference messageReference = 
message.newReference();
                     try
                     {
-                        message.getStoredMessage().reallocate();
+                        if (!message.checkValid())
+                        {
+                            malformedEntry(node);
+                        }
+                        else
+                        {
+                            message.getStoredMessage().reallocate();
+                        }
                     }
                     finally
                     {
@@ -3262,7 +3358,7 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
             {
                 MessageConverter messageConverter =
                         
MessageConverterRegistry.getConverter(message.getClass(), 
InternalMessage.class);
-                if (messageConverter != null)
+                if (messageConverter != null && message.checkValid())
                 {
                     InternalMessage convertedMessage = null;
                     try
diff --git a/broker-core/src/main/java/org/apache/qpid/server/queue/FlowToDiskOverflowPolicyHandler.java b/broker-core/src/main/java/org/apache/qpid/server/queue/FlowToDiskOverflowPolicyHandler.java
index 6115408..36fcf9e 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/queue/FlowToDiskOverflowPolicyHandler.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/queue/FlowToDiskOverflowPolicyHandler.java
@@ -102,7 +102,7 @@ public class FlowToDiskOverflowPolicyHandler implements OverflowPolicyHandler
                             if (cumulativeDepthBytes > maximumQueueDepthBytes
                                 || cumulativeDepthMessages > 
maximumQueueDepthMessages)
                             {
-                                flowToDisk(message);
+                                flowToDisk(node);
                             }
                         }
                     }
@@ -120,19 +120,18 @@ public class FlowToDiskOverflowPolicyHandler implements OverflowPolicyHandler
             if ((maximumQueueDepthBytes >= 0L && queueDepthBytes > 
maximumQueueDepthBytes) ||
                 (maximumQueueDepthMessages >= 0L && queueDepthMessages > 
maximumQueueDepthMessages))
             {
-                ServerMessage message = newlyEnqueued.getMessage();
-                if (message != null)
-                {
-                    flowToDisk(message);
-                }
+                flowToDisk(newlyEnqueued);
             }
         }
 
-        private void flowToDisk(final ServerMessage message)
+        private void flowToDisk(final QueueEntry node)
         {
-            try (MessageReference messageReference = message.newReference())
+            try (MessageReference messageReference = 
node.getMessage().newReference())
             {
-                message.getStoredMessage().flowToDisk();
+                if (node.getQueue().checkValid(node))
+                {
+                    
messageReference.getMessage().getStoredMessage().flowToDisk();
+                }
             }
             catch (MessageDeletedException mde)
             {
diff --git a/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java b/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
index 51cee72..dd241bd 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
@@ -591,7 +591,7 @@ public abstract class QueueEntryImpl implements QueueEntry
         }
 
         RoutingResult result;
-        if (alternateBindingDestination != null)
+        if (alternateBindingDestination != null && getMessage().checkValid())
         {
             result = alternateBindingDestination.route(getMessage(), 
getMessage().getInitialRoutingAddress(),
                                                            
getInstanceProperties());
diff --git a/broker-core/src/main/java/org/apache/qpid/server/queue/QueueStatistics.java b/broker-core/src/main/java/org/apache/qpid/server/queue/QueueStatistics.java
index 49f7d83..ea0cc5b 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/queue/QueueStatistics.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/queue/QueueStatistics.java
@@ -54,6 +54,8 @@ final class QueueStatistics
 
     private final AtomicInteger _expiredCount = new AtomicInteger();
     private final AtomicLong _expiredSize = new AtomicLong();
+    private final AtomicInteger _malformedCount = new AtomicInteger();
+    private final AtomicLong _malformedSize = new AtomicLong();
 
     public final int getQueueCount()
     {
@@ -155,6 +157,16 @@ final class QueueStatistics
         return _expiredSize.get();
     }
 
+    public int getMalformedCount()
+    {
+        return _malformedCount.get();
+    }
+
+    public long getMalformedSize()
+    {
+        return _malformedSize.get();
+    }
+
     void addToQueue(long size)
     {
         int count = _queueCount.incrementAndGet();
@@ -241,4 +253,9 @@ final class QueueStatistics
         _expiredSize.addAndGet(size);
     }
 
+    void addToMalformed(final long size)
+    {
+        _malformedCount.incrementAndGet();
+        _malformedSize.addAndGet(size);
+    }
 }
diff --git 
a/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueImpl.java 
b/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueImpl.java
index 9b65d42..b8d2bbd 100644
--- 
a/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueImpl.java
+++ 
b/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueImpl.java
@@ -54,13 +54,13 @@ public class SortedQueueImpl extends 
OutOfOrderQueue<SortedQueueImpl> implements
     }
 
     @Override
-    protected void doEnqueue(final ServerMessage message,
+    protected QueueEntry doEnqueue(final ServerMessage message,
                         final Action<? super MessageInstance> action,
                         MessageEnqueueRecord record)
     {
         synchronized (_sortedQueueLock)
         {
-            super.doEnqueue(message, action, record);
+            return super.doEnqueue(message, action, record);
         }
     }
 
diff --git 
a/broker-core/src/main/java/org/apache/qpid/server/store/serializer/v1/MessageStoreSerializer_v1.java
 
b/broker-core/src/main/java/org/apache/qpid/server/store/serializer/v1/MessageStoreSerializer_v1.java
index ec87654..f5bfccc 100644
--- 
a/broker-core/src/main/java/org/apache/qpid/server/store/serializer/v1/MessageStoreSerializer_v1.java
+++ 
b/broker-core/src/main/java/org/apache/qpid/server/store/serializer/v1/MessageStoreSerializer_v1.java
@@ -46,6 +46,7 @@ import org.apache.qpid.server.store.handler.MessageHandler;
 import org.apache.qpid.server.store.handler.MessageInstanceHandler;
 import org.apache.qpid.server.store.serializer.MessageStoreSerializer;
 import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
+import org.apache.qpid.server.util.ServerScopedRuntimeException;
 
 @PluggableService
 public class MessageStoreSerializer_v1 implements MessageStoreSerializer
@@ -324,8 +325,19 @@ public class MessageStoreSerializer_v1 implements 
MessageStoreSerializer
                 handle.addContent(buf);
             }
             final StoredMessage<StorableMessageMetaData> storedMessage = 
handle.allContentAdded();
-            messageNumberMap.put(originalMessageNumber, storedMessage);
-            storedMessage.flowToDisk();
+            try
+            {
+                storedMessage.flowToDisk();
+                messageNumberMap.put(originalMessageNumber, storedMessage);
+            }
+            catch (RuntimeException e)
+            {
+                if (e instanceof ServerScopedRuntimeException)
+                {
+                    throw e;
+                }
+                throw new IllegalArgumentException("Could not decode message metadata", e);
+            }
 
             record = deserializer.readRecord();
         }
diff --git 
a/broker-core/src/main/java/org/apache/qpid/server/txn/FlowToDiskTransactionObserver.java
 
b/broker-core/src/main/java/org/apache/qpid/server/txn/FlowToDiskTransactionObserver.java
index c00da42..ae344de 100644
--- 
a/broker-core/src/main/java/org/apache/qpid/server/txn/FlowToDiskTransactionObserver.java
+++ 
b/broker-core/src/main/java/org/apache/qpid/server/txn/FlowToDiskTransactionObserver.java
@@ -59,31 +59,26 @@ public class FlowToDiskTransactionObserver implements 
TransactionObserver
     {
         StoredMessage<? extends StorableMessageMetaData> handle = 
message.getStoredMessage();
         long messageSize = handle.getContentSize() + handle.getMetadataSize();
-
-        long newUncommittedSize = _uncommittedMessageSize.get() + messageSize;
+        long newUncommittedSize = _uncommittedMessageSize.addAndGet(messageSize);
+        TransactionDetails details = _uncommittedMessages.computeIfAbsent(transaction, key -> new TransactionDetails());
+        details.messageEnqueued(handle);
         if (newUncommittedSize > _maxUncommittedInMemorySize)
         {
-            handle.flowToDisk();
-            if (!_reported)
+            // flow to disk only current transaction messages
+            // in order to handle malformed messages on correct channel
+            try
             {
-                _eventLogger.message(_logSubject, 
ConnectionMessages.LARGE_TRANSACTION_WARN(newUncommittedSize, 
_maxUncommittedInMemorySize));
-                _reported = true;
+                details.flowToDisk();
             }
-
-            if (!_uncommittedMessages.isEmpty())
+            finally
             {
-                for (TransactionDetails transactionDetails : 
_uncommittedMessages.values())
+                if (!_reported)
                 {
-                    transactionDetails.flowToDisk();
+                    _eventLogger.message(_logSubject, ConnectionMessages.LARGE_TRANSACTION_WARN(newUncommittedSize, _maxUncommittedInMemorySize));
+                    _reported = true;
                 }
             }
         }
-        else
-        {
-            _uncommittedMessageSize.addAndGet(messageSize);
-            TransactionDetails details = 
_uncommittedMessages.computeIfAbsent(transaction, key -> new 
TransactionDetails());
-            details.messageEnqueued(handle);
-        }
     }
 
     @Override
diff --git 
a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
 
b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
index 6226617..3e43819 100644
--- 
a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
+++ 
b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
@@ -2010,7 +2010,10 @@ public abstract class AbstractVirtualHost<X extends 
AbstractVirtualHost<X>> exte
                                     }
                                     else
                                     {
-                                        storedMessage.flowToDisk();
+                                        if (node.getQueue().checkValid(node))
+                                        {
+                                            storedMessage.flowToDisk();
+                                        }
                                     }
                                 }
                             }
diff --git 
a/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java
 
b/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java
index 727f4c0..f26c765 100644
--- 
a/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java
+++ 
b/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java
@@ -1307,6 +1307,7 @@ abstract class AbstractQueueTestBase extends QpidTestCase
         ServerMessage message = mock(ServerMessage.class);
         when(message.getMessageNumber()).thenReturn(id);
         when(message.getMessageHeader()).thenReturn(header);
+        when(message.checkValid()).thenReturn(true);
 
         StoredMessage storedMessage = mock(StoredMessage.class);
         when(message.getStoredMessage()).thenReturn(storedMessage);
diff --git 
a/broker-core/src/test/java/org/apache/qpid/server/queue/FlowToDiskOverflowPolicyHandlerTest.java
 
b/broker-core/src/test/java/org/apache/qpid/server/queue/FlowToDiskOverflowPolicyHandlerTest.java
index 992b886..1031108 100644
--- 
a/broker-core/src/test/java/org/apache/qpid/server/queue/FlowToDiskOverflowPolicyHandlerTest.java
+++ 
b/broker-core/src/test/java/org/apache/qpid/server/queue/FlowToDiskOverflowPolicyHandlerTest.java
@@ -122,9 +122,12 @@ public class FlowToDiskOverflowPolicyHandlerTest extends 
QpidTestCase
     {
         ServerMessage message = mock(ServerMessage.class);
         when(message.getSizeIncludingHeader()).thenReturn(size);
+        when(message.checkValid()).thenReturn(true);
+        when(message.getValidationStatus()).thenReturn(ServerMessage.ValidationStatus.VALID);
 
         StoredMessage storedMessage = mock(StoredMessage.class);
         when(message.getStoredMessage()).thenReturn(storedMessage);
+        when(storedMessage.isInMemory()).thenReturn(true);
 
         MessageReference ref = mock(MessageReference.class);
         when(ref.getMessage()).thenReturn(message);
diff --git 
a/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java
 
b/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java
index 9912094..5a11306 100644
--- 
a/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java
+++ 
b/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java
@@ -348,6 +348,7 @@ public abstract class QueueEntryImplTestBase extends 
QpidTestCase
 
         final Action<? super MessageInstance> action = mock(Action.class);
         
when(_queueEntry.getMessage().isResourceAcceptable(dlq)).thenReturn(true);
+        when(_queueEntry.getMessage().checkValid()).thenReturn(true);
         _queueEntry.acquire();
         int enqueues = _queueEntry.routeToAlternate(action, null);
 
diff --git 
a/broker-core/src/test/java/org/apache/qpid/server/queue/QueueMessageRecoveryTest.java
 
b/broker-core/src/test/java/org/apache/qpid/server/queue/QueueMessageRecoveryTest.java
index 79f19d1..a7dee19 100644
--- 
a/broker-core/src/test/java/org/apache/qpid/server/queue/QueueMessageRecoveryTest.java
+++ 
b/broker-core/src/test/java/org/apache/qpid/server/queue/QueueMessageRecoveryTest.java
@@ -210,12 +210,13 @@ public class QueueMessageRecoveryTest extends QpidTestCase
         }
 
         @Override
-        protected void doEnqueue(final ServerMessage message, final Action<? 
super MessageInstance> action, MessageEnqueueRecord record)
+        protected QueueEntry doEnqueue(final ServerMessage message, final 
Action<? super MessageInstance> action, MessageEnqueueRecord record)
         {
             synchronized(_messageList)
             {
                 _messageList.add(message);
             }
+            return null;
         }
     }
 }
diff --git 
a/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryImplTest.java
 
b/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryImplTest.java
index d2daddb..04a8d9e 100644
--- 
a/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryImplTest.java
+++ 
b/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryImplTest.java
@@ -67,6 +67,7 @@ public class SimpleQueueEntryImplTest extends 
QueueEntryImplTestBase
     {
         ServerMessage message = mock(ServerMessage.class);
         when(message.getMessageNumber()).thenReturn((long)msgId);
+        when(message.checkValid()).thenReturn(true);
         final MessageReference reference = mock(MessageReference.class);
         when(reference.getMessage()).thenReturn(message);
         when(message.newReference()).thenReturn(reference);
diff --git 
a/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java
 
b/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java
index 231dd35..1f7d3fd 100644
--- 
a/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java
+++ 
b/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java
@@ -130,6 +130,18 @@ public class TestMessageMetaDataType implements 
MessageMetaDataType<TestMessageM
         }
 
         @Override
+        public boolean checkValid()
+        {
+            return true;
+        }
+
+        @Override
+        public ValidationStatus getValidationStatus()
+        {
+            return ValidationStatus.VALID;
+        }
+
+        @Override
         public long getExpiration()
         {
             return 0;
diff --git 
a/broker-core/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java 
b/broker-core/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java
index 3ccc94b..57e4bf1 100644
--- 
a/broker-core/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java
+++ 
b/broker-core/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java
@@ -149,6 +149,18 @@ class MockServerMessage implements ServerMessage
     }
 
     @Override
+    public boolean checkValid()
+    {
+        return true;
+    }
+
+    @Override
+    public ValidationStatus getValidationStatus()
+    {
+        return ValidationStatus.VALID;
+    }
+
+    @Override
     public long getArrivalTime()
     {
         throw new UnsupportedOperationException();
diff --git 
a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10Impl.java
 
b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10Impl.java
index 055f935..5fd653e 100755
--- 
a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10Impl.java
+++ 
b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10Impl.java
@@ -20,6 +20,7 @@
  */
 package org.apache.qpid.server.protocol.v0_10;
 
+import java.nio.BufferUnderflowException;
 import java.security.AccessController;
 import java.security.PrivilegedAction;
 import java.util.ArrayList;
@@ -145,7 +146,7 @@ public class AMQPConnection_0_10Impl extends 
AbstractAMQPConnection<AMQPConnecti
                 _inputHandler.received(buf);
                 _connection.receivedComplete();
             }
-            catch (IllegalArgumentException | IllegalStateException e)
+            catch (IllegalArgumentException | IllegalStateException | BufferUnderflowException e)
             {
                 throw new ConnectionScopedRuntimeException(e);
             }
diff --git 
a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
 
b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
index c58f897..74c015f 100644
--- 
a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
+++ 
b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
@@ -40,6 +40,7 @@ import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.model.Queue;
 import org.apache.qpid.server.plugin.MessageConverter;
 import org.apache.qpid.server.protocol.MessageConverterRegistry;
+import org.apache.qpid.server.protocol.converter.MessageConversionException;
 import org.apache.qpid.server.protocol.v0_10.transport.DeliveryProperties;
 import org.apache.qpid.server.protocol.v0_10.transport.Header;
 import org.apache.qpid.server.protocol.v0_10.transport.MessageAcceptMode;
@@ -203,6 +204,10 @@ public class ConsumerTarget_0_10 extends 
AbstractConsumerTarget<ConsumerTarget_0
         }
         else
         {
+            if (!serverMsg.checkValid())
+            {
+                throw new MessageConversionException(String.format("Cannot convert malformed message '%s'", serverMsg));
+            }
             converter = (MessageConverter<? super ServerMessage, 
MessageTransferMessage>) 
MessageConverterRegistry.getConverter(serverMsg.getClass(), 
MessageTransferMessage.class);
             msg = converter.convert(serverMsg, _session.getAddressSpace());
         }
diff --git 
a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQMessage.java
 
b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQMessage.java
index faab357..c56c8f2 100644
--- 
a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQMessage.java
+++ 
b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQMessage.java
@@ -123,4 +123,10 @@ public class AMQMessage extends 
AbstractServerMessageImpl<AMQMessage, MessageMet
     {
         return AMQP_0_9_1;
     }
+
+    @Override
+    protected void validate()
+    {
+        getMessageMetaData().validate();
+    }
 }
diff --git 
a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8Impl.java
 
b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8Impl.java
index eca6d8a..bda9d1e 100644
--- 
a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8Impl.java
+++ 
b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8Impl.java
@@ -26,6 +26,7 @@ import java.io.IOException;
 import java.lang.reflect.InvocationHandler;
 import java.lang.reflect.Method;
 import java.lang.reflect.Proxy;
+import java.nio.BufferUnderflowException;
 import java.security.AccessControlException;
 import java.security.AccessController;
 import java.security.PrivilegedAction;
@@ -257,7 +258,8 @@ public class AMQPConnection_0_8Impl
                     _decoder.decodeBuffer(msg);
                     receivedCompleteAllChannels();
                 }
-                catch (AMQFrameDecodingException | IOException e)
+                catch (AMQFrameDecodingException | IOException | AMQPInvalidClassException
+                        | IllegalArgumentException | IllegalStateException | BufferUnderflowException e)
                 {
                     LOGGER.error("Unexpected exception", e);
                     throw new ConnectionScopedRuntimeException(e);
diff --git 
a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
 
b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
index b4f06da..092de1c 100644
--- 
a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
+++ 
b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
@@ -31,6 +31,7 @@ import org.apache.qpid.server.message.MessageReference;
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.plugin.MessageConverter;
 import org.apache.qpid.server.protocol.MessageConverterRegistry;
+import org.apache.qpid.server.protocol.converter.MessageConversionException;
 import org.apache.qpid.server.txn.AutoCommitTransaction;
 import org.apache.qpid.server.txn.ServerTransaction;
 import org.apache.qpid.server.util.StateChangeListener;
@@ -402,6 +403,10 @@ public abstract class ConsumerTarget_0_8 extends 
AbstractConsumerTarget<Consumer
         }
         else
         {
+            if (!serverMessage.checkValid())
+            {
+                throw new MessageConversionException(String.format("Cannot convert malformed message '%s'", serverMessage));
+            }
             messageConverter = 
MessageConverterRegistry.getConverter((Class<ServerMessage<?>>) 
serverMessage.getClass(), AMQMessage.class);
             msg = messageConverter.convert(serverMessage, 
getConnection().getAddressSpace());
         }
diff --git 
a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java
 
b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java
index 1defedb..3b1acb7 100644
--- 
a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java
+++ 
b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java
@@ -20,6 +20,7 @@
  */
 package org.apache.qpid.server.protocol.v0_8;
 
+import java.nio.BufferUnderflowException;
 import java.util.Collection;
 import java.util.Set;
 
@@ -152,6 +153,11 @@ public class MessageMetaData implements 
StorableMessageMetaData
         _contentHeaderBody.reallocate();
     }
 
+    public synchronized void validate()
+    {
+        _contentHeaderBody.getProperties().validate();
+    }
+
     private static class MetaDataFactory implements 
MessageMetaDataType.Factory<MessageMetaData>
     {
 
@@ -176,7 +182,8 @@ public class MessageMetaData implements 
StorableMessageMetaData
 
                 return new MessageMetaData(publishBody, chb, arrivalTime);
             }
-            catch (AMQFrameDecodingException | AMQProtocolVersionException e)
+            catch (AMQFrameDecodingException | AMQProtocolVersionException | AMQPInvalidClassException
+                    | IllegalArgumentException | IllegalStateException | BufferUnderflowException  e)
             {
                 throw new ConnectionScopedRuntimeException(e);
             }
diff --git 
a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/transport/BasicContentHeaderProperties.java
 
b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/transport/BasicContentHeaderProperties.java
index 43c26f1..e9c6268 100644
--- 
a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/transport/BasicContentHeaderProperties.java
+++ 
b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/transport/BasicContentHeaderProperties.java
@@ -855,4 +855,10 @@ public class BasicContentHeaderProperties
             _headers.reallocate();
         }
     }
+
+    public synchronized void validate()
+    {
+        _headers.validate();
+    }
+
 }
diff --git 
a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
 
b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
index 4798815..0d19b9d 100644
--- 
a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
+++ 
b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
@@ -39,6 +39,7 @@ import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.model.Queue;
 import org.apache.qpid.server.plugin.MessageConverter;
 import org.apache.qpid.server.protocol.MessageConverterRegistry;
+import org.apache.qpid.server.protocol.converter.MessageConversionException;
 import org.apache.qpid.server.protocol.v1_0.type.Binary;
 import org.apache.qpid.server.protocol.v1_0.type.DeliveryState;
 import org.apache.qpid.server.protocol.v1_0.type.Outcome;
@@ -134,6 +135,10 @@ class ConsumerTarget_1_0 extends 
AbstractConsumerTarget<ConsumerTarget_1_0>
         }
         else
         {
+            if (!serverMessage.checkValid())
+            {
+                throw new MessageConversionException(String.format("Cannot convert malformed message '%s'", serverMessage));
+            }
             converter =
                     (MessageConverter<? super ServerMessage, Message_1_0>) 
MessageConverterRegistry.getConverter(serverMessage.getClass(), 
Message_1_0.class);
             if (converter == null)
diff --git 
a/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/AbstractJDBCMessageStore.java
 
b/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/AbstractJDBCMessageStore.java
index aeff70e..7ad408b 100644
--- 
a/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/AbstractJDBCMessageStore.java
+++ 
b/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/AbstractJDBCMessageStore.java
@@ -225,7 +225,7 @@ public abstract class AbstractJDBCMessageStore implements 
MessageStore
     {
         for (StoredJDBCMessage<?> message : _messages)
         {
-            message.clear();
+            message.clear(true);
         }
         _messages.clear();
         _inMemorySize.set(0);
@@ -1291,27 +1291,43 @@ public abstract class AbstractJDBCMessageStore 
implements MessageStore
 
         public void reallocate()
         {
-            if(_metaData != null)
+            if (_metaData != null)
             {
                 _metaData.reallocate();
             }
             _data = QpidByteBuffer.reallocateIfNecessary(_data);
         }
 
-        public long clear()
+        public long clear(boolean close)
         {
             long bytesCleared = 0;
-            if(_metaData != null)
+            if(_data != null)
             {
-                bytesCleared += _metaData.getStorableSize();
-                _metaData.clearEncodedForm();
-                _metaData = null;
+                if(_data != null)
+                {
+                    bytesCleared += _data.remaining();
+                    _data.dispose();
+                    _data = null;
+                }
             }
-            if(_data != null)
+            if (_metaData != null)
             {
-                bytesCleared += _data.remaining();
-                _data.dispose();
-                _data = null;
+                bytesCleared += _metaData.getStorableSize();
+                try
+                {
+                    if (close)
+                    {
+                        _metaData.dispose();
+                    }
+                    else
+                    {
+                        _metaData.clearEncodedForm();
+                    }
+                }
+                finally
+                {
+                    _metaData = null;
+                }
             }
             return bytesCleared;
         }
@@ -1551,7 +1567,7 @@ public abstract class AbstractJDBCMessageStore implements 
MessageStore
             flushToStore();
             if(_messageDataRef != null && !_messageDataRef.isHardRef())
             {
-                final long bytesCleared = _messageDataRef.clear();
+                final long bytesCleared = _messageDataRef.clear(false);
                 _inMemorySize.addAndGet(-bytesCleared);
                 _bytesEvacuatedFromMemory.addAndGet(bytesCleared);
             }
@@ -1567,11 +1583,11 @@ public abstract class AbstractJDBCMessageStore 
implements MessageStore
             }
         }
 
-        public synchronized void clear()
+        public synchronized void clear(boolean close)
         {
             if (_messageDataRef != null)
             {
-                _messageDataRef.clear();
+                _messageDataRef.clear(close);
             }
         }
 
diff --git 
a/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java
 
b/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java
index 045c66f..8c7fc24 100644
--- 
a/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java
+++ 
b/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java
@@ -84,6 +84,7 @@ import org.apache.qpid.server.model.OperationParameter;
 import org.apache.qpid.server.model.PublishingLink;
 import org.apache.qpid.server.plugin.MessageConverter;
 import org.apache.qpid.server.protocol.MessageConverterRegistry;
+import org.apache.qpid.server.protocol.converter.MessageConversionException;
 import org.apache.qpid.server.queue.BaseQueue;
 import org.apache.qpid.server.security.SecurityToken;
 import org.apache.qpid.server.session.AMQPSession;
@@ -391,6 +392,10 @@ class ManagementNode implements MessageSource, 
MessageDestination, BaseQueue
                         final Action<? super MessageInstance> action,
                         final MessageEnqueueRecord record)
     {
+        if (!message.checkValid())
+        {
+            throw new MessageConversionException(String.format("Cannot convert malformed message '%s'", message));
+        }
         @SuppressWarnings("unchecked")
         MessageConverter<ServerMessage, InternalMessage> converter =
                 (MessageConverter<ServerMessage, InternalMessage>) 
MessageConverterRegistry.getConverter((message.getClass()), 
InternalMessage.class);
diff --git 
a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/qpid/message/MalformedMessage.java
 
b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/qpid/message/MalformedMessage.java
new file mode 100644
index 0000000..296edf6
--- /dev/null
+++ 
b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/qpid/message/MalformedMessage.java
@@ -0,0 +1,133 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.tests.protocol.v1_0.extensions.qpid.message;
+
+
+import static org.apache.qpid.tests.utils.BrokerAdmin.KIND_BROKER_J;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.greaterThan;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.notNullValue;
+
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.hamcrest.Matchers;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.AmqpValue;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.AmqpValueSection;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.Properties;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.PropertiesSection;
+import org.apache.qpid.server.protocol.v1_0.type.transport.AmqpError;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Attach;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Begin;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Detach;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Flow;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Open;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
+import org.apache.qpid.tests.protocol.v1_0.FrameTransport;
+import org.apache.qpid.tests.protocol.v1_0.Interaction;
+import org.apache.qpid.tests.utils.BrokerAdmin;
+import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
+import org.apache.qpid.tests.utils.BrokerSpecific;
+import org.apache.qpid.tests.utils.ConfigItem;
+
+@BrokerSpecific(kind = KIND_BROKER_J)
+@ConfigItem(name = "broker.flowToDiskThreshold", value = "1")
+@ConfigItem(name = "connection.maxUncommittedInMemorySize", value = "1")
+public class MalformedMessage extends BrokerAdminUsingTestBase
+{
+    private InetSocketAddress _brokerAddress;
+    private static final String CONTENT_TEXT = "Test";
+
+    @Before
+    public void setUp()
+    {
+        _brokerAddress = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP);
+        getBrokerAdmin().createQueue(BrokerAdmin.TEST_QUEUE_NAME);
+    }
+
+    @Test
+    public void malformedMessage() throws Exception
+    {
+        try (final FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+        {
+            final Interaction interaction = transport.newInteraction();
+            interaction.negotiateProtocol()
+                       .consumeResponse()
+                       .open()
+                       .consumeResponse(Open.class)
+                       .begin()
+                       .consumeResponse(Begin.class)
+                       .attachRole(Role.SENDER)
+                       .attachTargetAddress(BrokerAdmin.TEST_QUEUE_NAME)
+                       .attach()
+                       .consumeResponse(Attach.class)
+                       .consumeResponse(Flow.class);
+
+            final Flow flow = interaction.getLatestResponse(Flow.class);
+            assertThat(flow.getLinkCredit().intValue(), Matchers.is(greaterThan(1)));
+
+            final QpidByteBuffer payload = generateMalformed();
+            interaction.transferSettled(true)
+                       .transferPayload(payload)
+                       .transferSettled(true)
+                       .transfer();
+
+            final Detach responseDetach = interaction.consumeResponse().getLatestResponse(Detach.class);
+            assertThat(responseDetach.getClosed(), is(true));
+            assertThat(responseDetach.getError(), is(notNullValue()));
+            assertThat(responseDetach.getError().getCondition(), is(equalTo(AmqpError.DECODE_ERROR)));
+
+            interaction.doCloseConnection();
+        }
+    }
+
+    private QpidByteBuffer generateMalformed()
+    {
+        final List<QpidByteBuffer> payload = new ArrayList<>();
+
+        final Properties properties = new Properties();
+        properties.setTo(BrokerAdmin.TEST_QUEUE_NAME);
+        PropertiesSection propertiesSection = properties.createEncodingRetainingSection();
+        final QpidByteBuffer props = propertiesSection.getEncodedForm();
+        payload.add(props);
+        propertiesSection.dispose();
+
+        final AmqpValue amqpValue = new AmqpValue(CONTENT_TEXT);
+        final AmqpValueSection dataSection = amqpValue.createEncodingRetainingSection();
+
+        final QpidByteBuffer encodedData = dataSection.getEncodedForm();
+        payload.add(encodedData.view(0, encodedData.remaining() - 1));
+        encodedData.dispose();
+        dataSection.dispose();
+
+        final QpidByteBuffer combined = QpidByteBuffer.concatenate(payload);
+        payload.forEach(QpidByteBuffer::dispose);
+        return combined;
+    }
+
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to