This is an automated email from the ASF dual-hosted git repository.
tabish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-protonj2.git
The following commit(s) were added to refs/heads/main by this push:
new 7bce1bfd PROTON-2794 Resolve transfer ID wrapping issue in incoming window
7bce1bfd is described below
commit 7bce1bfd4be21ee5e894c57eb84d3791ee4f5caf
Author: Timothy Bish <[email protected]>
AuthorDate: Tue Mar 5 14:17:02 2024 -0500
PROTON-2794 Resolve transfer ID wrapping issue in incoming window
Ensure that the transfer ID wraps at the max uint value, and add some tests.
---
pom.xml | 5 +
protonj2/pom.xml | 5 +
.../engine/impl/ProtonSessionIncomingWindow.java | 36 ++-
.../protonj2/engine/impl/ProtonReceiverTest.java | 66 +++++
.../impl/ProtonSessionIncomingWindowTest.java | 266 +++++++++++++++++++++
.../impl/ProtonSessionOutgoingWindowTest.java | 81 +++++++
6 files changed, 440 insertions(+), 19 deletions(-)
diff --git a/pom.xml b/pom.xml
index 3f3e31ae..71424e33 100644
--- a/pom.xml
+++ b/pom.xml
@@ -249,6 +249,11 @@
<artifactId>mockito-core</artifactId>
<version>${mockito.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-junit-jupiter</artifactId>
+ <version>${mockito.version}</version>
+ </dependency>
</dependencies>
</dependencyManagement>
diff --git a/protonj2/pom.xml b/protonj2/pom.xml
index 986d2edc..97282731 100644
--- a/protonj2/pom.xml
+++ b/protonj2/pom.xml
@@ -84,6 +84,11 @@
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-junit-jupiter</artifactId>
+ <scope>test</scope>
+ </dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
diff --git
a/protonj2/src/main/java/org/apache/qpid/protonj2/engine/impl/ProtonSessionIncomingWindow.java
b/protonj2/src/main/java/org/apache/qpid/protonj2/engine/impl/ProtonSessionIncomingWindow.java
index 59bf6be5..d4acc17e 100644
---
a/protonj2/src/main/java/org/apache/qpid/protonj2/engine/impl/ProtonSessionIncomingWindow.java
+++
b/protonj2/src/main/java/org/apache/qpid/protonj2/engine/impl/ProtonSessionIncomingWindow.java
@@ -37,7 +37,7 @@ import org.apache.qpid.protonj2.types.transport.Transfer;
*/
public class ProtonSessionIncomingWindow {
- private static final long DEFAULT_WINDOW_SIZE = Integer.MAX_VALUE; //
biggest legal value
+ private static final int DEFAULT_WINDOW_SIZE = Integer.MAX_VALUE; //
biggest legal value
private final ProtonSession session;
private final ProtonEngine engine;
@@ -46,16 +46,16 @@ public class ProtonSessionIncomingWindow {
private int incomingCapacity = 0;
// Computed incoming window based on the incoming capacity minus bytes not
yet read from deliveries.
- private long incomingWindow = 0;
+ private int incomingWindow = 0;
// Tracks the next expected incoming transfer ID from the remote
- private long nextIncomingId = 0;
+ private int nextIncomingId = 0;
// Tracks the most recent delivery Id for validation against the next
incoming delivery
private SequenceNumber lastDeliveryid;
- private long maxFrameSize;
- private long incomingBytes;
+ private int maxFrameSize;
+ private int incomingBytes;
private UnsettledMap<ProtonIncomingDelivery> unsettled =
new UnsettledMap<>(ProtonIncomingDelivery::getDeliveryIdInt);
@@ -63,7 +63,7 @@ public class ProtonSessionIncomingWindow {
public ProtonSessionIncomingWindow(ProtonSession session) {
this.session = session;
this.engine = session.getConnection().getEngine();
- this.maxFrameSize = session.getConnection().getMaxFrameSize();
+ this.maxFrameSize = (int) session.getConnection().getMaxFrameSize();
}
public void setIncomingCapacity(int incomingCapacity) {
@@ -75,11 +75,10 @@ public class ProtonSessionIncomingWindow {
}
public int getRemainingIncomingCapacity() {
- // TODO: This is linked to below update of capacity which also needs
more attention.
- if (incomingCapacity <= 0 || maxFrameSize ==
UnsignedInteger.MAX_VALUE.longValue()) {
- return (int) DEFAULT_WINDOW_SIZE;
+ if (incomingCapacity <= 0 || maxFrameSize ==
UnsignedInteger.MAX_VALUE.intValue()) {
+ return DEFAULT_WINDOW_SIZE;
} else {
- return (int) (incomingCapacity - incomingBytes);
+ return incomingCapacity - incomingBytes;
}
}
@@ -93,7 +92,7 @@ public class ProtonSessionIncomingWindow {
*/
Begin configureOutbound(Begin begin) {
// Update as it might have changed if session created before
connection open() called.
- this.maxFrameSize = session.getConnection().getMaxFrameSize();
+ this.maxFrameSize = (int) session.getConnection().getMaxFrameSize();
return begin.setIncomingWindow(updateIncomingWindow());
}
@@ -108,7 +107,7 @@ public class ProtonSessionIncomingWindow {
*/
Begin handleBegin(Begin begin) {
if (begin.hasNextOutgoingId()) {
- this.nextIncomingId = begin.getNextOutgoingId();
+ this.nextIncomingId =
UnsignedInteger.valueOf(begin.getNextOutgoingId()).intValue();
}
return begin;
@@ -137,7 +136,8 @@ public class ProtonSessionIncomingWindow {
incomingWindow--;
nextIncomingId++;
- ProtonIncomingDelivery delivery = link.remoteTransfer(transfer,
payload);
+ final ProtonIncomingDelivery delivery = link.remoteTransfer(transfer,
payload);
+
if (!delivery.isSettled() && !delivery.isRemotelySettled() &&
delivery.isFirstTransfer()) {
unsettled.put((int) delivery.getDeliveryId(), delivery);
}
@@ -186,12 +186,10 @@ public class ProtonSessionIncomingWindow {
}
long updateIncomingWindow() {
- // TODO - need to revisit this logic and decide on sane cutoff for
capacity restriction.
if (incomingCapacity <= 0 || maxFrameSize ==
UnsignedInteger.MAX_VALUE.longValue()) {
incomingWindow = DEFAULT_WINDOW_SIZE;
} else {
- // TODO - incomingWindow = Integer.divideUnsigned(incomingCapacity
- incomingBytes, maxFrameSize);
- incomingWindow = (incomingCapacity - incomingBytes) / maxFrameSize;
+ incomingWindow = Integer.divideUnsigned(incomingCapacity -
incomingBytes, maxFrameSize);
}
return incomingWindow;
@@ -205,14 +203,14 @@ public class ProtonSessionIncomingWindow {
//----- Access to internal state useful for tests
public long getIncomingBytes() {
- return incomingBytes;
+ return Integer.toUnsignedLong(incomingBytes);
}
- public long getNextIncomingId() {
+ public int getNextIncomingId() {
return nextIncomingId;
}
- public long getIncomingWindow() {
+ public int getIncomingWindow() {
return incomingWindow;
}
diff --git
a/protonj2/src/test/java/org/apache/qpid/protonj2/engine/impl/ProtonReceiverTest.java
b/protonj2/src/test/java/org/apache/qpid/protonj2/engine/impl/ProtonReceiverTest.java
index 7b00f95b..56dcff16 100644
---
a/protonj2/src/test/java/org/apache/qpid/protonj2/engine/impl/ProtonReceiverTest.java
+++
b/protonj2/src/test/java/org/apache/qpid/protonj2/engine/impl/ProtonReceiverTest.java
@@ -4654,4 +4654,70 @@ public class ProtonReceiverTest extends
ProtonEngineTestSupport {
peer.waitForScriptToComplete();
assertNull(failure);
}
+
+ @Test
+ public void testReceiverUpdatesFlowOutgoingIdAfterOverflow() {
+ Engine engine = EngineFactory.PROTON.createNonSaslEngine();
+ engine.errorHandler(result -> failure = result.failureCause());
+ ProtonTestConnector peer = createTestPeer(engine);
+
+ final byte[] payload = new byte[] { 1 };
+
+ peer.expectAMQPHeader().respondWithAMQPHeader();
+ peer.expectOpen().respond().withContainerId("driver");
+
peer.expectBegin().respond().withNextOutgoingId(UnsignedInteger.MAX_VALUE.intValue());
+ peer.expectAttach().respond();
+ peer.expectFlow().withLinkCredit(1)
+
.withNextIncomingId(UnsignedInteger.MAX_VALUE.intValue());
+ peer.remoteTransfer().withDeliveryId(1)
+ .withDeliveryTag(new byte[] {1})
+ .withMore(false)
+ .withMessageFormat(0)
+ .withPayload(payload).queue();
+ peer.expectDisposition().withFirst(1)
+ .withSettled(true)
+ .withState().accepted();
+
+ Connection connection = engine.start().open();
+ Session session = connection.session().open();
+ Receiver receiver = session.receiver("receiver");
+ receiver.deliveryReadHandler((delivery) -> {
+ delivery.disposition(Accepted.getInstance(), true);
+ });
+
+ receiver.addCredit(1);
+ receiver.open();
+
+ peer.waitForScriptToComplete();
+ peer.expectFlow().withLinkCredit(1)
+ .withNextIncomingId(0);
+ peer.remoteTransfer().withDeliveryId(2)
+ .withDeliveryTag(new byte[] {1})
+ .withMore(false)
+ .withMessageFormat(0)
+ .withPayload(payload).queue();
+ peer.expectDisposition().withFirst(2)
+ .withSettled(true)
+ .withState().accepted();
+
+ receiver.addCredit(1);
+
+ peer.waitForScriptToComplete();
+ peer.expectFlow().withLinkCredit(1)
+ .withNextIncomingId(1);
+
+ receiver.addCredit(1);
+
+ peer.expectDetach().respond();
+ peer.expectEnd().respond();
+ peer.expectClose().respond();
+
+ receiver.close();
+ session.close();
+ connection.close();
+
+ // Check post conditions and done.
+ peer.waitForScriptToComplete();
+ assertNull(failure);
+ }
}
diff --git
a/protonj2/src/test/java/org/apache/qpid/protonj2/engine/impl/ProtonSessionIncomingWindowTest.java
b/protonj2/src/test/java/org/apache/qpid/protonj2/engine/impl/ProtonSessionIncomingWindowTest.java
new file mode 100644
index 00000000..db530a5a
--- /dev/null
+++
b/protonj2/src/test/java/org/apache/qpid/protonj2/engine/impl/ProtonSessionIncomingWindowTest.java
@@ -0,0 +1,266 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.qpid.protonj2.engine.impl;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.when;
+
+import org.apache.qpid.protonj2.buffer.ProtonBuffer;
+import org.apache.qpid.protonj2.buffer.ProtonBufferAllocator;
+import org.apache.qpid.protonj2.types.UnsignedInteger;
+import org.apache.qpid.protonj2.types.transport.Begin;
+import org.apache.qpid.protonj2.types.transport.Transfer;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+/**
+ * Test the proton incoming window class for specification compliant behavior
+ */
+@ExtendWith(MockitoExtension.class)
+class ProtonSessionIncomingWindowTest {
+
+ private static final int DEFAULT_SESSION_CAPACITY = 100_000;
+
+ private static final int DEFAULT_MAX_FRAME_SIZE = 100_000;
+
+ @Mock
+ ProtonConnection connection;
+
+ @Mock
+ ProtonSession session;
+
+ @Mock
+ ProtonReceiver receiver;
+
+ @Mock
+ ProtonIncomingDelivery delivery;
+
+ ProtonSessionIncomingWindow window;
+
+ final Begin remoteBegin = new Begin();
+ final Begin localBegin = new Begin();
+
+ @BeforeEach
+ public void setUp() throws Exception {
+ when(session.getConnection()).thenReturn(connection);
+ when(session.getConnection().getMaxFrameSize()).thenReturn((long)
DEFAULT_MAX_FRAME_SIZE);
+
+ window = new ProtonSessionIncomingWindow(session);
+ }
+
+ @Test
+ public void testStateAfterBasicBeginExchange() {
+ localBegin.setNextOutgoingId(0);
+ remoteBegin.setNextOutgoingId(UnsignedInteger.MAX_VALUE.intValue());
+
+ window.configureOutbound(localBegin);
+ window.handleBegin(remoteBegin);
+
+ assertEquals(UnsignedInteger.MAX_VALUE.intValue(),
window.getNextIncomingId());
+ assertEquals(0, window.getIncomingCapacity());
+ assertEquals(Integer.MAX_VALUE, window.getRemainingIncomingCapacity());
+
+ window.setIncomingCapacity(DEFAULT_SESSION_CAPACITY);
+
+ assertEquals(DEFAULT_SESSION_CAPACITY, window.getIncomingCapacity());
+ }
+
+ @Test
+ public void testIncomingWindowUpdates() {
+ localBegin.setNextOutgoingId(0);
+ remoteBegin.setNextOutgoingId(0);
+
+ window.configureOutbound(localBegin);
+ assertEquals(Integer.MAX_VALUE, localBegin.getIncomingWindow());
+ window.handleBegin(remoteBegin);
+ assertEquals(Integer.MAX_VALUE, window.getIncomingWindow());
+
+ // Less than one frame worth of capacity
+ window.setIncomingCapacity(1);
+ window.configureOutbound(localBegin);
+ assertEquals(0, localBegin.getIncomingWindow());
+
+ // Exactly one frame worth of capacity
+ window.setIncomingCapacity(DEFAULT_MAX_FRAME_SIZE);
+ window.configureOutbound(localBegin);
+ assertEquals(1, localBegin.getIncomingWindow());
+
+ // Little more than one frame worth of capacity
+ window.setIncomingCapacity(DEFAULT_MAX_FRAME_SIZE + 10);
+ window.configureOutbound(localBegin);
+ assertEquals(1, localBegin.getIncomingWindow());
+
+ // Two frame worth of capacity
+ window.setIncomingCapacity(DEFAULT_MAX_FRAME_SIZE * 2);
+ window.configureOutbound(localBegin);
+ assertEquals(2, localBegin.getIncomingWindow());
+ }
+
+ @Test
+ public void testIncomingWindowUpdatesFrameSizeGreaterThanIntMaxVale() {
+ when(session.getConnection().getMaxFrameSize()).thenReturn((long)
Integer.MAX_VALUE + 100);
+
+ window = new ProtonSessionIncomingWindow(session);
+
+ localBegin.setNextOutgoingId(0);
+ remoteBegin.setNextOutgoingId(0);
+
+ window.configureOutbound(localBegin);
+ assertEquals(Integer.MAX_VALUE, localBegin.getIncomingWindow());
+ window.handleBegin(remoteBegin);
+ assertEquals(Integer.MAX_VALUE, window.getIncomingWindow());
+
+ // Less than one frame worth of capacity
+ window.setIncomingCapacity(Integer.MAX_VALUE);
+ window.configureOutbound(localBegin);
+ assertEquals(0, localBegin.getIncomingWindow());
+
+ // Exactly one frame worth of capacity
+ window.setIncomingCapacity(Integer.MAX_VALUE + 100);
+ window.configureOutbound(localBegin);
+ assertEquals(Integer.MAX_VALUE, localBegin.getIncomingWindow());
+
+ // Little more than one frame worth of capacity
+ window.setIncomingCapacity(Integer.MAX_VALUE + 200);
+ window.configureOutbound(localBegin);
+ assertEquals(Integer.MAX_VALUE, localBegin.getIncomingWindow());
+
+ // Two frame worth of capacity
+ window.setIncomingCapacity(Integer.MAX_VALUE * 2);
+ window.configureOutbound(localBegin);
+ assertEquals(Integer.MAX_VALUE, localBegin.getIncomingWindow());
+ }
+
+ @Test
+ public void testTransferHandlingIncomingCapacityUpdates() {
+ localBegin.setNextOutgoingId(0);
+ remoteBegin.setNextOutgoingId(0);
+
+ window.setIncomingCapacity(DEFAULT_SESSION_CAPACITY);
+ window.configureOutbound(localBegin);
+ window.handleBegin(remoteBegin);
+
+ when(receiver.remoteTransfer(any(), any())).thenReturn(delivery);
+
+ assertEquals(0, window.getNextIncomingId());
+ assertEquals(DEFAULT_SESSION_CAPACITY, window.getIncomingCapacity());
+ assertEquals(DEFAULT_SESSION_CAPACITY,
window.getRemainingIncomingCapacity());
+
+ final Transfer transfer = new Transfer();
+ final ProtonBuffer payload =
ProtonBufferAllocator.defaultAllocator().allocateHeapBuffer(10).setWriteOffset(10);
+
+ window.handleTransfer(receiver, transfer, payload);
+
+ assertEquals(DEFAULT_SESSION_CAPACITY - payload.getReadableBytes(),
window.getRemainingIncomingCapacity());
+
+ window.deliveryRead(delivery, payload.getReadableBytes());
+
+ assertEquals(DEFAULT_SESSION_CAPACITY, window.getIncomingCapacity());
+ }
+
+ @Test
+ public void testNextIncomingIdStartingAtZero() {
+ localBegin.setNextOutgoingId(0);
+ remoteBegin.setNextOutgoingId(0);
+
+ window.setIncomingCapacity(DEFAULT_SESSION_CAPACITY * 3);
+ window.configureOutbound(localBegin);
+ window.handleBegin(remoteBegin);
+
+ when(receiver.remoteTransfer(any(), any())).thenReturn(delivery);
+
+ final Transfer transfer = new Transfer();
+ final ProtonBuffer payload =
ProtonBufferAllocator.defaultAllocator().allocateHeapBuffer();
+
+ payload.writeInt(255);
+
+ assertEquals(0, window.getNextIncomingId());
+ assertEquals(3, window.getIncomingWindow());
+ window.handleTransfer(receiver, transfer, payload);
+ assertEquals(1, window.getNextIncomingId());
+ assertEquals(2, window.getIncomingWindow());
+ window.handleTransfer(receiver, transfer, payload);
+ assertEquals(2, window.getNextIncomingId());
+ assertEquals(1, window.getIncomingWindow());
+ window.handleTransfer(receiver, transfer, payload);
+ assertEquals(3, window.getNextIncomingId());
+ assertEquals(0, window.getIncomingWindow());
+ }
+
+ @Test
+ public void testNextIncomingIdStartingAtMaxUInt() {
+ localBegin.setNextOutgoingId(0);
+ remoteBegin.setNextOutgoingId(UnsignedInteger.MAX_VALUE.intValue());
+
+ window.setIncomingCapacity(DEFAULT_SESSION_CAPACITY * 20);
+ window.configureOutbound(localBegin);
+ window.handleBegin(remoteBegin);
+
+ when(receiver.remoteTransfer(any(), any())).thenReturn(delivery);
+
+ final Transfer transfer = new Transfer();
+ final ProtonBuffer payload =
ProtonBufferAllocator.defaultAllocator().allocateHeapBuffer();
+
+ payload.writeLong(32767);
+
+ assertEquals(UnsignedInteger.MAX_VALUE.intValue(),
window.getNextIncomingId());
+ assertEquals(20, window.getIncomingWindow());
+ window.handleTransfer(receiver, transfer, payload);
+ assertEquals(0, window.getNextIncomingId());
+ assertEquals(19, window.getIncomingWindow());
+ window.handleTransfer(receiver, transfer, payload);
+ assertEquals(1, window.getNextIncomingId());
+ assertEquals(18, window.getIncomingWindow());
+ window.handleTransfer(receiver, transfer, payload);
+ assertEquals(2, window.getNextIncomingId());
+ assertEquals(17, window.getIncomingWindow());
+ }
+
+ @Test
+ public void testIncomingWindowSetToUIntMaxValueCapsMaxToIntMaxValue() {
+ localBegin.setNextOutgoingId(0);
+ remoteBegin.setNextOutgoingId(Integer.MAX_VALUE);
+
+ window.setIncomingCapacity(UnsignedInteger.MAX_VALUE.intValue());
+ window.configureOutbound(localBegin);
+ window.handleBegin(remoteBegin);
+
+ when(receiver.remoteTransfer(any(), any())).thenReturn(delivery);
+
+ final Transfer transfer = new Transfer();
+ final ProtonBuffer payload =
ProtonBufferAllocator.defaultAllocator().allocateHeapBuffer();
+
+ payload.writeBoolean(false);
+
+ assertEquals(Integer.MAX_VALUE, window.getNextIncomingId());
+ assertEquals(Integer.MAX_VALUE, window.getIncomingWindow());
+ window.handleTransfer(receiver, transfer, payload);
+ assertEquals(Integer.MAX_VALUE + 1, window.getNextIncomingId());
+ assertEquals(Integer.MAX_VALUE - 1, window.getIncomingWindow());
+ window.handleTransfer(receiver, transfer, payload);
+ assertEquals(Integer.MAX_VALUE + 2, window.getNextIncomingId());
+ assertEquals(Integer.MAX_VALUE - 2, window.getIncomingWindow());
+ window.handleTransfer(receiver, transfer, payload);
+ assertEquals(Integer.MAX_VALUE + 3, window.getNextIncomingId());
+ assertEquals(Integer.MAX_VALUE - 3, window.getIncomingWindow());
+ }
+}
diff --git
a/protonj2/src/test/java/org/apache/qpid/protonj2/engine/impl/ProtonSessionOutgoingWindowTest.java
b/protonj2/src/test/java/org/apache/qpid/protonj2/engine/impl/ProtonSessionOutgoingWindowTest.java
new file mode 100644
index 00000000..1619c583
--- /dev/null
+++
b/protonj2/src/test/java/org/apache/qpid/protonj2/engine/impl/ProtonSessionOutgoingWindowTest.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.qpid.protonj2.engine.impl;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.mockito.Mockito.when;
+
+import org.apache.qpid.protonj2.types.transport.Begin;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+/**
+ * Test the proton outgoing window class for specification compliant behavior
+ */
+@ExtendWith(MockitoExtension.class)
+class ProtonSessionOutgoingWindowTest {
+
+ private static final int DEFAULT_MAX_FRAME_SIZE = 100_000;
+
+ @Mock
+ ProtonEngineConfiguration configuration;
+
+ @Mock
+ ProtonEngine engine;
+
+ @Mock
+ ProtonConnection connection;
+
+ @Mock
+ ProtonSession session;
+
+ @Mock
+ ProtonOutgoingDelivery delivery;
+
+ ProtonSessionOutgoingWindow window;
+
+ final Begin remoteBegin = new Begin();
+ final Begin localBegin = new Begin();
+
+ @BeforeEach
+ public void setUp() throws Exception {
+ when(configuration.getOutboundMaxFrameSize()).thenReturn((long)
DEFAULT_MAX_FRAME_SIZE);
+ when(engine.configuration()).thenReturn(configuration);
+ when(session.getEngine()).thenReturn(engine);
+ when(session.getConnection()).thenReturn(connection);
+
+ window = new ProtonSessionOutgoingWindow(session);
+ }
+
+ @Test
+ public void testConfigureOutbound() {
+ window.configureOutbound(localBegin);
+ window.handleBegin(remoteBegin);
+
+ assertEquals(Integer.MAX_VALUE, localBegin.getOutgoingWindow());
+ assertEquals(0, localBegin.getNextOutgoingId());
+
+ assertFalse(window.isSendable());
+ assertEquals(0, window.getNextOutgoingId());
+ assertEquals(-1, window.getOutgoingCapacity()); // No limit set
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]