This is an automated email from the ASF dual-hosted git repository.

vavrtom pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-broker-j.git


The following commit(s) were added to refs/heads/main by this push:
     new b8e24e5b71 QPID-8013: [Broker-J] Reduce footprint of AMQP 1.0 protocol objects (#369)
b8e24e5b71 is described below

commit b8e24e5b71049e6646231a35611e2b0413d975b8
Author: Daniil Kirilyuk <[email protected]>
AuthorDate: Wed Jan 28 12:24:29 2026 +0100

    QPID-8013: [Broker-J] Reduce footprint of AMQP 1.0 protocol objects (#369)
---
 .../v1_0/CompositeTypeConstructorGenerator.java    |  10 +-
 .../protocol/v1_0/AMQPConnection_1_0Impl.java      |  73 +++--
 .../protocol/v1_0/AnonymousRelayDestination.java   |  11 +-
 .../server/protocol/v1_0/ConsumerTarget_1_0.java   | 110 +++++---
 .../apache/qpid/server/protocol/v1_0/Delivery.java |  23 +-
 .../protocol/v1_0/ExchangeSendingDestination.java  |   2 +-
 .../v1_0/MessageConverter_Internal_to_v1_0.java    |  26 +-
 .../protocol/v1_0/MessageConverter_from_1_0.java   |  13 +-
 .../protocol/v1_0/MessageConverter_to_1_0.java     |  30 ++-
 .../server/protocol/v1_0/MessageMetaData_1_0.java  |   5 +-
 .../qpid/server/protocol/v1_0/Message_1_0.java     |   6 +-
 .../protocol/v1_0/NodeReceivingDestination.java    |  27 +-
 .../protocol/v1_0/ProtocolEngineCreator_1_0_0.java |  20 +-
 .../v1_0/ProtocolEngineCreator_1_0_0_SASL.java     |  24 +-
 .../server/protocol/v1_0/SendingLinkEndpoint.java  |   2 +-
 .../qpid/server/protocol/v1_0/Session_1_0.java     |  21 +-
 .../v1_0/StandardReceivingLinkEndpoint.java        |   2 +-
 .../protocol/v1_0/StandardSendingDestination.java  |   2 +-
 .../v1_0/TxnCoordinatorReceivingLinkEndpoint.java  |   2 +-
 .../server/protocol/v1_0/codec/FrameWriter.java    |  14 +-
 .../server/protocol/v1_0/codec/ValueHandler.java   |   1 +
 .../qpid/server/protocol/v1_0/constants/Bytes.java |  62 +++++
 .../server/protocol/v1_0/framing/AMQFrame.java     |  46 +++-
 .../protocol/v1_0/framing/TransportFrame.java      |  44 +++
 .../v1_0/messaging/SectionDecoderImpl.java         |   1 +
 .../qpid/server/protocol/v1_0/type/Binary.java     |  14 +
 .../v1_0/type/codec/AMQPDescribedTypeRegistry.java |   3 +-
 .../protocol/v1_0/type/messaging/Accepted.java     |   1 +
 .../protocol/v1_0/type/messaging/AmqpValue.java    |   2 +-
 .../protocol/v1_0/type/transport/Transfer.java     |   6 +
 .../protocol/v1_0/ConsumerTarget_1_0Test.java      | 267 ++++++++++++++++++-
 .../qpid/server/protocol/v1_0/DeliveryTest.java    | 118 +++++++++
 .../v1_0/ProtocolEngineCreator_1_0_0Test.java      |  65 +++++
 .../v1_0/ProtocolEngineCreator_1_0_0_SASLTest.java |  63 +++++
 .../protocol/v1_0/ProtocolEngine_1_0_0Test.java    |  20 ++
 .../qpid/server/protocol/v1_0/Session_1_0Test.java | 294 +++++++++++++++++++++
 36 files changed, 1246 insertions(+), 184 deletions(-)

diff --git 
a/broker-codegen/src/main/java/org/apache/qpid/server/protocol/v1_0/CompositeTypeConstructorGenerator.java
 
b/broker-codegen/src/main/java/org/apache/qpid/server/protocol/v1_0/CompositeTypeConstructorGenerator.java
index 2d76f5a0e9..14b2775afa 100644
--- 
a/broker-codegen/src/main/java/org/apache/qpid/server/protocol/v1_0/CompositeTypeConstructorGenerator.java
+++ 
b/broker-codegen/src/main/java/org/apache/qpid/server/protocol/v1_0/CompositeTypeConstructorGenerator.java
@@ -77,6 +77,7 @@ public class CompositeTypeConstructorGenerator  extends 
AbstractProcessor
             
"org.apache.qpid.server.protocol.v1_0.type.messaging.TerminusExpiryPolicy",
             
"org.apache.qpid.server.protocol.v1_0.type.transaction.TxnCapability");
 
+    private static final List<String> SINGLETONS = List.of("Accepted");
 
     @Override
     public SourceVersion getSupportedSourceVersion()
@@ -188,7 +189,14 @@ public class CompositeTypeConstructorGenerator  extends 
AbstractProcessor
             pw.println("    @Override");
             pw.println("    protected " + objectSimpleName + " construct(final 
FieldValueReader fieldValueReader) throws AmqpErrorException");
             pw.println("    {");
-            pw.println("        " + objectSimpleName + " obj = new " + 
objectSimpleName + "();");
+            if (SINGLETONS.contains(objectSimpleName))
+            {
+                pw.println("        " + objectSimpleName + " obj = " + 
objectSimpleName + ".INSTANCE;");
+            }
+            else
+            {
+                pw.println("        " + objectSimpleName + " obj = new " + 
objectSimpleName + "();");
+            }
             pw.println();
             generateAssigners(pw, typeElement);
             pw.println("        return obj;");
diff --git 
a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java
 
b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java
index 3e2c0249fc..13736ef257 100644
--- 
a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java
+++ 
b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java
@@ -51,14 +51,13 @@ import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
-import org.apache.qpid.server.logging.EventLogger;
-import org.apache.qpid.server.logging.messages.ResourceLimitMessages;
-import org.apache.qpid.server.security.limit.ConnectionLimitException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.server.logging.EventLogger;
 import org.apache.qpid.server.logging.messages.ConnectionMessages;
+import org.apache.qpid.server.logging.messages.ResourceLimitMessages;
 import org.apache.qpid.server.model.AuthenticationProvider;
 import org.apache.qpid.server.model.Broker;
 import org.apache.qpid.server.model.Connection;
@@ -74,6 +73,7 @@ import 
org.apache.qpid.server.protocol.v1_0.codec.ProtocolHandler;
 import org.apache.qpid.server.protocol.v1_0.codec.SectionDecoderRegistry;
 import org.apache.qpid.server.protocol.v1_0.codec.ValueHandler;
 import org.apache.qpid.server.protocol.v1_0.codec.ValueWriter;
+import org.apache.qpid.server.protocol.v1_0.constants.Bytes;
 import org.apache.qpid.server.protocol.v1_0.framing.AMQFrame;
 import org.apache.qpid.server.protocol.v1_0.framing.FrameHandler;
 import org.apache.qpid.server.protocol.v1_0.framing.OversizeFrameException;
@@ -117,6 +117,7 @@ import 
org.apache.qpid.server.security.auth.SubjectAuthenticationResult;
 import 
org.apache.qpid.server.security.auth.manager.AnonymousAuthenticationManager;
 import 
org.apache.qpid.server.security.auth.manager.ExternalAuthenticationManagerImpl;
 import org.apache.qpid.server.security.auth.sasl.SaslNegotiator;
+import org.apache.qpid.server.security.limit.ConnectionLimitException;
 import org.apache.qpid.server.session.AMQPSession;
 import org.apache.qpid.server.transport.AbstractAMQPConnection;
 import org.apache.qpid.server.transport.AggregateTicker;
@@ -146,31 +147,6 @@ public class AMQPConnection_1_0Impl extends 
AbstractAMQPConnection<AMQPConnectio
     private final AtomicBoolean _stateChanged = new AtomicBoolean();
     private final AtomicReference<Action<ProtocolEngine>> _workListener = new 
AtomicReference<>();
 
-
-    private static final byte[] SASL_HEADER = new byte[]
-            {
-                    (byte) 'A',
-                    (byte) 'M',
-                    (byte) 'Q',
-                    (byte) 'P',
-                    (byte) 3,
-                    (byte) 1,
-                    (byte) 0,
-                    (byte) 0
-            };
-
-    private static final byte[] AMQP_HEADER = new byte[]
-            {
-                    (byte) 'A',
-                    (byte) 'M',
-                    (byte) 'Q',
-                    (byte) 'P',
-                    (byte) 0,
-                    (byte) 1,
-                    (byte) 0,
-                    (byte) 0
-            };
-
     private final FrameWriter _frameWriter;
     private ProtocolHandler _frameHandler;
     private volatile boolean _transportBlockedForWriting;
@@ -286,7 +262,7 @@ public class AMQPConnection_1_0Impl extends 
AbstractAMQPConnection<AMQPConnectio
         }
         String mechanism = saslInit.getMechanism().toString();
         final Binary initialResponse = saslInit.getInitialResponse();
-        byte[] response = initialResponse == null ? new byte[0] : 
initialResponse.getArray();
+        byte[] response = initialResponse == null ? Bytes.EMPTY_BYTE_ARRAY : 
initialResponse.getArray();
 
         List<String> availableMechanisms =
                 
_subjectCreator.getAuthenticationProvider().getAvailableMechanisms(getTransport().isSecure());
@@ -306,7 +282,7 @@ public class AMQPConnection_1_0Impl extends 
AbstractAMQPConnection<AMQPConnectio
     {
         assertState(ConnectionState.AWAIT_SASL_RESPONSE);
         final Binary responseBinary = saslResponse.getResponse();
-        byte[] response = responseBinary == null ? new byte[0] : 
responseBinary.getArray();
+        byte[] response = responseBinary == null ? Bytes.EMPTY_BYTE_ARRAY : 
responseBinary.getArray();
 
         processSaslResponse(response);
     }
@@ -338,7 +314,7 @@ public class AMQPConnection_1_0Impl extends 
AbstractAMQPConnection<AMQPConnectio
         SubjectAuthenticationResult authenticationResult = 
_successfulAuthenticationResult;
         if (authenticationResult == null)
         {
-            authenticationResult = 
_subjectCreator.authenticate(_saslNegotiator, response != null ? response : new 
byte[0]);
+            authenticationResult = 
_subjectCreator.authenticate(_saslNegotiator, response != null ? response : 
Bytes.EMPTY_BYTE_ARRAY);
             challenge = authenticationResult.getChallenge();
         }
 
@@ -1204,21 +1180,30 @@ public class AMQPConnection_1_0Impl extends 
AbstractAMQPConnection<AMQPConnectio
     {
         if (!_closedForOutput)
         {
-            ValueWriter<FrameBody> writer = 
_describedTypeRegistry.getValueWriter(body);
-            if (payload == null)
+            final int payloadRemaining = payload == null ? 0 : 
payload.remaining();
+            final boolean hasPayload = payloadRemaining > 0;
+
+            if (hasPayload && !(body instanceof Transfer))
+            {
+                throw new ConnectionScopedRuntimeException("Non-empty payload 
is only supported for Transfer frames. " +
+                        "body=" + (body == null ? "null" : 
body.getClass().getName()) +
+                        ", payloadRemaining=" + payloadRemaining);
+            }
+
+            ValueWriter<FrameBody> writer = body == null ? null : 
_describedTypeRegistry.getValueWriter(body);
+            if (!hasPayload)
             {
-                send(new TransportFrame(channel, body));
+                send(new TransportFrame(channel, body, writer));
                 return 0;
             }
             else
             {
                 int size = writer.getEncodedSize();
                 int maxPayloadSize = _maxFrameSize - (size + 9);
-                long payloadLength = (long) payload.remaining();
-                if (payloadLength <= maxPayloadSize)
+                if (payloadRemaining <= maxPayloadSize)
                 {
-                    send(new TransportFrame(channel, body, payload));
-                    return (int)payloadLength;
+                    send(new TransportFrame(channel, body, payload, writer));
+                    return payloadRemaining;
                 }
                 else
                 {
@@ -1231,7 +1216,7 @@ public class AMQPConnection_1_0Impl extends 
AbstractAMQPConnection<AMQPConnectio
                     try (QpidByteBuffer payloadDup = payload.view(0, 
maxPayloadSize))
                     {
                         payload.position(payload.position() + maxPayloadSize);
-                        send(new TransportFrame(channel, body, payloadDup));
+                        send(new TransportFrame(channel, body, payloadDup, 
writer));
                     }
 
                     return maxPayloadSize;
@@ -1361,14 +1346,16 @@ public class AMQPConnection_1_0Impl extends 
AbstractAMQPConnection<AMQPConnectio
 
             final AuthenticationProvider<?> authenticationProvider = 
getPort().getAuthenticationProvider();
 
-            if(Arrays.equals(header, SASL_HEADER))
+            final byte[] amqpHeader = Bytes.amqpHeader();
+            final byte[] saslHeader = Bytes.saslHeader();
+            if (Arrays.equals(header, saslHeader))
             {
                 if(_saslComplete)
                 {
                     throw new ConnectionScopedRuntimeException("SASL Layer 
header received after SASL already established");
                 }
 
-                try (QpidByteBuffer protocolHeader = 
QpidByteBuffer.wrap(SASL_HEADER))
+                try (QpidByteBuffer protocolHeader = 
QpidByteBuffer.wrap(saslHeader))
                 {
                     getSender().send(protocolHeader);
                 }
@@ -1384,7 +1371,7 @@ public class AMQPConnection_1_0Impl extends 
AbstractAMQPConnection<AMQPConnectio
                 _connectionState = ConnectionState.AWAIT_SASL_INIT;
                 _frameHandler = getFrameHandler(true);
             }
-            else if(Arrays.equals(header, AMQP_HEADER))
+            else if(Arrays.equals(header, amqpHeader))
             {
                 if(!_saslComplete)
                 {
@@ -1406,7 +1393,7 @@ public class AMQPConnection_1_0Impl extends 
AbstractAMQPConnection<AMQPConnectio
                     }
 
                 }
-                try (QpidByteBuffer protocolHeader = 
QpidByteBuffer.wrap(AMQP_HEADER))
+                try (QpidByteBuffer protocolHeader = 
QpidByteBuffer.wrap(amqpHeader))
                 {
                     getSender().send(protocolHeader);
                 }
diff --git 
a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AnonymousRelayDestination.java
 
b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AnonymousRelayDestination.java
index fb50ef0e0e..ab55ce4f61 100644
--- 
a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AnonymousRelayDestination.java
+++ 
b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AnonymousRelayDestination.java
@@ -37,6 +37,8 @@ import org.apache.qpid.server.txn.ServerTransaction;
 
 public class AnonymousRelayDestination implements ReceivingDestination
 {
+    private static final Symbol[] CAPABILITIES = { DELAYED_DELIVERY };
+
     private final Target _target;
     private final NamedAddressSpace _addressSpace;
     private final EventLogger _eventLogger;
@@ -53,10 +55,17 @@ public class AnonymousRelayDestination implements 
ReceivingDestination
                                                                        
.contains(DISCARD_UNROUTABLE);
     }
 
+    /**
+     * Returns the target capabilities for an anonymous relay.
+     * <br>
+     * Note: returns a shared array instance to avoid per-call allocation. The returned array must be treated as
+     * immutable and must not be modified by callers.
+     * @return {@link Symbol} array
+     */
     @Override
     public Symbol[] getCapabilities()
     {
-        return new Symbol[]{DELAYED_DELIVERY};
+        return CAPABILITIES;
     }
 
     @Override
diff --git 
a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
 
b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
index 319ca7ada8..57ef39b60f 100644
--- 
a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
+++ 
b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
@@ -20,7 +20,6 @@
  */
 package org.apache.qpid.server.protocol.v1_0;
 
-import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
 
@@ -68,6 +67,8 @@ import org.apache.qpid.server.util.StateChangeListener;
 class ConsumerTarget_1_0 extends AbstractConsumerTarget<ConsumerTarget_1_0>
 {
     private static final Logger LOGGER = 
LoggerFactory.getLogger(ConsumerTarget_1_0.class);
+    private static final UnsettledAction DO_NOTHING_ACTION = new 
DoNothingAction();
+
     private final boolean _acquires;
 
     private long _deliveryTag = 0L;
@@ -189,55 +190,82 @@ class ConsumerTarget_1_0 extends 
AbstractConsumerTarget<ConsumerTarget_1_0>
 
                 headerSection = header.createEncodingRetainingSection();
             }
-            List<QpidByteBuffer> payload = new ArrayList<>();
-            if(headerSection != null)
+
+            final EncodingRetainingSection<?> deliveryAnnotationsSection = 
message.getDeliveryAnnotationsSection();
+            final EncodingRetainingSection<?> messageAnnotationsSection = 
message.getMessageAnnotationsSection();
+            final EncodingRetainingSection<?> propertiesSection = 
message.getPropertiesSection();
+            final EncodingRetainingSection<?> applicationPropertiesSection = 
message.getApplicationPropertiesSection();
+            final EncodingRetainingSection<?> footerSection = 
message.getFooterSection();
+
+            final boolean bodyOnly = headerSection == null &&
+                    deliveryAnnotationsSection == null &&
+                    messageAnnotationsSection == null &&
+                    propertiesSection == null &&
+                    applicationPropertiesSection == null &&
+                    footerSection == null;
+
+            if (bodyOnly)
             {
-                payload.add(headerSection.getEncodedForm());
-                headerSection.dispose();
+                if (bodyContent != null)
+                {
+                    transfer.setPayload(bodyContent);
+                    bodyContent.dispose();
+                }
             }
-            EncodingRetainingSection<?> section;
-            if((section = message.getDeliveryAnnotationsSection()) != null)
+            else
             {
-                payload.add(section.getEncodedForm());
-                section.dispose();
-            }
+                final List<QpidByteBuffer> payload = new ArrayList<>();
 
-            if((section = message.getMessageAnnotationsSection()) != null)
-            {
-                payload.add(section.getEncodedForm());
-                section.dispose();
-            }
+                if (headerSection != null)
+                {
+                    payload.add(headerSection.getEncodedForm());
+                    headerSection.dispose();
+                }
 
-            if((section = message.getPropertiesSection()) != null)
-            {
-                payload.add(section.getEncodedForm());
-                section.dispose();
-            }
+                if (deliveryAnnotationsSection != null)
+                {
+                    payload.add(deliveryAnnotationsSection.getEncodedForm());
+                    deliveryAnnotationsSection.dispose();
+                }
 
-            if((section = message.getApplicationPropertiesSection()) != null)
-            {
-                payload.add(section.getEncodedForm());
-                section.dispose();
-            }
+                if (messageAnnotationsSection != null)
+                {
+                    payload.add(messageAnnotationsSection.getEncodedForm());
+                    messageAnnotationsSection.dispose();
+                }
 
-            payload.add(bodyContent);
+                if (propertiesSection != null)
+                {
+                    payload.add(propertiesSection.getEncodedForm());
+                    propertiesSection.dispose();
+                }
 
-            if((section = message.getFooterSection()) != null)
-            {
-                payload.add(section.getEncodedForm());
-                section.dispose();
-            }
+                if (applicationPropertiesSection != null)
+                {
+                    payload.add(applicationPropertiesSection.getEncodedForm());
+                    applicationPropertiesSection.dispose();
+                }
 
-            try (QpidByteBuffer combined = QpidByteBuffer.concatenate(payload))
-            {
-                transfer.setPayload(combined);
-            }
+                if (bodyContent != null)
+                {
+                    payload.add(bodyContent);
+                }
+
+                if (footerSection != null)
+                {
+                    payload.add(footerSection.getEncodedForm());
+                    footerSection.dispose();
+                }
 
-            payload.forEach(QpidByteBuffer::dispose);
+                try (QpidByteBuffer combined = 
QpidByteBuffer.concatenate(payload))
+                {
+                    transfer.setPayload(combined);
+                }
+
+                payload.forEach(QpidByteBuffer::dispose);
+            }
 
-            byte[] data = new byte[8];
-            ByteBuffer.wrap(data).putLong(_deliveryTag++);
-            final Binary tag = new Binary(data);
+            final Binary tag = Binary.ofDeliveryTag(_deliveryTag++);
 
             transfer.setDeliveryTag(tag);
 
@@ -249,7 +277,7 @@ class ConsumerTarget_1_0 extends 
AbstractConsumerTarget<ConsumerTarget_1_0>
                     transfer.setSettled(true);
                     if (_acquires && _transactionId == null)
                     {
-                        transfer.setState(new Accepted());
+                        transfer.setState(Accepted.INSTANCE);
                     }
                 }
                 else
@@ -262,7 +290,7 @@ class ConsumerTarget_1_0 extends 
AbstractConsumerTarget<ConsumerTarget_1_0>
                     }
                     else
                     {
-                        action = new DoNothingAction();
+                        action = DO_NOTHING_ACTION;
                     }
 
                     _linkEndpoint.addUnsettled(tag, action, entry);
diff --git 
a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Delivery.java
 
b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Delivery.java
index 9bd89cbb30..236ff40ebc 100644
--- 
a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Delivery.java
+++ 
b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Delivery.java
@@ -170,13 +170,8 @@ public class Delivery
             }
         }
 
-        try (QpidByteBuffer payload = transfer.getPayload())
-        {
-            if (payload != null)
-            {
-                _totalPayloadSize += (long) payload.remaining();
-            }
-        }
+        final int remaining = transfer.getPayloadRemaining();
+        _totalPayloadSize += (long) remaining;
     }
 
     public LinkEndpoint<? extends BaseSource, ? extends BaseTarget> 
getLinkEndpoint()
@@ -186,6 +181,20 @@ public class Delivery
 
     public QpidByteBuffer getPayload()
     {
+        final int size = _transfers.size();
+        if (size == 0)
+        {
+            return QpidByteBuffer.emptyQpidByteBuffer();
+        }
+        else if (size == 1)
+        {
+            final Transfer transfer = _transfers.get(0);
+            final QpidByteBuffer payload = transfer.getPayload();
+            transfer.dispose();
+            _transfers.clear();
+            return payload == null ? QpidByteBuffer.emptyQpidByteBuffer() : 
payload;
+        }
+
         List<QpidByteBuffer> transferBuffers = new 
ArrayList<>(_transfers.size());
         for (Transfer t : _transfers)
         {
diff --git 
a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeSendingDestination.java
 
b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeSendingDestination.java
index 8f84530c6f..cb9eb3942c 100644
--- 
a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeSendingDestination.java
+++ 
b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeSendingDestination.java
@@ -58,7 +58,7 @@ import 
org.apache.qpid.server.virtualhost.QueueManagingVirtualHost;
 
 public class ExchangeSendingDestination extends StandardSendingDestination
 {
-    private static final Accepted ACCEPTED = new Accepted();
+    private static final Accepted ACCEPTED = Accepted.INSTANCE;
     private static final Rejected REJECTED = new Rejected();
     private static final Outcome[] OUTCOMES = { ACCEPTED, REJECTED};
     public static final Symbol TOPIC_CAPABILITY = Symbol.getSymbol("topic");
diff --git 
a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_Internal_to_v1_0.java
 
b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_Internal_to_v1_0.java
index 4fd015c2e8..f7af6edb39 100644
--- 
a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_Internal_to_v1_0.java
+++ 
b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_Internal_to_v1_0.java
@@ -62,7 +62,6 @@ import 
org.apache.qpid.server.util.ConnectionScopedRuntimeException;
 @PluggableService
 public class MessageConverter_Internal_to_v1_0 extends 
MessageConverter_to_1_0<InternalMessage>
 {
-
     private static final Set<Class<?>> TYPES_EXPRESSIBLE_AS_AMQP_1_0_VALUE = 
Set.of(String.class,
             Character.class,
             Boolean.class,
@@ -163,8 +162,7 @@ public class MessageConverter_Internal_to_v1_0 extends 
MessageConverter_to_1_0<I
         {
             contentTypeAnnotationValue = 
isSectionValidForJmsMap(convertedMessageBody) ? MAP_MESSAGE.getType() : null;
         }
-        else if (originalMessageBody != null
-                 && 
TYPES_EXPRESSIBLE_AS_AMQP_1_0_VALUE.stream().anyMatch(clazz -> 
clazz.isAssignableFrom(originalMessageBody.getClass())))
+        else if (originalMessageBody != null && 
expressibleAsAmqpValue(originalMessageBody))
         {
             contentTypeAnnotationValue = null;
         }
@@ -211,8 +209,7 @@ public class MessageConverter_Internal_to_v1_0 extends 
MessageConverter_to_1_0<I
         {
             contentTypeAsString = null;
         }
-        else if (messageBody != null
-                 && 
TYPES_EXPRESSIBLE_AS_AMQP_1_0_VALUE.stream().anyMatch(clazz -> 
clazz.isAssignableFrom(messageBody.getClass())))
+        else if (messageBody != null && expressibleAsAmqpValue(messageBody))
         {
             contentTypeAsString = mimeType;
         }
@@ -280,8 +277,7 @@ public class MessageConverter_Internal_to_v1_0 extends 
MessageConverter_to_1_0<I
 
     public NonEncodingRetainingSection<?> convertToBody(Object object)
     {
-        if (object == null
-            || TYPES_EXPRESSIBLE_AS_AMQP_1_0_VALUE.stream().anyMatch(clazz -> 
clazz.isAssignableFrom(object.getClass())))
+        if (object == null || expressibleAsAmqpValue(object))
         {
             return new AmqpValue(object);
         }
@@ -312,4 +308,20 @@ public class MessageConverter_Internal_to_v1_0 extends 
MessageConverter_to_1_0<I
         }
     }
 
+    private static boolean expressibleAsAmqpValue(final Object object)
+    {
+        if (object == null)
+        {
+            return true;
+        }
+        final Class<?> type = object.getClass();
+        for (final Class<?> amqpType : TYPES_EXPRESSIBLE_AS_AMQP_1_0_VALUE)
+        {
+            if (amqpType.isAssignableFrom(type))
+            {
+                return true;
+            }
+        }
+        return false;
+    }
 }
diff --git 
a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_from_1_0.java
 
b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_from_1_0.java
index 93ab0c32d4..14f9b78d69 100644
--- 
a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_from_1_0.java
+++ 
b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_from_1_0.java
@@ -23,7 +23,6 @@ package org.apache.qpid.server.protocol.v1_0;
 import static 
org.apache.qpid.server.message.mimecontentconverter.MimeContentConverterRegistry.getBestFitObjectToMimeContentConverter;
 
 import java.io.Serializable;
-import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -63,6 +62,8 @@ import 
org.apache.qpid.server.util.ServerScopedRuntimeException;
 
 public class MessageConverter_from_1_0
 {
+    private static final SectionDecoderImpl SECTION_DECODER =
+            new 
SectionDecoderImpl(MessageConverter_v1_0_to_Internal.TYPE_REGISTRY.getSectionDecoderRegistry());
 
     private static final Set<Class> STANDARD_TYPES = new 
HashSet<>(Arrays.<Class>asList(Boolean.class,
                                                                                
         Byte.class,
@@ -79,15 +80,13 @@ public class MessageConverter_from_1_0
 
     static Object convertBodyToObject(final Message_1_0 serverMessage)
     {
-        final SectionDecoderImpl sectionDecoder = new 
SectionDecoderImpl(MessageConverter_v1_0_to_Internal.TYPE_REGISTRY.getSectionDecoderRegistry());
-
         Object bodyObject = null;
         List<EncodingRetainingSection<?>> sections = null;
         try
         {
             try (QpidByteBuffer allData = serverMessage.getContent())
             {
-                sections = sectionDecoder.parseAll(allData);
+                sections = SECTION_DECODER.parseAll(allData);
             }
 
             final int size = sections == null ? 0 : sections.size();
@@ -127,10 +126,12 @@ public class MessageConverter_from_1_0
                         totalSize += 
((DataSection)section).getValue().getArray().length;
                     }
                     final byte[] bodyData = new byte[totalSize];
-                    final ByteBuffer buf = ByteBuffer.wrap(bodyData);
+                    int pos = 0;
                     for(EncodingRetainingSection<?> section : bodySections)
                     {
-                        buf.put(((DataSection) 
section).getValue().asByteBuffer());
+                        final byte[] src = ((DataSection) 
section).getValue().getArray();
+                        System.arraycopy(src, 0, bodyData, pos, src.length);
+                        pos += src.length;
                     }
                     bodyObject = bodyData;
                 }
diff --git 
a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java
 
b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java
index a9914c4a31..518611b0d0 100644
--- 
a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java
+++ 
b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java
@@ -68,12 +68,24 @@ import org.apache.qpid.server.util.GZIPUtils;
 
 public abstract class MessageConverter_to_1_0<M extends ServerMessage> 
implements MessageConverter<M, Message_1_0>
 {
+    /** Must be treated as immutable and must not be modified */
     private static final byte[] SERIALIZED_NULL = getObjectBytes(null);
+    /** Must be treated as immutable and must not be modified */
+    private static final NonEncodingRetainingSection<?> AMQP_VALUE_NULL = new 
AmqpValue(null);
+    /** Must be treated as immutable and must not be modified */
+    private static final NonEncodingRetainingSection<?> 
AMQP_VALUE_EMPTY_STRING = new AmqpValue("");
+    /** Must be treated as immutable and must not be modified */
+    private static final NonEncodingRetainingSection<?> AMQP_VALUE_EMPTY_LIST 
= new AmqpSequence(Collections.emptyList());
+    /** Must be treated as immutable and must not be modified */
+    private static final NonEncodingRetainingSection<?> AMQP_VALUE_EMPTY_MAP = 
new AmqpValue(Collections.emptyMap());
+    /** Must be treated as immutable and must not be modified */
+    private static final NonEncodingRetainingSection<?> DATA_SERIALIZED_NULL = 
new Data(new Binary(SERIALIZED_NULL.clone()));
+
     private final AMQPDescribedTypeRegistry _typeRegistry = 
AMQPDescribedTypeRegistry.newInstance()
-                                                                               
      .registerTransportLayer()
-                                                                               
      .registerMessagingLayer()
-                                                                               
      .registerTransactionLayer()
-                                                                               
      .registerSecurityLayer();
+            .registerTransportLayer()
+            .registerMessagingLayer()
+            .registerTransactionLayer()
+            .registerSecurityLayer();
 
     public static Symbol getContentType(final String contentMimeType)
     {
@@ -280,23 +292,23 @@ public abstract class MessageConverter_to_1_0<M extends 
ServerMessage> implement
         }
         else if (mimeType == null)
         {
-            return new AmqpValue(null);
+            return AMQP_VALUE_NULL;
         }
         else if (OBJECT_MESSAGE_CONTENT_TYPES.matcher(mimeType).matches())
         {
-            return new Data(new Binary(SERIALIZED_NULL));
+            return DATA_SERIALIZED_NULL;
         }
         else if (TEXT_CONTENT_TYPES.matcher(mimeType).matches())
         {
-            return new AmqpValue("");
+            return AMQP_VALUE_EMPTY_STRING;
         }
         else if (MAP_MESSAGE_CONTENT_TYPES.matcher(mimeType).matches())
         {
-            return new AmqpValue(Collections.emptyMap());
+            return AMQP_VALUE_EMPTY_MAP;
         }
         else if (LIST_MESSAGE_CONTENT_TYPES.matcher(mimeType).matches())
         {
-            return new AmqpSequence(Collections.emptyList());
+            return AMQP_VALUE_EMPTY_LIST;
         }
         return new Data(new Binary(data));
     }
diff --git 
a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java
 
b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java
index d1ea39ae2f..da8f7eb40a 100755
--- 
a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java
+++ 
b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java
@@ -404,6 +404,7 @@ public class MessageMetaData_1_0 implements 
StorableMessageMetaData
     private static class MetaDataFactory implements 
MessageMetaDataType.Factory<MessageMetaData_1_0>
     {
         private final AMQPDescribedTypeRegistry _typeRegistry = 
AMQPDescribedTypeRegistry.newInstance();
+        private final SectionDecoder _sectionDecoder = new 
SectionDecoderImpl(_typeRegistry.getSectionDecoderRegistry());
 
         private MetaDataFactory()
         {
@@ -449,9 +450,7 @@ public class MessageMetaData_1_0 implements 
StorableMessageMetaData
                             versionByte));
                 }
 
-                SectionDecoder sectionDecoder = new 
SectionDecoderImpl(_typeRegistry.getSectionDecoderRegistry());
-
-                List<EncodingRetainingSection<?>> sections = 
sectionDecoder.parseAll(buf);
+                List<EncodingRetainingSection<?>> sections = 
_sectionDecoder.parseAll(buf);
 
                 if (versionByte == 0)
                 {
diff --git 
a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java
 
b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java
index f33dfe39e4..39f0792b86 100644
--- 
a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java
+++ 
b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java
@@ -55,6 +55,8 @@ public class Message_1_0 extends 
AbstractServerMessageImpl<Message_1_0, MessageM
                                                                                
                       .registerSecurityLayer();
     private static final MessageMetaData_1_0 DELETED_MESSAGE_METADATA = new 
MessageMetaData_1_0(null, null, null, null, null, null, 0L, 0L);
     private static final String AMQP_1_0 = "AMQP 1.0";
+    private static final SectionDecoder SECTION_DECODER =
+            new 
SectionDecoderImpl(DESCRIBED_TYPE_REGISTRY.getSectionDecoderRegistry());
 
     public Message_1_0(final StoredMessage<MessageMetaData_1_0> storedMessage)
     {
@@ -176,8 +178,6 @@ public class Message_1_0 extends 
AbstractServerMessageImpl<Message_1_0, MessageM
     {
         if(getMessageMetaData().getVersion() == 0)
         {
-            SectionDecoder sectionDecoder = new 
SectionDecoderImpl(DESCRIBED_TYPE_REGISTRY.getSectionDecoderRegistry());
-
             try
             {
                 List<EncodingRetainingSection<?>> sections;
@@ -185,7 +185,7 @@ public class Message_1_0 extends 
AbstractServerMessageImpl<Message_1_0, MessageM
                 // not just #getSize()
                 try (QpidByteBuffer allSectionsContent = super.getContent(0, 
Integer.MAX_VALUE))
                 {
-                    sections = sectionDecoder.parseAll(allSectionsContent);
+                    sections = SECTION_DECODER.parseAll(allSectionsContent);
                 }
                 List<QpidByteBuffer> bodySectionContent = new ArrayList<>();
 
diff --git 
a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java
 
b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java
index 3727094b8d..dd81cfd290 100644
--- 
a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java
+++ 
b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java
@@ -45,6 +45,9 @@ import org.apache.qpid.server.txn.TransactionMonitor;
 
 public class NodeReceivingDestination implements ReceivingDestination
 {
+    private static final Symbol[] CAPABILITIES_DISCARD = { DISCARD_UNROUTABLE, 
DELAYED_DELIVERY };
+    private static final Symbol[] CAPABILITIES_REJECT = { REJECT_UNROUTABLE, 
DELAYED_DELIVERY };
+
     private final boolean _discardUnroutable;
     private final EventLogger _eventLogger;
 
@@ -145,11 +148,13 @@ public class NodeReceivingDestination implements 
ReceivingDestination
         }
         else
         {
-            result.getRoutes()
-                  .stream()
-                  .filter(q -> q instanceof TransactionMonitor)
-                  .map(TransactionMonitor.class::cast)
-                  .forEach(tm -> tm.registerTransaction(txn));
+            for (final Object route : result.getRoutes())
+            {
+                if (route instanceof TransactionMonitor transactionMonitor)
+                {
+                    transactionMonitor.registerTransaction(txn);
+                }
+            }
         }
     }
 
@@ -201,12 +206,16 @@ public class NodeReceivingDestination implements 
ReceivingDestination
         return _destination;
     }
 
+    /**
+     * Returns the supported outcome capabilities for this node.
+     * <br>
+     * Note: returns a shared array to avoid per-call allocations. The 
returned array must be treated as
+     * immutable and must not be modified by callers.
+     * @return {@link Symbol} array
+     */
     @Override
     public Symbol[] getCapabilities()
     {
-        Symbol[] capabilities = new Symbol[2];
-        capabilities[0] = _discardUnroutable ? DISCARD_UNROUTABLE : 
REJECT_UNROUTABLE;
-        capabilities[1] = DELAYED_DELIVERY;
-        return capabilities;
+        return _discardUnroutable ? CAPABILITIES_DISCARD : CAPABILITIES_REJECT;
     }
 }
diff --git 
a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngineCreator_1_0_0.java
 
b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngineCreator_1_0_0.java
index 618e8ff008..d9dff8cd38 100644
--- 
a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngineCreator_1_0_0.java
+++ 
b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngineCreator_1_0_0.java
@@ -33,6 +33,7 @@ import org.apache.qpid.server.model.Transport;
 import org.apache.qpid.server.model.port.AmqpPort;
 import org.apache.qpid.server.plugin.PluggableService;
 import org.apache.qpid.server.plugin.ProtocolEngineCreator;
+import org.apache.qpid.server.protocol.v1_0.constants.Bytes;
 import org.apache.qpid.server.transport.ProtocolEngine;
 import 
org.apache.qpid.server.security.auth.manager.AnonymousAuthenticationManager;
 import 
org.apache.qpid.server.security.auth.manager.ExternalAuthenticationManagerImpl;
@@ -44,17 +45,6 @@ public class ProtocolEngineCreator_1_0_0 implements 
ProtocolEngineCreator
 {
     private static final Logger LOGGER = 
LoggerFactory.getLogger(ProtocolEngineCreator_1_0_0.class);
 
-    private static final byte[] AMQP_1_0_0_HEADER =
-            new byte[] { (byte) 'A',
-                         (byte) 'M',
-                         (byte) 'Q',
-                         (byte) 'P',
-                         (byte) 0,
-                         (byte) 1,
-                         (byte) 0,
-                         (byte) 0
-            };
-
     public ProtocolEngineCreator_1_0_0()
     {
     }
@@ -65,11 +55,15 @@ public class ProtocolEngineCreator_1_0_0 implements 
ProtocolEngineCreator
         return Protocol.AMQP_1_0;
     }
 
-
+    /**
+     * Returns an AMQP header shared array instance to avoid per-call 
allocation.
+     * The returned array must be treated as immutable and must not be 
modified by callers.
+     * @return AMQP header byte array
+     */
     @Override
     public byte[] getHeaderIdentifier()
     {
-        return AMQP_1_0_0_HEADER;
+        return Bytes.amqpHeader();
     }
 
     @Override
diff --git 
a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngineCreator_1_0_0_SASL.java
 
b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngineCreator_1_0_0_SASL.java
index 4def7a9a4d..4117a33479 100644
--- 
a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngineCreator_1_0_0_SASL.java
+++ 
b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngineCreator_1_0_0_SASL.java
@@ -20,30 +20,20 @@
  */
 package org.apache.qpid.server.protocol.v1_0;
 
-import org.apache.qpid.server.transport.ProtocolEngine;
 import org.apache.qpid.server.model.Broker;
 import org.apache.qpid.server.model.Protocol;
 import org.apache.qpid.server.model.Transport;
 import org.apache.qpid.server.model.port.AmqpPort;
 import org.apache.qpid.server.plugin.PluggableService;
 import org.apache.qpid.server.plugin.ProtocolEngineCreator;
-import org.apache.qpid.server.transport.ServerNetworkConnection;
+import org.apache.qpid.server.protocol.v1_0.constants.Bytes;
 import org.apache.qpid.server.transport.AggregateTicker;
+import org.apache.qpid.server.transport.ProtocolEngine;
+import org.apache.qpid.server.transport.ServerNetworkConnection;
 
 @PluggableService
 public class ProtocolEngineCreator_1_0_0_SASL implements ProtocolEngineCreator
 {
-    private static final byte[] AMQP_SASL_1_0_0_HEADER =
-            new byte[] { (byte) 'A',
-                         (byte) 'M',
-                         (byte) 'Q',
-                         (byte) 'P',
-                         (byte) 3,
-                         (byte) 1,
-                         (byte) 0,
-                         (byte) 0
-            };
-
     public ProtocolEngineCreator_1_0_0_SASL()
     {
     }
@@ -54,11 +44,15 @@ public class ProtocolEngineCreator_1_0_0_SASL implements 
ProtocolEngineCreator
         return Protocol.AMQP_1_0;
     }
 
-
+    /**
+     * Returns the shared AMQP SASL header array instance to avoid per-call 
allocation.
+     * The returned array must be treated as immutable and must not be 
modified by callers.
+     * @return AMQP SASL header byte array
+     */
     @Override
     public byte[] getHeaderIdentifier()
     {
-        return AMQP_SASL_1_0_0_HEADER;
+        return Bytes.saslHeader();
     }
 
     @Override
diff --git 
a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java
 
b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java
index f5080a3bc6..71183e51d4 100644
--- 
a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java
+++ 
b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java
@@ -533,7 +533,7 @@ public class SendingLinkEndpoint extends 
AbstractLinkEndpoint<Source, Target>
 
         while(!_resumeAcceptedTransfers.isEmpty() && hasCreditToSend())
         {
-            Accepted accepted = new Accepted();
+            Accepted accepted = Accepted.INSTANCE;
             Transfer xfr = new Transfer();
             Binary dt = _resumeAcceptedTransfers.remove(0);
             xfr.setDeliveryTag(dt);
diff --git 
a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
 
b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
index 1602b6d65b..a70684ca32 100644
--- 
a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
+++ 
b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
@@ -316,9 +316,12 @@ public class Session_1_0 extends 
AbstractAMQPSession<Session_1_0, ConsumerTarget
     private SortedSet<UnsignedInteger> getDeliveryIds(final Set<Binary> 
deliveryTags, final LinkEndpoint<?, ?> linkEndpoint)
     {
         final DeliveryRegistry deliveryRegistry = 
getDeliveryRegistry(linkEndpoint.getRole());
-        return deliveryTags.stream()
-                           .map(deliveryTag -> getDeliveryId(deliveryRegistry, 
deliveryTag, linkEndpoint))
-                           .collect(Collectors.toCollection(TreeSet::new));
+        final SortedSet<UnsignedInteger> ids = new TreeSet<>();
+        for (final Binary tag : deliveryTags)
+        {
+            ids.add(getDeliveryId(deliveryRegistry, tag, linkEndpoint));
+        }
+        return ids;
     }
 
     private UnsignedInteger getDeliveryId(final Binary deliveryTag, final 
LinkEndpoint<?, ?> linkEndpoint)
@@ -390,24 +393,22 @@ public class Session_1_0 extends 
AbstractAMQPSession<Session_1_0, ConsumerTarget
         {
             long remaining = payload == null ? 0 : (long) payload.remaining();
             int payloadSent = _connection.sendFrame(_sendingChannel, xfr, 
payload);
-            if(payload != null)
+            if (payload != null)
             {
                 while (payloadSent < remaining && payloadSent >= 0)
                 {
-                    Transfer continuationTransfer = new Transfer();
-
+                    final Transfer continuationTransfer = new Transfer();
                     continuationTransfer.setHandle(xfr.getHandle());
                     
continuationTransfer.setRcvSettleMode(xfr.getRcvSettleMode());
                     continuationTransfer.setState(xfr.getState());
-                    continuationTransfer.setPayload(payload);
+                    // no need to pass payload to continuationTransfer as 
AMQPConnection_1_0Impl#sendFrame()
+                    // takes payload as an argument
 
                     _nextOutgoingId.incr();
                     _remoteIncomingWindow--;
 
-                    remaining = (long) payload.remaining();
+                    remaining = payload.remaining();
                     payloadSent = _connection.sendFrame(_sendingChannel, 
continuationTransfer, payload);
-
-                    continuationTransfer.dispose();
                 }
             }
         }
diff --git 
a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java
 
b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java
index 1ceb1dd7e1..0e8bf0a8f4 100644
--- 
a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java
+++ 
b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java
@@ -81,7 +81,7 @@ public class StandardReceivingLinkEndpoint extends 
AbstractReceivingLinkEndpoint
 {
     private static final Logger LOGGER = 
LoggerFactory.getLogger(StandardReceivingLinkEndpoint.class);
     private static final Symbol DELIVERY_TAG = Symbol.valueOf("delivery-tag");
-    private static final Accepted ACCEPTED = new Accepted();
+    private static final Accepted ACCEPTED = Accepted.INSTANCE;
     private static final String LINK = "link";
 
     private final java.util.Queue<AsyncCommand> _unfinishedCommandsQueue = new 
ConcurrentLinkedQueue<>();
diff --git 
a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardSendingDestination.java
 
b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardSendingDestination.java
index 7940c8d907..533322254a 100644
--- 
a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardSendingDestination.java
+++ 
b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardSendingDestination.java
@@ -34,7 +34,7 @@ import 
org.apache.qpid.server.protocol.v1_0.type.messaging.Accepted;
 
 public class StandardSendingDestination implements SendingDestination
 {
-    private static final Accepted ACCEPTED = new Accepted();
+    private static final Accepted ACCEPTED = Accepted.INSTANCE;
     private static final Outcome[] OUTCOMES = new Outcome[] { ACCEPTED };
 
 
diff --git 
a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLinkEndpoint.java
 
b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLinkEndpoint.java
index c87b00a389..c57ed309f9 100644
--- 
a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLinkEndpoint.java
+++ 
b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLinkEndpoint.java
@@ -119,7 +119,7 @@ public class TxnCoordinatorReceivingLinkEndpoint extends 
AbstractReceivingLinkEn
                             final DeliveryState outcome;
                             if (error == null)
                             {
-                                outcome = new Accepted();
+                                outcome = Accepted.INSTANCE;
                             }
                             else if 
(CollectionUtils.nullSafeList(getSource().getOutcomes()).contains(Rejected.REJECTED_SYMBOL))
                             {
diff --git 
a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/FrameWriter.java
 
b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/FrameWriter.java
index b0f8ed08a2..ab0b5815e6 100644
--- 
a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/FrameWriter.java
+++ 
b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/FrameWriter.java
@@ -30,7 +30,6 @@ public class FrameWriter
 {
     private final ByteBufferSender _sender;
     private final ValueWriter.Registry _registry;
-    private static final byte[] EMPTY_BYTE_ARRAY = new byte[] {};
 
     public FrameWriter(final ValueWriter.Registry registry, final 
ByteBufferSender sender)
     {
@@ -45,7 +44,18 @@ public class FrameWriter
         final int payloadLength = payload == null ? 0 : payload.remaining();
         final T frameBody = frame.getFrameBody();
 
-        final ValueWriter<T> typeWriter = frameBody == null ? null : 
_registry.getValueWriter(frameBody);
+        final ValueWriter<T> typeWriter;
+
+        if (frameBody == null)
+        {
+            typeWriter = null;
+        }
+        else
+        {
+            final ValueWriter<T> cachedWriter = frame.getFrameBodyWriter();
+            typeWriter = cachedWriter == null ? 
_registry.getValueWriter(frameBody) : cachedWriter;
+        }
+
         int bodySize;
         if (typeWriter == null)
         {
diff --git 
a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/ValueHandler.java
 
b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/ValueHandler.java
index 0aa3a177c9..f83f7dceb7 100644
--- 
a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/ValueHandler.java
+++ 
b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/ValueHandler.java
@@ -25,6 +25,7 @@ import 
org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException;
 import org.apache.qpid.server.protocol.v1_0.type.transport.AmqpError;
 import org.apache.qpid.server.protocol.v1_0.type.transport.ConnectionError;
 
+/** {@link ValueHandler} is thread safe */
 public class ValueHandler implements DescribedTypeConstructorRegistry.Source
 {
     public static final byte DESCRIBED_TYPE = (byte)0;
diff --git 
a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/constants/Bytes.java
 
b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/constants/Bytes.java
new file mode 100644
index 0000000000..53034b827f
--- /dev/null
+++ 
b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/constants/Bytes.java
@@ -0,0 +1,62 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.server.protocol.v1_0.constants;
+
+/** Utility class holding frequently used byte arrays */
+public final class Bytes
+{
+    private Bytes()
+    {
+        // constructor is private for utility class
+    }
+
+    /** AMQP header byte array representation  */
+    private static final byte[] AMQP_HEADER_BYTES = { (byte) 'A', (byte) 'M', 
(byte) 'Q', (byte) 'P',
+            (byte) 0, (byte) 1, (byte) 0, (byte) 0 };
+
+    /** Empty byte array  */
+    public static final byte[] EMPTY_BYTE_ARRAY = { };
+
+    /** SASL header byte array representation  */
+    private static final byte[] SASL_HEADER_BYTES = { (byte) 'A', (byte) 'M', 
(byte) 'Q', (byte) 'P',
+            (byte) 3, (byte) 1, (byte) 0, (byte) 0 };
+
+    /**
+     * Returns an AMQP header shared array instance to avoid per-call 
allocation.
+     * The returned array must be treated as immutable and must not be 
modified by callers.
+     * @return AMQP header byte array
+     */
+    public static byte[] amqpHeader()
+    {
+        return AMQP_HEADER_BYTES;
+    }
+
+    /**
+     * Returns a SASL header shared array instance to avoid per-call 
allocation.
+     * The returned array must be treated as immutable and must not be 
modified by callers.
+     * @return SASL header byte array
+     */
+    public static byte[] saslHeader()
+    {
+        return SASL_HEADER_BYTES;
+    }
+}
diff --git 
a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/framing/AMQFrame.java
 
b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/framing/AMQFrame.java
index eeec261df4..5e60054269 100644
--- 
a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/framing/AMQFrame.java
+++ 
b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/framing/AMQFrame.java
@@ -22,21 +22,60 @@
 package org.apache.qpid.server.protocol.v1_0.framing;
 
 import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.server.protocol.v1_0.codec.FrameWriter;
+import org.apache.qpid.server.protocol.v1_0.codec.ValueWriter;
 
 public abstract class AMQFrame<T>
 {
     private final T _frameBody;
     private final QpidByteBuffer _payload;
+    private final ValueWriter<T> _frameBodyWriter;
 
+    /**
+     * Base AMQP frame.
+     * @param frameBody frame body (may be null for heartbeat-like frames)
+     */
     AMQFrame(T frameBody)
     {
-        this(frameBody, null);
+        this(frameBody, null, null);
     }
 
+    /**
+     * Base AMQP frame.
+     * <br>
+     * {@code payload} (if present) is sent as-is using its current {@code 
position/limit}.
+     * Do not mutate the payload buffer concurrently. Ensure it remains 
valid/alive until the frame
+     * has been written to the transport.
+     * <br>
+     * @param frameBody frame body (may be null for heartbeat-like frames)
+     * @param payload optional payload; remaining bytes will be written
+     */
     protected AMQFrame(T frameBody, QpidByteBuffer payload)
+    {
+        this(frameBody, payload, null);
+    }
+
+    /**
+     * Base AMQP frame.
+     * <br>
+     * IMPORTANT: Frames are expected to be effectively immutable after 
construction.
+     * {@link FrameWriter} may use a cached {@link ValueWriter} supplied at 
construction time and will
+     * not re-resolve it from the registry. If {@code frameBody} is mutated 
after the writer is cached,
+     * the encoded size / encoding may become inconsistent.
+     *<br>
+     * {@code payload} (if present) is sent as-is using its current {@code 
position/limit}.
+     * Do not mutate the payload buffer concurrently. Ensure it remains 
valid/alive until the frame
+     * has been written to the transport.
+     * <br>
+     * @param frameBody frame body (may be null for heartbeat-like frames)
+     * @param payload optional payload; remaining bytes will be written
+     * @param frameBodyWriter optional cache hint; must match {@code 
frameBody} in its current state
+     */
+    protected AMQFrame(T frameBody, QpidByteBuffer payload, ValueWriter<T> 
frameBodyWriter)
     {
         _frameBody = frameBody;
         _payload = payload;
+        _frameBodyWriter = frameBodyWriter;
     }
 
     public QpidByteBuffer getPayload()
@@ -53,6 +92,11 @@ public abstract class AMQFrame<T>
         return _frameBody;
     }
 
+    public ValueWriter<T> getFrameBodyWriter()
+    {
+        return _frameBodyWriter;
+    }
+
     @Override
     public String toString()
     {
diff --git 
a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/framing/TransportFrame.java
 
b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/framing/TransportFrame.java
index 3156cbdbc7..1bd2b86d3f 100644
--- 
a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/framing/TransportFrame.java
+++ 
b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/framing/TransportFrame.java
@@ -19,6 +19,8 @@
 package org.apache.qpid.server.protocol.v1_0.framing;
 
 import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.server.protocol.v1_0.codec.FrameWriter;
+import org.apache.qpid.server.protocol.v1_0.codec.ValueWriter;
 import org.apache.qpid.server.protocol.v1_0.type.FrameBody;
 
 public final class TransportFrame extends AMQFrame<FrameBody>
@@ -27,18 +29,60 @@ public final class TransportFrame extends 
AMQFrame<FrameBody>
 
     private final int _channel;
 
+    /**
+     * Base Transport frame.
+     * @param channel channel
+     * @param frameBody frame body (may be null for heartbeat-like frames)
+     */
     public TransportFrame(int channel, FrameBody frameBody)
     {
         super(frameBody);
         _channel = channel;
     }
 
+    /**
+     * Base Transport frame.
+     * @param channel channel
+     * @param frameBody frame body (may be null for heartbeat-like frames)
+     * @param frameBodyWriter  optional cached writer for {@code frameBody}.
+     * Use only when the writer was resolved for this exact instance/state of 
{@code frameBody}
+     * and the body will not be mutated afterwards; otherwise pass null and 
let {@link FrameWriter}
+     * resolve the writer from the registry.
+     */
+    public TransportFrame(int channel, FrameBody frameBody, 
ValueWriter<FrameBody> frameBodyWriter)
+    {
+        super(frameBody, null, frameBodyWriter);
+        _channel = channel;
+    }
+
+    /**
+     * Base Transport frame.
+     * @param channel channel
+     * @param frameBody frame body (may be null for heartbeat-like frames)
+     * @param payload optional payload; remaining bytes will be written
+     */
     public TransportFrame(int channel, FrameBody frameBody, QpidByteBuffer 
payload)
     {
         super(frameBody, payload);
         _channel = channel;
     }
 
+    /**
+     * Creates a transport frame for the given channel carrying an optional payload and
+     * an optional cached body writer.
+     *
+     * @param channel channel number; returned unchanged by {@link #getChannel()}
+     * @param frameBody frame body (may be null for heartbeat-like frames)
+     * @param payload optional payload; remaining bytes will be written
+     * @param frameBodyWriter optional cached writer for {@code frameBody}.
+     * Use only when the writer was resolved for this exact instance/state of {@code frameBody}
+     * and the body will not be mutated afterwards; otherwise pass null and let {@link FrameWriter}
+     * resolve the writer from the registry.
+     */
+    public TransportFrame(int channel, FrameBody frameBody, QpidByteBuffer 
payload, ValueWriter<FrameBody> frameBodyWriter)
+    {
+        super(frameBody, payload, frameBodyWriter);
+        _channel = channel;
+    }
+
     @Override public int getChannel()
     {
         return _channel;
diff --git 
a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/messaging/SectionDecoderImpl.java
 
b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/messaging/SectionDecoderImpl.java
index 00fde2ea70..9eb7964619 100644
--- 
a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/messaging/SectionDecoderImpl.java
+++ 
b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/messaging/SectionDecoderImpl.java
@@ -29,6 +29,7 @@ import 
org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException;
 import 
org.apache.qpid.server.protocol.v1_0.type.messaging.EncodingRetainingSection;
 import org.apache.qpid.server.protocol.v1_0.type.transport.AmqpError;
 
+/** {@link SectionDecoderImpl} is thread safe */
 public class SectionDecoderImpl implements SectionDecoder
 {
 
diff --git 
a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/Binary.java
 
b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/Binary.java
index 4efa3e1ae3..fa998bc526 100644
--- 
a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/Binary.java
+++ 
b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/Binary.java
@@ -40,6 +40,20 @@ public class Binary
         _hashCode = hc;
     }
 
+    /**
+     * Builds a {@link Binary} from the eight-byte big-endian representation of {@code value}.
+     *
+     * @param value long to encode; the most significant byte is stored at index 0
+     * @return a new {@link Binary} created from the freshly allocated eight-byte array
+     */
+    public static Binary ofDeliveryTag(final long value)
+    {
+        final byte[] encoded = new byte[Long.BYTES];
+        for (int index = 0; index < encoded.length; index++)
+        {
+            // index 0 takes bits 56..63, the last index takes bits 0..7
+            final int shift = (encoded.length - 1 - index) * Byte.SIZE;
+            encoded[index] = (byte) (value >>> shift);
+        }
+        return new Binary(encoded);
+    }
+
     public ByteBuffer asByteBuffer()
     {
         return ByteBuffer.wrap(_data);
diff --git 
a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/codec/AMQPDescribedTypeRegistry.java
 
b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/codec/AMQPDescribedTypeRegistry.java
index 0e4d71f500..0848ff7b3e 100644
--- 
a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/codec/AMQPDescribedTypeRegistry.java
+++ 
b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/codec/AMQPDescribedTypeRegistry.java
@@ -69,8 +69,9 @@ import 
org.apache.qpid.server.protocol.v1_0.type.transport.codec.*;
 
 public class AMQPDescribedTypeRegistry implements 
DescribedTypeConstructorRegistry, ValueWriter.Registry
 {
-
+    /** Is initialized once during startup, treated as read-only afterward */
     private final Map<Object, DescribedTypeConstructor> _constructorRegistry = 
new HashMap<>();
+    /** Is initialized once during startup, treated as read-only afterward */
     private final Map<Object, DescribedTypeConstructor> 
_sectionDecoderRegistryMap = new HashMap<>();
 
 
diff --git 
a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/Accepted.java
 
b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/Accepted.java
index 5f98113e79..3a02290c80 100644
--- 
a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/Accepted.java
+++ 
b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/Accepted.java
@@ -31,6 +31,7 @@ import org.apache.qpid.server.protocol.v1_0.type.Symbol;
 public class Accepted implements Outcome
 {
     public static final Symbol ACCEPTED_SYMBOL = 
Symbol.valueOf("amqp:accepted:list");
+    public static final Accepted INSTANCE = new Accepted();
 
     @Override
     public Symbol getSymbol()
diff --git 
a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/AmqpValue.java
 
b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/AmqpValue.java
index 26a83e1440..b50ccc3d8a 100644
--- 
a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/AmqpValue.java
+++ 
b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/AmqpValue.java
@@ -45,7 +45,7 @@ public class AmqpValue implements 
NonEncodingRetainingSection<Object>
     @Override
     public String toString()
     {
-        return "AmqpValue{(" + _value.getClass().getName() + ')' + _value + 
'}';
+        // A null value has no class to report; print "null" as the type name instead of dereferencing it.
+        return "AmqpValue{(" + (_value == null ? "null" : 
_value.getClass().getName()) + ')' + _value + '}';
     }
 
 
diff --git 
a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/transport/Transfer.java
 
b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/transport/Transfer.java
index e4d3231d23..5925b44d5b 100644
--- 
a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/transport/Transfer.java
+++ 
b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/transport/Transfer.java
@@ -295,6 +295,12 @@ public class Transfer implements FrameBody
         conn.receiveTransfer(channel, this);
     }
 
+    /**
+     * Reports how many bytes remain in this transfer's payload.
+     *
+     * @return {@code 0} when no payload is set, otherwise the payload buffer's remaining byte count
+     */
+    public int getPayloadRemaining()
+    {
+        // Read the field once so the null check and the call use the same reference.
+        final QpidByteBuffer buffer = _payload;
+        if (buffer == null)
+        {
+            return 0;
+        }
+        return buffer.remaining();
+    }
+
     public QpidByteBuffer getPayload()
     {
         if (_payload == null)
diff --git 
a/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0Test.java
 
b/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0Test.java
index 7aafaf011e..41df4e3682 100644
--- 
a/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0Test.java
+++ 
b/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0Test.java
@@ -20,7 +20,11 @@
  */
 package org.apache.qpid.server.protocol.v1_0;
 
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyBoolean;
@@ -36,7 +40,11 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicReference;
 
-import org.junit.jupiter.api.BeforeAll;
+import org.apache.qpid.server.protocol.v1_0.type.Binary;
+import org.apache.qpid.server.protocol.v1_0.type.UnsignedByte;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.Data;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.DataSection;
+import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
 import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
@@ -75,7 +83,7 @@ class ConsumerTarget_1_0Test extends UnitTestBase
     private ConsumerTarget_1_0 _consumerTarget;
     private SendingLinkEndpoint _sendingLinkEndpoint;
 
-    @BeforeAll
+    @BeforeEach
     void setUp()
     {
         final AMQPConnection_1_0 connection = mock(AMQPConnection_1_0.class);
@@ -110,9 +118,7 @@ class ConsumerTarget_1_0Test extends UnitTestBase
         {
             final Object[] args = invocation.getArguments();
             final Transfer transfer = (Transfer) args[0];
-
             final QpidByteBuffer transferPayload = transfer.getPayload();
-
             final QpidByteBuffer payloadCopy = transferPayload.duplicate();
             payloadRef.set(payloadCopy);
             return null;
@@ -137,8 +143,8 @@ class ConsumerTarget_1_0Test extends UnitTestBase
         }
 
         assertNotNull(sentHeader, "Header is not found");
-        assertNotNull(sentHeader.getTtl(), "Ttl is not set");
-        assertTrue(sentHeader.getTtl().longValue() <= 1000, "Unexpected ttl");
+        assertNotNull(sentHeader.getTtl(), "TTL is not set");
+        assertTrue(sentHeader.getTtl().longValue() <= 1000, "Unexpected TTL");
     }
 
     private Message_1_0 createTestMessage(final Header header, long 
arrivalTime)
@@ -164,4 +170,253 @@ class ConsumerTarget_1_0Test extends UnitTestBase
         when(storedMessage.getMetaData()).thenReturn(metaData);
         return new Message_1_0(storedMessage);
     }
+
+    /**
+     * Sends a message built by {@code createBodyOnlyMessage} and verifies that the payload handed to the
+     * sending link endpoint decodes to exactly one {@link DataSection} holding the original body bytes.
+     */
+    @Test
+    void bodyOnlyMessageIsSentAsSingleDataSection() throws Exception
+    {
+        final MessageInstanceConsumer consumer = 
mock(MessageInstanceConsumer.class);
+        final byte[] body = new byte[]{10, 11, 12};
+        final Message_1_0 message = createBodyOnlyMessage(body);
+
+        final MessageInstance messageInstance = mock(MessageInstance.class);
+        when(messageInstance.getMessage()).thenReturn(message);
+        when(messageInstance.getDeliveryCount()).thenReturn(0);
+
+        // Capture a duplicate of the transfer payload so it can be decoded after doSend returns.
+        final AtomicReference<QpidByteBuffer> payloadRef = new 
AtomicReference<>();
+        doAnswer(invocation ->
+        {
+            final Transfer transfer = (Transfer) invocation.getArguments()[0];
+            try (QpidByteBuffer transferPayload = transfer.getPayload())
+            {
+                payloadRef.set(transferPayload == null ? null : 
transferPayload.duplicate());
+            }
+            return null;
+        }).when(_sendingLinkEndpoint).transfer(any(Transfer.class), 
anyBoolean());
+
+        _consumerTarget.doSend(consumer, messageInstance, false);
+
+        try (final QpidByteBuffer payload = payloadRef.get())
+        {
+            final List<EncodingRetainingSection<?>> sections =
+                    new 
SectionDecoderImpl(AMQP_DESCRIBED_TYPE_REGISTRY.getSectionDecoderRegistry()).parseAll(payload);
+
+            assertEquals(1, sections.size());
+            assertInstanceOf(DataSection.class, sections.get(0));
+            final DataSection dataSection = (DataSection) sections.get(0);
+            assertArrayEquals(body, dataSection.getValue().getArray());
+            sections.forEach(EncodingRetainingSection::dispose);
+        }
+    }
+
+    /**
+     * Sends a message that has a header (durable, priority 7) and a data body, and verifies that the
+     * decoded transfer payload contains exactly two sections: the header with those values and the body bytes.
+     */
+    @Test
+    void headerAndBodyOnlyMessageIncludesHeaderSection() throws Exception
+    {
+        final MessageInstanceConsumer consumer = 
mock(MessageInstanceConsumer.class);
+        final byte[] body = new byte[]{1, 2, 3, 4};
+
+        final Header header = new Header();
+        header.setDurable(true);
+        header.setPriority(UnsignedByte.valueOf((byte) 7));
+
+        final Message_1_0 message = createHeaderAndBodyOnlyMessage(header, 
body, System.currentTimeMillis());
+
+        final MessageInstance messageInstance = mock(MessageInstance.class);
+        when(messageInstance.getMessage()).thenReturn(message);
+        when(messageInstance.getDeliveryCount()).thenReturn(0);
+
+        // Capture a duplicate of the transfer payload so it can be decoded after doSend returns.
+        final AtomicReference<QpidByteBuffer> payloadRef = new 
AtomicReference<>();
+        doAnswer(invocation ->
+        {
+            final Transfer transfer = (Transfer) invocation.getArguments()[0];
+            try (QpidByteBuffer transferPayload = transfer.getPayload())
+            {
+                payloadRef.set(transferPayload == null ? null : 
transferPayload.duplicate());
+            }
+            return null;
+        }).when(_sendingLinkEndpoint).transfer(any(Transfer.class), 
anyBoolean());
+
+        _consumerTarget.doSend(consumer, messageInstance, false);
+
+        try (final QpidByteBuffer payload = payloadRef.get())
+        {
+            final List<EncodingRetainingSection<?>> sections =
+                    new 
SectionDecoderImpl(AMQP_DESCRIBED_TYPE_REGISTRY.getSectionDecoderRegistry()).parseAll(payload);
+
+            assertEquals(2, sections.size(), "Unexpected number of sections");
+
+            // Pick the header and body out of the decoded sections regardless of their order.
+            Header sentHeader = null;
+            Binary sentBody = null;
+            for (final EncodingRetainingSection<?> section : sections)
+            {
+                if (section instanceof HeaderSection)
+                {
+                    sentHeader = ((HeaderSection) section).getValue();
+                }
+                else if (section instanceof DataSection)
+                {
+                    sentBody = ((DataSection) section).getValue();
+                }
+            }
+
+            assertNotNull(sentHeader, "Header is not found");
+            assertEquals(Boolean.TRUE, sentHeader.getDurable());
+            assertEquals(UnsignedInteger.valueOf(7).intValue(), 
sentHeader.getPriority().intValue());
+            assertNotNull(sentBody, "Body is not found");
+            assertArrayEquals(body, sentBody.getArray());
+
+            sections.forEach(EncodingRetainingSection::dispose);
+        }
+    }
+
+    /**
+     * Sends a header-plus-body message with a TTL of 2000 ms that arrived 1000 ms ago on a message instance
+     * reporting a delivery count of 5, and verifies that the sent header carries that delivery count and a
+     * TTL of at most 1000 ms, and that the body bytes are unchanged.
+     */
+    @Test
+    void deliveryCountAndTtlAreAppliedForHeaderAndBodyOnlyMessage() throws 
Exception
+    {
+        final MessageInstanceConsumer consumer = 
mock(MessageInstanceConsumer.class);
+        final byte[] body = new byte[]{10, 20, 30};
+
+        final long ttl = 2000L;
+        // Arrival one second in the past, so at most half of the TTL can still remain.
+        final long arrivalTime = System.currentTimeMillis() - 1000L;
+
+        final Header header = new Header();
+        header.setTtl(UnsignedInteger.valueOf(ttl));
+
+        final Message_1_0 message = createHeaderAndBodyOnlyMessage(header, 
body, arrivalTime);
+
+        final int deliveryCount = 5;
+        final MessageInstance messageInstance = mock(MessageInstance.class);
+        when(messageInstance.getMessage()).thenReturn(message);
+        when(messageInstance.getDeliveryCount()).thenReturn(deliveryCount);
+
+        // Capture a duplicate of the transfer payload so it can be decoded after doSend returns.
+        final AtomicReference<QpidByteBuffer> payloadRef = new 
AtomicReference<>();
+        doAnswer(invocation ->
+        {
+            final Transfer transfer = (Transfer) invocation.getArguments()[0];
+            try (QpidByteBuffer transferPayload = transfer.getPayload())
+            {
+                payloadRef.set(transferPayload == null ? null : 
transferPayload.duplicate());
+            }
+            return null;
+        }).when(_sendingLinkEndpoint).transfer(any(Transfer.class), 
anyBoolean());
+
+        _consumerTarget.doSend(consumer, messageInstance, false);
+
+        final List<EncodingRetainingSection<?>> sections;
+        try (final QpidByteBuffer payload = payloadRef.get())
+        {
+            sections = new 
SectionDecoderImpl(AMQP_DESCRIBED_TYPE_REGISTRY.getSectionDecoderRegistry()).parseAll(payload);
+        }
+
+        // Pick the header and body out of the decoded sections regardless of their order.
+        Header sentHeader = null;
+        Binary sentBody = null;
+        for (final EncodingRetainingSection<?> section : sections)
+        {
+            if (section instanceof HeaderSection)
+            {
+                sentHeader = ((HeaderSection) section).getValue();
+            }
+            else if (section instanceof DataSection)
+            {
+                sentBody = ((DataSection) section).getValue();
+            }
+        }
+
+        assertNotNull(sentHeader, "Header is not found");
+        assertEquals(UnsignedInteger.valueOf(deliveryCount), 
sentHeader.getDeliveryCount(), "Delivery count not set");
+        assertNotNull(sentHeader.getTtl(), "TTL is not set");
+        assertTrue(sentHeader.getTtl().longValue() <= 1000, "Unexpected TTL");
+
+        assertNotNull(sentBody, "Body is not found");
+        assertArrayEquals(body, sentBody.getArray());
+
+        sections.forEach(EncodingRetainingSection::dispose);
+    }
+
+    /**
+     * Sends a mocked message whose content and all section getters return null, and verifies that
+     * {@code doSend} completes and the payload read from the resulting transfer is null.
+     */
+    @Test
+    void nullBodyContentDoesNotSetPayloadAndDoesNotThrow()
+    {
+        final MessageInstanceConsumer consumer = 
mock(MessageInstanceConsumer.class);
+        final Message_1_0 message = mock(Message_1_0.class);
+        when(message.getContent()).thenReturn(null);
+        when(message.getHeaderSection()).thenReturn(null);
+        when(message.getDeliveryAnnotationsSection()).thenReturn(null);
+        when(message.getMessageAnnotationsSection()).thenReturn(null);
+        when(message.getPropertiesSection()).thenReturn(null);
+        when(message.getApplicationPropertiesSection()).thenReturn(null);
+        when(message.getFooterSection()).thenReturn(null);
+        when(message.getSize()).thenReturn(0L);
+
+        final MessageInstance messageInstance = mock(MessageInstance.class);
+        when(messageInstance.getMessage()).thenReturn(message);
+        when(messageInstance.getDeliveryCount()).thenReturn(0);
+
+        // Record whatever payload the transfer exposes when it reaches the link endpoint.
+        final AtomicReference<QpidByteBuffer> payloadRef = new 
AtomicReference<>();
+        doAnswer(invocation ->
+        {
+            final Transfer transfer = (Transfer) invocation.getArguments()[0];
+            payloadRef.set(transfer.getPayload());
+            return null;
+        }).when(_sendingLinkEndpoint).transfer(any(Transfer.class), 
anyBoolean());
+
+        _consumerTarget.doSend(consumer, messageInstance, false);
+
+        assertNull(payloadRef.get(), "Payload must remain null when body 
content is null");
+    }
+
+    /**
+     * Creates a message whose meta data holds only the given header and whose content is {@code body}
+     * encoded as a single data section.
+     *
+     * @param header header to attach; must not be null
+     * @param body raw body bytes
+     * @param arrivalTime arrival time passed to the message meta data
+     * @return message backed by a mocked {@link StoredMessage}
+     */
+    private Message_1_0 createHeaderAndBodyOnlyMessage(final Header header, final byte[] body, final long arrivalTime)
+    {
+        // Dereference up front so a null header fails here, as it did before the helper was shared.
+        header.getClass();
+        return createMessageWithDataBody(header, body, arrivalTime);
+    }
+
+    /**
+     * Creates a message without any header or other meta data sections whose content is {@code body}
+     * encoded as a single data section; the arrival time is the current time.
+     *
+     * @param body raw body bytes
+     * @return message backed by a mocked {@link StoredMessage}
+     */
+    private Message_1_0 createBodyOnlyMessage(final byte[] body)
+    {
+        return createMessageWithDataBody(null, body, System.currentTimeMillis());
+    }
+
+    /**
+     * Shared builder behind the two factory methods above: encodes {@code body} as a data section and
+     * stubs a {@link StoredMessage} that serves the encoded bytes from memory.
+     *
+     * @param header header to attach, or null for no header section
+     * @param body raw body bytes
+     * @param arrivalTime arrival time passed to the message meta data
+     * @return message backed by the mocked {@link StoredMessage}
+     */
+    private Message_1_0 createMessageWithDataBody(final Header header, final byte[] body, final long arrivalTime)
+    {
+        final DataSection dataSection = new Data(new Binary(body)).createEncodingRetainingSection();
+        final byte[] encodedBody;
+        try (final QpidByteBuffer encoded = dataSection.getEncodedForm())
+        {
+            encodedBody = new byte[encoded.remaining()];
+            encoded.copyTo(encodedBody);
+        }
+        finally
+        {
+            dataSection.dispose();
+        }
+
+        final StoredMessage<MessageMetaData_1_0> storedMessage = mock(StoredMessage.class);
+        when(storedMessage.getMetaData()).thenReturn(new MessageMetaData_1_0(
+                header == null ? null : header.createEncodingRetainingSection(),
+                null, null, null, null, null, arrivalTime, encodedBody.length));
+        when(storedMessage.getContentSize()).thenReturn(encodedBody.length);
+        when(storedMessage.isInContentInMemory()).thenReturn(true);
+        // Serve the requested slice of the encoded body on every content read.
+        when(storedMessage.getContent(anyInt(), anyInt())).thenAnswer(inv ->
+        {
+            final int offset = inv.getArgument(0);
+            final int length = inv.getArgument(1);
+            return QpidByteBuffer.wrap(encodedBody, offset, length);
+        });
+
+        return new Message_1_0(storedMessage);
+    }
 }
diff --git 
a/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/DeliveryTest.java
 
b/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/DeliveryTest.java
new file mode 100644
index 0000000000..ec77ddc039
--- /dev/null
+++ 
b/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/DeliveryTest.java
@@ -0,0 +1,118 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.server.protocol.v1_0;
+
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.Mockito.mock;
+
+import org.junit.jupiter.api.Test;
+
+import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.server.protocol.v1_0.type.BaseSource;
+import org.apache.qpid.server.protocol.v1_0.type.BaseTarget;
+import org.apache.qpid.server.protocol.v1_0.type.Binary;
+import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Transfer;
+import org.apache.qpid.test.utils.UnitTestBase;
+
+/**
+ * Tests how {@link Delivery} reports the total payload size and the combined payload
+ * for deliveries made of one or more {@link Transfer}s.
+ */
+class DeliveryTest extends UnitTestBase
+{
+    /** A delivery built from one transfer with a four-byte payload reports size 4 and returns those bytes. */
+    @Test
+    void singleTransferPayloadIsReturnedAndSizeIsCounted()
+    {
+        final byte[] bytes = new byte[]{1, 2, 3, 4};
+        final Transfer t = new Transfer();
+        t.setDeliveryId(UnsignedInteger.valueOf(1));
+        t.setDeliveryTag(new Binary(new byte[]{0x01}));
+
+        try (final QpidByteBuffer buf = QpidByteBuffer.wrap(bytes))
+        {
+            t.setPayload(buf);
+        }
+
+        final LinkEndpoint<? extends BaseSource, ? extends BaseTarget> 
endpoint = mock(LinkEndpoint.class);
+        final Delivery delivery = new Delivery(t, endpoint);
+
+        assertEquals(4L, delivery.getTotalPayloadSize());
+
+        try (final QpidByteBuffer payload = delivery.getPayload())
+        {
+            final byte[] actual = new byte[payload.remaining()];
+            payload.copyTo(actual);
+            assertArrayEquals(bytes, actual);
+        }
+    }
+
+    /** A delivery built from one transfer without a payload reports size 0 and returns an empty buffer. */
+    @Test
+    void singleTransferWithNullPayloadReturnsEmptyBuffer()
+    {
+        final Transfer t = new Transfer();
+        t.setDeliveryId(UnsignedInteger.valueOf(1));
+        t.setDeliveryTag(new Binary(new byte[]{0x01}));
+
+        final LinkEndpoint<? extends BaseSource, ? extends BaseTarget> 
endpoint = mock(LinkEndpoint.class);
+        final Delivery delivery = new Delivery(t, endpoint);
+
+        assertEquals(0L, delivery.getTotalPayloadSize());
+
+        try (final QpidByteBuffer payload = delivery.getPayload())
+        {
+            assertEquals(0, payload.remaining());
+        }
+    }
+
+    /**
+     * A delivery made of a two-byte transfer followed by a three-byte transfer reports size 5
+     * and returns the payloads concatenated in the order the transfers were added.
+     */
+    @Test
+    void multipleTransfersAreConcatenatedAndSizeIsSum()
+    {
+        final LinkEndpoint<? extends BaseSource, ? extends BaseTarget> 
endpoint = mock(LinkEndpoint.class);
+
+        final Transfer t1 = new Transfer();
+        t1.setDeliveryId(UnsignedInteger.valueOf(1));
+        t1.setDeliveryTag(new Binary(new byte[]{0x01}));
+        t1.setMore(true);
+        try (final QpidByteBuffer buf = QpidByteBuffer.wrap(new byte[]{1, 2}))
+        {
+            t1.setPayload(buf);
+        }
+
+        final Delivery delivery = new Delivery(t1, endpoint);
+
+        // The second transfer sets neither a delivery id nor a delivery tag, only the more flag and payload.
+        final Transfer t2 = new Transfer();
+        t2.setMore(false);
+        try (final QpidByteBuffer buf = QpidByteBuffer.wrap(new byte[]{3, 4, 
5}))
+        {
+            t2.setPayload(buf);
+        }
+
+        delivery.addTransfer(t2);
+
+        assertEquals(5L, delivery.getTotalPayloadSize());
+
+        try (final QpidByteBuffer payload = delivery.getPayload())
+        {
+            final byte[] actual = new byte[payload.remaining()];
+            payload.copyTo(actual);
+            assertArrayEquals(new byte[]{1, 2, 3, 4, 5}, actual);
+        }
+    }
+}
diff --git 
a/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngineCreator_1_0_0Test.java
 
b/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngineCreator_1_0_0Test.java
new file mode 100644
index 0000000000..8f08b8cc9d
--- /dev/null
+++ 
b/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngineCreator_1_0_0Test.java
@@ -0,0 +1,65 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.server.protocol.v1_0;
+
+import org.apache.qpid.server.model.Protocol;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/**
+ * Verifies that {@link ProtocolEngineCreator_1_0_0} reports the expected protocol headers, version and type.
+ */
+class ProtocolEngineCreator_1_0_0Test
+{
+    /** Expected AMQP header: the ASCII letters "AMQP" followed by the bytes 0, 1, 0, 0. */
+    private static final byte[] ORIGINAL_AMQP_HEADER = { (byte) 'A', (byte) 'M', (byte) 'Q', (byte) 'P',
+            (byte) 0, (byte) 1, (byte) 0, (byte) 0 };
+
+    /** Expected SASL header: the ASCII letters "AMQP" followed by the bytes 3, 1, 0, 0. */
+    private static final byte[] ORIGINAL_SASL_HEADER = { (byte) 'A', (byte) 'M', (byte) 'Q', (byte) 'P',
+            (byte) 3, (byte) 1, (byte) 0, (byte) 0 };
+
+    /** The header identifier must equal the AMQP header. */
+    @Test
+    void getHeaderIdentifier()
+    {
+        assertArrayEquals(ORIGINAL_AMQP_HEADER, ProtocolEngineCreator_1_0_0.getInstance().getHeaderIdentifier(),
+                "AMQP header should not be changed");
+    }
+
+    /** The suggested alternative header must equal the SASL header. */
+    @Test
+    void getSuggestedAlternativeHeader()
+    {
+        assertArrayEquals(ORIGINAL_SASL_HEADER, ProtocolEngineCreator_1_0_0.getInstance().getSuggestedAlternativeHeader(),
+                "SASL header should not be changed");
+    }
+
+    /** The reported version must be {@link Protocol#AMQP_1_0}. */
+    @Test
+    void getVersion()
+    {
+        assertEquals(Protocol.AMQP_1_0, ProtocolEngineCreator_1_0_0.getInstance().getVersion(),
+                "Protocol version should be AMQP_1_0");
+    }
+
+    /** The reported type must be the string "AMQP_1_0". */
+    @Test
+    void getType()
+    {
+        // The failure message names the type, not the version, because this assertion checks getType().
+        assertEquals("AMQP_1_0", ProtocolEngineCreator_1_0_0.getInstance().getType(),
+                "Protocol type should be AMQP_1_0");
+    }
+}
diff --git 
a/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngineCreator_1_0_0_SASLTest.java
 
b/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngineCreator_1_0_0_SASLTest.java
new file mode 100644
index 0000000000..d6174b5e04
--- /dev/null
+++ 
b/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngineCreator_1_0_0_SASLTest.java
@@ -0,0 +1,63 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.server.protocol.v1_0;
+
+import org.apache.qpid.server.model.Protocol;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+
+/**
+ * Verifies that {@link ProtocolEngineCreator_1_0_0_SASL} reports the expected protocol header,
+ * no alternative header, and the expected version and type.
+ */
+class ProtocolEngineCreator_1_0_0_SASLTest
+{
+    /** Expected SASL header: the ASCII letters "AMQP" followed by the bytes 3, 1, 0, 0. */
+    private static final byte[] ORIGINAL_SASL_HEADER = { (byte) 'A', (byte) 'M', (byte) 'Q', (byte) 'P',
+            (byte) 3, (byte) 1, (byte) 0, (byte) 0 };
+
+    /** The header identifier must equal the SASL header. */
+    @Test
+    void getHeaderIdentifier()
+    {
+        // The failure message names the SASL header because that is the constant being compared.
+        assertArrayEquals(ORIGINAL_SASL_HEADER, ProtocolEngineCreator_1_0_0_SASL.getInstance().getHeaderIdentifier(),
+                "SASL header should not be changed");
+    }
+
+    /** The SASL creator must not suggest an alternative header. */
+    @Test
+    void getSuggestedAlternativeHeader()
+    {
+        assertNull(ProtocolEngineCreator_1_0_0_SASL.getInstance().getSuggestedAlternativeHeader(),
+                "No suggested alternative header expected");
+    }
+
+    /** The reported version must be {@link Protocol#AMQP_1_0}. */
+    @Test
+    void getVersion()
+    {
+        assertEquals(Protocol.AMQP_1_0, ProtocolEngineCreator_1_0_0_SASL.getInstance().getVersion(),
+                "Protocol version should be AMQP_1_0");
+    }
+
+    /** The reported type must be the string "AMQP_1_0". */
+    @Test
+    void getType()
+    {
+        // The failure message names the type, not the version, because this assertion checks getType().
+        assertEquals("AMQP_1_0", ProtocolEngineCreator_1_0_0_SASL.getInstance().getType(),
+                "Protocol type should be AMQP_1_0");
+    }
+}
diff --git 
a/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0Test.java
 
b/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0Test.java
index f51f4b82b0..1301e812f4 100644
--- 
a/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0Test.java
+++ 
b/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0Test.java
@@ -20,6 +20,7 @@
  */
 package org.apache.qpid.server.protocol.v1_0;
 
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.mockito.ArgumentMatchers.any;
@@ -40,6 +41,7 @@ import java.util.UUID;
 
 import javax.security.auth.Subject;
 
+import org.apache.qpid.server.protocol.v1_0.constants.Bytes;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
@@ -82,6 +84,12 @@ import org.apache.qpid.test.utils.UnitTestBase;
 @SuppressWarnings({"rawtypes"})
 class ProtocolEngine_1_0_0Test extends UnitTestBase
 {
+    private static final byte[] ORIGINAL_AMQP_HEADER = { (byte) 'A', (byte) 
'M', (byte) 'Q', (byte) 'P',
+            (byte) 0, (byte) 1, (byte) 0, (byte) 0 };
+
+    private static final byte[] ORIGINAL_SASL_HEADER = { (byte) 'A', (byte) 
'M', (byte) 'Q', (byte) 'P',
+            (byte) 3, (byte) 1, (byte) 0, (byte) 0 };
+
     private AMQPConnection_1_0Impl _protocolEngine_1_0_0;
     private ServerNetworkConnection _networkConnection;
     private Broker<?> _broker;
@@ -197,6 +205,9 @@ class ProtocolEngine_1_0_0Test extends UnitTestBase
         final AuthenticatedPrincipal principal = (AuthenticatedPrincipal) _connection.getAuthorizedPrincipal();
         assertNotNull(principal);
         assertEquals(principal, new AuthenticatedPrincipal(anonymousAuthenticationManager.getAnonymousPrincipal()));
+
+        assertArrayEquals(ORIGINAL_AMQP_HEADER, Bytes.amqpHeader(), "AMQP header should not be changed");
+        assertArrayEquals(ORIGINAL_SASL_HEADER, Bytes.saslHeader(), "SASL header should not be changed");
     }
 
     @Test
@@ -214,6 +225,9 @@ class ProtocolEngine_1_0_0Test extends UnitTestBase
 
         verify(_virtualHost, never()).registerConnection(any(AMQPConnection.class));
         verify(_networkConnection).close();
+
+        assertArrayEquals(ORIGINAL_AMQP_HEADER, Bytes.amqpHeader(), "AMQP header should not be changed");
+        assertArrayEquals(ORIGINAL_SASL_HEADER, Bytes.saslHeader(), "SASL header should not be changed");
     }
 
     @Test
@@ -235,6 +249,9 @@ class ProtocolEngine_1_0_0Test extends UnitTestBase
         final AuthenticatedPrincipal authPrincipal = (AuthenticatedPrincipal) _connection.getAuthorizedPrincipal();
         assertNotNull(authPrincipal);
         assertEquals(authPrincipal, new AuthenticatedPrincipal(principal));
+
+        assertArrayEquals(ORIGINAL_AMQP_HEADER, Bytes.amqpHeader(), "AMQP header should not be changed");
+        assertArrayEquals(ORIGINAL_SASL_HEADER, Bytes.saslHeader(), "SASL header should not be changed");
     }
 
     @Test
@@ -269,6 +286,9 @@ class ProtocolEngine_1_0_0Test extends UnitTestBase
         final AuthenticatedPrincipal principal = (AuthenticatedPrincipal) _connection.getAuthorizedPrincipal();
         assertNotNull(principal);
         assertEquals(principal, new AuthenticatedPrincipal(anonymousAuthenticationManager.getAnonymousPrincipal()));
+
+        assertArrayEquals(ORIGINAL_AMQP_HEADER, Bytes.amqpHeader(), "AMQP header should not be changed");
+        assertArrayEquals(ORIGINAL_SASL_HEADER, Bytes.saslHeader(), "SASL header should not be changed");
     }
 
     private void createEngine(final Transport transport)
diff --git a/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/Session_1_0Test.java b/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/Session_1_0Test.java
index de7fa0e8c2..5e9f23048c 100644
--- a/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/Session_1_0Test.java
+++ b/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/Session_1_0Test.java
@@ -22,15 +22,22 @@ package org.apache.qpid.server.protocol.v1_0;
 
 import static org.junit.jupiter.api.Assertions.assertArrayEquals;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNotSame;
 import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertSame;
 import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.ArgumentMatchers.nullable;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
+import java.io.ByteArrayOutputStream;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -49,6 +56,7 @@ import org.junit.jupiter.api.Test;
 import org.mockito.ArgumentCaptor;
 import org.mockito.stubbing.Answer;
 
+import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
 import org.apache.qpid.server.configuration.updater.CurrentThreadTaskExecutor;
 import org.apache.qpid.server.configuration.updater.TaskExecutor;
 import org.apache.qpid.server.filter.AMQPFilterTypes;
@@ -64,12 +72,14 @@ import org.apache.qpid.server.model.PublishingLink;
 import org.apache.qpid.server.model.Queue;
 import org.apache.qpid.server.model.Session;
 import org.apache.qpid.server.model.VirtualHost;
+import org.apache.qpid.server.protocol.v1_0.type.Binary;
 import org.apache.qpid.server.protocol.v1_0.type.ErrorCondition;
 import org.apache.qpid.server.protocol.v1_0.type.FrameBody;
 import org.apache.qpid.server.protocol.v1_0.type.Symbol;
 import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
 import org.apache.qpid.server.protocol.v1_0.type.codec.AMQPDescribedTypeRegistry;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.DeleteOnClose;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.Accepted;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.Filter;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.JMSSelectorFilter;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.Source;
@@ -80,7 +90,9 @@ import org.apache.qpid.server.protocol.v1_0.type.transport.Attach;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Begin;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Detach;
 import org.apache.qpid.server.protocol.v1_0.type.transport.LinkError;
+import org.apache.qpid.server.protocol.v1_0.type.transport.ReceiverSettleMode;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Transfer;
 import org.apache.qpid.server.queue.QueueConsumer;
 import org.apache.qpid.server.transport.AggregateTicker;
 import org.apache.qpid.server.virtualhost.QueueManagingVirtualHost;
@@ -557,6 +569,288 @@ class Session_1_0Test extends UnitTestBase
         assertQueueDurability(getDynamicNodeAddressFromAttachResponse(), true);
     }
 
+    @Test
+    void sendTransferDoesNotSetPayloadOnContinuationTransfers()
+    {
+        // Arrange: payload 25 bytes -> 3 sendFrame calls if "wire" writes 10 bytes per call
+        final byte[] bytes = createSequentialBytes(25);
+
+        final Transfer xfr = new Transfer();
+        xfr.setSettled(true);
+        xfr.setHandle(UnsignedInteger.valueOf(0));
+        xfr.setDeliveryTag(new Binary(new byte[]{0x01}));
+        try (final QpidByteBuffer buf = QpidByteBuffer.wrap(bytes))
+        {
+            xfr.setPayload(buf);
+        }
+
+        when(_connection.sendFrame(eq(0), any(), any(QpidByteBuffer.class))).thenAnswer(inv ->
+        {
+            final QpidByteBuffer payload = inv.getArgument(2);
+            if (payload == null)
+            {
+                return 0;
+            }
+            final int chunk = Math.min(10, payload.remaining());
+            payload.position(payload.position() + chunk);
+            return chunk;
+        });
+
+        final ArgumentCaptor<FrameBody> bodyCaptor = ArgumentCaptor.forClass(FrameBody.class);
+        final SendingLinkEndpoint endpoint = mock(SendingLinkEndpoint.class);
+
+        _session.sendTransfer(xfr, endpoint);
+
+        verify(_connection, times(3)).sendFrame(eq(0), bodyCaptor.capture(), any(QpidByteBuffer.class));
+        final List<FrameBody> bodies = bodyCaptor.getAllValues();
+        assertInstanceOf(Transfer.class, bodies.get(0));
+        assertNull(((Transfer) bodies.get(1)).getPayload(), "Continuation transfer must not carry payload");
+        assertNull(((Transfer) bodies.get(2)).getPayload(), "Continuation transfer must not carry payload");
+
+        xfr.dispose();
+    }
+
+    @Test
+    void sendTransferChunksPayloadPreservesByteOrderAndSetsMoreFlag()
+    {
+        // Arrange: payload 25 bytes -> 3 sendFrame calls if the transport writes 10 bytes per call
+        final byte[] bytes = createSequentialBytes(25);
+
+        final Transfer xfr = new Transfer();
+        xfr.setSettled(true);
+        xfr.setHandle(UnsignedInteger.valueOf(0));
+        xfr.setRcvSettleMode(ReceiverSettleMode.FIRST);
+        xfr.setState(Accepted.INSTANCE);
+        xfr.setDeliveryTag(new Binary(new byte[]{0x01}));
+        try (final QpidByteBuffer buf = QpidByteBuffer.wrap(bytes))
+        {
+            xfr.setPayload(buf);
+        }
+
+        final ByteArrayOutputStream capturedPayload = stubSendFrameToWriteAtMostAndCapturePayload(10);
+        final ArgumentCaptor<FrameBody> bodyCaptor = ArgumentCaptor.forClass(FrameBody.class);
+        final SendingLinkEndpoint endpoint = mock(SendingLinkEndpoint.class);
+
+        // Act
+        _session.sendTransfer(xfr, endpoint);
+
+        // Assert
+        verify(_connection, times(3)).sendFrame(eq(0), bodyCaptor.capture(), nullable(QpidByteBuffer.class));
+        assertArrayEquals(bytes, capturedPayload.toByteArray(), "Payload bytes were not preserved across chunks");
+
+        final List<FrameBody> bodies = bodyCaptor.getAllValues();
+        assertEquals(3, bodies.size(), "Unexpected number of frames");
+
+        assertSame(xfr, bodies.get(0), "First frame body should be the original Transfer");
+        assertNotSame(xfr, bodies.get(1), "Continuation frame should use a new Transfer instance");
+        assertNotSame(xfr, bodies.get(2), "Continuation frame should use a new Transfer instance");
+
+        final Transfer first = (Transfer) bodies.get(0);
+        final Transfer second = (Transfer) bodies.get(1);
+        final Transfer third = (Transfer) bodies.get(2);
+
+        assertEquals(Boolean.TRUE, first.getMore(), "First transfer should have more=true when payload is chunked");
+        assertEquals(Boolean.TRUE, second.getMore(), "Intermediate continuation transfer should have more=true");
+        assertNotEquals(Boolean.TRUE, third.getMore(), "Last transfer must not have more=true");
+
+        assertNull(second.getPayload(), "Continuation transfer must not carry payload");
+        assertNull(third.getPayload(), "Continuation transfer must not carry payload");
+
+        // Continuations should preserve key delivery settings
+        assertEquals(first.getHandle(), second.getHandle(), "Handle must be preserved on continuation transfer");
+        assertEquals(first.getHandle(), third.getHandle(), "Handle must be preserved on continuation transfer");
+        assertEquals(first.getRcvSettleMode(), second.getRcvSettleMode(), "RcvSettleMode must be preserved");
+        assertEquals(first.getRcvSettleMode(), third.getRcvSettleMode(), "RcvSettleMode must be preserved");
+        assertEquals(first.getState(), second.getState(), "State must be preserved");
+        assertEquals(first.getState(), third.getState(), "State must be preserved");
+
+        // Delivery-id is assigned only to the first transfer
+        assertNotNull(first.getDeliveryId(), "DeliveryId should be assigned to the first transfer");
+        assertNull(second.getDeliveryId(), "Continuation transfer must not set delivery-id");
+        assertNull(third.getDeliveryId(), "Continuation transfer must not set delivery-id");
+
+        xfr.dispose();
+    }
+
+    @Test
+    void sendTransferSendsSingleFrameWhenPayloadFitsWithinChunk()
+    {
+        final byte[] bytes = createSequentialBytes(9);
+
+        final Transfer xfr = new Transfer();
+        xfr.setSettled(true);
+        xfr.setHandle(UnsignedInteger.valueOf(0));
+        xfr.setDeliveryTag(new Binary(new byte[]{0x01}));
+        try (final QpidByteBuffer buf = QpidByteBuffer.wrap(bytes))
+        {
+            xfr.setPayload(buf);
+        }
+
+        final ByteArrayOutputStream capturedPayload = stubSendFrameToWriteAtMostAndCapturePayload(10);
+        final ArgumentCaptor<FrameBody> bodyCaptor = ArgumentCaptor.forClass(FrameBody.class);
+
+        _session.sendTransfer(xfr, mock(SendingLinkEndpoint.class));
+
+        verify(_connection, times(1)).sendFrame(eq(0), bodyCaptor.capture(), nullable(QpidByteBuffer.class));
+        assertArrayEquals(bytes, capturedPayload.toByteArray(), "Payload bytes were not written as a single chunk");
+
+        final Transfer body = (Transfer) bodyCaptor.getValue();
+        assertNotEquals(Boolean.TRUE, body.getMore(), "Single-frame transfer must not have more=true");
+
+        xfr.dispose();
+    }
+
+    @Test
+    void sendTransferSendsSingleFrameWhenPayloadExactlyEqualsChunk()
+    {
+        final byte[] bytes = createSequentialBytes(10);
+
+        final Transfer xfr = new Transfer();
+        xfr.setSettled(true);
+        xfr.setHandle(UnsignedInteger.valueOf(0));
+        xfr.setDeliveryTag(new Binary(new byte[]{0x01}));
+        try (final QpidByteBuffer buf = QpidByteBuffer.wrap(bytes))
+        {
+            xfr.setPayload(buf);
+        }
+
+        final ByteArrayOutputStream capturedPayload = stubSendFrameToWriteAtMostAndCapturePayload(10);
+        final ArgumentCaptor<FrameBody> bodyCaptor = ArgumentCaptor.forClass(FrameBody.class);
+
+        _session.sendTransfer(xfr, mock(SendingLinkEndpoint.class));
+
+        verify(_connection, times(1)).sendFrame(eq(0), bodyCaptor.capture(), nullable(QpidByteBuffer.class));
+        assertArrayEquals(bytes, capturedPayload.toByteArray(), "Payload bytes were not written as a single chunk");
+
+        final Transfer body = (Transfer) bodyCaptor.getValue();
+        assertNotEquals(Boolean.TRUE, body.getMore(), "Single-frame transfer must not have more=true");
+
+        xfr.dispose();
+    }
+
+    @Test
+    void sendTransferSendsTwoFramesWhenPayloadIsOneByteOverChunk()
+    {
+        final byte[] bytes = createSequentialBytes(11);
+
+        final Transfer xfr = new Transfer();
+        xfr.setSettled(true);
+        xfr.setHandle(UnsignedInteger.valueOf(0));
+        xfr.setDeliveryTag(new Binary(new byte[]{0x01}));
+        try (final QpidByteBuffer buf = QpidByteBuffer.wrap(bytes))
+        {
+            xfr.setPayload(buf);
+        }
+
+        final ByteArrayOutputStream capturedPayload = stubSendFrameToWriteAtMostAndCapturePayload(10);
+        final ArgumentCaptor<FrameBody> bodyCaptor = ArgumentCaptor.forClass(FrameBody.class);
+
+        _session.sendTransfer(xfr, mock(SendingLinkEndpoint.class));
+
+        verify(_connection, times(2)).sendFrame(eq(0), bodyCaptor.capture(), nullable(QpidByteBuffer.class));
+        assertArrayEquals(bytes, capturedPayload.toByteArray(), "Payload bytes were not preserved across two chunks");
+
+        final List<FrameBody> bodies = bodyCaptor.getAllValues();
+        final Transfer first = (Transfer) bodies.get(0);
+        final Transfer second = (Transfer) bodies.get(1);
+
+        assertEquals(Boolean.TRUE, first.getMore(), "First transfer should have more=true when payload is chunked");
+        assertNotEquals(Boolean.TRUE, second.getMore(), "Last transfer must not have more=true");
+        assertNull(second.getPayload(), "Continuation transfer must not carry payload");
+
+        xfr.dispose();
+    }
+
+    @Test
+    void sendTransferWithEmptyPayloadDoesNotSendContinuation()
+    {
+        final Transfer xfr = new Transfer();
+        xfr.setSettled(true);
+        xfr.setHandle(UnsignedInteger.valueOf(0));
+        xfr.setDeliveryTag(new Binary(new byte[]{0x01}));
+        try (final QpidByteBuffer buf = QpidByteBuffer.wrap(new byte[0]))
+        {
+            xfr.setPayload(buf);
+        }
+
+        final ByteArrayOutputStream capturedPayload = stubSendFrameToWriteAtMostAndCapturePayload(10);
+        final ArgumentCaptor<FrameBody> bodyCaptor = ArgumentCaptor.forClass(FrameBody.class);
+
+        _session.sendTransfer(xfr, mock(SendingLinkEndpoint.class));
+
+        verify(_connection, times(1)).sendFrame(eq(0), bodyCaptor.capture(), nullable(QpidByteBuffer.class));
+        assertEquals(0, capturedPayload.size(), "Empty payload should not produce any bytes");
+
+        xfr.dispose();
+    }
+
+    @Test
+    void sendTransferWithNullPayloadDoesNotThrowAndDoesNotSendContinuation()
+    {
+        final Transfer xfr = new Transfer();
+        xfr.setSettled(true);
+        xfr.setHandle(UnsignedInteger.valueOf(0));
+        xfr.setDeliveryTag(new Binary(new byte[]{0x01}));
+        // No payload set
+
+        when(_connection.sendFrame(eq(0), any(), nullable(QpidByteBuffer.class))).thenReturn(0);
+        final ArgumentCaptor<QpidByteBuffer> payloadCaptor = ArgumentCaptor.forClass(QpidByteBuffer.class);
+
+        _session.sendTransfer(xfr, mock(SendingLinkEndpoint.class));
+
+        verify(_connection, times(1)).sendFrame(eq(0), any(), payloadCaptor.capture());
+        assertNull(payloadCaptor.getValue(), "Expected payload argument to be null");
+
+        xfr.dispose();
+    }
+
+    private static byte[] createSequentialBytes(final int length)
+    {
+        final byte[] bytes = new byte[length];
+        for (int i = 0; i < bytes.length; i++)
+        {
+            bytes[i] = (byte) i;
+        }
+        return bytes;
+    }
+
+    private ByteArrayOutputStream stubSendFrameToWriteAtMostAndCapturePayload(final int maxBytesPerCall)
+    {
+        final ByteArrayOutputStream captured = new ByteArrayOutputStream();
+
+        when(_connection.sendFrame(eq(0), any(), nullable(QpidByteBuffer.class))).thenAnswer(inv ->
+        {
+            final FrameBody body = inv.getArgument(1);
+            final QpidByteBuffer payload = inv.getArgument(2);
+            if (payload == null)
+            {
+                return 0;
+            }
+
+            final int remaining = payload.remaining();
+            final int chunk = Math.min(maxBytesPerCall, remaining);
+
+            if (body instanceof Transfer)
+            {
+                // Simulate the AMQP transport behaviour: set 'more' when not all payload is written.
+                ((Transfer) body).setMore(remaining > chunk);
+            }
+
+            // Copy the bytes without affecting the original payload buffer position.
+            try (final QpidByteBuffer dup = payload.duplicate())
+            {
+                final byte[] bytes = new byte[chunk];
+                dup.get(bytes);
+                captured.write(bytes);
+            }
+
+            payload.position(payload.position() + chunk);
+            return chunk;
+        });
+
+        return captured;
+    }
+
     private Source createDynamicSource(final DeleteOnClose lifetimePolicy)
     {
         final Source source = new Source();


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to