This is an automated email from the ASF dual-hosted git repository.

tabish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-protonj2.git


The following commit(s) were added to refs/heads/main by this push:
     new b5ac05d7 PROTON-2795 Ensure message format is set on first transfer frame
b5ac05d7 is described below

commit b5ac05d77ce697d0290643709bb2f5d718a1a673
Author: Timothy Bish <tabish...@gmail.com>
AuthorDate: Fri Feb 23 17:54:55 2024 -0500

    PROTON-2795 Ensure message format is set on first transfer frame
    
    Fix the code so that the message format is always sent on the first
    Transfer frame of a delivery and omitted from subsequent frames, as
    it is not needed after the first Transfer frame.
---
 .../qpid/protonj2/client/impl/MessageSendTest.java |  2 +-
 .../engine/impl/ProtonSessionOutgoingWindow.java   |  2 +-
 .../protonj2/engine/impl/ProtonSenderTest.java     | 52 +++++++++++++++++++++-
 3 files changed, 52 insertions(+), 4 deletions(-)

diff --git a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/MessageSendTest.java b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/MessageSendTest.java
index c893e3fb..73aa64d4 100644
--- a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/MessageSendTest.java
+++ b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/MessageSendTest.java
@@ -107,7 +107,7 @@ class MessageSendTest extends ImperativeClientTestCase {
             payloadMatcher.setMessageContentMatcher(bodyMatcher);
 
             peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
-            peer.expectTransfer().withPayload(payloadMatcher).accept();
+            peer.expectTransfer().withMessageFormat(0).withPayload(payloadMatcher).accept();
             peer.expectDetach().respond();
             peer.expectClose().respond();
 
diff --git a/protonj2/src/main/java/org/apache/qpid/protonj2/engine/impl/ProtonSessionOutgoingWindow.java b/protonj2/src/main/java/org/apache/qpid/protonj2/engine/impl/ProtonSessionOutgoingWindow.java
index c25e5b44..35729bd8 100644
--- a/protonj2/src/main/java/org/apache/qpid/protonj2/engine/impl/ProtonSessionOutgoingWindow.java
+++ b/protonj2/src/main/java/org/apache/qpid/protonj2/engine/impl/ProtonSessionOutgoingWindow.java
@@ -267,7 +267,7 @@ public class ProtonSessionOutgoingWindow {
 
         try {
             cachedTransfer.setDeliveryId(delivery.getDeliveryId());
-            if (delivery.getMessageFormat() != 0) {
+            if (delivery.getTransferCount() == 0) {
                 cachedTransfer.setMessageFormat(delivery.getMessageFormat());
             } else {
                 cachedTransfer.clearMessageFormat();
diff --git a/protonj2/src/test/java/org/apache/qpid/protonj2/engine/impl/ProtonSenderTest.java b/protonj2/src/test/java/org/apache/qpid/protonj2/engine/impl/ProtonSenderTest.java
index 064370e2..22843ed2 100644
--- a/protonj2/src/test/java/org/apache/qpid/protonj2/engine/impl/ProtonSenderTest.java
+++ b/protonj2/src/test/java/org/apache/qpid/protonj2/engine/impl/ProtonSenderTest.java
@@ -57,6 +57,7 @@ import org.apache.qpid.protonj2.engine.exceptions.EngineFailedException;
 import org.apache.qpid.protonj2.logging.ProtonLogger;
 import org.apache.qpid.protonj2.logging.ProtonLoggerFactory;
 import org.apache.qpid.protonj2.test.driver.ProtonTestConnector;
+import org.apache.qpid.protonj2.test.driver.codec.primitives.UnsignedInteger;
 import org.apache.qpid.protonj2.test.driver.matchers.messaging.AcceptedMatcher;
 import org.apache.qpid.protonj2.test.driver.matchers.messaging.ModifiedMatcher;
 import org.apache.qpid.protonj2.test.driver.matchers.messaging.RejectedMatcher;
@@ -1267,6 +1268,53 @@ public class ProtonSenderTest extends ProtonEngineTestSupport {
         assertNull(failure);
     }
 
+    @Test
+    public void testSendTransferWithDefaultMessageFormat() throws Exception {
+        Engine engine = EngineFactory.PROTON.createNonSaslEngine();
+        engine.errorHandler(result -> failure = result.failureCause());
+        ProtonTestConnector peer = createTestPeer(engine);
+
+        final byte [] payloadBuffer = new byte[] {0, 1, 2, 3, 4};
+
+        peer.expectAMQPHeader().respondWithAMQPHeader();
+        peer.expectOpen().respond().withContainerId("driver");
+        peer.expectBegin().respond();
+        peer.expectAttach().withRole(Role.SENDER.getValue()).respond();
+        peer.remoteFlow().withDeliveryCount(0)
+                         .withLinkCredit(10)
+                         .withIncomingWindow(1024)
+                         .withOutgoingWindow(10)
+                         .withNextIncomingId(0)
+                         .withNextOutgoingId(1).queue();
+        peer.expectTransfer().withMessageFormat(0).withPayload(payloadBuffer);
+        peer.expectDetach().withHandle(0).respond();
+
+        Connection connection = engine.start();
+
+        connection.open();
+        Session session = connection.session();
+        session.open();
+
+        ProtonBuffer payload = ProtonBufferAllocator.defaultAllocator().copy(payloadBuffer);
+
+        Sender sender = session.sender("sender-1");
+
+        assertFalse(sender.isSendable());
+
+        sender.creditStateUpdateHandler(handler -> {
+            if (handler.isSendable()) {
+                handler.next().setTag(new byte[] {0}).writeBytes(payload);
+            }
+        });
+
+        sender.open();
+        sender.close();
+
+        peer.waitForScriptToComplete();
+
+        assertNull(failure);
+    }
+
     @Test
     public void testSenderSignalsDeliveryUpdatedOnSettledThenSettleFromLinkAPI() throws Exception {
         doTestSenderSignalsDeliveryUpdatedOnSettled(true);
@@ -3342,7 +3390,7 @@ public class ProtonSenderTest extends ProtonEngineTestSupport {
         peer.expectTransfer().withHandle(0)
                              .withState(nullValue())
                              .withDeliveryId(0)
-                             .withMessageFormat(42)
+                             .withMessageFormat((UnsignedInteger) null)
                              .withAborted(anyOf(nullValue(), is(false)))
                              .withSettled(false)
                              .withMore(anyOf(nullValue(), is(false)))
@@ -3424,7 +3472,7 @@ public class ProtonSenderTest extends ProtonEngineTestSupport {
         peer.expectTransfer().withHandle(0)
                              .withState().accepted()
                              .withDeliveryId(0)
-                             .withMessageFormat(42)
+                             .withMessageFormat((UnsignedInteger) null)
                              .withAborted(anyOf(nullValue(), is(false)))
                              .withSettled(settle)
                              .withMore(anyOf(nullValue(), is(false)))


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org
For additional commands, e-mail: commits-h...@qpid.apache.org

Reply via email to