This is an automated email from the ASF dual-hosted git repository.
tabish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-protonj2.git
The following commit(s) were added to refs/heads/main by this push:
new 5a18eb46 PROTON-2887 Ensure buffer release on exception
5a18eb46 is described below
commit 5a18eb46af126e1b4a084a886c8dee6c9a79b85d
Author: Timothy Bish <[email protected]>
AuthorDate: Tue Apr 22 18:42:30 2025 -0400
PROTON-2887 Ensure buffer release on exception
Some code cleanups in the frame parser to ensure the payload buffer is
closed in case of exception to ensure release of a backing buffer that
is being reference counted
---
.../engine/impl/ProtonFrameDecodingHandler.java | 107 +++++++++++++--------
1 file changed, 66 insertions(+), 41 deletions(-)
diff --git
a/protonj2/src/main/java/org/apache/qpid/protonj2/engine/impl/ProtonFrameDecodingHandler.java
b/protonj2/src/main/java/org/apache/qpid/protonj2/engine/impl/ProtonFrameDecodingHandler.java
index 09f72836..58d4e013 100644
---
a/protonj2/src/main/java/org/apache/qpid/protonj2/engine/impl/ProtonFrameDecodingHandler.java
+++
b/protonj2/src/main/java/org/apache/qpid/protonj2/engine/impl/ProtonFrameDecodingHandler.java
@@ -62,6 +62,11 @@ public class ProtonFrameDecodingHandler implements
EngineHandler, SaslPerformati
*/
public static final int FRAME_SIZE_BYTES = 4;
+ /**
+ * Size of the initial portion of an encoded from before any extended
header portion.
+ */
+ private static final int FRAME_PREAMBLE_BYTES = 8;
+
private final AMQPPerformativeEnvelopePool<IncomingAMQPEnvelope> framePool
= AMQPPerformativeEnvelopePool.incomingEnvelopePool();
private Decoder decoder;
@@ -339,63 +344,83 @@ public class ProtonFrameDecodingHandler implements
EngineHandler, SaslPerformati
@Override
public void parse(EngineHandlerContext context, ProtonBuffer input) {
- int dataOffset = (input.readByte() << 2) & 0x3FF;
- int frameSize = length + FRAME_SIZE_BYTES;
+ final int dataOffset = (input.readByte() << 2) & 0x3FF;
+ final int frameSize = length + FRAME_SIZE_BYTES;
+ final int frameBodySize = frameSize - dataOffset;
validateDataOffset(dataOffset, frameSize);
- int type = input.readByte() & 0xFF;
- short channel = input.readShort();
+ final int type = input.readByte() & 0xFF;
+ final short channel = input.readShort();
- // Skip over the extended header if present (i.e offset > 8)
- if (dataOffset != 8) {
- input.advanceReadOffset(dataOffset - 8);
+ // Skip over the extended header if present (i.e offset >
FRAME_PREAMBLE_SIZE)
+ if (dataOffset != FRAME_PREAMBLE_BYTES) {
+ input.advanceReadOffset(dataOffset - FRAME_PREAMBLE_BYTES);
}
- final int frameBodySize = frameSize - dataOffset;
-
- ProtonBuffer payload = null;
- Object val = null;
-
if (frameBodySize > 0) {
- int startReadIndex = input.getReadOffset();
- val = decoder.readObject(input, decoderState);
-
- // Copy the payload portion of the incoming bytes for now as
the incoming may be
- // from a wrapped pooled buffer and for now we have no way of
retaining or otherwise
- // ensuring that the buffer remains ours. Since we might want
to store received
- // data at a client level and decode later we could end up
losing the data to reuse
- // if it was pooled.
- if (input.isReadable()) {
- int payloadSize = frameBodySize - (input.getReadOffset() -
startReadIndex);
- if (payloadSize > 0) {
- // Payload is now only a view of the bytes from the
input that comprise it.
- payload = input.copy(input.getReadOffset(),
payloadSize, true);
- input.advanceReadOffset(payloadSize);
- }
+ switch (type) {
+ case AMQP_FRAME_TYPE:
+ handleAMQPPerformative(context, channel,
frameBodySize, input);
+ break;
+ case SASL_FRAME_TYPE:
+ handleSASLPerformative(context, input);
+ break;
+ default:
+ throw new
FrameDecodingException(String.format("unknown frame type: %d", type));
}
} else {
- transitionToFrameSizeParsingStage();
- context.fireRead(EmptyEnvelope.INSTANCE);
- return;
+ handleEmptyFrame(context);
}
+ }
+
+ private void handleEmptyFrame(EngineHandlerContext context) {
+ transitionToFrameSizeParsingStage();
+ context.fireRead(EmptyEnvelope.INSTANCE);
+ }
+
+ private void handleAMQPPerformative(EngineHandlerContext context,
short channel, int frameBodySize, ProtonBuffer input) {
+ final int startReadIndex = input.getReadOffset();
+ final Performative performative = decoder.readObject(input,
decoderState, Performative.class);
+
+ // Copy the payload portion of the incoming bytes for now as the
incoming may be from a
+ // wrapped pooled buffer and for now we have no way of retaining
or otherwise ensuring
+ // that the buffer remains ours. Since we might want to store
received data at a client
+ // level and decode later we could end up losing the data to reuse
if it was pooled.
+ if (input.isReadable()) {
+ final ProtonBuffer payload;
+ final int payloadSize = frameBodySize - (input.getReadOffset()
- startReadIndex);
+
+ if (payloadSize > 0) {
+ // The payload buffer is now only a read-only view of the
bytes from the input that comprise it.
+ payload = input.copy(input.getReadOffset(), payloadSize,
true);
+ input.advanceReadOffset(payloadSize);
+ } else {
+ payload = null;
+ }
- if (type == AMQP_FRAME_TYPE) {
- Performative performative = (Performative) val;
- IncomingAMQPEnvelope frame = framePool.take(performative,
channel, payload);
- transitionToFrameSizeParsingStage();
- context.fireRead(frame);
- } else if (type == SASL_FRAME_TYPE) {
- SaslPerformative performative = (SaslPerformative) val;
- SASLEnvelope saslFrame = new SASLEnvelope(performative);
transitionToFrameSizeParsingStage();
- // Ensure we process transition from SASL to AMQP header state
- handleRead(context, saslFrame);
+ try {
+ context.fireRead(framePool.take(performative, channel,
payload));
+ } catch (Exception ex) {
+ if (payload != null) {
+ payload.close();
+ }
+ }
} else {
- throw new FrameDecodingException(String.format("unknown frame
type: %d", type));
+ transitionToFrameSizeParsingStage();
+ context.fireRead(framePool.take(performative, channel, null));
}
}
+ private void handleSASLPerformative(EngineHandlerContext context,
ProtonBuffer input) {
+ final SaslPerformative performative = (SaslPerformative)
decoder.readObject(input, decoderState);;
+
+ transitionToFrameSizeParsingStage();
+ // Ensure we process transition from SASL to AMQP header state
+ handleRead(context, new SASLEnvelope(performative));
+ }
+
private void validateDataOffset(int dataOffset, int frameSize) throws
FrameDecodingException {
if (dataOffset < 8) {
throw new FrameDecodingException(String.format(
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]