This is an automated email from the ASF dual-hosted git repository.
tabish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-protonj2.git
The following commit(s) were added to refs/heads/main by this push:
new b5ac05d7 PROTON-2795 Ensure message format is set on first transfer frame
b5ac05d7 is described below
commit b5ac05d77ce697d0290643709bb2f5d718a1a673
Author: Timothy Bish <[email protected]>
AuthorDate: Fri Feb 23 17:54:55 2024 -0500
PROTON-2795 Ensure message format is set on first transfer frame
Fix the code to always send the message format on the first Transfer frame
and omit it from subsequent frames, since it is not needed after the first
Transfer frame.
---
.../qpid/protonj2/client/impl/MessageSendTest.java | 2 +-
.../engine/impl/ProtonSessionOutgoingWindow.java | 2 +-
.../protonj2/engine/impl/ProtonSenderTest.java | 52 +++++++++++++++++++++-
3 files changed, 52 insertions(+), 4 deletions(-)
diff --git
a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/MessageSendTest.java
b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/MessageSendTest.java
index c893e3fb..73aa64d4 100644
---
a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/MessageSendTest.java
+++
b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/MessageSendTest.java
@@ -107,7 +107,7 @@ class MessageSendTest extends ImperativeClientTestCase {
payloadMatcher.setMessageContentMatcher(bodyMatcher);
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
- peer.expectTransfer().withPayload(payloadMatcher).accept();
+
peer.expectTransfer().withMessageFormat(0).withPayload(payloadMatcher).accept();
peer.expectDetach().respond();
peer.expectClose().respond();
diff --git
a/protonj2/src/main/java/org/apache/qpid/protonj2/engine/impl/ProtonSessionOutgoingWindow.java
b/protonj2/src/main/java/org/apache/qpid/protonj2/engine/impl/ProtonSessionOutgoingWindow.java
index c25e5b44..35729bd8 100644
---
a/protonj2/src/main/java/org/apache/qpid/protonj2/engine/impl/ProtonSessionOutgoingWindow.java
+++
b/protonj2/src/main/java/org/apache/qpid/protonj2/engine/impl/ProtonSessionOutgoingWindow.java
@@ -267,7 +267,7 @@ public class ProtonSessionOutgoingWindow {
try {
cachedTransfer.setDeliveryId(delivery.getDeliveryId());
- if (delivery.getMessageFormat() != 0) {
+ if (delivery.getTransferCount() == 0) {
cachedTransfer.setMessageFormat(delivery.getMessageFormat());
} else {
cachedTransfer.clearMessageFormat();
diff --git
a/protonj2/src/test/java/org/apache/qpid/protonj2/engine/impl/ProtonSenderTest.java
b/protonj2/src/test/java/org/apache/qpid/protonj2/engine/impl/ProtonSenderTest.java
index 064370e2..22843ed2 100644
---
a/protonj2/src/test/java/org/apache/qpid/protonj2/engine/impl/ProtonSenderTest.java
+++
b/protonj2/src/test/java/org/apache/qpid/protonj2/engine/impl/ProtonSenderTest.java
@@ -57,6 +57,7 @@ import
org.apache.qpid.protonj2.engine.exceptions.EngineFailedException;
import org.apache.qpid.protonj2.logging.ProtonLogger;
import org.apache.qpid.protonj2.logging.ProtonLoggerFactory;
import org.apache.qpid.protonj2.test.driver.ProtonTestConnector;
+import org.apache.qpid.protonj2.test.driver.codec.primitives.UnsignedInteger;
import org.apache.qpid.protonj2.test.driver.matchers.messaging.AcceptedMatcher;
import org.apache.qpid.protonj2.test.driver.matchers.messaging.ModifiedMatcher;
import org.apache.qpid.protonj2.test.driver.matchers.messaging.RejectedMatcher;
@@ -1267,6 +1268,53 @@ public class ProtonSenderTest extends
ProtonEngineTestSupport {
assertNull(failure);
}
+ @Test
+ public void testSendTransferWithDefaultMessageFormat() throws Exception {
+ Engine engine = EngineFactory.PROTON.createNonSaslEngine();
+ engine.errorHandler(result -> failure = result.failureCause());
+ ProtonTestConnector peer = createTestPeer(engine);
+
+ final byte [] payloadBuffer = new byte[] {0, 1, 2, 3, 4};
+
+ peer.expectAMQPHeader().respondWithAMQPHeader();
+ peer.expectOpen().respond().withContainerId("driver");
+ peer.expectBegin().respond();
+ peer.expectAttach().withRole(Role.SENDER.getValue()).respond();
+ peer.remoteFlow().withDeliveryCount(0)
+ .withLinkCredit(10)
+ .withIncomingWindow(1024)
+ .withOutgoingWindow(10)
+ .withNextIncomingId(0)
+ .withNextOutgoingId(1).queue();
+ peer.expectTransfer().withMessageFormat(0).withPayload(payloadBuffer);
+ peer.expectDetach().withHandle(0).respond();
+
+ Connection connection = engine.start();
+
+ connection.open();
+ Session session = connection.session();
+ session.open();
+
+ ProtonBuffer payload =
ProtonBufferAllocator.defaultAllocator().copy(payloadBuffer);
+
+ Sender sender = session.sender("sender-1");
+
+ assertFalse(sender.isSendable());
+
+ sender.creditStateUpdateHandler(handler -> {
+ if (handler.isSendable()) {
+ handler.next().setTag(new byte[] {0}).writeBytes(payload);
+ }
+ });
+
+ sender.open();
+ sender.close();
+
+ peer.waitForScriptToComplete();
+
+ assertNull(failure);
+ }
+
@Test
public void
testSenderSignalsDeliveryUpdatedOnSettledThenSettleFromLinkAPI() throws
Exception {
doTestSenderSignalsDeliveryUpdatedOnSettled(true);
@@ -3342,7 +3390,7 @@ public class ProtonSenderTest extends
ProtonEngineTestSupport {
peer.expectTransfer().withHandle(0)
.withState(nullValue())
.withDeliveryId(0)
- .withMessageFormat(42)
+ .withMessageFormat((UnsignedInteger) null)
.withAborted(anyOf(nullValue(), is(false)))
.withSettled(false)
.withMore(anyOf(nullValue(), is(false)))
@@ -3424,7 +3472,7 @@ public class ProtonSenderTest extends
ProtonEngineTestSupport {
peer.expectTransfer().withHandle(0)
.withState().accepted()
.withDeliveryId(0)
- .withMessageFormat(42)
+ .withMessageFormat((UnsignedInteger) null)
.withAborted(anyOf(nullValue(), is(false)))
.withSettled(settle)
.withMore(anyOf(nullValue(), is(false)))
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]