This is an automated email from the ASF dual-hosted git repository.
vavrtom pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-broker-j.git
The following commit(s) were added to refs/heads/main by this push:
new b8e24e5b71 QPID-8013: [Broker-J] Reduce footprint of AMQP 1.0 protocol
objects (#369)
b8e24e5b71 is described below
commit b8e24e5b71049e6646231a35611e2b0413d975b8
Author: Daniil Kirilyuk <[email protected]>
AuthorDate: Wed Jan 28 12:24:29 2026 +0100
QPID-8013: [Broker-J] Reduce footprint of AMQP 1.0 protocol objects (#369)
---
.../v1_0/CompositeTypeConstructorGenerator.java | 10 +-
.../protocol/v1_0/AMQPConnection_1_0Impl.java | 73 +++--
.../protocol/v1_0/AnonymousRelayDestination.java | 11 +-
.../server/protocol/v1_0/ConsumerTarget_1_0.java | 110 +++++---
.../apache/qpid/server/protocol/v1_0/Delivery.java | 23 +-
.../protocol/v1_0/ExchangeSendingDestination.java | 2 +-
.../v1_0/MessageConverter_Internal_to_v1_0.java | 26 +-
.../protocol/v1_0/MessageConverter_from_1_0.java | 13 +-
.../protocol/v1_0/MessageConverter_to_1_0.java | 30 ++-
.../server/protocol/v1_0/MessageMetaData_1_0.java | 5 +-
.../qpid/server/protocol/v1_0/Message_1_0.java | 6 +-
.../protocol/v1_0/NodeReceivingDestination.java | 27 +-
.../protocol/v1_0/ProtocolEngineCreator_1_0_0.java | 20 +-
.../v1_0/ProtocolEngineCreator_1_0_0_SASL.java | 24 +-
.../server/protocol/v1_0/SendingLinkEndpoint.java | 2 +-
.../qpid/server/protocol/v1_0/Session_1_0.java | 21 +-
.../v1_0/StandardReceivingLinkEndpoint.java | 2 +-
.../protocol/v1_0/StandardSendingDestination.java | 2 +-
.../v1_0/TxnCoordinatorReceivingLinkEndpoint.java | 2 +-
.../server/protocol/v1_0/codec/FrameWriter.java | 14 +-
.../server/protocol/v1_0/codec/ValueHandler.java | 1 +
.../qpid/server/protocol/v1_0/constants/Bytes.java | 62 +++++
.../server/protocol/v1_0/framing/AMQFrame.java | 46 +++-
.../protocol/v1_0/framing/TransportFrame.java | 44 +++
.../v1_0/messaging/SectionDecoderImpl.java | 1 +
.../qpid/server/protocol/v1_0/type/Binary.java | 14 +
.../v1_0/type/codec/AMQPDescribedTypeRegistry.java | 3 +-
.../protocol/v1_0/type/messaging/Accepted.java | 1 +
.../protocol/v1_0/type/messaging/AmqpValue.java | 2 +-
.../protocol/v1_0/type/transport/Transfer.java | 6 +
.../protocol/v1_0/ConsumerTarget_1_0Test.java | 267 ++++++++++++++++++-
.../qpid/server/protocol/v1_0/DeliveryTest.java | 118 +++++++++
.../v1_0/ProtocolEngineCreator_1_0_0Test.java | 65 +++++
.../v1_0/ProtocolEngineCreator_1_0_0_SASLTest.java | 63 +++++
.../protocol/v1_0/ProtocolEngine_1_0_0Test.java | 20 ++
.../qpid/server/protocol/v1_0/Session_1_0Test.java | 294 +++++++++++++++++++++
36 files changed, 1246 insertions(+), 184 deletions(-)
diff --git
a/broker-codegen/src/main/java/org/apache/qpid/server/protocol/v1_0/CompositeTypeConstructorGenerator.java
b/broker-codegen/src/main/java/org/apache/qpid/server/protocol/v1_0/CompositeTypeConstructorGenerator.java
index 2d76f5a0e9..14b2775afa 100644
---
a/broker-codegen/src/main/java/org/apache/qpid/server/protocol/v1_0/CompositeTypeConstructorGenerator.java
+++
b/broker-codegen/src/main/java/org/apache/qpid/server/protocol/v1_0/CompositeTypeConstructorGenerator.java
@@ -77,6 +77,7 @@ public class CompositeTypeConstructorGenerator extends
AbstractProcessor
"org.apache.qpid.server.protocol.v1_0.type.messaging.TerminusExpiryPolicy",
"org.apache.qpid.server.protocol.v1_0.type.transaction.TxnCapability");
+ private static final List<String> SINGLETONS = List.of("Accepted");
@Override
public SourceVersion getSupportedSourceVersion()
@@ -188,7 +189,14 @@ public class CompositeTypeConstructorGenerator extends
AbstractProcessor
pw.println(" @Override");
pw.println(" protected " + objectSimpleName + " construct(final
FieldValueReader fieldValueReader) throws AmqpErrorException");
pw.println(" {");
- pw.println(" " + objectSimpleName + " obj = new " +
objectSimpleName + "();");
+ if (SINGLETONS.contains(objectSimpleName))
+ {
+ pw.println(" " + objectSimpleName + " obj = " +
objectSimpleName + ".INSTANCE;");
+ }
+ else
+ {
+ pw.println(" " + objectSimpleName + " obj = new " +
objectSimpleName + "();");
+ }
pw.println();
generateAssigners(pw, typeElement);
pw.println(" return obj;");
diff --git
a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java
b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java
index 3e2c0249fc..13736ef257 100644
---
a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java
+++
b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java
@@ -51,14 +51,13 @@ import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.Stream;
-import org.apache.qpid.server.logging.EventLogger;
-import org.apache.qpid.server.logging.messages.ResourceLimitMessages;
-import org.apache.qpid.server.security.limit.ConnectionLimitException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.server.logging.EventLogger;
import org.apache.qpid.server.logging.messages.ConnectionMessages;
+import org.apache.qpid.server.logging.messages.ResourceLimitMessages;
import org.apache.qpid.server.model.AuthenticationProvider;
import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.Connection;
@@ -74,6 +73,7 @@ import
org.apache.qpid.server.protocol.v1_0.codec.ProtocolHandler;
import org.apache.qpid.server.protocol.v1_0.codec.SectionDecoderRegistry;
import org.apache.qpid.server.protocol.v1_0.codec.ValueHandler;
import org.apache.qpid.server.protocol.v1_0.codec.ValueWriter;
+import org.apache.qpid.server.protocol.v1_0.constants.Bytes;
import org.apache.qpid.server.protocol.v1_0.framing.AMQFrame;
import org.apache.qpid.server.protocol.v1_0.framing.FrameHandler;
import org.apache.qpid.server.protocol.v1_0.framing.OversizeFrameException;
@@ -117,6 +117,7 @@ import
org.apache.qpid.server.security.auth.SubjectAuthenticationResult;
import
org.apache.qpid.server.security.auth.manager.AnonymousAuthenticationManager;
import
org.apache.qpid.server.security.auth.manager.ExternalAuthenticationManagerImpl;
import org.apache.qpid.server.security.auth.sasl.SaslNegotiator;
+import org.apache.qpid.server.security.limit.ConnectionLimitException;
import org.apache.qpid.server.session.AMQPSession;
import org.apache.qpid.server.transport.AbstractAMQPConnection;
import org.apache.qpid.server.transport.AggregateTicker;
@@ -146,31 +147,6 @@ public class AMQPConnection_1_0Impl extends
AbstractAMQPConnection<AMQPConnectio
private final AtomicBoolean _stateChanged = new AtomicBoolean();
private final AtomicReference<Action<ProtocolEngine>> _workListener = new
AtomicReference<>();
-
- private static final byte[] SASL_HEADER = new byte[]
- {
- (byte) 'A',
- (byte) 'M',
- (byte) 'Q',
- (byte) 'P',
- (byte) 3,
- (byte) 1,
- (byte) 0,
- (byte) 0
- };
-
- private static final byte[] AMQP_HEADER = new byte[]
- {
- (byte) 'A',
- (byte) 'M',
- (byte) 'Q',
- (byte) 'P',
- (byte) 0,
- (byte) 1,
- (byte) 0,
- (byte) 0
- };
-
private final FrameWriter _frameWriter;
private ProtocolHandler _frameHandler;
private volatile boolean _transportBlockedForWriting;
@@ -286,7 +262,7 @@ public class AMQPConnection_1_0Impl extends
AbstractAMQPConnection<AMQPConnectio
}
String mechanism = saslInit.getMechanism().toString();
final Binary initialResponse = saslInit.getInitialResponse();
- byte[] response = initialResponse == null ? new byte[0] :
initialResponse.getArray();
+ byte[] response = initialResponse == null ? Bytes.EMPTY_BYTE_ARRAY :
initialResponse.getArray();
List<String> availableMechanisms =
_subjectCreator.getAuthenticationProvider().getAvailableMechanisms(getTransport().isSecure());
@@ -306,7 +282,7 @@ public class AMQPConnection_1_0Impl extends
AbstractAMQPConnection<AMQPConnectio
{
assertState(ConnectionState.AWAIT_SASL_RESPONSE);
final Binary responseBinary = saslResponse.getResponse();
- byte[] response = responseBinary == null ? new byte[0] :
responseBinary.getArray();
+ byte[] response = responseBinary == null ? Bytes.EMPTY_BYTE_ARRAY :
responseBinary.getArray();
processSaslResponse(response);
}
@@ -338,7 +314,7 @@ public class AMQPConnection_1_0Impl extends
AbstractAMQPConnection<AMQPConnectio
SubjectAuthenticationResult authenticationResult =
_successfulAuthenticationResult;
if (authenticationResult == null)
{
- authenticationResult =
_subjectCreator.authenticate(_saslNegotiator, response != null ? response : new
byte[0]);
+ authenticationResult =
_subjectCreator.authenticate(_saslNegotiator, response != null ? response :
Bytes.EMPTY_BYTE_ARRAY);
challenge = authenticationResult.getChallenge();
}
@@ -1204,21 +1180,30 @@ public class AMQPConnection_1_0Impl extends
AbstractAMQPConnection<AMQPConnectio
{
if (!_closedForOutput)
{
- ValueWriter<FrameBody> writer =
_describedTypeRegistry.getValueWriter(body);
- if (payload == null)
+ final int payloadRemaining = payload == null ? 0 :
payload.remaining();
+ final boolean hasPayload = payloadRemaining > 0;
+
+ if (hasPayload && !(body instanceof Transfer))
+ {
+ throw new ConnectionScopedRuntimeException("Non-empty payload
is only supported for Transfer frames. " +
+ "body=" + (body == null ? "null" :
body.getClass().getName()) +
+ ", payloadRemaining=" + payloadRemaining);
+ }
+
+ ValueWriter<FrameBody> writer = body == null ? null :
_describedTypeRegistry.getValueWriter(body);
+ if (!hasPayload)
{
- send(new TransportFrame(channel, body));
+ send(new TransportFrame(channel, body, writer));
return 0;
}
else
{
int size = writer.getEncodedSize();
int maxPayloadSize = _maxFrameSize - (size + 9);
- long payloadLength = (long) payload.remaining();
- if (payloadLength <= maxPayloadSize)
+ if (payloadRemaining <= maxPayloadSize)
{
- send(new TransportFrame(channel, body, payload));
- return (int)payloadLength;
+ send(new TransportFrame(channel, body, payload, writer));
+ return payloadRemaining;
}
else
{
@@ -1231,7 +1216,7 @@ public class AMQPConnection_1_0Impl extends
AbstractAMQPConnection<AMQPConnectio
try (QpidByteBuffer payloadDup = payload.view(0,
maxPayloadSize))
{
payload.position(payload.position() + maxPayloadSize);
- send(new TransportFrame(channel, body, payloadDup));
+ send(new TransportFrame(channel, body, payloadDup,
writer));
}
return maxPayloadSize;
@@ -1361,14 +1346,16 @@ public class AMQPConnection_1_0Impl extends
AbstractAMQPConnection<AMQPConnectio
final AuthenticationProvider<?> authenticationProvider =
getPort().getAuthenticationProvider();
- if(Arrays.equals(header, SASL_HEADER))
+ final byte[] amqpHeader = Bytes.amqpHeader();
+ final byte[] saslHeader = Bytes.saslHeader();
+ if (Arrays.equals(header, saslHeader))
{
if(_saslComplete)
{
throw new ConnectionScopedRuntimeException("SASL Layer
header received after SASL already established");
}
- try (QpidByteBuffer protocolHeader =
QpidByteBuffer.wrap(SASL_HEADER))
+ try (QpidByteBuffer protocolHeader =
QpidByteBuffer.wrap(saslHeader))
{
getSender().send(protocolHeader);
}
@@ -1384,7 +1371,7 @@ public class AMQPConnection_1_0Impl extends
AbstractAMQPConnection<AMQPConnectio
_connectionState = ConnectionState.AWAIT_SASL_INIT;
_frameHandler = getFrameHandler(true);
}
- else if(Arrays.equals(header, AMQP_HEADER))
+ else if(Arrays.equals(header, amqpHeader))
{
if(!_saslComplete)
{
@@ -1406,7 +1393,7 @@ public class AMQPConnection_1_0Impl extends
AbstractAMQPConnection<AMQPConnectio
}
}
- try (QpidByteBuffer protocolHeader =
QpidByteBuffer.wrap(AMQP_HEADER))
+ try (QpidByteBuffer protocolHeader =
QpidByteBuffer.wrap(amqpHeader))
{
getSender().send(protocolHeader);
}
diff --git
a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AnonymousRelayDestination.java
b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AnonymousRelayDestination.java
index fb50ef0e0e..ab55ce4f61 100644
---
a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AnonymousRelayDestination.java
+++
b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AnonymousRelayDestination.java
@@ -37,6 +37,8 @@ import org.apache.qpid.server.txn.ServerTransaction;
public class AnonymousRelayDestination implements ReceivingDestination
{
+ private static final Symbol[] CAPABILITIES = { DELAYED_DELIVERY };
+
private final Target _target;
private final NamedAddressSpace _addressSpace;
private final EventLogger _eventLogger;
@@ -53,10 +55,17 @@ public class AnonymousRelayDestination implements
ReceivingDestination
.contains(DISCARD_UNROUTABLE);
}
+ /**
+ * Returns the target capabilities for an anonymous relay.
+ * <br>
+ * Note: returns a shared array instance to avoid per-call allocation. The
returned array must be treated as
+ * immutable and must not be modified by callers.
+ * @return {@link Symbol} array
+ */
@Override
public Symbol[] getCapabilities()
{
- return new Symbol[]{DELAYED_DELIVERY};
+ return CAPABILITIES;
}
@Override
diff --git
a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
index 319ca7ada8..57ef39b60f 100644
---
a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
+++
b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
@@ -20,7 +20,6 @@
*/
package org.apache.qpid.server.protocol.v1_0;
-import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
@@ -68,6 +67,8 @@ import org.apache.qpid.server.util.StateChangeListener;
class ConsumerTarget_1_0 extends AbstractConsumerTarget<ConsumerTarget_1_0>
{
private static final Logger LOGGER =
LoggerFactory.getLogger(ConsumerTarget_1_0.class);
+ private static final UnsettledAction DO_NOTHING_ACTION = new
DoNothingAction();
+
private final boolean _acquires;
private long _deliveryTag = 0L;
@@ -189,55 +190,82 @@ class ConsumerTarget_1_0 extends
AbstractConsumerTarget<ConsumerTarget_1_0>
headerSection = header.createEncodingRetainingSection();
}
- List<QpidByteBuffer> payload = new ArrayList<>();
- if(headerSection != null)
+
+ final EncodingRetainingSection<?> deliveryAnnotationsSection =
message.getDeliveryAnnotationsSection();
+ final EncodingRetainingSection<?> messageAnnotationsSection =
message.getMessageAnnotationsSection();
+ final EncodingRetainingSection<?> propertiesSection =
message.getPropertiesSection();
+ final EncodingRetainingSection<?> applicationPropertiesSection =
message.getApplicationPropertiesSection();
+ final EncodingRetainingSection<?> footerSection =
message.getFooterSection();
+
+ final boolean bodyOnly = headerSection == null &&
+ deliveryAnnotationsSection == null &&
+ messageAnnotationsSection == null &&
+ propertiesSection == null &&
+ applicationPropertiesSection == null &&
+ footerSection == null;
+
+ if (bodyOnly)
{
- payload.add(headerSection.getEncodedForm());
- headerSection.dispose();
+ if (bodyContent != null)
+ {
+ transfer.setPayload(bodyContent);
+ bodyContent.dispose();
+ }
}
- EncodingRetainingSection<?> section;
- if((section = message.getDeliveryAnnotationsSection()) != null)
+ else
{
- payload.add(section.getEncodedForm());
- section.dispose();
- }
+ final List<QpidByteBuffer> payload = new ArrayList<>();
- if((section = message.getMessageAnnotationsSection()) != null)
- {
- payload.add(section.getEncodedForm());
- section.dispose();
- }
+ if (headerSection != null)
+ {
+ payload.add(headerSection.getEncodedForm());
+ headerSection.dispose();
+ }
- if((section = message.getPropertiesSection()) != null)
- {
- payload.add(section.getEncodedForm());
- section.dispose();
- }
+ if (deliveryAnnotationsSection != null)
+ {
+ payload.add(deliveryAnnotationsSection.getEncodedForm());
+ deliveryAnnotationsSection.dispose();
+ }
- if((section = message.getApplicationPropertiesSection()) != null)
- {
- payload.add(section.getEncodedForm());
- section.dispose();
- }
+ if (messageAnnotationsSection != null)
+ {
+ payload.add(messageAnnotationsSection.getEncodedForm());
+ messageAnnotationsSection.dispose();
+ }
- payload.add(bodyContent);
+ if (propertiesSection != null)
+ {
+ payload.add(propertiesSection.getEncodedForm());
+ propertiesSection.dispose();
+ }
- if((section = message.getFooterSection()) != null)
- {
- payload.add(section.getEncodedForm());
- section.dispose();
- }
+ if (applicationPropertiesSection != null)
+ {
+ payload.add(applicationPropertiesSection.getEncodedForm());
+ applicationPropertiesSection.dispose();
+ }
- try (QpidByteBuffer combined = QpidByteBuffer.concatenate(payload))
- {
- transfer.setPayload(combined);
- }
+ if (bodyContent != null)
+ {
+ payload.add(bodyContent);
+ }
+
+ if (footerSection != null)
+ {
+ payload.add(footerSection.getEncodedForm());
+ footerSection.dispose();
+ }
- payload.forEach(QpidByteBuffer::dispose);
+ try (QpidByteBuffer combined =
QpidByteBuffer.concatenate(payload))
+ {
+ transfer.setPayload(combined);
+ }
+
+ payload.forEach(QpidByteBuffer::dispose);
+ }
- byte[] data = new byte[8];
- ByteBuffer.wrap(data).putLong(_deliveryTag++);
- final Binary tag = new Binary(data);
+ final Binary tag = Binary.ofDeliveryTag(_deliveryTag++);
transfer.setDeliveryTag(tag);
@@ -249,7 +277,7 @@ class ConsumerTarget_1_0 extends
AbstractConsumerTarget<ConsumerTarget_1_0>
transfer.setSettled(true);
if (_acquires && _transactionId == null)
{
- transfer.setState(new Accepted());
+ transfer.setState(Accepted.INSTANCE);
}
}
else
@@ -262,7 +290,7 @@ class ConsumerTarget_1_0 extends
AbstractConsumerTarget<ConsumerTarget_1_0>
}
else
{
- action = new DoNothingAction();
+ action = DO_NOTHING_ACTION;
}
_linkEndpoint.addUnsettled(tag, action, entry);
diff --git
a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Delivery.java
b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Delivery.java
index 9bd89cbb30..236ff40ebc 100644
---
a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Delivery.java
+++
b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Delivery.java
@@ -170,13 +170,8 @@ public class Delivery
}
}
- try (QpidByteBuffer payload = transfer.getPayload())
- {
- if (payload != null)
- {
- _totalPayloadSize += (long) payload.remaining();
- }
- }
+ final int remaining = transfer.getPayloadRemaining();
+ _totalPayloadSize += (long) remaining;
}
public LinkEndpoint<? extends BaseSource, ? extends BaseTarget>
getLinkEndpoint()
@@ -186,6 +181,20 @@ public class Delivery
public QpidByteBuffer getPayload()
{
+ final int size = _transfers.size();
+ if (size == 0)
+ {
+ return QpidByteBuffer.emptyQpidByteBuffer();
+ }
+ else if (size == 1)
+ {
+ final Transfer transfer = _transfers.get(0);
+ final QpidByteBuffer payload = transfer.getPayload();
+ transfer.dispose();
+ _transfers.clear();
+ return payload == null ? QpidByteBuffer.emptyQpidByteBuffer() :
payload;
+ }
+
List<QpidByteBuffer> transferBuffers = new
ArrayList<>(_transfers.size());
for (Transfer t : _transfers)
{
diff --git
a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeSendingDestination.java
b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeSendingDestination.java
index 8f84530c6f..cb9eb3942c 100644
---
a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeSendingDestination.java
+++
b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeSendingDestination.java
@@ -58,7 +58,7 @@ import
org.apache.qpid.server.virtualhost.QueueManagingVirtualHost;
public class ExchangeSendingDestination extends StandardSendingDestination
{
- private static final Accepted ACCEPTED = new Accepted();
+ private static final Accepted ACCEPTED = Accepted.INSTANCE;
private static final Rejected REJECTED = new Rejected();
private static final Outcome[] OUTCOMES = { ACCEPTED, REJECTED};
public static final Symbol TOPIC_CAPABILITY = Symbol.getSymbol("topic");
diff --git
a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_Internal_to_v1_0.java
b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_Internal_to_v1_0.java
index 4fd015c2e8..f7af6edb39 100644
---
a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_Internal_to_v1_0.java
+++
b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_Internal_to_v1_0.java
@@ -62,7 +62,6 @@ import
org.apache.qpid.server.util.ConnectionScopedRuntimeException;
@PluggableService
public class MessageConverter_Internal_to_v1_0 extends
MessageConverter_to_1_0<InternalMessage>
{
-
private static final Set<Class<?>> TYPES_EXPRESSIBLE_AS_AMQP_1_0_VALUE =
Set.of(String.class,
Character.class,
Boolean.class,
@@ -163,8 +162,7 @@ public class MessageConverter_Internal_to_v1_0 extends
MessageConverter_to_1_0<I
{
contentTypeAnnotationValue =
isSectionValidForJmsMap(convertedMessageBody) ? MAP_MESSAGE.getType() : null;
}
- else if (originalMessageBody != null
- &&
TYPES_EXPRESSIBLE_AS_AMQP_1_0_VALUE.stream().anyMatch(clazz ->
clazz.isAssignableFrom(originalMessageBody.getClass())))
+ else if (originalMessageBody != null &&
expressibleAsAmqpValue(originalMessageBody))
{
contentTypeAnnotationValue = null;
}
@@ -211,8 +209,7 @@ public class MessageConverter_Internal_to_v1_0 extends
MessageConverter_to_1_0<I
{
contentTypeAsString = null;
}
- else if (messageBody != null
- &&
TYPES_EXPRESSIBLE_AS_AMQP_1_0_VALUE.stream().anyMatch(clazz ->
clazz.isAssignableFrom(messageBody.getClass())))
+ else if (messageBody != null && expressibleAsAmqpValue(messageBody))
{
contentTypeAsString = mimeType;
}
@@ -280,8 +277,7 @@ public class MessageConverter_Internal_to_v1_0 extends
MessageConverter_to_1_0<I
public NonEncodingRetainingSection<?> convertToBody(Object object)
{
- if (object == null
- || TYPES_EXPRESSIBLE_AS_AMQP_1_0_VALUE.stream().anyMatch(clazz ->
clazz.isAssignableFrom(object.getClass())))
+ if (object == null || expressibleAsAmqpValue(object))
{
return new AmqpValue(object);
}
@@ -312,4 +308,20 @@ public class MessageConverter_Internal_to_v1_0 extends
MessageConverter_to_1_0<I
}
}
+ private static boolean expressibleAsAmqpValue(final Object object)
+ {
+ if (object == null)
+ {
+ return true;
+ }
+ final Class<?> type = object.getClass();
+ for (final Class<?> amqpType : TYPES_EXPRESSIBLE_AS_AMQP_1_0_VALUE)
+ {
+ if (amqpType.isAssignableFrom(type))
+ {
+ return true;
+ }
+ }
+ return false;
+ }
}
diff --git
a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_from_1_0.java
b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_from_1_0.java
index 93ab0c32d4..14f9b78d69 100644
---
a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_from_1_0.java
+++
b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_from_1_0.java
@@ -23,7 +23,6 @@ package org.apache.qpid.server.protocol.v1_0;
import static
org.apache.qpid.server.message.mimecontentconverter.MimeContentConverterRegistry.getBestFitObjectToMimeContentConverter;
import java.io.Serializable;
-import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -63,6 +62,8 @@ import
org.apache.qpid.server.util.ServerScopedRuntimeException;
public class MessageConverter_from_1_0
{
+ private static final SectionDecoderImpl SECTION_DECODER =
+ new
SectionDecoderImpl(MessageConverter_v1_0_to_Internal.TYPE_REGISTRY.getSectionDecoderRegistry());
private static final Set<Class> STANDARD_TYPES = new
HashSet<>(Arrays.<Class>asList(Boolean.class,
Byte.class,
@@ -79,15 +80,13 @@ public class MessageConverter_from_1_0
static Object convertBodyToObject(final Message_1_0 serverMessage)
{
- final SectionDecoderImpl sectionDecoder = new
SectionDecoderImpl(MessageConverter_v1_0_to_Internal.TYPE_REGISTRY.getSectionDecoderRegistry());
-
Object bodyObject = null;
List<EncodingRetainingSection<?>> sections = null;
try
{
try (QpidByteBuffer allData = serverMessage.getContent())
{
- sections = sectionDecoder.parseAll(allData);
+ sections = SECTION_DECODER.parseAll(allData);
}
final int size = sections == null ? 0 : sections.size();
@@ -127,10 +126,12 @@ public class MessageConverter_from_1_0
totalSize +=
((DataSection)section).getValue().getArray().length;
}
final byte[] bodyData = new byte[totalSize];
- final ByteBuffer buf = ByteBuffer.wrap(bodyData);
+ int pos = 0;
for(EncodingRetainingSection<?> section : bodySections)
{
- buf.put(((DataSection)
section).getValue().asByteBuffer());
+ final byte[] src = ((DataSection)
section).getValue().getArray();
+ System.arraycopy(src, 0, bodyData, pos, src.length);
+ pos += src.length;
}
bodyObject = bodyData;
}
diff --git
a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java
b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java
index a9914c4a31..518611b0d0 100644
---
a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java
+++
b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java
@@ -68,12 +68,24 @@ import org.apache.qpid.server.util.GZIPUtils;
public abstract class MessageConverter_to_1_0<M extends ServerMessage>
implements MessageConverter<M, Message_1_0>
{
+ /** Must be treated as immutable and must not be modified */
private static final byte[] SERIALIZED_NULL = getObjectBytes(null);
+ /** Must be treated as immutable and must not be modified */
+ private static final NonEncodingRetainingSection<?> AMQP_VALUE_NULL = new
AmqpValue(null);
+ /** Must be treated as immutable and must not be modified */
+ private static final NonEncodingRetainingSection<?>
AMQP_VALUE_EMPTY_STRING = new AmqpValue("");
+ /** Must be treated as immutable and must not be modified */
+ private static final NonEncodingRetainingSection<?> AMQP_VALUE_EMPTY_LIST
= new AmqpSequence(Collections.emptyList());
+ /** Must be treated as immutable and must not be modified */
+ private static final NonEncodingRetainingSection<?> AMQP_VALUE_EMPTY_MAP =
new AmqpValue(Collections.emptyMap());
+ /** Must be treated as immutable and must not be modified */
+ private static final NonEncodingRetainingSection<?> DATA_SERIALIZED_NULL =
new Data(new Binary(SERIALIZED_NULL.clone()));
+
private final AMQPDescribedTypeRegistry _typeRegistry =
AMQPDescribedTypeRegistry.newInstance()
-
.registerTransportLayer()
-
.registerMessagingLayer()
-
.registerTransactionLayer()
-
.registerSecurityLayer();
+ .registerTransportLayer()
+ .registerMessagingLayer()
+ .registerTransactionLayer()
+ .registerSecurityLayer();
public static Symbol getContentType(final String contentMimeType)
{
@@ -280,23 +292,23 @@ public abstract class MessageConverter_to_1_0<M extends
ServerMessage> implement
}
else if (mimeType == null)
{
- return new AmqpValue(null);
+ return AMQP_VALUE_NULL;
}
else if (OBJECT_MESSAGE_CONTENT_TYPES.matcher(mimeType).matches())
{
- return new Data(new Binary(SERIALIZED_NULL));
+ return DATA_SERIALIZED_NULL;
}
else if (TEXT_CONTENT_TYPES.matcher(mimeType).matches())
{
- return new AmqpValue("");
+ return AMQP_VALUE_EMPTY_STRING;
}
else if (MAP_MESSAGE_CONTENT_TYPES.matcher(mimeType).matches())
{
- return new AmqpValue(Collections.emptyMap());
+ return AMQP_VALUE_EMPTY_MAP;
}
else if (LIST_MESSAGE_CONTENT_TYPES.matcher(mimeType).matches())
{
- return new AmqpSequence(Collections.emptyList());
+ return AMQP_VALUE_EMPTY_LIST;
}
return new Data(new Binary(data));
}
diff --git
a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java
b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java
index d1ea39ae2f..da8f7eb40a 100755
---
a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java
+++
b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java
@@ -404,6 +404,7 @@ public class MessageMetaData_1_0 implements
StorableMessageMetaData
private static class MetaDataFactory implements
MessageMetaDataType.Factory<MessageMetaData_1_0>
{
private final AMQPDescribedTypeRegistry _typeRegistry =
AMQPDescribedTypeRegistry.newInstance();
+ private final SectionDecoder _sectionDecoder = new
SectionDecoderImpl(_typeRegistry.getSectionDecoderRegistry());
private MetaDataFactory()
{
@@ -449,9 +450,7 @@ public class MessageMetaData_1_0 implements
StorableMessageMetaData
versionByte));
}
- SectionDecoder sectionDecoder = new
SectionDecoderImpl(_typeRegistry.getSectionDecoderRegistry());
-
- List<EncodingRetainingSection<?>> sections =
sectionDecoder.parseAll(buf);
+ List<EncodingRetainingSection<?>> sections =
_sectionDecoder.parseAll(buf);
if (versionByte == 0)
{
diff --git
a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java
b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java
index f33dfe39e4..39f0792b86 100644
---
a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java
+++
b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java
@@ -55,6 +55,8 @@ public class Message_1_0 extends
AbstractServerMessageImpl<Message_1_0, MessageM
.registerSecurityLayer();
private static final MessageMetaData_1_0 DELETED_MESSAGE_METADATA = new
MessageMetaData_1_0(null, null, null, null, null, null, 0L, 0L);
private static final String AMQP_1_0 = "AMQP 1.0";
+ private static final SectionDecoder SECTION_DECODER =
+ new
SectionDecoderImpl(DESCRIBED_TYPE_REGISTRY.getSectionDecoderRegistry());
public Message_1_0(final StoredMessage<MessageMetaData_1_0> storedMessage)
{
@@ -176,8 +178,6 @@ public class Message_1_0 extends
AbstractServerMessageImpl<Message_1_0, MessageM
{
if(getMessageMetaData().getVersion() == 0)
{
- SectionDecoder sectionDecoder = new
SectionDecoderImpl(DESCRIBED_TYPE_REGISTRY.getSectionDecoderRegistry());
-
try
{
List<EncodingRetainingSection<?>> sections;
@@ -185,7 +185,7 @@ public class Message_1_0 extends
AbstractServerMessageImpl<Message_1_0, MessageM
// not just #getSize()
try (QpidByteBuffer allSectionsContent = super.getContent(0,
Integer.MAX_VALUE))
{
- sections = sectionDecoder.parseAll(allSectionsContent);
+ sections = SECTION_DECODER.parseAll(allSectionsContent);
}
List<QpidByteBuffer> bodySectionContent = new ArrayList<>();
diff --git
a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java
b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java
index 3727094b8d..dd81cfd290 100644
---
a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java
+++
b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java
@@ -45,6 +45,9 @@ import org.apache.qpid.server.txn.TransactionMonitor;
public class NodeReceivingDestination implements ReceivingDestination
{
+ private static final Symbol[] CAPABILITIES_DISCARD = { DISCARD_UNROUTABLE,
DELAYED_DELIVERY };
+ private static final Symbol[] CAPABILITIES_REJECT = { REJECT_UNROUTABLE,
DELAYED_DELIVERY };
+
private final boolean _discardUnroutable;
private final EventLogger _eventLogger;
@@ -145,11 +148,13 @@ public class NodeReceivingDestination implements
ReceivingDestination
}
else
{
- result.getRoutes()
- .stream()
- .filter(q -> q instanceof TransactionMonitor)
- .map(TransactionMonitor.class::cast)
- .forEach(tm -> tm.registerTransaction(txn));
+ for (final Object route : result.getRoutes())
+ {
+ if (route instanceof TransactionMonitor transactionMonitor)
+ {
+ transactionMonitor.registerTransaction(txn);
+ }
+ }
}
}
@@ -201,12 +206,16 @@ public class NodeReceivingDestination implements
ReceivingDestination
return _destination;
}
+ /**
+ * Returns the supported outcome capabilities for this node.
+ * <br>
+ * Note: returns a shared array to avoid per-call allocations. The
returned array must be treated as
+ * immutable and must not be modified by callers.
+ * @return {@link Symbol} array
+ */
@Override
public Symbol[] getCapabilities()
{
- Symbol[] capabilities = new Symbol[2];
- capabilities[0] = _discardUnroutable ? DISCARD_UNROUTABLE :
REJECT_UNROUTABLE;
- capabilities[1] = DELAYED_DELIVERY;
- return capabilities;
+ return _discardUnroutable ? CAPABILITIES_DISCARD : CAPABILITIES_REJECT;
}
}
diff --git
a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngineCreator_1_0_0.java
b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngineCreator_1_0_0.java
index 618e8ff008..d9dff8cd38 100644
---
a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngineCreator_1_0_0.java
+++
b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngineCreator_1_0_0.java
@@ -33,6 +33,7 @@ import org.apache.qpid.server.model.Transport;
import org.apache.qpid.server.model.port.AmqpPort;
import org.apache.qpid.server.plugin.PluggableService;
import org.apache.qpid.server.plugin.ProtocolEngineCreator;
+import org.apache.qpid.server.protocol.v1_0.constants.Bytes;
import org.apache.qpid.server.transport.ProtocolEngine;
import
org.apache.qpid.server.security.auth.manager.AnonymousAuthenticationManager;
import
org.apache.qpid.server.security.auth.manager.ExternalAuthenticationManagerImpl;
@@ -44,17 +45,6 @@ public class ProtocolEngineCreator_1_0_0 implements
ProtocolEngineCreator
{
private static final Logger LOGGER =
LoggerFactory.getLogger(ProtocolEngineCreator_1_0_0.class);
- private static final byte[] AMQP_1_0_0_HEADER =
- new byte[] { (byte) 'A',
- (byte) 'M',
- (byte) 'Q',
- (byte) 'P',
- (byte) 0,
- (byte) 1,
- (byte) 0,
- (byte) 0
- };
-
public ProtocolEngineCreator_1_0_0()
{
}
@@ -65,11 +55,15 @@ public class ProtocolEngineCreator_1_0_0 implements
ProtocolEngineCreator
return Protocol.AMQP_1_0;
}
-
+ /**
+ * Returns a AMQP header shared array instance to avoid per-call
allocation.
+ * The returned array must be treated as immutable and must not be
modified by callers.
+ * @return AMQP header byte array
+ */
@Override
public byte[] getHeaderIdentifier()
{
- return AMQP_1_0_0_HEADER;
+ return Bytes.amqpHeader();
}
@Override
diff --git
a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngineCreator_1_0_0_SASL.java
b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngineCreator_1_0_0_SASL.java
index 4def7a9a4d..4117a33479 100644
---
a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngineCreator_1_0_0_SASL.java
+++
b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngineCreator_1_0_0_SASL.java
@@ -20,30 +20,20 @@
*/
package org.apache.qpid.server.protocol.v1_0;
-import org.apache.qpid.server.transport.ProtocolEngine;
import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.Protocol;
import org.apache.qpid.server.model.Transport;
import org.apache.qpid.server.model.port.AmqpPort;
import org.apache.qpid.server.plugin.PluggableService;
import org.apache.qpid.server.plugin.ProtocolEngineCreator;
-import org.apache.qpid.server.transport.ServerNetworkConnection;
+import org.apache.qpid.server.protocol.v1_0.constants.Bytes;
import org.apache.qpid.server.transport.AggregateTicker;
+import org.apache.qpid.server.transport.ProtocolEngine;
+import org.apache.qpid.server.transport.ServerNetworkConnection;
@PluggableService
public class ProtocolEngineCreator_1_0_0_SASL implements ProtocolEngineCreator
{
- private static final byte[] AMQP_SASL_1_0_0_HEADER =
- new byte[] { (byte) 'A',
- (byte) 'M',
- (byte) 'Q',
- (byte) 'P',
- (byte) 3,
- (byte) 1,
- (byte) 0,
- (byte) 0
- };
-
public ProtocolEngineCreator_1_0_0_SASL()
{
}
@@ -54,11 +44,15 @@ public class ProtocolEngineCreator_1_0_0_SASL implements
ProtocolEngineCreator
return Protocol.AMQP_1_0;
}
-
+ /**
+ * Returns a AMQP header shared array instance to avoid per-call
allocation.
+ * The returned array must be treated as immutable and must not be
modified by callers.
+ * @return AMQP header byte array
+ */
@Override
public byte[] getHeaderIdentifier()
{
- return AMQP_SASL_1_0_0_HEADER;
+ return Bytes.saslHeader();
}
@Override
diff --git
a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java
b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java
index f5080a3bc6..71183e51d4 100644
---
a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java
+++
b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java
@@ -533,7 +533,7 @@ public class SendingLinkEndpoint extends
AbstractLinkEndpoint<Source, Target>
while(!_resumeAcceptedTransfers.isEmpty() && hasCreditToSend())
{
- Accepted accepted = new Accepted();
+ Accepted accepted = Accepted.INSTANCE;
Transfer xfr = new Transfer();
Binary dt = _resumeAcceptedTransfers.remove(0);
xfr.setDeliveryTag(dt);
diff --git
a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
index 1602b6d65b..a70684ca32 100644
---
a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
+++
b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
@@ -316,9 +316,12 @@ public class Session_1_0 extends
AbstractAMQPSession<Session_1_0, ConsumerTarget
private SortedSet<UnsignedInteger> getDeliveryIds(final Set<Binary>
deliveryTags, final LinkEndpoint<?, ?> linkEndpoint)
{
final DeliveryRegistry deliveryRegistry =
getDeliveryRegistry(linkEndpoint.getRole());
- return deliveryTags.stream()
- .map(deliveryTag -> getDeliveryId(deliveryRegistry,
deliveryTag, linkEndpoint))
- .collect(Collectors.toCollection(TreeSet::new));
+ final SortedSet<UnsignedInteger> ids = new TreeSet<>();
+ for (final Binary tag : deliveryTags)
+ {
+ ids.add(getDeliveryId(deliveryRegistry, tag, linkEndpoint));
+ }
+ return ids;
}
private UnsignedInteger getDeliveryId(final Binary deliveryTag, final
LinkEndpoint<?, ?> linkEndpoint)
@@ -390,24 +393,22 @@ public class Session_1_0 extends
AbstractAMQPSession<Session_1_0, ConsumerTarget
{
long remaining = payload == null ? 0 : (long) payload.remaining();
int payloadSent = _connection.sendFrame(_sendingChannel, xfr,
payload);
- if(payload != null)
+ if (payload != null)
{
while (payloadSent < remaining && payloadSent >= 0)
{
- Transfer continuationTransfer = new Transfer();
-
+ final Transfer continuationTransfer = new Transfer();
continuationTransfer.setHandle(xfr.getHandle());
continuationTransfer.setRcvSettleMode(xfr.getRcvSettleMode());
continuationTransfer.setState(xfr.getState());
- continuationTransfer.setPayload(payload);
+ // no need to pass payload to continuationTransfer as
AMQPConnection_1_0Impl#sendFrame()
+ // takes payload as an argument
_nextOutgoingId.incr();
_remoteIncomingWindow--;
- remaining = (long) payload.remaining();
+ remaining = payload.remaining();
payloadSent = _connection.sendFrame(_sendingChannel,
continuationTransfer, payload);
-
- continuationTransfer.dispose();
}
}
}
diff --git
a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java
b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java
index 1ceb1dd7e1..0e8bf0a8f4 100644
---
a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java
+++
b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java
@@ -81,7 +81,7 @@ public class StandardReceivingLinkEndpoint extends
AbstractReceivingLinkEndpoint
{
private static final Logger LOGGER =
LoggerFactory.getLogger(StandardReceivingLinkEndpoint.class);
private static final Symbol DELIVERY_TAG = Symbol.valueOf("delivery-tag");
- private static final Accepted ACCEPTED = new Accepted();
+ private static final Accepted ACCEPTED = Accepted.INSTANCE;
private static final String LINK = "link";
private final java.util.Queue<AsyncCommand> _unfinishedCommandsQueue = new
ConcurrentLinkedQueue<>();
diff --git
a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardSendingDestination.java
b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardSendingDestination.java
index 7940c8d907..533322254a 100644
---
a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardSendingDestination.java
+++
b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardSendingDestination.java
@@ -34,7 +34,7 @@ import
org.apache.qpid.server.protocol.v1_0.type.messaging.Accepted;
public class StandardSendingDestination implements SendingDestination
{
- private static final Accepted ACCEPTED = new Accepted();
+ private static final Accepted ACCEPTED = Accepted.INSTANCE;
private static final Outcome[] OUTCOMES = new Outcome[] { ACCEPTED };
diff --git
a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLinkEndpoint.java
b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLinkEndpoint.java
index c87b00a389..c57ed309f9 100644
---
a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLinkEndpoint.java
+++
b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLinkEndpoint.java
@@ -119,7 +119,7 @@ public class TxnCoordinatorReceivingLinkEndpoint extends
AbstractReceivingLinkEn
final DeliveryState outcome;
if (error == null)
{
- outcome = new Accepted();
+ outcome = Accepted.INSTANCE;
}
else if
(CollectionUtils.nullSafeList(getSource().getOutcomes()).contains(Rejected.REJECTED_SYMBOL))
{
diff --git
a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/FrameWriter.java
b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/FrameWriter.java
index b0f8ed08a2..ab0b5815e6 100644
---
a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/FrameWriter.java
+++
b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/FrameWriter.java
@@ -30,7 +30,6 @@ public class FrameWriter
{
private final ByteBufferSender _sender;
private final ValueWriter.Registry _registry;
- private static final byte[] EMPTY_BYTE_ARRAY = new byte[] {};
public FrameWriter(final ValueWriter.Registry registry, final
ByteBufferSender sender)
{
@@ -45,7 +44,18 @@ public class FrameWriter
final int payloadLength = payload == null ? 0 : payload.remaining();
final T frameBody = frame.getFrameBody();
- final ValueWriter<T> typeWriter = frameBody == null ? null :
_registry.getValueWriter(frameBody);
+ final ValueWriter<T> typeWriter;
+
+ if (frameBody == null)
+ {
+ typeWriter = null;
+ }
+ else
+ {
+ final ValueWriter<T> cachedWriter = frame.getFrameBodyWriter();
+ typeWriter = cachedWriter == null ?
_registry.getValueWriter(frameBody) : cachedWriter;
+ }
+
int bodySize;
if (typeWriter == null)
{
diff --git
a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/ValueHandler.java
b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/ValueHandler.java
index 0aa3a177c9..f83f7dceb7 100644
---
a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/ValueHandler.java
+++
b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/ValueHandler.java
@@ -25,6 +25,7 @@ import
org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException;
import org.apache.qpid.server.protocol.v1_0.type.transport.AmqpError;
import org.apache.qpid.server.protocol.v1_0.type.transport.ConnectionError;
+/** {@link ValueHandler} is thread safe */
public class ValueHandler implements DescribedTypeConstructorRegistry.Source
{
public static final byte DESCRIBED_TYPE = (byte)0;
diff --git
a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/constants/Bytes.java
b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/constants/Bytes.java
new file mode 100644
index 0000000000..53034b827f
--- /dev/null
+++
b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/constants/Bytes.java
@@ -0,0 +1,62 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.server.protocol.v1_0.constants;
+
+/** Utility class holding frequently used byte arrays */
+public final class Bytes
+{
+ private Bytes()
+ {
+ // constructor is private for utility class
+ }
+
+ /** AMQP header byte array representation */
+ private static final byte[] AMQP_HEADER_BYTES = { (byte) 'A', (byte) 'M',
(byte) 'Q', (byte) 'P',
+ (byte) 0, (byte) 1, (byte) 0, (byte) 0 };
+
+ /** Empty byte array */
+ public static final byte[] EMPTY_BYTE_ARRAY = { };
+
+ /** SASL header byte array representation */
+ private static final byte[] SASL_HEADER_BYTES = { (byte) 'A', (byte) 'M',
(byte) 'Q', (byte) 'P',
+ (byte) 3, (byte) 1, (byte) 0, (byte) 0 };
+
+ /**
+ * Returns a AMQP header shared array instance to avoid per-call
allocation.
+ * The returned array must be treated as immutable and must not be
modified by callers.
+ * @return AMQP header byte array
+ */
+ public static byte[] amqpHeader()
+ {
+ return AMQP_HEADER_BYTES;
+ }
+
+ /**
+ * Returns a SASL header shared array instance to avoid per-call
allocation.
+ * The returned array must be treated as immutable and must not be
modified by callers.
+ * @return SASL header byte array
+ */
+ public static byte[] saslHeader()
+ {
+ return SASL_HEADER_BYTES;
+ }
+}
diff --git
a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/framing/AMQFrame.java
b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/framing/AMQFrame.java
index eeec261df4..5e60054269 100644
---
a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/framing/AMQFrame.java
+++
b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/framing/AMQFrame.java
@@ -22,21 +22,60 @@
package org.apache.qpid.server.protocol.v1_0.framing;
import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.server.protocol.v1_0.codec.FrameWriter;
+import org.apache.qpid.server.protocol.v1_0.codec.ValueWriter;
public abstract class AMQFrame<T>
{
private final T _frameBody;
private final QpidByteBuffer _payload;
+ private final ValueWriter<T> _frameBodyWriter;
+ /**
+ * Base AMQP frame.
+ * @param frameBody frame body (may be null for heartbeat-like frames)
+ */
AMQFrame(T frameBody)
{
- this(frameBody, null);
+ this(frameBody, null, null);
}
+ /**
+ * Base AMQP frame.
+ * <br>
+ * {@code payload} (if present) is sent as-is using its current {@code
position/limit}.
+ * Do not mutate the payload buffer concurrently. Ensure it remains
valid/alive until the frame
+ * has been written to the transport.
+ * <br>
+ * @param frameBody frame body (may be null for heartbeat-like frames)
+ * @param payload optional payload; remaining bytes will be written
+ */
protected AMQFrame(T frameBody, QpidByteBuffer payload)
+ {
+ this(frameBody, payload, null);
+ }
+
+ /**
+ * Base AMQP frame.
+ * <br>
+ * IMPORTANT: Frames are expected to be effectively immutable after
construction.
+ * {@link FrameWriter} may use a cached {@link ValueWriter} supplied at
construction time and will
+ * not re-resolve it from the registry. If {@code frameBody} is mutated
after the writer is cached,
+ * the encoded size / encoding may become inconsistent.
+ *<br>
+ * {@code payload} (if present) is sent as-is using its current {@code
position/limit}.
+ * Do not mutate the payload buffer concurrently. Ensure it remains
valid/alive until the frame
+ * has been written to the transport.
+ * <br>
+ * @param frameBody frame body (may be null for heartbeat-like frames)
+ * @param payload optional payload; remaining bytes will be written
+ * @param frameBodyWriter optional cache hint; must match {@code
frameBody} in its current state
+ */
+ protected AMQFrame(T frameBody, QpidByteBuffer payload, ValueWriter<T>
frameBodyWriter)
{
_frameBody = frameBody;
_payload = payload;
+ _frameBodyWriter = frameBodyWriter;
}
public QpidByteBuffer getPayload()
@@ -53,6 +92,11 @@ public abstract class AMQFrame<T>
return _frameBody;
}
+ public ValueWriter<T> getFrameBodyWriter()
+ {
+ return _frameBodyWriter;
+ }
+
@Override
public String toString()
{
diff --git
a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/framing/TransportFrame.java
b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/framing/TransportFrame.java
index 3156cbdbc7..1bd2b86d3f 100644
---
a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/framing/TransportFrame.java
+++
b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/framing/TransportFrame.java
@@ -19,6 +19,8 @@
package org.apache.qpid.server.protocol.v1_0.framing;
import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.server.protocol.v1_0.codec.FrameWriter;
+import org.apache.qpid.server.protocol.v1_0.codec.ValueWriter;
import org.apache.qpid.server.protocol.v1_0.type.FrameBody;
public final class TransportFrame extends AMQFrame<FrameBody>
@@ -27,18 +29,60 @@ public final class TransportFrame extends
AMQFrame<FrameBody>
private final int _channel;
+ /**
+ * Base Transport frame.
+ * @param channel channel
+ * @param frameBody frame body (may be null for heartbeat-like frames)
+ */
public TransportFrame(int channel, FrameBody frameBody)
{
super(frameBody);
_channel = channel;
}
+ /**
+ * Base Transport frame.
+ * @param channel channel
+ * @param frameBody frame body (may be null for heartbeat-like frames)
+ * @param frameBodyWriter optional cached writer for {@code frameBody}.
+ * Use only when the writer was resolved for this exact instance/state of
{@code frameBody}
+ * and the body will not be mutated afterwards; otherwise pass null and
let {@link FrameWriter}
+ * resolve the writer from the registry.
+ */
+ public TransportFrame(int channel, FrameBody frameBody,
ValueWriter<FrameBody> frameBodyWriter)
+ {
+ super(frameBody, null, frameBodyWriter);
+ _channel = channel;
+ }
+
+ /**
+ * Base Transport frame.
+ * @param channel channel
+ * @param frameBody frame body (may be null for heartbeat-like frames)
+ * @param payload optional payload; remaining bytes will be written
+ */
public TransportFrame(int channel, FrameBody frameBody, QpidByteBuffer
payload)
{
super(frameBody, payload);
_channel = channel;
}
+ /**
+ * Base Transport frame.
+ * @param channel channel
+ * @param frameBody frame body (may be null for heartbeat-like frames)
+ * @param payload optional payload; remaining bytes will be written
+ * @param frameBodyWriter optional cached writer for {@code frameBody}.
+ * Use only when the writer was resolved for this exact instance/state of
{@code frameBody}
+ * and the body will not be mutated afterwards; otherwise pass null and
let {@link FrameWriter}
+ * resolve the writer from the registry.
+ */
+ public TransportFrame(int channel, FrameBody frameBody, QpidByteBuffer
payload, ValueWriter<FrameBody> frameBodyWriter)
+ {
+ super(frameBody, payload, frameBodyWriter);
+ _channel = channel;
+ }
+
@Override public int getChannel()
{
return _channel;
diff --git
a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/messaging/SectionDecoderImpl.java
b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/messaging/SectionDecoderImpl.java
index 00fde2ea70..9eb7964619 100644
---
a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/messaging/SectionDecoderImpl.java
+++
b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/messaging/SectionDecoderImpl.java
@@ -29,6 +29,7 @@ import
org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException;
import
org.apache.qpid.server.protocol.v1_0.type.messaging.EncodingRetainingSection;
import org.apache.qpid.server.protocol.v1_0.type.transport.AmqpError;
+/** {@link SectionDecoderImpl} is thread safe */
public class SectionDecoderImpl implements SectionDecoder
{
diff --git
a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/Binary.java
b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/Binary.java
index 4efa3e1ae3..fa998bc526 100644
---
a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/Binary.java
+++
b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/Binary.java
@@ -40,6 +40,20 @@ public class Binary
_hashCode = hc;
}
+ public static Binary ofDeliveryTag(final long value)
+ {
+ final byte[] data = new byte[8];
+ data[0] = (byte) (value >>> 56);
+ data[1] = (byte) (value >>> 48);
+ data[2] = (byte) (value >>> 40);
+ data[3] = (byte) (value >>> 32);
+ data[4] = (byte) (value >>> 24);
+ data[5] = (byte) (value >>> 16);
+ data[6] = (byte) (value >>> 8);
+ data[7] = (byte) value;
+ return new Binary(data);
+ }
+
public ByteBuffer asByteBuffer()
{
return ByteBuffer.wrap(_data);
diff --git
a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/codec/AMQPDescribedTypeRegistry.java
b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/codec/AMQPDescribedTypeRegistry.java
index 0e4d71f500..0848ff7b3e 100644
---
a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/codec/AMQPDescribedTypeRegistry.java
+++
b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/codec/AMQPDescribedTypeRegistry.java
@@ -69,8 +69,9 @@ import
org.apache.qpid.server.protocol.v1_0.type.transport.codec.*;
public class AMQPDescribedTypeRegistry implements
DescribedTypeConstructorRegistry, ValueWriter.Registry
{
-
+ /** Is initialized once during startup, treated as read-only afterward */
private final Map<Object, DescribedTypeConstructor> _constructorRegistry =
new HashMap<>();
+ /** Is initialized once during startup, treated as read-only afterward */
private final Map<Object, DescribedTypeConstructor>
_sectionDecoderRegistryMap = new HashMap<>();
diff --git
a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/Accepted.java
b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/Accepted.java
index 5f98113e79..3a02290c80 100644
---
a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/Accepted.java
+++
b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/Accepted.java
@@ -31,6 +31,7 @@ import org.apache.qpid.server.protocol.v1_0.type.Symbol;
public class Accepted implements Outcome
{
public static final Symbol ACCEPTED_SYMBOL =
Symbol.valueOf("amqp:accepted:list");
+ public static final Accepted INSTANCE = new Accepted();
@Override
public Symbol getSymbol()
diff --git
a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/AmqpValue.java
b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/AmqpValue.java
index 26a83e1440..b50ccc3d8a 100644
---
a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/AmqpValue.java
+++
b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/AmqpValue.java
@@ -45,7 +45,7 @@ public class AmqpValue implements
NonEncodingRetainingSection<Object>
@Override
public String toString()
{
- return "AmqpValue{(" + _value.getClass().getName() + ')' + _value +
'}';
+ return "AmqpValue{(" + (_value == null ? "null" :
_value.getClass().getName()) + ')' + _value + '}';
}
diff --git
a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/transport/Transfer.java
b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/transport/Transfer.java
index e4d3231d23..5925b44d5b 100644
---
a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/transport/Transfer.java
+++
b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/transport/Transfer.java
@@ -295,6 +295,12 @@ public class Transfer implements FrameBody
conn.receiveTransfer(channel, this);
}
+ public int getPayloadRemaining()
+ {
+ final QpidByteBuffer payload = _payload;
+ return payload == null ? 0 : payload.remaining();
+ }
+
public QpidByteBuffer getPayload()
{
if (_payload == null)
diff --git
a/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0Test.java
b/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0Test.java
index 7aafaf011e..41df4e3682 100644
---
a/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0Test.java
+++
b/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0Test.java
@@ -20,7 +20,11 @@
*/
package org.apache.qpid.server.protocol.v1_0;
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
@@ -36,7 +40,11 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
-import org.junit.jupiter.api.BeforeAll;
+import org.apache.qpid.server.protocol.v1_0.type.Binary;
+import org.apache.qpid.server.protocol.v1_0.type.UnsignedByte;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.Data;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.DataSection;
+import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
@@ -75,7 +83,7 @@ class ConsumerTarget_1_0Test extends UnitTestBase
private ConsumerTarget_1_0 _consumerTarget;
private SendingLinkEndpoint _sendingLinkEndpoint;
- @BeforeAll
+ @BeforeEach
void setUp()
{
final AMQPConnection_1_0 connection = mock(AMQPConnection_1_0.class);
@@ -110,9 +118,7 @@ class ConsumerTarget_1_0Test extends UnitTestBase
{
final Object[] args = invocation.getArguments();
final Transfer transfer = (Transfer) args[0];
-
final QpidByteBuffer transferPayload = transfer.getPayload();
-
final QpidByteBuffer payloadCopy = transferPayload.duplicate();
payloadRef.set(payloadCopy);
return null;
@@ -137,8 +143,8 @@ class ConsumerTarget_1_0Test extends UnitTestBase
}
assertNotNull(sentHeader, "Header is not found");
- assertNotNull(sentHeader.getTtl(), "Ttl is not set");
- assertTrue(sentHeader.getTtl().longValue() <= 1000, "Unexpected ttl");
+ assertNotNull(sentHeader.getTtl(), "TTL is not set");
+ assertTrue(sentHeader.getTtl().longValue() <= 1000, "Unexpected TTL");
}
private Message_1_0 createTestMessage(final Header header, long
arrivalTime)
@@ -164,4 +170,253 @@ class ConsumerTarget_1_0Test extends UnitTestBase
when(storedMessage.getMetaData()).thenReturn(metaData);
return new Message_1_0(storedMessage);
}
+
+ @Test
+ void bodyOnlyMessageIsSentAsSingleDataSection() throws Exception
+ {
+ final MessageInstanceConsumer consumer =
mock(MessageInstanceConsumer.class);
+ final byte[] body = new byte[]{10, 11, 12};
+ final Message_1_0 message = createBodyOnlyMessage(body);
+
+ final MessageInstance messageInstance = mock(MessageInstance.class);
+ when(messageInstance.getMessage()).thenReturn(message);
+ when(messageInstance.getDeliveryCount()).thenReturn(0);
+
+ final AtomicReference<QpidByteBuffer> payloadRef = new
AtomicReference<>();
+ doAnswer(invocation ->
+ {
+ final Transfer transfer = (Transfer) invocation.getArguments()[0];
+ try (QpidByteBuffer transferPayload = transfer.getPayload())
+ {
+ payloadRef.set(transferPayload == null ? null :
transferPayload.duplicate());
+ }
+ return null;
+ }).when(_sendingLinkEndpoint).transfer(any(Transfer.class),
anyBoolean());
+
+ _consumerTarget.doSend(consumer, messageInstance, false);
+
+ try (final QpidByteBuffer payload = payloadRef.get())
+ {
+ final List<EncodingRetainingSection<?>> sections =
+ new
SectionDecoderImpl(AMQP_DESCRIBED_TYPE_REGISTRY.getSectionDecoderRegistry()).parseAll(payload);
+
+ assertEquals(1, sections.size());
+ assertInstanceOf(DataSection.class, sections.get(0));
+ final DataSection dataSection = (DataSection) sections.get(0);
+ assertArrayEquals(body, dataSection.getValue().getArray());
+ sections.forEach(EncodingRetainingSection::dispose);
+ }
+ }
+
+ @Test
+ void headerAndBodyOnlyMessageIncludesHeaderSection() throws Exception
+ {
+ final MessageInstanceConsumer consumer =
mock(MessageInstanceConsumer.class);
+ final byte[] body = new byte[]{1, 2, 3, 4};
+
+ final Header header = new Header();
+ header.setDurable(true);
+ header.setPriority(UnsignedByte.valueOf((byte) 7));
+
+ final Message_1_0 message = createHeaderAndBodyOnlyMessage(header,
body, System.currentTimeMillis());
+
+ final MessageInstance messageInstance = mock(MessageInstance.class);
+ when(messageInstance.getMessage()).thenReturn(message);
+ when(messageInstance.getDeliveryCount()).thenReturn(0);
+
+ final AtomicReference<QpidByteBuffer> payloadRef = new
AtomicReference<>();
+ doAnswer(invocation ->
+ {
+ final Transfer transfer = (Transfer) invocation.getArguments()[0];
+ try (QpidByteBuffer transferPayload = transfer.getPayload())
+ {
+ payloadRef.set(transferPayload == null ? null :
transferPayload.duplicate());
+ }
+ return null;
+ }).when(_sendingLinkEndpoint).transfer(any(Transfer.class),
anyBoolean());
+
+ _consumerTarget.doSend(consumer, messageInstance, false);
+
+ try (final QpidByteBuffer payload = payloadRef.get())
+ {
+ final List<EncodingRetainingSection<?>> sections =
+ new
SectionDecoderImpl(AMQP_DESCRIBED_TYPE_REGISTRY.getSectionDecoderRegistry()).parseAll(payload);
+
+ assertEquals(2, sections.size(), "Unexpected number of sections");
+
+ Header sentHeader = null;
+ Binary sentBody = null;
+ for (final EncodingRetainingSection<?> section : sections)
+ {
+ if (section instanceof HeaderSection)
+ {
+ sentHeader = ((HeaderSection) section).getValue();
+ }
+ else if (section instanceof DataSection)
+ {
+ sentBody = ((DataSection) section).getValue();
+ }
+ }
+
+ assertNotNull(sentHeader, "Header is not found");
+ assertEquals(Boolean.TRUE, sentHeader.getDurable());
+ assertEquals(UnsignedInteger.valueOf(7).intValue(),
sentHeader.getPriority().intValue());
+ assertNotNull(sentBody, "Body is not found");
+ assertArrayEquals(body, sentBody.getArray());
+
+ sections.forEach(EncodingRetainingSection::dispose);
+ }
+ }
+
+ @Test
+ void deliveryCountAndTtlAreAppliedForHeaderAndBodyOnlyMessage() throws
Exception
+ {
+ final MessageInstanceConsumer consumer =
mock(MessageInstanceConsumer.class);
+ final byte[] body = new byte[]{10, 20, 30};
+
+ final long ttl = 2000L;
+ final long arrivalTime = System.currentTimeMillis() - 1000L;
+
+ final Header header = new Header();
+ header.setTtl(UnsignedInteger.valueOf(ttl));
+
+ final Message_1_0 message = createHeaderAndBodyOnlyMessage(header,
body, arrivalTime);
+
+ final int deliveryCount = 5;
+ final MessageInstance messageInstance = mock(MessageInstance.class);
+ when(messageInstance.getMessage()).thenReturn(message);
+ when(messageInstance.getDeliveryCount()).thenReturn(deliveryCount);
+
+ final AtomicReference<QpidByteBuffer> payloadRef = new
AtomicReference<>();
+ doAnswer(invocation ->
+ {
+ final Transfer transfer = (Transfer) invocation.getArguments()[0];
+ try (QpidByteBuffer transferPayload = transfer.getPayload())
+ {
+ payloadRef.set(transferPayload == null ? null :
transferPayload.duplicate());
+ }
+ return null;
+ }).when(_sendingLinkEndpoint).transfer(any(Transfer.class),
anyBoolean());
+
+ _consumerTarget.doSend(consumer, messageInstance, false);
+
+ final List<EncodingRetainingSection<?>> sections;
+ try (final QpidByteBuffer payload = payloadRef.get())
+ {
+ sections = new
SectionDecoderImpl(AMQP_DESCRIBED_TYPE_REGISTRY.getSectionDecoderRegistry()).parseAll(payload);
+ }
+
+ Header sentHeader = null;
+ Binary sentBody = null;
+ for (final EncodingRetainingSection<?> section : sections)
+ {
+ if (section instanceof HeaderSection)
+ {
+ sentHeader = ((HeaderSection) section).getValue();
+ }
+ else if (section instanceof DataSection)
+ {
+ sentBody = ((DataSection) section).getValue();
+ }
+ }
+
+ assertNotNull(sentHeader, "Header is not found");
+ assertEquals(UnsignedInteger.valueOf(deliveryCount),
sentHeader.getDeliveryCount(), "Delivery count not set");
+ assertNotNull(sentHeader.getTtl(), "TTL is not set");
+ assertTrue(sentHeader.getTtl().longValue() <= 1000, "Unexpected TTL");
+
+ assertNotNull(sentBody, "Body is not found");
+ assertArrayEquals(body, sentBody.getArray());
+
+ sections.forEach(EncodingRetainingSection::dispose);
+ }
+
+ @Test
+ void nullBodyContentDoesNotSetPayloadAndDoesNotThrow()
+ {
+ final MessageInstanceConsumer consumer =
mock(MessageInstanceConsumer.class);
+ final Message_1_0 message = mock(Message_1_0.class);
+ when(message.getContent()).thenReturn(null);
+ when(message.getHeaderSection()).thenReturn(null);
+ when(message.getDeliveryAnnotationsSection()).thenReturn(null);
+ when(message.getMessageAnnotationsSection()).thenReturn(null);
+ when(message.getPropertiesSection()).thenReturn(null);
+ when(message.getApplicationPropertiesSection()).thenReturn(null);
+ when(message.getFooterSection()).thenReturn(null);
+ when(message.getSize()).thenReturn(0L);
+
+ final MessageInstance messageInstance = mock(MessageInstance.class);
+ when(messageInstance.getMessage()).thenReturn(message);
+ when(messageInstance.getDeliveryCount()).thenReturn(0);
+
+ final AtomicReference<QpidByteBuffer> payloadRef = new
AtomicReference<>();
+ doAnswer(invocation ->
+ {
+ final Transfer transfer = (Transfer) invocation.getArguments()[0];
+ payloadRef.set(transfer.getPayload());
+ return null;
+ }).when(_sendingLinkEndpoint).transfer(any(Transfer.class),
anyBoolean());
+
+ _consumerTarget.doSend(consumer, messageInstance, false);
+
+ assertNull(payloadRef.get(), "Payload must remain null when body
content is null");
+ }
+
+ private Message_1_0 createHeaderAndBodyOnlyMessage(final Header header,
final byte[] body, final long arrivalTime)
+ {
+ final DataSection dataSection = new Data(new
Binary(body)).createEncodingRetainingSection();
+ final byte[] encodedBody;
+ try (final QpidByteBuffer encoded = dataSection.getEncodedForm())
+ {
+ encodedBody = new byte[encoded.remaining()];
+ encoded.copyTo(encodedBody);
+ }
+ finally
+ {
+ dataSection.dispose();
+ }
+
+ final StoredMessage<MessageMetaData_1_0> storedMessage =
mock(StoredMessage.class);
+ when(storedMessage.getMetaData()).thenReturn(new MessageMetaData_1_0(
+ header.createEncodingRetainingSection(), null, null, null,
null, null, arrivalTime, encodedBody.length));
+ when(storedMessage.getContentSize()).thenReturn(encodedBody.length);
+ when(storedMessage.isInContentInMemory()).thenReturn(true);
+ when(storedMessage.getContent(anyInt(), anyInt())).thenAnswer(inv ->
+ {
+ final int offset = inv.getArgument(0);
+ final int length = inv.getArgument(1);
+ return QpidByteBuffer.wrap(encodedBody, offset, length);
+ });
+
+ return new Message_1_0(storedMessage);
+ }
+
+ private Message_1_0 createBodyOnlyMessage(final byte[] body)
+ {
+ final DataSection dataSection = new Data(new
Binary(body)).createEncodingRetainingSection();
+ final byte[] encodedBody;
+ try (final QpidByteBuffer encoded = dataSection.getEncodedForm())
+ {
+ encodedBody = new byte[encoded.remaining()];
+ encoded.copyTo(encodedBody);
+ }
+ finally
+ {
+ dataSection.dispose();
+ }
+
+ final StoredMessage<MessageMetaData_1_0> storedMessage =
mock(StoredMessage.class);
+ when(storedMessage.getMetaData()).thenReturn(new MessageMetaData_1_0(
+ null, null, null, null, null, null,
System.currentTimeMillis(), encodedBody.length));
+ when(storedMessage.getContentSize()).thenReturn(encodedBody.length);
+ when(storedMessage.isInContentInMemory()).thenReturn(true);
+ when(storedMessage.getContent(anyInt(), anyInt())).thenAnswer(inv ->
+ {
+ final int offset = inv.getArgument(0);
+ final int length = inv.getArgument(1);
+ return QpidByteBuffer.wrap(encodedBody, offset, length);
+ });
+
+ return new Message_1_0(storedMessage);
+ }
}
diff --git
a/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/DeliveryTest.java
b/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/DeliveryTest.java
new file mode 100644
index 0000000000..ec77ddc039
--- /dev/null
+++
b/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/DeliveryTest.java
@@ -0,0 +1,118 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.server.protocol.v1_0;
+
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.Mockito.mock;
+
+import org.junit.jupiter.api.Test;
+
+import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.server.protocol.v1_0.type.BaseSource;
+import org.apache.qpid.server.protocol.v1_0.type.BaseTarget;
+import org.apache.qpid.server.protocol.v1_0.type.Binary;
+import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Transfer;
+import org.apache.qpid.test.utils.UnitTestBase;
+
+class DeliveryTest extends UnitTestBase
+{
+ @Test
+ void singleTransferPayloadIsReturnedAndSizeIsCounted()
+ {
+ final byte[] bytes = new byte[]{1, 2, 3, 4};
+ final Transfer t = new Transfer();
+ t.setDeliveryId(UnsignedInteger.valueOf(1));
+ t.setDeliveryTag(new Binary(new byte[]{0x01}));
+
+ try (final QpidByteBuffer buf = QpidByteBuffer.wrap(bytes))
+ {
+ t.setPayload(buf);
+ }
+
+ final LinkEndpoint<? extends BaseSource, ? extends BaseTarget>
endpoint = mock(LinkEndpoint.class);
+ final Delivery delivery = new Delivery(t, endpoint);
+
+ assertEquals(4L, delivery.getTotalPayloadSize());
+
+ try (final QpidByteBuffer payload = delivery.getPayload())
+ {
+ final byte[] actual = new byte[payload.remaining()];
+ payload.copyTo(actual);
+ assertArrayEquals(bytes, actual);
+ }
+ }
+
+ @Test
+ void singleTransferWithNullPayloadReturnsEmptyBuffer()
+ {
+ final Transfer t = new Transfer();
+ t.setDeliveryId(UnsignedInteger.valueOf(1));
+ t.setDeliveryTag(new Binary(new byte[]{0x01}));
+
+ final LinkEndpoint<? extends BaseSource, ? extends BaseTarget>
endpoint = mock(LinkEndpoint.class);
+ final Delivery delivery = new Delivery(t, endpoint);
+
+ assertEquals(0L, delivery.getTotalPayloadSize());
+
+ try (final QpidByteBuffer payload = delivery.getPayload())
+ {
+ assertEquals(0, payload.remaining());
+ }
+ }
+
+ @Test
+ void multipleTransfersAreConcatenatedAndSizeIsSum()
+ {
+ final LinkEndpoint<? extends BaseSource, ? extends BaseTarget>
endpoint = mock(LinkEndpoint.class);
+
+ final Transfer t1 = new Transfer();
+ t1.setDeliveryId(UnsignedInteger.valueOf(1));
+ t1.setDeliveryTag(new Binary(new byte[]{0x01}));
+ t1.setMore(true);
+ try (final QpidByteBuffer buf = QpidByteBuffer.wrap(new byte[]{1, 2}))
+ {
+ t1.setPayload(buf);
+ }
+
+ final Delivery delivery = new Delivery(t1, endpoint);
+
+ final Transfer t2 = new Transfer();
+ t2.setMore(false);
+ try (final QpidByteBuffer buf = QpidByteBuffer.wrap(new byte[]{3, 4,
5}))
+ {
+ t2.setPayload(buf);
+ }
+
+ delivery.addTransfer(t2);
+
+ assertEquals(5L, delivery.getTotalPayloadSize());
+
+ try (final QpidByteBuffer payload = delivery.getPayload())
+ {
+ final byte[] actual = new byte[payload.remaining()];
+ payload.copyTo(actual);
+ assertArrayEquals(new byte[]{1, 2, 3, 4, 5}, actual);
+ }
+ }
+}
diff --git
a/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngineCreator_1_0_0Test.java
b/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngineCreator_1_0_0Test.java
new file mode 100644
index 0000000000..8f08b8cc9d
--- /dev/null
+++
b/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngineCreator_1_0_0Test.java
@@ -0,0 +1,65 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.server.protocol.v1_0;
+
+import org.apache.qpid.server.model.Protocol;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+class ProtocolEngineCreator_1_0_0Test
+{
+ private static final byte[] ORIGINAL_AMQP_HEADER = { (byte) 'A', (byte)
'M', (byte) 'Q', (byte) 'P',
+ (byte) 0, (byte) 1, (byte) 0, (byte) 0 };
+
+ private static final byte[] ORIGINAL_SASL_HEADER = { (byte) 'A', (byte)
'M', (byte) 'Q', (byte) 'P',
+ (byte) 3, (byte) 1, (byte) 0, (byte) 0 };
+
+ @Test
+ void getHeaderIdentifier()
+ {
+ assertArrayEquals(ORIGINAL_AMQP_HEADER,
ProtocolEngineCreator_1_0_0.getInstance().getHeaderIdentifier(),
+ "AMQP header should not be changed");
+ }
+
+ @Test
+ void getSuggestedAlternativeHeader()
+ {
+ assertArrayEquals(ORIGINAL_SASL_HEADER,
ProtocolEngineCreator_1_0_0.getInstance().getSuggestedAlternativeHeader(),
+ "SASL header should not be changed");
+ }
+
+ @Test
+ void getVersion()
+ {
+ assertEquals(Protocol.AMQP_1_0,
ProtocolEngineCreator_1_0_0.getInstance().getVersion(),
+ "Protocol version should be AMQP_1_0");
+ }
+
+ @Test
+ void getType()
+ {
+ assertEquals("AMQP_1_0",
ProtocolEngineCreator_1_0_0.getInstance().getType(),
+ "Protocol version should be AMQP_1_0");
+ }
+}
diff --git
a/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngineCreator_1_0_0_SASLTest.java
b/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngineCreator_1_0_0_SASLTest.java
new file mode 100644
index 0000000000..d6174b5e04
--- /dev/null
+++
b/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngineCreator_1_0_0_SASLTest.java
@@ -0,0 +1,63 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.server.protocol.v1_0;
+
+import org.apache.qpid.server.model.Protocol;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+
+class ProtocolEngineCreator_1_0_0_SASLTest
+{
+ private static final byte[] ORIGINAL_SASL_HEADER = { (byte) 'A', (byte)
'M', (byte) 'Q', (byte) 'P',
+ (byte) 3, (byte) 1, (byte) 0, (byte) 0 };
+
+ @Test
+ void getHeaderIdentifier()
+ {
+ assertArrayEquals(ORIGINAL_SASL_HEADER,
ProtocolEngineCreator_1_0_0_SASL.getInstance().getHeaderIdentifier(),
+ "AMQP header should not be changed");
+ }
+
+ @Test
+ void getSuggestedAlternativeHeader()
+ {
+
assertNull(ProtocolEngineCreator_1_0_0_SASL.getInstance().getSuggestedAlternativeHeader(),
+ "No suggested alternative header expected");
+ }
+
+ @Test
+ void getVersion()
+ {
+ assertEquals(Protocol.AMQP_1_0,
ProtocolEngineCreator_1_0_0_SASL.getInstance().getVersion(),
+ "Protocol version should be AMQP_1_0");
+ }
+
+ @Test
+ void getType()
+ {
+ assertEquals("AMQP_1_0",
ProtocolEngineCreator_1_0_0_SASL.getInstance().getType(),
+ "Protocol version should be AMQP_1_0");
+ }
+}
diff --git
a/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0Test.java
b/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0Test.java
index f51f4b82b0..1301e812f4 100644
---
a/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0Test.java
+++
b/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0Test.java
@@ -20,6 +20,7 @@
*/
package org.apache.qpid.server.protocol.v1_0;
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.mockito.ArgumentMatchers.any;
@@ -40,6 +41,7 @@ import java.util.UUID;
import javax.security.auth.Subject;
+import org.apache.qpid.server.protocol.v1_0.constants.Bytes;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -82,6 +84,12 @@ import org.apache.qpid.test.utils.UnitTestBase;
@SuppressWarnings({"rawtypes"})
class ProtocolEngine_1_0_0Test extends UnitTestBase
{
+ private static final byte[] ORIGINAL_AMQP_HEADER = { (byte) 'A', (byte)
'M', (byte) 'Q', (byte) 'P',
+ (byte) 0, (byte) 1, (byte) 0, (byte) 0 };
+
+ private static final byte[] ORIGINAL_SASL_HEADER = { (byte) 'A', (byte)
'M', (byte) 'Q', (byte) 'P',
+ (byte) 3, (byte) 1, (byte) 0, (byte) 0 };
+
private AMQPConnection_1_0Impl _protocolEngine_1_0_0;
private ServerNetworkConnection _networkConnection;
private Broker<?> _broker;
@@ -197,6 +205,9 @@ class ProtocolEngine_1_0_0Test extends UnitTestBase
final AuthenticatedPrincipal principal = (AuthenticatedPrincipal)
_connection.getAuthorizedPrincipal();
assertNotNull(principal);
assertEquals(principal, new
AuthenticatedPrincipal(anonymousAuthenticationManager.getAnonymousPrincipal()));
+
+ assertArrayEquals(ORIGINAL_AMQP_HEADER, Bytes.amqpHeader(), "AMQP
header should not be changed");
+ assertArrayEquals(ORIGINAL_SASL_HEADER, Bytes.saslHeader(), "SASL
header should not be changed");
}
@Test
@@ -214,6 +225,9 @@ class ProtocolEngine_1_0_0Test extends UnitTestBase
verify(_virtualHost,
never()).registerConnection(any(AMQPConnection.class));
verify(_networkConnection).close();
+
+ assertArrayEquals(ORIGINAL_AMQP_HEADER, Bytes.amqpHeader(), "AMQP
header should not be changed");
+ assertArrayEquals(ORIGINAL_SASL_HEADER, Bytes.saslHeader(), "SASL
header should not be changed");
}
@Test
@@ -235,6 +249,9 @@ class ProtocolEngine_1_0_0Test extends UnitTestBase
final AuthenticatedPrincipal authPrincipal = (AuthenticatedPrincipal)
_connection.getAuthorizedPrincipal();
assertNotNull(authPrincipal);
assertEquals(authPrincipal, new AuthenticatedPrincipal(principal));
+
+ assertArrayEquals(ORIGINAL_AMQP_HEADER, Bytes.amqpHeader(), "AMQP
header should not be changed");
+ assertArrayEquals(ORIGINAL_SASL_HEADER, Bytes.saslHeader(), "SASL
header should not be changed");
}
@Test
@@ -269,6 +286,9 @@ class ProtocolEngine_1_0_0Test extends UnitTestBase
final AuthenticatedPrincipal principal = (AuthenticatedPrincipal)
_connection.getAuthorizedPrincipal();
assertNotNull(principal);
assertEquals(principal, new
AuthenticatedPrincipal(anonymousAuthenticationManager.getAnonymousPrincipal()));
+
+ assertArrayEquals(ORIGINAL_AMQP_HEADER, Bytes.amqpHeader(), "AMQP
header should not be changed");
+ assertArrayEquals(ORIGINAL_SASL_HEADER, Bytes.saslHeader(), "SASL
header should not be changed");
}
private void createEngine(final Transport transport)
diff --git
a/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/Session_1_0Test.java
b/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/Session_1_0Test.java
index de7fa0e8c2..5e9f23048c 100644
---
a/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/Session_1_0Test.java
+++
b/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/Session_1_0Test.java
@@ -22,15 +22,22 @@ package org.apache.qpid.server.protocol.v1_0;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNotSame;
import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.ArgumentMatchers.nullable;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
+import java.io.ByteArrayOutputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@@ -49,6 +56,7 @@ import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.stubbing.Answer;
+import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
import org.apache.qpid.server.configuration.updater.CurrentThreadTaskExecutor;
import org.apache.qpid.server.configuration.updater.TaskExecutor;
import org.apache.qpid.server.filter.AMQPFilterTypes;
@@ -64,12 +72,14 @@ import org.apache.qpid.server.model.PublishingLink;
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.model.Session;
import org.apache.qpid.server.model.VirtualHost;
+import org.apache.qpid.server.protocol.v1_0.type.Binary;
import org.apache.qpid.server.protocol.v1_0.type.ErrorCondition;
import org.apache.qpid.server.protocol.v1_0.type.FrameBody;
import org.apache.qpid.server.protocol.v1_0.type.Symbol;
import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
import
org.apache.qpid.server.protocol.v1_0.type.codec.AMQPDescribedTypeRegistry;
import org.apache.qpid.server.protocol.v1_0.type.messaging.DeleteOnClose;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.Accepted;
import org.apache.qpid.server.protocol.v1_0.type.messaging.Filter;
import org.apache.qpid.server.protocol.v1_0.type.messaging.JMSSelectorFilter;
import org.apache.qpid.server.protocol.v1_0.type.messaging.Source;
@@ -80,7 +90,9 @@ import
org.apache.qpid.server.protocol.v1_0.type.transport.Attach;
import org.apache.qpid.server.protocol.v1_0.type.transport.Begin;
import org.apache.qpid.server.protocol.v1_0.type.transport.Detach;
import org.apache.qpid.server.protocol.v1_0.type.transport.LinkError;
+import org.apache.qpid.server.protocol.v1_0.type.transport.ReceiverSettleMode;
import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Transfer;
import org.apache.qpid.server.queue.QueueConsumer;
import org.apache.qpid.server.transport.AggregateTicker;
import org.apache.qpid.server.virtualhost.QueueManagingVirtualHost;
@@ -557,6 +569,288 @@ class Session_1_0Test extends UnitTestBase
assertQueueDurability(getDynamicNodeAddressFromAttachResponse(), true);
}
+ @Test
+ void sendTransferDoesNotSetPayloadOnContinuationTransfers()
+ {
+ // Arrange: payload 25 bytes -> 3 sendFrame calls if "wire" writes 10
bytes per call
+ final byte[] bytes = createSequentialBytes(25);
+
+ final Transfer xfr = new Transfer();
+ xfr.setSettled(true);
+ xfr.setHandle(UnsignedInteger.valueOf(0));
+ xfr.setDeliveryTag(new Binary(new byte[]{0x01}));
+ try (final QpidByteBuffer buf = QpidByteBuffer.wrap(bytes))
+ {
+ xfr.setPayload(buf);
+ }
+
+ when(_connection.sendFrame(eq(0), any(),
any(QpidByteBuffer.class))).thenAnswer(inv ->
+ {
+ final QpidByteBuffer payload = inv.getArgument(2);
+ if (payload == null)
+ {
+ return 0;
+ }
+ final int chunk = Math.min(10, payload.remaining());
+ payload.position(payload.position() + chunk);
+ return chunk;
+ });
+
+ final ArgumentCaptor<FrameBody> bodyCaptor =
ArgumentCaptor.forClass(FrameBody.class);
+ final SendingLinkEndpoint endpoint = mock(SendingLinkEndpoint.class);
+
+ _session.sendTransfer(xfr, endpoint);
+
+ verify(_connection, times(3)).sendFrame(eq(0), bodyCaptor.capture(),
any(QpidByteBuffer.class));
+ final List<FrameBody> bodies = bodyCaptor.getAllValues();
+ assertInstanceOf(Transfer.class, bodies.get(0));
+ assertNull(((Transfer) bodies.get(1)).getPayload(), "Continuation
transfer must not carry payload");
+ assertNull(((Transfer) bodies.get(2)).getPayload(), "Continuation
transfer must not carry payload");
+
+ xfr.dispose();
+ }
+
+ @Test
+ void sendTransferChunksPayloadPreservesByteOrderAndSetsMoreFlag()
+ {
+ // Arrange: payload 25 bytes -> 3 sendFrame calls if the transport
writes 10 bytes per call
+ final byte[] bytes = createSequentialBytes(25);
+
+ final Transfer xfr = new Transfer();
+ xfr.setSettled(true);
+ xfr.setHandle(UnsignedInteger.valueOf(0));
+ xfr.setRcvSettleMode(ReceiverSettleMode.FIRST);
+ xfr.setState(Accepted.INSTANCE);
+ xfr.setDeliveryTag(new Binary(new byte[]{0x01}));
+ try (final QpidByteBuffer buf = QpidByteBuffer.wrap(bytes))
+ {
+ xfr.setPayload(buf);
+ }
+
+ final ByteArrayOutputStream capturedPayload =
stubSendFrameToWriteAtMostAndCapturePayload(10);
+ final ArgumentCaptor<FrameBody> bodyCaptor =
ArgumentCaptor.forClass(FrameBody.class);
+ final SendingLinkEndpoint endpoint = mock(SendingLinkEndpoint.class);
+
+ // Act
+ _session.sendTransfer(xfr, endpoint);
+
+ // Assert
+ verify(_connection, times(3)).sendFrame(eq(0), bodyCaptor.capture(),
nullable(QpidByteBuffer.class));
+ assertArrayEquals(bytes, capturedPayload.toByteArray(), "Payload bytes
were not preserved across chunks");
+
+ final List<FrameBody> bodies = bodyCaptor.getAllValues();
+ assertEquals(3, bodies.size(), "Unexpected number of frames");
+
+ assertSame(xfr, bodies.get(0), "First frame body should be the
original Transfer");
+ assertNotSame(xfr, bodies.get(1), "Continuation frame should use a new
Transfer instance");
+ assertNotSame(xfr, bodies.get(2), "Continuation frame should use a new
Transfer instance");
+
+ final Transfer first = (Transfer) bodies.get(0);
+ final Transfer second = (Transfer) bodies.get(1);
+ final Transfer third = (Transfer) bodies.get(2);
+
+ assertEquals(Boolean.TRUE, first.getMore(), "First transfer should
have more=true when payload is chunked");
+ assertEquals(Boolean.TRUE, second.getMore(), "Intermediate
continuation transfer should have more=true");
+ assertNotEquals(Boolean.TRUE, third.getMore(), "Last transfer must not
have more=true");
+
+ assertNull(second.getPayload(), "Continuation transfer must not carry
payload");
+ assertNull(third.getPayload(), "Continuation transfer must not carry
payload");
+
+ // Continuations should preserve key delivery settings
+ assertEquals(first.getHandle(), second.getHandle(), "Handle must be
preserved on continuation transfer");
+ assertEquals(first.getHandle(), third.getHandle(), "Handle must be
preserved on continuation transfer");
+ assertEquals(first.getRcvSettleMode(), second.getRcvSettleMode(),
"RcvSettleMode must be preserved");
+ assertEquals(first.getRcvSettleMode(), third.getRcvSettleMode(),
"RcvSettleMode must be preserved");
+ assertEquals(first.getState(), second.getState(), "State must be
preserved");
+ assertEquals(first.getState(), third.getState(), "State must be
preserved");
+
+ // Delivery-id is assigned only to the first transfer
+ assertNotNull(first.getDeliveryId(), "DeliveryId should be assigned to
the first transfer");
+ assertNull(second.getDeliveryId(), "Continuation transfer must not set
delivery-id");
+ assertNull(third.getDeliveryId(), "Continuation transfer must not set
delivery-id");
+
+ xfr.dispose();
+ }
+
+ @Test
+ void sendTransferSendsSingleFrameWhenPayloadFitsWithinChunk()
+ {
+ final byte[] bytes = createSequentialBytes(9);
+
+ final Transfer xfr = new Transfer();
+ xfr.setSettled(true);
+ xfr.setHandle(UnsignedInteger.valueOf(0));
+ xfr.setDeliveryTag(new Binary(new byte[]{0x01}));
+ try (final QpidByteBuffer buf = QpidByteBuffer.wrap(bytes))
+ {
+ xfr.setPayload(buf);
+ }
+
+ final ByteArrayOutputStream capturedPayload =
stubSendFrameToWriteAtMostAndCapturePayload(10);
+ final ArgumentCaptor<FrameBody> bodyCaptor =
ArgumentCaptor.forClass(FrameBody.class);
+
+ _session.sendTransfer(xfr, mock(SendingLinkEndpoint.class));
+
+ verify(_connection, times(1)).sendFrame(eq(0), bodyCaptor.capture(),
nullable(QpidByteBuffer.class));
+ assertArrayEquals(bytes, capturedPayload.toByteArray(), "Payload bytes
were not written as a single chunk");
+
+ final Transfer body = (Transfer) bodyCaptor.getValue();
+ assertNotEquals(Boolean.TRUE, body.getMore(), "Single-frame transfer
must not have more=true");
+
+ xfr.dispose();
+ }
+
+ @Test
+ void sendTransferSendsSingleFrameWhenPayloadExactlyEqualsChunk()
+ {
+ final byte[] bytes = createSequentialBytes(10);
+
+ final Transfer xfr = new Transfer();
+ xfr.setSettled(true);
+ xfr.setHandle(UnsignedInteger.valueOf(0));
+ xfr.setDeliveryTag(new Binary(new byte[]{0x01}));
+ try (final QpidByteBuffer buf = QpidByteBuffer.wrap(bytes))
+ {
+ xfr.setPayload(buf);
+ }
+
+ final ByteArrayOutputStream capturedPayload =
stubSendFrameToWriteAtMostAndCapturePayload(10);
+ final ArgumentCaptor<FrameBody> bodyCaptor =
ArgumentCaptor.forClass(FrameBody.class);
+
+ _session.sendTransfer(xfr, mock(SendingLinkEndpoint.class));
+
+ verify(_connection, times(1)).sendFrame(eq(0), bodyCaptor.capture(),
nullable(QpidByteBuffer.class));
+ assertArrayEquals(bytes, capturedPayload.toByteArray(), "Payload bytes
were not written as a single chunk");
+
+ final Transfer body = (Transfer) bodyCaptor.getValue();
+ assertNotEquals(Boolean.TRUE, body.getMore(), "Single-frame transfer
must not have more=true");
+
+ xfr.dispose();
+ }
+
+ @Test
+ void sendTransferSendsTwoFramesWhenPayloadIsOneByteOverChunk()
+ {
+ final byte[] bytes = createSequentialBytes(11);
+
+ final Transfer xfr = new Transfer();
+ xfr.setSettled(true);
+ xfr.setHandle(UnsignedInteger.valueOf(0));
+ xfr.setDeliveryTag(new Binary(new byte[]{0x01}));
+ try (final QpidByteBuffer buf = QpidByteBuffer.wrap(bytes))
+ {
+ xfr.setPayload(buf);
+ }
+
+ final ByteArrayOutputStream capturedPayload =
stubSendFrameToWriteAtMostAndCapturePayload(10);
+ final ArgumentCaptor<FrameBody> bodyCaptor =
ArgumentCaptor.forClass(FrameBody.class);
+
+ _session.sendTransfer(xfr, mock(SendingLinkEndpoint.class));
+
+ verify(_connection, times(2)).sendFrame(eq(0), bodyCaptor.capture(),
nullable(QpidByteBuffer.class));
+ assertArrayEquals(bytes, capturedPayload.toByteArray(), "Payload bytes
were not preserved across two chunks");
+
+ final List<FrameBody> bodies = bodyCaptor.getAllValues();
+ final Transfer first = (Transfer) bodies.get(0);
+ final Transfer second = (Transfer) bodies.get(1);
+
+ assertEquals(Boolean.TRUE, first.getMore(), "First transfer should
have more=true when payload is chunked");
+ assertNotEquals(Boolean.TRUE, second.getMore(), "Last transfer must
not have more=true");
+ assertNull(second.getPayload(), "Continuation transfer must not carry
payload");
+
+ xfr.dispose();
+ }
+
+ @Test
+ void sendTransferWithEmptyPayloadDoesNotSendContinuation()
+ {
+ final Transfer xfr = new Transfer();
+ xfr.setSettled(true);
+ xfr.setHandle(UnsignedInteger.valueOf(0));
+ xfr.setDeliveryTag(new Binary(new byte[]{0x01}));
+ try (final QpidByteBuffer buf = QpidByteBuffer.wrap(new byte[0]))
+ {
+ xfr.setPayload(buf);
+ }
+
+ final ByteArrayOutputStream capturedPayload =
stubSendFrameToWriteAtMostAndCapturePayload(10);
+ final ArgumentCaptor<FrameBody> bodyCaptor =
ArgumentCaptor.forClass(FrameBody.class);
+
+ _session.sendTransfer(xfr, mock(SendingLinkEndpoint.class));
+
+ verify(_connection, times(1)).sendFrame(eq(0), bodyCaptor.capture(),
nullable(QpidByteBuffer.class));
+ assertEquals(0, capturedPayload.size(), "Empty payload should not
produce any bytes");
+
+ xfr.dispose();
+ }
+
+ @Test
+ void sendTransferWithNullPayloadDoesNotThrowAndDoesNotSendContinuation()
+ {
+ final Transfer xfr = new Transfer();
+ xfr.setSettled(true);
+ xfr.setHandle(UnsignedInteger.valueOf(0));
+ xfr.setDeliveryTag(new Binary(new byte[]{0x01}));
+ // No payload set
+
+ when(_connection.sendFrame(eq(0), any(),
nullable(QpidByteBuffer.class))).thenReturn(0);
+ final ArgumentCaptor<QpidByteBuffer> payloadCaptor =
ArgumentCaptor.forClass(QpidByteBuffer.class);
+
+ _session.sendTransfer(xfr, mock(SendingLinkEndpoint.class));
+
+ verify(_connection, times(1)).sendFrame(eq(0), any(),
payloadCaptor.capture());
+ assertNull(payloadCaptor.getValue(), "Expected payload argument to be
null");
+
+ xfr.dispose();
+ }
+
+ private static byte[] createSequentialBytes(final int length)
+ {
+ final byte[] bytes = new byte[length];
+ for (int i = 0; i < bytes.length; i++)
+ {
+ bytes[i] = (byte) i;
+ }
+ return bytes;
+ }
+
+ private ByteArrayOutputStream
stubSendFrameToWriteAtMostAndCapturePayload(final int maxBytesPerCall)
+ {
+ final ByteArrayOutputStream captured = new ByteArrayOutputStream();
+
+ when(_connection.sendFrame(eq(0), any(),
nullable(QpidByteBuffer.class))).thenAnswer(inv ->
+ {
+ final FrameBody body = inv.getArgument(1);
+ final QpidByteBuffer payload = inv.getArgument(2);
+ if (payload == null)
+ {
+ return 0;
+ }
+
+ final int remaining = payload.remaining();
+ final int chunk = Math.min(maxBytesPerCall, remaining);
+
+ if (body instanceof Transfer)
+ {
+ // Simulate the AMQP transport behaviour: set 'more' when not
all payload is written.
+ ((Transfer) body).setMore(remaining > chunk);
+ }
+
+ // Copy the bytes without affecting the original payload buffer
position.
+ try (final QpidByteBuffer dup = payload.duplicate())
+ {
+ final byte[] bytes = new byte[chunk];
+ dup.get(bytes);
+ captured.write(bytes);
+ }
+
+ payload.position(payload.position() + chunk);
+ return chunk;
+ });
+
+ return captured;
+ }
+
private Source createDynamicSource(final DeleteOnClose lifetimePolicy)
{
final Source source = new Source();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]