This is an automated email from the ASF dual-hosted git repository.
robbie pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/main by this push:
new 8fb5b89 ARTEMIS-3449: fix some issues from using position to track an
unrelated state, plus simplify some previous changes and more
8fb5b89 is described below
commit 8fb5b8969f98b0994b9563cb7f5441e58253d89f
Author: Robbie Gemmell <[email protected]>
AuthorDate: Thu Sep 9 10:37:36 2021 +0100
ARTEMIS-3449: fix some issues from using position to track an unrelated
state, plus simplify some previous changes and more
---
.../amqp/proton/ProtonServerSenderContext.java | 48 +++++++++++-----------
1 file changed, 25 insertions(+), 23 deletions(-)
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
index 32585d7..da09c6c 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
@@ -563,26 +563,16 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
final MessageReference reference;
final AMQPLargeMessage message;
final Delivery delivery;
+ boolean initialPacketHandled;
void resume() {
connection.runNow(this::deliver);
}
void deliver() {
-
// This is discounting some bytes due to Transfer payload
final int frameSize =
protonSession.session.getConnection().getTransport().getOutboundFrameSizeLimit()
- 50 - (delivery.getTag() != null ? delivery.getTag().length : 0);
- DeliveryAnnotations deliveryAnnotationsToEncode;
-
- message.checkReference(reference);
-
- if (reference.getProtocolData() != null &&
reference.getProtocolData() instanceof DeliveryAnnotations) {
- deliveryAnnotationsToEncode =
(DeliveryAnnotations)reference.getProtocolData();
- } else {
- deliveryAnnotationsToEncode = null;
- }
-
try {
final ByteBuf frameBuffer =
PooledByteBufAllocator.DEFAULT.directBuffer(frameSize, frameSize);
final NettyReadable frameView = new NettyReadable(frameBuffer);
@@ -593,10 +583,12 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
// materialize it so we can use its internal NIO buffer
frameBuffer.ensureWritable(frameSize);
- if (position == 0 && sender.getLocalState() !=
EndpointState.CLOSED && position < bodySize) {
- if (!deliverInitialPacket(context,
deliveryAnnotationsToEncode, frameBuffer)) {
+ if (!initialPacketHandled && sender.getLocalState() !=
EndpointState.CLOSED) {
+ if (!deliverInitialPacket(context, frameBuffer)) {
return;
}
+
+ initialPacketHandled = true;
}
for (; sender.getLocalState() != EndpointState.CLOSED &&
position < bodySize; ) {
@@ -614,7 +606,6 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
position += readSize;
if (readSize > 0) {
-
if (position < bodySize) {
connection.instantFlush();
}
@@ -650,13 +641,24 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
}
private boolean deliverInitialPacket(final LargeBodyReader context,
- final DeliveryAnnotations
deliveryAnnotationsToEncode,
final ByteBuf frameBuffer) throws
Exception {
- assert position == 0 && context.position() == 0;
+ assert position == 0 && context.position() == 0 &&
!initialPacketHandled;
+
if (!connection.flowControl(this::resume)) {
return false;
}
+
frameBuffer.clear();
+
+ DeliveryAnnotations deliveryAnnotationsToEncode;
+ message.checkReference(reference);
+
+ if (reference.getProtocolData() != null &&
reference.getProtocolData() instanceof DeliveryAnnotations) {
+ deliveryAnnotationsToEncode =
(DeliveryAnnotations)reference.getProtocolData();
+ } else {
+ deliveryAnnotationsToEncode = null;
+ }
+
try {
replaceInitialHeader(deliveryAnnotationsToEncode, context, new
NettyWritable(frameBuffer));
} catch (IndexOutOfBoundsException indexOutOfBoundsException) {
@@ -670,15 +672,15 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
sendAndFlushInitialPacket(deliveryAnnotationsToEncode, context);
return true;
}
+
+ int readSize = 0;
final int writableBytes = frameBuffer.writableBytes();
- if (writableBytes == 0) {
- sender.send(new NettyReadable(frameBuffer));
- connection.instantFlush();
- return true;
+ if (writableBytes != 0) {
+ final int writtenBytes = frameBuffer.writerIndex();
+ readSize =
context.readInto(frameBuffer.internalNioBuffer(writtenBytes, writableBytes));
+ frameBuffer.writerIndex(writtenBytes + readSize);
}
- final int writtenBytes = frameBuffer.writerIndex();
- final int readSize =
context.readInto(frameBuffer.internalNioBuffer(writtenBytes, writableBytes));
- frameBuffer.writerIndex(writtenBytes + readSize);
+
sender.send(new NettyReadable(frameBuffer));
position += readSize;
connection.instantFlush();