This is an automated email from the ASF dual-hosted git repository.

tabish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-protonj2.git


The following commit(s) were added to refs/heads/main by this push:
     new b5bc7feb PROTON-2809 Ensure codec and buffers handle null Binary data 
payloads
b5bc7feb is described below

commit b5bc7feb091804c497582abd0179d7d06cea144b
Author: Timothy Bish <[email protected]>
AuthorDate: Mon Mar 18 16:19:04 2024 -0400

    PROTON-2809 Ensure codec and buffers handle null Binary data payloads
    
    Ensure that an null Data section encoding is encoded and decoded
    consistently and fix issue in composite buffer decoding empty Binary
    bodies when split at end of buffer.
---
 .../qpid/protonj2/client/impl/MessageSendTest.java | 91 ++++++++++++++++++++
 .../qpid/protonj2/client/impl/ReceiverTest.java    | 98 ++++++++++++++++++++++
 .../test/driver/actions/TransferInjectAction.java  |  4 +-
 .../test/driver/codec/messaging/AmqpValue.java     |  6 +-
 .../protonj2/test/driver/codec/messaging/Data.java | 10 +--
 .../test/driver/codec/primitives/Binary.java       |  4 +-
 .../driver/matchers/types/EncodedDataMatcher.java  | 11 ++-
 .../buffer/impl/ProtonCompositeBufferImpl.java     |  4 +-
 .../codec/encoders/messaging/DataTypeEncoder.java  | 27 +++---
 .../apache/qpid/protonj2/types/messaging/Data.java |  4 +
 .../protonj2/buffer/ProtonAbstractBufferTest.java  | 43 +++++++++-
 .../impl/ProtonFrameDecodingHandlerTest.java       | 90 ++++++++++++++++++++
 12 files changed, 366 insertions(+), 26 deletions(-)

diff --git 
a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/MessageSendTest.java
 
b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/MessageSendTest.java
index 3746ea08..2a3c8291 100644
--- 
a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/MessageSendTest.java
+++ 
b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/MessageSendTest.java
@@ -1287,4 +1287,95 @@ class MessageSendTest extends ImperativeClientTestCase {
             peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
         }
     }
+
+    @Test
+    public void testSendMessageWithNullStringValuePassedToCreate() throws 
Exception {
+        try (ProtonTestServer peer = new ProtonTestServer()) {
+            peer.expectSASLAnonymousConnect();
+            peer.expectOpen().respond();
+            peer.expectBegin().respond();
+            peer.expectAttach().ofSender().respond();
+            peer.remoteFlow().withLinkCredit(10).queue();
+            peer.expectAttach().respond();  // Open a receiver to ensure 
sender link has processed
+            peer.expectFlow();              // the inbound flow frame we sent 
previously before send.
+            peer.start();
+
+            URI remoteURI = peer.getServerURI();
+
+            LOG.info("Sender test started, peer listening on: {}", remoteURI);
+
+            Client container = Client.create();
+            Connection connection = container.connect(remoteURI.getHost(), 
remoteURI.getPort()).openFuture().get();
+
+            Session session = connection.openSession().openFuture().get();
+            SenderOptions options = new 
SenderOptions().deliveryMode(DeliveryMode.AT_MOST_ONCE);
+            Sender sender = session.openSender("test-qos", options);
+
+            // Gates send on remote flow having been sent and received
+            session.openReceiver("dummy").openFuture().get();
+
+            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+            
peer.expectTransfer().withMessage().withMessageFormat(0).withValue(null);
+            peer.expectDetach().respond();
+            peer.expectClose().respond();
+
+            final Message<String> message = Message.create((String) null);
+            final Tracker tracker = sender.send(message);
+
+            assertNotNull(tracker);
+            assertNotNull(tracker.settlementFuture().isDone());
+            assertNotNull(tracker.settlementFuture().get().settled());
+
+            sender.closeAsync().get(10, TimeUnit.SECONDS);
+
+            connection.closeAsync().get(10, TimeUnit.SECONDS);
+
+            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+        }
+    }
+
+    @Test
+    public void testSendMessageWithNullByteArrayPassedToCreate() throws 
Exception {
+        try (ProtonTestServer peer = new ProtonTestServer()) {
+            peer.expectSASLAnonymousConnect();
+            peer.expectOpen().respond();
+            peer.expectBegin().respond();
+            peer.expectAttach().ofSender().respond();
+            peer.remoteFlow().withLinkCredit(10).queue();
+            peer.expectAttach().respond();  // Open a receiver to ensure 
sender link has processed
+            peer.expectFlow();              // the inbound flow frame we sent 
previously before send.
+            peer.start();
+
+            URI remoteURI = peer.getServerURI();
+
+            LOG.info("Sender test started, peer listening on: {}", remoteURI);
+
+            Client container = Client.create();
+            Connection connection = container.connect(remoteURI.getHost(), 
remoteURI.getPort()).openFuture().get();
+
+            Session session = connection.openSession().openFuture().get();
+            SenderOptions options = new 
SenderOptions().deliveryMode(DeliveryMode.AT_MOST_ONCE);
+            Sender sender = session.openSender("test-qos", options);
+
+            // Gates send on remote flow having been sent and received
+            session.openReceiver("dummy").openFuture().get();
+
+            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+            
peer.expectTransfer().withMessage().withMessageFormat(0).withData(null);
+            peer.expectDetach().respond();
+            peer.expectClose().respond();
+
+            final Message<byte[]> message = Message.create((byte[]) null);
+            final Tracker tracker = sender.send(message);
+
+            assertNotNull(tracker);
+            assertNotNull(tracker.settlementFuture().isDone());
+            assertNotNull(tracker.settlementFuture().get().settled());
+
+            sender.closeAsync().get(10, TimeUnit.SECONDS);
+            connection.closeAsync().get(10, TimeUnit.SECONDS);
+
+            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+        }
+    }
 }
diff --git 
a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/ReceiverTest.java
 
b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/ReceiverTest.java
index d4085ffa..3bcc3651 100644
--- 
a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/ReceiverTest.java
+++ 
b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/ReceiverTest.java
@@ -3152,4 +3152,102 @@ public class ReceiverTest extends 
ImperativeClientTestCase {
             peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
         }
     }
+
+    @Test
+    public void testReceiveMessageWithNullWrappedInAmqpValue() throws 
Exception {
+        try (ProtonTestServer peer = new ProtonTestServer()) {
+            peer.expectSASLAnonymousConnect();
+            peer.expectOpen().respond();
+            peer.expectBegin().respond();
+            peer.expectAttach().ofReceiver().respond();
+            peer.expectFlow();
+            peer.remoteTransfer().withHandle(0)
+                                 .withDeliveryId(0)
+                                 .withDeliveryTag(new byte[] { 1 })
+                                 .withMore(false)
+                                 .withMessageFormat(0)
+                                 .withMessage().withBody().withValue((String) 
null)
+                                 .also()
+                                 .splitWrite(true)
+                                 .afterDelay(25)
+                                 .queue();
+            peer.expectDisposition().withFirst(0)
+                                    .withSettled(true)
+                                    .withState().accepted();
+            peer.start();
+
+            URI remoteURI = peer.getServerURI();
+
+            LOG.info("Test started, peer listening on: {}", remoteURI);
+
+            final Client container = Client.create();
+            final ConnectionOptions options = new ConnectionOptions();
+            final Connection connection = 
container.connect(remoteURI.getHost(), remoteURI.getPort(), options);
+            final Receiver receiver = connection.openReceiver("test-queue");
+            final Delivery delivery = receiver.receive();
+            final Message<String> message = delivery.message();
+
+            assertNull(message.body());
+
+            peer.waitForScriptToComplete();
+            peer.expectDetach().respond();
+            peer.expectClose().respond();
+
+            assertNotNull(delivery);
+
+            receiver.close();
+            connection.close();
+
+            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+        }
+    }
+
+    @Test
+    public void testReceiveMessageWithNullWrappedInDataSection() throws 
Exception {
+        try (ProtonTestServer peer = new ProtonTestServer()) {
+            peer.expectSASLAnonymousConnect();
+            peer.expectOpen().respond();
+            peer.expectBegin().respond();
+            peer.expectAttach().ofReceiver().respond();
+            peer.expectFlow();
+            peer.remoteTransfer().withHandle(0)
+                                 .withDeliveryId(0)
+                                 .withDeliveryTag(new byte[] { 1 })
+                                 .withMore(false)
+                                 .withMessageFormat(0)
+                                 .withMessage().withBody().withData((byte[]) 
null)
+                                 .also()
+                                 .splitWrite(true)
+                                 .afterDelay(25)
+                                 .queue();
+            peer.expectDisposition().withFirst(0)
+                                    .withSettled(true)
+                                    .withState().accepted();
+            peer.start();
+
+            URI remoteURI = peer.getServerURI();
+
+            LOG.info("Test started, peer listening on: {}", remoteURI);
+
+            final Client container = Client.create();
+            final ConnectionOptions options = new ConnectionOptions();
+            final Connection connection = 
container.connect(remoteURI.getHost(), remoteURI.getPort(), options);
+            final Receiver receiver = connection.openReceiver("test-queue");
+            final Delivery delivery = receiver.receive();
+            final Message<byte[]> message = delivery.message();
+
+            assertNull(message.body());
+
+            peer.waitForScriptToComplete();
+            peer.expectDetach().respond();
+            peer.expectClose().respond();
+
+            assertNotNull(delivery);
+
+            receiver.close();
+            connection.close();
+
+            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+        }
+    }
 }
diff --git 
a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/actions/TransferInjectAction.java
 
b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/actions/TransferInjectAction.java
index e434a486..f4d561a5 100644
--- 
a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/actions/TransferInjectAction.java
+++ 
b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/actions/TransferInjectAction.java
@@ -517,7 +517,7 @@ public class TransferInjectAction extends 
AbstractPerformativeInjectAction<Trans
         }
 
         public BodySectionBuilder withValue(byte[] body) {
-            TransferInjectAction.this.body = new AmqpValue(new Binary(body));
+            TransferInjectAction.this.body = new AmqpValue(body == null ? null 
: new Binary(body));
             return this;
         }
 
@@ -527,7 +527,7 @@ public class TransferInjectAction extends 
AbstractPerformativeInjectAction<Trans
         }
 
         public BodySectionBuilder withData(byte[] body) {
-            TransferInjectAction.this.body = new Data(new Binary(body));
+            TransferInjectAction.this.body = new Data(body == null ? null : 
new Binary(body));
             return this;
         }
 
diff --git 
a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/codec/messaging/AmqpValue.java
 
b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/codec/messaging/AmqpValue.java
index 8cfe2513..0f5f24f3 100644
--- 
a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/codec/messaging/AmqpValue.java
+++ 
b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/codec/messaging/AmqpValue.java
@@ -25,7 +25,11 @@ public class AmqpValue implements DescribedType {
     public static final UnsignedLong DESCRIPTOR_CODE = 
UnsignedLong.valueOf(0x0000000000000077L);
     public static final Symbol DESCRIPTOR_SYMBOL = 
Symbol.valueOf("amqp:amqp-value:*");
 
-    private Object described;
+    private final Object described;
+
+    public AmqpValue() {
+        this.described = null;
+    }
 
     public AmqpValue(Object described) {
         this.described = described;
diff --git 
a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/codec/messaging/Data.java
 
b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/codec/messaging/Data.java
index f33f9af7..4368e99b 100644
--- 
a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/codec/messaging/Data.java
+++ 
b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/codec/messaging/Data.java
@@ -26,13 +26,13 @@ public class Data implements DescribedType {
     public static final UnsignedLong DESCRIPTOR_CODE = 
UnsignedLong.valueOf(0x0000000000000075L);
     public static final Symbol DESCRIPTOR_SYMBOL = 
Symbol.valueOf("amqp:data:binary");
 
-    private Binary described;
+    private final Binary described;
 
-    public Data(Binary described) {
-        if (described == null) {
-            throw new IllegalArgumentException("provided Binary must not be 
null");
-        }
+    public Data() {
+        this.described = null;
+    }
 
+    public Data(Binary described) {
         this.described = described;
     }
 
diff --git 
a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/codec/primitives/Binary.java
 
b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/codec/primitives/Binary.java
index f75e783e..15737c85 100644
--- 
a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/codec/primitives/Binary.java
+++ 
b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/codec/primitives/Binary.java
@@ -29,11 +29,11 @@ public final class Binary {
     }
 
     public Binary(final byte[] data) {
-        this(data, 0, data.length);
+        this(data, 0, data != null ? data.length : 0);
     }
 
     public Binary(final byte[] data, final int offset, final int length) {
-        this.buffer = Arrays.copyOfRange(data, offset, offset + length);
+        this.buffer = data != null ? Arrays.copyOfRange(data, offset, offset + 
length) : null;
     }
 
     public Binary copy() {
diff --git 
a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/matchers/types/EncodedDataMatcher.java
 
b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/matchers/types/EncodedDataMatcher.java
index 0a4861f1..70ac3b29 100644
--- 
a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/matchers/types/EncodedDataMatcher.java
+++ 
b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/matchers/types/EncodedDataMatcher.java
@@ -18,6 +18,8 @@
  */
 package org.apache.qpid.protonj2.test.driver.matchers.types;
 
+import java.nio.ByteBuffer;
+
 import org.apache.qpid.protonj2.test.driver.codec.messaging.AmqpValue;
 import org.apache.qpid.protonj2.test.driver.codec.messaging.Data;
 import org.apache.qpid.protonj2.test.driver.codec.primitives.Binary;
@@ -35,7 +37,7 @@ public class EncodedDataMatcher extends 
EncodedAmqpTypeMatcher {
      *        the value that is expected to be IN the received {@link Data}
      */
     public EncodedDataMatcher(byte[] expectedValue) {
-        this(new Binary(expectedValue), false);
+        this(expectedValue != null ? new Binary(expectedValue) : null, false);
     }
 
     /**
@@ -54,7 +56,7 @@ public class EncodedDataMatcher extends 
EncodedAmqpTypeMatcher {
      *        consuming the {@link AmqpValue}
      */
     public EncodedDataMatcher(byte[] expectedValue, boolean 
permitTrailingBytes) {
-        this(new Binary(expectedValue), permitTrailingBytes);
+        this(expectedValue != null ? new Binary(expectedValue) : null, 
permitTrailingBytes);
     }
 
     /**
@@ -68,6 +70,11 @@ public class EncodedDataMatcher extends 
EncodedAmqpTypeMatcher {
         super(DESCRIPTOR_SYMBOL, DESCRIPTOR_CODE, expectedValue, 
permitTrailingBytes);
     }
 
+    @Override
+    protected boolean matchesSafely(ByteBuffer receivedBinary) {
+        return super.matchesSafely(receivedBinary);
+    }
+
     @Override
     public void describeTo(Description description) {
         description.appendText("a Binary encoding of a Data that wraps a 
Binary containing: ").appendValue(getExpectedValue());
diff --git 
a/protonj2/src/main/java/org/apache/qpid/protonj2/buffer/impl/ProtonCompositeBufferImpl.java
 
b/protonj2/src/main/java/org/apache/qpid/protonj2/buffer/impl/ProtonCompositeBufferImpl.java
index 0f567fb0..8035e7af 100644
--- 
a/protonj2/src/main/java/org/apache/qpid/protonj2/buffer/impl/ProtonCompositeBufferImpl.java
+++ 
b/protonj2/src/main/java/org/apache/qpid/protonj2/buffer/impl/ProtonCompositeBufferImpl.java
@@ -351,7 +351,7 @@ public final class ProtonCompositeBufferImpl extends 
SharedResource<ProtonBuffer
             throw generateIndexOutOfBounds(offset + length, false);
         }
 
-        int lastAccessedChunk = findChunkWithIndex(offset);
+        int lastAccessedChunk = length > 0 ? findChunkWithIndex(offset) : 0;
 
         while (length > 0) {
             final ProtonBuffer buffer = buffers[lastAccessedChunk];
@@ -384,7 +384,7 @@ public final class ProtonCompositeBufferImpl extends 
SharedResource<ProtonBuffer
             throw generateIndexOutOfBounds(offset + length, false);
         }
 
-        int lastAccessedChunk = findChunkWithIndex(offset);
+        int lastAccessedChunk = length > 0 ? findChunkWithIndex(offset) : 0;
 
         while (length > 0) {
             final ProtonBuffer buffer = buffers[lastAccessedChunk];
diff --git 
a/protonj2/src/main/java/org/apache/qpid/protonj2/codec/encoders/messaging/DataTypeEncoder.java
 
b/protonj2/src/main/java/org/apache/qpid/protonj2/codec/encoders/messaging/DataTypeEncoder.java
index fe064a27..5af13db9 100644
--- 
a/protonj2/src/main/java/org/apache/qpid/protonj2/codec/encoders/messaging/DataTypeEncoder.java
+++ 
b/protonj2/src/main/java/org/apache/qpid/protonj2/codec/encoders/messaging/DataTypeEncoder.java
@@ -52,19 +52,24 @@ public final class DataTypeEncoder extends 
AbstractDescribedTypeEncoder<Data> {
     public void writeType(ProtonBuffer buffer, EncoderState state, Data value) 
{
         buffer.writeBytes(DATA_PREAMBLE);
 
-        final int dataLength = value.getDataLength();
-
-        if (dataLength > 255) {
-            buffer.ensureWritable(dataLength + Long.BYTES);
-            buffer.writeByte(EncodingCodes.VBIN32);
-            buffer.writeInt(dataLength);
+        if (value.hasBinary()) {
+            final int dataLength = value.getDataLength();
+
+            if (dataLength > 255) {
+                buffer.ensureWritable(dataLength + Long.BYTES);
+                buffer.writeByte(EncodingCodes.VBIN32);
+                buffer.writeInt(dataLength);
+            } else {
+                buffer.ensureWritable(dataLength + Short.BYTES);
+                buffer.writeByte(EncodingCodes.VBIN8);
+                buffer.writeByte((byte) dataLength);
+            }
+
+            value.copyTo(buffer);
         } else {
-            buffer.ensureWritable(dataLength + Short.BYTES);
-            buffer.writeByte(EncodingCodes.VBIN8);
-            buffer.writeByte((byte) dataLength);
+            buffer.ensureWritable(1);
+            buffer.writeByte(EncodingCodes.NULL);
         }
-
-        value.copyTo(buffer);
     }
 
     @Override
diff --git 
a/protonj2/src/main/java/org/apache/qpid/protonj2/types/messaging/Data.java 
b/protonj2/src/main/java/org/apache/qpid/protonj2/types/messaging/Data.java
index 81b7f098..19e7b8fa 100644
--- a/protonj2/src/main/java/org/apache/qpid/protonj2/types/messaging/Data.java
+++ b/protonj2/src/main/java/org/apache/qpid/protonj2/types/messaging/Data.java
@@ -54,6 +54,10 @@ public final class Data implements Section<byte[]> {
         return new Data(buffer == null ? null : buffer.copy(true));
     }
 
+    public boolean hasBinary() {
+        return buffer != null;
+    }
+
     public Binary getBinary() {
         if (cachedBinary != null || buffer == null) {
             return cachedBinary;
diff --git 
a/protonj2/src/test/java/org/apache/qpid/protonj2/buffer/ProtonAbstractBufferTest.java
 
b/protonj2/src/test/java/org/apache/qpid/protonj2/buffer/ProtonAbstractBufferTest.java
index 50764d72..c390123e 100644
--- 
a/protonj2/src/test/java/org/apache/qpid/protonj2/buffer/ProtonAbstractBufferTest.java
+++ 
b/protonj2/src/test/java/org/apache/qpid/protonj2/buffer/ProtonAbstractBufferTest.java
@@ -21,6 +21,7 @@ import static java.nio.file.StandardOpenOption.CREATE;
 import static java.nio.file.StandardOpenOption.READ;
 import static java.nio.file.StandardOpenOption.WRITE;
 import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotEquals;
@@ -474,6 +475,7 @@ public abstract class ProtonAbstractBufferTest {
         }
     }
 
+    @SuppressWarnings("resource")
     @Test
     public void testWriteBytesHeapByteBufferMustThrowIfCannotBeExpanded() {
         // With zero offsets
@@ -528,6 +530,7 @@ public abstract class ProtonAbstractBufferTest {
         }
     }
 
+    @SuppressWarnings("resource")
     @Test
     public void testWriteBytesDirectByteBufferMustThrowIfCannotBeExpanded() {
         // With zero offsets
@@ -580,6 +583,7 @@ public abstract class ProtonAbstractBufferTest {
         }
     }
 
+    @SuppressWarnings("resource")
     @Test
     public void testWriteBytesByteArrayMustThrowIfCannotBeExpanded() {
         // Starting at offsets zero.
@@ -630,6 +634,7 @@ public abstract class ProtonAbstractBufferTest {
         }
     }
 
+    @SuppressWarnings("resource")
     @Test
     public void testWriteBytesByteArrayWithOffsetMustThrowIfCannotBeExpanded() 
{
         // Starting at offsets zero.
@@ -682,6 +687,7 @@ public abstract class ProtonAbstractBufferTest {
         }
     }
 
+    @SuppressWarnings("resource")
     @Test
     public void testWriteBytesBufferMustThrowIfCannotBeExpanded() {
         // Starting at offsets zero.
@@ -2062,6 +2068,7 @@ public abstract class ProtonAbstractBufferTest {
         }
     }
 
+    @SuppressWarnings("resource")
     @Test
     public void testEnsureWritableCanGrowBeyondImplicitCapacityLimit() {
         try (ProtonBufferAllocator allocator = createTestCaseAllocator(); 
ProtonBuffer buf = allocator.allocate(8).implicitGrowthLimit(8)) {
@@ -2077,6 +2084,7 @@ public abstract class ProtonAbstractBufferTest {
         }
     }
 
+    @SuppressWarnings("resource")
     @Test
     public void testWritesMustThrowIfSizeWouldGoBeyondImplicitCapacityLimit() {
         try (ProtonBufferAllocator allocator = createTestCaseAllocator()) {
@@ -2537,6 +2545,7 @@ public abstract class ProtonAbstractBufferTest {
         }
     }
 
+    @SuppressWarnings("resource")
     @Test
     public void testAllocatingOnClosedAllocatorMustThrow() {
         ProtonBufferAllocator allocator = createTestCaseAllocator();
@@ -2982,9 +2991,39 @@ public abstract class ProtonAbstractBufferTest {
         }
     }
 
+    @Test
+    public void testCopyIntoOnEmptyBufferFromFullyReadBuffer() {
+        try (ProtonBufferAllocator allocator = createTestCaseAllocator();
+             ProtonBuffer source = allocator.allocate(8);
+             ProtonBuffer target = allocator.allocate(0)) {
+
+            source.writeLong(0xFFFFFFFFL);
+            source.readLong();
+
+            assertDoesNotThrow(() -> source.copyInto(source.getReadOffset(), 
target, 0, 0));
+        }
+    }
+
+    @Test
+    public void testCopyIntoByteBufferOnEmptyBufferFromFullyReadBuffer() {
+        try (ProtonBufferAllocator allocator = createTestCaseAllocator();
+             ProtonBuffer source = allocator.allocate(8)) {
+
+            final ByteBuffer target = ByteBuffer.allocate(0);
+
+            source.writeLong(0xFFFFFFFFL);
+            source.readLong();
+
+            assertDoesNotThrow(() -> source.copyInto(source.getReadOffset(), 
target, 0, 0));
+        }
+    }
+
+    @SuppressWarnings("resource")
     @Test
     public void testReadOnlyBuffersCannotChangeWriteOffset() {
-        try (ProtonBufferAllocator allocator = createTestCaseAllocator(); 
ProtonBuffer buf = allocator.allocate(8).convertToReadOnly()) {
+        try (ProtonBufferAllocator allocator = createTestCaseAllocator();
+             ProtonBuffer buf = allocator.allocate(8).convertToReadOnly()) {
+
             assertThrows(ProtonBufferReadOnlyException.class, () -> 
buf.setWriteOffset(0));
             assertThrows(ProtonBufferReadOnlyException.class, () -> 
buf.setWriteOffset(4));
 
@@ -4895,6 +4934,7 @@ public abstract class ProtonAbstractBufferTest {
         }
     }
 
+    @SuppressWarnings("resource")
     @Test
     public void testIteratingComponentOnClosedBufferMustThrow() {
         try (ProtonBufferAllocator allocator = createTestCaseAllocator()) {
@@ -5518,6 +5558,7 @@ public abstract class ProtonAbstractBufferTest {
         }
     }
 
+    @SuppressWarnings("resource")
     protected static void verifyInaccessible(ProtonBuffer buf) {
         verifyReadInaccessible(buf);
 
diff --git 
a/protonj2/src/test/java/org/apache/qpid/protonj2/engine/impl/ProtonFrameDecodingHandlerTest.java
 
b/protonj2/src/test/java/org/apache/qpid/protonj2/engine/impl/ProtonFrameDecodingHandlerTest.java
index a6c60740..c4650b09 100644
--- 
a/protonj2/src/test/java/org/apache/qpid/protonj2/engine/impl/ProtonFrameDecodingHandlerTest.java
+++ 
b/protonj2/src/test/java/org/apache/qpid/protonj2/engine/impl/ProtonFrameDecodingHandlerTest.java
@@ -30,6 +30,9 @@ import java.util.List;
 
 import org.apache.qpid.protonj2.buffer.ProtonBuffer;
 import org.apache.qpid.protonj2.buffer.ProtonBufferAllocator;
+import org.apache.qpid.protonj2.codec.CodecFactory;
+import org.apache.qpid.protonj2.codec.Decoder;
+import org.apache.qpid.protonj2.codec.DecoderState;
 import org.apache.qpid.protonj2.engine.EmptyEnvelope;
 import org.apache.qpid.protonj2.engine.Engine;
 import org.apache.qpid.protonj2.engine.EngineHandlerContext;
@@ -39,6 +42,11 @@ import 
org.apache.qpid.protonj2.engine.exceptions.ProtocolViolationException;
 import org.apache.qpid.protonj2.engine.util.FrameReadSinkTransportHandler;
 import org.apache.qpid.protonj2.engine.util.FrameRecordingTransportHandler;
 import org.apache.qpid.protonj2.engine.util.FrameWriteSinkTransportHandler;
+import org.apache.qpid.protonj2.types.messaging.DeliveryAnnotations;
+import org.apache.qpid.protonj2.types.messaging.Header;
+import org.apache.qpid.protonj2.types.messaging.MessageAnnotations;
+import org.apache.qpid.protonj2.types.messaging.Properties;
+import org.apache.qpid.protonj2.types.messaging.Section;
 import org.apache.qpid.protonj2.types.transport.AMQPHeader;
 import org.apache.qpid.protonj2.types.transport.Open;
 import org.apache.qpid.protonj2.types.transport.Transfer;
@@ -833,6 +841,88 @@ public class ProtonFrameDecodingHandlerTest {
         assertFalse(open.hasProperties());
     }
 
+    @Test
+    public void testDecodeOfSplitFramedMessage() throws Exception {
+        final String capture1 = "00 00 01 4c 02 00 00 00 00 53 14 c0 1d 0b 43 
43 " +
+                                "a0 10 cf 8c f4 f8 93 5d b6 47 a2 19 3f 87 34 
6f " +
+                                "03 97 43 40 42 40 40 40 40 41";
+        final String capture2 = "00 53 70 c0 0b 05 40 40 70 48 19 08 00 40 52 
06 " +
+                                "00 53 71 c1 24 02 a3 10 78 2d 6f 70 74 2d 6c 
6f " +
+                                "63 6b 2d 74 6f 6b 65 6e 98 f8 f4 8c cf 5d 93 
47 " +
+                                "b6 a2 19 3f 87 34 6f 03 97 00 53 72 c1 9b 0a 
a3 " +
+                                "13 78 2d 6f 70 74 2d 65 6e 71 75 65 75 65 64 
2d " +
+                                "74 69 6d 65 83 00 00 01 8e 42 cd 59 63 a3 15 
78 " +
+                                "2d 6f 70 74 2d 73 65 71 75 65 6e 63 65 2d 6e 
75 " +
+                                "6d 62 65 72 81 00 00 00 00 00 00 07 2d a3 12 
78 " +
+                                "2d 6f 70 74 2d 6c 6f 63 6b 65 64 2d 75 6e 74 
69 " +
+                                "6c 83 00 00 01 8e 43 db 33 f3 a3 1d 78 2d 6f 
70 " +
+                                "74 2d 65 6e 71 75 65 75 65 2d 73 65 71 75 65 
6e " +
+                                "63 65 2d 6e 75 6d 62 65 72 81 00 00 00 00 00 
00 " +
+                                "07 2d a3 13 78 2d 6f 70 74 2d 6d 65 73 73 61 
67 " +
+                                "65 2d 73 74 61 74 65 54 00 00 53 73 c0 3f 0d 
a1 " +
+                                "20 39 34 39 37 36 34 61 61 38 30 37 62 34 30 
37 " +
+                                "38 39 39 64 32 35 66 61 65 61 63 65 36 61 38 
34 " +
+                                "65 40 40 40 40 40 40 40 83 00 00 01 8e 8a e6 
61 " +
+                                "63 83 00 00 01 8e 42 cd 59 63 40 40 40 00 53 
75 " +
+                                "a0 00";
+
+        final byte[] packet1 = convertCaptureToByteArray(capture1);
+        final byte[] packet2 = convertCaptureToByteArray(capture2);
+
+        ArgumentCaptor<IncomingAMQPEnvelope> argument = 
ArgumentCaptor.forClass(IncomingAMQPEnvelope.class);
+
+        ProtonFrameDecodingHandler handler = createFrameDecoder();
+        ProtonEngineHandlerContext context = 
Mockito.mock(ProtonEngineHandlerContext.class);
+
+        handler.handleRead(context, AMQPHeader.getAMQPHeader().getBuffer());
+
+        final ProtonBuffer buffer1 = 
ProtonBufferAllocator.defaultAllocator().copy(packet1);
+        final ProtonBuffer buffer2 = 
ProtonBufferAllocator.defaultAllocator().copy(packet2);
+
+        handler.handleRead(context, buffer1);
+        handler.handleRead(context, buffer2);
+
+        Mockito.verify(context).fireRead(Mockito.any(HeaderEnvelope.class));
+        
Mockito.verify(context).interestMask(ProtonEngineHandlerContext.HANDLER_READS);
+        Mockito.verify(context, times(1)).fireRead(argument.capture());
+        Mockito.verifyNoMoreInteractions(context);
+
+        List<IncomingAMQPEnvelope> arguments = argument.getAllValues();
+
+        assertNotNull(arguments.get(0));
+        assertTrue(arguments.get(0).getBody() instanceof Transfer);
+        assertNotNull(arguments.get(0).getPayload());
+        assertTrue(arguments.get(0).getPayload().isReadable());
+        assertEquals(290, arguments.get(0).getPayload().getReadableBytes());
+
+        ProtonBuffer buffer = arguments.get(0).getPayload();
+        Decoder decoder = CodecFactory.getDefaultDecoder();
+        DecoderState decoderState = decoder.getCachedDecoderState();
+
+        final Header header = (Header) decoder.readObject(buffer, 
decoderState);
+        final DeliveryAnnotations deliveryAnnotations = (DeliveryAnnotations) 
decoder.readObject(buffer, decoderState);
+        final MessageAnnotations messageAnnotations = (MessageAnnotations) 
decoder.readObject(buffer, decoderState);
+        final Properties properties = (Properties) decoder.readObject(buffer, 
decoderState);
+        final Section<?> body = (Section<?>) decoder.readObject(buffer, 
decoderState);
+
+        assertNotNull(header);
+        assertNotNull(deliveryAnnotations);
+        assertNotNull(messageAnnotations);
+        assertNotNull(properties);
+        assertNotNull(body);
+    }
+
+    private byte[] convertCaptureToByteArray(String capture) throws Exception {
+        final String[] hexStrings = capture.split(" ");
+        final byte[] capturedBytes = new byte[hexStrings.length];
+
+        for(int i = 0; i < hexStrings.length; ++i) {
+            capturedBytes[i] = (byte) Integer.parseUnsignedInt(hexStrings[i], 
16);
+        }
+
+        return capturedBytes;
+    }
+
     private ProtonFrameDecodingHandler createFrameDecoder() {
         ProtonEngineConfiguration configuration = 
Mockito.mock(ProtonEngineConfiguration.class);
         
Mockito.when(configuration.getInboundMaxFrameSize()).thenReturn(Long.valueOf(65535));


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to